From 4f8ed98f3e57c1e80ecdbc023038b834be4ae882 Mon Sep 17 00:00:00 2001 From: Donna Dionne Date: Mon, 14 Dec 2020 16:05:15 -0800 Subject: [PATCH] Xds cluster resolver Implemented new xds cluster resolver config Separated policy and discovery mechanism --- BUILD | 6 +- BUILD.gn | 2 +- CMakeLists.txt | 2 +- Makefile | 4 +- build_autogenerated.yaml | 2 +- config.m4 | 2 +- config.w32 | 2 +- doc/environment_variables.md | 2 +- gRPC-Core.podspec | 2 +- grpc.gemspec | 2 +- grpc.gyp | 2 +- package.xml | 2 +- .../client_channel/lb_policy/xds/cds.cc | 25 +- .../client_channel/lb_policy/xds/eds.cc | 908 ------------- .../lb_policy/xds/xds_cluster_resolver.cc | 1169 +++++++++++++++++ .../plugin_registry/grpc_plugin_registry.cc | 8 +- src/python/grpcio/grpc_core_dependencies.py | 2 +- .../client_channel/service_config_test.cc | 15 +- test/cpp/end2end/xds_end2end_test.cc | 23 +- tools/doxygen/Doxyfile.c++.internal | 2 +- tools/doxygen/Doxyfile.core.internal | 2 +- .../grpc_xds_bazel_python_test_in_docker.sh | 2 +- .../linux/grpc_xds_bazel_test_in_docker.sh | 2 +- .../linux/grpc_xds_csharp_test_in_docker.sh | 2 +- .../linux/grpc_xds_php_test_in_docker.sh | 2 +- .../linux/grpc_xds_ruby_test_in_docker.sh | 2 +- 26 files changed, 1238 insertions(+), 956 deletions(-) delete mode 100644 src/core/ext/filters/client_channel/lb_policy/xds/eds.cc create mode 100644 src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc diff --git a/BUILD b/BUILD index c4bae380111..d0ea03ab388 100644 --- a/BUILD +++ b/BUILD @@ -323,9 +323,9 @@ grpc_cc_library( "grpc_no_xds": [], "//conditions:default": [ "grpc_lb_policy_cds", - "grpc_lb_policy_eds", "grpc_lb_policy_xds_cluster_impl", "grpc_lb_policy_xds_cluster_manager", + "grpc_lb_policy_xds_cluster_resolver", "grpc_resolver_xds", ], }, @@ -1426,9 +1426,9 @@ grpc_cc_library( ) grpc_cc_library( - name = "grpc_lb_policy_eds", + name = "grpc_lb_policy_xds_cluster_resolver", srcs = [ - "src/core/ext/filters/client_channel/lb_policy/xds/eds.cc", + "src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc", ], external_deps = [ "absl/strings", diff --git a/BUILD.gn b/BUILD.gn index 441fc2019d3..26816b10f7f 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -256,10 +256,10 @@ config("grpc_config") { "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h", "src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc", "src/core/ext/filters/client_channel/lb_policy/xds/cds.cc", - "src/core/ext/filters/client_channel/lb_policy/xds/eds.cc", "src/core/ext/filters/client_channel/lb_policy/xds/xds.h", "src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc", "src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc", + "src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc", "src/core/ext/filters/client_channel/lb_policy_factory.h", "src/core/ext/filters/client_channel/lb_policy_registry.cc", "src/core/ext/filters/client_channel/lb_policy_registry.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index 0bca1ef7479..ffd6a24f801 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1471,9 +1471,9 @@ add_library(grpc src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc src/core/ext/filters/client_channel/lb_policy/xds/cds.cc - src/core/ext/filters/client_channel/lb_policy/xds/eds.cc src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc + src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc src/core/ext/filters/client_channel/lb_policy_registry.cc src/core/ext/filters/client_channel/local_subchannel_pool.cc src/core/ext/filters/client_channel/proxy_mapper_registry.cc diff --git a/Makefile b/Makefile index 52bdb1b9e23..521c90aec90 100644 --- a/Makefile +++ b/Makefile @@ -1060,9 +1060,9 @@ LIBGRPC_SRC = \ src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \ src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \ src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \ - src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \ + src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc \ src/core/ext/filters/client_channel/lb_policy_registry.cc \ src/core/ext/filters/client_channel/local_subchannel_pool.cc \ src/core/ext/filters/client_channel/proxy_mapper_registry.cc \ @@ -2663,9 +2663,9 @@ ifneq ($(OPENSSL_DEP),) # otherwise parallel compilation will fail if a source is compiled first. src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/lb_policy/xds/cds.cc: $(OPENSSL_DEP) -src/core/ext/filters/client_channel/lb_policy/xds/eds.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc: $(OPENSSL_DEP) +src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc: $(OPENSSL_DEP) src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc: $(OPENSSL_DEP) src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.cc: $(OPENSSL_DEP) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 3dd7034abdf..6b136ff38d1 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -891,9 +891,9 @@ libs: - src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc - src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc - src/core/ext/filters/client_channel/lb_policy/xds/cds.cc - - src/core/ext/filters/client_channel/lb_policy/xds/eds.cc - src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc - src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc + - src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc - src/core/ext/filters/client_channel/lb_policy_registry.cc - src/core/ext/filters/client_channel/local_subchannel_pool.cc - src/core/ext/filters/client_channel/proxy_mapper_registry.cc diff --git a/config.m4 b/config.m4 index 4b450dedf16..fd8e803601c 100644 --- a/config.m4 +++ b/config.m4 @@ -66,9 +66,9 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \ src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \ src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \ - src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \ + src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc \ src/core/ext/filters/client_channel/lb_policy_registry.cc \ src/core/ext/filters/client_channel/local_subchannel_pool.cc \ src/core/ext/filters/client_channel/proxy_mapper_registry.cc \ diff --git a/config.w32 b/config.w32 index a2cb370c28e..d7b3c36b78b 100644 --- a/config.w32 +++ b/config.w32 @@ -33,9 +33,9 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\filters\\client_channel\\lb_policy\\round_robin\\round_robin.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\weighted_target\\weighted_target.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\cds.cc " + - "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\eds.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_cluster_impl.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_cluster_manager.cc " + + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_cluster_resolver.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy_registry.cc " + "src\\core\\ext\\filters\\client_channel\\local_subchannel_pool.cc " + "src\\core\\ext\\filters\\client_channel\\proxy_mapper_registry.cc " + diff --git a/doc/environment_variables.md b/doc/environment_variables.md index 82cb28d0f40..bdb54a06136 100644 --- a/doc/environment_variables.md +++ b/doc/environment_variables.md @@ -57,7 +57,6 @@ some configuration as environment variables that can be set. - compression - traces compression operations - connectivity_state - traces connectivity state changes to channels - cronet - traces state in the cronet transport engine - - eds_lb - traces eds LB policy - executor - traces grpc's internal thread pool ('the executor') - glb - traces the grpclb load balancer - handshaker - traces handshaking state @@ -91,6 +90,7 @@ some configuration as environment variables that can be set. - xds_client - traces xds client - xds_cluster_manager_lb - traces cluster manager LB policy - xds_cluster_impl_lb - traces cluster impl LB policy + - xds_cluster_resolver_lb - traces xds cluster resolver LB policy - xds_resolver - traces xds resolver The following tracers will only run in binaries built in DEBUG mode. This is diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 06d2a5a21ea..1fda7408eeb 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -237,10 +237,10 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/lb_policy/subchannel_list.h', 'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc', - 'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds.h', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc', + 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc', 'src/core/ext/filters/client_channel/lb_policy_factory.h', 'src/core/ext/filters/client_channel/lb_policy_registry.cc', 'src/core/ext/filters/client_channel/lb_policy_registry.h', diff --git a/grpc.gemspec b/grpc.gemspec index e3966d64194..b8a85c3e247 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -152,10 +152,10 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/client_channel/lb_policy/subchannel_list.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/cds.cc ) - s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/eds.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc ) + s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy_factory.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy_registry.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy_registry.h ) diff --git a/grpc.gyp b/grpc.gyp index 5ed95707fb6..5e2688271e9 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -479,9 +479,9 @@ 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc', 'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc', - 'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc', + 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc', 'src/core/ext/filters/client_channel/lb_policy_registry.cc', 'src/core/ext/filters/client_channel/local_subchannel_pool.cc', 'src/core/ext/filters/client_channel/proxy_mapper_registry.cc', diff --git a/package.xml b/package.xml index ab66c008e3c..7ab90dfa3e3 100644 --- a/package.xml +++ b/package.xml @@ -132,10 +132,10 @@ - + diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc index 7613406a3d4..8a10db52222 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc @@ -330,9 +330,23 @@ void CdsLb::OnClusterChanged(XdsApi::CdsUpdate cluster_data) { return OnError(error); } // Construct config for child policy. - Json::Object child_config = { + Json::Object discovery_mechanism = { {"clusterName", config_->cluster()}, {"max_concurrent_requests", cluster_data.max_concurrent_requests}, + {"type", "EDS"}, + }; + if (!cluster_data.eds_service_name.empty()) { + discovery_mechanism["edsServiceName"] = cluster_data.eds_service_name; + } + if (cluster_data.lrs_load_reporting_server_name.has_value()) { + discovery_mechanism["lrsLoadReportingServerName"] = + cluster_data.lrs_load_reporting_server_name.value(); + } + Json::Object child_config = { + {"discoveryMechanisms", + Json::Array{ + discovery_mechanism, + }}, {"localityPickingPolicy", Json::Array{ Json::Object{ @@ -349,16 +363,9 @@ void CdsLb::OnClusterChanged(XdsApi::CdsUpdate cluster_data) { }, }}, }; - if (!cluster_data.eds_service_name.empty()) { - child_config["edsServiceName"] = cluster_data.eds_service_name; - } - if (cluster_data.lrs_load_reporting_server_name.has_value()) { - child_config["lrsLoadReportingServerName"] = - cluster_data.lrs_load_reporting_server_name.value(); - } Json json = Json::Array{ Json::Object{ - {"eds_experimental", std::move(child_config)}, + {"xds_cluster_resolver_experimental", std::move(child_config)}, }, }; if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc deleted file mode 100644 index c92ee1b3c16..00000000000 --- a/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc +++ /dev/null @@ -1,908 +0,0 @@ -// -// Copyright 2018 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -#include - -#include -#include - -#include "absl/strings/str_cat.h" -#include "absl/types/optional.h" - -#include - -#include "src/core/ext/filters/client_channel/client_channel.h" -#include "src/core/ext/filters/client_channel/lb_policy.h" -#include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h" -#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" -#include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h" -#include "src/core/ext/filters/client_channel/lb_policy_factory.h" -#include "src/core/ext/filters/client_channel/lb_policy_registry.h" -#include "src/core/ext/filters/client_channel/server_address.h" -#include "src/core/ext/xds/xds_channel_args.h" -#include "src/core/ext/xds/xds_client.h" -#include "src/core/ext/xds/xds_client_stats.h" -#include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/gpr/string.h" -#include "src/core/lib/gprpp/orphanable.h" -#include "src/core/lib/gprpp/ref_counted_ptr.h" -#include "src/core/lib/iomgr/work_serializer.h" -#include "src/core/lib/transport/error_utils.h" -#include "src/core/lib/uri/uri_parser.h" - -#define GRPC_EDS_DEFAULT_FALLBACK_TIMEOUT 10000 - -namespace grpc_core { - -TraceFlag grpc_lb_eds_trace(false, "eds_lb"); - -const char* kXdsLocalityNameAttributeKey = "xds_locality_name"; - -namespace { - -constexpr char kEds[] = "eds_experimental"; - -// Config for EDS LB policy. -class EdsLbConfig : public LoadBalancingPolicy::Config { - public: - EdsLbConfig(std::string cluster_name, std::string eds_service_name, - absl::optional lrs_load_reporting_server_name, - Json locality_picking_policy, Json endpoint_picking_policy, - uint32_t max_concurrent_requests) - : cluster_name_(std::move(cluster_name)), - eds_service_name_(std::move(eds_service_name)), - lrs_load_reporting_server_name_( - std::move(lrs_load_reporting_server_name)), - locality_picking_policy_(std::move(locality_picking_policy)), - endpoint_picking_policy_(std::move(endpoint_picking_policy)), - max_concurrent_requests_(max_concurrent_requests) {} - - const char* name() const override { return kEds; } - - const std::string& cluster_name() const { return cluster_name_; } - const std::string& eds_service_name() const { return eds_service_name_; } - const absl::optional& lrs_load_reporting_server_name() const { - return lrs_load_reporting_server_name_; - }; - const Json& locality_picking_policy() const { - return locality_picking_policy_; - } - const Json& endpoint_picking_policy() const { - return endpoint_picking_policy_; - } - const uint32_t max_concurrent_requests() const { - return max_concurrent_requests_; - } - - private: - std::string cluster_name_; - std::string eds_service_name_; - absl::optional lrs_load_reporting_server_name_; - Json locality_picking_policy_; - Json endpoint_picking_policy_; - uint32_t max_concurrent_requests_; -}; - -// EDS LB policy. -class EdsLb : public LoadBalancingPolicy { - public: - EdsLb(RefCountedPtr xds_client, Args args); - - const char* name() const override { return kEds; } - - void UpdateLocked(UpdateArgs args) override; - void ResetBackoffLocked() override; - - private: - class EndpointWatcher : public XdsClient::EndpointWatcherInterface { - public: - explicit EndpointWatcher(RefCountedPtr parent) - : parent_(std::move(parent)) {} - void OnEndpointChanged(XdsApi::EdsUpdate update) override { - new Notifier(parent_, std::move(update)); - } - void OnError(grpc_error* error) override { new Notifier(parent_, error); } - void OnResourceDoesNotExist() override { new Notifier(parent_); } - - private: - class Notifier { - public: - Notifier(RefCountedPtr parent, XdsApi::EdsUpdate update); - Notifier(RefCountedPtr parent, grpc_error* error); - explicit Notifier(RefCountedPtr parent); - - private: - enum Type { kUpdate, kError, kDoesNotExist }; - - static void RunInExecCtx(void* arg, grpc_error* error); - void RunInWorkSerializer(grpc_error* error); - - RefCountedPtr parent_; - grpc_closure closure_; - XdsApi::EdsUpdate update_; - Type type_; - }; - - RefCountedPtr parent_; - }; - - class Helper : public ChannelControlHelper { - public: - explicit Helper(RefCountedPtr eds_policy) - : eds_policy_(std::move(eds_policy)) {} - - ~Helper() override { eds_policy_.reset(DEBUG_LOCATION, "Helper"); } - - RefCountedPtr CreateSubchannel( - ServerAddress address, const grpc_channel_args& args) override; - void UpdateState(grpc_connectivity_state state, const absl::Status& status, - std::unique_ptr picker) override; - // This is a no-op, because we get the addresses from the xds - // client, which is a watch-based API. - void RequestReresolution() override {} - void AddTraceEvent(TraceSeverity severity, - absl::string_view message) override; - - private: - RefCountedPtr eds_policy_; - }; - - ~EdsLb() override; - - void ShutdownLocked() override; - - void OnEndpointChanged(XdsApi::EdsUpdate update); - void OnError(grpc_error* error); - void OnResourceDoesNotExist(); - - void MaybeDestroyChildPolicyLocked(); - - void UpdatePriorityList(XdsApi::EdsUpdate::PriorityList priority_list); - void UpdateChildPolicyLocked(); - OrphanablePtr CreateChildPolicyLocked( - const grpc_channel_args* args); - ServerAddressList CreateChildPolicyAddressesLocked(); - RefCountedPtr CreateChildPolicyConfigLocked(); - grpc_channel_args* CreateChildPolicyArgsLocked( - const grpc_channel_args* args_in); - - // Caller must ensure that config_ is set before calling. - const absl::string_view GetEdsResourceName() const { - if (!is_xds_uri_) return server_name_; - if (!config_->eds_service_name().empty()) { - return config_->eds_service_name(); - } - return config_->cluster_name(); - } - - // Returns a pair containing the cluster and eds_service_name to use - // for LRS load reporting. - // Caller must ensure that config_ is set before calling. - std::pair GetLrsClusterKey() const { - if (!is_xds_uri_) return {server_name_, nullptr}; - return {config_->cluster_name(), config_->eds_service_name()}; - } - - // Server name from target URI. - std::string server_name_; - bool is_xds_uri_; - - // Current channel args and config from the resolver. - const grpc_channel_args* args_ = nullptr; - RefCountedPtr config_; - - // Internal state. - bool shutting_down_ = false; - - // The xds client and endpoint watcher. - RefCountedPtr xds_client_; - // A pointer to the endpoint watcher, to be used when cancelling the watch. - // Note that this is not owned, so this pointer must never be derefernced. - EndpointWatcher* endpoint_watcher_ = nullptr; - // The latest data from the endpoint watcher. - XdsApi::EdsUpdate::PriorityList priority_list_; - // State used to retain child policy names for priority policy. - std::vector priority_child_numbers_; - - RefCountedPtr drop_config_; - - OrphanablePtr child_policy_; -}; - -// -// EdsLb::Helper -// - -RefCountedPtr EdsLb::Helper::CreateSubchannel( - ServerAddress address, const grpc_channel_args& args) { - if (eds_policy_->shutting_down_) return nullptr; - return eds_policy_->channel_control_helper()->CreateSubchannel( - std::move(address), args); -} - -void EdsLb::Helper::UpdateState(grpc_connectivity_state state, - const absl::Status& status, - std::unique_ptr picker) { - if (eds_policy_->shutting_down_ || eds_policy_->child_policy_ == nullptr) { - return; - } - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p] child policy updated state=%s (%s) picker=%p", - eds_policy_.get(), ConnectivityStateName(state), - status.ToString().c_str(), picker.get()); - } - eds_policy_->channel_control_helper()->UpdateState(state, status, - std::move(picker)); -} - -void EdsLb::Helper::AddTraceEvent(TraceSeverity severity, - absl::string_view message) { - if (eds_policy_->shutting_down_) return; - eds_policy_->channel_control_helper()->AddTraceEvent(severity, message); -} - -// -// EdsLb::EndpointWatcher::Notifier -// - -EdsLb::EndpointWatcher::Notifier::Notifier(RefCountedPtr parent, - XdsApi::EdsUpdate update) - : parent_(std::move(parent)), update_(std::move(update)), type_(kUpdate) { - GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); - ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); -} - -EdsLb::EndpointWatcher::Notifier::Notifier(RefCountedPtr parent, - grpc_error* error) - : parent_(std::move(parent)), type_(kError) { - GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); - ExecCtx::Run(DEBUG_LOCATION, &closure_, error); -} - -EdsLb::EndpointWatcher::Notifier::Notifier(RefCountedPtr parent) - : parent_(std::move(parent)), type_(kDoesNotExist) { - GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); - ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); -} - -void EdsLb::EndpointWatcher::Notifier::RunInExecCtx(void* arg, - grpc_error* error) { - Notifier* self = static_cast(arg); - GRPC_ERROR_REF(error); - self->parent_->work_serializer()->Run( - [self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION); -} - -void EdsLb::EndpointWatcher::Notifier::RunInWorkSerializer(grpc_error* error) { - switch (type_) { - case kUpdate: - parent_->OnEndpointChanged(std::move(update_)); - break; - case kError: - parent_->OnError(error); - break; - case kDoesNotExist: - parent_->OnResourceDoesNotExist(); - break; - }; - delete this; -} - -// -// EdsLb public methods -// - -EdsLb::EdsLb(RefCountedPtr xds_client, Args args) - : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p] created -- using xds client %p", this, - xds_client_.get()); - } - // Record server name. - const char* server_uri = - grpc_channel_args_find_string(args.args, GRPC_ARG_SERVER_URI); - GPR_ASSERT(server_uri != nullptr); - absl::StatusOr uri = URI::Parse(server_uri); - GPR_ASSERT(uri.ok() && !uri->path().empty()); - server_name_ = std::string(absl::StripPrefix(uri->path(), "/")); - is_xds_uri_ = uri->scheme() == "xds"; - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p] server name from channel (is_xds_uri=%d): %s", - this, is_xds_uri_, server_name_.c_str()); - } - // EDS-only flow. - if (!is_xds_uri_) { - // Setup channelz linkage. - channelz::ChannelNode* parent_channelz_node = - grpc_channel_args_find_pointer( - args.args, GRPC_ARG_CHANNELZ_CHANNEL_NODE); - if (parent_channelz_node != nullptr) { - xds_client_->AddChannelzLinkage(parent_channelz_node); - } - // Couple polling. - grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(), - interested_parties()); - } -} - -EdsLb::~EdsLb() { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p] destroying eds LB policy", this); - } -} - -void EdsLb::ShutdownLocked() { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p] shutting down", this); - } - shutting_down_ = true; - MaybeDestroyChildPolicyLocked(); - // Cancel watcher. - if (endpoint_watcher_ != nullptr) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p] cancelling xds watch for %s", this, - std::string(GetEdsResourceName()).c_str()); - } - xds_client_->CancelEndpointDataWatch(GetEdsResourceName(), - endpoint_watcher_); - } - if (!is_xds_uri_) { - // Remove channelz linkage. - channelz::ChannelNode* parent_channelz_node = - grpc_channel_args_find_pointer( - args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE); - if (parent_channelz_node != nullptr) { - xds_client_->RemoveChannelzLinkage(parent_channelz_node); - } - // Decouple polling. - grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(), - interested_parties()); - } - xds_client_.reset(DEBUG_LOCATION, "EdsLb"); - // Destroy channel args. - grpc_channel_args_destroy(args_); - args_ = nullptr; -} - -void EdsLb::MaybeDestroyChildPolicyLocked() { - if (child_policy_ != nullptr) { - grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), - interested_parties()); - child_policy_.reset(); - } -} - -void EdsLb::UpdateLocked(UpdateArgs args) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p] Received update", this); - } - const bool is_initial_update = args_ == nullptr; - // Update config. - auto old_config = std::move(config_); - config_ = std::move(args.config); - // Update args. - grpc_channel_args_destroy(args_); - args_ = args.args; - args.args = nullptr; - // Update child policy if needed. - if (child_policy_ != nullptr) UpdateChildPolicyLocked(); - // Create endpoint watcher if needed. - if (is_initial_update) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p] starting xds watch for %s", this, - std::string(GetEdsResourceName()).c_str()); - } - auto watcher = absl::make_unique( - Ref(DEBUG_LOCATION, "EndpointWatcher")); - endpoint_watcher_ = watcher.get(); - xds_client_->WatchEndpointData(GetEdsResourceName(), std::move(watcher)); - } -} - -void EdsLb::ResetBackoffLocked() { - // When the XdsClient is instantiated in the resolver instead of in this - // LB policy, this is done via the resolver, so we don't need to do it here. - if (!is_xds_uri_ && xds_client_ != nullptr) xds_client_->ResetBackoff(); - if (child_policy_ != nullptr) { - child_policy_->ResetBackoffLocked(); - } -} - -void EdsLb::OnEndpointChanged(XdsApi::EdsUpdate update) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p] Received EDS update from xds client", this); - } - // Update the drop config. - drop_config_ = std::move(update.drop_config); - // If priority list is empty, add a single priority, just so that we - // have a child in which to create the xds_cluster_impl policy. - if (update.priorities.empty()) update.priorities.emplace_back(); - // Update child policy. - UpdatePriorityList(std::move(update.priorities)); -} - -void EdsLb::OnError(grpc_error* error) { - gpr_log(GPR_ERROR, "[edslb %p] xds watcher reported error: %s", this, - grpc_error_string(error)); - // Go into TRANSIENT_FAILURE if we have not yet created the child - // policy (i.e., we have not yet received data from xds). Otherwise, - // we keep running with the data we had previously. - if (child_policy_ == nullptr) { - channel_control_helper()->UpdateState( - GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error), - absl::make_unique(error)); - } else { - GRPC_ERROR_UNREF(error); - } -} - -void EdsLb::OnResourceDoesNotExist() { - gpr_log( - GPR_ERROR, - "[edslb %p] EDS resource does not exist -- reporting TRANSIENT_FAILURE", - this); - grpc_error* error = grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("EDS resource does not exist"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); - channel_control_helper()->UpdateState( - GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error), - absl::make_unique(error)); - MaybeDestroyChildPolicyLocked(); -} - -// -// child policy-related methods -// - -void EdsLb::UpdatePriorityList(XdsApi::EdsUpdate::PriorityList priority_list) { - // Build some maps from locality to child number and the reverse from - // the old data in priority_list_ and priority_child_numbers_. - std::map - locality_child_map; - std::map> child_locality_map; - for (size_t priority = 0; priority < priority_list_.size(); ++priority) { - size_t child_number = priority_child_numbers_[priority]; - const auto& localities = priority_list_[priority].localities; - for (const auto& p : localities) { - XdsLocalityName* locality_name = p.first; - locality_child_map[locality_name] = child_number; - child_locality_map[child_number].insert(locality_name); - } - } - // Construct new list of children. - std::vector priority_child_numbers; - for (size_t priority = 0; priority < priority_list.size(); ++priority) { - const auto& localities = priority_list[priority].localities; - absl::optional child_number; - // If one of the localities in this priority already existed, reuse its - // child number. - for (const auto& p : localities) { - XdsLocalityName* locality_name = p.first; - if (!child_number.has_value()) { - auto it = locality_child_map.find(locality_name); - if (it != locality_child_map.end()) { - child_number = it->second; - locality_child_map.erase(it); - // Remove localities that *used* to be in this child number, so - // that we don't incorrectly reuse this child number for a - // subsequent priority. - for (XdsLocalityName* old_locality : - child_locality_map[*child_number]) { - locality_child_map.erase(old_locality); - } - } - } else { - // Remove all localities that are now in this child number, so - // that we don't accidentally reuse this child number for a - // subsequent priority. - locality_child_map.erase(locality_name); - } - } - // If we didn't find an existing child number, assign a new one. - if (!child_number.has_value()) { - for (child_number = 0; - child_locality_map.find(*child_number) != child_locality_map.end(); - ++(*child_number)) { - } - // Add entry so we know that the child number is in use. - // (Don't need to add the list of localities, since we won't use them.) - child_locality_map[*child_number]; - } - priority_child_numbers.push_back(*child_number); - } - // Save update. - priority_list_ = std::move(priority_list); - priority_child_numbers_ = std::move(priority_child_numbers); - // Update child policy. - UpdateChildPolicyLocked(); -} - -ServerAddressList EdsLb::CreateChildPolicyAddressesLocked() { - ServerAddressList addresses; - for (size_t priority = 0; priority < priority_list_.size(); ++priority) { - const auto& localities = priority_list_[priority].localities; - std::string priority_child_name = - absl::StrCat("child", priority_child_numbers_[priority]); - for (const auto& p : localities) { - const auto& locality_name = p.first; - const auto& locality = p.second; - std::vector hierarchical_path = { - priority_child_name, locality_name->AsHumanReadableString()}; - for (const auto& endpoint : locality.endpoints) { - addresses.emplace_back( - endpoint - .WithAttribute(kHierarchicalPathAttributeKey, - MakeHierarchicalPathAttribute(hierarchical_path)) - .WithAttribute(kXdsLocalityNameAttributeKey, - absl::make_unique( - locality_name->Ref()))); - } - } - } - return addresses; -} - -RefCountedPtr -EdsLb::CreateChildPolicyConfigLocked() { - const auto lrs_key = GetLrsClusterKey(); - Json::Object priority_children; - Json::Array priority_priorities; - for (size_t priority = 0; priority < priority_list_.size(); ++priority) { - const auto& localities = priority_list_[priority].localities; - Json::Object weighted_targets; - for (const auto& p : localities) { - XdsLocalityName* locality_name = p.first; - const auto& locality = p.second; - // Construct JSON object containing locality name. - Json::Object locality_name_json; - if (!locality_name->region().empty()) { - locality_name_json["region"] = locality_name->region(); - } - if (!locality_name->zone().empty()) { - locality_name_json["zone"] = locality_name->zone(); - } - if (!locality_name->sub_zone().empty()) { - locality_name_json["subzone"] = locality_name->sub_zone(); - } - // Add weighted target entry. - weighted_targets[locality_name->AsHumanReadableString()] = Json::Object{ - {"weight", locality.lb_weight}, - {"childPolicy", config_->endpoint_picking_policy()}, - }; - } - // Construct locality-picking policy. - // Start with field from our config and add the "targets" field. - Json locality_picking_config = config_->locality_picking_policy(); - Json::Object& config = - *(*locality_picking_config.mutable_array())[0].mutable_object(); - auto it = config.begin(); - GPR_ASSERT(it != config.end()); - (*it->second.mutable_object())["targets"] = std::move(weighted_targets); - // Wrap it in the drop policy. - Json::Array drop_categories; - for (const auto& category : drop_config_->drop_category_list()) { - drop_categories.push_back(Json::Object{ - {"category", category.name}, - {"requests_per_million", category.parts_per_million}, - }); - } - Json::Object xds_cluster_impl_config = { - {"clusterName", std::string(lrs_key.first)}, - {"childPolicy", std::move(locality_picking_config)}, - {"dropCategories", std::move(drop_categories)}, - {"maxConcurrentRequests", config_->max_concurrent_requests()}, - }; - if (!lrs_key.second.empty()) { - xds_cluster_impl_config["edsServiceName"] = std::string(lrs_key.second); - } - if (config_->lrs_load_reporting_server_name().has_value()) { - xds_cluster_impl_config["lrsLoadReportingServerName"] = - config_->lrs_load_reporting_server_name().value(); - } - Json locality_picking_policy = Json::Array{Json::Object{ - {"xds_cluster_impl_experimental", std::move(xds_cluster_impl_config)}, - }}; - // Add priority entry. - const size_t child_number = priority_child_numbers_[priority]; - std::string child_name = absl::StrCat("child", child_number); - priority_priorities.emplace_back(child_name); - priority_children[child_name] = Json::Object{ - {"config", std::move(locality_picking_policy)}, - {"ignore_reresolution_requests", true}, - }; - } - Json json = Json::Array{Json::Object{ - {"priority_experimental", - Json::Object{ - {"children", std::move(priority_children)}, - {"priorities", std::move(priority_priorities)}, - }}, - }}; - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - std::string json_str = json.Dump(/*indent=*/1); - gpr_log(GPR_INFO, "[edslb %p] generated config for child policy: %s", this, - json_str.c_str()); - } - grpc_error* error = GRPC_ERROR_NONE; - RefCountedPtr config = - LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &error); - if (error != GRPC_ERROR_NONE) { - // This should never happen, but if it does, we basically have no - // way to fix it, so we put the channel in TRANSIENT_FAILURE. - gpr_log(GPR_ERROR, - "[edslb %p] error parsing generated child policy config -- " - "will put channel in TRANSIENT_FAILURE: %s", - this, grpc_error_string(error)); - error = grpc_error_set_int( - grpc_error_add_child( - GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "eds LB policy: error parsing generated child policy config"), - error), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL); - channel_control_helper()->UpdateState( - GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error), - absl::make_unique(error)); - return nullptr; - } - return config; -} - -void EdsLb::UpdateChildPolicyLocked() { - if (shutting_down_) return; - UpdateArgs update_args; - update_args.config = CreateChildPolicyConfigLocked(); - if (update_args.config == nullptr) return; - update_args.addresses = CreateChildPolicyAddressesLocked(); - update_args.args = CreateChildPolicyArgsLocked(args_); - if (child_policy_ == nullptr) { - child_policy_ = CreateChildPolicyLocked(update_args.args); - } - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p] Updating child policy %p", this, - child_policy_.get()); - } - child_policy_->UpdateLocked(std::move(update_args)); -} - -grpc_channel_args* EdsLb::CreateChildPolicyArgsLocked( - const grpc_channel_args* args) { - grpc_arg args_to_add[] = { - // A channel arg indicating if the target is a backend inferred from an - // xds load balancer. - // TODO(roth): This isn't needed with the new fallback design. - // Remove as part of implementing the new fallback functionality. - grpc_channel_arg_integer_create( - const_cast(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_XDS_LOAD_BALANCER), - 1), - // Inhibit client-side health checking, since the balancer does - // this for us. - grpc_channel_arg_integer_create( - const_cast(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1), - }; - return grpc_channel_args_copy_and_add(args, args_to_add, - GPR_ARRAY_SIZE(args_to_add)); -} - -OrphanablePtr EdsLb::CreateChildPolicyLocked( - const grpc_channel_args* args) { - LoadBalancingPolicy::Args lb_policy_args; - lb_policy_args.work_serializer = work_serializer(); - lb_policy_args.args = args; - lb_policy_args.channel_control_helper = - absl::make_unique(Ref(DEBUG_LOCATION, "Helper")); - OrphanablePtr lb_policy = - LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( - "priority_experimental", std::move(lb_policy_args)); - if (GPR_UNLIKELY(lb_policy == nullptr)) { - gpr_log(GPR_ERROR, "[edslb %p] failure creating child policy", this); - return nullptr; - } - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p]: Created new child policy %p", this, - lb_policy.get()); - } - // Add our interested_parties pollset_set to that of the newly created - // child policy. This will make the child policy progress upon activity on - // this policy, which in turn is tied to the application's call. - grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), - interested_parties()); - return lb_policy; -} - -// -// factory -// - -class EdsLbFactory : public LoadBalancingPolicyFactory { - public: - OrphanablePtr CreateLoadBalancingPolicy( - LoadBalancingPolicy::Args args) const override { - grpc_error* error = GRPC_ERROR_NONE; - RefCountedPtr xds_client = XdsClient::GetOrCreate(&error); - if (error != GRPC_ERROR_NONE) { - gpr_log(GPR_ERROR, - "cannot get XdsClient to instantiate eds LB policy: %s", - grpc_error_string(error)); - GRPC_ERROR_UNREF(error); - return nullptr; - } - return MakeOrphanable(std::move(xds_client), - std::move(args)); - } - - const char* name() const override { return kEds; } - - RefCountedPtr ParseLoadBalancingConfig( - const Json& json, grpc_error** error) const override { - GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); - if (json.type() == Json::Type::JSON_NULL) { - // eds was mentioned as a policy in the deprecated loadBalancingPolicy - // field or in the client API. - *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:loadBalancingPolicy error:eds policy requires configuration. " - "Please use loadBalancingConfig field of service config instead."); - return nullptr; - } - std::vector error_list; - // EDS service name. - std::string eds_service_name; - auto it = json.object_value().find("edsServiceName"); - if (it != json.object_value().end()) { - if (it->second.type() != Json::Type::STRING) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:edsServiceName error:type should be string")); - } else { - eds_service_name = it->second.string_value(); - } - } - // Cluster name. - std::string cluster_name; - it = json.object_value().find("clusterName"); - if (it == json.object_value().end()) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:clusterName error:required field missing")); - } else if (it->second.type() != Json::Type::STRING) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:clusterName error:type should be string")); - } else { - cluster_name = it->second.string_value(); - } - // LRS load reporting server name. - absl::optional lrs_load_reporting_server_name; - it = json.object_value().find("lrsLoadReportingServerName"); - if (it != json.object_value().end()) { - if (it->second.type() != Json::Type::STRING) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:lrsLoadReportingServerName error:type should be string")); - } else { - lrs_load_reporting_server_name.emplace(it->second.string_value()); - } - } - // Locality-picking policy. - Json locality_picking_policy; - it = json.object_value().find("localityPickingPolicy"); - if (it == json.object_value().end()) { - locality_picking_policy = Json::Array{ - Json::Object{ - {"weighted_target_experimental", - Json::Object{ - {"targets", Json::Object()}, - }}, - }, - }; - } else { - locality_picking_policy = it->second; - } - grpc_error* parse_error = GRPC_ERROR_NONE; - if (LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( - locality_picking_policy, &parse_error) == nullptr) { - GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); - error_list.push_back(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "localityPickingPolicy", &parse_error, 1)); - GRPC_ERROR_UNREF(parse_error); - } - // Endpoint-picking policy. Called "childPolicy" for xds policy. - Json endpoint_picking_policy; - it = json.object_value().find("endpointPickingPolicy"); - if (it == json.object_value().end()) { - endpoint_picking_policy = Json::Array{ - Json::Object{ - {"round_robin", Json::Object()}, - }, - }; - } else { - endpoint_picking_policy = it->second; - } - parse_error = GRPC_ERROR_NONE; - if (LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( - endpoint_picking_policy, &parse_error) == nullptr) { - GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); - error_list.push_back(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "endpointPickingPolicy", &parse_error, 1)); - GRPC_ERROR_UNREF(parse_error); - } - // Max concurrent requests. - uint32_t max_concurrent_requests = 1024; - it = json.object_value().find("max_concurrent_requests"); - if (it != json.object_value().end()) { - if (it->second.type() != Json::Type::NUMBER) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:max_concurrent_requests error:must be of type number")); - } else { - max_concurrent_requests = - gpr_parse_nonnegative_int(it->second.string_value().c_str()); - } - } - // Construct config. - if (error_list.empty()) { - return MakeRefCounted( - std::move(cluster_name), std::move(eds_service_name), - std::move(lrs_load_reporting_server_name), - std::move(locality_picking_policy), - std::move(endpoint_picking_policy), max_concurrent_requests); - } else { - *error = GRPC_ERROR_CREATE_FROM_VECTOR( - "eds_experimental LB policy config", &error_list); - return nullptr; - } - } - - private: - class EdsChildHandler : public ChildPolicyHandler { - public: - EdsChildHandler(RefCountedPtr xds_client, Args args) - : ChildPolicyHandler(std::move(args), &grpc_lb_eds_trace), - xds_client_(std::move(xds_client)) {} - - bool ConfigChangeRequiresNewPolicyInstance( - LoadBalancingPolicy::Config* old_config, - LoadBalancingPolicy::Config* new_config) const override { - GPR_ASSERT(old_config->name() == kEds); - GPR_ASSERT(new_config->name() == kEds); - EdsLbConfig* old_eds_config = static_cast(old_config); - EdsLbConfig* new_eds_config = static_cast(new_config); - return old_eds_config->cluster_name() != new_eds_config->cluster_name() || - old_eds_config->eds_service_name() != - new_eds_config->eds_service_name() || - old_eds_config->lrs_load_reporting_server_name() != - new_eds_config->lrs_load_reporting_server_name(); - } - - OrphanablePtr CreateLoadBalancingPolicy( - const char* name, LoadBalancingPolicy::Args args) const override { - return MakeOrphanable(xds_client_, std::move(args)); - } - - private: - RefCountedPtr xds_client_; - }; -}; - -} // namespace - -} // namespace grpc_core - -// -// Plugin registration -// - -void grpc_lb_policy_eds_init() { - grpc_core::LoadBalancingPolicyRegistry::Builder:: - RegisterLoadBalancingPolicyFactory( - absl::make_unique()); -} - -void grpc_lb_policy_eds_shutdown() {} diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc new file mode 100644 index 00000000000..b817dbf1c18 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc @@ -0,0 +1,1169 @@ +// +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include + +#include +#include + +#include "absl/strings/str_cat.h" +#include "absl/types/optional.h" + +#include + +#include "src/core/ext/filters/client_channel/client_channel.h" +#include "src/core/ext/filters/client_channel/lb_policy.h" +#include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h" +#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" +#include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h" +#include "src/core/ext/filters/client_channel/lb_policy_factory.h" +#include "src/core/ext/filters/client_channel/lb_policy_registry.h" +#include "src/core/ext/filters/client_channel/server_address.h" +#include "src/core/ext/xds/xds_channel_args.h" +#include "src/core/ext/xds/xds_client.h" +#include "src/core/ext/xds/xds_client_stats.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/orphanable.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/iomgr/work_serializer.h" +#include "src/core/lib/transport/error_utils.h" +#include "src/core/lib/uri/uri_parser.h" + +#define GRPC_EDS_DEFAULT_FALLBACK_TIMEOUT 10000 + +namespace grpc_core { + +TraceFlag grpc_lb_xds_cluster_resolver_trace(false, "xds_cluster_resolver_lb"); + +const char* kXdsLocalityNameAttributeKey = "xds_locality_name"; + +namespace { + +constexpr char kXdsClusterResolver[] = "xds_cluster_resolver_experimental"; + +// Config for EDS LB policy. +class XdsClusterResolverLbConfig : public LoadBalancingPolicy::Config { + public: + struct DiscoveryMechanism { + std::string cluster_name; + absl::optional lrs_load_reporting_server_name; + uint32_t max_concurrent_requests; + enum DiscoveryMechanismType { + EDS, + LOGICAL_DNS, + }; + DiscoveryMechanismType type; + std::string eds_service_name; + + bool operator==(const DiscoveryMechanism& other) const { + return (cluster_name == other.cluster_name && + lrs_load_reporting_server_name == + other.lrs_load_reporting_server_name && + max_concurrent_requests == other.max_concurrent_requests && + type == other.type && eds_service_name == other.eds_service_name); + } + }; + + XdsClusterResolverLbConfig( + std::vector discovery_mechanisms, + Json locality_picking_policy, Json endpoint_picking_policy) + : discovery_mechanisms_(std::move(discovery_mechanisms)), + locality_picking_policy_(std::move(locality_picking_policy)), + endpoint_picking_policy_(std::move(endpoint_picking_policy)) {} + + const char* name() const override { return kXdsClusterResolver; } + + const std::vector& discovery_mechanisms() const { + return discovery_mechanisms_; + } + const Json& locality_picking_policy() const { + return locality_picking_policy_; + } + const Json& endpoint_picking_policy() const { + return endpoint_picking_policy_; + } + + private: + std::vector discovery_mechanisms_; + Json locality_picking_policy_; + Json endpoint_picking_policy_; +}; + +// Xds Cluster Resolver LB policy. +class XdsClusterResolverLb : public LoadBalancingPolicy { + public: + XdsClusterResolverLb(RefCountedPtr xds_client, Args args); + + const char* name() const override { return kXdsClusterResolver; } + + void UpdateLocked(UpdateArgs args) override; + void ResetBackoffLocked() override; + + private: + // Discovery Mechanism Base class + // + // Implemented by EDS and LOGICAL_DNS. + // + // Implementations are responsible for calling the LB policy's + // OnEndpointChanged(), OnError(), and OnResourceDoesNotExist() + // methods when the corresponding events occur. + // + // Must implement Orphan() method to cancel the watchers. + class DiscoveryMechanism : public InternallyRefCounted { + public: + DiscoveryMechanism( + RefCountedPtr xds_cluster_resolver_lb, + size_t index) + : parent_(std::move(xds_cluster_resolver_lb)), index_(index) {} + void Orphan() override = 0; + + // Caller must ensure that config_ is set before calling. + const absl::string_view GetXdsClusterResolverResourceName() const { + if (!parent_->is_xds_uri_) return parent_->server_name_; + if (!parent_->config_->discovery_mechanisms()[index_] + .eds_service_name.empty()) { + return parent_->config_->discovery_mechanisms()[index_] + .eds_service_name; + } + return parent_->config_->discovery_mechanisms()[index_].cluster_name; + } + + // Returns a pair containing the cluster and eds_service_name + // to use for LRS load reporting. Caller must ensure that config_ is set + // before calling. + std::pair GetLrsClusterKey() const { + if (!parent_->is_xds_uri_) return {parent_->server_name_, nullptr}; + return { + parent_->config_->discovery_mechanisms()[index_].cluster_name, + parent_->config_->discovery_mechanisms()[index_].eds_service_name}; + } + + protected: + XdsClusterResolverLb* parent() const { return parent_.get(); } + size_t index() const { return index_; } + + private: + RefCountedPtr parent_; + // Stores its own index in the vector of DiscoveryMechanism. + size_t index_; + }; + + class EdsDiscoveryMechanism : public DiscoveryMechanism { + public: + EdsDiscoveryMechanism( + RefCountedPtr xds_cluster_resolver_lb, + size_t index); + void Orphan() override; + + private: + class EndpointWatcher : public XdsClient::EndpointWatcherInterface { + public: + explicit EndpointWatcher( + RefCountedPtr discovery_mechanism) + : discovery_mechanism_(std::move(discovery_mechanism)) {} + ~EndpointWatcher() override { + discovery_mechanism_.reset(DEBUG_LOCATION, "EndpointWatcher"); + } + void OnEndpointChanged(XdsApi::EdsUpdate update) override { + new Notifier(discovery_mechanism_, std::move(update)); + } + void OnError(grpc_error* error) override { + new Notifier(discovery_mechanism_, error); + } + void OnResourceDoesNotExist() override { + new Notifier(discovery_mechanism_); + } + + private: + class Notifier { + public: + Notifier(RefCountedPtr discovery_mechanism, + XdsApi::EdsUpdate update); + Notifier(RefCountedPtr discovery_mechanism, + grpc_error* error); + explicit Notifier( + RefCountedPtr discovery_mechanism); + ~Notifier() { discovery_mechanism_.reset(DEBUG_LOCATION, "Notifier"); } + + private: + enum Type { kUpdate, kError, kDoesNotExist }; + + static void RunInExecCtx(void* arg, grpc_error* error); + void RunInWorkSerializer(grpc_error* error); + + RefCountedPtr discovery_mechanism_; + grpc_closure closure_; + XdsApi::EdsUpdate update_; + Type type_; + }; + + RefCountedPtr discovery_mechanism_; + }; + + // Note that this is not owned, so this pointer must never be dereferenced. + EndpointWatcher* watcher_ = nullptr; + }; + + struct DiscoveryMechanismEntry { + OrphanablePtr discovery_mechanism; + bool first_update_received = false; + // Number of priorities this mechanism has contributed to priority_list_. + // (The sum of this across all discovery mechanisms should always equal + // the number of priorities in priority_list_.) + uint32_t num_priorities = 0; + RefCountedPtr drop_config; + // Populated only when an update has been delivered by the mechanism + // but has not yet been applied to the LB policy's combined priority_list_. + absl::optional pending_priority_list; + }; + + class Helper : public ChannelControlHelper { + public: + explicit Helper( + RefCountedPtr xds_cluster_resolver_policy) + : xds_cluster_resolver_policy_(std::move(xds_cluster_resolver_policy)) { + } + + ~Helper() override { + xds_cluster_resolver_policy_.reset(DEBUG_LOCATION, "Helper"); + } + + RefCountedPtr CreateSubchannel( + ServerAddress address, const grpc_channel_args& args) override; + void UpdateState(grpc_connectivity_state state, const absl::Status& status, + std::unique_ptr picker) override; + // This is a no-op, because we get the addresses from the xds + // client, which is a watch-based API. + void RequestReresolution() override {} + void AddTraceEvent(TraceSeverity severity, + absl::string_view message) override; + + private: + RefCountedPtr xds_cluster_resolver_policy_; + }; + + ~XdsClusterResolverLb() override; + + void ShutdownLocked() override; + + void OnEndpointChanged(size_t index, XdsApi::EdsUpdate update); + void OnError(size_t index, grpc_error* error); + void OnResourceDoesNotExist(size_t index); + + void MaybeDestroyChildPolicyLocked(); + + void UpdatePriorityList(XdsApi::EdsUpdate::PriorityList priority_list); + void UpdateChildPolicyLocked(); + OrphanablePtr CreateChildPolicyLocked( + const grpc_channel_args* args); + ServerAddressList CreateChildPolicyAddressesLocked(); + RefCountedPtr CreateChildPolicyConfigLocked(); + grpc_channel_args* CreateChildPolicyArgsLocked( + const grpc_channel_args* args_in); + + // Server name from target URI. + std::string server_name_; + bool is_xds_uri_; + + // Current channel args and config from the resolver. + const grpc_channel_args* args_ = nullptr; + RefCountedPtr config_; + + // Internal state. + bool shutting_down_ = false; + + // The xds client and endpoint watcher. + RefCountedPtr xds_client_; + + // Vector of discovery mechansism entries in priority order. + std::vector discovery_mechanisms_; + + // The latest data from the endpoint watcher. + XdsApi::EdsUpdate::PriorityList priority_list_; + // State used to retain child policy names for priority policy. + std::vector priority_child_numbers_; + + OrphanablePtr child_policy_; +}; + +// +// XdsClusterResolverLb::Helper +// + +RefCountedPtr +XdsClusterResolverLb::Helper::CreateSubchannel(ServerAddress address, + const grpc_channel_args& args) { + if (xds_cluster_resolver_policy_->shutting_down_) return nullptr; + return xds_cluster_resolver_policy_->channel_control_helper() + ->CreateSubchannel(std::move(address), args); +} + +void XdsClusterResolverLb::Helper::UpdateState( + grpc_connectivity_state state, const absl::Status& status, + std::unique_ptr picker) { + if (xds_cluster_resolver_policy_->shutting_down_ || + xds_cluster_resolver_policy_->child_policy_ == nullptr) { + return; + } + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) { + gpr_log(GPR_INFO, + "[xds_cluster_resolver_lb %p] child policy updated state=%s (%s) " + "picker=%p", + xds_cluster_resolver_policy_.get(), ConnectivityStateName(state), + status.ToString().c_str(), picker.get()); + } + xds_cluster_resolver_policy_->channel_control_helper()->UpdateState( + state, status, std::move(picker)); +} + +void XdsClusterResolverLb::Helper::AddTraceEvent(TraceSeverity severity, + absl::string_view message) { + if (xds_cluster_resolver_policy_->shutting_down_) return; + xds_cluster_resolver_policy_->channel_control_helper()->AddTraceEvent( + severity, message); +} + +// +// XdsClusterResolverLb::EdsDiscoveryMechanism +// +XdsClusterResolverLb::EdsDiscoveryMechanism::EdsDiscoveryMechanism( + RefCountedPtr xds_cluster_resolver_lb, size_t index) + : DiscoveryMechanism(std::move(xds_cluster_resolver_lb), index) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) { + gpr_log(GPR_INFO, + "[xds_cluster_resolver_lb %p] discovery mechanism %" PRIuPTR + ":%p starting xds watch for %s", + parent(), index, this, + std::string(GetXdsClusterResolverResourceName()).c_str()); + } + auto watcher = absl::make_unique( + Ref(DEBUG_LOCATION, "EdsDiscoveryMechanism")); + watcher_ = watcher.get(); + parent()->xds_client_->WatchEndpointData(GetXdsClusterResolverResourceName(), + std::move(watcher)); +} + +void XdsClusterResolverLb::EdsDiscoveryMechanism::Orphan() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) { + gpr_log(GPR_INFO, + "[xds_cluster_resolver_lb %p] discovery mechanism %" PRIuPTR + ":%p cancelling xds watch for %s", + parent(), index(), this, + std::string(GetXdsClusterResolverResourceName()).c_str()); + } + parent()->xds_client_->CancelEndpointDataWatch( + GetXdsClusterResolverResourceName(), watcher_); + Unref(); +} + +// +// XdsClusterResolverLb::EndpointWatcher::Notifier +// + +XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier:: + Notifier(RefCountedPtr + discovery_mechanism, + XdsApi::EdsUpdate update) + : discovery_mechanism_(std::move(discovery_mechanism)), + update_(std::move(update)), + type_(kUpdate) { + GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); + ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); +} + +XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier:: + Notifier(RefCountedPtr + discovery_mechanism, + grpc_error* error) + : discovery_mechanism_(std::move(discovery_mechanism)), type_(kError) { + GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); + ExecCtx::Run(DEBUG_LOCATION, &closure_, error); +} + +XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier:: + Notifier(RefCountedPtr + discovery_mechanism) + : discovery_mechanism_(std::move(discovery_mechanism)), + type_(kDoesNotExist) { + GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); + ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); +} + +void XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier:: + RunInExecCtx(void* arg, grpc_error* error) { + Notifier* self = static_cast(arg); + GRPC_ERROR_REF(error); + self->discovery_mechanism_->parent()->work_serializer()->Run( + [self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION); +} + +void XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier:: + RunInWorkSerializer(grpc_error* error) { + switch (type_) { + case kUpdate: + discovery_mechanism_->parent()->OnEndpointChanged( + discovery_mechanism_->index(), std::move(update_)); + break; + case kError: + discovery_mechanism_->parent()->OnError(discovery_mechanism_->index(), + error); + break; + case kDoesNotExist: + discovery_mechanism_->parent()->OnResourceDoesNotExist( + discovery_mechanism_->index()); + break; + }; + delete this; +} + +// +// XdsClusterResolverLb public methods +// + +XdsClusterResolverLb::XdsClusterResolverLb(RefCountedPtr xds_client, + Args args) + : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) { + gpr_log(GPR_INFO, + "[xds_cluster_resolver_lb %p] created -- using xds client %p", this, + xds_client_.get()); + } + // Record server name. + const char* server_uri = + grpc_channel_args_find_string(args.args, GRPC_ARG_SERVER_URI); + GPR_ASSERT(server_uri != nullptr); + absl::StatusOr uri = URI::Parse(server_uri); + GPR_ASSERT(uri.ok() && !uri->path().empty()); + server_name_ = std::string(absl::StripPrefix(uri->path(), "/")); + is_xds_uri_ = uri->scheme() == "xds"; + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) { + gpr_log(GPR_INFO, + "[xds_cluster_resolver_lb %p] server name from channel " + "(is_xds_uri=%d): %s", + this, is_xds_uri_, server_name_.c_str()); + } + // EDS-only flow. + if (!is_xds_uri_) { + // Setup channelz linkage. + channelz::ChannelNode* parent_channelz_node = + grpc_channel_args_find_pointer( + args.args, GRPC_ARG_CHANNELZ_CHANNEL_NODE); + if (parent_channelz_node != nullptr) { + xds_client_->AddChannelzLinkage(parent_channelz_node); + } + // Couple polling. + grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(), + interested_parties()); + } +} + +XdsClusterResolverLb::~XdsClusterResolverLb() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) { + gpr_log(GPR_INFO, + "[xds_cluster_resolver_lb %p] destroying xds_cluster_resolver LB " + "policy", + this); + } +} + +void XdsClusterResolverLb::ShutdownLocked() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) { + gpr_log(GPR_INFO, "[xds_cluster_resolver_lb %p] shutting down", this); + } + shutting_down_ = true; + MaybeDestroyChildPolicyLocked(); + discovery_mechanisms_.clear(); + if (!is_xds_uri_) { + // Remove channelz linkage. + channelz::ChannelNode* parent_channelz_node = + grpc_channel_args_find_pointer( + args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE); + if (parent_channelz_node != nullptr) { + xds_client_->RemoveChannelzLinkage(parent_channelz_node); + } + // Decouple polling. + grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(), + interested_parties()); + } + xds_client_.reset(DEBUG_LOCATION, "XdsClusterResolverLb"); + // Destroy channel args. + grpc_channel_args_destroy(args_); + args_ = nullptr; +} + +void XdsClusterResolverLb::MaybeDestroyChildPolicyLocked() { + if (child_policy_ != nullptr) { + grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), + interested_parties()); + child_policy_.reset(); + } +} + +void XdsClusterResolverLb::UpdateLocked(UpdateArgs args) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) { + gpr_log(GPR_INFO, "[xds_cluster_resolver_lb %p] Received update", this); + } + const bool is_initial_update = args_ == nullptr; + // Update config. + auto old_config = std::move(config_); + config_ = std::move(args.config); + // Update args. + grpc_channel_args_destroy(args_); + args_ = args.args; + args.args = nullptr; + // Update child policy if needed. + if (child_policy_ != nullptr) UpdateChildPolicyLocked(); + // Create endpoint watcher if needed. + if (is_initial_update) { + for (auto config : config_->discovery_mechanisms()) { + // TODO(donnadionne): need to add new types of + // watchers. + DiscoveryMechanismEntry entry; + entry.discovery_mechanism = + grpc_core::MakeOrphanable( + Ref(DEBUG_LOCATION, "EdsDiscoveryMechanism"), + discovery_mechanisms_.size()); + discovery_mechanisms_.push_back(std::move(entry)); + } + } +} + +void XdsClusterResolverLb::ResetBackoffLocked() { + // When the XdsClient is instantiated in the resolver instead of in this + // LB policy, this is done via the resolver, so we don't need to do it here. + if (!is_xds_uri_ && xds_client_ != nullptr) xds_client_->ResetBackoff(); + if (child_policy_ != nullptr) { + child_policy_->ResetBackoffLocked(); + } +} + +void XdsClusterResolverLb::OnEndpointChanged(size_t index, + XdsApi::EdsUpdate update) { + if (shutting_down_) return; + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) { + gpr_log(GPR_INFO, + "[xds_cluster_resolver_lb %p] Received update from xds client" + " for discovery mechanism %" PRIuPTR "", + this, index); + } + // We need at least one priority for each discovery mechanism, just so that we + // have a child in which to create the xds_cluster_impl policy. This ensures + // that we properly handle the case of a discovery mechanism dropping 100% of + // calls, the OnError() case, and the OnResourceDoesNotExist() case. + if (update.priorities.empty()) update.priorities.emplace_back(); + discovery_mechanisms_[index].drop_config = std::move(update.drop_config); + discovery_mechanisms_[index].pending_priority_list = + std::move(update.priorities); + discovery_mechanisms_[index].first_update_received = true; + if (!discovery_mechanisms_[0].first_update_received) { + // We have not yet received an update for index 0, so wait until that + // happens to create the child policy. + return; + } + // Construct new priority list. + XdsApi::EdsUpdate::PriorityList priority_list; + size_t priority_index = 0; + for (DiscoveryMechanismEntry& mechanism : discovery_mechanisms_) { + // If the mechanism has a pending update, use that. + // Otherwise, use the priorities that it previously contributed to the + // combined list. + if (mechanism.pending_priority_list.has_value()) { + priority_list.insert(priority_list.end(), + mechanism.pending_priority_list->begin(), + mechanism.pending_priority_list->end()); + priority_index += mechanism.num_priorities; + mechanism.num_priorities = mechanism.pending_priority_list->size(); + mechanism.pending_priority_list.reset(); + } else { + priority_list.insert( + priority_list.end(), priority_list_.begin() + priority_index, + priority_list_.begin() + priority_index + mechanism.num_priorities); + priority_index += mechanism.num_priorities; + } + } + // Update child policy. + UpdatePriorityList(std::move(priority_list)); +} + +void XdsClusterResolverLb::OnError(size_t index, grpc_error* error) { + gpr_log(GPR_ERROR, + "[xds_cluster_resolver_lb %p] discovery mechanism %" PRIuPTR + " xds watcher reported error: %s", + this, index, grpc_error_string(error)); + GRPC_ERROR_UNREF(error); + if (shutting_down_) return; + if (!discovery_mechanisms_[index].first_update_received) { + // Call OnEndpointChanged with an empty update just like + // OnResourceDoesNotExist. + OnEndpointChanged(index, XdsApi::EdsUpdate()); + } +} + +void XdsClusterResolverLb::OnResourceDoesNotExist(size_t index) { + gpr_log(GPR_ERROR, + "[xds_cluster_resolver_lb %p] discovery mechanism %" PRIuPTR + " resource does not exist", + this, index); + if (shutting_down_) return; + // Call OnEndpointChanged with an empty update. + OnEndpointChanged(index, XdsApi::EdsUpdate()); +} + +// +// child policy-related methods +// + +void XdsClusterResolverLb::UpdatePriorityList( + XdsApi::EdsUpdate::PriorityList priority_list) { + // Build some maps from locality to child number and the reverse from + // the old data in priority_list_ and priority_child_numbers_. + std::map + locality_child_map; + std::map> child_locality_map; + for (size_t priority = 0; priority < priority_list_.size(); ++priority) { + size_t child_number = priority_child_numbers_[priority]; + const auto& localities = priority_list_[priority].localities; + for (const auto& p : localities) { + XdsLocalityName* locality_name = p.first; + locality_child_map[locality_name] = child_number; + child_locality_map[child_number].insert(locality_name); + } + } + // Construct new list of children. + std::vector priority_child_numbers; + for (size_t priority = 0; priority < priority_list.size(); ++priority) { + const auto& localities = priority_list[priority].localities; + absl::optional child_number; + // If one of the localities in this priority already existed, reuse its + // child number. + for (const auto& p : localities) { + XdsLocalityName* locality_name = p.first; + if (!child_number.has_value()) { + auto it = locality_child_map.find(locality_name); + if (it != locality_child_map.end()) { + child_number = it->second; + locality_child_map.erase(it); + // Remove localities that *used* to be in this child number, so + // that we don't incorrectly reuse this child number for a + // subsequent priority. + for (XdsLocalityName* old_locality : + child_locality_map[*child_number]) { + locality_child_map.erase(old_locality); + } + } + } else { + // Remove all localities that are now in this child number, so + // that we don't accidentally reuse this child number for a + // subsequent priority. + locality_child_map.erase(locality_name); + } + } + // If we didn't find an existing child number, assign a new one. + if (!child_number.has_value()) { + for (child_number = 0; + child_locality_map.find(*child_number) != child_locality_map.end(); + ++(*child_number)) { + } + // Add entry so we know that the child number is in use. + // (Don't need to add the list of localities, since we won't use them.) + child_locality_map[*child_number]; + } + priority_child_numbers.push_back(*child_number); + } + // Save update. + priority_list_ = std::move(priority_list); + priority_child_numbers_ = std::move(priority_child_numbers); + // Update child policy. + UpdateChildPolicyLocked(); +} + +ServerAddressList XdsClusterResolverLb::CreateChildPolicyAddressesLocked() { + ServerAddressList addresses; + for (size_t priority = 0; priority < priority_list_.size(); ++priority) { + const auto& localities = priority_list_[priority].localities; + std::string priority_child_name = + absl::StrCat("child", priority_child_numbers_[priority]); + for (const auto& p : localities) { + const auto& locality_name = p.first; + const auto& locality = p.second; + std::vector hierarchical_path = { + priority_child_name, locality_name->AsHumanReadableString()}; + for (const auto& endpoint : locality.endpoints) { + addresses.emplace_back( + endpoint + .WithAttribute(kHierarchicalPathAttributeKey, + MakeHierarchicalPathAttribute(hierarchical_path)) + .WithAttribute(kXdsLocalityNameAttributeKey, + absl::make_unique( + locality_name->Ref()))); + } + } + } + return addresses; +} + +RefCountedPtr +XdsClusterResolverLb::CreateChildPolicyConfigLocked() { + Json::Object priority_children; + Json::Array priority_priorities; + // Setting up index to iterate through the discovery mechanisms and keeping + // track the discovery_mechanism each prioirty belongs to. + size_t discovery_index = 0; + // Setting up num_priorities_remaining to track the priorities in each + // discovery_mechanism. + size_t num_priorities_remaining_in_discovery = + discovery_mechanisms_[discovery_index].num_priorities; + for (size_t priority = 0; priority < priority_list_.size(); ++priority) { + // Each prioirty in the priority_list_ should correspond to a priority in a + // discovery mechanism in discovery_mechanisms_ (both in the same order). + // Keeping track of the discovery_mechanism each prioirty belongs to. + if (num_priorities_remaining_in_discovery == 0) { + ++discovery_index; + num_priorities_remaining_in_discovery = + discovery_mechanisms_[discovery_index].num_priorities; + } else { + --num_priorities_remaining_in_discovery; + } + const auto& localities = priority_list_[priority].localities; + Json::Object weighted_targets; + for (const auto& p : localities) { + XdsLocalityName* locality_name = p.first; + const auto& locality = p.second; + // Construct JSON object containing locality name. + Json::Object locality_name_json; + if (!locality_name->region().empty()) { + locality_name_json["region"] = locality_name->region(); + } + if (!locality_name->zone().empty()) { + locality_name_json["zone"] = locality_name->zone(); + } + if (!locality_name->sub_zone().empty()) { + locality_name_json["subzone"] = locality_name->sub_zone(); + } + // Add weighted target entry. + weighted_targets[locality_name->AsHumanReadableString()] = Json::Object{ + {"weight", locality.lb_weight}, + {"childPolicy", config_->endpoint_picking_policy()}, + }; + } + // Construct locality-picking policy. + // Start with field from our config and add the "targets" field. + Json locality_picking_config = config_->locality_picking_policy(); + Json::Object& config = + *(*locality_picking_config.mutable_array())[0].mutable_object(); + auto it = config.begin(); + GPR_ASSERT(it != config.end()); + (*it->second.mutable_object())["targets"] = std::move(weighted_targets); + // Wrap it in the drop policy. + Json::Array drop_categories; + if (discovery_mechanisms_[discovery_index].drop_config != nullptr) { + for (const auto& category : discovery_mechanisms_[discovery_index] + .drop_config->drop_category_list()) { + drop_categories.push_back(Json::Object{ + {"category", category.name}, + {"requests_per_million", category.parts_per_million}, + }); + } + } + const auto lrs_key = discovery_mechanisms_[discovery_index] + .discovery_mechanism->GetLrsClusterKey(); + Json::Object xds_cluster_impl_config = { + {"clusterName", std::string(lrs_key.first)}, + {"childPolicy", std::move(locality_picking_config)}, + {"dropCategories", std::move(drop_categories)}, + {"maxConcurrentRequests", + config_->discovery_mechanisms()[discovery_index] + .max_concurrent_requests}, + }; + if (!lrs_key.second.empty()) { + xds_cluster_impl_config["edsServiceName"] = std::string(lrs_key.second); + } + if (config_->discovery_mechanisms()[discovery_index] + .lrs_load_reporting_server_name.has_value()) { + xds_cluster_impl_config["lrsLoadReportingServerName"] = + config_->discovery_mechanisms()[discovery_index] + .lrs_load_reporting_server_name.value(); + } + Json locality_picking_policy = Json::Array{Json::Object{ + {"xds_cluster_impl_experimental", std::move(xds_cluster_impl_config)}, + }}; + // Add priority entry. + const size_t child_number = priority_child_numbers_[priority]; + std::string child_name = absl::StrCat("child", child_number); + priority_priorities.emplace_back(child_name); + priority_children[child_name] = Json::Object{ + {"config", std::move(locality_picking_policy)}, + {"ignore_reresolution_requests", true}, + }; + } + // There should be matching number of priorities in discovery_mechanisms_ and + // in priority_list_; therefore at the end of looping through all the + // priorities, num_priorities_remaining should be down to 0, and index should + // be the last index in discovery_mechanisms_. + GPR_ASSERT(num_priorities_remaining_in_discovery == 0); + GPR_ASSERT(discovery_index == discovery_mechanisms_.size() - 1); + Json json = Json::Array{Json::Object{ + {"priority_experimental", + Json::Object{ + {"children", std::move(priority_children)}, + {"priorities", std::move(priority_priorities)}, + }}, + }}; + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) { + std::string json_str = json.Dump(/*indent=*/1); + gpr_log( + GPR_INFO, + "[xds_cluster_resolver_lb %p] generated config for child policy: %s", + this, json_str.c_str()); + } + grpc_error* error = GRPC_ERROR_NONE; + RefCountedPtr config = + LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &error); + if (error != GRPC_ERROR_NONE) { + // This should never happen, but if it does, we basically have no + // way to fix it, so we put the channel in TRANSIENT_FAILURE. + gpr_log(GPR_ERROR, + "[xds_cluster_resolver_lb %p] error parsing generated child policy " + "config -- " + "will put channel in TRANSIENT_FAILURE: %s", + this, grpc_error_string(error)); + error = grpc_error_set_int( + grpc_error_add_child(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "xds_cluster_resolver LB policy: error " + "parsing generated child policy config"), + error), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL); + channel_control_helper()->UpdateState( + GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error), + absl::make_unique(error)); + return nullptr; + } + return config; +} + +void XdsClusterResolverLb::UpdateChildPolicyLocked() { + if (shutting_down_) return; + UpdateArgs update_args; + update_args.config = CreateChildPolicyConfigLocked(); + if (update_args.config == nullptr) return; + update_args.addresses = CreateChildPolicyAddressesLocked(); + update_args.args = CreateChildPolicyArgsLocked(args_); + if (child_policy_ == nullptr) { + child_policy_ = CreateChildPolicyLocked(update_args.args); + } + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) { + gpr_log(GPR_INFO, "[xds_cluster_resolver_lb %p] Updating child policy %p", + this, child_policy_.get()); + } + child_policy_->UpdateLocked(std::move(update_args)); +} + +grpc_channel_args* XdsClusterResolverLb::CreateChildPolicyArgsLocked( + const grpc_channel_args* args) { + grpc_arg args_to_add[] = { + // A channel arg indicating if the target is a backend inferred from an + // xds load balancer. + // TODO(roth): This isn't needed with the new fallback design. + // Remove as part of implementing the new fallback functionality. + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_XDS_LOAD_BALANCER), + 1), + // Inhibit client-side health checking, since the balancer does + // this for us. + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1), + }; + return grpc_channel_args_copy_and_add(args, args_to_add, + GPR_ARRAY_SIZE(args_to_add)); +} + +OrphanablePtr +XdsClusterResolverLb::CreateChildPolicyLocked(const grpc_channel_args* args) { + LoadBalancingPolicy::Args lb_policy_args; + lb_policy_args.work_serializer = work_serializer(); + lb_policy_args.args = args; + lb_policy_args.channel_control_helper = + absl::make_unique(Ref(DEBUG_LOCATION, "Helper")); + OrphanablePtr lb_policy = + LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( + "priority_experimental", std::move(lb_policy_args)); + if (GPR_UNLIKELY(lb_policy == nullptr)) { + gpr_log(GPR_ERROR, + "[xds_cluster_resolver_lb %p] failure creating child policy", this); + return nullptr; + } + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) { + gpr_log(GPR_INFO, + "[xds_cluster_resolver_lb %p]: Created new child policy %p", this, + lb_policy.get()); + } + // Add our interested_parties pollset_set to that of the newly created + // child policy. This will make the child policy progress upon activity on + // this policy, which in turn is tied to the application's call. + grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), + interested_parties()); + return lb_policy; +} + +// +// factory +// + +class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory { + public: + OrphanablePtr CreateLoadBalancingPolicy( + LoadBalancingPolicy::Args args) const override { + grpc_error* error = GRPC_ERROR_NONE; + RefCountedPtr xds_client = XdsClient::GetOrCreate(&error); + if (error != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, + "cannot get XdsClient to instantiate xds_cluster_resolver LB " + "policy: %s", + grpc_error_string(error)); + GRPC_ERROR_UNREF(error); + return nullptr; + } + return MakeOrphanable(std::move(xds_client), + std::move(args)); + } + + const char* name() const override { return kXdsClusterResolver; } + + RefCountedPtr ParseLoadBalancingConfig( + const Json& json, grpc_error** error) const override { + GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); + if (json.type() == Json::Type::JSON_NULL) { + // xds_cluster_resolver was mentioned as a policy in the deprecated + // loadBalancingPolicy field or in the client API. + *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:loadBalancingPolicy error:xds_cluster_resolver policy " + "requires configuration. " + "Please use loadBalancingConfig field of service config instead."); + return nullptr; + } + std::vector error_list; + std::vector + discovery_mechanisms; + auto it = json.object_value().find("discoveryMechanisms"); + if (it == json.object_value().end()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:discoveryMechanisms error:required field missing")); + } else if (it->second.type() != Json::Type::ARRAY) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:discoveryMechanisms error:type should be array")); + } else { + const Json::Array& array = it->second.array_value(); + for (size_t i = 0; i < array.size(); ++i) { + XdsClusterResolverLbConfig::DiscoveryMechanism discovery_mechanism; + std::vector discovery_mechanism_errors = + ParseDiscoveryMechanism(array[i], &discovery_mechanism); + if (!discovery_mechanism_errors.empty()) { + grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("field:discovery_mechanism element: ", i, " error") + .c_str()); + for (grpc_error* discovery_mechanism_error : + discovery_mechanism_errors) { + error = grpc_error_add_child(error, discovery_mechanism_error); + } + error_list.push_back(error); + } + discovery_mechanisms.emplace_back(std::move(discovery_mechanism)); + } + } + // Locality-picking policy. + Json locality_picking_policy; + it = json.object_value().find("localityPickingPolicy"); + if (it == json.object_value().end()) { + locality_picking_policy = Json::Array{ + Json::Object{ + {"weighted_target_experimental", + Json::Object{ + {"targets", Json::Object()}, + }}, + }, + }; + } else { + locality_picking_policy = it->second; + } + grpc_error* parse_error = GRPC_ERROR_NONE; + if (LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( + locality_picking_policy, &parse_error) == nullptr) { + GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); + error_list.push_back(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "localityPickingPolicy", &parse_error, 1)); + GRPC_ERROR_UNREF(parse_error); + } + // Endpoint-picking policy. Called "childPolicy" for xds policy. + Json endpoint_picking_policy; + it = json.object_value().find("endpointPickingPolicy"); + if (it == json.object_value().end()) { + endpoint_picking_policy = Json::Array{ + Json::Object{ + {"round_robin", Json::Object()}, + }, + }; + } else { + endpoint_picking_policy = it->second; + } + parse_error = GRPC_ERROR_NONE; + if (LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( + endpoint_picking_policy, &parse_error) == nullptr) { + GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); + error_list.push_back(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "endpointPickingPolicy", &parse_error, 1)); + GRPC_ERROR_UNREF(parse_error); + } + if (discovery_mechanisms.empty()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:discovery_mechanism error:list is missing or empty")); + } + // Construct config. + if (error_list.empty()) { + return MakeRefCounted( + std::move(discovery_mechanisms), std::move(locality_picking_policy), + std::move(endpoint_picking_policy)); + } else { + *error = GRPC_ERROR_CREATE_FROM_VECTOR( + "xds_cluster_resolver_experimental LB policy config", &error_list); + return nullptr; + } + } + + private: + static std::vector ParseDiscoveryMechanism( + const Json& json, + XdsClusterResolverLbConfig::DiscoveryMechanism* discovery_mechanism) { + std::vector error_list; + if (json.type() != Json::Type::OBJECT) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "value should be of type object")); + return error_list; + } + // Cluster name. + auto it = json.object_value().find("clusterName"); + if (it == json.object_value().end()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:clusterName error:required field missing")); + } else if (it->second.type() != Json::Type::STRING) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:clusterName error:type should be string")); + } else { + discovery_mechanism->cluster_name = it->second.string_value(); + } + // LRS load reporting server name. + it = json.object_value().find("lrsLoadReportingServerName"); + if (it != json.object_value().end()) { + if (it->second.type() != Json::Type::STRING) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:lrsLoadReportingServerName error:type should be string")); + } else { + discovery_mechanism->lrs_load_reporting_server_name.emplace( + it->second.string_value()); + } + } + // Max concurrent requests. + discovery_mechanism->max_concurrent_requests = 1024; + it = json.object_value().find("max_concurrent_requests"); + if (it != json.object_value().end()) { + if (it->second.type() != Json::Type::NUMBER) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:max_concurrent_requests error:must be of type number")); + } else { + discovery_mechanism->max_concurrent_requests = + gpr_parse_nonnegative_int(it->second.string_value().c_str()); + } + } + // Discovery Mechanism type + it = json.object_value().find("type"); + if (it == json.object_value().end()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:type error:required field missing")); + } else if (it->second.type() != Json::Type::STRING) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:type error:type should be string")); + } else { + if (it->second.string_value() == "EDS") { + discovery_mechanism->type = XdsClusterResolverLbConfig:: + DiscoveryMechanism::DiscoveryMechanismType::EDS; + } else if (it->second.string_value() == "LOGICAL_DNS") { + discovery_mechanism->type = XdsClusterResolverLbConfig:: + DiscoveryMechanism::DiscoveryMechanismType::LOGICAL_DNS; + } else { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:type error:invalid type")); + } + } + // EDS service name. + it = json.object_value().find("edsServiceName"); + if (it != json.object_value().end()) { + if (it->second.type() != Json::Type::STRING) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:xds_cluster_resolverServiceName error:type should be " + "string")); + } else { + discovery_mechanism->eds_service_name = it->second.string_value(); + } + } + return error_list; + } + + class XdsClusterResolverChildHandler : public ChildPolicyHandler { + public: + XdsClusterResolverChildHandler(RefCountedPtr xds_client, + Args args) + : ChildPolicyHandler(std::move(args), + &grpc_lb_xds_cluster_resolver_trace), + xds_client_(std::move(xds_client)) {} + + bool ConfigChangeRequiresNewPolicyInstance( + LoadBalancingPolicy::Config* old_config, + LoadBalancingPolicy::Config* new_config) const override { + GPR_ASSERT(old_config->name() == kXdsClusterResolver); + GPR_ASSERT(new_config->name() == kXdsClusterResolver); + XdsClusterResolverLbConfig* old_xds_cluster_resolver_config = + static_cast(old_config); + XdsClusterResolverLbConfig* new_xds_cluster_resolver_config = + static_cast(new_config); + return old_xds_cluster_resolver_config->discovery_mechanisms() != + new_xds_cluster_resolver_config->discovery_mechanisms(); + } + + OrphanablePtr CreateLoadBalancingPolicy( + const char* name, LoadBalancingPolicy::Args args) const override { + return MakeOrphanable(xds_client_, std::move(args)); + } + + private: + RefCountedPtr xds_client_; + }; +}; + +} // namespace + +} // namespace grpc_core + +// +// Plugin registration +// + +void grpc_lb_policy_xds_cluster_resolver_init() { + grpc_core::LoadBalancingPolicyRegistry::Builder:: + RegisterLoadBalancingPolicyFactory( + absl::make_unique()); +} + +void grpc_lb_policy_xds_cluster_resolver_shutdown() {} diff --git a/src/core/plugin_registry/grpc_plugin_registry.cc b/src/core/plugin_registry/grpc_plugin_registry.cc index b532c0042cf..ae3993afbbd 100644 --- a/src/core/plugin_registry/grpc_plugin_registry.cc +++ b/src/core/plugin_registry/grpc_plugin_registry.cc @@ -74,10 +74,10 @@ void FileWatcherCertificateProviderShutdown(); } // namespace grpc_core void grpc_lb_policy_cds_init(void); void grpc_lb_policy_cds_shutdown(void); -void grpc_lb_policy_eds_init(void); -void grpc_lb_policy_eds_shutdown(void); void grpc_lb_policy_xds_cluster_impl_init(void); void grpc_lb_policy_xds_cluster_impl_shutdown(void); +void grpc_lb_policy_xds_cluster_resolver_init(void); +void grpc_lb_policy_xds_cluster_resolver_shutdown(void); void grpc_lb_policy_xds_cluster_manager_init(void); void grpc_lb_policy_xds_cluster_manager_shutdown(void); void grpc_resolver_xds_init(void); @@ -134,10 +134,10 @@ void grpc_register_built_in_plugins(void) { grpc_core::FileWatcherCertificateProviderShutdown); grpc_register_plugin(grpc_lb_policy_cds_init, grpc_lb_policy_cds_shutdown); - grpc_register_plugin(grpc_lb_policy_eds_init, - grpc_lb_policy_eds_shutdown); grpc_register_plugin(grpc_lb_policy_xds_cluster_impl_init, grpc_lb_policy_xds_cluster_impl_shutdown); + grpc_register_plugin(grpc_lb_policy_xds_cluster_resolver_init, + grpc_lb_policy_xds_cluster_resolver_shutdown); grpc_register_plugin(grpc_lb_policy_xds_cluster_manager_init, grpc_lb_policy_xds_cluster_manager_shutdown); grpc_register_plugin(grpc_resolver_xds_init, diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 253e595e859..d104aeee405 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -42,9 +42,9 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc', 'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc', - 'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc', + 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc', 'src/core/ext/filters/client_channel/lb_policy_registry.cc', 'src/core/ext/filters/client_channel/local_subchannel_pool.cc', 'src/core/ext/filters/client_channel/proxy_mapper_registry.cc', diff --git a/test/core/client_channel/service_config_test.cc b/test/core/client_channel/service_config_test.cc index 6df340f5b47..b8449d2bd64 100644 --- a/test/core/client_channel/service_config_test.cc +++ b/test/core/client_channel/service_config_test.cc @@ -518,7 +518,12 @@ TEST_F(ClientChannelParserTest, ValidLoadBalancingConfigXds) { "{\n" " \"loadBalancingConfig\":[\n" " { \"does_not_exist\":{} },\n" - " { \"eds_experimental\":{ \"clusterName\": \"foo\" } }\n" + " { \"xds_cluster_resolver_experimental\":{\n" + " \"discoveryMechanisms\": [\n" + " { \"clusterName\": \"foo\",\n" + " \"type\": \"EDS\"\n" + " } ]\n" + " } }\n" " ]\n" "}"; grpc_error* error = GRPC_ERROR_NONE; @@ -528,7 +533,7 @@ TEST_F(ClientChannelParserTest, ValidLoadBalancingConfigXds) { static_cast( svc_cfg->GetGlobalParsedConfig(0)); auto lb_config = parsed_config->parsed_lb_config(); - EXPECT_STREQ(lb_config->name(), "eds_experimental"); + EXPECT_STREQ(lb_config->name(), "xds_cluster_resolver_experimental"); } TEST_F(ClientChannelParserTest, UnknownLoadBalancingConfig) { @@ -601,7 +606,8 @@ TEST_F(ClientChannelParserTest, UnknownLoadBalancingPolicy) { } TEST_F(ClientChannelParserTest, LoadBalancingPolicyXdsNotAllowed) { - const char* test_json = "{\"loadBalancingPolicy\":\"eds_experimental\"}"; + const char* test_json = + "{\"loadBalancingPolicy\":\"xds_cluster_resolver_experimental\"}"; grpc_error* error = GRPC_ERROR_NONE; auto svc_cfg = ServiceConfig::Create(nullptr, test_json, &error); EXPECT_THAT(grpc_error_string(error), @@ -609,7 +615,8 @@ TEST_F(ClientChannelParserTest, LoadBalancingPolicyXdsNotAllowed) { "Service config parsing error.*referenced_errors.*" "Global Params.*referenced_errors.*" "Client channel global parser.*referenced_errors.*" - "field:loadBalancingPolicy error:eds_experimental requires " + "field:loadBalancingPolicy " + "error:xds_cluster_resolver_experimental requires " "a config. Please use loadBalancingConfig instead.")); GRPC_ERROR_UNREF(error); } diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 9fe4de26932..a98bfb3f8c7 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -138,9 +138,12 @@ constexpr char kDefaultServiceConfig[] = "{\n" " \"loadBalancingConfig\":[\n" " { \"does_not_exist\":{} },\n" - " { \"eds_experimental\":{\n" - " \"clusterName\": \"server.example.com\",\n" - " \"lrsLoadReportingServerName\": \"\"\n" + " { \"xds_cluster_resolver_experimental\":{\n" + " \"discoveryMechanisms\": [\n" + " { \"clusterName\": \"server.example.com\",\n" + " \"type\": \"EDS\",\n" + " \"lrsLoadReportingServerName\": \"\"\n" + " } ]\n" " } }\n" " ]\n" "}"; @@ -148,8 +151,11 @@ constexpr char kDefaultServiceConfigWithoutLoadReporting[] = "{\n" " \"loadBalancingConfig\":[\n" " { \"does_not_exist\":{} },\n" - " { \"eds_experimental\":{\n" - " \"clusterName\": \"server.example.com\"\n" + " { \"xds_cluster_resolver_experimental\":{\n" + " \"discoveryMechanisms\": [\n" + " { \"clusterName\": \"server.example.com\",\n" + " \"type\": \"EDS\"\n" + " } ]\n" " } }\n" " ]\n" "}"; @@ -2280,9 +2286,10 @@ TEST_P(BasicTest, Vanilla) { backends_[i]->backend_service()->request_count()); } // Check LB policy name for the channel. - EXPECT_EQ((GetParam().use_xds_resolver() ? "xds_cluster_manager_experimental" - : "eds_experimental"), - channel_->GetLoadBalancingPolicyName()); + EXPECT_EQ( + (GetParam().use_xds_resolver() ? "xds_cluster_manager_experimental" + : "xds_cluster_resolver_experimental"), + channel_->GetLoadBalancingPolicyName()); } TEST_P(BasicTest, IgnoresUnhealthyEndpoints) { diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 7afde6fe2eb..3d841eabfbd 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1086,10 +1086,10 @@ src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \ src/core/ext/filters/client_channel/lb_policy/subchannel_list.h \ src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \ src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \ -src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds.h \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \ +src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc \ src/core/ext/filters/client_channel/lb_policy_factory.h \ src/core/ext/filters/client_channel/lb_policy_registry.cc \ src/core/ext/filters/client_channel/lb_policy_registry.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 0d0f312303f..e1f1f3935ca 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -913,10 +913,10 @@ src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \ src/core/ext/filters/client_channel/lb_policy/subchannel_list.h \ src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \ src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \ -src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds.h \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \ +src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc \ src/core/ext/filters/client_channel/lb_policy_factory.h \ src/core/ext/filters/client_channel/lb_policy_registry.cc \ src/core/ext/filters/client_channel/lb_policy_registry.h \ diff --git a/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh index d14ccc73e54..2b364b85041 100755 --- a/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh +++ b/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh @@ -62,7 +62,7 @@ bazel build //src/python/grpcio_tests/tests_py3_only/interop:xds_interop_client # Test cases "path_matching" and "header_matching" are not included in "all", # because not all interop clients in all languages support these new tests. -GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,xds_cluster_impl_lb,weighted_target_lb "$PYTHON" \ +GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,xds_cluster_resolver_lb,priority_lb,xds_cluster_impl_lb,weighted_target_lb "$PYTHON" \ tools/run_tests/run_xds_tests.py \ --test_case="all,path_matching,header_matching" \ --project_id=grpc-testing \ diff --git a/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh index 18fe553377d..7057adee90f 100755 --- a/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh +++ b/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh @@ -65,7 +65,7 @@ bazel build test/cpp/interop:xds_interop_client # # TODO: remove "path_matching" and "header_matching" from --test_case after # they are added into "all". -GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,xds_cluster_impl_lb,weighted_target_lb "$PYTHON" \ +GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,xds_cluster_resolver_lb,priority_lb,xds_cluster_impl_lb,weighted_target_lb "$PYTHON" \ tools/run_tests/run_xds_tests.py \ --test_case="all,path_matching,header_matching,circuit_breaking" \ --project_id=grpc-testing \ diff --git a/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh index 36c4c883073..83a539d364e 100755 --- a/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh +++ b/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh @@ -65,7 +65,7 @@ python tools/run_tests/run_tests.py -l csharp -c opt --build_only # # TODO(jtattermusch): remove "path_matching" and "header_matching" from # --test_case after they are added into "all". -GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,xds_cluster_impl_lb,weighted_target_lb "$PYTHON" \ +GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,xds_cluster_resolver_lb,priority_lb,xds_cluster_impl_lb,weighted_target_lb "$PYTHON" \ tools/run_tests/run_xds_tests.py \ --test_case="all,path_matching,header_matching" \ --project_id=grpc-testing \ diff --git a/tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh index bcb72cf7078..125f6a51297 100755 --- a/tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh +++ b/tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh @@ -70,7 +70,7 @@ export CC=/usr/bin/gcc composer install && \ ./bin/generate_proto_php.sh) -GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,xds_cluster_impl_lb,weighted_target_lb "$PYTHON" \ +GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,xds_cluster_resolver_lb,priority_lb,xds_cluster_impl_lb,weighted_target_lb "$PYTHON" \ tools/run_tests/run_xds_tests.py \ --test_case="all,path_matching,header_matching" \ --project_id=grpc-testing \ diff --git a/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh index 47ead811f43..dfacac3da90 100644 --- a/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh +++ b/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh @@ -60,7 +60,7 @@ touch "$TOOLS_DIR"/src/proto/grpc/health/v1/__init__.py (cd src/ruby && bundle && rake compile) -GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,xds_cluster_impl_lb,weighted_target_lb "$PYTHON" \ +GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,xds_cluster_resolver_lb,priority_lb,xds_cluster_impl_lb,weighted_target_lb "$PYTHON" \ tools/run_tests/run_xds_tests.py \ --test_case="all,path_matching,header_matching" \ --project_id=grpc-testing \