diff --git a/BUILD b/BUILD index aab73dbccbe..94bab543e83 100644 --- a/BUILD +++ b/BUILD @@ -552,6 +552,7 @@ GRPC_XDS_TARGETS = [ "grpc_lb_policy_xds_cluster_impl", "grpc_lb_policy_xds_cluster_manager", "grpc_lb_policy_xds_cluster_resolver", + "grpc_lb_policy_xds_wrr_locality", "grpc_resolver_xds", "grpc_resolver_c2p", "grpc_xds_server_config_fetcher", @@ -5023,15 +5024,20 @@ grpc_cc_library( ) grpc_cc_library( - name = "grpc_lb_xds_common", + name = "grpc_lb_xds_attributes", + srcs = [ + "src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.cc", + ], hdrs = [ - "src/core/ext/filters/client_channel/lb_policy/xds/xds.h", + "src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.h", ], + external_deps = ["absl/strings"], language = "c++", deps = [ "gpr_platform", "ref_counted_ptr", "server_address", + "useful", "xds_client", ], ) @@ -5056,9 +5062,8 @@ grpc_cc_library( "grpc_base", "grpc_client_channel", "grpc_lb_address_filtering", - "grpc_lb_policy_ring_hash", + "grpc_lb_xds_attributes", "grpc_lb_xds_channel_args", - "grpc_lb_xds_common", "grpc_public_hdrs", "grpc_resolver", "grpc_resolver_fake", @@ -5102,8 +5107,8 @@ grpc_cc_library( "gpr", "grpc_base", "grpc_client_channel", + "grpc_lb_xds_attributes", "grpc_lb_xds_channel_args", - "grpc_lb_xds_common", "grpc_public_hdrs", "grpc_trace", "grpc_xds_client", @@ -5166,6 +5171,42 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "grpc_lb_policy_xds_wrr_locality", + srcs = [ + "src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc", + ], + external_deps = [ + "absl/status", + "absl/status:statusor", + "absl/strings", + ], + language = "c++", + deps = [ + "channel_args", + "config", + "debug_location", + "gpr", + "grpc_base", + "grpc_lb_xds_attributes", + "grpc_public_hdrs", + "grpc_trace", + "json", + "json_args", + "json_object_loader", + "lb_policy", + "lb_policy_factory", + "lb_policy_registry", + "orphanable", + "pollset_set", + "ref_counted_ptr", + "server_address", + "subchannel_interface", + "validation_errors", + "xds_client", + ], +) + grpc_cc_library( name = "grpc_lb_address_filtering", srcs = [ diff --git a/CMakeLists.txt b/CMakeLists.txt index 87bfa48d5c8..8a042cba728 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1716,9 +1716,11 @@ add_library(grpc src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc src/core/ext/filters/client_channel/lb_policy/xds/cds.cc + src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.cc src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc + src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc src/core/ext/filters/client_channel/local_subchannel_pool.cc src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc diff --git a/Makefile b/Makefile index 5e416f9ef87..42c94d8e2bb 100644 --- a/Makefile +++ b/Makefile @@ -989,9 +989,11 @@ LIBGRPC_SRC = \ src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \ src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \ src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \ + src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc \ + src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc \ src/core/ext/filters/client_channel/local_subchannel_pool.cc \ src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc \ src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc \ @@ -2793,9 +2795,11 @@ ifneq ($(OPENSSL_DEP),) # installing headers to their final destination on the drive. We need this # otherwise parallel compilation will fail if a source is compiled first. src/core/ext/filters/client_channel/lb_policy/xds/cds.cc: $(OPENSSL_DEP) +src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc: $(OPENSSL_DEP) +src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/resolver/google_c2p/google_c2p_resolver.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc: $(OPENSSL_DEP) src/core/ext/filters/rbac/rbac_filter.cc: $(OPENSSL_DEP) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index b0970e4a3ab..3f55ecf428d 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -336,7 +336,7 @@ libs: - src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h - src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h - src/core/ext/filters/client_channel/lb_policy/subchannel_list.h - - src/core/ext/filters/client_channel/lb_policy/xds/xds.h + - src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.h - src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h - src/core/ext/filters/client_channel/local_subchannel_pool.h - src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h @@ -1047,9 +1047,11 @@ libs: - src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc - src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc - src/core/ext/filters/client_channel/lb_policy/xds/cds.cc + - src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.cc - src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc - src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc - src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc + - src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc - src/core/ext/filters/client_channel/local_subchannel_pool.cc - src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc - src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc diff --git a/config.m4 b/config.m4 index 2ea580cf2ca..713c8ac41e0 100644 --- a/config.m4 +++ b/config.m4 @@ -71,9 +71,11 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \ src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \ src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \ + src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc \ + src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc \ src/core/ext/filters/client_channel/local_subchannel_pool.cc \ src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc \ src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc \ diff --git a/config.w32 b/config.w32 index eea9718381e..d6045ba1670 100644 --- a/config.w32 +++ b/config.w32 @@ -37,9 +37,11 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\filters\\client_channel\\lb_policy\\round_robin\\round_robin.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\weighted_target\\weighted_target.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\cds.cc " + + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_attributes.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_cluster_impl.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_cluster_manager.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_cluster_resolver.cc " + + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_wrr_locality.cc " + "src\\core\\ext\\filters\\client_channel\\local_subchannel_pool.cc " + "src\\core\\ext\\filters\\client_channel\\resolver\\binder\\binder_resolver.cc " + "src\\core\\ext\\filters\\client_channel\\resolver\\dns\\c_ares\\dns_resolver_ares.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 4eb9d98e9e8..ff7e8e3ba9f 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -249,7 +249,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h', 'src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h', 'src/core/ext/filters/client_channel/lb_policy/subchannel_list.h', - 'src/core/ext/filters/client_channel/lb_policy/xds/xds.h', + 'src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.h', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h', 'src/core/ext/filters/client_channel/local_subchannel_pool.h', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h', @@ -1132,7 +1132,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h', 'src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h', 'src/core/ext/filters/client_channel/lb_policy/subchannel_list.h', - 'src/core/ext/filters/client_channel/lb_policy/xds/xds.h', + 'src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.h', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h', 'src/core/ext/filters/client_channel/local_subchannel_pool.h', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 972c11ea77e..b327246f582 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -255,11 +255,13 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/lb_policy/subchannel_list.h', 'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc', - 'src/core/ext/filters/client_channel/lb_policy/xds/xds.h', + 'src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.cc', + 'src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.h', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc', + 'src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc', 'src/core/ext/filters/client_channel/local_subchannel_pool.cc', 'src/core/ext/filters/client_channel/local_subchannel_pool.h', 'src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc', @@ -1781,7 +1783,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h', 'src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h', 'src/core/ext/filters/client_channel/lb_policy/subchannel_list.h', - 'src/core/ext/filters/client_channel/lb_policy/xds/xds.h', + 'src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.h', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h', 'src/core/ext/filters/client_channel/local_subchannel_pool.h', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h', diff --git a/grpc.gemspec b/grpc.gemspec index 6dc84c8a9d7..93e9e90e37e 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -166,11 +166,13 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/client_channel/lb_policy/subchannel_list.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/cds.cc ) - s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds.h ) + s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.cc ) + s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc ) + s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc ) s.files += %w( src/core/ext/filters/client_channel/local_subchannel_pool.cc ) s.files += %w( src/core/ext/filters/client_channel/local_subchannel_pool.h ) s.files += %w( src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc ) diff --git a/grpc.gyp b/grpc.gyp index 37e7226fe68..f496f3d1f1e 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -403,9 +403,11 @@ 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc', 'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc', + 'src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc', + 'src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc', 'src/core/ext/filters/client_channel/local_subchannel_pool.cc', 'src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc', diff --git a/package.xml b/package.xml index 5af3a17129d..95d29a830d0 100644 --- a/package.xml +++ b/package.xml @@ -148,11 +148,13 @@ - + + + diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc index d86f66cb830..5bc13e7aece 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc @@ -504,27 +504,15 @@ void CdsLb::OnClusterChanged(const std::string& name, // underlying cluster that we may be processing an update for. auto it = watchers_.find(config_->cluster()); GPR_ASSERT(it != watchers_.end()); - const std::string& lb_policy = it->second.update->lb_policy; // Construct config for child policy. - Json::Object xds_lb_policy; - if (lb_policy == "RING_HASH") { - xds_lb_policy["RING_HASH"] = Json::Object{ - {"min_ring_size", cluster_data.min_ring_size}, - {"max_ring_size", cluster_data.max_ring_size}, - }; - } else { - xds_lb_policy["ROUND_ROBIN"] = Json::Object(); - } - Json::Object child_config = { - {"xdsLbPolicy", - Json::Array{ - xds_lb_policy, - }}, - {"discoveryMechanisms", std::move(discovery_mechanisms)}, - }; Json json = Json::Array{ Json::Object{ - {"xds_cluster_resolver_experimental", std::move(child_config)}, + {"xds_cluster_resolver_experimental", + Json::Object{ + {"xdsLbPolicy", + std::move(it->second.update->lb_policy_config)}, + {"discoveryMechanisms", std::move(discovery_mechanisms)}, + }}, }, }; if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.cc new file mode 100644 index 00000000000..e6918fa4d4e --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.cc @@ -0,0 +1,42 @@ +// +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include + +#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.h" + +#include "absl/strings/str_cat.h" + +#include "src/core/lib/gpr/useful.h" + +namespace grpc_core { + +const char* kXdsLocalityNameAttributeKey = "xds_locality_name"; + +int XdsLocalityAttribute::Cmp(const AttributeInterface* other) const { + const auto* other_locality_attr = + static_cast(other); + int r = locality_name_->Compare(*other_locality_attr->locality_name_); + if (r != 0) return r; + return QsortCompare(weight_, other_locality_attr->weight_); +} + +std::string XdsLocalityAttribute::ToString() const { + return absl::StrCat("{name=", locality_name_->AsHumanReadableString(), + ", weight=", weight_, "}"); +} + +} // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.h b/src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.h similarity index 65% rename from src/core/ext/filters/client_channel/lb_policy/xds/xds.h rename to src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.h index 0b2ea4e2f9a..17382c34d4e 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.h +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.h @@ -14,11 +14,13 @@ // limitations under the License. // -#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_H -#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_H +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_ATTRIBUTES_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_ATTRIBUTES_H #include +#include + #include #include #include @@ -29,36 +31,34 @@ namespace grpc_core { -// Defined in the EDS policy. extern const char* kXdsLocalityNameAttributeKey; class XdsLocalityAttribute : public ServerAddress::AttributeInterface { public: - explicit XdsLocalityAttribute(RefCountedPtr locality_name) - : locality_name_(std::move(locality_name)) {} + XdsLocalityAttribute(RefCountedPtr locality_name, + uint32_t weight) + : locality_name_(std::move(locality_name)), weight_(weight) {} RefCountedPtr locality_name() const { return locality_name_; } + uint32_t weight() const { return weight_; } + std::unique_ptr Copy() const override { - return std::make_unique(locality_name_->Ref()); + return std::make_unique(locality_name_->Ref(), + weight_); } - int Cmp(const AttributeInterface* other) const override { - const auto* other_locality_attr = - static_cast(other); - return locality_name_->Compare(*other_locality_attr->locality_name_); - } + int Cmp(const AttributeInterface* other) const override; - std::string ToString() const override { - return locality_name_->AsHumanReadableString(); - } + std::string ToString() const override; private: RefCountedPtr locality_name_; + uint32_t weight_; }; } // namespace grpc_core -#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_H */ +#endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_ATTRIBUTES_H diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc index e1d8f28c4de..fd1c72ba8ae 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc @@ -38,7 +38,7 @@ #include #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" -#include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h" +#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.h" #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h" #include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/ext/xds/xds_bootstrap_grpc.h" diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc index ed7e740479e..5ebbcd8f9ef 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc @@ -40,8 +40,7 @@ #include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h" #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" -#include "src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h" -#include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h" +#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.h" #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/ext/xds/xds_bootstrap.h" @@ -78,8 +77,6 @@ namespace grpc_core { TraceFlag grpc_lb_xds_cluster_resolver_trace(false, "xds_cluster_resolver_lb"); -const char* kXdsLocalityNameAttributeKey = "xds_locality_name"; - namespace { constexpr absl::string_view kXdsClusterResolver = @@ -147,7 +144,7 @@ class XdsClusterResolverLbConfig : public LoadBalancingPolicy::Config { private: std::vector discovery_mechanisms_; - Json xds_lb_policy_ = Json::Object{{"ROUND_ROBIN", Json::Object()}}; + Json xds_lb_policy_; }; // Xds Cluster Resolver LB policy. @@ -839,7 +836,7 @@ ServerAddressList XdsClusterResolverLb::CreateChildPolicyAddressesLocked() { MakeHierarchicalPathAttribute(hierarchical_path)) .WithAttribute(kXdsLocalityNameAttributeKey, std::make_unique( - locality_name->Ref())) + locality_name->Ref(), locality.lb_weight)) .WithAttribute( ServerAddressWeightAttribute:: kServerAddressWeightAttributeKey, @@ -870,60 +867,13 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() { for (size_t priority = 0; priority < discovery_entry.latest_update->priorities.size(); ++priority) { - const auto& priority_entry = - discovery_entry.latest_update->priorities[priority]; Json child_policy; if (!discovery_entry.discovery_mechanism->override_child_policy() .empty()) { child_policy = discovery_entry.discovery_mechanism->override_child_policy(); } else { - const auto& xds_lb_policy = config_->xds_lb_policy().object_value(); - if (xds_lb_policy.find("ROUND_ROBIN") != xds_lb_policy.end()) { - const auto& localities = priority_entry.localities; - Json::Object weighted_targets; - for (const auto& p : localities) { - XdsLocalityName* locality_name = p.first; - const auto& locality = p.second; - // Add weighted target entry. - weighted_targets[locality_name->AsHumanReadableString()] = - Json::Object{ - {"weight", locality.lb_weight}, - {"childPolicy", - Json::Array{ - Json::Object{ - {"round_robin", Json::Object()}, - }, - }}, - }; - } - // Construct locality-picking policy. - // Start with field from our config and add the "targets" field. - child_policy = Json::Array{ - Json::Object{ - {"weighted_target_experimental", - Json::Object{ - {"targets", Json::Object()}, - }}, - }, - }; - Json::Object& config = - *(*child_policy.mutable_array())[0].mutable_object(); - auto it = config.begin(); - GPR_ASSERT(it != config.end()); - (*it->second.mutable_object())["targets"] = - std::move(weighted_targets); - } else { - auto it = xds_lb_policy.find("RING_HASH"); - GPR_ASSERT(it != xds_lb_policy.end()); - Json::Object ring_hash_experimental_policy = - it->second.object_value(); - child_policy = Json::Array{ - Json::Object{ - {"ring_hash_experimental", ring_hash_experimental_policy}, - }, - }; - } + child_policy = config_->xds_lb_policy(); } // Wrap it in the drop policy. Json::Array drop_categories; @@ -1128,8 +1078,7 @@ const JsonLoaderInterface* XdsClusterResolverLbConfig::JsonLoader( return loader; } -void XdsClusterResolverLbConfig::JsonPostLoad(const Json& json, - const JsonArgs& args, +void XdsClusterResolverLbConfig::JsonPostLoad(const Json& json, const JsonArgs&, ValidationErrors* errors) { // Validate discoveryMechanisms. { @@ -1142,37 +1091,14 @@ void XdsClusterResolverLbConfig::JsonPostLoad(const Json& json, { ValidationErrors::ScopedField field(errors, ".xdsLbPolicy"); auto it = json.object_value().find("xdsLbPolicy"); - if (it != json.object_value().end()) { - if (it->second.type() != Json::Type::ARRAY) { - errors->AddError("is not an array"); - } else { - const Json::Array& array = it->second.array_value(); - for (size_t i = 0; i < array.size(); ++i) { - ValidationErrors::ScopedField field(errors, - absl::StrCat("[", i, "]")); - if (array[i].type() != Json::Type::OBJECT) { - errors->AddError("is not an object"); - continue; - } - const Json::Object& policy = array[i].object_value(); - auto policy_it = policy.find("ROUND_ROBIN"); - if (policy_it != policy.end()) { - ValidationErrors::ScopedField field(errors, "[\"ROUND_ROBIN\"]"); - if (policy_it->second.type() != Json::Type::OBJECT) { - errors->AddError("is not an object"); - } - break; - } - { - ValidationErrors::ScopedField field(errors, "[\"RING_HASH\"]"); - policy_it = policy.find("RING_HASH"); - if (policy_it != policy.end()) { - LoadFromJson(policy_it->second, args, errors); - xds_lb_policy_ = array[i]; - } - } - } - } + if (it == json.object_value().end()) { + errors->AddError("field not present"); + } else { + auto lb_config = CoreConfiguration::Get() + .lb_policy_registry() + .ParseLoadBalancingConfig(it->second); + if (!lb_config.ok()) errors->AddError(lb_config.status().message()); + xds_lb_policy_ = it->second; } } } diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc new file mode 100644 index 00000000000..1f7d1c83936 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc @@ -0,0 +1,357 @@ +// +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include + +#include +#include +#include +#include +#include +#include + +#include "absl/status/status.h" +#include "absl/status/statusor.h" +#include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" + +#include +#include + +#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.h" +#include "src/core/ext/xds/xds_client_stats.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/config/core_configuration.h" +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/gprpp/debug_location.h" +#include "src/core/lib/gprpp/orphanable.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/gprpp/validation_errors.h" +#include "src/core/lib/iomgr/pollset_set.h" +#include "src/core/lib/json/json.h" +#include "src/core/lib/json/json_args.h" +#include "src/core/lib/json/json_object_loader.h" +#include "src/core/lib/load_balancing/lb_policy.h" +#include "src/core/lib/load_balancing/lb_policy_factory.h" +#include "src/core/lib/load_balancing/lb_policy_registry.h" +#include "src/core/lib/load_balancing/subchannel_interface.h" +#include "src/core/lib/resolver/server_address.h" +#include "src/core/lib/transport/connectivity_state.h" + +namespace grpc_core { + +TraceFlag grpc_xds_wrr_locality_lb_trace(false, "xds_wrr_locality_lb"); + +namespace { + +constexpr absl::string_view kXdsWrrLocality = "xds_wrr_locality_experimental"; + +// Config for xds_wrr_locality LB policy. +class XdsWrrLocalityLbConfig : public LoadBalancingPolicy::Config { + public: + XdsWrrLocalityLbConfig() = default; + + XdsWrrLocalityLbConfig(const XdsWrrLocalityLbConfig&) = delete; + XdsWrrLocalityLbConfig& operator=(const XdsWrrLocalityLbConfig&) = delete; + + XdsWrrLocalityLbConfig(XdsWrrLocalityLbConfig&& other) = delete; + XdsWrrLocalityLbConfig& operator=(XdsWrrLocalityLbConfig&& other) = delete; + + absl::string_view name() const override { return kXdsWrrLocality; } + + const Json& child_config() const { return child_config_; } + + static const JsonLoaderInterface* JsonLoader(const JsonArgs&) { + // Note: The "childPolicy" field requires custom processing, so + // it's handled in JsonPostLoad() instead. + static const auto* loader = + JsonObjectLoader().Finish(); + return loader; + } + + void JsonPostLoad(const Json& json, const JsonArgs&, + ValidationErrors* errors) { + ValidationErrors::ScopedField field(errors, ".childPolicy"); + auto it = json.object_value().find("childPolicy"); + if (it == json.object_value().end()) { + errors->AddError("field not present"); + return; + } + auto lb_config = + CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig( + it->second); + if (!lb_config.ok()) { + errors->AddError(lb_config.status().message()); + return; + } + child_config_ = it->second; + } + + private: + Json child_config_; +}; + +// xds_wrr_locality LB policy. +class XdsWrrLocalityLb : public LoadBalancingPolicy { + public: + explicit XdsWrrLocalityLb(Args args); + + absl::string_view name() const override { return kXdsWrrLocality; } + + absl::Status UpdateLocked(UpdateArgs args) override; + void ExitIdleLocked() override; + void ResetBackoffLocked() override; + + private: + class Helper : public ChannelControlHelper { + public: + explicit Helper(RefCountedPtr xds_wrr_locality) + : xds_wrr_locality_(std::move(xds_wrr_locality)) {} + + ~Helper() override { xds_wrr_locality_.reset(DEBUG_LOCATION, "Helper"); } + + RefCountedPtr CreateSubchannel( + ServerAddress address, const ChannelArgs& args) override; + void UpdateState(grpc_connectivity_state state, const absl::Status& status, + std::unique_ptr picker) override; + void RequestReresolution() override; + absl::string_view GetAuthority() override; + void AddTraceEvent(TraceSeverity severity, + absl::string_view message) override; + + private: + RefCountedPtr xds_wrr_locality_; + }; + + ~XdsWrrLocalityLb() override; + + void ShutdownLocked() override; + + OrphanablePtr CreateChildPolicyLocked( + const ChannelArgs& args); + + OrphanablePtr child_policy_; +}; + +// +// XdsWrrLocalityLb +// + +XdsWrrLocalityLb::XdsWrrLocalityLb(Args args) + : LoadBalancingPolicy(std::move(args)) {} + +XdsWrrLocalityLb::~XdsWrrLocalityLb() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_wrr_locality_lb_trace)) { + gpr_log(GPR_INFO, "[xds_wrr_locality_lb %p] destroying", this); + } +} + +void XdsWrrLocalityLb::ShutdownLocked() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_wrr_locality_lb_trace)) { + gpr_log(GPR_INFO, "[xds_wrr_locality_lb %p] shutting down", this); + } + if (child_policy_ != nullptr) { + grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), + interested_parties()); + child_policy_.reset(); + } +} + +void XdsWrrLocalityLb::ExitIdleLocked() { + if (child_policy_ != nullptr) child_policy_->ExitIdleLocked(); +} + +void XdsWrrLocalityLb::ResetBackoffLocked() { + if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked(); +} + +absl::Status XdsWrrLocalityLb::UpdateLocked(UpdateArgs args) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_wrr_locality_lb_trace)) { + gpr_log(GPR_INFO, "[xds_wrr_locality_lb %p] Received update", this); + } + RefCountedPtr config = std::move(args.config); + // Scan the addresses to find the weight for each locality. + std::map locality_weights; + if (args.addresses.ok()) { + for (const auto& address : *args.addresses) { + auto* attribute = static_cast( + address.GetAttribute(kXdsLocalityNameAttributeKey)); + if (attribute != nullptr) { + auto p = locality_weights.emplace( + attribute->locality_name()->AsHumanReadableString(), + attribute->weight()); + if (!p.second && p.first->second != attribute->weight()) { + gpr_log(GPR_ERROR, + "INTERNAL ERROR: xds_wrr_locality found different weights " + "for locality %s (%d vs %d); using first value", + p.first->first.c_str(), p.first->second, attribute->weight()); + } + } + } + } + // Construct the config for the weighted_target policy. + Json::Object weighted_targets; + for (const auto& p : locality_weights) { + const std::string& locality_name = p.first; + uint32_t weight = p.second; + // Add weighted target entry. + weighted_targets[locality_name] = Json::Object{ + {"weight", weight}, + {"childPolicy", config->child_config()}, + }; + } + Json child_config_json = Json::Array{ + Json::Object{ + {"weighted_target_experimental", + Json::Object{ + {"targets", std::move(weighted_targets)}, + }}, + }, + }; + // Parse config. + auto child_config = + CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig( + child_config_json); + if (!child_config.ok()) { + // This should never happen, but if it does, we basically have no + // way to fix it, so we put the channel in TRANSIENT_FAILURE. + gpr_log(GPR_ERROR, + "[xds_wrr_locality %p] error parsing generated child policy " + "config -- putting channel in TRANSIENT_FAILURE: %s", + this, child_config.status().ToString().c_str()); + absl::Status status = absl::InternalError(absl::StrCat( + "xds_wrr_locality LB policy: error parsing generated child policy " + "config: ", + child_config.status().ToString())); + channel_control_helper()->UpdateState( + GRPC_CHANNEL_TRANSIENT_FAILURE, status, + std::make_unique(status)); + return status; + } + // Create child policy if needed (i.e., on first update). + if (child_policy_ == nullptr) { + child_policy_ = CreateChildPolicyLocked(args.args); + } + // Construct update args. + UpdateArgs update_args; + update_args.addresses = std::move(args.addresses); + update_args.config = std::move(*child_config); + update_args.resolution_note = std::move(args.resolution_note); + update_args.args = std::move(args.args); + // Update the policy. + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_wrr_locality_lb_trace)) { + gpr_log(GPR_INFO, "[xds_wrr_locality_lb %p] updating child policy %p", this, + child_policy_.get()); + } + return child_policy_->UpdateLocked(std::move(update_args)); +} + +OrphanablePtr XdsWrrLocalityLb::CreateChildPolicyLocked( + const ChannelArgs& args) { + LoadBalancingPolicy::Args lb_policy_args; + lb_policy_args.work_serializer = work_serializer(); + lb_policy_args.args = args; + lb_policy_args.channel_control_helper = + std::make_unique(this->Ref(DEBUG_LOCATION, "Helper")); + auto lb_policy = + CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy( + "weighted_target_experimental", std::move(lb_policy_args)); + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_wrr_locality_lb_trace)) { + gpr_log(GPR_INFO, "[xds_wrr_locality_lb %p] created new child policy %p", + this, lb_policy.get()); + } + // Add our interested_parties pollset_set to that of the newly created + // child policy. This will make the child policy progress upon activity on + // this LB policy, which in turn is tied to the application's call. + grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), + interested_parties()); + return lb_policy; +} + +// +// XdsWrrLocalityLb::Helper +// + +RefCountedPtr XdsWrrLocalityLb::Helper::CreateSubchannel( + ServerAddress address, const ChannelArgs& args) { + return xds_wrr_locality_->channel_control_helper()->CreateSubchannel( + std::move(address), args); +} + +void XdsWrrLocalityLb::Helper::UpdateState( + grpc_connectivity_state state, const absl::Status& status, + std::unique_ptr picker) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_wrr_locality_lb_trace)) { + gpr_log( + GPR_INFO, + "[xds_wrr_locality_lb %p] update from child: state=%s (%s) picker=%p", + xds_wrr_locality_.get(), ConnectivityStateName(state), + status.ToString().c_str(), picker.get()); + } + xds_wrr_locality_->channel_control_helper()->UpdateState(state, status, + std::move(picker)); +} + +void XdsWrrLocalityLb::Helper::RequestReresolution() { + xds_wrr_locality_->channel_control_helper()->RequestReresolution(); +} + +absl::string_view XdsWrrLocalityLb::Helper::GetAuthority() { + return xds_wrr_locality_->channel_control_helper()->GetAuthority(); +} + +void XdsWrrLocalityLb::Helper::AddTraceEvent(TraceSeverity severity, + absl::string_view message) { + xds_wrr_locality_->channel_control_helper()->AddTraceEvent(severity, message); +} + +// +// factory +// + +class XdsWrrLocalityLbFactory : public LoadBalancingPolicyFactory { + public: + OrphanablePtr CreateLoadBalancingPolicy( + LoadBalancingPolicy::Args args) const override { + return MakeOrphanable(std::move(args)); + } + + absl::string_view name() const override { return kXdsWrrLocality; } + + absl::StatusOr> + ParseLoadBalancingConfig(const Json& json) const override { + if (json.type() == Json::Type::JSON_NULL) { + // xds_wrr_locality was mentioned as a policy in the deprecated + // loadBalancingPolicy field or in the client API. + return absl::InvalidArgumentError( + "field:loadBalancingPolicy error:xds_wrr_locality policy requires " + "configuration. Please use loadBalancingConfig field of service " + "config instead."); + } + return LoadRefCountedFromJson( + json, JsonArgs(), + "errors validating xds_wrr_locality LB policy config"); + } +}; + +} // namespace + +void RegisterXdsWrrLocalityLbPolicy(CoreConfiguration::Builder* builder) { + builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory( + std::make_unique()); +} + +} // namespace grpc_core diff --git a/src/core/ext/xds/xds_cluster.cc b/src/core/ext/xds/xds_cluster.cc index 1c08dcd934c..c3d0f697bca 100644 --- a/src/core/ext/xds/xds_cluster.cc +++ b/src/core/ext/xds/xds_cluster.cc @@ -87,11 +87,8 @@ std::string XdsClusterResource::ToString() const { contents.push_back(absl::StrCat("lrs_load_reporting_server_name=", lrs_load_reporting_server->server_uri())); } - contents.push_back(absl::StrCat("lb_policy=", lb_policy)); - if (lb_policy == "RING_HASH") { - contents.push_back(absl::StrCat("min_ring_size=", min_ring_size)); - contents.push_back(absl::StrCat("max_ring_size=", max_ring_size)); - } + contents.push_back( + absl::StrCat("lb_policy_config=", Json{lb_policy_config}.Dump())); contents.push_back( absl::StrCat("max_concurrent_requests=", max_concurrent_requests)); return absl::StrCat("{", absl::StrJoin(contents, ", "), "}"); @@ -327,39 +324,48 @@ absl::StatusOr CdsResourceParse( // Check the LB policy. if (envoy_config_cluster_v3_Cluster_lb_policy(cluster) == envoy_config_cluster_v3_Cluster_ROUND_ROBIN) { - cds_update.lb_policy = "ROUND_ROBIN"; + cds_update.lb_policy_config = { + Json::Object{ + {"xds_wrr_locality_experimental", + Json::Object{ + {"childPolicy", + Json::Array{ + Json::Object{ + {"round_robin", Json::Object()}, + }, + }}, + }}, + }, + }; } else if (envoy_config_cluster_v3_Cluster_lb_policy(cluster) == envoy_config_cluster_v3_Cluster_RING_HASH) { - cds_update.lb_policy = "RING_HASH"; // Record ring hash lb config auto* ring_hash_config = envoy_config_cluster_v3_Cluster_ring_hash_lb_config(cluster); + uint64_t min_ring_size = 1024; + uint64_t max_ring_size = 8388608; if (ring_hash_config != nullptr) { ValidationErrors::ScopedField field(&errors, ".ring_hash_lb_config"); - const google_protobuf_UInt64Value* max_ring_size = + const google_protobuf_UInt64Value* uint64_value = envoy_config_cluster_v3_Cluster_RingHashLbConfig_maximum_ring_size( ring_hash_config); - if (max_ring_size != nullptr) { + if (uint64_value != nullptr) { ValidationErrors::ScopedField field(&errors, ".maximum_ring_size"); - cds_update.max_ring_size = - google_protobuf_UInt64Value_value(max_ring_size); - if (cds_update.max_ring_size > 8388608 || - cds_update.max_ring_size == 0) { + max_ring_size = google_protobuf_UInt64Value_value(uint64_value); + if (max_ring_size > 8388608 || max_ring_size == 0) { errors.AddError("must be in the range of 1 to 8388608"); } } - const google_protobuf_UInt64Value* min_ring_size = + uint64_value = envoy_config_cluster_v3_Cluster_RingHashLbConfig_minimum_ring_size( ring_hash_config); - if (min_ring_size != nullptr) { + if (uint64_value != nullptr) { ValidationErrors::ScopedField field(&errors, ".minimum_ring_size"); - cds_update.min_ring_size = - google_protobuf_UInt64Value_value(min_ring_size); - if (cds_update.min_ring_size > 8388608 || - cds_update.min_ring_size == 0) { + min_ring_size = google_protobuf_UInt64Value_value(uint64_value); + if (min_ring_size > 8388608 || min_ring_size == 0) { errors.AddError("must be in the range of 1 to 8388608"); } - if (cds_update.min_ring_size > cds_update.max_ring_size) { + if (min_ring_size > max_ring_size) { errors.AddError("cannot be greater than maximum_ring_size"); } } @@ -370,6 +376,15 @@ absl::StatusOr CdsResourceParse( errors.AddError("invalid hash function"); } } + cds_update.lb_policy_config = { + Json::Object{ + {"ring_hash_experimental", + Json::Object{ + {"min_ring_size", min_ring_size}, + {"max_ring_size", max_ring_size}, + }}, + }, + }; } else { ValidationErrors::ScopedField field(&errors, ".lb_policy"); errors.AddError("LB policy is not supported"); diff --git a/src/core/ext/xds/xds_cluster.h b/src/core/ext/xds/xds_cluster.h index 8b838c696db..a1335643f1b 100644 --- a/src/core/ext/xds/xds_cluster.h +++ b/src/core/ext/xds/xds_cluster.h @@ -39,6 +39,7 @@ #include "src/core/ext/xds/xds_common_types.h" #include "src/core/ext/xds/xds_resource_type.h" #include "src/core/ext/xds/xds_resource_type_impl.h" +#include "src/core/lib/json/json.h" namespace grpc_core { @@ -63,11 +64,9 @@ struct XdsClusterResource : public XdsResourceType::ResourceData { // If not set, load reporting will be disabled. absl::optional lrs_load_reporting_server; - // The LB policy to use (e.g., "ROUND_ROBIN" or "RING_HASH"). - std::string lb_policy; - // Used for RING_HASH LB policy only. - uint64_t min_ring_size = 1024; - uint64_t max_ring_size = 8388608; + // The LB policy to use for locality and endpoint picking. + Json::Array lb_policy_config; + // Maximum number of outstanding requests can be made to the upstream // cluster. uint32_t max_concurrent_requests = 1024; @@ -81,9 +80,7 @@ struct XdsClusterResource : public XdsResourceType::ResourceData { prioritized_cluster_names == other.prioritized_cluster_names && common_tls_context == other.common_tls_context && lrs_load_reporting_server == other.lrs_load_reporting_server && - lb_policy == other.lb_policy && - min_ring_size == other.min_ring_size && - max_ring_size == other.max_ring_size && + lb_policy_config == other.lb_policy_config && max_concurrent_requests == other.max_concurrent_requests && outlier_detection == other.outlier_detection; } diff --git a/src/core/plugin_registry/grpc_plugin_registry_extra.cc b/src/core/plugin_registry/grpc_plugin_registry_extra.cc index 211884ced30..a2c80cd7d73 100644 --- a/src/core/plugin_registry/grpc_plugin_registry_extra.cc +++ b/src/core/plugin_registry/grpc_plugin_registry_extra.cc @@ -35,6 +35,7 @@ extern void RegisterXdsClusterImplLbPolicy(CoreConfiguration::Builder* builder); extern void RegisterCdsLbPolicy(CoreConfiguration::Builder* builder); extern void RegisterXdsClusterResolverLbPolicy( CoreConfiguration::Builder* builder); +extern void RegisterXdsWrrLocalityLbPolicy(CoreConfiguration::Builder* builder); extern void RegisterFileWatcherCertificateProvider( CoreConfiguration::Builder* builder); #endif @@ -53,6 +54,7 @@ void RegisterExtraFilters(CoreConfiguration::Builder* builder) { RegisterXdsClusterImplLbPolicy(builder); RegisterCdsLbPolicy(builder); RegisterXdsClusterResolverLbPolicy(builder); + RegisterXdsWrrLocalityLbPolicy(builder); RegisterFileWatcherCertificateProvider(builder); #endif } diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 198de98527d..e6195dd00a3 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -46,9 +46,11 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc', 'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc', + 'src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc', + 'src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc', 'src/core/ext/filters/client_channel/local_subchannel_pool.cc', 'src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc', diff --git a/test/core/client_channel/service_config_test.cc b/test/core/client_channel/service_config_test.cc index 37fce590590..f6c24d700a0 100644 --- a/test/core/client_channel/service_config_test.cc +++ b/test/core/client_channel/service_config_test.cc @@ -536,7 +536,8 @@ TEST_F(ClientChannelParserTest, ValidLoadBalancingConfigXds) { " \"discoveryMechanisms\": [\n" " { \"clusterName\": \"foo\",\n" " \"type\": \"EDS\"\n" - " } ]\n" + " } ],\n" + " \"xdsLbPolicy\": [{\"round_robin\":{}}]\n" " } }\n" " ]\n" "}"; diff --git a/test/core/xds/xds_cluster_resource_type_test.cc b/test/core/xds/xds_cluster_resource_type_test.cc index dd905d49cdb..75d974e211d 100644 --- a/test/core/xds/xds_cluster_resource_type_test.cc +++ b/test/core/xds/xds_cluster_resource_type_test.cc @@ -44,6 +44,7 @@ #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/json/json.h" #include "src/proto/grpc/testing/xds/v3/address.pb.h" #include "src/proto/grpc/testing/xds/v3/aggregate_cluster.pb.h" #include "src/proto/grpc/testing/xds/v3/base.pb.h" @@ -147,7 +148,9 @@ TEST_F(XdsClusterTest, MinimumValidConfig) { EXPECT_EQ(resource.cluster_type, resource.EDS); EXPECT_EQ(resource.eds_service_name, ""); // Check defaults. - EXPECT_EQ(resource.lb_policy, "ROUND_ROBIN"); + EXPECT_EQ(Json{resource.lb_policy_config}.Dump(), + "[{\"xds_wrr_locality_experimental\":{\"childPolicy\":" + "[{\"round_robin\":{}}]}}]"); EXPECT_FALSE(resource.lrs_load_reporting_server.has_value()); EXPECT_EQ(resource.max_concurrent_requests, 1024); EXPECT_FALSE(resource.outlier_detection.has_value()); @@ -580,9 +583,9 @@ TEST_F(LbPolicyTest, LbPolicyRingHash) { ASSERT_TRUE(decode_result.name.has_value()); EXPECT_EQ(*decode_result.name, "foo"); auto& resource = static_cast(**decode_result.resource); - EXPECT_EQ(resource.lb_policy, "RING_HASH"); - EXPECT_EQ(resource.min_ring_size, 1024); - EXPECT_EQ(resource.max_ring_size, 8388608); + EXPECT_EQ(Json{resource.lb_policy_config}.Dump(), + "[{\"ring_hash_experimental\":{" + "\"max_ring_size\":8388608,\"min_ring_size\":1024}}]"); } TEST_F(LbPolicyTest, LbPolicyRingHashSetMinAndMaxRingSize) { @@ -603,9 +606,9 @@ TEST_F(LbPolicyTest, LbPolicyRingHashSetMinAndMaxRingSize) { ASSERT_TRUE(decode_result.name.has_value()); EXPECT_EQ(*decode_result.name, "foo"); auto& resource = static_cast(**decode_result.resource); - EXPECT_EQ(resource.lb_policy, "RING_HASH"); - EXPECT_EQ(resource.min_ring_size, 2048); - EXPECT_EQ(resource.max_ring_size, 4096); + EXPECT_EQ(Json{resource.lb_policy_config}.Dump(), + "[{\"ring_hash_experimental\":{" + "\"max_ring_size\":4096,\"min_ring_size\":2048}}]"); } TEST_F(LbPolicyTest, LbPolicyRingHashSetMinAndMaxRingSizeToZero) { @@ -762,9 +765,6 @@ TEST_F(TlsConfigTest, MinimumValidConfig) { ASSERT_TRUE(decode_result.name.has_value()); EXPECT_EQ(*decode_result.name, "foo"); auto& resource = static_cast(**decode_result.resource); - EXPECT_EQ(resource.cluster_type, resource.EDS); - EXPECT_EQ(resource.eds_service_name, ""); - EXPECT_EQ(resource.lb_policy, "ROUND_ROBIN"); EXPECT_EQ(resource.common_tls_context.certificate_validation_context .ca_certificate_provider_instance.instance_name, "provider1"); @@ -907,9 +907,6 @@ TEST_F(LrsTest, Valid) { ASSERT_TRUE(decode_result.name.has_value()); EXPECT_EQ(*decode_result.name, "foo"); auto& resource = static_cast(**decode_result.resource); - EXPECT_EQ(resource.cluster_type, resource.EDS); - EXPECT_EQ(resource.eds_service_name, ""); - EXPECT_EQ(resource.lb_policy, "ROUND_ROBIN"); ASSERT_TRUE(resource.lrs_load_reporting_server.has_value()); EXPECT_EQ(*resource.lrs_load_reporting_server, xds_client_->bootstrap().server()); diff --git a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc index 8e35e7b15ed..1776453d95b 100644 --- a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc @@ -429,8 +429,9 @@ TEST_P(EdsTest, OneLocalityWithNoEndpoints) { balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); // RPCs should fail. constexpr char kErrorMessage[] = - "empty address list: EDS resource eds_service_name contains empty " - "localities: \\[\\{region=\"xds_default_locality_region\", " + "no children in weighted_target policy: " + "EDS resource eds_service_name contains empty localities: " + "\\[\\{region=\"xds_default_locality_region\", " "zone=\"xds_default_locality_zone\", sub_zone=\"locality0\"\\}\\]"; CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, kErrorMessage); // Send EDS resource that has an endpoint. diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index de0608926f3..dc8b6f46e06 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1113,11 +1113,13 @@ src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \ src/core/ext/filters/client_channel/lb_policy/subchannel_list.h \ src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \ src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \ -src/core/ext/filters/client_channel/lb_policy/xds/xds.h \ +src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.cc \ +src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.h \ src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc \ +src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc \ src/core/ext/filters/client_channel/local_subchannel_pool.cc \ src/core/ext/filters/client_channel/local_subchannel_pool.h \ src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 5b77ca1172d..9ab56454471 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -929,11 +929,13 @@ src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \ src/core/ext/filters/client_channel/lb_policy/subchannel_list.h \ src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \ src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \ -src/core/ext/filters/client_channel/lb_policy/xds/xds.h \ +src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.cc \ +src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.h \ src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc \ +src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc \ src/core/ext/filters/client_channel/local_subchannel_pool.cc \ src/core/ext/filters/client_channel/local_subchannel_pool.h \ src/core/ext/filters/client_channel/resolver/README.md \