From e496705ea3e4d749a5238e9e198a8c134f3c14e1 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 14 Oct 2020 17:46:02 -0700 Subject: [PATCH] Move circuit breaking, EDS drops, and load reporting to xds_cluster_impl policy. --- BUILD | 24 +- BUILD.gn | 2 +- CMakeLists.txt | 2 +- Makefile | 4 +- build_autogenerated.yaml | 2 +- config.m4 | 2 +- config.w32 | 2 +- doc/environment_variables.md | 2 +- gRPC-Core.podspec | 2 +- grpc.gemspec | 2 +- grpc.gyp | 2 +- package.xml | 2 +- .../client_channel/lb_policy/xds/eds.cc | 266 +----------- .../client_channel/lb_policy/xds/xds.h | 70 +++- .../xds/{eds_drop.cc => xds_cluster_impl.cc} | 382 +++++++++++++----- .../plugin_registry/grpc_plugin_registry.cc | 8 +- src/python/grpcio/grpc_core_dependencies.py | 2 +- tools/doxygen/Doxyfile.c++.internal | 2 +- tools/doxygen/Doxyfile.core.internal | 2 +- .../grpc_xds_bazel_python_test_in_docker.sh | 2 +- .../linux/grpc_xds_bazel_test_in_docker.sh | 2 +- .../linux/grpc_xds_csharp_test_in_docker.sh | 2 +- .../linux/grpc_xds_php_test_in_docker.sh | 2 +- .../linux/grpc_xds_ruby_test_in_docker.sh | 2 +- 24 files changed, 383 insertions(+), 407 deletions(-) rename src/core/ext/filters/client_channel/lb_policy/xds/{eds_drop.cc => xds_cluster_impl.cc} (52%) diff --git a/BUILD b/BUILD index 9dd669e735b..a86bc53e096 100644 --- a/BUILD +++ b/BUILD @@ -323,7 +323,7 @@ grpc_cc_library( "//conditions:default": [ "grpc_lb_policy_cds", "grpc_lb_policy_eds", - "grpc_lb_policy_eds_drop", + "grpc_lb_policy_xds_cluster_impl", "grpc_lb_policy_xds_cluster_manager", "grpc_resolver_xds", "grpc_xds_credentials", @@ -1362,14 +1362,24 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "grpc_lb_xds_common", + hdrs = [ + "src/core/ext/filters/client_channel/lb_policy/xds/xds.h", + ], + language = "c++", + deps = [ + "grpc_base", + "grpc_client_channel", + "grpc_xds_client", + ], +) + grpc_cc_library( name = "grpc_lb_policy_eds", srcs = [ "src/core/ext/filters/client_channel/lb_policy/xds/eds.cc", ], - hdrs = [ - "src/core/ext/filters/client_channel/lb_policy/xds/xds.h", - ], external_deps = [ "absl/strings", ], @@ -1378,14 +1388,15 @@ grpc_cc_library( "grpc_base", "grpc_client_channel", "grpc_lb_address_filtering", + "grpc_lb_xds_common", "grpc_xds_client", ], ) grpc_cc_library( - name = "grpc_lb_policy_eds_drop", + name = "grpc_lb_policy_xds_cluster_impl", srcs = [ - "src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc", + "src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc", ], external_deps = [ "absl/strings", @@ -1394,6 +1405,7 @@ grpc_cc_library( deps = [ "grpc_base", "grpc_client_channel", + "grpc_lb_xds_common", "grpc_xds_client", ], ) diff --git a/BUILD.gn b/BUILD.gn index e30c5e268df..a1e7406649f 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -250,8 +250,8 @@ config("grpc_config") { "src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc", "src/core/ext/filters/client_channel/lb_policy/xds/cds.cc", "src/core/ext/filters/client_channel/lb_policy/xds/eds.cc", - "src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc", "src/core/ext/filters/client_channel/lb_policy/xds/xds.h", + "src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc", "src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc", "src/core/ext/filters/client_channel/lb_policy_factory.h", "src/core/ext/filters/client_channel/lb_policy_registry.cc", diff --git a/CMakeLists.txt b/CMakeLists.txt index caa1f818190..9be0584b289 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1445,7 +1445,7 @@ add_library(grpc src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc src/core/ext/filters/client_channel/lb_policy/xds/cds.cc src/core/ext/filters/client_channel/lb_policy/xds/eds.cc - src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc + src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc src/core/ext/filters/client_channel/lb_policy_registry.cc src/core/ext/filters/client_channel/local_subchannel_pool.cc diff --git a/Makefile b/Makefile index ae10e17629b..d014cebea8d 100644 --- a/Makefile +++ b/Makefile @@ -1846,7 +1846,7 @@ LIBGRPC_SRC = \ src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \ src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \ src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \ - src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc \ + src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \ src/core/ext/filters/client_channel/lb_policy_registry.cc \ src/core/ext/filters/client_channel/local_subchannel_pool.cc \ @@ -4506,7 +4506,7 @@ ifneq ($(OPENSSL_DEP),) src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/lb_policy/xds/cds.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/lb_policy/xds/eds.cc: $(OPENSSL_DEP) -src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc: $(OPENSSL_DEP) +src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc: $(OPENSSL_DEP) src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc: $(OPENSSL_DEP) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index f13ad017dd3..c5ab68755fc 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -804,7 +804,7 @@ libs: - src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc - src/core/ext/filters/client_channel/lb_policy/xds/cds.cc - src/core/ext/filters/client_channel/lb_policy/xds/eds.cc - - src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc + - src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc - src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc - src/core/ext/filters/client_channel/lb_policy_registry.cc - src/core/ext/filters/client_channel/local_subchannel_pool.cc diff --git a/config.m4 b/config.m4 index f41f94ee407..82cfd2ec276 100644 --- a/config.m4 +++ b/config.m4 @@ -67,7 +67,7 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \ src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \ src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \ - src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc \ + src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \ src/core/ext/filters/client_channel/lb_policy_registry.cc \ src/core/ext/filters/client_channel/local_subchannel_pool.cc \ diff --git a/config.w32 b/config.w32 index 7e7d025c831..645a2e094e6 100644 --- a/config.w32 +++ b/config.w32 @@ -34,7 +34,7 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\filters\\client_channel\\lb_policy\\weighted_target\\weighted_target.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\cds.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\eds.cc " + - "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\eds_drop.cc " + + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_cluster_impl.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_cluster_manager.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy_registry.cc " + "src\\core\\ext\\filters\\client_channel\\local_subchannel_pool.cc " + diff --git a/doc/environment_variables.md b/doc/environment_variables.md index f93a25156dd..73337826c90 100644 --- a/doc/environment_variables.md +++ b/doc/environment_variables.md @@ -68,7 +68,6 @@ some configuration as environment variables that can be set. - inproc - traces the in-process transport - http_keepalive - traces gRPC keepalive pings - flowctl - traces http2 flow control - - lrs_lb - traces lrs LB policy - op_failure - traces error information when failure is pushed onto a completion queue - pick_first - traces the pick first load balancing policy @@ -91,6 +90,7 @@ some configuration as environment variables that can be set. - weighted_target_lb - traces weighted_target LB policy - xds_client - traces xds client - xds_cluster_manager_lb - traces cluster manager LB policy + - xds_cluster_impl_lb - traces cluster impl LB policy - xds_resolver - traces xds resolver The following tracers will only run in binaries built in DEBUG mode. This is diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index a11224f65dd..f6ee79051ae 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -235,8 +235,8 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc', - 'src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds.h', + 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc', 'src/core/ext/filters/client_channel/lb_policy_factory.h', 'src/core/ext/filters/client_channel/lb_policy_registry.cc', diff --git a/grpc.gemspec b/grpc.gemspec index 4437badcb0a..5a60d0663c5 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -153,8 +153,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/cds.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/eds.cc ) - s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds.h ) + s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy_factory.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy_registry.cc ) diff --git a/grpc.gyp b/grpc.gyp index 2c66c4637c7..4c1de879221 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -472,7 +472,7 @@ 'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc', - 'src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc', + 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc', 'src/core/ext/filters/client_channel/lb_policy_registry.cc', 'src/core/ext/filters/client_channel/local_subchannel_pool.cc', diff --git a/package.xml b/package.xml index f9dfce3a06d..298879b2c1c 100644 --- a/package.xml +++ b/package.xml @@ -133,8 +133,8 @@ - + diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc index 6afdce4d5cc..3eeac4ef63e 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc @@ -36,11 +36,9 @@ #include "src/core/ext/xds/xds_client.h" #include "src/core/ext/xds/xds_client_stats.h" #include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" -#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/work_serializer.h" #include "src/core/lib/transport/error_utils.h" #include "src/core/lib/uri/uri_parser.h" @@ -51,23 +49,12 @@ namespace grpc_core { TraceFlag grpc_lb_eds_trace(false, "eds_lb"); +const char* kXdsLocalityNameAttributeKey = "xds_locality_name"; + namespace { constexpr char kEds[] = "eds_experimental"; -const char* kXdsLocalityNameAttributeKey = "xds_locality_name"; - -// TODO (donnadionne): Check to see if circuit breaking is enabled, this will be -// removed once circuit breaking feature is fully integrated and enabled by -// default. -bool XdsCircuitBreakingEnabled() { - char* value = gpr_getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"); - bool parsed_value; - bool parse_succeeded = gpr_parse_bool_value(value, &parsed_value); - gpr_free(value); - return parse_succeeded && parsed_value; -} - // Config for EDS LB policy. class EdsLbConfig : public LoadBalancingPolicy::Config { public: @@ -120,49 +107,6 @@ class EdsLb : public LoadBalancingPolicy { void ResetBackoffLocked() override; private: - class XdsLocalityAttribute : public ServerAddress::AttributeInterface { - public: - explicit XdsLocalityAttribute(RefCountedPtr locality_name) - : locality_name_(std::move(locality_name)) {} - - RefCountedPtr locality_name() const { - return locality_name_; - } - - std::unique_ptr Copy() const override { - return absl::make_unique(locality_name_->Ref()); - } - - int Cmp(const AttributeInterface* other) const override { - const auto* other_locality_attr = - static_cast(other); - return locality_name_->Compare(*other_locality_attr->locality_name_); - } - - std::string ToString() const override { - return locality_name_->AsHumanReadableString(); - } - - private: - RefCountedPtr locality_name_; - }; - - class StatsSubchannelWrapper : public DelegatingSubchannel { - public: - StatsSubchannelWrapper( - RefCountedPtr wrapped_subchannel, - RefCountedPtr locality_stats) - : DelegatingSubchannel(std::move(wrapped_subchannel)), - locality_stats_(std::move(locality_stats)) {} - - XdsClusterLocalityStats* locality_stats() const { - return locality_stats_.get(); - } - - private: - RefCountedPtr locality_stats_; - }; - class EndpointWatcher : public XdsClient::EndpointWatcherInterface { public: explicit EndpointWatcher(RefCountedPtr parent) @@ -195,33 +139,6 @@ class EdsLb : public LoadBalancingPolicy { RefCountedPtr parent_; }; - // A simple wrapper for ref-counting a picker from the child policy. - class ChildPickerWrapper : public RefCounted { - public: - explicit ChildPickerWrapper(std::unique_ptr picker) - : picker_(std::move(picker)) {} - PickResult Pick(PickArgs args) { return picker_->Pick(args); } - - private: - std::unique_ptr picker_; - }; - - // A picker that handles drops. - class EdsPicker : public SubchannelPicker { - public: - explicit EdsPicker(RefCountedPtr eds_policy); - - PickResult Pick(PickArgs args) override; - - private: - RefCountedPtr eds_policy_; - RefCountedPtr drop_config_; - RefCountedPtr drop_stats_; - RefCountedPtr child_picker_; - bool xds_circuit_breaking_enabled_; - uint32_t max_concurrent_requests_; - }; - class Helper : public ChannelControlHelper { public: explicit Helper(RefCountedPtr eds_policy) @@ -261,7 +178,6 @@ class EdsLb : public LoadBalancingPolicy { RefCountedPtr CreateChildPolicyConfigLocked(); grpc_channel_args* CreateChildPolicyArgsLocked( const grpc_channel_args* args_in); - void MaybeUpdateEdsPickerLocked(); // Caller must ensure that config_ is set before calling. const absl::string_view GetEdsResourceName() const { @@ -302,109 +218,10 @@ class EdsLb : public LoadBalancingPolicy { std::vector priority_child_numbers_; RefCountedPtr drop_config_; - RefCountedPtr drop_stats_; - // Current concurrent number of requests; - Atomic concurrent_requests_{0}; OrphanablePtr child_policy_; - - // The latest state and picker returned from the child policy. - grpc_connectivity_state child_state_; - absl::Status child_status_; - RefCountedPtr child_picker_; }; -// -// EdsLb::EdsPicker -// - -EdsLb::EdsPicker::EdsPicker(RefCountedPtr eds_policy) - : eds_policy_(std::move(eds_policy)), - drop_stats_(eds_policy_->drop_stats_), - child_picker_(eds_policy_->child_picker_), - xds_circuit_breaking_enabled_(XdsCircuitBreakingEnabled()), - max_concurrent_requests_( - eds_policy_->config_->max_concurrent_requests()) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p] constructed new drop picker %p", - eds_policy_.get(), this); - } -} - -EdsLb::PickResult EdsLb::EdsPicker::Pick(PickArgs args) { - uint32_t current = eds_policy_->concurrent_requests_.FetchAdd(1); - if (xds_circuit_breaking_enabled_) { - // Check and see if we exceeded the max concurrent requests count. - if (current >= max_concurrent_requests_) { - eds_policy_->concurrent_requests_.FetchSub(1); - if (drop_stats_ != nullptr) { - drop_stats_->AddUncategorizedDrops(); - } - PickResult result; - result.type = PickResult::PICK_COMPLETE; - return result; - } - } - // If we're not dropping the call, we should always have a child picker. - if (child_picker_ == nullptr) { // Should never happen. - PickResult result; - result.type = PickResult::PICK_FAILED; - result.error = - grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "eds drop picker not given any child picker"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL); - eds_policy_->concurrent_requests_.FetchSub(1); - return result; - } - // Not dropping, so delegate to child's picker. - PickResult result = child_picker_->Pick(args); - if (result.type == result.PICK_COMPLETE && result.subchannel != nullptr) { - XdsClusterLocalityStats* locality_stats = nullptr; - if (drop_stats_ != nullptr) { // If load reporting is enabled. - auto* subchannel_wrapper = - static_cast(result.subchannel.get()); - // Handle load reporting. - locality_stats = subchannel_wrapper->locality_stats()->Ref().release(); - // Record a call started. - locality_stats->AddCallStarted(); - // Unwrap subchannel to pass back up the stack. - result.subchannel = subchannel_wrapper->wrapped_subchannel(); - } - // Intercept the recv_trailing_metadata op to record call completion. - EdsLb* eds_policy = static_cast( - eds_policy_->Ref(DEBUG_LOCATION, "DropPickPicker+call").release()); - auto original_recv_trailing_metadata_ready = - result.recv_trailing_metadata_ready; - result.recv_trailing_metadata_ready = - // Note: This callback does not run in either the control plane - // work serializer or in the data plane mutex. - [locality_stats, original_recv_trailing_metadata_ready, eds_policy]( - grpc_error* error, MetadataInterface* metadata, - CallState* call_state) { - // Record call completion for load reporting. - if (locality_stats != nullptr) { - const bool call_failed = error != GRPC_ERROR_NONE; - locality_stats->AddCallFinished(call_failed); - locality_stats->Unref(DEBUG_LOCATION, "LocalityStats+call"); - } - // Decrement number of calls in flight. - eds_policy->concurrent_requests_.FetchSub(1); - eds_policy->Unref(DEBUG_LOCATION, "DropPickPicker+call"); - // Invoke the original recv_trailing_metadata_ready callback, if any. - if (original_recv_trailing_metadata_ready != nullptr) { - original_recv_trailing_metadata_ready(error, metadata, call_state); - } - }; - } else { - // TODO(roth): We should ideally also record call failures here in the case - // where a pick fails. This is challenging, because we don't know which - // picks are for wait_for_ready RPCs or how many times we'll return a - // failure for the same wait_for_ready RPC. - eds_policy_->concurrent_requests_.FetchSub(1); - } - return result; -} - // // EdsLb::Helper // @@ -412,27 +229,6 @@ EdsLb::PickResult EdsLb::EdsPicker::Pick(PickArgs args) { RefCountedPtr EdsLb::Helper::CreateSubchannel( ServerAddress address, const grpc_channel_args& args) { if (eds_policy_->shutting_down_) return nullptr; - // If load reporting is enabled, wrap the subchannel such that it - // includes the locality stats object, which will be used by the EdsPicker. - if (eds_policy_->config_->lrs_load_reporting_server_name().has_value()) { - RefCountedPtr locality_name; - auto* attribute = address.GetAttribute(kXdsLocalityNameAttributeKey); - if (attribute != nullptr) { - const auto* locality_attr = - static_cast(attribute); - locality_name = locality_attr->locality_name(); - } - RefCountedPtr locality_stats = - eds_policy_->xds_client_->AddClusterLocalityStats( - *eds_policy_->config_->lrs_load_reporting_server_name(), - eds_policy_->config_->cluster_name(), - eds_policy_->config_->eds_service_name(), std::move(locality_name)); - return MakeRefCounted( - eds_policy_->channel_control_helper()->CreateSubchannel( - std::move(address), args), - std::move(locality_stats)); - } - // Load reporting not enabled, so don't wrap the subchannel. return eds_policy_->channel_control_helper()->CreateSubchannel( std::move(address), args); } @@ -444,19 +240,12 @@ void EdsLb::Helper::UpdateState(grpc_connectivity_state state, return; } if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, - "[edslb %p] child policy updated state=%s (%s) " - "picker=%p", + gpr_log(GPR_INFO, "[edslb %p] child policy updated state=%s (%s) picker=%p", eds_policy_.get(), ConnectivityStateName(state), status.ToString().c_str(), picker.get()); } - // Save the state and picker. - eds_policy_->child_state_ = state; - eds_policy_->child_status_ = status; - eds_policy_->child_picker_ = - MakeRefCounted(std::move(picker)); - // Wrap the picker in a EdsPicker and pass it up. - eds_policy_->MaybeUpdateEdsPickerLocked(); + eds_policy_->channel_control_helper()->UpdateState(state, status, + std::move(picker)); } void EdsLb::Helper::AddTraceEvent(TraceSeverity severity, @@ -561,11 +350,7 @@ void EdsLb::ShutdownLocked() { gpr_log(GPR_INFO, "[edslb %p] shutting down", this); } shutting_down_ = true; - // Drop our ref to the child's picker, in case it's holding a ref to - // the child. - child_picker_.reset(); MaybeDestroyChildPolicyLocked(); - drop_stats_.reset(); // Cancel watcher. if (endpoint_watcher_ != nullptr) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { @@ -613,28 +398,7 @@ void EdsLb::UpdateLocked(UpdateArgs args) { grpc_channel_args_destroy(args_); args_ = args.args; args.args = nullptr; - const bool lrs_server_changed = - is_initial_update || config_->lrs_load_reporting_server_name() != - old_config->lrs_load_reporting_server_name(); - const bool max_concurrent_requests_changed = - is_initial_update || config_->max_concurrent_requests() != - old_config->max_concurrent_requests(); - // Update drop stats for load reporting if needed. - if (lrs_server_changed) { - drop_stats_.reset(); - if (config_->lrs_load_reporting_server_name().has_value()) { - const auto key = GetLrsClusterKey(); - drop_stats_ = xds_client_->AddClusterDropStats( - config_->lrs_load_reporting_server_name().value(), - key.first /*cluster_name*/, key.second /*eds_service_name*/); - } - } - if (lrs_server_changed || max_concurrent_requests_changed) { - MaybeUpdateEdsPickerLocked(); - } // Update child policy if needed. - // Note that this comes after updating drop_stats_, since we want that - // to be used by any new picker we create here. if (child_policy_ != nullptr) UpdateChildPolicyLocked(); // Create endpoint watcher if needed. if (is_initial_update) { @@ -665,7 +429,7 @@ void EdsLb::OnEndpointChanged(XdsApi::EdsUpdate update) { // Update the drop config. drop_config_ = std::move(update.drop_config); // If priority list is empty, add a single priority, just so that we - // have a child in which to create the eds_drop policy. + // have a child in which to create the xds_cluster_impl policy. if (update.priorities.empty()) update.priorities.emplace_back(); // Update child policy. UpdatePriorityList(std::move(update.priorities)); @@ -836,20 +600,21 @@ EdsLb::CreateChildPolicyConfigLocked() { {"requests_per_million", category.parts_per_million}, }); } - Json::Object eds_drop_config = { + Json::Object xds_cluster_impl_config = { {"clusterName", std::string(lrs_key.first)}, {"childPolicy", std::move(locality_picking_config)}, {"dropCategories", std::move(drop_categories)}, + {"maxConcurrentRequests", config_->max_concurrent_requests()}, }; if (!lrs_key.second.empty()) { - eds_drop_config["edsServiceName"] = std::string(lrs_key.second); + xds_cluster_impl_config["edsServiceName"] = std::string(lrs_key.second); } if (config_->lrs_load_reporting_server_name().has_value()) { - eds_drop_config["lrsLoadReportingServerName"] = + xds_cluster_impl_config["lrsLoadReportingServerName"] = config_->lrs_load_reporting_server_name().value(); } Json locality_picking_policy = Json::Array{Json::Object{ - {"eds_drop_experimental", std::move(eds_drop_config)}, + {"xds_cluster_impl_experimental", std::move(xds_cluster_impl_config)}, }}; // Add priority entry. const size_t child_number = priority_child_numbers_[priority]; @@ -957,15 +722,6 @@ OrphanablePtr EdsLb::CreateChildPolicyLocked( return lb_policy; } -void EdsLb::MaybeUpdateEdsPickerLocked() { - // Update only if we have a child picker. - if (child_picker_ != nullptr) { - channel_control_helper()->UpdateState( - child_state_, child_status_, - absl::make_unique(Ref(DEBUG_LOCATION, "EdsPicker"))); - } -} - // // factory // diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.h b/src/core/ext/filters/client_channel/lb_policy/xds/xds.h index 1de3472a3a0..18d138b7d20 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.h +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.h @@ -1,26 +1,28 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_H #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_H #include +#include "src/core/ext/filters/client_channel/server_address.h" +#include "src/core/ext/xds/xds_client_stats.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" + /** Channel arg indicating if a target corresponding to the address is a backend * received from a balancer. The type of this arg is an integer and the value is * treated as a bool. */ @@ -29,4 +31,38 @@ #define GRPC_ARG_ADDRESS_IS_BACKEND_FROM_XDS_LOAD_BALANCER \ "grpc.address_is_backend_from_xds_load_balancer" +namespace grpc_core { + +// Defined in the EDS policy. +extern const char* kXdsLocalityNameAttributeKey; + +class XdsLocalityAttribute : public ServerAddress::AttributeInterface { + public: + explicit XdsLocalityAttribute(RefCountedPtr locality_name) + : locality_name_(std::move(locality_name)) {} + + RefCountedPtr locality_name() const { + return locality_name_; + } + + std::unique_ptr Copy() const override { + return absl::make_unique(locality_name_->Ref()); + } + + int Cmp(const AttributeInterface* other) const override { + const auto* other_locality_attr = + static_cast(other); + return locality_name_->Compare(*other_locality_attr->locality_name_); + } + + std::string ToString() const override { + return locality_name_->AsHumanReadableString(); + } + + private: + RefCountedPtr locality_name_; +}; + +} // namespace grpc_core + #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_H */ diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc similarity index 52% rename from src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc rename to src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc index 41af02a4499..506d4623f97 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc @@ -22,11 +22,13 @@ #include "src/core/ext/filters/client_channel/lb_policy.h" #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" +#include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h" #include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/xds/xds_client.h" #include "src/core/ext/xds/xds_client_stats.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" @@ -34,27 +36,41 @@ namespace grpc_core { -TraceFlag grpc_eds_drop_lb_trace(false, "eds_drop_lb"); +TraceFlag grpc_xds_cluster_impl_lb_trace(false, "xds_cluster_impl_lb"); namespace { -constexpr char kEdsDrop[] = "eds_drop_experimental"; +constexpr char kXdsClusterImpl[] = "xds_cluster_impl_experimental"; + +// TODO (donnadionne): Check to see if circuit breaking is enabled, this will be +// removed once circuit breaking feature is fully integrated and enabled by +// default. +bool XdsCircuitBreakingEnabled() { + char* value = gpr_getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"); + bool parsed_value; + bool parse_succeeded = gpr_parse_bool_value(value, &parsed_value); + gpr_free(value); + return parse_succeeded && parsed_value; +} -// Config for EDS drop LB policy. -class EdsDropLbConfig : public LoadBalancingPolicy::Config { +// Config for xDS Cluster Impl LB policy. +class XdsClusterImplLbConfig : public LoadBalancingPolicy::Config { public: - EdsDropLbConfig(RefCountedPtr child_policy, - std::string cluster_name, std::string eds_service_name, - absl::optional lrs_load_reporting_server_name, - RefCountedPtr drop_config) + XdsClusterImplLbConfig( + RefCountedPtr child_policy, + std::string cluster_name, std::string eds_service_name, + absl::optional lrs_load_reporting_server_name, + uint32_t max_concurrent_requests, + RefCountedPtr drop_config) : child_policy_(std::move(child_policy)), cluster_name_(std::move(cluster_name)), eds_service_name_(std::move(eds_service_name)), lrs_load_reporting_server_name_( std::move(lrs_load_reporting_server_name)), + max_concurrent_requests_(max_concurrent_requests), drop_config_(std::move(drop_config)) {} - const char* name() const override { return kEdsDrop; } + const char* name() const override { return kXdsClusterImpl; } RefCountedPtr child_policy() const { return child_policy_; @@ -64,6 +80,9 @@ class EdsDropLbConfig : public LoadBalancingPolicy::Config { const absl::optional& lrs_load_reporting_server_name() const { return lrs_load_reporting_server_name_; }; + const uint32_t max_concurrent_requests() const { + return max_concurrent_requests_; + } RefCountedPtr drop_config() const { return drop_config_; } @@ -73,21 +92,38 @@ class EdsDropLbConfig : public LoadBalancingPolicy::Config { std::string cluster_name_; std::string eds_service_name_; absl::optional lrs_load_reporting_server_name_; + uint32_t max_concurrent_requests_; RefCountedPtr drop_config_; }; -// EDS Drop LB policy. -class EdsDropLb : public LoadBalancingPolicy { +// xDS Cluster Impl LB policy. +class XdsClusterImplLb : public LoadBalancingPolicy { public: - EdsDropLb(RefCountedPtr xds_client, Args args); + XdsClusterImplLb(RefCountedPtr xds_client, Args args); - const char* name() const override { return kEdsDrop; } + const char* name() const override { return kXdsClusterImpl; } void UpdateLocked(UpdateArgs args) override; void ExitIdleLocked() override; void ResetBackoffLocked() override; private: + class StatsSubchannelWrapper : public DelegatingSubchannel { + public: + StatsSubchannelWrapper( + RefCountedPtr wrapped_subchannel, + RefCountedPtr locality_stats) + : DelegatingSubchannel(std::move(wrapped_subchannel)), + locality_stats_(std::move(locality_stats)) {} + + XdsClusterLocalityStats* locality_stats() const { + return locality_stats_.get(); + } + + private: + RefCountedPtr locality_stats_; + }; + // A simple wrapper for ref-counting a picker from the child policy. class RefCountedPicker : public RefCounted { public: @@ -100,16 +136,17 @@ class EdsDropLb : public LoadBalancingPolicy { }; // A picker that wraps the picker from the child to perform drops. - class DropPicker : public SubchannelPicker { + class Picker : public SubchannelPicker { public: - DropPicker(EdsDropLb* eds_drop_lb, RefCountedPtr picker) - : drop_config_(eds_drop_lb->config_->drop_config()), - drop_stats_(eds_drop_lb->drop_stats_), - picker_(std::move(picker)) {} + Picker(RefCountedPtr xds_cluster_impl_lb, + RefCountedPtr picker); PickResult Pick(PickArgs args); private: + RefCountedPtr xds_cluster_impl_lb_; + bool xds_circuit_breaking_enabled_; + uint32_t max_concurrent_requests_; RefCountedPtr drop_config_; RefCountedPtr drop_stats_; RefCountedPtr picker_; @@ -117,10 +154,10 @@ class EdsDropLb : public LoadBalancingPolicy { class Helper : public ChannelControlHelper { public: - explicit Helper(RefCountedPtr eds_drop_policy) - : eds_drop_policy_(std::move(eds_drop_policy)) {} + explicit Helper(RefCountedPtr xds_cluster_impl_policy) + : xds_cluster_impl_policy_(std::move(xds_cluster_impl_policy)) {} - ~Helper() { eds_drop_policy_.reset(DEBUG_LOCATION, "Helper"); } + ~Helper() { xds_cluster_impl_policy_.reset(DEBUG_LOCATION, "Helper"); } RefCountedPtr CreateSubchannel( ServerAddress address, const grpc_channel_args& args) override; @@ -131,10 +168,10 @@ class EdsDropLb : public LoadBalancingPolicy { absl::string_view message) override; private: - RefCountedPtr eds_drop_policy_; + RefCountedPtr xds_cluster_impl_policy_; }; - ~EdsDropLb(); + ~XdsClusterImplLb(); void ShutdownLocked() override; @@ -146,7 +183,10 @@ class EdsDropLb : public LoadBalancingPolicy { void MaybeUpdatePickerLocked(); // Current config from the resolver. - RefCountedPtr config_; + RefCountedPtr config_; + + // Current concurrent number of requests; + Atomic concurrent_requests_{0}; // Internal state. bool shutting_down_ = false; @@ -166,12 +206,28 @@ class EdsDropLb : public LoadBalancingPolicy { }; // -// EdsDropLb::DropPicker +// XdsClusterImplLb::Picker // -LoadBalancingPolicy::PickResult EdsDropLb::DropPicker::Pick( +XdsClusterImplLb::Picker::Picker( + RefCountedPtr xds_cluster_impl_lb, + RefCountedPtr picker) + : xds_cluster_impl_lb_(std::move(xds_cluster_impl_lb)), + xds_circuit_breaking_enabled_(XdsCircuitBreakingEnabled()), + max_concurrent_requests_( + xds_cluster_impl_lb_->config_->max_concurrent_requests()), + drop_config_(xds_cluster_impl_lb_->config_->drop_config()), + drop_stats_(xds_cluster_impl_lb_->drop_stats_), + picker_(std::move(picker)) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) { + gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] constructed new picker %p", + xds_cluster_impl_lb_.get(), this); + } +} + +LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick( LoadBalancingPolicy::PickArgs args) { - // Handle drop. + // Handle EDS drops. const std::string* drop_category; if (drop_config_->ShouldDrop(&drop_category)) { if (drop_stats_ != nullptr) drop_stats_->AddCallDropped(*drop_category); @@ -179,41 +235,103 @@ LoadBalancingPolicy::PickResult EdsDropLb::DropPicker::Pick( result.type = PickResult::PICK_COMPLETE; return result; } + // Handle circuit breaking. + uint32_t current = xds_cluster_impl_lb_->concurrent_requests_.FetchAdd(1); + if (xds_circuit_breaking_enabled_) { + // Check and see if we exceeded the max concurrent requests count. + if (current >= max_concurrent_requests_) { + xds_cluster_impl_lb_->concurrent_requests_.FetchSub(1); + if (drop_stats_ != nullptr) drop_stats_->AddUncategorizedDrops(); + PickResult result; + result.type = PickResult::PICK_COMPLETE; + return result; + } + } // If we're not dropping the call, we should always have a child picker. if (picker_ == nullptr) { // Should never happen. PickResult result; result.type = PickResult::PICK_FAILED; - result.error = - grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "eds_drop picker not given any child picker"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL); + result.error = grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "xds_cluster_impl picker not given any child picker"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL); + xds_cluster_impl_lb_->concurrent_requests_.FetchSub(1); return result; } // Not dropping, so delegate to child picker. - return picker_->Pick(args); + PickResult result = picker_->Pick(args); + if (result.type == result.PICK_COMPLETE && result.subchannel != nullptr) { + XdsClusterLocalityStats* locality_stats = nullptr; + if (drop_stats_ != nullptr) { // If load reporting is enabled. + auto* subchannel_wrapper = + static_cast(result.subchannel.get()); + // Handle load reporting. + locality_stats = subchannel_wrapper->locality_stats()->Ref().release(); + // Record a call started. + locality_stats->AddCallStarted(); + // Unwrap subchannel to pass back up the stack. + result.subchannel = subchannel_wrapper->wrapped_subchannel(); + } + // Intercept the recv_trailing_metadata op to record call completion. + XdsClusterImplLb* xds_cluster_impl_lb = static_cast( + xds_cluster_impl_lb_->Ref(DEBUG_LOCATION, "DropPickPicker+call") + .release()); + auto original_recv_trailing_metadata_ready = + result.recv_trailing_metadata_ready; + result.recv_trailing_metadata_ready = + // Note: This callback does not run in either the control plane + // work serializer or in the data plane mutex. + [locality_stats, original_recv_trailing_metadata_ready, + xds_cluster_impl_lb](grpc_error* error, MetadataInterface* metadata, + CallState* call_state) { + // Record call completion for load reporting. + if (locality_stats != nullptr) { + const bool call_failed = error != GRPC_ERROR_NONE; + locality_stats->AddCallFinished(call_failed); + locality_stats->Unref(DEBUG_LOCATION, "LocalityStats+call"); + } + // Decrement number of calls in flight. + xds_cluster_impl_lb->concurrent_requests_.FetchSub(1); + xds_cluster_impl_lb->Unref(DEBUG_LOCATION, "DropPickPicker+call"); + // Invoke the original recv_trailing_metadata_ready callback, if any. + if (original_recv_trailing_metadata_ready != nullptr) { + original_recv_trailing_metadata_ready(error, metadata, call_state); + } + }; + } else { + // TODO(roth): We should ideally also record call failures here in the case + // where a pick fails. This is challenging, because we don't know which + // picks are for wait_for_ready RPCs or how many times we'll return a + // failure for the same wait_for_ready RPC. + xds_cluster_impl_lb_->concurrent_requests_.FetchSub(1); + } + return result; } // -// EdsDropLb +// XdsClusterImplLb // -EdsDropLb::EdsDropLb(RefCountedPtr xds_client, Args args) +XdsClusterImplLb::XdsClusterImplLb(RefCountedPtr xds_client, + Args args) : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { - gpr_log(GPR_INFO, "[eds_drop_lb %p] created -- using xds client %p", this, - xds_client_.get()); + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) { + gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] created -- using xds client %p", + this, xds_client_.get()); } } -EdsDropLb::~EdsDropLb() { - if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { - gpr_log(GPR_INFO, "[eds_drop_lb %p] destroying eds_drop LB policy", this); +XdsClusterImplLb::~XdsClusterImplLb() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) { + gpr_log(GPR_INFO, + "[xds_cluster_impl_lb %p] destroying xds_cluster_impl LB policy", + this); } } -void EdsDropLb::ShutdownLocked() { - if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { - gpr_log(GPR_INFO, "[eds_drop_lb %p] shutting down", this); +void XdsClusterImplLb::ShutdownLocked() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) { + gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] shutting down", this); } shutting_down_ = true; // Remove the child policy's interested_parties pollset_set from the @@ -230,35 +348,43 @@ void EdsDropLb::ShutdownLocked() { xds_client_.reset(); } -void EdsDropLb::ExitIdleLocked() { +void XdsClusterImplLb::ExitIdleLocked() { if (child_policy_ != nullptr) child_policy_->ExitIdleLocked(); } -void EdsDropLb::ResetBackoffLocked() { +void XdsClusterImplLb::ResetBackoffLocked() { // The XdsClient will have its backoff reset by the xds resolver, so we // don't need to do it here. if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked(); } -void EdsDropLb::UpdateLocked(UpdateArgs args) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { - gpr_log(GPR_INFO, "[eds_drop_lb %p] Received update", this); +void XdsClusterImplLb::UpdateLocked(UpdateArgs args) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) { + gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] Received update", this); } // Update config. + const bool is_initial_update = config_ == nullptr; auto old_config = std::move(config_); config_ = std::move(args.config); - // Update load reporting if needed. - if (old_config == nullptr || - config_->lrs_load_reporting_server_name() != - old_config->lrs_load_reporting_server_name() || - config_->cluster_name() != old_config->cluster_name() || - config_->eds_service_name() != old_config->eds_service_name()) { - drop_stats_.reset(); + // On initial update, create drop stats. + if (is_initial_update) { if (config_->lrs_load_reporting_server_name().has_value()) { drop_stats_ = xds_client_->AddClusterDropStats( config_->lrs_load_reporting_server_name().value(), config_->cluster_name(), config_->eds_service_name()); } + } else { + // Cluster name, EDS service name, and LRS server name should never + // change, because the EDS policy above us should be swapped out if + // that happens. + GPR_ASSERT(config_->cluster_name() == old_config->cluster_name()); + GPR_ASSERT(config_->eds_service_name() == old_config->eds_service_name()); + GPR_ASSERT(config_->lrs_load_reporting_server_name() == + old_config->lrs_load_reporting_server_name()); + } + // Update picker if max_concurrent_requests has changed. + if (is_initial_update || config_->max_concurrent_requests() != + old_config->max_concurrent_requests()) { MaybeUpdatePickerLocked(); } // Update child policy. @@ -266,14 +392,16 @@ void EdsDropLb::UpdateLocked(UpdateArgs args) { args.args = nullptr; } -void EdsDropLb::MaybeUpdatePickerLocked() { +void XdsClusterImplLb::MaybeUpdatePickerLocked() { // If we're dropping all calls, report READY, regardless of what (or // whether) the child has reported. if (config_->drop_config() != nullptr && config_->drop_config()->drop_all()) { - auto drop_picker = absl::make_unique(this, picker_); - if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { + auto drop_picker = + absl::make_unique(Ref(DEBUG_LOCATION, "Picker"), picker_); + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) { gpr_log(GPR_INFO, - "[eds_drop_lb %p] updating connectivity (drop all): state=READY " + "[xds_cluster_impl_lb %p] updating connectivity (drop all): " + "state=READY " "picker=%p", this, drop_picker.get()); } @@ -283,10 +411,12 @@ void EdsDropLb::MaybeUpdatePickerLocked() { } // Otherwise, update only if we have a child picker. if (picker_ != nullptr) { - auto drop_picker = absl::make_unique(this, picker_); - if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { + auto drop_picker = + absl::make_unique(Ref(DEBUG_LOCATION, "Picker"), picker_); + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) { gpr_log(GPR_INFO, - "[eds_drop_lb %p] updating connectivity: state=%s status=(%s) " + "[xds_cluster_impl_lb %p] updating connectivity: state=%s " + "status=(%s) " "picker=%p", this, ConnectivityStateName(state_), status_.ToString().c_str(), drop_picker.get()); @@ -296,7 +426,7 @@ void EdsDropLb::MaybeUpdatePickerLocked() { } } -OrphanablePtr EdsDropLb::CreateChildPolicyLocked( +OrphanablePtr XdsClusterImplLb::CreateChildPolicyLocked( const grpc_channel_args* args) { LoadBalancingPolicy::Args lb_policy_args; lb_policy_args.work_serializer = work_serializer(); @@ -305,9 +435,10 @@ OrphanablePtr EdsDropLb::CreateChildPolicyLocked( absl::make_unique(Ref(DEBUG_LOCATION, "Helper")); OrphanablePtr lb_policy = MakeOrphanable(std::move(lb_policy_args), - &grpc_eds_drop_lb_trace); - if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { - gpr_log(GPR_INFO, "[eds_drop_lb %p] Created new child policy handler %p", + &grpc_xds_cluster_impl_lb_trace); + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) { + gpr_log(GPR_INFO, + "[xds_cluster_impl_lb %p] Created new child policy handler %p", this, lb_policy.get()); } // Add our interested_parties pollset_set to that of the newly created @@ -318,8 +449,8 @@ OrphanablePtr EdsDropLb::CreateChildPolicyLocked( return lb_policy; } -void EdsDropLb::UpdateChildPolicyLocked(ServerAddressList addresses, - const grpc_channel_args* args) { +void XdsClusterImplLb::UpdateChildPolicyLocked(ServerAddressList addresses, + const grpc_channel_args* args) { // Create policy if needed. if (child_policy_ == nullptr) { child_policy_ = CreateChildPolicyLocked(args); @@ -330,76 +461,105 @@ void EdsDropLb::UpdateChildPolicyLocked(ServerAddressList addresses, update_args.config = config_->child_policy(); update_args.args = args; // Update the policy. - if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { - gpr_log(GPR_INFO, "[eds_drop_lb %p] Updating child policy handler %p", this, + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) { + gpr_log(GPR_INFO, + "[xds_cluster_impl_lb %p] Updating child policy handler %p", this, child_policy_.get()); } child_policy_->UpdateLocked(std::move(update_args)); } // -// EdsDropLb::Helper +// XdsClusterImplLb::Helper // -RefCountedPtr EdsDropLb::Helper::CreateSubchannel( +RefCountedPtr XdsClusterImplLb::Helper::CreateSubchannel( ServerAddress address, const grpc_channel_args& args) { - if (eds_drop_policy_->shutting_down_) return nullptr; - return eds_drop_policy_->channel_control_helper()->CreateSubchannel( + if (xds_cluster_impl_policy_->shutting_down_) return nullptr; + // If load reporting is enabled, wrap the subchannel such that it + // includes the locality stats object, which will be used by the EdsPicker. + if (xds_cluster_impl_policy_->config_->lrs_load_reporting_server_name() + .has_value()) { + RefCountedPtr locality_name; + auto* attribute = address.GetAttribute(kXdsLocalityNameAttributeKey); + if (attribute != nullptr) { + const auto* locality_attr = + static_cast(attribute); + locality_name = locality_attr->locality_name(); + } + RefCountedPtr locality_stats = + xds_cluster_impl_policy_->xds_client_->AddClusterLocalityStats( + *xds_cluster_impl_policy_->config_ + ->lrs_load_reporting_server_name(), + xds_cluster_impl_policy_->config_->cluster_name(), + xds_cluster_impl_policy_->config_->eds_service_name(), + std::move(locality_name)); + return MakeRefCounted( + xds_cluster_impl_policy_->channel_control_helper()->CreateSubchannel( + std::move(address), args), + std::move(locality_stats)); + } + // Load reporting not enabled, so don't wrap the subchannel. + return xds_cluster_impl_policy_->channel_control_helper()->CreateSubchannel( std::move(address), args); } -void EdsDropLb::Helper::UpdateState(grpc_connectivity_state state, - const absl::Status& status, - std::unique_ptr picker) { - if (eds_drop_policy_->shutting_down_) return; - if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { +void XdsClusterImplLb::Helper::UpdateState( + grpc_connectivity_state state, const absl::Status& status, + std::unique_ptr picker) { + if (xds_cluster_impl_policy_->shutting_down_) return; + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) { gpr_log(GPR_INFO, - "[eds_drop_lb %p] child connectivity state update: state=%s (%s) " + "[xds_cluster_impl_lb %p] child connectivity state update: " + "state=%s (%s) " "picker=%p", - eds_drop_policy_.get(), ConnectivityStateName(state), + xds_cluster_impl_policy_.get(), ConnectivityStateName(state), status.ToString().c_str(), picker.get()); } // Save the state and picker. - eds_drop_policy_->state_ = state; - eds_drop_policy_->status_ = status; - eds_drop_policy_->picker_ = + xds_cluster_impl_policy_->state_ = state; + xds_cluster_impl_policy_->status_ = status; + xds_cluster_impl_policy_->picker_ = MakeRefCounted(std::move(picker)); // Wrap the picker and return it to the channel. - eds_drop_policy_->MaybeUpdatePickerLocked(); + xds_cluster_impl_policy_->MaybeUpdatePickerLocked(); } -void EdsDropLb::Helper::RequestReresolution() { - if (eds_drop_policy_->shutting_down_) return; - eds_drop_policy_->channel_control_helper()->RequestReresolution(); +void XdsClusterImplLb::Helper::RequestReresolution() { + if (xds_cluster_impl_policy_->shutting_down_) return; + xds_cluster_impl_policy_->channel_control_helper()->RequestReresolution(); } -void EdsDropLb::Helper::AddTraceEvent(TraceSeverity severity, - absl::string_view message) { - if (eds_drop_policy_->shutting_down_) return; - eds_drop_policy_->channel_control_helper()->AddTraceEvent(severity, message); +void XdsClusterImplLb::Helper::AddTraceEvent(TraceSeverity severity, + absl::string_view message) { + if (xds_cluster_impl_policy_->shutting_down_) return; + xds_cluster_impl_policy_->channel_control_helper()->AddTraceEvent(severity, + message); } // // factory // -class EdsDropLbFactory : public LoadBalancingPolicyFactory { +class XdsClusterImplLbFactory : public LoadBalancingPolicyFactory { public: OrphanablePtr CreateLoadBalancingPolicy( LoadBalancingPolicy::Args args) const override { grpc_error* error = GRPC_ERROR_NONE; RefCountedPtr xds_client = XdsClient::GetOrCreate(&error); if (error != GRPC_ERROR_NONE) { - gpr_log(GPR_ERROR, - "cannot get XdsClient to instantiate eds_drop LB policy: %s", - grpc_error_string(error)); + gpr_log( + GPR_ERROR, + "cannot get XdsClient to instantiate xds_cluster_impl LB policy: %s", + grpc_error_string(error)); GRPC_ERROR_UNREF(error); return nullptr; } - return MakeOrphanable(std::move(xds_client), std::move(args)); + return MakeOrphanable(std::move(xds_client), + std::move(args)); } - const char* name() const override { return kEdsDrop; } + const char* name() const override { return kXdsClusterImpl; } RefCountedPtr ParseLoadBalancingConfig( const Json& json, grpc_error** error) const override { @@ -408,7 +568,7 @@ class EdsDropLbFactory : public LoadBalancingPolicyFactory { // This policy was configured in the deprecated loadBalancingPolicy // field or in the client API. *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:loadBalancingPolicy error:eds_drop policy requires " + "field:loadBalancingPolicy error:xds_cluster_impl policy requires " "configuration. Please use loadBalancingConfig field of service " "config instead."); return nullptr; @@ -466,6 +626,18 @@ class EdsDropLbFactory : public LoadBalancingPolicyFactory { lrs_load_reporting_server_name = it->second.string_value(); } } + // Max concurrent requests. + uint32_t max_concurrent_requests = 1024; + it = json.object_value().find("maxConcurrentRequests"); + if (it != json.object_value().end()) { + if (it->second.type() != Json::Type::NUMBER) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:max_concurrent_requests error:must be of type number")); + } else { + max_concurrent_requests = + gpr_parse_nonnegative_int(it->second.string_value().c_str()); + } + } // Drop config. auto drop_config = MakeRefCounted(); it = json.object_value().find("dropCategories"); @@ -482,13 +654,13 @@ class EdsDropLbFactory : public LoadBalancingPolicyFactory { } if (!error_list.empty()) { *error = GRPC_ERROR_CREATE_FROM_VECTOR( - "eds_drop_experimental LB policy config", &error_list); + "xds_cluster_impl_experimental LB policy config", &error_list); return nullptr; } - return MakeRefCounted( + return MakeRefCounted( std::move(child_policy), std::move(cluster_name), std::move(eds_service_name), std::move(lrs_load_reporting_server_name), - std::move(drop_config)); + max_concurrent_requests, std::move(drop_config)); } private: @@ -562,10 +734,10 @@ class EdsDropLbFactory : public LoadBalancingPolicyFactory { // Plugin registration // -void grpc_lb_policy_eds_drop_init() { +void grpc_lb_policy_xds_cluster_impl_init() { grpc_core::LoadBalancingPolicyRegistry::Builder:: RegisterLoadBalancingPolicyFactory( - absl::make_unique()); + absl::make_unique()); } -void grpc_lb_policy_eds_drop_shutdown() {} +void grpc_lb_policy_xds_cluster_impl_shutdown() {} diff --git a/src/core/plugin_registry/grpc_plugin_registry.cc b/src/core/plugin_registry/grpc_plugin_registry.cc index e048760c7f1..55815238fec 100644 --- a/src/core/plugin_registry/grpc_plugin_registry.cc +++ b/src/core/plugin_registry/grpc_plugin_registry.cc @@ -72,8 +72,8 @@ void grpc_lb_policy_cds_init(void); void grpc_lb_policy_cds_shutdown(void); void grpc_lb_policy_eds_init(void); void grpc_lb_policy_eds_shutdown(void); -void grpc_lb_policy_eds_drop_init(void); -void grpc_lb_policy_eds_drop_shutdown(void); +void grpc_lb_policy_xds_cluster_impl_init(void); +void grpc_lb_policy_xds_cluster_impl_shutdown(void); void grpc_lb_policy_xds_cluster_manager_init(void); void grpc_lb_policy_xds_cluster_manager_shutdown(void); void grpc_resolver_xds_init(void); @@ -130,8 +130,8 @@ void grpc_register_built_in_plugins(void) { grpc_lb_policy_cds_shutdown); grpc_register_plugin(grpc_lb_policy_eds_init, grpc_lb_policy_eds_shutdown); - grpc_register_plugin(grpc_lb_policy_eds_drop_init, - grpc_lb_policy_eds_drop_shutdown); + grpc_register_plugin(grpc_lb_policy_xds_cluster_impl_init, + grpc_lb_policy_xds_cluster_impl_shutdown); grpc_register_plugin(grpc_lb_policy_xds_cluster_manager_init, grpc_lb_policy_xds_cluster_manager_shutdown); grpc_register_plugin(grpc_resolver_xds_init, diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 35edeb799df..bd2f233ef22 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -43,7 +43,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc', - 'src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc', + 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc', 'src/core/ext/filters/client_channel/lb_policy_registry.cc', 'src/core/ext/filters/client_channel/local_subchannel_pool.cc', diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 488cfb8b4c8..95285846c04 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1086,8 +1086,8 @@ src/core/ext/filters/client_channel/lb_policy/subchannel_list.h \ src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \ src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \ src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \ -src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds.h \ +src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \ src/core/ext/filters/client_channel/lb_policy_factory.h \ src/core/ext/filters/client_channel/lb_policy_registry.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 166c68f0cee..95f54f6a827 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -914,8 +914,8 @@ src/core/ext/filters/client_channel/lb_policy/subchannel_list.h \ src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \ src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \ src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \ -src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds.h \ +src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \ src/core/ext/filters/client_channel/lb_policy_factory.h \ src/core/ext/filters/client_channel/lb_policy_registry.cc \ diff --git a/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh index 7ab0898449f..d14ccc73e54 100755 --- a/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh +++ b/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh @@ -62,7 +62,7 @@ bazel build //src/python/grpcio_tests/tests_py3_only/interop:xds_interop_client # Test cases "path_matching" and "header_matching" are not included in "all", # because not all interop clients in all languages support these new tests. -GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \ +GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,xds_cluster_impl_lb,weighted_target_lb "$PYTHON" \ tools/run_tests/run_xds_tests.py \ --test_case="all,path_matching,header_matching" \ --project_id=grpc-testing \ diff --git a/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh index da306d1b2ae..c14fd7e62f9 100755 --- a/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh +++ b/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh @@ -65,7 +65,7 @@ bazel build test/cpp/interop:xds_interop_client # # TODO: remove "path_matching" and "header_matching" from --test_case after # they are added into "all". -GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \ +GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,xds_cluster_impl_lb,weighted_target_lb "$PYTHON" \ tools/run_tests/run_xds_tests.py \ --test_case="all,path_matching,header_matching" \ --project_id=grpc-testing \ diff --git a/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh index d043c7f7610..36c4c883073 100755 --- a/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh +++ b/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh @@ -65,7 +65,7 @@ python tools/run_tests/run_tests.py -l csharp -c opt --build_only # # TODO(jtattermusch): remove "path_matching" and "header_matching" from # --test_case after they are added into "all". -GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \ +GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,xds_cluster_impl_lb,weighted_target_lb "$PYTHON" \ tools/run_tests/run_xds_tests.py \ --test_case="all,path_matching,header_matching" \ --project_id=grpc-testing \ diff --git a/tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh index 01f15b2a53f..bcb72cf7078 100755 --- a/tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh +++ b/tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh @@ -70,7 +70,7 @@ export CC=/usr/bin/gcc composer install && \ ./bin/generate_proto_php.sh) -GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \ +GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,xds_cluster_impl_lb,weighted_target_lb "$PYTHON" \ tools/run_tests/run_xds_tests.py \ --test_case="all,path_matching,header_matching" \ --project_id=grpc-testing \ diff --git a/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh index 9b84e4983d9..47ead811f43 100644 --- a/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh +++ b/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh @@ -60,7 +60,7 @@ touch "$TOOLS_DIR"/src/proto/grpc/health/v1/__init__.py (cd src/ruby && bundle && rake compile) -GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \ +GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,xds_cluster_impl_lb,weighted_target_lb "$PYTHON" \ tools/run_tests/run_xds_tests.py \ --test_case="all,path_matching,header_matching" \ --project_id=grpc-testing \