Merge pull request #24167 from markdroth/hierarchical_address_attributes

Store address hierarchy information in attributes instead of channel args.
pull/24173/head
Mark D. Roth 4 years ago committed by GitHub
commit 8787ee66d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 83
      src/core/ext/filters/client_channel/lb_policy/address_filtering.cc
  2. 12
      src/core/ext/filters/client_channel/lb_policy/address_filtering.h
  3. 7
      src/core/ext/filters/client_channel/lb_policy/xds/eds.cc
  4. 80
      src/core/ext/filters/client_channel/server_address.cc
  5. 57
      src/core/ext/filters/client_channel/server_address.h
  6. 4
      src/core/ext/xds/xds_api.cc

@ -18,64 +18,77 @@
#include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "src/core/lib/channel/channel_args.h"
#define GRPC_ARG_HIERARCHICAL_PATH "grpc.internal.address.hierarchical_path"
namespace grpc_core {
const char* kHierarchicalPathAttributeKey = "hierarchical_path";
namespace {
void* HierarchicalPathCopy(void* p) {
std::vector<std::string>* path = static_cast<std::vector<std::string>*>(p);
return static_cast<void*>(new std::vector<std::string>(*path));
}
class HierarchicalPathAttribute : public ServerAddress::AttributeInterface {
public:
explicit HierarchicalPathAttribute(std::vector<std::string> path)
: path_(std::move(path)) {}
void HierarchicalPathDestroy(void* p) {
std::vector<std::string>* path = static_cast<std::vector<std::string>*>(p);
delete path;
}
std::unique_ptr<AttributeInterface> Copy() const override {
return absl::make_unique<HierarchicalPathAttribute>(path_);
}
int HierarchicalPathCompare(void* p1, void* p2) {
std::vector<std::string>* path1 = static_cast<std::vector<std::string>*>(p1);
std::vector<std::string>* path2 = static_cast<std::vector<std::string>*>(p2);
for (size_t i = 0; i < path1->size(); ++i) {
if (path2->size() == i) return 1;
int r = (*path1)[i].compare((*path2)[i]);
if (r != 0) return r;
int Cmp(const AttributeInterface* other) const override {
const std::vector<std::string>& other_path =
static_cast<const HierarchicalPathAttribute*>(other)->path_;
for (size_t i = 0; i < path_.size(); ++i) {
if (other_path.size() == i) return 1;
int r = path_[i].compare(other_path[i]);
if (r != 0) return r;
}
if (other_path.size() > path_.size()) return -1;
return 0;
}
if (path2->size() > path1->size()) return -1;
return 0;
}
const grpc_arg_pointer_vtable hierarchical_path_arg_vtable = {
HierarchicalPathCopy, HierarchicalPathDestroy, HierarchicalPathCompare};
std::string ToString() const override {
return absl::StrCat("[", absl::StrJoin(path_, ", "), "]");
}
const std::vector<std::string>& path() const { return path_; }
private:
std::vector<std::string> path_;
};
} // namespace
grpc_arg MakeHierarchicalPathArg(const std::vector<std::string>& path) {
return grpc_channel_arg_pointer_create(
const_cast<char*>(GRPC_ARG_HIERARCHICAL_PATH),
const_cast<std::vector<std::string>*>(&path),
&hierarchical_path_arg_vtable);
std::unique_ptr<ServerAddress::AttributeInterface>
MakeHierarchicalPathAttribute(std::vector<std::string> path) {
return absl::make_unique<HierarchicalPathAttribute>(std::move(path));
}
HierarchicalAddressMap MakeHierarchicalAddressMap(
const ServerAddressList& addresses) {
HierarchicalAddressMap result;
for (const ServerAddress& address : addresses) {
auto* path = grpc_channel_args_find_pointer<std::vector<std::string>>(
address.args(), GRPC_ARG_HIERARCHICAL_PATH);
if (path == nullptr || path->empty()) continue;
auto it = path->begin();
const HierarchicalPathAttribute* path_attribute =
static_cast<const HierarchicalPathAttribute*>(
address.GetAttribute(kHierarchicalPathAttributeKey));
if (path_attribute == nullptr) continue;
const std::vector<std::string>& path = path_attribute->path();
auto it = path.begin();
ServerAddressList& target_list = result[*it];
std::unique_ptr<HierarchicalPathAttribute> new_attribute;
++it;
std::vector<std::string> remaining_path(it, path->end());
const char* name_to_remove = GRPC_ARG_HIERARCHICAL_PATH;
grpc_arg new_arg = MakeHierarchicalPathArg(remaining_path);
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
address.args(), &name_to_remove, 1, &new_arg, 1);
target_list.emplace_back(address.address(), new_args);
if (it != path.end()) {
std::vector<std::string> remaining_path(it, path.end());
new_attribute = absl::make_unique<HierarchicalPathAttribute>(
std::move(remaining_path));
}
target_list.emplace_back(address.WithAttribute(
kHierarchicalPathAttributeKey, std::move(new_attribute)));
}
return result;
}

@ -23,8 +23,6 @@
#include <string>
#include <vector>
#include "absl/strings/string_view.h"
#include "src/core/ext/filters/client_channel/server_address.h"
// The resolver returns a flat list of addresses. When a hierarchy of
@ -81,9 +79,13 @@
namespace grpc_core {
// Constructs a channel arg containing the hierarchical path
// to be associated with an address.
grpc_arg MakeHierarchicalPathArg(const std::vector<std::string>& path);
// The attribute key to be used for hierarchical paths in ServerAddress.
extern const char* kHierarchicalPathAttributeKey;
// Constructs an address attribute containing the hierarchical path
// to be associated with the address.
std::unique_ptr<ServerAddress::AttributeInterface>
MakeHierarchicalPathAttribute(std::vector<std::string> path);
// A map from the next path element to the addresses that fall under
// that path element.

@ -591,10 +591,9 @@ ServerAddressList EdsLb::CreateChildPolicyAddressesLocked() {
std::vector<std::string> hierarchical_path = {
priority_child_name, locality_name->AsHumanReadableString()};
for (const auto& endpoint : locality.endpoints) {
grpc_arg new_arg = MakeHierarchicalPathArg(hierarchical_path);
grpc_channel_args* args =
grpc_channel_args_copy_and_add(endpoint.args(), &new_arg, 1);
addresses.emplace_back(endpoint.address(), args);
addresses.emplace_back(endpoint.WithAttribute(
kHierarchicalPathAttributeKey,
MakeHierarchicalPathAttribute(hierarchical_path)));
}
}
}

@ -20,6 +20,15 @@
#include "src/core/ext/filters/client_channel/server_address.h"
#include <memory>
#include <string>
#include <vector>
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
namespace grpc_core {
//
@ -39,6 +48,38 @@ ServerAddress::ServerAddress(
address_.len = static_cast<socklen_t>(address_len);
}
ServerAddress::ServerAddress(const ServerAddress& other)
: address_(other.address_), args_(grpc_channel_args_copy(other.args_)) {
for (const auto& p : other.attributes_) {
attributes_[p.first] = p.second->Copy();
}
}
ServerAddress& ServerAddress::operator=(const ServerAddress& other) {
address_ = other.address_;
grpc_channel_args_destroy(args_);
args_ = grpc_channel_args_copy(other.args_);
attributes_.clear();
for (const auto& p : other.attributes_) {
attributes_[p.first] = p.second->Copy();
}
return *this;
}
ServerAddress::ServerAddress(ServerAddress&& other) noexcept
: address_(other.address_),
args_(other.args_),
attributes_(std::move(other.attributes_)) {
other.args_ = nullptr;
}
ServerAddress& ServerAddress::operator=(ServerAddress&& other) noexcept {
address_ = other.address_;
grpc_channel_args_destroy(args_);
args_ = other.args_;
other.args_ = nullptr;
attributes_ = std::move(other.attributes_);
return *this;
}
namespace {
int CompareAttributes(
@ -78,4 +119,43 @@ int ServerAddress::Cmp(const ServerAddress& other) const {
return CompareAttributes(attributes_, other.attributes_);
}
const ServerAddress::AttributeInterface* ServerAddress::GetAttribute(
const char* key) const {
auto it = attributes_.find(key);
if (it == attributes_.end()) return nullptr;
return it->second.get();
}
// Returns a copy of the address with a modified attribute.
// If the new value is null, the attribute is removed.
ServerAddress ServerAddress::WithAttribute(
const char* key, std::unique_ptr<AttributeInterface> value) const {
ServerAddress address = *this;
if (value == nullptr) {
address.attributes_.erase(key);
} else {
address.attributes_[key] = std::move(value);
}
return address;
}
std::string ServerAddress::ToString() const {
std::vector<std::string> parts = {
grpc_sockaddr_to_string(&address_, false),
};
if (args_ != nullptr) {
parts.emplace_back(
absl::StrCat("args={", grpc_channel_args_string(args_), "}"));
}
if (!attributes_.empty()) {
std::vector<std::string> attrs;
for (const auto& p : attributes_) {
attrs.emplace_back(absl::StrCat(p.first, "=", p.second->ToString()));
}
parts.emplace_back(
absl::StrCat("attributes={", absl::StrJoin(attrs, ", "), "}"));
}
return absl::StrJoin(parts, " ");
}
} // namespace grpc_core

@ -22,6 +22,7 @@
#include <grpc/support/port_platform.h>
#include <map>
#include <memory>
#include "absl/container/inlined_vector.h"
@ -42,15 +43,22 @@ class ServerAddress {
// Base class for resolver-supplied attributes.
// Unlike channel args, these attributes don't affect subchannel
// uniqueness or behavior. They are for use by LB policies only.
//
// Attributes are keyed by a C string that is unique by address, not
// by value. All attributes added with the same key must be of the
// same type.
class AttributeInterface {
public:
virtual ~AttributeInterface();
virtual ~AttributeInterface() = default;
// Creates a copy of the attribute.
virtual std::unique_ptr<AttributeInterface> Copy() const = 0;
// Compares this attribute with another.
virtual int Cmp(const AttributeInterface* other) const = 0;
// Returns a human-readable representation of the attribute.
virtual std::string ToString() const = 0;
};
// Takes ownership of args.
@ -65,38 +73,12 @@ class ServerAddress {
~ServerAddress() { grpc_channel_args_destroy(args_); }
// Copyable.
ServerAddress(const ServerAddress& other)
: address_(other.address_), args_(grpc_channel_args_copy(other.args_)) {
for (const auto& p : other.attributes_) {
attributes_[p.first] = p.second->Copy();
}
}
ServerAddress& operator=(const ServerAddress& other) {
address_ = other.address_;
grpc_channel_args_destroy(args_);
args_ = grpc_channel_args_copy(other.args_);
attributes_.clear();
for (const auto& p : other.attributes_) {
attributes_[p.first] = p.second->Copy();
}
return *this;
}
ServerAddress(const ServerAddress& other);
ServerAddress& operator=(const ServerAddress& other);
// Movable.
ServerAddress(ServerAddress&& other)
: address_(other.address_),
args_(other.args_),
attributes_(std::move(other.attributes_)) {
other.args_ = nullptr;
}
ServerAddress& operator=(ServerAddress&& other) {
address_ = other.address_;
grpc_channel_args_destroy(args_);
args_ = other.args_;
other.args_ = nullptr;
attributes_ = std::move(other.attributes_);
return *this;
}
ServerAddress(ServerAddress&& other) noexcept;
ServerAddress& operator=(ServerAddress&& other) noexcept;
bool operator==(const ServerAddress& other) const { return Cmp(other) == 0; }
@ -105,11 +87,14 @@ class ServerAddress {
const grpc_resolved_address& address() const { return address_; }
const grpc_channel_args* args() const { return args_; }
const AttributeInterface* GetAttribute(const char* key) const {
auto it = attributes_.find(key);
if (it == attributes_.end()) return nullptr;
return it->second.get();
}
const AttributeInterface* GetAttribute(const char* key) const;
// Returns a copy of the address with a modified attribute.
// If the new value is null, the attribute is removed.
ServerAddress WithAttribute(const char* key,
std::unique_ptr<AttributeInterface> value) const;
std::string ToString() const;
private:
grpc_resolved_address address_;

@ -381,9 +381,7 @@ XdsApi::RdsUpdate::VirtualHost* XdsApi::RdsUpdate::FindVirtualHostForDomain(
std::string XdsApi::EdsUpdate::Priority::Locality::ToString() const {
std::vector<std::string> endpoint_strings;
for (const ServerAddress& endpoint : endpoints) {
endpoint_strings.emplace_back(
absl::StrCat(grpc_sockaddr_to_string(&endpoint.address(), false), "{",
grpc_channel_args_string(endpoint.args()), "}"));
endpoint_strings.emplace_back(endpoint.ToString());
}
return absl::StrCat("{name=", name->AsHumanReadableString(),
", lb_weight=", lb_weight, ", endpoints=[",

Loading…
Cancel
Save