diff --git a/BUILD b/BUILD index 59120423dde..2164228d744 100644 --- a/BUILD +++ b/BUILD @@ -326,6 +326,7 @@ grpc_cc_library( "//conditions:default": [ "grpc_lb_policy_cds", "grpc_lb_policy_eds", + "grpc_lb_policy_eds_drop", "grpc_lb_policy_lrs", "grpc_lb_policy_xds_cluster_manager", "grpc_resolver_xds", @@ -1394,6 +1395,22 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "grpc_lb_policy_eds_drop", + srcs = [ + "src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc", + ], + external_deps = [ + "absl/strings", + ], + language = "c++", + deps = [ + "grpc_base", + "grpc_client_channel", + "grpc_xds_client", + ], +) + grpc_cc_library( name = "grpc_lb_policy_lrs", srcs = [ diff --git a/BUILD.gn b/BUILD.gn index 1cfdab71471..88dd6433c63 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -249,6 +249,7 @@ config("grpc_config") { "src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc", "src/core/ext/filters/client_channel/lb_policy/xds/cds.cc", "src/core/ext/filters/client_channel/lb_policy/xds/eds.cc", + "src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc", "src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc", "src/core/ext/filters/client_channel/lb_policy/xds/xds.h", "src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc", diff --git a/CMakeLists.txt b/CMakeLists.txt index 621e38c7b08..abb6abf1645 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1443,6 +1443,7 @@ add_library(grpc src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc src/core/ext/filters/client_channel/lb_policy/xds/cds.cc src/core/ext/filters/client_channel/lb_policy/xds/eds.cc + src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc src/core/ext/filters/client_channel/lb_policy_registry.cc diff --git a/Makefile b/Makefile index 9bd63ff3004..7849277f6b9 100644 --- a/Makefile +++ b/Makefile @@ -1846,6 +1846,7 @@ LIBGRPC_SRC = \ src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \ src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \ src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \ + src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc \ src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \ src/core/ext/filters/client_channel/lb_policy_registry.cc \ @@ -4509,6 +4510,7 @@ ifneq ($(OPENSSL_DEP),) src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/lb_policy/xds/cds.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/lb_policy/xds/eds.cc: $(OPENSSL_DEP) +src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc: $(OPENSSL_DEP) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 270e70aeb49..0ff1bc8564d 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -800,6 +800,7 @@ libs: - src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc - src/core/ext/filters/client_channel/lb_policy/xds/cds.cc - src/core/ext/filters/client_channel/lb_policy/xds/eds.cc + - src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc - src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc - src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc - src/core/ext/filters/client_channel/lb_policy_registry.cc diff --git a/config.m4 b/config.m4 index 5e190a341c9..3fa396803f3 100644 --- a/config.m4 +++ b/config.m4 @@ -67,6 +67,7 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \ src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \ src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \ + src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc \ src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \ src/core/ext/filters/client_channel/lb_policy_registry.cc \ diff --git a/config.w32 b/config.w32 index ecbaf53b4ec..5129d8e2e9d 100644 --- a/config.w32 +++ b/config.w32 @@ -34,6 +34,7 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\filters\\client_channel\\lb_policy\\weighted_target\\weighted_target.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\cds.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\eds.cc " + + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\eds_drop.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\lrs.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_cluster_manager.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy_registry.cc " + diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 7a5f7e0d044..8a7822ee70f 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -235,6 +235,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc', + 'src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds.h', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc', diff --git a/grpc.gemspec b/grpc.gemspec index 9f197a2d645..2b84e0918a6 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -153,6 +153,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/cds.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/eds.cc ) + s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc ) diff --git a/grpc.gyp b/grpc.gyp index 3c1b3a96a09..02574487daa 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -472,6 +472,7 @@ 'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc', + 'src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc', 'src/core/ext/filters/client_channel/lb_policy_registry.cc', diff --git a/package.xml b/package.xml index e76f692138d..f21b7f4b5d1 100644 --- a/package.xml +++ b/package.xml @@ -133,6 +133,7 @@ + diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc index 1d3306c820d..e255d44e07d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc @@ -262,7 +262,6 @@ class EdsLb : public LoadBalancingPolicy { EdsLb::DropPicker::DropPicker(RefCountedPtr eds_policy) : eds_policy_(std::move(eds_policy)), - drop_config_(eds_policy_->drop_config_), drop_stats_(eds_policy_->drop_stats_), child_picker_(eds_policy_->child_picker_), max_concurrent_requests_( @@ -274,14 +273,6 @@ EdsLb::DropPicker::DropPicker(RefCountedPtr eds_policy) } EdsLb::PickResult EdsLb::DropPicker::Pick(PickArgs args) { - // Handle drop. - const std::string* drop_category; - if (drop_config_->ShouldDrop(&drop_category)) { - if (drop_stats_ != nullptr) drop_stats_->AddCallDropped(*drop_category); - PickResult result; - result.type = PickResult::PICK_COMPLETE; - return result; - } // Check and see if we exceeded the max concurrent requests count. uint32_t current = eds_policy_->concurrent_requests_.FetchAdd(1); if (current >= max_concurrent_requests_) { @@ -293,7 +284,7 @@ EdsLb::PickResult EdsLb::DropPicker::Pick(PickArgs args) { result.type = PickResult::PICK_COMPLETE; return result; } - // If we're not dropping all calls, we should always have a child picker. + // If we're not dropping the call, we should always have a child picker. if (child_picker_ == nullptr) { // Should never happen. PickResult result; result.type = PickResult::PICK_FAILED; @@ -564,26 +555,12 @@ void EdsLb::OnEndpointChanged(XdsApi::EdsUpdate update) { gpr_log(GPR_INFO, "[edslb %p] Received EDS update from xds client", this); } // Update the drop config. - const bool drop_config_changed = - drop_config_ == nullptr || *drop_config_ != *update.drop_config; - if (drop_config_changed) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p] Updating drop config", this); - } - drop_config_ = std::move(update.drop_config); - MaybeUpdateDropPickerLocked(); - } else if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p] Drop config unchanged, ignoring", this); - } - // Update priority and locality info. - if (child_policy_ == nullptr || priority_list_ != update.priorities) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p] Updating priority list", this); - } - UpdatePriorityList(std::move(update.priorities)); - } else if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p] Priority list unchanged, ignoring", this); - } + drop_config_ = std::move(update.drop_config); + // If priority list is empty, add a single priority, just so that we + // have a child in which to create the eds_drop policy. + if (update.priorities.empty()) update.priorities.emplace_back(); + // Update child policy. + UpdatePriorityList(std::move(update.priorities)); } void EdsLb::OnError(grpc_error* error) { @@ -705,6 +682,7 @@ ServerAddressList EdsLb::CreateChildPolicyAddressesLocked() { RefCountedPtr EdsLb::CreateChildPolicyConfigLocked() { + const auto lrs_key = GetLrsClusterKey(); Json::Object priority_children; Json::Array priority_priorities; for (size_t priority = 0; priority < priority_list_.size(); ++priority) { @@ -728,16 +706,15 @@ EdsLb::CreateChildPolicyConfigLocked() { // Wrap it in the LRS policy if load reporting is enabled. Json endpoint_picking_policy; if (config_->lrs_load_reporting_server_name().has_value()) { - const auto key = GetLrsClusterKey(); Json::Object lrs_config = { - {"clusterName", std::string(key.first)}, + {"clusterName", std::string(lrs_key.first)}, {"locality", std::move(locality_name_json)}, {"lrsLoadReportingServerName", config_->lrs_load_reporting_server_name().value()}, {"childPolicy", config_->endpoint_picking_policy()}, }; - if (!key.second.empty()) { - lrs_config["edsServiceName"] = std::string(key.second); + if (!lrs_key.second.empty()) { + lrs_config["edsServiceName"] = std::string(lrs_key.second); } endpoint_picking_policy = Json::Array{Json::Object{ {"lrs_experimental", std::move(lrs_config)}, @@ -751,18 +728,43 @@ EdsLb::CreateChildPolicyConfigLocked() { {"childPolicy", std::move(endpoint_picking_policy)}, }; } - // Add priority entry. - const size_t child_number = priority_child_numbers_[priority]; - std::string child_name = absl::StrCat("child", child_number); - priority_priorities.emplace_back(child_name); + // Construct locality-picking policy. + // Start with field from our config and add the "targets" field. Json locality_picking_config = config_->locality_picking_policy(); Json::Object& config = *(*locality_picking_config.mutable_array())[0].mutable_object(); auto it = config.begin(); GPR_ASSERT(it != config.end()); (*it->second.mutable_object())["targets"] = std::move(weighted_targets); + // Wrap it in the drop policy. + Json::Array drop_categories; + for (const auto& category : drop_config_->drop_category_list()) { + drop_categories.push_back(Json::Object{ + {"category", category.name}, + {"requests_per_million", category.parts_per_million}, + }); + } + Json::Object eds_drop_config = { + {"clusterName", std::string(lrs_key.first)}, + {"childPolicy", std::move(locality_picking_config)}, + {"dropCategories", std::move(drop_categories)}, + }; + if (!lrs_key.second.empty()) { + eds_drop_config["edsServiceName"] = std::string(lrs_key.second); + } + if (config_->lrs_load_reporting_server_name().has_value()) { + eds_drop_config["lrsLoadReportingServerName"] = + config_->lrs_load_reporting_server_name().value(); + } + Json locality_picking_policy = Json::Array{Json::Object{ + {"eds_drop_experimental", std::move(eds_drop_config)}, + }}; + // Add priority entry. + const size_t child_number = priority_child_numbers_[priority]; + std::string child_name = absl::StrCat("child", child_number); + priority_priorities.emplace_back(child_name); priority_children[child_name] = Json::Object{ - {"config", std::move(locality_picking_config)}, + {"config", std::move(locality_picking_policy)}, }; } Json json = Json::Array{Json::Object{ @@ -864,14 +866,6 @@ OrphanablePtr EdsLb::CreateChildPolicyLocked( } void EdsLb::MaybeUpdateDropPickerLocked() { - // If we're dropping all calls, report READY, regardless of what (or - // whether) the child has reported. - if (drop_config_ != nullptr && drop_config_->drop_all()) { - channel_control_helper()->UpdateState( - GRPC_CHANNEL_READY, absl::Status(), - absl::make_unique(Ref(DEBUG_LOCATION, "DropPicker"))); - return; - } // Update only if we have a child picker. if (child_picker_ != nullptr) { channel_control_helper()->UpdateState( diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc b/src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc new file mode 100644 index 00000000000..e5c8f4dfaad --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc @@ -0,0 +1,571 @@ +// +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include + +#include "absl/strings/string_view.h" + +#include + +#include "src/core/ext/filters/client_channel/lb_policy.h" +#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" +#include "src/core/ext/filters/client_channel/lb_policy_factory.h" +#include "src/core/ext/filters/client_channel/lb_policy_registry.h" +#include "src/core/ext/xds/xds_client.h" +#include "src/core/ext/xds/xds_client_stats.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/orphanable.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/iomgr/work_serializer.h" + +namespace grpc_core { + +TraceFlag grpc_eds_drop_lb_trace(false, "eds_drop_lb"); + +namespace { + +constexpr char kEdsDrop[] = "eds_drop_experimental"; + +// Config for EDS drop LB policy. +class EdsDropLbConfig : public LoadBalancingPolicy::Config { + public: + EdsDropLbConfig(RefCountedPtr child_policy, + std::string cluster_name, std::string eds_service_name, + absl::optional lrs_load_reporting_server_name, + RefCountedPtr drop_config) + : child_policy_(std::move(child_policy)), + cluster_name_(std::move(cluster_name)), + eds_service_name_(std::move(eds_service_name)), + lrs_load_reporting_server_name_( + std::move(lrs_load_reporting_server_name)), + drop_config_(std::move(drop_config)) {} + + const char* name() const override { return kEdsDrop; } + + RefCountedPtr child_policy() const { + return child_policy_; + } + const std::string& cluster_name() const { return cluster_name_; } + const std::string& eds_service_name() const { return eds_service_name_; } + const absl::optional& lrs_load_reporting_server_name() const { + return lrs_load_reporting_server_name_; + }; + RefCountedPtr drop_config() const { + return drop_config_; + } + + private: + RefCountedPtr child_policy_; + std::string cluster_name_; + std::string eds_service_name_; + absl::optional lrs_load_reporting_server_name_; + RefCountedPtr drop_config_; +}; + +// EDS Drop LB policy. +class EdsDropLb : public LoadBalancingPolicy { + public: + EdsDropLb(RefCountedPtr xds_client, Args args); + + const char* name() const override { return kEdsDrop; } + + void UpdateLocked(UpdateArgs args) override; + void ExitIdleLocked() override; + void ResetBackoffLocked() override; + + private: + // A simple wrapper for ref-counting a picker from the child policy. + class RefCountedPicker : public RefCounted { + public: + explicit RefCountedPicker(std::unique_ptr picker) + : picker_(std::move(picker)) {} + PickResult Pick(PickArgs args) { return picker_->Pick(args); } + + private: + std::unique_ptr picker_; + }; + + // A picker that wraps the picker from the child to perform drops. + class DropPicker : public SubchannelPicker { + public: + DropPicker(EdsDropLb* eds_drop_lb, RefCountedPtr picker) + : drop_config_(eds_drop_lb->config_->drop_config()), + drop_stats_(eds_drop_lb->drop_stats_), + picker_(std::move(picker)) {} + + PickResult Pick(PickArgs args); + + private: + RefCountedPtr drop_config_; + RefCountedPtr drop_stats_; + RefCountedPtr picker_; + }; + + class Helper : public ChannelControlHelper { + public: + explicit Helper(RefCountedPtr eds_drop_policy) + : eds_drop_policy_(std::move(eds_drop_policy)) {} + + ~Helper() { eds_drop_policy_.reset(DEBUG_LOCATION, "Helper"); } + + RefCountedPtr CreateSubchannel( + ServerAddress address, const grpc_channel_args& args) override; + void UpdateState(grpc_connectivity_state state, const absl::Status& status, + std::unique_ptr picker) override; + void RequestReresolution() override; + void AddTraceEvent(TraceSeverity severity, + absl::string_view message) override; + + private: + RefCountedPtr eds_drop_policy_; + }; + + ~EdsDropLb(); + + void ShutdownLocked() override; + + OrphanablePtr CreateChildPolicyLocked( + const grpc_channel_args* args); + void UpdateChildPolicyLocked(ServerAddressList addresses, + const grpc_channel_args* args); + + void MaybeUpdatePickerLocked(); + + // Current config from the resolver. + RefCountedPtr config_; + + // Internal state. + bool shutting_down_ = false; + + // The xds client. + RefCountedPtr xds_client_; + + // The stats for client-side load reporting. + RefCountedPtr drop_stats_; + + OrphanablePtr child_policy_; + + // Latest state and picker reported by the child policy. + grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE; + absl::Status status_; + RefCountedPtr picker_; +}; + +// +// EdsDropLb::DropPicker +// + +LoadBalancingPolicy::PickResult EdsDropLb::DropPicker::Pick( + LoadBalancingPolicy::PickArgs args) { + // Handle drop. + const std::string* drop_category; + if (drop_config_->ShouldDrop(&drop_category)) { + if (drop_stats_ != nullptr) drop_stats_->AddCallDropped(*drop_category); + PickResult result; + result.type = PickResult::PICK_COMPLETE; + return result; + } + // If we're not dropping the call, we should always have a child picker. + if (picker_ == nullptr) { // Should never happen. + PickResult result; + result.type = PickResult::PICK_FAILED; + result.error = + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "eds_drop picker not given any child picker"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL); + return result; + } + // Not dropping, so delegate to child picker. + return picker_->Pick(args); +} + +// +// EdsDropLb +// + +EdsDropLb::EdsDropLb(RefCountedPtr xds_client, Args args) + : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { + gpr_log(GPR_INFO, "[eds_drop_lb %p] created -- using xds client %p", this, + xds_client_.get()); + } +} + +EdsDropLb::~EdsDropLb() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { + gpr_log(GPR_INFO, "[eds_drop_lb %p] destroying xds LB policy", this); + } +} + +void EdsDropLb::ShutdownLocked() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { + gpr_log(GPR_INFO, "[eds_drop_lb %p] shutting down", this); + } + shutting_down_ = true; + // Remove the child policy's interested_parties pollset_set from the + // xDS policy. + if (child_policy_ != nullptr) { + grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), + interested_parties()); + child_policy_.reset(); + } + // Drop our ref to the child's picker, in case it's holding a ref to + // the child. + picker_.reset(); + drop_stats_.reset(); + xds_client_.reset(); +} + +void EdsDropLb::ExitIdleLocked() { + if (child_policy_ != nullptr) child_policy_->ExitIdleLocked(); +} + +void EdsDropLb::ResetBackoffLocked() { + // The XdsClient will have its backoff reset by the xds resolver, so we + // don't need to do it here. + if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked(); +} + +void EdsDropLb::UpdateLocked(UpdateArgs args) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { + gpr_log(GPR_INFO, "[eds_drop_lb %p] Received update", this); + } + // Update config. + auto old_config = std::move(config_); + config_ = std::move(args.config); + // Update load reporting if needed. + if (old_config == nullptr || + config_->lrs_load_reporting_server_name() != + old_config->lrs_load_reporting_server_name() || + config_->cluster_name() != old_config->cluster_name() || + config_->eds_service_name() != old_config->eds_service_name()) { + drop_stats_.reset(); + if (config_->lrs_load_reporting_server_name().has_value()) { + drop_stats_ = xds_client_->AddClusterDropStats( + config_->lrs_load_reporting_server_name().value(), + config_->cluster_name(), config_->eds_service_name()); + } + MaybeUpdatePickerLocked(); + } + // Update child policy. + UpdateChildPolicyLocked(std::move(args.addresses), args.args); + args.args = nullptr; +} + +void EdsDropLb::MaybeUpdatePickerLocked() { + // If we're dropping all calls, report READY, regardless of what (or + // whether) the child has reported. + if (config_->drop_config() != nullptr && config_->drop_config()->drop_all()) { + auto drop_picker = absl::make_unique(this, picker_); + if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { + gpr_log(GPR_INFO, + "[eds_drop_lb %p] updating connectivity (drop all): state=READY " + "picker=%p", + this, drop_picker.get()); + } + channel_control_helper()->UpdateState(GRPC_CHANNEL_READY, absl::Status(), + std::move(drop_picker)); + return; + } + // Otherwise, update only if we have a child picker. + if (picker_ != nullptr) { + auto drop_picker = absl::make_unique(this, picker_); + if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { + gpr_log(GPR_INFO, + "[eds_drop_lb %p] updating connectivity: state=%s status=(%s) " + "picker=%p", + this, ConnectivityStateName(state_), status_.ToString().c_str(), + drop_picker.get()); + } + channel_control_helper()->UpdateState(state_, status_, + std::move(drop_picker)); + } +} + +OrphanablePtr EdsDropLb::CreateChildPolicyLocked( + const grpc_channel_args* args) { + LoadBalancingPolicy::Args lb_policy_args; + lb_policy_args.work_serializer = work_serializer(); + lb_policy_args.args = args; + lb_policy_args.channel_control_helper = + absl::make_unique(Ref(DEBUG_LOCATION, "Helper")); + OrphanablePtr lb_policy = + MakeOrphanable(std::move(lb_policy_args), + &grpc_eds_drop_lb_trace); + if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { + gpr_log(GPR_INFO, "[eds_drop_lb %p] Created new child policy handler %p", + this, lb_policy.get()); + } + // Add our interested_parties pollset_set to that of the newly created + // child policy. This will make the child policy progress upon activity on + // this policy, which in turn is tied to the application's call. + grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), + interested_parties()); + return lb_policy; +} + +void EdsDropLb::UpdateChildPolicyLocked(ServerAddressList addresses, + const grpc_channel_args* args) { + // Create policy if needed. + if (child_policy_ == nullptr) { + child_policy_ = CreateChildPolicyLocked(args); + } + // Construct update args. + UpdateArgs update_args; + update_args.addresses = std::move(addresses); + update_args.config = config_->child_policy(); + update_args.args = args; + // Update the policy. + if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { + gpr_log(GPR_INFO, "[eds_drop_lb %p] Updating child policy handler %p", this, + child_policy_.get()); + } + child_policy_->UpdateLocked(std::move(update_args)); +} + +// +// EdsDropLb::Helper +// + +RefCountedPtr EdsDropLb::Helper::CreateSubchannel( + ServerAddress address, const grpc_channel_args& args) { + if (eds_drop_policy_->shutting_down_) return nullptr; + return eds_drop_policy_->channel_control_helper()->CreateSubchannel( + std::move(address), args); +} + +void EdsDropLb::Helper::UpdateState(grpc_connectivity_state state, + const absl::Status& status, + std::unique_ptr picker) { + if (eds_drop_policy_->shutting_down_) return; + if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { + gpr_log(GPR_INFO, + "[eds_drop_lb %p] child connectivity state update: state=%s (%s) " + "picker=%p", + eds_drop_policy_.get(), ConnectivityStateName(state), + status.ToString().c_str(), picker.get()); + } + // Save the state and picker. + eds_drop_policy_->state_ = state; + eds_drop_policy_->status_ = status; + eds_drop_policy_->picker_ = + MakeRefCounted(std::move(picker)); + // Wrap the picker and return it to the channel. + eds_drop_policy_->MaybeUpdatePickerLocked(); +} + +void EdsDropLb::Helper::RequestReresolution() { + if (eds_drop_policy_->shutting_down_) return; + eds_drop_policy_->channel_control_helper()->RequestReresolution(); +} + +void EdsDropLb::Helper::AddTraceEvent(TraceSeverity severity, + absl::string_view message) { + if (eds_drop_policy_->shutting_down_) return; + eds_drop_policy_->channel_control_helper()->AddTraceEvent(severity, message); +} + +// +// factory +// + +class EdsDropLbFactory : public LoadBalancingPolicyFactory { + public: + OrphanablePtr CreateLoadBalancingPolicy( + LoadBalancingPolicy::Args args) const override { + grpc_error* error = GRPC_ERROR_NONE; + RefCountedPtr xds_client = XdsClient::GetOrCreate(&error); + if (error != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, + "cannot get XdsClient to instantiate eds_drop LB policy: %s", + grpc_error_string(error)); + GRPC_ERROR_UNREF(error); + return nullptr; + } + return MakeOrphanable(std::move(xds_client), std::move(args)); + } + + const char* name() const override { return kEdsDrop; } + + RefCountedPtr ParseLoadBalancingConfig( + const Json& json, grpc_error** error) const override { + GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); + if (json.type() == Json::Type::JSON_NULL) { + // This policy was configured in the deprecated loadBalancingPolicy + // field or in the client API. + *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:loadBalancingPolicy error:eds_drop policy requires " + "configuration. Please use loadBalancingConfig field of service " + "config instead."); + return nullptr; + } + std::vector error_list; + // Child policy. + RefCountedPtr child_policy; + auto it = json.object_value().find("childPolicy"); + if (it == json.object_value().end()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:childPolicy error:required field missing")); + } else { + grpc_error* parse_error = GRPC_ERROR_NONE; + child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( + it->second, &parse_error); + if (child_policy == nullptr) { + GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); + std::vector child_errors; + child_errors.push_back(parse_error); + error_list.push_back( + GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors)); + } + } + // Cluster name. + std::string cluster_name; + it = json.object_value().find("clusterName"); + if (it == json.object_value().end()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:clusterName error:required field missing")); + } else if (it->second.type() != Json::Type::STRING) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:clusterName error:type should be string")); + } else { + cluster_name = it->second.string_value(); + } + // EDS service name. + std::string eds_service_name; + it = json.object_value().find("edsServiceName"); + if (it != json.object_value().end()) { + if (it->second.type() != Json::Type::STRING) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:edsServiceName error:type should be string")); + } else { + eds_service_name = it->second.string_value(); + } + } + // LRS load reporting server name. + absl::optional lrs_load_reporting_server_name; + it = json.object_value().find("lrsLoadReportingServerName"); + if (it != json.object_value().end()) { + if (it->second.type() != Json::Type::STRING) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:lrsLoadReportingServerName error:type should be string")); + } else { + lrs_load_reporting_server_name = it->second.string_value(); + } + } + // Drop config. + auto drop_config = MakeRefCounted(); + it = json.object_value().find("dropCategories"); + if (it == json.object_value().end()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:dropCategories error:required field missing")); + } else { + std::vector child_errors = + ParseDropCategories(it->second, drop_config.get()); + if (!child_errors.empty()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_VECTOR( + "field:dropCategories", &child_errors)); + } + } + if (!error_list.empty()) { + *error = GRPC_ERROR_CREATE_FROM_VECTOR( + "eds_drop_experimental LB policy config", &error_list); + return nullptr; + } + return MakeRefCounted( + std::move(child_policy), std::move(cluster_name), + std::move(eds_service_name), std::move(lrs_load_reporting_server_name), + std::move(drop_config)); + } + + private: + static std::vector ParseDropCategories( + const Json& json, XdsApi::EdsUpdate::DropConfig* drop_config) { + std::vector error_list; + if (json.type() != Json::Type::ARRAY) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "dropCategories field is not an array")); + return error_list; + } + for (size_t i = 0; i < json.array_value().size(); ++i) { + const Json& entry = json.array_value()[i]; + std::vector child_errors = + ParseDropCategory(entry, drop_config); + if (!child_errors.empty()) { + grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("errors parsing index ", i).c_str()); + for (size_t i = 0; i < child_errors.size(); ++i) { + error = grpc_error_add_child(error, child_errors[i]); + } + error_list.push_back(error); + } + } + return error_list; + } + + static std::vector ParseDropCategory( + const Json& json, XdsApi::EdsUpdate::DropConfig* drop_config) { + std::vector error_list; + if (json.type() != Json::Type::OBJECT) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "dropCategories entry is not an object")); + return error_list; + } + std::string category; + auto it = json.object_value().find("category"); + if (it == json.object_value().end()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "\"category\" field not present")); + } else if (it->second.type() != Json::Type::STRING) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "\"category\" field is not a string")); + } else { + category = it->second.string_value(); + } + uint32_t requests_per_million = 0; + it = json.object_value().find("requests_per_million"); + if (it == json.object_value().end()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "\"requests_per_million\" field is not present")); + } else if (it->second.type() != Json::Type::NUMBER) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "\"requests_per_million\" field is not a number")); + } else { + requests_per_million = + gpr_parse_nonnegative_int(it->second.string_value().c_str()); + } + if (error_list.empty()) { + drop_config->AddCategory(std::move(category), requests_per_million); + } + return error_list; + } +}; + +} // namespace + +} // namespace grpc_core + +// +// Plugin registration +// + +void grpc_lb_policy_eds_drop_init() { + grpc_core::LoadBalancingPolicyRegistry::Builder:: + RegisterLoadBalancingPolicyFactory( + absl::make_unique()); +} + +void grpc_lb_policy_eds_drop_shutdown() {} diff --git a/src/core/ext/filters/client_channel/lb_policy_registry.cc b/src/core/ext/filters/client_channel/lb_policy_registry.cc index d0cfab6dfca..d64a6434494 100644 --- a/src/core/ext/filters/client_channel/lb_policy_registry.cc +++ b/src/core/ext/filters/client_channel/lb_policy_registry.cc @@ -23,7 +23,9 @@ #include #include "absl/container/inlined_vector.h" +#include "absl/strings/str_cat.h" #include "absl/strings/str_format.h" +#include "absl/strings/str_join.h" #include "src/core/lib/gpr/string.h" @@ -126,6 +128,7 @@ grpc_error* ParseLoadBalancingConfigHelper( return GRPC_ERROR_CREATE_FROM_STATIC_STRING("type should be array"); } // Find the first LB policy that this client supports. + std::vector policies_tried; for (const Json& lb_config : lb_config_array.array_value()) { if (lb_config.type() != Json::Type::OBJECT) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( @@ -149,8 +152,12 @@ grpc_error* ParseLoadBalancingConfigHelper( *result = it; return GRPC_ERROR_NONE; } + policies_tried.push_back(it->first); } - return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No known policy"); + return GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("No known policies in list: ", + absl::StrJoin(policies_tried, " ")) + .c_str()); } } // namespace diff --git a/src/core/plugin_registry/grpc_plugin_registry.cc b/src/core/plugin_registry/grpc_plugin_registry.cc index b079267442f..4e7054f73f1 100644 --- a/src/core/plugin_registry/grpc_plugin_registry.cc +++ b/src/core/plugin_registry/grpc_plugin_registry.cc @@ -72,6 +72,8 @@ void grpc_lb_policy_cds_init(void); void grpc_lb_policy_cds_shutdown(void); void grpc_lb_policy_eds_init(void); void grpc_lb_policy_eds_shutdown(void); +void grpc_lb_policy_eds_drop_init(void); +void grpc_lb_policy_eds_drop_shutdown(void); void grpc_lb_policy_lrs_init(void); void grpc_lb_policy_lrs_shutdown(void); void grpc_lb_policy_xds_cluster_manager_init(void); @@ -130,6 +132,8 @@ void grpc_register_built_in_plugins(void) { grpc_lb_policy_cds_shutdown); grpc_register_plugin(grpc_lb_policy_eds_init, grpc_lb_policy_eds_shutdown); + grpc_register_plugin(grpc_lb_policy_eds_drop_init, + grpc_lb_policy_eds_drop_shutdown); grpc_register_plugin(grpc_lb_policy_lrs_init, grpc_lb_policy_lrs_shutdown); grpc_register_plugin(grpc_lb_policy_xds_cluster_manager_init, diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index b68f113c808..5f69672805f 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -43,6 +43,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc', + 'src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc', 'src/core/ext/filters/client_channel/lb_policy_registry.cc', diff --git a/test/core/client_channel/service_config_test.cc b/test/core/client_channel/service_config_test.cc index 25c7b9821f1..2c3ed355bd7 100644 --- a/test/core/client_channel/service_config_test.cc +++ b/test/core/client_channel/service_config_test.cc @@ -532,7 +532,7 @@ TEST_F(ClientChannelParserTest, UnknownLoadBalancingConfig) { "Global Params.*referenced_errors.*" "Client channel global parser.*referenced_errors.*" "field:loadBalancingConfig.*referenced_errors.*" - "No known policy"); + "No known policies in list: unknown"); VerifyRegexMatch(error, regex); } diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 7426fe7f1f9..c3ed61a3081 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1089,6 +1089,7 @@ src/core/ext/filters/client_channel/lb_policy/subchannel_list.h \ src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \ src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \ src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \ +src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc \ src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds.h \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index fcce55fd9c5..1a073c89a09 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -914,6 +914,7 @@ src/core/ext/filters/client_channel/lb_policy/subchannel_list.h \ src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \ src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \ src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \ +src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc \ src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds.h \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \