Harded coded a service config to point to xds_routing_experimental

With just 1 action, which has 1 child policy: cds_experimental

Basically wrapping the previous cds_experimental config inside the new
xds_routing_experimental

Tested to make sure all current tests still pass.

This is just a skeleton code to allow new parsing code to be added and
tested.
reviewable/pr22280/r1
Donna Dionne 5 years ago
parent 0fa6782cb8
commit 7a146722db
  1. 26
      BUILD
  2. 456
      src/core/ext/filters/client_channel/lb_policy/xds/rds.cc
  3. 11
      src/core/ext/filters/client_channel/xds/xds_client.cc
  4. 4
      src/core/plugin_registry/grpc_plugin_registry.cc
  5. 4
      src/core/plugin_registry/grpc_unsecure_plugin_registry.cc
  6. 6
      test/cpp/end2end/xds_end2end_test.cc

26
BUILD

@ -320,6 +320,7 @@ grpc_cc_library(
"grpc_common", "grpc_common",
"grpc_lb_policy_cds", "grpc_lb_policy_cds",
"grpc_lb_policy_grpclb", "grpc_lb_policy_grpclb",
"grpc_lb_policy_rds",
"grpc_lb_policy_xds", "grpc_lb_policy_xds",
"grpc_resolver_xds", "grpc_resolver_xds",
], ],
@ -338,6 +339,7 @@ grpc_cc_library(
"grpc_common", "grpc_common",
"grpc_lb_policy_cds_secure", "grpc_lb_policy_cds_secure",
"grpc_lb_policy_grpclb_secure", "grpc_lb_policy_grpclb_secure",
"grpc_lb_policy_rds_secure",
"grpc_lb_policy_xds_secure", "grpc_lb_policy_xds_secure",
"grpc_resolver_xds_secure", "grpc_resolver_xds_secure",
"grpc_secure", "grpc_secure",
@ -1397,6 +1399,30 @@ grpc_cc_library(
], ],
) )
grpc_cc_library(
name = "grpc_lb_policy_rds",
srcs = [
"src/core/ext/filters/client_channel/lb_policy/xds/rds.cc",
],
language = "c++",
deps = [
"grpc_base",
"grpc_client_channel",
],
)
grpc_cc_library(
name = "grpc_lb_policy_rds_secure",
srcs = [
"src/core/ext/filters/client_channel/lb_policy/xds/rds.cc",
],
language = "c++",
deps = [
"grpc_base",
"grpc_client_channel",
],
)
grpc_cc_library( grpc_cc_library(
name = "grpc_lb_subchannel_list", name = "grpc_lb_subchannel_list",
hdrs = [ hdrs = [

@ -25,9 +25,9 @@
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include "src/core/ext/filters/client_channel/lb_policy.h" #include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/orphanable.h"
@ -39,39 +39,38 @@
namespace grpc_core { namespace grpc_core {
TraceFlag grpc_lb_weighted_target_trace(false, "weighted_target_lb"); TraceFlag grpc_rds_lb_trace(false, "rds_lb");
namespace { namespace {
constexpr char kWeightedTarget[] = "weighted_target_experimental"; constexpr char kRds[] = "xds_routing_experimental";
// Config for weighted_target LB policy. // Config for rds LB policy.
class WeightedTargetLbConfig : public LoadBalancingPolicy::Config { class RdsLbConfig : public LoadBalancingPolicy::Config {
public: public:
struct ChildConfig { struct ChildConfig {
uint32_t weight;
RefCountedPtr<LoadBalancingPolicy::Config> config; RefCountedPtr<LoadBalancingPolicy::Config> config;
}; };
using TargetMap = std::map<std::string, ChildConfig>; using ActionMap = std::map<std::string, ChildConfig>;
explicit WeightedTargetLbConfig(TargetMap target_map) explicit RdsLbConfig(ActionMap action_map)
: target_map_(std::move(target_map)) {} : action_map_(std::move(action_map)) {}
const char* name() const override { return kWeightedTarget; } const char* name() const override { return kRds; }
const TargetMap& target_map() const { return target_map_; } const ActionMap& action_map() const { return action_map_; }
private: private:
TargetMap target_map_; ActionMap action_map_;
}; };
// weighted_target LB policy. // rds LB policy.
class WeightedTargetLb : public LoadBalancingPolicy { class RdsLb : public LoadBalancingPolicy {
public: public:
explicit WeightedTargetLb(Args args); explicit RdsLb(Args args);
const char* name() const override { return kWeightedTarget; } const char* name() const override { return kRds; }
void UpdateLocked(UpdateArgs args) override; void UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override; void ExitIdleLocked() override;
@ -83,53 +82,48 @@ class WeightedTargetLb : public LoadBalancingPolicy {
public: public:
explicit ChildPickerWrapper(std::unique_ptr<SubchannelPicker> picker) explicit ChildPickerWrapper(std::unique_ptr<SubchannelPicker> picker)
: picker_(std::move(picker)) {} : picker_(std::move(picker)) {}
PickResult Pick(PickArgs args) { PickResult Pick(PickArgs args) { return picker_->Pick(std::move(args)); }
return picker_->Pick(std::move(args));
}
private: private:
std::unique_ptr<SubchannelPicker> picker_; std::unique_ptr<SubchannelPicker> picker_;
}; };
// Picks a child using stateless WRR and then delegates to that // Picks a child using stateless WRR and then delegates to that
// child's picker. // child's picker.
class WeightedPicker : public SubchannelPicker { class RdsPicker : public SubchannelPicker {
public: public:
// Maintains a weighted list of pickers from each child that is in // Maintains a rds list of pickers from each child that is in
// ready state. The first element in the pair represents the end of a // ready state. The first element in the pair represents the end of a
// range proportional to the child's weight. The start of the range // range proportional to the child's weight. The start of the range
// is the previous value in the vector and is 0 for the first element. // is the previous value in the vector and is 0 for the first element.
using PickerList = using PickerList = InlinedVector<RefCountedPtr<ChildPickerWrapper>, 1>;
InlinedVector<std::pair<uint32_t,
RefCountedPtr<ChildPickerWrapper>>, 1>;
WeightedPicker(RefCountedPtr<WeightedTargetLb> parent, PickerList pickers) RdsPicker(RefCountedPtr<RdsLb> parent, PickerList pickers)
: parent_(std::move(parent)), pickers_(std::move(pickers)) {} : parent_(std::move(parent)), pickers_(std::move(pickers)) {}
~WeightedPicker() { parent_.reset(DEBUG_LOCATION, "WeightedPicker"); } ~RdsPicker() { parent_.reset(DEBUG_LOCATION, "RdsPicker"); }
PickResult Pick(PickArgs args) override; PickResult Pick(PickArgs args) override;
private: private:
RefCountedPtr<WeightedTargetLb> parent_; RefCountedPtr<RdsLb> parent_;
PickerList pickers_; PickerList pickers_;
}; };
// Each WeightedChild holds a ref to its parent WeightedTargetLb. // Each RdsChild holds a ref to its parent RdsLb.
class WeightedChild : public InternallyRefCounted<WeightedChild> { class RdsChild : public InternallyRefCounted<RdsChild> {
public: public:
WeightedChild(RefCountedPtr<WeightedTargetLb> weighted_target_policy, RdsChild(RefCountedPtr<RdsLb> rds_policy, const std::string& name);
const std::string& name); ~RdsChild();
~WeightedChild();
void Orphan() override; void Orphan() override;
void UpdateLocked(const WeightedTargetLbConfig::ChildConfig& config, void UpdateLocked(const RdsLbConfig::ChildConfig& config,
const ServerAddressList& addresses, const ServerAddressList& addresses,
const grpc_channel_args* args); const grpc_channel_args* args);
void ExitIdleLocked(); void ExitIdleLocked();
void ResetBackoffLocked(); void ResetBackoffLocked();
void DeactivateLocked(); void DeactivateLocked();
uint32_t weight() const { return weight_; }
grpc_connectivity_state connectivity_state() const { grpc_connectivity_state connectivity_state() const {
return connectivity_state_; return connectivity_state_;
} }
@ -140,10 +134,10 @@ class WeightedTargetLb : public LoadBalancingPolicy {
private: private:
class Helper : public ChannelControlHelper { class Helper : public ChannelControlHelper {
public: public:
explicit Helper(RefCountedPtr<WeightedChild> weighted_child) explicit Helper(RefCountedPtr<RdsChild> rds_child)
: weighted_child_(std::move(weighted_child)) {} : rds_child_(std::move(rds_child)) {}
~Helper() { weighted_child_.reset(DEBUG_LOCATION, "Helper"); } ~Helper() { rds_child_.reset(DEBUG_LOCATION, "Helper"); }
RefCountedPtr<SubchannelInterface> CreateSubchannel( RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override; const grpc_channel_args& args) override;
@ -153,7 +147,7 @@ class WeightedTargetLb : public LoadBalancingPolicy {
void AddTraceEvent(TraceSeverity severity, StringView message) override; void AddTraceEvent(TraceSeverity severity, StringView message) override;
private: private:
RefCountedPtr<WeightedChild> weighted_child_; RefCountedPtr<RdsChild> rds_child_;
}; };
// Methods for dealing with the child policy. // Methods for dealing with the child policy.
@ -164,13 +158,11 @@ class WeightedTargetLb : public LoadBalancingPolicy {
static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error); static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error);
// The owning LB policy. // The owning LB policy.
RefCountedPtr<WeightedTargetLb> weighted_target_policy_; RefCountedPtr<RdsLb> rds_policy_;
// Points to the corresponding key in WeightedTargetLb::targets_. // Points to the corresponding key in RdsLb::actions_.
const std::string& name_; const std::string& name_;
uint32_t weight_;
OrphanablePtr<LoadBalancingPolicy> child_policy_; OrphanablePtr<LoadBalancingPolicy> child_policy_;
RefCountedPtr<ChildPickerWrapper> picker_wrapper_; RefCountedPtr<ChildPickerWrapper> picker_wrapper_;
@ -184,7 +176,7 @@ class WeightedTargetLb : public LoadBalancingPolicy {
bool shutdown_ = false; bool shutdown_ = false;
}; };
~WeightedTargetLb(); ~RdsLb();
void ShutdownLocked() override; void ShutdownLocked() override;
@ -193,138 +185,116 @@ class WeightedTargetLb : public LoadBalancingPolicy {
const grpc_millis child_retention_interval_ms_; const grpc_millis child_retention_interval_ms_;
// Current config from the resolver. // Current config from the resolver.
RefCountedPtr<WeightedTargetLbConfig> config_; RefCountedPtr<RdsLbConfig> config_;
// Internal state. // Internal state.
bool shutting_down_ = false; bool shutting_down_ = false;
// Children. // Children.
std::map<std::string, OrphanablePtr<WeightedChild>> targets_; std::map<std::string, OrphanablePtr<RdsChild>> actions_;
}; };
// //
// WeightedTargetLb::WeightedPicker // RdsLb::RdsPicker
// //
WeightedTargetLb::PickResult WeightedTargetLb::WeightedPicker::Pick( RdsLb::PickResult RdsLb::RdsPicker::Pick(PickArgs args) {
PickArgs args) { gpr_log(
// Generate a random number in [0, total weight). GPR_INFO,
const uint32_t key = rand() % pickers_[pickers_.size() - 1].first; "donna Picking not implemented yet, just always use the one and only");
// Find the index in pickers_ corresponding to key. return pickers_[0]->Pick(args);
size_t mid = 0;
size_t start_index = 0;
size_t end_index = pickers_.size() - 1;
size_t index = 0;
while (end_index > start_index) {
mid = (start_index + end_index) / 2;
if (pickers_[mid].first > key) {
end_index = mid;
} else if (pickers_[mid].first < key) {
start_index = mid + 1;
} else {
index = mid + 1;
break;
}
}
if (index == 0) index = start_index;
GPR_ASSERT(pickers_[index].first > key);
// Delegate to the child picker.
return pickers_[index].second->Pick(args);
} }
// //
// WeightedTargetLb // RdsLb
// //
WeightedTargetLb::WeightedTargetLb(Args args) RdsLb::RdsLb(Args args)
: LoadBalancingPolicy(std::move(args)), : LoadBalancingPolicy(std::move(args)),
// FIXME: new channel arg // FIXME: new channel arg
child_retention_interval_ms_(grpc_channel_args_find_integer( child_retention_interval_ms_(grpc_channel_args_find_integer(
args.args, GRPC_ARG_LOCALITY_RETENTION_INTERVAL_MS, args.args, GRPC_ARG_LOCALITY_RETENTION_INTERVAL_MS,
{GRPC_WEIGHTED_TARGET_CHILD_RETENTION_INTERVAL_MS, 0, INT_MAX})) {} {GRPC_WEIGHTED_TARGET_CHILD_RETENTION_INTERVAL_MS, 0, INT_MAX})) {}
WeightedTargetLb::~WeightedTargetLb() { RdsLb::~RdsLb() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) {
gpr_log(GPR_INFO, "[weighted_target_lb %p] destroying weighted_target LB policy", gpr_log(GPR_INFO, "[rds_lb %p] destroying rds LB policy", this);
this);
} }
} }
void WeightedTargetLb::ShutdownLocked() { void RdsLb::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) {
gpr_log(GPR_INFO, "[weighted_target_lb %p] shutting down", this); gpr_log(GPR_INFO, "[rds_lb %p] shutting down", this);
} }
shutting_down_ = true; shutting_down_ = true;
targets_.clear(); actions_.clear();
} }
void WeightedTargetLb::ExitIdleLocked() { void RdsLb::ExitIdleLocked() {
for (auto& p : targets_) p.second->ExitIdleLocked(); for (auto& p : actions_) p.second->ExitIdleLocked();
} }
void WeightedTargetLb::ResetBackoffLocked() { void RdsLb::ResetBackoffLocked() {
for (auto& p : targets_) p.second->ResetBackoffLocked(); for (auto& p : actions_) p.second->ResetBackoffLocked();
} }
void WeightedTargetLb::UpdateLocked(UpdateArgs args) { void RdsLb::UpdateLocked(UpdateArgs args) {
if (shutting_down_) return; if (shutting_down_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) {
gpr_log(GPR_INFO, "[weighted_target_lb %p] Received update", this); gpr_log(GPR_INFO, "[rds_lb %p] Received update", this);
} }
// Update config. // Update config.
config_ = std::move(args.config); config_ = std::move(args.config);
// Deactivate the targets not in the new config. // Deactivate the actions not in the new config.
for (auto it = targets_.begin(); it != targets_.end();) { for (auto it = actions_.begin(); it != actions_.end();) {
const std::string& name = it->first; const std::string& name = it->first;
WeightedChild* child = it->second.get(); RdsChild* child = it->second.get();
if (config_->target_map().find(name) != config_->target_map().end()) { if (config_->action_map().find(name) != config_->action_map().end()) {
++it; ++it;
continue; continue;
} }
if (child_retention_interval_ms_ == 0) { if (child_retention_interval_ms_ == 0) {
it = targets_.erase(it); it = actions_.erase(it);
} else { } else {
child->DeactivateLocked(); child->DeactivateLocked();
++it; ++it;
} }
} }
// Add or update the targets in the new config. // Add or update the actions in the new config.
for (const auto& p : config_->target_map()) { for (const auto& p : config_->action_map()) {
const std::string& name = p.first; const std::string& name = p.first;
const WeightedTargetLbConfig::ChildConfig& config = p.second; const RdsLbConfig::ChildConfig& config = p.second;
auto it = targets_.find(name); auto it = actions_.find(name);
if (it == targets_.end()) { if (it == actions_.end()) {
it = targets_.emplace(std::make_pair(name, nullptr)).first; it = actions_.emplace(std::make_pair(name, nullptr)).first;
it->second = MakeOrphanable<WeightedChild>( it->second =
Ref(DEBUG_LOCATION, "WeightedChild"), it->first); MakeOrphanable<RdsChild>(Ref(DEBUG_LOCATION, "RdsChild"), it->first);
} }
it->second->UpdateLocked(config, args.addresses, args.args); it->second->UpdateLocked(config, args.addresses, args.args);
} }
} }
void WeightedTargetLb::UpdateStateLocked() { void RdsLb::UpdateStateLocked() {
// Construct a new picker which maintains a map of all child pickers // Construct a new picker which maintains a map of all child pickers
// that are ready. Each child is represented by a portion of the range // that are ready. Each child is represented by a portion of the range
// proportional to its weight, such that the total range is the sum of the // proportional to its weight, such that the total range is the sum of the
// weights of all children. // weights of all children.
WeightedPicker::PickerList picker_list; RdsPicker::PickerList picker_list;
uint32_t end = 0;
// Also count the number of children in each state, to determine the // Also count the number of children in each state, to determine the
// overall state. // overall state.
size_t num_connecting = 0; size_t num_connecting = 0;
size_t num_idle = 0; size_t num_idle = 0;
size_t num_transient_failures = 0; size_t num_transient_failures = 0;
for (const auto& p : targets_) { for (const auto& p : actions_) {
const auto& child_name = p.first; const auto& child_name = p.first;
const WeightedChild* child = p.second.get(); const RdsChild* child = p.second.get();
// Skip the targets that are not in the latest update. // Skip the actions that are not in the latest update.
if (config_->target_map().find(child_name) == config_->target_map().end()) { if (config_->action_map().find(child_name) == config_->action_map().end()) {
continue; continue;
} }
switch (child->connectivity_state()) { switch (child->connectivity_state()) {
case GRPC_CHANNEL_READY: { case GRPC_CHANNEL_READY: {
end += child->weight(); picker_list.push_back(child->picker_wrapper());
picker_list.push_back(std::make_pair(end, child->picker_wrapper()));
break; break;
} }
case GRPC_CHANNEL_CONNECTING: { case GRPC_CHANNEL_CONNECTING: {
@ -354,62 +324,59 @@ void WeightedTargetLb::UpdateStateLocked() {
} else { } else {
connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE; connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
} }
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) {
gpr_log(GPR_INFO, "[weighted_target_lb %p] connectivity changed to %s", gpr_log(GPR_INFO, "[rds_lb %p] connectivity changed to %s", this,
this, ConnectivityStateName(connectivity_state)); ConnectivityStateName(connectivity_state));
} }
std::unique_ptr<SubchannelPicker> picker; std::unique_ptr<SubchannelPicker> picker;
switch (connectivity_state) { switch (connectivity_state) {
case GRPC_CHANNEL_READY: case GRPC_CHANNEL_READY:
picker = absl::make_unique<WeightedPicker>( picker = absl::make_unique<RdsPicker>(Ref(DEBUG_LOCATION, "RdsPicker"),
Ref(DEBUG_LOCATION, "WeightedPicker"), std::move(picker_list)); std::move(picker_list));
break; break;
case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE: case GRPC_CHANNEL_IDLE:
picker = absl::make_unique<QueuePicker>( picker =
Ref(DEBUG_LOCATION, "QueuePicker")); absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"));
break; break;
default: default:
picker = absl::make_unique<TransientFailurePicker>( picker = absl::make_unique<TransientFailurePicker>(
GRPC_ERROR_CREATE_FROM_STATIC_STRING( GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"weighted_target: all children report state TRANSIENT_FAILURE")); "rds: all children report state TRANSIENT_FAILURE"));
} }
channel_control_helper()->UpdateState(connectivity_state, std::move(picker)); channel_control_helper()->UpdateState(connectivity_state, std::move(picker));
} }
// //
// WeightedTargetLb::WeightedChild // RdsLb::RdsChild
// //
WeightedTargetLb::WeightedChild::WeightedChild( RdsLb::RdsChild::RdsChild(RefCountedPtr<RdsLb> rds_policy,
RefCountedPtr<WeightedTargetLb> weighted_target_policy, const std::string& name)
const std::string& name) : rds_policy_(std::move(rds_policy)), name_(name) {
: weighted_target_policy_(std::move(weighted_target_policy)), name_(name) { if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { gpr_log(GPR_INFO, "[rds_lb %p] created RdsChild %p for %s",
gpr_log(GPR_INFO, "[weighted_target_lb %p] created WeightedChild %p for %s", rds_policy_.get(), this, name_.c_str());
weighted_target_policy_.get(), this, name_.c_str());
} }
} }
WeightedTargetLb::WeightedChild::~WeightedChild() { RdsLb::RdsChild::~RdsChild() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO, "[rds_lb %p] RdsChild %p %s: destroying child",
"[weighted_target_lb %p] WeightedChild %p %s: destroying child", rds_policy_.get(), this, name_.c_str());
weighted_target_policy_.get(), this, name_.c_str());
} }
weighted_target_policy_.reset(DEBUG_LOCATION, "WeightedChild"); rds_policy_.reset(DEBUG_LOCATION, "RdsChild");
} }
void WeightedTargetLb::WeightedChild::Orphan() { void RdsLb::RdsChild::Orphan() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO, "[rds_lb %p] RdsChild %p %s: shutting down child",
"[weighted_target_lb %p] WeightedChild %p %s: shutting down child", rds_policy_.get(), this, name_.c_str());
weighted_target_policy_.get(), this, name_.c_str());
} }
// Remove the child policy's interested_parties pollset_set from the // Remove the child policy's interested_parties pollset_set from the
// xDS policy. // xDS policy.
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
weighted_target_policy_->interested_parties()); rds_policy_->interested_parties());
child_policy_.reset(); child_policy_.reset();
// Drop our ref to the child's picker, in case it's holding a ref to // Drop our ref to the child's picker, in case it's holding a ref to
// the child. // the child.
@ -421,39 +388,35 @@ void WeightedTargetLb::WeightedChild::Orphan() {
Unref(); Unref();
} }
OrphanablePtr<LoadBalancingPolicy> OrphanablePtr<LoadBalancingPolicy> RdsLb::RdsChild::CreateChildPolicyLocked(
WeightedTargetLb::WeightedChild::CreateChildPolicyLocked(
const grpc_channel_args* args) { const grpc_channel_args* args) {
LoadBalancingPolicy::Args lb_policy_args; LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = weighted_target_policy_->combiner(); lb_policy_args.combiner = rds_policy_->combiner();
lb_policy_args.args = args; lb_policy_args.args = args;
lb_policy_args.channel_control_helper = lb_policy_args.channel_control_helper =
absl::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper")); absl::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
OrphanablePtr<LoadBalancingPolicy> lb_policy = OrphanablePtr<LoadBalancingPolicy> lb_policy =
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args), MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
&grpc_lb_weighted_target_trace); &grpc_rds_lb_trace);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[weighted_target_lb %p] WeightedChild %p %s: Created new child " "[rds_lb %p] RdsChild %p %s: Created new child "
"policy handler %p", "policy handler %p",
weighted_target_policy_.get(), this, name_.c_str(), rds_policy_.get(), this, name_.c_str(), lb_policy.get());
lb_policy.get());
} }
// Add the xDS's interested_parties pollset_set to that of the newly created // Add the xDS's interested_parties pollset_set to that of the newly created
// child policy. This will make the child policy progress upon activity on // child policy. This will make the child policy progress upon activity on
// xDS LB, which in turn is tied to the application's call. // xDS LB, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set( grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
lb_policy->interested_parties(), rds_policy_->interested_parties());
weighted_target_policy_->interested_parties());
return lb_policy; return lb_policy;
} }
void WeightedTargetLb::WeightedChild::UpdateLocked( void RdsLb::RdsChild::UpdateLocked(const RdsLbConfig::ChildConfig& config,
const WeightedTargetLbConfig::ChildConfig& config, const ServerAddressList& addresses,
const ServerAddressList& addresses, const grpc_channel_args* args) { const grpc_channel_args* args) {
if (weighted_target_policy_->shutting_down_) return; if (rds_policy_->shutting_down_) return;
// Update child weight. // Update child weight.
weight_ = config.weight;
// Reactivate if needed. // Reactivate if needed.
if (delayed_removal_timer_callback_pending_) { if (delayed_removal_timer_callback_pending_) {
grpc_timer_cancel(&delayed_removal_timer_); grpc_timer_cancel(&delayed_removal_timer_);
@ -468,212 +431,193 @@ void WeightedTargetLb::WeightedChild::UpdateLocked(
update_args.addresses = addresses; update_args.addresses = addresses;
update_args.args = grpc_channel_args_copy(args); update_args.args = grpc_channel_args_copy(args);
// Update the policy. // Update the policy.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[weighted_target_lb %p] WeightedChild %p %s: Updating child " "[rds_lb %p] RdsChild %p %s: Updating child "
"policy handler %p", "policy handler %p",
weighted_target_policy_.get(), this, name_.c_str(), rds_policy_.get(), this, name_.c_str(), child_policy_.get());
child_policy_.get());
} }
child_policy_->UpdateLocked(std::move(update_args)); child_policy_->UpdateLocked(std::move(update_args));
} }
void WeightedTargetLb::WeightedChild::ExitIdleLocked() { void RdsLb::RdsChild::ExitIdleLocked() { child_policy_->ExitIdleLocked(); }
child_policy_->ExitIdleLocked();
}
void WeightedTargetLb::WeightedChild::ResetBackoffLocked() { void RdsLb::RdsChild::ResetBackoffLocked() {
child_policy_->ResetBackoffLocked(); child_policy_->ResetBackoffLocked();
} }
void WeightedTargetLb::WeightedChild::DeactivateLocked() { void RdsLb::RdsChild::DeactivateLocked() {
// If already deactivated, don't do that again. // If already deactivated, don't do that again.
if (weight_ == 0) return;
// Set the child weight to 0 so that future picker won't contain this child. // Set the child weight to 0 so that future picker won't contain this child.
weight_ = 0;
// Start a timer to delete the child. // Start a timer to delete the child.
Ref(DEBUG_LOCATION, "WeightedChild+timer").release(); Ref(DEBUG_LOCATION, "RdsChild+timer").release();
GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this, GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
grpc_timer_init( grpc_timer_init(
&delayed_removal_timer_, &delayed_removal_timer_,
ExecCtx::Get()->Now() + ExecCtx::Get()->Now() + rds_policy_->child_retention_interval_ms_,
weighted_target_policy_->child_retention_interval_ms_,
&on_delayed_removal_timer_); &on_delayed_removal_timer_);
delayed_removal_timer_callback_pending_ = true; delayed_removal_timer_callback_pending_ = true;
} }
void WeightedTargetLb::WeightedChild::OnDelayedRemovalTimer(void* arg, void RdsLb::RdsChild::OnDelayedRemovalTimer(void* arg, grpc_error* error) {
grpc_error* error) { RdsChild* self = static_cast<RdsChild*>(arg);
WeightedChild* self = static_cast<WeightedChild*>(arg); self->rds_policy_->combiner()->Run(
self->weighted_target_policy_->combiner()->Run(
GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_, GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_,
OnDelayedRemovalTimerLocked, self, nullptr), OnDelayedRemovalTimerLocked, self, nullptr),
GRPC_ERROR_REF(error)); GRPC_ERROR_REF(error));
} }
void WeightedTargetLb::WeightedChild::OnDelayedRemovalTimerLocked( void RdsLb::RdsChild::OnDelayedRemovalTimerLocked(void* arg,
void* arg, grpc_error* error) { grpc_error* error) {
WeightedChild* self = static_cast<WeightedChild*>(arg); RdsChild* self = static_cast<RdsChild*>(arg);
self->delayed_removal_timer_callback_pending_ = false; self->delayed_removal_timer_callback_pending_ = false;
if (error == GRPC_ERROR_NONE && !self->shutdown_ && self->weight_ == 0) { if (error == GRPC_ERROR_NONE && !self->shutdown_) {
self->weighted_target_policy_->targets_.erase(self->name_); self->rds_policy_->actions_.erase(self->name_);
} }
self->Unref(DEBUG_LOCATION, "WeightedChild+timer"); self->Unref(DEBUG_LOCATION, "RdsChild+timer");
} }
// //
// WeightedTargetLb::WeightedChild::Helper // RdsLb::RdsChild::Helper
// //
RefCountedPtr<SubchannelInterface> RefCountedPtr<SubchannelInterface> RdsLb::RdsChild::Helper::CreateSubchannel(
WeightedTargetLb::WeightedChild::Helper::CreateSubchannel(
const grpc_channel_args& args) { const grpc_channel_args& args) {
if (weighted_child_->weighted_target_policy_->shutting_down_) return nullptr; if (rds_child_->rds_policy_->shutting_down_) return nullptr;
return weighted_child_->weighted_target_policy_->channel_control_helper()->CreateSubchannel( return rds_child_->rds_policy_->channel_control_helper()->CreateSubchannel(
args); args);
} }
void WeightedTargetLb::WeightedChild::Helper::UpdateState( void RdsLb::RdsChild::Helper::UpdateState(
grpc_connectivity_state state, std::unique_ptr<SubchannelPicker> picker) { grpc_connectivity_state state, std::unique_ptr<SubchannelPicker> picker) {
if (weighted_child_->weighted_target_policy_->shutting_down_) return; if (rds_child_->rds_policy_->shutting_down_) return;
// Cache the picker in the WeightedChild. // Cache the picker in the RdsChild.
weighted_child_->picker_wrapper_ = rds_child_->picker_wrapper_ =
MakeRefCounted<ChildPickerWrapper>(std::move(picker)); MakeRefCounted<ChildPickerWrapper>(std::move(picker));
// Decide what state to report for aggregation purposes. // Decide what state to report for aggregation purposes.
// If we haven't seen a failure since the last time we were in state // If we haven't seen a failure since the last time we were in state
// READY, then we report the state change as-is. However, once we do see // READY, then we report the state change as-is. However, once we do see
// a failure, we report TRANSIENT_FAILURE and ignore any subsequent state // a failure, we report TRANSIENT_FAILURE and ignore any subsequent state
// changes until we go back into state READY. // changes until we go back into state READY.
if (!weighted_child_->seen_failure_since_ready_) { if (!rds_child_->seen_failure_since_ready_) {
if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) { if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
weighted_child_->seen_failure_since_ready_ = true; rds_child_->seen_failure_since_ready_ = true;
} }
} else { } else {
if (state != GRPC_CHANNEL_READY) return; if (state != GRPC_CHANNEL_READY) return;
weighted_child_->seen_failure_since_ready_ = false; rds_child_->seen_failure_since_ready_ = false;
} }
weighted_child_->connectivity_state_ = state; rds_child_->connectivity_state_ = state;
// Notify the LB policy. // Notify the LB policy.
weighted_child_->weighted_target_policy_->UpdateStateLocked(); rds_child_->rds_policy_->UpdateStateLocked();
} }
void WeightedTargetLb::WeightedChild::Helper::RequestReresolution() { void RdsLb::RdsChild::Helper::RequestReresolution() {
if (weighted_child_->weighted_target_policy_->shutting_down_) return; if (rds_child_->rds_policy_->shutting_down_) return;
weighted_child_->weighted_target_policy_->channel_control_helper()->RequestReresolution(); rds_child_->rds_policy_->channel_control_helper()->RequestReresolution();
} }
void WeightedTargetLb::WeightedChild::Helper::AddTraceEvent(TraceSeverity severity, void RdsLb::RdsChild::Helper::AddTraceEvent(TraceSeverity severity,
StringView message) { StringView message) {
if (weighted_child_->weighted_target_policy_->shutting_down_) return; if (rds_child_->rds_policy_->shutting_down_) return;
weighted_child_->weighted_target_policy_->channel_control_helper()->AddTraceEvent( rds_child_->rds_policy_->channel_control_helper()->AddTraceEvent(severity,
severity, message); message);
} }
// //
// factory // factory
// //
class WeightedTargetLbFactory : public LoadBalancingPolicyFactory { class RdsLbFactory : public LoadBalancingPolicyFactory {
public: public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override { LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<WeightedTargetLb>(std::move(args)); return MakeOrphanable<RdsLb>(std::move(args));
} }
const char* name() const override { return kWeightedTarget; } const char* name() const override { return kRds; }
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig( RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
const Json& json, grpc_error** error) const override { const Json& json, grpc_error** error) const override {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
if (json.type() == Json::Type::JSON_NULL) { if (json.type() == Json::Type::JSON_NULL) {
// weighted_target was mentioned as a policy in the deprecated // rds was mentioned as a policy in the deprecated
// loadBalancingPolicy field or in the client API. // loadBalancingPolicy field or in the client API.
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:loadBalancingPolicy error:weighted_target policy requires " "field:loadBalancingPolicy error:rds policy requires "
"configuration. Please use loadBalancingConfig field of service " "configuration. Please use loadBalancingConfig field of service "
"config instead."); "config instead.");
return nullptr; return nullptr;
} }
std::vector<grpc_error*> error_list; std::vector<grpc_error*> error_list;
// Weight map. // Weight map.
WeightedTargetLbConfig::TargetMap target_map; RdsLbConfig::ActionMap action_map;
auto it = json.object_value().find("targets"); auto it = json.object_value().find("actions");
if (it == json.object_value().end()) { if (it == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:targets error:required field not present")); "field:actions error:required field not present"));
} else if (it->second.type() != Json::Type::OBJECT) { } else if (it->second.type() != Json::Type::ARRAY) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:targets error:type should be object")); "field:actions error:type should be array"));
} else { } else {
for (const auto& p : it->second.object_value()) { for (const auto& p : it->second.array_value()) {
WeightedTargetLbConfig::ChildConfig child_config; auto it_name = p.object_value().find("name");
if (it_name == p.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:actions error: each action needs a name"));
}
auto it_child_policy = p.object_value().find("child_policy");
if (it_child_policy == p.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:actions error: each action needs child policies"));
}
RdsLbConfig::ChildConfig child_config;
std::vector<grpc_error*> child_errors = std::vector<grpc_error*> child_errors =
ParseChildConfig(p.second, &child_config); ParseChildConfig(it_child_policy->second, &child_config);
if (!child_errors.empty()) { if (!child_errors.empty()) {
// Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error // Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error
// string is not static in this case. // string is not static in this case.
grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING( grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat("field:targets key:", p.first).c_str()); absl::StrCat("field:actions name:",
it_name->second.string_value())
.c_str());
for (grpc_error* child_error : child_errors) { for (grpc_error* child_error : child_errors) {
error = grpc_error_add_child(error, child_error); error = grpc_error_add_child(error, child_error);
} }
error_list.push_back(error); error_list.push_back(error);
} else { } else {
target_map[p.first] = std::move(child_config); action_map[it_name->second.string_value()] = std::move(child_config);
} }
} }
} }
if (!error_list.empty()) { if (!error_list.empty()) {
*error = GRPC_ERROR_CREATE_FROM_VECTOR( *error = GRPC_ERROR_CREATE_FROM_VECTOR(
"weighted_target_experimental LB policy config", &error_list); "rds_experimental LB policy config", &error_list);
return nullptr; return nullptr;
} }
return MakeRefCounted<WeightedTargetLbConfig>(std::move(target_map)); return MakeRefCounted<RdsLbConfig>(std::move(action_map));
} }
private: private:
static std::vector<grpc_error*> ParseChildConfig( static std::vector<grpc_error*> ParseChildConfig(
const Json& json, WeightedTargetLbConfig::ChildConfig* child_config) { const Json& json, RdsLbConfig::ChildConfig* child_config) {
std::vector<grpc_error*> error_list; std::vector<grpc_error*> error_list;
if (json.type() != Json::Type::OBJECT) { if (json.type() != Json::Type::ARRAY) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"value should be of type object")); "value should be of type array"));
return error_list; return error_list;
} }
// Weight. grpc_error* parse_error = GRPC_ERROR_NONE;
auto it = json.object_value().find("weight"); child_config->config =
if (it == json.object_value().end()) { LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( json.array_value(), &parse_error);
"required field \"weight\" not specified")); if (child_config->config == nullptr) {
} else if (it->second.type() != Json::Type::NUMBER) { GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( std::vector<grpc_error*> child_errors;
"field:weight error:must be of type number")); child_errors.push_back(parse_error);
} else { error_list.push_back(
child_config->weight = GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
gpr_parse_nonnegative_int(it->second.string_value().c_str());
if (child_config->weight == -1) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:weight error:unparseable value"));
} else if (child_config->weight == 0) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:weight error:value must be greater than zero"));
}
}
// Child policy.
it = json.object_value().find("childPolicy");
if (it != json.object_value().end()) {
grpc_error* parse_error = GRPC_ERROR_NONE;
child_config->config =
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(it->second,
&parse_error);
if (child_config->config == nullptr) {
GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
std::vector<grpc_error*> child_errors;
child_errors.push_back(parse_error);
error_list.push_back(
GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
}
} }
return error_list; return error_list;
} }
@ -687,10 +631,10 @@ class WeightedTargetLbFactory : public LoadBalancingPolicyFactory {
// Plugin registration // Plugin registration
// //
void grpc_lb_policy_weighted_target_init() { void grpc_lb_policy_rds_init() {
grpc_core::LoadBalancingPolicyRegistry::Builder:: grpc_core::LoadBalancingPolicyRegistry::Builder::
RegisterLoadBalancingPolicyFactory( RegisterLoadBalancingPolicyFactory(
absl::make_unique<grpc_core::WeightedTargetLbFactory>()); absl::make_unique<grpc_core::RdsLbFactory>());
} }
void grpc_lb_policy_weighted_target_shutdown() {} void grpc_lb_policy_rds_shutdown() {}

@ -1946,8 +1946,15 @@ grpc_error* XdsClient::CreateServiceConfig(
gpr_asprintf(&json, gpr_asprintf(&json,
"{\n" "{\n"
" \"loadBalancingConfig\":[\n" " \"loadBalancingConfig\":[\n"
" { \"cds_experimental\":{\n" " { \"xds_routing_experimental\":{\n"
" \"cluster\": \"%s\"\n" " \"actions\":[\n"
" { \"name\": \"default\",\n"
" \"child_policy\":[\n"
" { \"cds_experimental\":{\n"
" \"cluster\": \"%s\"\n"
" } }\n"
" ]\n"
" } ]\n"
" } }\n" " } }\n"
" ]\n" " ]\n"
"}", "}",

@ -36,6 +36,8 @@ void grpc_lb_policy_grpclb_init(void);
void grpc_lb_policy_grpclb_shutdown(void); void grpc_lb_policy_grpclb_shutdown(void);
void grpc_lb_policy_cds_init(void); void grpc_lb_policy_cds_init(void);
void grpc_lb_policy_cds_shutdown(void); void grpc_lb_policy_cds_shutdown(void);
void grpc_lb_policy_rds_init(void);
void grpc_lb_policy_rds_shutdown(void);
void grpc_lb_policy_xds_init(void); void grpc_lb_policy_xds_init(void);
void grpc_lb_policy_xds_shutdown(void); void grpc_lb_policy_xds_shutdown(void);
void grpc_lb_policy_pick_first_init(void); void grpc_lb_policy_pick_first_init(void);
@ -78,6 +80,8 @@ void grpc_register_built_in_plugins(void) {
grpc_lb_policy_grpclb_shutdown); grpc_lb_policy_grpclb_shutdown);
grpc_register_plugin(grpc_lb_policy_cds_init, grpc_register_plugin(grpc_lb_policy_cds_init,
grpc_lb_policy_cds_shutdown); grpc_lb_policy_cds_shutdown);
grpc_register_plugin(grpc_lb_policy_rds_init,
grpc_lb_policy_rds_shutdown);
grpc_register_plugin(grpc_lb_policy_xds_init, grpc_register_plugin(grpc_lb_policy_xds_init,
grpc_lb_policy_xds_shutdown); grpc_lb_policy_xds_shutdown);
grpc_register_plugin(grpc_lb_policy_pick_first_init, grpc_register_plugin(grpc_lb_policy_pick_first_init,

@ -44,6 +44,8 @@ void grpc_lb_policy_grpclb_init(void);
void grpc_lb_policy_grpclb_shutdown(void); void grpc_lb_policy_grpclb_shutdown(void);
void grpc_lb_policy_cds_init(void); void grpc_lb_policy_cds_init(void);
void grpc_lb_policy_cds_shutdown(void); void grpc_lb_policy_cds_shutdown(void);
void grpc_lb_policy_rds_init(void);
void grpc_lb_policy_rds_shutdown(void);
void grpc_lb_policy_xds_init(void); void grpc_lb_policy_xds_init(void);
void grpc_lb_policy_xds_shutdown(void); void grpc_lb_policy_xds_shutdown(void);
void grpc_lb_policy_pick_first_init(void); void grpc_lb_policy_pick_first_init(void);
@ -86,6 +88,8 @@ void grpc_register_built_in_plugins(void) {
grpc_lb_policy_grpclb_shutdown); grpc_lb_policy_grpclb_shutdown);
grpc_register_plugin(grpc_lb_policy_cds_init, grpc_register_plugin(grpc_lb_policy_cds_init,
grpc_lb_policy_cds_shutdown); grpc_lb_policy_cds_shutdown);
grpc_register_plugin(grpc_lb_policy_rds_init,
grpc_lb_policy_rds_shutdown);
grpc_register_plugin(grpc_lb_policy_xds_init, grpc_register_plugin(grpc_lb_policy_xds_init,
grpc_lb_policy_xds_shutdown); grpc_lb_policy_xds_shutdown);
grpc_register_plugin(grpc_lb_policy_pick_first_init, grpc_register_plugin(grpc_lb_policy_pick_first_init,

@ -1546,9 +1546,9 @@ TEST_P(BasicTest, Vanilla) {
backends_[i]->backend_service()->request_count()); backends_[i]->backend_service()->request_count());
} }
// Check LB policy name for the channel. // Check LB policy name for the channel.
EXPECT_EQ( EXPECT_EQ((GetParam().use_xds_resolver() ? "xds_routing_experimental"
(GetParam().use_xds_resolver() ? "cds_experimental" : "xds_experimental"), : "xds_experimental"),
channel_->GetLoadBalancingPolicyName()); channel_->GetLoadBalancingPolicyName());
} }
TEST_P(BasicTest, IgnoresUnhealthyEndpoints) { TEST_P(BasicTest, IgnoresUnhealthyEndpoints) {

Loading…
Cancel
Save