From 7a146722db8cd12a8a710bd91ee86a5cbdf25ea8 Mon Sep 17 00:00:00 2001 From: Donna Dionne Date: Mon, 9 Mar 2020 16:47:24 -0700 Subject: [PATCH] Harded coded a service config to point to xds_routing_experimental With just 1 action, which has 1 child policy: cds_experimental Basically wrapping the previous cds_experimental config inside the new xds_routing_experimental Tested to make sure all current tests still pass. This is just a skeleton code to allow new parsing code to be added and tested. --- BUILD | 26 + .../client_channel/lb_policy/xds/rds.cc | 456 ++++++++---------- .../filters/client_channel/xds/xds_client.cc | 11 +- .../plugin_registry/grpc_plugin_registry.cc | 4 + .../grpc_unsecure_plugin_registry.cc | 4 + test/cpp/end2end/xds_end2end_test.cc | 6 +- 6 files changed, 246 insertions(+), 261 deletions(-) diff --git a/BUILD b/BUILD index 8fe2d6d2e15..a0c9247e74e 100644 --- a/BUILD +++ b/BUILD @@ -320,6 +320,7 @@ grpc_cc_library( "grpc_common", "grpc_lb_policy_cds", "grpc_lb_policy_grpclb", + "grpc_lb_policy_rds", "grpc_lb_policy_xds", "grpc_resolver_xds", ], @@ -338,6 +339,7 @@ grpc_cc_library( "grpc_common", "grpc_lb_policy_cds_secure", "grpc_lb_policy_grpclb_secure", + "grpc_lb_policy_rds_secure", "grpc_lb_policy_xds_secure", "grpc_resolver_xds_secure", "grpc_secure", @@ -1397,6 +1399,30 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "grpc_lb_policy_rds", + srcs = [ + "src/core/ext/filters/client_channel/lb_policy/xds/rds.cc", + ], + language = "c++", + deps = [ + "grpc_base", + "grpc_client_channel", + ], +) + +grpc_cc_library( + name = "grpc_lb_policy_rds_secure", + srcs = [ + "src/core/ext/filters/client_channel/lb_policy/xds/rds.cc", + ], + language = "c++", + deps = [ + "grpc_base", + "grpc_client_channel", + ], +) + grpc_cc_library( name = "grpc_lb_subchannel_list", hdrs = [ diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/rds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/rds.cc index af091fdfad6..6a730d2a512 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/rds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/rds.cc @@ -25,9 +25,9 @@ #include #include "src/core/ext/filters/client_channel/lb_policy.h" +#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" #include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" -#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/orphanable.h" @@ -39,39 +39,38 @@ namespace grpc_core { -TraceFlag grpc_lb_weighted_target_trace(false, "weighted_target_lb"); +TraceFlag grpc_rds_lb_trace(false, "rds_lb"); namespace { -constexpr char kWeightedTarget[] = "weighted_target_experimental"; +constexpr char kRds[] = "xds_routing_experimental"; -// Config for weighted_target LB policy. -class WeightedTargetLbConfig : public LoadBalancingPolicy::Config { +// Config for rds LB policy. +class RdsLbConfig : public LoadBalancingPolicy::Config { public: struct ChildConfig { - uint32_t weight; RefCountedPtr config; }; - using TargetMap = std::map; + using ActionMap = std::map; - explicit WeightedTargetLbConfig(TargetMap target_map) - : target_map_(std::move(target_map)) {} + explicit RdsLbConfig(ActionMap action_map) + : action_map_(std::move(action_map)) {} - const char* name() const override { return kWeightedTarget; } + const char* name() const override { return kRds; } - const TargetMap& target_map() const { return target_map_; } + const ActionMap& action_map() const { return action_map_; } private: - TargetMap target_map_; + ActionMap action_map_; }; -// weighted_target LB policy. -class WeightedTargetLb : public LoadBalancingPolicy { +// rds LB policy. +class RdsLb : public LoadBalancingPolicy { public: - explicit WeightedTargetLb(Args args); + explicit RdsLb(Args args); - const char* name() const override { return kWeightedTarget; } + const char* name() const override { return kRds; } void UpdateLocked(UpdateArgs args) override; void ExitIdleLocked() override; @@ -83,53 +82,48 @@ class WeightedTargetLb : public LoadBalancingPolicy { public: explicit ChildPickerWrapper(std::unique_ptr picker) : picker_(std::move(picker)) {} - PickResult Pick(PickArgs args) { - return picker_->Pick(std::move(args)); - } + PickResult Pick(PickArgs args) { return picker_->Pick(std::move(args)); } + private: std::unique_ptr picker_; }; // Picks a child using stateless WRR and then delegates to that // child's picker. - class WeightedPicker : public SubchannelPicker { + class RdsPicker : public SubchannelPicker { public: - // Maintains a weighted list of pickers from each child that is in + // Maintains a rds list of pickers from each child that is in // ready state. The first element in the pair represents the end of a // range proportional to the child's weight. The start of the range // is the previous value in the vector and is 0 for the first element. - using PickerList = - InlinedVector>, 1>; + using PickerList = InlinedVector, 1>; - WeightedPicker(RefCountedPtr parent, PickerList pickers) + RdsPicker(RefCountedPtr parent, PickerList pickers) : parent_(std::move(parent)), pickers_(std::move(pickers)) {} - ~WeightedPicker() { parent_.reset(DEBUG_LOCATION, "WeightedPicker"); } + ~RdsPicker() { parent_.reset(DEBUG_LOCATION, "RdsPicker"); } PickResult Pick(PickArgs args) override; private: - RefCountedPtr parent_; + RefCountedPtr parent_; PickerList pickers_; }; - // Each WeightedChild holds a ref to its parent WeightedTargetLb. - class WeightedChild : public InternallyRefCounted { + // Each RdsChild holds a ref to its parent RdsLb. + class RdsChild : public InternallyRefCounted { public: - WeightedChild(RefCountedPtr weighted_target_policy, - const std::string& name); - ~WeightedChild(); + RdsChild(RefCountedPtr rds_policy, const std::string& name); + ~RdsChild(); void Orphan() override; - void UpdateLocked(const WeightedTargetLbConfig::ChildConfig& config, + void UpdateLocked(const RdsLbConfig::ChildConfig& config, const ServerAddressList& addresses, const grpc_channel_args* args); void ExitIdleLocked(); void ResetBackoffLocked(); void DeactivateLocked(); - uint32_t weight() const { return weight_; } grpc_connectivity_state connectivity_state() const { return connectivity_state_; } @@ -140,10 +134,10 @@ class WeightedTargetLb : public LoadBalancingPolicy { private: class Helper : public ChannelControlHelper { public: - explicit Helper(RefCountedPtr weighted_child) - : weighted_child_(std::move(weighted_child)) {} + explicit Helper(RefCountedPtr rds_child) + : rds_child_(std::move(rds_child)) {} - ~Helper() { weighted_child_.reset(DEBUG_LOCATION, "Helper"); } + ~Helper() { rds_child_.reset(DEBUG_LOCATION, "Helper"); } RefCountedPtr CreateSubchannel( const grpc_channel_args& args) override; @@ -153,7 +147,7 @@ class WeightedTargetLb : public LoadBalancingPolicy { void AddTraceEvent(TraceSeverity severity, StringView message) override; private: - RefCountedPtr weighted_child_; + RefCountedPtr rds_child_; }; // Methods for dealing with the child policy. @@ -164,13 +158,11 @@ class WeightedTargetLb : public LoadBalancingPolicy { static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error); // The owning LB policy. - RefCountedPtr weighted_target_policy_; + RefCountedPtr rds_policy_; - // Points to the corresponding key in WeightedTargetLb::targets_. + // Points to the corresponding key in RdsLb::actions_. const std::string& name_; - uint32_t weight_; - OrphanablePtr child_policy_; RefCountedPtr picker_wrapper_; @@ -184,7 +176,7 @@ class WeightedTargetLb : public LoadBalancingPolicy { bool shutdown_ = false; }; - ~WeightedTargetLb(); + ~RdsLb(); void ShutdownLocked() override; @@ -193,138 +185,116 @@ class WeightedTargetLb : public LoadBalancingPolicy { const grpc_millis child_retention_interval_ms_; // Current config from the resolver. - RefCountedPtr config_; + RefCountedPtr config_; // Internal state. bool shutting_down_ = false; // Children. - std::map> targets_; + std::map> actions_; }; // -// WeightedTargetLb::WeightedPicker +// RdsLb::RdsPicker // -WeightedTargetLb::PickResult WeightedTargetLb::WeightedPicker::Pick( - PickArgs args) { - // Generate a random number in [0, total weight). - const uint32_t key = rand() % pickers_[pickers_.size() - 1].first; - // Find the index in pickers_ corresponding to key. - size_t mid = 0; - size_t start_index = 0; - size_t end_index = pickers_.size() - 1; - size_t index = 0; - while (end_index > start_index) { - mid = (start_index + end_index) / 2; - if (pickers_[mid].first > key) { - end_index = mid; - } else if (pickers_[mid].first < key) { - start_index = mid + 1; - } else { - index = mid + 1; - break; - } - } - if (index == 0) index = start_index; - GPR_ASSERT(pickers_[index].first > key); - // Delegate to the child picker. - return pickers_[index].second->Pick(args); +RdsLb::PickResult RdsLb::RdsPicker::Pick(PickArgs args) { + gpr_log( + GPR_INFO, + "donna Picking not implemented yet, just always use the one and only"); + return pickers_[0]->Pick(args); } // -// WeightedTargetLb +// RdsLb // -WeightedTargetLb::WeightedTargetLb(Args args) +RdsLb::RdsLb(Args args) : LoadBalancingPolicy(std::move(args)), -// FIXME: new channel arg + // FIXME: new channel arg child_retention_interval_ms_(grpc_channel_args_find_integer( args.args, GRPC_ARG_LOCALITY_RETENTION_INTERVAL_MS, {GRPC_WEIGHTED_TARGET_CHILD_RETENTION_INTERVAL_MS, 0, INT_MAX})) {} -WeightedTargetLb::~WeightedTargetLb() { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { - gpr_log(GPR_INFO, "[weighted_target_lb %p] destroying weighted_target LB policy", - this); +RdsLb::~RdsLb() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) { + gpr_log(GPR_INFO, "[rds_lb %p] destroying rds LB policy", this); } } -void WeightedTargetLb::ShutdownLocked() { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { - gpr_log(GPR_INFO, "[weighted_target_lb %p] shutting down", this); +void RdsLb::ShutdownLocked() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) { + gpr_log(GPR_INFO, "[rds_lb %p] shutting down", this); } shutting_down_ = true; - targets_.clear(); + actions_.clear(); } -void WeightedTargetLb::ExitIdleLocked() { - for (auto& p : targets_) p.second->ExitIdleLocked(); +void RdsLb::ExitIdleLocked() { + for (auto& p : actions_) p.second->ExitIdleLocked(); } -void WeightedTargetLb::ResetBackoffLocked() { - for (auto& p : targets_) p.second->ResetBackoffLocked(); +void RdsLb::ResetBackoffLocked() { + for (auto& p : actions_) p.second->ResetBackoffLocked(); } -void WeightedTargetLb::UpdateLocked(UpdateArgs args) { +void RdsLb::UpdateLocked(UpdateArgs args) { if (shutting_down_) return; - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { - gpr_log(GPR_INFO, "[weighted_target_lb %p] Received update", this); + if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) { + gpr_log(GPR_INFO, "[rds_lb %p] Received update", this); } // Update config. config_ = std::move(args.config); - // Deactivate the targets not in the new config. - for (auto it = targets_.begin(); it != targets_.end();) { + // Deactivate the actions not in the new config. + for (auto it = actions_.begin(); it != actions_.end();) { const std::string& name = it->first; - WeightedChild* child = it->second.get(); - if (config_->target_map().find(name) != config_->target_map().end()) { + RdsChild* child = it->second.get(); + if (config_->action_map().find(name) != config_->action_map().end()) { ++it; continue; } if (child_retention_interval_ms_ == 0) { - it = targets_.erase(it); + it = actions_.erase(it); } else { child->DeactivateLocked(); ++it; } } - // Add or update the targets in the new config. - for (const auto& p : config_->target_map()) { + // Add or update the actions in the new config. + for (const auto& p : config_->action_map()) { const std::string& name = p.first; - const WeightedTargetLbConfig::ChildConfig& config = p.second; - auto it = targets_.find(name); - if (it == targets_.end()) { - it = targets_.emplace(std::make_pair(name, nullptr)).first; - it->second = MakeOrphanable( - Ref(DEBUG_LOCATION, "WeightedChild"), it->first); + const RdsLbConfig::ChildConfig& config = p.second; + auto it = actions_.find(name); + if (it == actions_.end()) { + it = actions_.emplace(std::make_pair(name, nullptr)).first; + it->second = + MakeOrphanable(Ref(DEBUG_LOCATION, "RdsChild"), it->first); } it->second->UpdateLocked(config, args.addresses, args.args); } } -void WeightedTargetLb::UpdateStateLocked() { +void RdsLb::UpdateStateLocked() { // Construct a new picker which maintains a map of all child pickers // that are ready. Each child is represented by a portion of the range // proportional to its weight, such that the total range is the sum of the // weights of all children. - WeightedPicker::PickerList picker_list; - uint32_t end = 0; + RdsPicker::PickerList picker_list; // Also count the number of children in each state, to determine the // overall state. size_t num_connecting = 0; size_t num_idle = 0; size_t num_transient_failures = 0; - for (const auto& p : targets_) { + for (const auto& p : actions_) { const auto& child_name = p.first; - const WeightedChild* child = p.second.get(); - // Skip the targets that are not in the latest update. - if (config_->target_map().find(child_name) == config_->target_map().end()) { + const RdsChild* child = p.second.get(); + // Skip the actions that are not in the latest update. + if (config_->action_map().find(child_name) == config_->action_map().end()) { continue; } switch (child->connectivity_state()) { case GRPC_CHANNEL_READY: { - end += child->weight(); - picker_list.push_back(std::make_pair(end, child->picker_wrapper())); + picker_list.push_back(child->picker_wrapper()); break; } case GRPC_CHANNEL_CONNECTING: { @@ -354,62 +324,59 @@ void WeightedTargetLb::UpdateStateLocked() { } else { connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE; } - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { - gpr_log(GPR_INFO, "[weighted_target_lb %p] connectivity changed to %s", - this, ConnectivityStateName(connectivity_state)); + if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) { + gpr_log(GPR_INFO, "[rds_lb %p] connectivity changed to %s", this, + ConnectivityStateName(connectivity_state)); } std::unique_ptr picker; switch (connectivity_state) { case GRPC_CHANNEL_READY: - picker = absl::make_unique( - Ref(DEBUG_LOCATION, "WeightedPicker"), std::move(picker_list)); + picker = absl::make_unique(Ref(DEBUG_LOCATION, "RdsPicker"), + std::move(picker_list)); break; case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE: - picker = absl::make_unique( - Ref(DEBUG_LOCATION, "QueuePicker")); + picker = + absl::make_unique(Ref(DEBUG_LOCATION, "QueuePicker")); break; default: picker = absl::make_unique( GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "weighted_target: all children report state TRANSIENT_FAILURE")); + "rds: all children report state TRANSIENT_FAILURE")); } channel_control_helper()->UpdateState(connectivity_state, std::move(picker)); } // -// WeightedTargetLb::WeightedChild +// RdsLb::RdsChild // -WeightedTargetLb::WeightedChild::WeightedChild( - RefCountedPtr weighted_target_policy, - const std::string& name) - : weighted_target_policy_(std::move(weighted_target_policy)), name_(name) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { - gpr_log(GPR_INFO, "[weighted_target_lb %p] created WeightedChild %p for %s", - weighted_target_policy_.get(), this, name_.c_str()); +RdsLb::RdsChild::RdsChild(RefCountedPtr rds_policy, + const std::string& name) + : rds_policy_(std::move(rds_policy)), name_(name) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) { + gpr_log(GPR_INFO, "[rds_lb %p] created RdsChild %p for %s", + rds_policy_.get(), this, name_.c_str()); } } -WeightedTargetLb::WeightedChild::~WeightedChild() { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { - gpr_log(GPR_INFO, - "[weighted_target_lb %p] WeightedChild %p %s: destroying child", - weighted_target_policy_.get(), this, name_.c_str()); +RdsLb::RdsChild::~RdsChild() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) { + gpr_log(GPR_INFO, "[rds_lb %p] RdsChild %p %s: destroying child", + rds_policy_.get(), this, name_.c_str()); } - weighted_target_policy_.reset(DEBUG_LOCATION, "WeightedChild"); + rds_policy_.reset(DEBUG_LOCATION, "RdsChild"); } -void WeightedTargetLb::WeightedChild::Orphan() { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { - gpr_log(GPR_INFO, - "[weighted_target_lb %p] WeightedChild %p %s: shutting down child", - weighted_target_policy_.get(), this, name_.c_str()); +void RdsLb::RdsChild::Orphan() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) { + gpr_log(GPR_INFO, "[rds_lb %p] RdsChild %p %s: shutting down child", + rds_policy_.get(), this, name_.c_str()); } // Remove the child policy's interested_parties pollset_set from the // xDS policy. grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), - weighted_target_policy_->interested_parties()); + rds_policy_->interested_parties()); child_policy_.reset(); // Drop our ref to the child's picker, in case it's holding a ref to // the child. @@ -421,39 +388,35 @@ void WeightedTargetLb::WeightedChild::Orphan() { Unref(); } -OrphanablePtr -WeightedTargetLb::WeightedChild::CreateChildPolicyLocked( +OrphanablePtr RdsLb::RdsChild::CreateChildPolicyLocked( const grpc_channel_args* args) { LoadBalancingPolicy::Args lb_policy_args; - lb_policy_args.combiner = weighted_target_policy_->combiner(); + lb_policy_args.combiner = rds_policy_->combiner(); lb_policy_args.args = args; lb_policy_args.channel_control_helper = absl::make_unique(this->Ref(DEBUG_LOCATION, "Helper")); OrphanablePtr lb_policy = MakeOrphanable(std::move(lb_policy_args), - &grpc_lb_weighted_target_trace); - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { + &grpc_rds_lb_trace); + if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) { gpr_log(GPR_INFO, - "[weighted_target_lb %p] WeightedChild %p %s: Created new child " + "[rds_lb %p] RdsChild %p %s: Created new child " "policy handler %p", - weighted_target_policy_.get(), this, name_.c_str(), - lb_policy.get()); + rds_policy_.get(), this, name_.c_str(), lb_policy.get()); } // Add the xDS's interested_parties pollset_set to that of the newly created // child policy. This will make the child policy progress upon activity on // xDS LB, which in turn is tied to the application's call. - grpc_pollset_set_add_pollset_set( - lb_policy->interested_parties(), - weighted_target_policy_->interested_parties()); + grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), + rds_policy_->interested_parties()); return lb_policy; } -void WeightedTargetLb::WeightedChild::UpdateLocked( - const WeightedTargetLbConfig::ChildConfig& config, - const ServerAddressList& addresses, const grpc_channel_args* args) { - if (weighted_target_policy_->shutting_down_) return; +void RdsLb::RdsChild::UpdateLocked(const RdsLbConfig::ChildConfig& config, + const ServerAddressList& addresses, + const grpc_channel_args* args) { + if (rds_policy_->shutting_down_) return; // Update child weight. - weight_ = config.weight; // Reactivate if needed. if (delayed_removal_timer_callback_pending_) { grpc_timer_cancel(&delayed_removal_timer_); @@ -468,212 +431,193 @@ void WeightedTargetLb::WeightedChild::UpdateLocked( update_args.addresses = addresses; update_args.args = grpc_channel_args_copy(args); // Update the policy. - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) { gpr_log(GPR_INFO, - "[weighted_target_lb %p] WeightedChild %p %s: Updating child " + "[rds_lb %p] RdsChild %p %s: Updating child " "policy handler %p", - weighted_target_policy_.get(), this, name_.c_str(), - child_policy_.get()); + rds_policy_.get(), this, name_.c_str(), child_policy_.get()); } child_policy_->UpdateLocked(std::move(update_args)); } -void WeightedTargetLb::WeightedChild::ExitIdleLocked() { - child_policy_->ExitIdleLocked(); -} +void RdsLb::RdsChild::ExitIdleLocked() { child_policy_->ExitIdleLocked(); } -void WeightedTargetLb::WeightedChild::ResetBackoffLocked() { +void RdsLb::RdsChild::ResetBackoffLocked() { child_policy_->ResetBackoffLocked(); } -void WeightedTargetLb::WeightedChild::DeactivateLocked() { +void RdsLb::RdsChild::DeactivateLocked() { // If already deactivated, don't do that again. - if (weight_ == 0) return; // Set the child weight to 0 so that future picker won't contain this child. - weight_ = 0; // Start a timer to delete the child. - Ref(DEBUG_LOCATION, "WeightedChild+timer").release(); + Ref(DEBUG_LOCATION, "RdsChild+timer").release(); GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this, grpc_schedule_on_exec_ctx); grpc_timer_init( &delayed_removal_timer_, - ExecCtx::Get()->Now() + - weighted_target_policy_->child_retention_interval_ms_, + ExecCtx::Get()->Now() + rds_policy_->child_retention_interval_ms_, &on_delayed_removal_timer_); delayed_removal_timer_callback_pending_ = true; } -void WeightedTargetLb::WeightedChild::OnDelayedRemovalTimer(void* arg, - grpc_error* error) { - WeightedChild* self = static_cast(arg); - self->weighted_target_policy_->combiner()->Run( +void RdsLb::RdsChild::OnDelayedRemovalTimer(void* arg, grpc_error* error) { + RdsChild* self = static_cast(arg); + self->rds_policy_->combiner()->Run( GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_, OnDelayedRemovalTimerLocked, self, nullptr), GRPC_ERROR_REF(error)); } -void WeightedTargetLb::WeightedChild::OnDelayedRemovalTimerLocked( - void* arg, grpc_error* error) { - WeightedChild* self = static_cast(arg); +void RdsLb::RdsChild::OnDelayedRemovalTimerLocked(void* arg, + grpc_error* error) { + RdsChild* self = static_cast(arg); self->delayed_removal_timer_callback_pending_ = false; - if (error == GRPC_ERROR_NONE && !self->shutdown_ && self->weight_ == 0) { - self->weighted_target_policy_->targets_.erase(self->name_); + if (error == GRPC_ERROR_NONE && !self->shutdown_) { + self->rds_policy_->actions_.erase(self->name_); } - self->Unref(DEBUG_LOCATION, "WeightedChild+timer"); + self->Unref(DEBUG_LOCATION, "RdsChild+timer"); } // -// WeightedTargetLb::WeightedChild::Helper +// RdsLb::RdsChild::Helper // -RefCountedPtr -WeightedTargetLb::WeightedChild::Helper::CreateSubchannel( +RefCountedPtr RdsLb::RdsChild::Helper::CreateSubchannel( const grpc_channel_args& args) { - if (weighted_child_->weighted_target_policy_->shutting_down_) return nullptr; - return weighted_child_->weighted_target_policy_->channel_control_helper()->CreateSubchannel( + if (rds_child_->rds_policy_->shutting_down_) return nullptr; + return rds_child_->rds_policy_->channel_control_helper()->CreateSubchannel( args); } -void WeightedTargetLb::WeightedChild::Helper::UpdateState( +void RdsLb::RdsChild::Helper::UpdateState( grpc_connectivity_state state, std::unique_ptr picker) { - if (weighted_child_->weighted_target_policy_->shutting_down_) return; - // Cache the picker in the WeightedChild. - weighted_child_->picker_wrapper_ = + if (rds_child_->rds_policy_->shutting_down_) return; + // Cache the picker in the RdsChild. + rds_child_->picker_wrapper_ = MakeRefCounted(std::move(picker)); // Decide what state to report for aggregation purposes. // If we haven't seen a failure since the last time we were in state // READY, then we report the state change as-is. However, once we do see // a failure, we report TRANSIENT_FAILURE and ignore any subsequent state // changes until we go back into state READY. - if (!weighted_child_->seen_failure_since_ready_) { + if (!rds_child_->seen_failure_since_ready_) { if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - weighted_child_->seen_failure_since_ready_ = true; + rds_child_->seen_failure_since_ready_ = true; } } else { if (state != GRPC_CHANNEL_READY) return; - weighted_child_->seen_failure_since_ready_ = false; + rds_child_->seen_failure_since_ready_ = false; } - weighted_child_->connectivity_state_ = state; + rds_child_->connectivity_state_ = state; // Notify the LB policy. - weighted_child_->weighted_target_policy_->UpdateStateLocked(); + rds_child_->rds_policy_->UpdateStateLocked(); } -void WeightedTargetLb::WeightedChild::Helper::RequestReresolution() { - if (weighted_child_->weighted_target_policy_->shutting_down_) return; - weighted_child_->weighted_target_policy_->channel_control_helper()->RequestReresolution(); +void RdsLb::RdsChild::Helper::RequestReresolution() { + if (rds_child_->rds_policy_->shutting_down_) return; + rds_child_->rds_policy_->channel_control_helper()->RequestReresolution(); } -void WeightedTargetLb::WeightedChild::Helper::AddTraceEvent(TraceSeverity severity, - StringView message) { - if (weighted_child_->weighted_target_policy_->shutting_down_) return; - weighted_child_->weighted_target_policy_->channel_control_helper()->AddTraceEvent( - severity, message); +void RdsLb::RdsChild::Helper::AddTraceEvent(TraceSeverity severity, + StringView message) { + if (rds_child_->rds_policy_->shutting_down_) return; + rds_child_->rds_policy_->channel_control_helper()->AddTraceEvent(severity, + message); } // // factory // -class WeightedTargetLbFactory : public LoadBalancingPolicyFactory { +class RdsLbFactory : public LoadBalancingPolicyFactory { public: OrphanablePtr CreateLoadBalancingPolicy( LoadBalancingPolicy::Args args) const override { - return MakeOrphanable(std::move(args)); + return MakeOrphanable(std::move(args)); } - const char* name() const override { return kWeightedTarget; } + const char* name() const override { return kRds; } RefCountedPtr ParseLoadBalancingConfig( const Json& json, grpc_error** error) const override { GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); if (json.type() == Json::Type::JSON_NULL) { - // weighted_target was mentioned as a policy in the deprecated + // rds was mentioned as a policy in the deprecated // loadBalancingPolicy field or in the client API. *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:loadBalancingPolicy error:weighted_target policy requires " + "field:loadBalancingPolicy error:rds policy requires " "configuration. Please use loadBalancingConfig field of service " "config instead."); return nullptr; } std::vector error_list; // Weight map. - WeightedTargetLbConfig::TargetMap target_map; - auto it = json.object_value().find("targets"); + RdsLbConfig::ActionMap action_map; + auto it = json.object_value().find("actions"); if (it == json.object_value().end()) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:targets error:required field not present")); - } else if (it->second.type() != Json::Type::OBJECT) { + "field:actions error:required field not present")); + } else if (it->second.type() != Json::Type::ARRAY) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:targets error:type should be object")); + "field:actions error:type should be array")); } else { - for (const auto& p : it->second.object_value()) { - WeightedTargetLbConfig::ChildConfig child_config; + for (const auto& p : it->second.array_value()) { + auto it_name = p.object_value().find("name"); + if (it_name == p.object_value().end()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:actions error: each action needs a name")); + } + auto it_child_policy = p.object_value().find("child_policy"); + if (it_child_policy == p.object_value().end()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:actions error: each action needs child policies")); + } + RdsLbConfig::ChildConfig child_config; std::vector child_errors = - ParseChildConfig(p.second, &child_config); + ParseChildConfig(it_child_policy->second, &child_config); if (!child_errors.empty()) { // Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error // string is not static in this case. grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("field:targets key:", p.first).c_str()); + absl::StrCat("field:actions name:", + it_name->second.string_value()) + .c_str()); for (grpc_error* child_error : child_errors) { error = grpc_error_add_child(error, child_error); } error_list.push_back(error); } else { - target_map[p.first] = std::move(child_config); + action_map[it_name->second.string_value()] = std::move(child_config); } } } if (!error_list.empty()) { *error = GRPC_ERROR_CREATE_FROM_VECTOR( - "weighted_target_experimental LB policy config", &error_list); + "rds_experimental LB policy config", &error_list); return nullptr; } - return MakeRefCounted(std::move(target_map)); + return MakeRefCounted(std::move(action_map)); } private: static std::vector ParseChildConfig( - const Json& json, WeightedTargetLbConfig::ChildConfig* child_config) { + const Json& json, RdsLbConfig::ChildConfig* child_config) { std::vector error_list; - if (json.type() != Json::Type::OBJECT) { + if (json.type() != Json::Type::ARRAY) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "value should be of type object")); + "value should be of type array")); return error_list; } - // Weight. - auto it = json.object_value().find("weight"); - if (it == json.object_value().end()) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "required field \"weight\" not specified")); - } else if (it->second.type() != Json::Type::NUMBER) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:weight error:must be of type number")); - } else { - child_config->weight = - gpr_parse_nonnegative_int(it->second.string_value().c_str()); - if (child_config->weight == -1) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:weight error:unparseable value")); - } else if (child_config->weight == 0) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:weight error:value must be greater than zero")); - } - } - // Child policy. - it = json.object_value().find("childPolicy"); - if (it != json.object_value().end()) { - grpc_error* parse_error = GRPC_ERROR_NONE; - child_config->config = - LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(it->second, - &parse_error); - if (child_config->config == nullptr) { - GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); - std::vector child_errors; - child_errors.push_back(parse_error); - error_list.push_back( - GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors)); - } + grpc_error* parse_error = GRPC_ERROR_NONE; + child_config->config = + LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( + json.array_value(), &parse_error); + if (child_config->config == nullptr) { + GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); + std::vector child_errors; + child_errors.push_back(parse_error); + error_list.push_back( + GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors)); } return error_list; } @@ -687,10 +631,10 @@ class WeightedTargetLbFactory : public LoadBalancingPolicyFactory { // Plugin registration // -void grpc_lb_policy_weighted_target_init() { +void grpc_lb_policy_rds_init() { grpc_core::LoadBalancingPolicyRegistry::Builder:: RegisterLoadBalancingPolicyFactory( - absl::make_unique()); + absl::make_unique()); } -void grpc_lb_policy_weighted_target_shutdown() {} +void grpc_lb_policy_rds_shutdown() {} diff --git a/src/core/ext/filters/client_channel/xds/xds_client.cc b/src/core/ext/filters/client_channel/xds/xds_client.cc index 42708fca556..a9725a392a9 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.cc +++ b/src/core/ext/filters/client_channel/xds/xds_client.cc @@ -1946,8 +1946,15 @@ grpc_error* XdsClient::CreateServiceConfig( gpr_asprintf(&json, "{\n" " \"loadBalancingConfig\":[\n" - " { \"cds_experimental\":{\n" - " \"cluster\": \"%s\"\n" + " { \"xds_routing_experimental\":{\n" + " \"actions\":[\n" + " { \"name\": \"default\",\n" + " \"child_policy\":[\n" + " { \"cds_experimental\":{\n" + " \"cluster\": \"%s\"\n" + " } }\n" + " ]\n" + " } ]\n" " } }\n" " ]\n" "}", diff --git a/src/core/plugin_registry/grpc_plugin_registry.cc b/src/core/plugin_registry/grpc_plugin_registry.cc index 20ad526d837..4af79901595 100644 --- a/src/core/plugin_registry/grpc_plugin_registry.cc +++ b/src/core/plugin_registry/grpc_plugin_registry.cc @@ -36,6 +36,8 @@ void grpc_lb_policy_grpclb_init(void); void grpc_lb_policy_grpclb_shutdown(void); void grpc_lb_policy_cds_init(void); void grpc_lb_policy_cds_shutdown(void); +void grpc_lb_policy_rds_init(void); +void grpc_lb_policy_rds_shutdown(void); void grpc_lb_policy_xds_init(void); void grpc_lb_policy_xds_shutdown(void); void grpc_lb_policy_pick_first_init(void); @@ -78,6 +80,8 @@ void grpc_register_built_in_plugins(void) { grpc_lb_policy_grpclb_shutdown); grpc_register_plugin(grpc_lb_policy_cds_init, grpc_lb_policy_cds_shutdown); + grpc_register_plugin(grpc_lb_policy_rds_init, + grpc_lb_policy_rds_shutdown); grpc_register_plugin(grpc_lb_policy_xds_init, grpc_lb_policy_xds_shutdown); grpc_register_plugin(grpc_lb_policy_pick_first_init, diff --git a/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc b/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc index bfed2e22ddd..c150d64bb62 100644 --- a/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc +++ b/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc @@ -44,6 +44,8 @@ void grpc_lb_policy_grpclb_init(void); void grpc_lb_policy_grpclb_shutdown(void); void grpc_lb_policy_cds_init(void); void grpc_lb_policy_cds_shutdown(void); +void grpc_lb_policy_rds_init(void); +void grpc_lb_policy_rds_shutdown(void); void grpc_lb_policy_xds_init(void); void grpc_lb_policy_xds_shutdown(void); void grpc_lb_policy_pick_first_init(void); @@ -86,6 +88,8 @@ void grpc_register_built_in_plugins(void) { grpc_lb_policy_grpclb_shutdown); grpc_register_plugin(grpc_lb_policy_cds_init, grpc_lb_policy_cds_shutdown); + grpc_register_plugin(grpc_lb_policy_rds_init, + grpc_lb_policy_rds_shutdown); grpc_register_plugin(grpc_lb_policy_xds_init, grpc_lb_policy_xds_shutdown); grpc_register_plugin(grpc_lb_policy_pick_first_init, diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 1fbdbad953e..522ed07d202 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -1546,9 +1546,9 @@ TEST_P(BasicTest, Vanilla) { backends_[i]->backend_service()->request_count()); } // Check LB policy name for the channel. - EXPECT_EQ( - (GetParam().use_xds_resolver() ? "cds_experimental" : "xds_experimental"), - channel_->GetLoadBalancingPolicyName()); + EXPECT_EQ((GetParam().use_xds_resolver() ? "xds_routing_experimental" + : "xds_experimental"), + channel_->GetLoadBalancingPolicyName()); } TEST_P(BasicTest, IgnoresUnhealthyEndpoints) {