From cdd8a4ba87881a04669bc54fe824bad88cc285fb Mon Sep 17 00:00:00 2001 From: Donna Dionne Date: Thu, 4 Feb 2021 22:42:52 -0800 Subject: [PATCH] Change xds_cluster_resolver LB policy config to support combined locality and endpoint picking policies --- .../client_channel/lb_policy/xds/cds.cc | 39 ++- .../lb_policy/xds/xds_cluster_resolver.cc | 239 +++++++++++------- .../filters/client_channel/server_address.cc | 6 + .../filters/client_channel/server_address.h | 31 +++ src/core/ext/xds/xds_api.cc | 98 ++++++- src/core/ext/xds/xds_api.h | 7 + test/cpp/end2end/xds_end2end_test.cc | 2 +- 7 files changed, 316 insertions(+), 106 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc index d65f43f698d..46cdb32f72a 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc @@ -432,7 +432,7 @@ void CdsLb::OnClusterChanged(const std::string& name, // that was scheduled before the deletion, so we can just ignore it. auto it = watchers_.find(name); if (it == watchers_.end()) return; - it->second.update = std::move(cluster_data); + it->second.update = cluster_data; // Take care of integration with new certificate code. grpc_error* error = GRPC_ERROR_NONE; error = UpdateXdsCertificateProvider(name, it->second.update.value()); @@ -448,21 +448,32 @@ void CdsLb::OnClusterChanged(const std::string& name, if (GenerateDiscoveryMechanismForCluster( config_->cluster(), &discovery_mechanisms, &clusters_needed)) { // Construct config for child policy. + Json::Object xds_lb_policy; + if (cluster_data.lb_policy == "RING_HASH") { + std::string hash_function; + switch (cluster_data.hash_function) { + case XdsApi::CdsUpdate::HashFunction::XX_HASH: + hash_function = "XX_HASH"; + break; + case XdsApi::CdsUpdate::HashFunction::MURMUR_HASH_2: + hash_function = "MURMUR_HASH_2"; + break; + default: + GPR_ASSERT(0); + break; + } + xds_lb_policy["RING_HASH"] = Json::Object{ + {"min_ring_size", cluster_data.min_ring_size}, + {"max_ring_size", cluster_data.max_ring_size}, + {"hash_function", hash_function}, + }; + } else { + xds_lb_policy["ROUND_ROBIN"] = Json::Object(); + } Json::Object child_config = { - {"localityPickingPolicy", - Json::Array{ - Json::Object{ - {"weighted_target_experimental", - Json::Object{ - {"targets", Json::Object()}, - }}, - }, - }}, - {"endpointPickingPolicy", + {"xdsLbPolicy", Json::Array{ - Json::Object{ - {"round_robin", Json::Object()}, - }, + xds_lb_policy, }}, {"discoveryMechanisms", std::move(discovery_mechanisms)}, }; diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc index 1578ccd56c4..e9e0d0ef007 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc @@ -82,28 +82,20 @@ class XdsClusterResolverLbConfig : public LoadBalancingPolicy::Config { }; XdsClusterResolverLbConfig( - std::vector discovery_mechanisms, - Json locality_picking_policy, Json endpoint_picking_policy) + std::vector discovery_mechanisms, Json xds_lb_policy) : discovery_mechanisms_(std::move(discovery_mechanisms)), - locality_picking_policy_(std::move(locality_picking_policy)), - endpoint_picking_policy_(std::move(endpoint_picking_policy)) {} + xds_lb_policy_(std::move(xds_lb_policy)) {} const char* name() const override { return kXdsClusterResolver; } - const std::vector& discovery_mechanisms() const { return discovery_mechanisms_; } - const Json& locality_picking_policy() const { - return locality_picking_policy_; - } - const Json& endpoint_picking_policy() const { - return endpoint_picking_policy_; - } + + const Json& xds_lb_policy() const { return xds_lb_policy_; } private: std::vector discovery_mechanisms_; - Json locality_picking_policy_; - Json endpoint_picking_policy_; + Json xds_lb_policy_; }; // Xds Cluster Resolver LB policy. @@ -856,7 +848,11 @@ ServerAddressList XdsClusterResolverLb::CreateChildPolicyAddressesLocked() { MakeHierarchicalPathAttribute(hierarchical_path)) .WithAttribute(kXdsLocalityNameAttributeKey, absl::make_unique( - locality_name->Ref()))); + locality_name->Ref())) + .WithAttribute(ServerAddressWeightAttribute:: + kServerAddressWeightAttributeKey, + absl::make_unique( + locality.lb_weight))); } } } @@ -882,36 +878,61 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() { child_policy = discovery_mechanisms_[discovery_index] .discovery_mechanism->override_child_policy(); } else { - const auto& localities = priority_list_[priority].localities; - Json::Object weighted_targets; - for (const auto& p : localities) { - XdsLocalityName* locality_name = p.first; - const auto& locality = p.second; - // Construct JSON object containing locality name. - Json::Object locality_name_json; - if (!locality_name->region().empty()) { - locality_name_json["region"] = locality_name->region(); - } - if (!locality_name->zone().empty()) { - locality_name_json["zone"] = locality_name->zone(); - } - if (!locality_name->sub_zone().empty()) { - locality_name_json["subzone"] = locality_name->sub_zone(); + const auto& xds_lb_policy = config_->xds_lb_policy().object_value(); + if (xds_lb_policy.find("ROUND_ROBIN") != xds_lb_policy.end()) { + const auto& localities = priority_list_[priority].localities; + Json::Object weighted_targets; + for (const auto& p : localities) { + XdsLocalityName* locality_name = p.first; + const auto& locality = p.second; + // Construct JSON object containing locality name. + Json::Object locality_name_json; + if (!locality_name->region().empty()) { + locality_name_json["region"] = locality_name->region(); + } + if (!locality_name->zone().empty()) { + locality_name_json["zone"] = locality_name->zone(); + } + if (!locality_name->sub_zone().empty()) { + locality_name_json["subzone"] = locality_name->sub_zone(); + } + // Add weighted target entry. + weighted_targets[locality_name->AsHumanReadableString()] = + Json::Object{ + {"weight", locality.lb_weight}, + {"childPolicy", + Json::Array{ + Json::Object{ + {"round_robin", Json::Object()}, + }, + }}, + }; } - // Add weighted target entry. - weighted_targets[locality_name->AsHumanReadableString()] = Json::Object{ - {"weight", locality.lb_weight}, - {"childPolicy", config_->endpoint_picking_policy()}, + // Construct locality-picking policy. + // Start with field from our config and add the "targets" field. + child_policy = Json::Array{ + Json::Object{ + {"weighted_target_experimental", + Json::Object{ + {"targets", Json::Object()}, + }}, + }, + }; + Json::Object& config = + *(*child_policy.mutable_array())[0].mutable_object(); + auto it = config.begin(); + GPR_ASSERT(it != config.end()); + (*it->second.mutable_object())["targets"] = std::move(weighted_targets); + } else { + auto it = xds_lb_policy.find("RING_HASH"); + GPR_ASSERT(it != xds_lb_policy.end()); + Json::Object ring_hash_experimental_policy = it->second.object_value(); + child_policy = Json::Array{ + Json::Object{ + {"ring_hash_experimental", ring_hash_experimental_policy}, + }, }; } - // Construct locality-picking policy. - // Start with field from our config and add the "targets" field. - child_policy = config_->locality_picking_policy(); - Json::Object& config = - *(*child_policy.mutable_array())[0].mutable_object(); - auto it = config.begin(); - GPR_ASSERT(it != config.end()); - (*it->second.mutable_object())["targets"] = std::move(weighted_targets); } // Wrap it in the drop policy. Json::Array drop_categories; @@ -1132,58 +1153,104 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory { discovery_mechanisms.emplace_back(std::move(discovery_mechanism)); } } - // Locality-picking policy. - Json locality_picking_policy; - it = json.object_value().find("localityPickingPolicy"); - if (it == json.object_value().end()) { - locality_picking_policy = Json::Array{ - Json::Object{ - {"weighted_target_experimental", - Json::Object{ - {"targets", Json::Object()}, - }}, - }, - }; - } else { - locality_picking_policy = it->second; - } - grpc_error* parse_error = GRPC_ERROR_NONE; - if (LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( - locality_picking_policy, &parse_error) == nullptr) { - GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); - error_list.push_back(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "localityPickingPolicy", &parse_error, 1)); - GRPC_ERROR_UNREF(parse_error); - } - // Endpoint-picking policy. Called "childPolicy" for xds policy. - Json endpoint_picking_policy; - it = json.object_value().find("endpointPickingPolicy"); - if (it == json.object_value().end()) { - endpoint_picking_policy = Json::Array{ - Json::Object{ - {"round_robin", Json::Object()}, - }, - }; - } else { - endpoint_picking_policy = it->second; - } - parse_error = GRPC_ERROR_NONE; - if (LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( - endpoint_picking_policy, &parse_error) == nullptr) { - GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); - error_list.push_back(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "endpointPickingPolicy", &parse_error, 1)); - GRPC_ERROR_UNREF(parse_error); - } if (discovery_mechanisms.empty()) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "field:discovery_mechanism error:list is missing or empty")); } + Json xds_lb_policy = Json::Object{ + {"ROUND_ROBIN", Json::Object()}, + }; + it = json.object_value().find("xdsLbPolicy"); + if (it != json.object_value().end()) { + if (it->second.type() != Json::Type::ARRAY) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:xdsLbPolicy error:type should be array")); + } else { + const Json::Array& array = it->second.array_value(); + for (size_t i = 0; i < array.size(); ++i) { + if (array[i].type() != Json::Type::OBJECT) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:xdsLbPolicy error:element should be of type object")); + continue; + } + const Json::Object& policy = array[i].object_value(); + auto policy_it = policy.find("ROUND_ROBIN"); + if (policy_it != policy.end()) { + if (policy_it->second.type() != Json::Type::OBJECT) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:ROUND_ROBIN error:type should be object")); + } + break; + } + policy_it = policy.find("RING_HASH"); + if (policy_it != policy.end()) { + if (policy_it->second.type() != Json::Type::OBJECT) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:RING_HASH error:type should be object")); + continue; + } + // TODO(donnadionne): Move this to a method in + // ring_hash_experimental and call it here. + const Json::Object& ring_hash = policy_it->second.object_value(); + xds_lb_policy = array[i]; + size_t min_ring_size = 1024; + size_t max_ring_size = 8388608; + auto ring_hash_it = ring_hash.find("min_ring_size"); + if (ring_hash_it == ring_hash.end()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:min_ring_size missing")); + } else if (ring_hash_it->second.type() != Json::Type::NUMBER) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:min_ring_size error: should be of " + "number")); + } else { + min_ring_size = gpr_parse_nonnegative_int( + ring_hash_it->second.string_value().c_str()); + } + ring_hash_it = ring_hash.find("max_ring_size"); + if (ring_hash_it == ring_hash.end()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:max_ring_size missing")); + } else if (ring_hash_it->second.type() != Json::Type::NUMBER) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:max_ring_size error: should be of " + "number")); + } else { + max_ring_size = gpr_parse_nonnegative_int( + ring_hash_it->second.string_value().c_str()); + } + if (min_ring_size <= 0 || min_ring_size > 8388608 || + max_ring_size <= 0 || max_ring_size > 8388608 || + min_ring_size > max_ring_size) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:max_ring_size and or min_ring_size error: " + "values need to be in the range of 1 to 8388608 " + "and max_ring_size cannot be smaller than " + "min_ring_size")); + } + ring_hash_it = ring_hash.find("hash_function"); + if (ring_hash_it == ring_hash.end()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:hash_function missing")); + } else if (ring_hash_it->second.type() != Json::Type::STRING) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:hash_function error: should be a " + "string")); + } else if (ring_hash_it->second.string_value() != "XX_HASH" && + ring_hash_it->second.string_value() != "MURMUR_HASH_2") { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:hash_function error: unsupported " + "hash_function")); + } + break; + } + } + } + } // Construct config. if (error_list.empty()) { return MakeRefCounted( - std::move(discovery_mechanisms), std::move(locality_picking_policy), - std::move(endpoint_picking_policy)); + std::move(discovery_mechanisms), std::move(xds_lb_policy)); } else { *error = GRPC_ERROR_CREATE_FROM_VECTOR( "xds_cluster_resolver_experimental LB policy config", &error_list); diff --git a/src/core/ext/filters/client_channel/server_address.cc b/src/core/ext/filters/client_channel/server_address.cc index c8870cae0ea..89d7c90f79c 100644 --- a/src/core/ext/filters/client_channel/server_address.cc +++ b/src/core/ext/filters/client_channel/server_address.cc @@ -31,6 +31,12 @@ namespace grpc_core { +// +// ServerAddressWeightAttribute +// +const char* ServerAddressWeightAttribute::kServerAddressWeightAttributeKey = + "server_address_weight"; + // // ServerAddress // diff --git a/src/core/ext/filters/client_channel/server_address.h b/src/core/ext/filters/client_channel/server_address.h index 7a188a0ce45..1e0eaa11dc9 100644 --- a/src/core/ext/filters/client_channel/server_address.h +++ b/src/core/ext/filters/client_channel/server_address.h @@ -25,8 +25,10 @@ #include #include "absl/container/inlined_vector.h" +#include "absl/strings/str_format.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr/useful.h" #include "src/core/lib/iomgr/resolve_address.h" namespace grpc_core { @@ -108,6 +110,35 @@ class ServerAddress { typedef absl::InlinedVector ServerAddressList; +// +// ServerAddressWeightAttribute +// +class ServerAddressWeightAttribute : public ServerAddress::AttributeInterface { + public: + static const char* kServerAddressWeightAttributeKey; + + explicit ServerAddressWeightAttribute(uint32_t weight) : weight_(weight) {} + + uint32_t weight() const { return weight_; } + + std::unique_ptr Copy() const override { + return absl::make_unique(weight_); + } + + int Cmp(const AttributeInterface* other) const override { + const auto* other_locality_attr = + static_cast(other); + return GPR_ICMP(weight_, other_locality_attr->weight_); + } + + std::string ToString() const override { + return absl::StrFormat("%d", weight_); + } + + private: + uint32_t weight_; +}; + } // namespace grpc_core #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SERVER_ADDRESS_H */ diff --git a/src/core/ext/xds/xds_api.cc b/src/core/ext/xds/xds_api.cc index 7c411ce5a4b..ce58dd9a2e7 100644 --- a/src/core/ext/xds/xds_api.cc +++ b/src/core/ext/xds/xds_api.cc @@ -91,7 +91,7 @@ namespace grpc_core { -// TODO (donnadionne): Check to see if timeout is enabled, this will be +// TODO(donnadionne): Check to see if timeout is enabled, this will be // removed once timeout feature is fully integration-tested and enabled by // default. bool XdsTimeoutEnabled() { @@ -102,7 +102,7 @@ bool XdsTimeoutEnabled() { return parse_succeeded && parsed_value; } -// TODO (donnadionne): Check to see if cluster types aggregate_cluster and +// TODO(donnadionne): Check to see if cluster types aggregate_cluster and // logical_dns are enabled, this will be // removed once the cluster types are fully integration-tested and enabled by // default. @@ -115,6 +115,17 @@ bool XdsAggregateAndLogicalDnsClusterEnabled() { return parse_succeeded && parsed_value; } +// TODO(donnadionne): Check to see if ring hash policy is enabled, this will be +// removed once ring hash policy is fully integration-tested and enabled by +// default. +bool XdsRingHashEnabled() { + char* value = gpr_getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"); + bool parsed_value; + bool parse_succeeded = gpr_parse_bool_value(value, &parsed_value); + gpr_free(value); + return parse_succeeded && parsed_value; +} + // TODO(yashykt): Check to see if xDS security is enabled. This will be // removed once this feature is fully integration-tested and enabled by // default. @@ -1759,11 +1770,88 @@ grpc_error* CdsResponseParse( } } // Check the LB policy. - if (envoy_config_cluster_v3_Cluster_lb_policy(cluster) != + if (envoy_config_cluster_v3_Cluster_lb_policy(cluster) == envoy_config_cluster_v3_Cluster_ROUND_ROBIN) { + cds_update.lb_policy = "ROUND_ROBIN"; + } else if (XdsRingHashEnabled() && + envoy_config_cluster_v3_Cluster_lb_policy(cluster) == + envoy_config_cluster_v3_Cluster_RING_HASH) { + cds_update.lb_policy = "RING_HASH"; + // Record ring hash lb config + auto* ring_hash_config = + envoy_config_cluster_v3_Cluster_ring_hash_lb_config(cluster); + if (ring_hash_config == nullptr) { + errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat(cluster_name, + ": ring hash lb config required but not present.") + .c_str())); + resource_names_failed->insert(cluster_name); + continue; + } + const google_protobuf_UInt64Value* max_ring_size = + envoy_config_cluster_v3_Cluster_RingHashLbConfig_maximum_ring_size( + ring_hash_config); + if (max_ring_size != nullptr) { + cds_update.max_ring_size = + google_protobuf_UInt64Value_value(max_ring_size); + if (cds_update.max_ring_size > 8388608 || + cds_update.max_ring_size == 0) { + errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat( + cluster_name, + ": max_ring_size is not in the range of 1 to 8388608.") + .c_str())); + resource_names_failed->insert(cluster_name); + continue; + } + } + const google_protobuf_UInt64Value* min_ring_size = + envoy_config_cluster_v3_Cluster_RingHashLbConfig_minimum_ring_size( + ring_hash_config); + if (min_ring_size != nullptr) { + cds_update.min_ring_size = + google_protobuf_UInt64Value_value(min_ring_size); + if (cds_update.min_ring_size > 8388608 || + cds_update.min_ring_size == 0) { + errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat( + cluster_name, + ": min_ring_size is not in the range of 1 to 8388608.") + .c_str())); + resource_names_failed->insert(cluster_name); + continue; + } + if (cds_update.min_ring_size > cds_update.max_ring_size) { + errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat( + cluster_name, + ": min_ring_size cannot be greater than max_ring_size.") + .c_str())); + resource_names_failed->insert(cluster_name); + continue; + } + } + if (envoy_config_cluster_v3_Cluster_RingHashLbConfig_hash_function( + ring_hash_config) == + envoy_config_cluster_v3_Cluster_RingHashLbConfig_XX_HASH) { + cds_update.hash_function = XdsApi::CdsUpdate::HashFunction::XX_HASH; + } else if ( + envoy_config_cluster_v3_Cluster_RingHashLbConfig_hash_function( + ring_hash_config) == + envoy_config_cluster_v3_Cluster_RingHashLbConfig_MURMUR_HASH_2) { + cds_update.hash_function = + XdsApi::CdsUpdate::HashFunction::MURMUR_HASH_2; + } else { + errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat(cluster_name, + ": ring hash lb config has invalid hash function.") + .c_str())); + resource_names_failed->insert(cluster_name); + continue; + } + } else { errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, ": LB policy is not ROUND_ROBIN.") - .c_str())); + absl::StrCat(cluster_name, ": LB policy is not supported.").c_str())); resource_names_failed->insert(cluster_name); continue; } diff --git a/src/core/ext/xds/xds_api.h b/src/core/ext/xds/xds_api.h index 4a5ffe14bc7..88aeeb43a34 100644 --- a/src/core/ext/xds/xds_api.h +++ b/src/core/ext/xds/xds_api.h @@ -242,6 +242,13 @@ class XdsApi { // If set to the empty string, will use the same server we obtained the CDS // data from. absl::optional lrs_load_reporting_server_name; + // The LB policy to use (e.g., "ROUND_ROBIN" or "RING_HASH"). + std::string lb_policy; + // Used for RING_HASH LB policy only. + uint64_t min_ring_size = 1024; + uint64_t max_ring_size = 8388608; + enum HashFunction { XX_HASH, MURMUR_HASH_2 }; + HashFunction hash_function; // Maximum number of outstanding requests can be made to the upstream // cluster. uint32_t max_concurrent_requests = 1024; diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 6977eb72cf2..150e6964b28 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -5693,7 +5693,7 @@ TEST_P(CdsTest, WrongLbPolicy) { balancers_[0]->ads_service()->cds_response_state(); EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED); EXPECT_THAT(response_state.error_message, - ::testing::HasSubstr("LB policy is not ROUND_ROBIN.")); + ::testing::HasSubstr("LB policy is not supported.")); } // Tests that CDS client should send a NACK if the lrs_server in CDS response is