Merge pull request #25270 from donnadionne/combo_policy

Change xds_cluster_resolver LB policy config to support combined loca…
pull/25363/head
donnadionne 4 years ago committed by GitHub
commit 5399e7e209
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 39
      src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
  2. 239
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
  3. 6
      src/core/ext/filters/client_channel/server_address.cc
  4. 31
      src/core/ext/filters/client_channel/server_address.h
  5. 98
      src/core/ext/xds/xds_api.cc
  6. 7
      src/core/ext/xds/xds_api.h
  7. 2
      test/cpp/end2end/xds_end2end_test.cc

@ -432,7 +432,7 @@ void CdsLb::OnClusterChanged(const std::string& name,
// that was scheduled before the deletion, so we can just ignore it.
auto it = watchers_.find(name);
if (it == watchers_.end()) return;
it->second.update = std::move(cluster_data);
it->second.update = cluster_data;
// Take care of integration with new certificate code.
grpc_error* error = GRPC_ERROR_NONE;
error = UpdateXdsCertificateProvider(name, it->second.update.value());
@ -448,21 +448,32 @@ void CdsLb::OnClusterChanged(const std::string& name,
if (GenerateDiscoveryMechanismForCluster(
config_->cluster(), &discovery_mechanisms, &clusters_needed)) {
// Construct config for child policy.
Json::Object xds_lb_policy;
if (cluster_data.lb_policy == "RING_HASH") {
std::string hash_function;
switch (cluster_data.hash_function) {
case XdsApi::CdsUpdate::HashFunction::XX_HASH:
hash_function = "XX_HASH";
break;
case XdsApi::CdsUpdate::HashFunction::MURMUR_HASH_2:
hash_function = "MURMUR_HASH_2";
break;
default:
GPR_ASSERT(0);
break;
}
xds_lb_policy["RING_HASH"] = Json::Object{
{"min_ring_size", cluster_data.min_ring_size},
{"max_ring_size", cluster_data.max_ring_size},
{"hash_function", hash_function},
};
} else {
xds_lb_policy["ROUND_ROBIN"] = Json::Object();
}
Json::Object child_config = {
{"localityPickingPolicy",
Json::Array{
Json::Object{
{"weighted_target_experimental",
Json::Object{
{"targets", Json::Object()},
}},
},
}},
{"endpointPickingPolicy",
{"xdsLbPolicy",
Json::Array{
Json::Object{
{"round_robin", Json::Object()},
},
xds_lb_policy,
}},
{"discoveryMechanisms", std::move(discovery_mechanisms)},
};

@ -82,28 +82,20 @@ class XdsClusterResolverLbConfig : public LoadBalancingPolicy::Config {
};
XdsClusterResolverLbConfig(
std::vector<DiscoveryMechanism> discovery_mechanisms,
Json locality_picking_policy, Json endpoint_picking_policy)
std::vector<DiscoveryMechanism> discovery_mechanisms, Json xds_lb_policy)
: discovery_mechanisms_(std::move(discovery_mechanisms)),
locality_picking_policy_(std::move(locality_picking_policy)),
endpoint_picking_policy_(std::move(endpoint_picking_policy)) {}
xds_lb_policy_(std::move(xds_lb_policy)) {}
const char* name() const override { return kXdsClusterResolver; }
const std::vector<DiscoveryMechanism>& discovery_mechanisms() const {
return discovery_mechanisms_;
}
const Json& locality_picking_policy() const {
return locality_picking_policy_;
}
const Json& endpoint_picking_policy() const {
return endpoint_picking_policy_;
}
const Json& xds_lb_policy() const { return xds_lb_policy_; }
private:
std::vector<DiscoveryMechanism> discovery_mechanisms_;
Json locality_picking_policy_;
Json endpoint_picking_policy_;
Json xds_lb_policy_;
};
// Xds Cluster Resolver LB policy.
@ -856,7 +848,11 @@ ServerAddressList XdsClusterResolverLb::CreateChildPolicyAddressesLocked() {
MakeHierarchicalPathAttribute(hierarchical_path))
.WithAttribute(kXdsLocalityNameAttributeKey,
absl::make_unique<XdsLocalityAttribute>(
locality_name->Ref())));
locality_name->Ref()))
.WithAttribute(ServerAddressWeightAttribute::
kServerAddressWeightAttributeKey,
absl::make_unique<ServerAddressWeightAttribute>(
locality.lb_weight)));
}
}
}
@ -882,36 +878,61 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() {
child_policy = discovery_mechanisms_[discovery_index]
.discovery_mechanism->override_child_policy();
} else {
const auto& localities = priority_list_[priority].localities;
Json::Object weighted_targets;
for (const auto& p : localities) {
XdsLocalityName* locality_name = p.first;
const auto& locality = p.second;
// Construct JSON object containing locality name.
Json::Object locality_name_json;
if (!locality_name->region().empty()) {
locality_name_json["region"] = locality_name->region();
}
if (!locality_name->zone().empty()) {
locality_name_json["zone"] = locality_name->zone();
}
if (!locality_name->sub_zone().empty()) {
locality_name_json["subzone"] = locality_name->sub_zone();
const auto& xds_lb_policy = config_->xds_lb_policy().object_value();
if (xds_lb_policy.find("ROUND_ROBIN") != xds_lb_policy.end()) {
const auto& localities = priority_list_[priority].localities;
Json::Object weighted_targets;
for (const auto& p : localities) {
XdsLocalityName* locality_name = p.first;
const auto& locality = p.second;
// Construct JSON object containing locality name.
Json::Object locality_name_json;
if (!locality_name->region().empty()) {
locality_name_json["region"] = locality_name->region();
}
if (!locality_name->zone().empty()) {
locality_name_json["zone"] = locality_name->zone();
}
if (!locality_name->sub_zone().empty()) {
locality_name_json["subzone"] = locality_name->sub_zone();
}
// Add weighted target entry.
weighted_targets[locality_name->AsHumanReadableString()] =
Json::Object{
{"weight", locality.lb_weight},
{"childPolicy",
Json::Array{
Json::Object{
{"round_robin", Json::Object()},
},
}},
};
}
// Add weighted target entry.
weighted_targets[locality_name->AsHumanReadableString()] = Json::Object{
{"weight", locality.lb_weight},
{"childPolicy", config_->endpoint_picking_policy()},
// Construct locality-picking policy.
// Start with field from our config and add the "targets" field.
child_policy = Json::Array{
Json::Object{
{"weighted_target_experimental",
Json::Object{
{"targets", Json::Object()},
}},
},
};
Json::Object& config =
*(*child_policy.mutable_array())[0].mutable_object();
auto it = config.begin();
GPR_ASSERT(it != config.end());
(*it->second.mutable_object())["targets"] = std::move(weighted_targets);
} else {
auto it = xds_lb_policy.find("RING_HASH");
GPR_ASSERT(it != xds_lb_policy.end());
Json::Object ring_hash_experimental_policy = it->second.object_value();
child_policy = Json::Array{
Json::Object{
{"ring_hash_experimental", ring_hash_experimental_policy},
},
};
}
// Construct locality-picking policy.
// Start with field from our config and add the "targets" field.
child_policy = config_->locality_picking_policy();
Json::Object& config =
*(*child_policy.mutable_array())[0].mutable_object();
auto it = config.begin();
GPR_ASSERT(it != config.end());
(*it->second.mutable_object())["targets"] = std::move(weighted_targets);
}
// Wrap it in the drop policy.
Json::Array drop_categories;
@ -1132,58 +1153,104 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory {
discovery_mechanisms.emplace_back(std::move(discovery_mechanism));
}
}
// Locality-picking policy.
Json locality_picking_policy;
it = json.object_value().find("localityPickingPolicy");
if (it == json.object_value().end()) {
locality_picking_policy = Json::Array{
Json::Object{
{"weighted_target_experimental",
Json::Object{
{"targets", Json::Object()},
}},
},
};
} else {
locality_picking_policy = it->second;
}
grpc_error* parse_error = GRPC_ERROR_NONE;
if (LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
locality_picking_policy, &parse_error) == nullptr) {
GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
error_list.push_back(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"localityPickingPolicy", &parse_error, 1));
GRPC_ERROR_UNREF(parse_error);
}
// Endpoint-picking policy. Called "childPolicy" for xds policy.
Json endpoint_picking_policy;
it = json.object_value().find("endpointPickingPolicy");
if (it == json.object_value().end()) {
endpoint_picking_policy = Json::Array{
Json::Object{
{"round_robin", Json::Object()},
},
};
} else {
endpoint_picking_policy = it->second;
}
parse_error = GRPC_ERROR_NONE;
if (LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
endpoint_picking_policy, &parse_error) == nullptr) {
GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
error_list.push_back(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"endpointPickingPolicy", &parse_error, 1));
GRPC_ERROR_UNREF(parse_error);
}
if (discovery_mechanisms.empty()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:discovery_mechanism error:list is missing or empty"));
}
Json xds_lb_policy = Json::Object{
{"ROUND_ROBIN", Json::Object()},
};
it = json.object_value().find("xdsLbPolicy");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::ARRAY) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:xdsLbPolicy error:type should be array"));
} else {
const Json::Array& array = it->second.array_value();
for (size_t i = 0; i < array.size(); ++i) {
if (array[i].type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:xdsLbPolicy error:element should be of type object"));
continue;
}
const Json::Object& policy = array[i].object_value();
auto policy_it = policy.find("ROUND_ROBIN");
if (policy_it != policy.end()) {
if (policy_it->second.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:ROUND_ROBIN error:type should be object"));
}
break;
}
policy_it = policy.find("RING_HASH");
if (policy_it != policy.end()) {
if (policy_it->second.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:RING_HASH error:type should be object"));
continue;
}
// TODO(donnadionne): Move this to a method in
// ring_hash_experimental and call it here.
const Json::Object& ring_hash = policy_it->second.object_value();
xds_lb_policy = array[i];
size_t min_ring_size = 1024;
size_t max_ring_size = 8388608;
auto ring_hash_it = ring_hash.find("min_ring_size");
if (ring_hash_it == ring_hash.end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:min_ring_size missing"));
} else if (ring_hash_it->second.type() != Json::Type::NUMBER) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:min_ring_size error: should be of "
"number"));
} else {
min_ring_size = gpr_parse_nonnegative_int(
ring_hash_it->second.string_value().c_str());
}
ring_hash_it = ring_hash.find("max_ring_size");
if (ring_hash_it == ring_hash.end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:max_ring_size missing"));
} else if (ring_hash_it->second.type() != Json::Type::NUMBER) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:max_ring_size error: should be of "
"number"));
} else {
max_ring_size = gpr_parse_nonnegative_int(
ring_hash_it->second.string_value().c_str());
}
if (min_ring_size <= 0 || min_ring_size > 8388608 ||
max_ring_size <= 0 || max_ring_size > 8388608 ||
min_ring_size > max_ring_size) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:max_ring_size and or min_ring_size error: "
"values need to be in the range of 1 to 8388608 "
"and max_ring_size cannot be smaller than "
"min_ring_size"));
}
ring_hash_it = ring_hash.find("hash_function");
if (ring_hash_it == ring_hash.end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:hash_function missing"));
} else if (ring_hash_it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:hash_function error: should be a "
"string"));
} else if (ring_hash_it->second.string_value() != "XX_HASH" &&
ring_hash_it->second.string_value() != "MURMUR_HASH_2") {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:hash_function error: unsupported "
"hash_function"));
}
break;
}
}
}
}
// Construct config.
if (error_list.empty()) {
return MakeRefCounted<XdsClusterResolverLbConfig>(
std::move(discovery_mechanisms), std::move(locality_picking_policy),
std::move(endpoint_picking_policy));
std::move(discovery_mechanisms), std::move(xds_lb_policy));
} else {
*error = GRPC_ERROR_CREATE_FROM_VECTOR(
"xds_cluster_resolver_experimental LB policy config", &error_list);

@ -31,6 +31,12 @@
namespace grpc_core {
//
// ServerAddressWeightAttribute
//
const char* ServerAddressWeightAttribute::kServerAddressWeightAttributeKey =
"server_address_weight";
//
// ServerAddress
//

@ -25,8 +25,10 @@
#include <memory>
#include "absl/container/inlined_vector.h"
#include "absl/strings/str_format.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/resolve_address.h"
namespace grpc_core {
@ -108,6 +110,35 @@ class ServerAddress {
typedef absl::InlinedVector<ServerAddress, 1> ServerAddressList;
//
// ServerAddressWeightAttribute
//
class ServerAddressWeightAttribute : public ServerAddress::AttributeInterface {
public:
static const char* kServerAddressWeightAttributeKey;
explicit ServerAddressWeightAttribute(uint32_t weight) : weight_(weight) {}
uint32_t weight() const { return weight_; }
std::unique_ptr<AttributeInterface> Copy() const override {
return absl::make_unique<ServerAddressWeightAttribute>(weight_);
}
int Cmp(const AttributeInterface* other) const override {
const auto* other_locality_attr =
static_cast<const ServerAddressWeightAttribute*>(other);
return GPR_ICMP(weight_, other_locality_attr->weight_);
}
std::string ToString() const override {
return absl::StrFormat("%d", weight_);
}
private:
uint32_t weight_;
};
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SERVER_ADDRESS_H */

@ -91,7 +91,7 @@
namespace grpc_core {
// TODO (donnadionne): Check to see if timeout is enabled, this will be
// TODO(donnadionne): Check to see if timeout is enabled, this will be
// removed once timeout feature is fully integration-tested and enabled by
// default.
bool XdsTimeoutEnabled() {
@ -102,7 +102,7 @@ bool XdsTimeoutEnabled() {
return parse_succeeded && parsed_value;
}
// TODO (donnadionne): Check to see if cluster types aggregate_cluster and
// TODO(donnadionne): Check to see if cluster types aggregate_cluster and
// logical_dns are enabled, this will be
// removed once the cluster types are fully integration-tested and enabled by
// default.
@ -115,6 +115,17 @@ bool XdsAggregateAndLogicalDnsClusterEnabled() {
return parse_succeeded && parsed_value;
}
// TODO(donnadionne): Check to see if ring hash policy is enabled, this will be
// removed once ring hash policy is fully integration-tested and enabled by
// default.
bool XdsRingHashEnabled() {
char* value = gpr_getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
bool parsed_value;
bool parse_succeeded = gpr_parse_bool_value(value, &parsed_value);
gpr_free(value);
return parse_succeeded && parsed_value;
}
// TODO(yashykt): Check to see if xDS security is enabled. This will be
// removed once this feature is fully integration-tested and enabled by
// default.
@ -1759,11 +1770,88 @@ grpc_error* CdsResponseParse(
}
}
// Check the LB policy.
if (envoy_config_cluster_v3_Cluster_lb_policy(cluster) !=
if (envoy_config_cluster_v3_Cluster_lb_policy(cluster) ==
envoy_config_cluster_v3_Cluster_ROUND_ROBIN) {
cds_update.lb_policy = "ROUND_ROBIN";
} else if (XdsRingHashEnabled() &&
envoy_config_cluster_v3_Cluster_lb_policy(cluster) ==
envoy_config_cluster_v3_Cluster_RING_HASH) {
cds_update.lb_policy = "RING_HASH";
// Record ring hash lb config
auto* ring_hash_config =
envoy_config_cluster_v3_Cluster_ring_hash_lb_config(cluster);
if (ring_hash_config == nullptr) {
errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat(cluster_name,
": ring hash lb config required but not present.")
.c_str()));
resource_names_failed->insert(cluster_name);
continue;
}
const google_protobuf_UInt64Value* max_ring_size =
envoy_config_cluster_v3_Cluster_RingHashLbConfig_maximum_ring_size(
ring_hash_config);
if (max_ring_size != nullptr) {
cds_update.max_ring_size =
google_protobuf_UInt64Value_value(max_ring_size);
if (cds_update.max_ring_size > 8388608 ||
cds_update.max_ring_size == 0) {
errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat(
cluster_name,
": max_ring_size is not in the range of 1 to 8388608.")
.c_str()));
resource_names_failed->insert(cluster_name);
continue;
}
}
const google_protobuf_UInt64Value* min_ring_size =
envoy_config_cluster_v3_Cluster_RingHashLbConfig_minimum_ring_size(
ring_hash_config);
if (min_ring_size != nullptr) {
cds_update.min_ring_size =
google_protobuf_UInt64Value_value(min_ring_size);
if (cds_update.min_ring_size > 8388608 ||
cds_update.min_ring_size == 0) {
errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat(
cluster_name,
": min_ring_size is not in the range of 1 to 8388608.")
.c_str()));
resource_names_failed->insert(cluster_name);
continue;
}
if (cds_update.min_ring_size > cds_update.max_ring_size) {
errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat(
cluster_name,
": min_ring_size cannot be greater than max_ring_size.")
.c_str()));
resource_names_failed->insert(cluster_name);
continue;
}
}
if (envoy_config_cluster_v3_Cluster_RingHashLbConfig_hash_function(
ring_hash_config) ==
envoy_config_cluster_v3_Cluster_RingHashLbConfig_XX_HASH) {
cds_update.hash_function = XdsApi::CdsUpdate::HashFunction::XX_HASH;
} else if (
envoy_config_cluster_v3_Cluster_RingHashLbConfig_hash_function(
ring_hash_config) ==
envoy_config_cluster_v3_Cluster_RingHashLbConfig_MURMUR_HASH_2) {
cds_update.hash_function =
XdsApi::CdsUpdate::HashFunction::MURMUR_HASH_2;
} else {
errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat(cluster_name,
": ring hash lb config has invalid hash function.")
.c_str()));
resource_names_failed->insert(cluster_name);
continue;
}
} else {
errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat(cluster_name, ": LB policy is not ROUND_ROBIN.")
.c_str()));
absl::StrCat(cluster_name, ": LB policy is not supported.").c_str()));
resource_names_failed->insert(cluster_name);
continue;
}

@ -242,6 +242,13 @@ class XdsApi {
// If set to the empty string, will use the same server we obtained the CDS
// data from.
absl::optional<std::string> lrs_load_reporting_server_name;
// The LB policy to use (e.g., "ROUND_ROBIN" or "RING_HASH").
std::string lb_policy;
// Used for RING_HASH LB policy only.
uint64_t min_ring_size = 1024;
uint64_t max_ring_size = 8388608;
enum HashFunction { XX_HASH, MURMUR_HASH_2 };
HashFunction hash_function;
// Maximum number of outstanding requests can be made to the upstream
// cluster.
uint32_t max_concurrent_requests = 1024;

@ -5693,7 +5693,7 @@ TEST_P(CdsTest, WrongLbPolicy) {
balancers_[0]->ads_service()->cds_response_state();
EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
EXPECT_THAT(response_state.error_message,
::testing::HasSubstr("LB policy is not ROUND_ROBIN."));
::testing::HasSubstr("LB policy is not supported."));
}
// Tests that CDS client should send a NACK if the lrs_server in CDS response is

Loading…
Cancel
Save