|
|
|
@ -35,18 +35,18 @@ |
|
|
|
|
#include "src/core/lib/iomgr/combiner.h" |
|
|
|
|
#include "src/core/lib/iomgr/timer.h" |
|
|
|
|
|
|
|
|
|
#define GRPC_WEIGHTED_TARGET_CHILD_RETENTION_INTERVAL_MS (15 * 60 * 1000) |
|
|
|
|
#define GRPC_XDS_ROUTING_CHILD_RETENTION_INTERVAL_MS (15 * 60 * 1000) |
|
|
|
|
|
|
|
|
|
namespace grpc_core { |
|
|
|
|
|
|
|
|
|
TraceFlag grpc_rds_lb_trace(false, "rds_lb"); |
|
|
|
|
TraceFlag grpc_xds_routing_lb_trace(false, "xds_routing_lb"); |
|
|
|
|
|
|
|
|
|
namespace { |
|
|
|
|
|
|
|
|
|
constexpr char kRds[] = "xds_routing_experimental"; |
|
|
|
|
constexpr char kXdsRouting[] = "xds_routing_experimental"; |
|
|
|
|
|
|
|
|
|
// Config for rds LB policy.
|
|
|
|
|
class RdsLbConfig : public LoadBalancingPolicy::Config { |
|
|
|
|
// Config for xds_routing LB policy.
|
|
|
|
|
class XdsRoutingLbConfig : public LoadBalancingPolicy::Config { |
|
|
|
|
public: |
|
|
|
|
struct ChildConfig { |
|
|
|
|
RefCountedPtr<LoadBalancingPolicy::Config> config; |
|
|
|
@ -54,10 +54,10 @@ class RdsLbConfig : public LoadBalancingPolicy::Config { |
|
|
|
|
|
|
|
|
|
using ActionMap = std::map<std::string, ChildConfig>; |
|
|
|
|
|
|
|
|
|
explicit RdsLbConfig(ActionMap action_map) |
|
|
|
|
explicit XdsRoutingLbConfig(ActionMap action_map) |
|
|
|
|
: action_map_(std::move(action_map)) {} |
|
|
|
|
|
|
|
|
|
const char* name() const override { return kRds; } |
|
|
|
|
const char* name() const override { return kXdsRouting; } |
|
|
|
|
|
|
|
|
|
const ActionMap& action_map() const { return action_map_; } |
|
|
|
|
|
|
|
|
@ -65,12 +65,12 @@ class RdsLbConfig : public LoadBalancingPolicy::Config { |
|
|
|
|
ActionMap action_map_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
// rds LB policy.
|
|
|
|
|
class RdsLb : public LoadBalancingPolicy { |
|
|
|
|
// xds_routing LB policy.
|
|
|
|
|
class XdsRoutingLb : public LoadBalancingPolicy { |
|
|
|
|
public: |
|
|
|
|
explicit RdsLb(Args args); |
|
|
|
|
explicit XdsRoutingLb(Args args); |
|
|
|
|
|
|
|
|
|
const char* name() const override { return kRds; } |
|
|
|
|
const char* name() const override { return kXdsRouting; } |
|
|
|
|
|
|
|
|
|
void UpdateLocked(UpdateArgs args) override; |
|
|
|
|
void ExitIdleLocked() override; |
|
|
|
@ -82,7 +82,8 @@ class RdsLb : public LoadBalancingPolicy { |
|
|
|
|
public: |
|
|
|
|
explicit ChildPickerWrapper(std::unique_ptr<SubchannelPicker> picker) |
|
|
|
|
: picker_(std::move(picker)) {} |
|
|
|
|
PickResult Pick(PickArgs args) { return picker_->Pick(std::move(args)); } |
|
|
|
|
PickResult Pick(PickArgs args) { |
|
|
|
|
return picker_->Pick(std::move(args)); } |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
std::unique_ptr<SubchannelPicker> picker_; |
|
|
|
@ -90,34 +91,34 @@ class RdsLb : public LoadBalancingPolicy { |
|
|
|
|
|
|
|
|
|
// Picks a child using stateless WRR and then delegates to that
|
|
|
|
|
// child's picker.
|
|
|
|
|
class RdsPicker : public SubchannelPicker { |
|
|
|
|
class XdsRoutingPicker : public SubchannelPicker { |
|
|
|
|
public: |
|
|
|
|
// Maintains a rds list of pickers from each child that is in
|
|
|
|
|
// Maintains a xds_routing list of pickers from each child that is in
|
|
|
|
|
// ready state. The first element in the pair represents the end of a
|
|
|
|
|
// range proportional to the child's weight. The start of the range
|
|
|
|
|
// is the previous value in the vector and is 0 for the first element.
|
|
|
|
|
using PickerList = InlinedVector<RefCountedPtr<ChildPickerWrapper>, 1>; |
|
|
|
|
|
|
|
|
|
RdsPicker(RefCountedPtr<RdsLb> parent, PickerList pickers) |
|
|
|
|
XdsRoutingPicker(RefCountedPtr<XdsRoutingLb> parent, PickerList pickers) |
|
|
|
|
: parent_(std::move(parent)), pickers_(std::move(pickers)) {} |
|
|
|
|
~RdsPicker() { parent_.reset(DEBUG_LOCATION, "RdsPicker"); } |
|
|
|
|
~XdsRoutingPicker() { parent_.reset(DEBUG_LOCATION, "XdsRoutingPicker"); } |
|
|
|
|
|
|
|
|
|
PickResult Pick(PickArgs args) override; |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
RefCountedPtr<RdsLb> parent_; |
|
|
|
|
RefCountedPtr<XdsRoutingLb> parent_; |
|
|
|
|
PickerList pickers_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
// Each RdsChild holds a ref to its parent RdsLb.
|
|
|
|
|
class RdsChild : public InternallyRefCounted<RdsChild> { |
|
|
|
|
// Each XdsRoutingChild holds a ref to its parent XdsRoutingLb.
|
|
|
|
|
class XdsRoutingChild : public InternallyRefCounted<XdsRoutingChild> { |
|
|
|
|
public: |
|
|
|
|
RdsChild(RefCountedPtr<RdsLb> rds_policy, const std::string& name); |
|
|
|
|
~RdsChild(); |
|
|
|
|
XdsRoutingChild(RefCountedPtr<XdsRoutingLb> xds_routing_policy, const std::string& name); |
|
|
|
|
~XdsRoutingChild(); |
|
|
|
|
|
|
|
|
|
void Orphan() override; |
|
|
|
|
|
|
|
|
|
void UpdateLocked(const RdsLbConfig::ChildConfig& config, |
|
|
|
|
void UpdateLocked(const XdsRoutingLbConfig::ChildConfig& config, |
|
|
|
|
const ServerAddressList& addresses, |
|
|
|
|
const grpc_channel_args* args); |
|
|
|
|
void ExitIdleLocked(); |
|
|
|
@ -134,10 +135,10 @@ class RdsLb : public LoadBalancingPolicy { |
|
|
|
|
private: |
|
|
|
|
class Helper : public ChannelControlHelper { |
|
|
|
|
public: |
|
|
|
|
explicit Helper(RefCountedPtr<RdsChild> rds_child) |
|
|
|
|
: rds_child_(std::move(rds_child)) {} |
|
|
|
|
explicit Helper(RefCountedPtr<XdsRoutingChild> xds_routing_child) |
|
|
|
|
: xds_routing_child_(std::move(xds_routing_child)) {} |
|
|
|
|
|
|
|
|
|
~Helper() { rds_child_.reset(DEBUG_LOCATION, "Helper"); } |
|
|
|
|
~Helper() { xds_routing_child_.reset(DEBUG_LOCATION, "Helper"); } |
|
|
|
|
|
|
|
|
|
RefCountedPtr<SubchannelInterface> CreateSubchannel( |
|
|
|
|
const grpc_channel_args& args) override; |
|
|
|
@ -147,7 +148,7 @@ class RdsLb : public LoadBalancingPolicy { |
|
|
|
|
void AddTraceEvent(TraceSeverity severity, StringView message) override; |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
RefCountedPtr<RdsChild> rds_child_; |
|
|
|
|
RefCountedPtr<XdsRoutingChild> xds_routing_child_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
// Methods for dealing with the child policy.
|
|
|
|
@ -158,9 +159,9 @@ class RdsLb : public LoadBalancingPolicy { |
|
|
|
|
static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error); |
|
|
|
|
|
|
|
|
|
// The owning LB policy.
|
|
|
|
|
RefCountedPtr<RdsLb> rds_policy_; |
|
|
|
|
RefCountedPtr<XdsRoutingLb> xds_routing_policy_; |
|
|
|
|
|
|
|
|
|
// Points to the corresponding key in RdsLb::actions_.
|
|
|
|
|
// Points to the corresponding key in XdsRoutingLb::actions_.
|
|
|
|
|
const std::string& name_; |
|
|
|
|
|
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> child_policy_; |
|
|
|
@ -176,7 +177,7 @@ class RdsLb : public LoadBalancingPolicy { |
|
|
|
|
bool shutdown_ = false; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
~RdsLb(); |
|
|
|
|
~XdsRoutingLb(); |
|
|
|
|
|
|
|
|
|
void ShutdownLocked() override; |
|
|
|
|
|
|
|
|
@ -185,70 +186,68 @@ class RdsLb : public LoadBalancingPolicy { |
|
|
|
|
const grpc_millis child_retention_interval_ms_; |
|
|
|
|
|
|
|
|
|
// Current config from the resolver.
|
|
|
|
|
RefCountedPtr<RdsLbConfig> config_; |
|
|
|
|
RefCountedPtr<XdsRoutingLbConfig> config_; |
|
|
|
|
|
|
|
|
|
// Internal state.
|
|
|
|
|
bool shutting_down_ = false; |
|
|
|
|
|
|
|
|
|
// Children.
|
|
|
|
|
std::map<std::string, OrphanablePtr<RdsChild>> actions_; |
|
|
|
|
std::map<std::string, OrphanablePtr<XdsRoutingChild>> actions_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// RdsLb::RdsPicker
|
|
|
|
|
// XdsRoutingLb::XdsRoutingPicker
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
RdsLb::PickResult RdsLb::RdsPicker::Pick(PickArgs args) { |
|
|
|
|
gpr_log( |
|
|
|
|
GPR_INFO, |
|
|
|
|
"donna Picking not implemented yet, just always use the one and only"); |
|
|
|
|
XdsRoutingLb::PickResult XdsRoutingLb::XdsRoutingPicker::Pick(PickArgs args) { |
|
|
|
|
gpr_log(GPR_INFO, "donna picked first first"); |
|
|
|
|
return pickers_[0]->Pick(args); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// RdsLb
|
|
|
|
|
// XdsRoutingLb
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
RdsLb::RdsLb(Args args) |
|
|
|
|
XdsRoutingLb::XdsRoutingLb(Args args) |
|
|
|
|
: LoadBalancingPolicy(std::move(args)), |
|
|
|
|
// FIXME: new channel arg
|
|
|
|
|
child_retention_interval_ms_(grpc_channel_args_find_integer( |
|
|
|
|
args.args, GRPC_ARG_LOCALITY_RETENTION_INTERVAL_MS, |
|
|
|
|
{GRPC_WEIGHTED_TARGET_CHILD_RETENTION_INTERVAL_MS, 0, INT_MAX})) {} |
|
|
|
|
{GRPC_XDS_ROUTING_CHILD_RETENTION_INTERVAL_MS, 0, INT_MAX})) {} |
|
|
|
|
|
|
|
|
|
RdsLb::~RdsLb() { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[rds_lb %p] destroying rds LB policy", this); |
|
|
|
|
XdsRoutingLb::~XdsRoutingLb() { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[xds_routing_lb %p] destroying xds_routing LB policy", this); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RdsLb::ShutdownLocked() { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[rds_lb %p] shutting down", this); |
|
|
|
|
void XdsRoutingLb::ShutdownLocked() { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[xds_routing_lb %p] shutting down", this); |
|
|
|
|
} |
|
|
|
|
shutting_down_ = true; |
|
|
|
|
actions_.clear(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RdsLb::ExitIdleLocked() { |
|
|
|
|
void XdsRoutingLb::ExitIdleLocked() { |
|
|
|
|
for (auto& p : actions_) p.second->ExitIdleLocked(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RdsLb::ResetBackoffLocked() { |
|
|
|
|
void XdsRoutingLb::ResetBackoffLocked() { |
|
|
|
|
for (auto& p : actions_) p.second->ResetBackoffLocked(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RdsLb::UpdateLocked(UpdateArgs args) { |
|
|
|
|
void XdsRoutingLb::UpdateLocked(UpdateArgs args) { |
|
|
|
|
if (shutting_down_) return; |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[rds_lb %p] Received update", this); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[xds_routing_lb %p] Received update", this); |
|
|
|
|
} |
|
|
|
|
// Update config.
|
|
|
|
|
config_ = std::move(args.config); |
|
|
|
|
// Deactivate the actions not in the new config.
|
|
|
|
|
for (auto it = actions_.begin(); it != actions_.end();) { |
|
|
|
|
const std::string& name = it->first; |
|
|
|
|
RdsChild* child = it->second.get(); |
|
|
|
|
XdsRoutingChild* child = it->second.get(); |
|
|
|
|
if (config_->action_map().find(name) != config_->action_map().end()) { |
|
|
|
|
++it; |
|
|
|
|
continue; |
|
|
|
@ -263,23 +262,23 @@ void RdsLb::UpdateLocked(UpdateArgs args) { |
|
|
|
|
// Add or update the actions in the new config.
|
|
|
|
|
for (const auto& p : config_->action_map()) { |
|
|
|
|
const std::string& name = p.first; |
|
|
|
|
const RdsLbConfig::ChildConfig& config = p.second; |
|
|
|
|
const XdsRoutingLbConfig::ChildConfig& config = p.second; |
|
|
|
|
auto it = actions_.find(name); |
|
|
|
|
if (it == actions_.end()) { |
|
|
|
|
it = actions_.emplace(std::make_pair(name, nullptr)).first; |
|
|
|
|
it->second = |
|
|
|
|
MakeOrphanable<RdsChild>(Ref(DEBUG_LOCATION, "RdsChild"), it->first); |
|
|
|
|
MakeOrphanable<XdsRoutingChild>(Ref(DEBUG_LOCATION, "XdsRoutingChild"), it->first); |
|
|
|
|
} |
|
|
|
|
it->second->UpdateLocked(config, args.addresses, args.args); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RdsLb::UpdateStateLocked() { |
|
|
|
|
void XdsRoutingLb::UpdateStateLocked() { |
|
|
|
|
// Construct a new picker which maintains a map of all child pickers
|
|
|
|
|
// that are ready. Each child is represented by a portion of the range
|
|
|
|
|
// proportional to its weight, such that the total range is the sum of the
|
|
|
|
|
// weights of all children.
|
|
|
|
|
RdsPicker::PickerList picker_list; |
|
|
|
|
XdsRoutingPicker::PickerList picker_list; |
|
|
|
|
// Also count the number of children in each state, to determine the
|
|
|
|
|
// overall state.
|
|
|
|
|
size_t num_connecting = 0; |
|
|
|
@ -287,7 +286,7 @@ void RdsLb::UpdateStateLocked() { |
|
|
|
|
size_t num_transient_failures = 0; |
|
|
|
|
for (const auto& p : actions_) { |
|
|
|
|
const auto& child_name = p.first; |
|
|
|
|
const RdsChild* child = p.second.get(); |
|
|
|
|
const XdsRoutingChild* child = p.second.get(); |
|
|
|
|
// Skip the actions that are not in the latest update.
|
|
|
|
|
if (config_->action_map().find(child_name) == config_->action_map().end()) { |
|
|
|
|
continue; |
|
|
|
@ -324,14 +323,14 @@ void RdsLb::UpdateStateLocked() { |
|
|
|
|
} else { |
|
|
|
|
connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE; |
|
|
|
|
} |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[rds_lb %p] connectivity changed to %s", this, |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[xds_routing_lb %p] connectivity changed to %s", this, |
|
|
|
|
ConnectivityStateName(connectivity_state)); |
|
|
|
|
} |
|
|
|
|
std::unique_ptr<SubchannelPicker> picker; |
|
|
|
|
switch (connectivity_state) { |
|
|
|
|
case GRPC_CHANNEL_READY: |
|
|
|
|
picker = absl::make_unique<RdsPicker>(Ref(DEBUG_LOCATION, "RdsPicker"), |
|
|
|
|
picker = absl::make_unique<XdsRoutingPicker>(Ref(DEBUG_LOCATION, "XdsRoutingPicker"), |
|
|
|
|
std::move(picker_list)); |
|
|
|
|
break; |
|
|
|
|
case GRPC_CHANNEL_CONNECTING: |
|
|
|
@ -342,41 +341,41 @@ void RdsLb::UpdateStateLocked() { |
|
|
|
|
default: |
|
|
|
|
picker = absl::make_unique<TransientFailurePicker>( |
|
|
|
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"rds: all children report state TRANSIENT_FAILURE")); |
|
|
|
|
"xds_routing: all children report state TRANSIENT_FAILURE")); |
|
|
|
|
} |
|
|
|
|
channel_control_helper()->UpdateState(connectivity_state, std::move(picker)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// RdsLb::RdsChild
|
|
|
|
|
// XdsRoutingLb::XdsRoutingChild
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
RdsLb::RdsChild::RdsChild(RefCountedPtr<RdsLb> rds_policy, |
|
|
|
|
XdsRoutingLb::XdsRoutingChild::XdsRoutingChild(RefCountedPtr<XdsRoutingLb> xds_routing_policy, |
|
|
|
|
const std::string& name) |
|
|
|
|
: rds_policy_(std::move(rds_policy)), name_(name) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[rds_lb %p] created RdsChild %p for %s", |
|
|
|
|
rds_policy_.get(), this, name_.c_str()); |
|
|
|
|
: xds_routing_policy_(std::move(xds_routing_policy)), name_(name) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[xds_routing_lb %p] created XdsRoutingChild %p for %s", |
|
|
|
|
xds_routing_policy_.get(), this, name_.c_str()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
RdsLb::RdsChild::~RdsChild() { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[rds_lb %p] RdsChild %p %s: destroying child", |
|
|
|
|
rds_policy_.get(), this, name_.c_str()); |
|
|
|
|
XdsRoutingLb::XdsRoutingChild::~XdsRoutingChild() { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[xds_routing_lb %p] XdsRoutingChild %p %s: destroying child", |
|
|
|
|
xds_routing_policy_.get(), this, name_.c_str()); |
|
|
|
|
} |
|
|
|
|
rds_policy_.reset(DEBUG_LOCATION, "RdsChild"); |
|
|
|
|
xds_routing_policy_.reset(DEBUG_LOCATION, "XdsRoutingChild"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RdsLb::RdsChild::Orphan() { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[rds_lb %p] RdsChild %p %s: shutting down child", |
|
|
|
|
rds_policy_.get(), this, name_.c_str()); |
|
|
|
|
void XdsRoutingLb::XdsRoutingChild::Orphan() { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[xds_routing_lb %p] XdsRoutingChild %p %s: shutting down child", |
|
|
|
|
xds_routing_policy_.get(), this, name_.c_str()); |
|
|
|
|
} |
|
|
|
|
// Remove the child policy's interested_parties pollset_set from the
|
|
|
|
|
// xDS policy.
|
|
|
|
|
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), |
|
|
|
|
rds_policy_->interested_parties()); |
|
|
|
|
xds_routing_policy_->interested_parties()); |
|
|
|
|
child_policy_.reset(); |
|
|
|
|
// Drop our ref to the child's picker, in case it's holding a ref to
|
|
|
|
|
// the child.
|
|
|
|
@ -388,34 +387,34 @@ void RdsLb::RdsChild::Orphan() { |
|
|
|
|
Unref(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> RdsLb::RdsChild::CreateChildPolicyLocked( |
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> XdsRoutingLb::XdsRoutingChild::CreateChildPolicyLocked( |
|
|
|
|
const grpc_channel_args* args) { |
|
|
|
|
LoadBalancingPolicy::Args lb_policy_args; |
|
|
|
|
lb_policy_args.combiner = rds_policy_->combiner(); |
|
|
|
|
lb_policy_args.combiner = xds_routing_policy_->combiner(); |
|
|
|
|
lb_policy_args.args = args; |
|
|
|
|
lb_policy_args.channel_control_helper = |
|
|
|
|
absl::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper")); |
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> lb_policy = |
|
|
|
|
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args), |
|
|
|
|
&grpc_rds_lb_trace); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) { |
|
|
|
|
&grpc_xds_routing_lb_trace); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[rds_lb %p] RdsChild %p %s: Created new child " |
|
|
|
|
"[xds_routing_lb %p] XdsRoutingChild %p %s: Created new child " |
|
|
|
|
"policy handler %p", |
|
|
|
|
rds_policy_.get(), this, name_.c_str(), lb_policy.get()); |
|
|
|
|
xds_routing_policy_.get(), this, name_.c_str(), lb_policy.get()); |
|
|
|
|
} |
|
|
|
|
// Add the xDS's interested_parties pollset_set to that of the newly created
|
|
|
|
|
// child policy. This will make the child policy progress upon activity on
|
|
|
|
|
// xDS LB, which in turn is tied to the application's call.
|
|
|
|
|
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), |
|
|
|
|
rds_policy_->interested_parties()); |
|
|
|
|
xds_routing_policy_->interested_parties()); |
|
|
|
|
return lb_policy; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RdsLb::RdsChild::UpdateLocked(const RdsLbConfig::ChildConfig& config, |
|
|
|
|
void XdsRoutingLb::XdsRoutingChild::UpdateLocked(const XdsRoutingLbConfig::ChildConfig& config, |
|
|
|
|
const ServerAddressList& addresses, |
|
|
|
|
const grpc_channel_args* args) { |
|
|
|
|
if (rds_policy_->shutting_down_) return; |
|
|
|
|
if (xds_routing_policy_->shutting_down_) return; |
|
|
|
|
// Update child weight.
|
|
|
|
|
// Reactivate if needed.
|
|
|
|
|
if (delayed_removal_timer_callback_pending_) { |
|
|
|
@ -431,97 +430,99 @@ void RdsLb::RdsChild::UpdateLocked(const RdsLbConfig::ChildConfig& config, |
|
|
|
|
update_args.addresses = addresses; |
|
|
|
|
update_args.args = grpc_channel_args_copy(args); |
|
|
|
|
// Update the policy.
|
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_rds_lb_trace)) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[rds_lb %p] RdsChild %p %s: Updating child " |
|
|
|
|
"[xds_routing_lb %p] XdsRoutingChild %p %s: Updating child " |
|
|
|
|
"policy handler %p", |
|
|
|
|
rds_policy_.get(), this, name_.c_str(), child_policy_.get()); |
|
|
|
|
xds_routing_policy_.get(), this, name_.c_str(), child_policy_.get()); |
|
|
|
|
} |
|
|
|
|
child_policy_->UpdateLocked(std::move(update_args)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RdsLb::RdsChild::ExitIdleLocked() { child_policy_->ExitIdleLocked(); } |
|
|
|
|
void XdsRoutingLb::XdsRoutingChild::ExitIdleLocked() { child_policy_->ExitIdleLocked(); } |
|
|
|
|
|
|
|
|
|
void RdsLb::RdsChild::ResetBackoffLocked() { |
|
|
|
|
void XdsRoutingLb::XdsRoutingChild::ResetBackoffLocked() { |
|
|
|
|
child_policy_->ResetBackoffLocked(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RdsLb::RdsChild::DeactivateLocked() { |
|
|
|
|
void XdsRoutingLb::XdsRoutingChild::DeactivateLocked() { |
|
|
|
|
// If already deactivated, don't do that again.
|
|
|
|
|
// Set the child weight to 0 so that future picker won't contain this child.
|
|
|
|
|
// Start a timer to delete the child.
|
|
|
|
|
Ref(DEBUG_LOCATION, "RdsChild+timer").release(); |
|
|
|
|
Ref(DEBUG_LOCATION, "XdsRoutingChild+timer").release(); |
|
|
|
|
GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
grpc_timer_init( |
|
|
|
|
&delayed_removal_timer_, |
|
|
|
|
ExecCtx::Get()->Now() + rds_policy_->child_retention_interval_ms_, |
|
|
|
|
ExecCtx::Get()->Now() + xds_routing_policy_->child_retention_interval_ms_, |
|
|
|
|
&on_delayed_removal_timer_); |
|
|
|
|
delayed_removal_timer_callback_pending_ = true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RdsLb::RdsChild::OnDelayedRemovalTimer(void* arg, grpc_error* error) { |
|
|
|
|
RdsChild* self = static_cast<RdsChild*>(arg); |
|
|
|
|
self->rds_policy_->combiner()->Run( |
|
|
|
|
void XdsRoutingLb::XdsRoutingChild::OnDelayedRemovalTimer(void* arg, grpc_error* error) { |
|
|
|
|
XdsRoutingChild* self = static_cast<XdsRoutingChild*>(arg); |
|
|
|
|
self->xds_routing_policy_->combiner()->Run( |
|
|
|
|
GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_, |
|
|
|
|
OnDelayedRemovalTimerLocked, self, nullptr), |
|
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RdsLb::RdsChild::OnDelayedRemovalTimerLocked(void* arg, |
|
|
|
|
void XdsRoutingLb::XdsRoutingChild::OnDelayedRemovalTimerLocked(void* arg, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
RdsChild* self = static_cast<RdsChild*>(arg); |
|
|
|
|
XdsRoutingChild* self = static_cast<XdsRoutingChild*>(arg); |
|
|
|
|
self->delayed_removal_timer_callback_pending_ = false; |
|
|
|
|
if (error == GRPC_ERROR_NONE && !self->shutdown_) { |
|
|
|
|
self->rds_policy_->actions_.erase(self->name_); |
|
|
|
|
self->xds_routing_policy_->actions_.erase(self->name_); |
|
|
|
|
} |
|
|
|
|
self->Unref(DEBUG_LOCATION, "RdsChild+timer"); |
|
|
|
|
self->Unref(DEBUG_LOCATION, "XdsRoutingChild+timer"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// RdsLb::RdsChild::Helper
|
|
|
|
|
// XdsRoutingLb::XdsRoutingChild::Helper
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
RefCountedPtr<SubchannelInterface> RdsLb::RdsChild::Helper::CreateSubchannel( |
|
|
|
|
RefCountedPtr<SubchannelInterface> XdsRoutingLb::XdsRoutingChild::Helper::CreateSubchannel( |
|
|
|
|
const grpc_channel_args& args) { |
|
|
|
|
if (rds_child_->rds_policy_->shutting_down_) return nullptr; |
|
|
|
|
return rds_child_->rds_policy_->channel_control_helper()->CreateSubchannel( |
|
|
|
|
gpr_log(GPR_INFO, "donna XdsRoutingChild::Helper::CreateSubchannel"); |
|
|
|
|
if (xds_routing_child_->xds_routing_policy_->shutting_down_) return nullptr; |
|
|
|
|
return xds_routing_child_->xds_routing_policy_->channel_control_helper()->CreateSubchannel( |
|
|
|
|
args); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RdsLb::RdsChild::Helper::UpdateState( |
|
|
|
|
void XdsRoutingLb::XdsRoutingChild::Helper::UpdateState( |
|
|
|
|
grpc_connectivity_state state, std::unique_ptr<SubchannelPicker> picker) { |
|
|
|
|
if (rds_child_->rds_policy_->shutting_down_) return; |
|
|
|
|
// Cache the picker in the RdsChild.
|
|
|
|
|
rds_child_->picker_wrapper_ = |
|
|
|
|
gpr_log(GPR_INFO, "donna XdsRoutingChild::Helper::UpdateState"); |
|
|
|
|
if (xds_routing_child_->xds_routing_policy_->shutting_down_) return; |
|
|
|
|
// Cache the picker in the XdsRoutingChild.
|
|
|
|
|
xds_routing_child_->picker_wrapper_ = |
|
|
|
|
MakeRefCounted<ChildPickerWrapper>(std::move(picker)); |
|
|
|
|
// Decide what state to report for aggregation purposes.
|
|
|
|
|
// If we haven't seen a failure since the last time we were in state
|
|
|
|
|
// READY, then we report the state change as-is. However, once we do see
|
|
|
|
|
// a failure, we report TRANSIENT_FAILURE and ignore any subsequent state
|
|
|
|
|
// changes until we go back into state READY.
|
|
|
|
|
if (!rds_child_->seen_failure_since_ready_) { |
|
|
|
|
if (!xds_routing_child_->seen_failure_since_ready_) { |
|
|
|
|
if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) { |
|
|
|
|
rds_child_->seen_failure_since_ready_ = true; |
|
|
|
|
xds_routing_child_->seen_failure_since_ready_ = true; |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
if (state != GRPC_CHANNEL_READY) return; |
|
|
|
|
rds_child_->seen_failure_since_ready_ = false; |
|
|
|
|
xds_routing_child_->seen_failure_since_ready_ = false; |
|
|
|
|
} |
|
|
|
|
rds_child_->connectivity_state_ = state; |
|
|
|
|
xds_routing_child_->connectivity_state_ = state; |
|
|
|
|
// Notify the LB policy.
|
|
|
|
|
rds_child_->rds_policy_->UpdateStateLocked(); |
|
|
|
|
xds_routing_child_->xds_routing_policy_->UpdateStateLocked(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RdsLb::RdsChild::Helper::RequestReresolution() { |
|
|
|
|
if (rds_child_->rds_policy_->shutting_down_) return; |
|
|
|
|
rds_child_->rds_policy_->channel_control_helper()->RequestReresolution(); |
|
|
|
|
void XdsRoutingLb::XdsRoutingChild::Helper::RequestReresolution() { |
|
|
|
|
if (xds_routing_child_->xds_routing_policy_->shutting_down_) return; |
|
|
|
|
xds_routing_child_->xds_routing_policy_->channel_control_helper()->RequestReresolution(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RdsLb::RdsChild::Helper::AddTraceEvent(TraceSeverity severity, |
|
|
|
|
void XdsRoutingLb::XdsRoutingChild::Helper::AddTraceEvent(TraceSeverity severity, |
|
|
|
|
StringView message) { |
|
|
|
|
if (rds_child_->rds_policy_->shutting_down_) return; |
|
|
|
|
rds_child_->rds_policy_->channel_control_helper()->AddTraceEvent(severity, |
|
|
|
|
if (xds_routing_child_->xds_routing_policy_->shutting_down_) return; |
|
|
|
|
xds_routing_child_->xds_routing_policy_->channel_control_helper()->AddTraceEvent(severity, |
|
|
|
|
message); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -529,30 +530,30 @@ void RdsLb::RdsChild::Helper::AddTraceEvent(TraceSeverity severity, |
|
|
|
|
// factory
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
class RdsLbFactory : public LoadBalancingPolicyFactory { |
|
|
|
|
class XdsRoutingLbFactory : public LoadBalancingPolicyFactory { |
|
|
|
|
public: |
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( |
|
|
|
|
LoadBalancingPolicy::Args args) const override { |
|
|
|
|
return MakeOrphanable<RdsLb>(std::move(args)); |
|
|
|
|
return MakeOrphanable<XdsRoutingLb>(std::move(args)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const char* name() const override { return kRds; } |
|
|
|
|
const char* name() const override { return kXdsRouting; } |
|
|
|
|
|
|
|
|
|
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig( |
|
|
|
|
const Json& json, grpc_error** error) const override { |
|
|
|
|
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); |
|
|
|
|
if (json.type() == Json::Type::JSON_NULL) { |
|
|
|
|
// rds was mentioned as a policy in the deprecated
|
|
|
|
|
// xds_routing was mentioned as a policy in the deprecated
|
|
|
|
|
// loadBalancingPolicy field or in the client API.
|
|
|
|
|
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"field:loadBalancingPolicy error:rds policy requires " |
|
|
|
|
"field:loadBalancingPolicy error:xds_routing policy requires " |
|
|
|
|
"configuration. Please use loadBalancingConfig field of service " |
|
|
|
|
"config instead."); |
|
|
|
|
return nullptr; |
|
|
|
|
} |
|
|
|
|
std::vector<grpc_error*> error_list; |
|
|
|
|
// Weight map.
|
|
|
|
|
RdsLbConfig::ActionMap action_map; |
|
|
|
|
// action map.
|
|
|
|
|
XdsRoutingLbConfig::ActionMap action_map; |
|
|
|
|
auto it = json.object_value().find("actions"); |
|
|
|
|
if (it == json.object_value().end()) { |
|
|
|
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
@ -562,17 +563,19 @@ class RdsLbFactory : public LoadBalancingPolicyFactory { |
|
|
|
|
"field:actions error:type should be array")); |
|
|
|
|
} else { |
|
|
|
|
for (const auto& p : it->second.array_value()) { |
|
|
|
|
auto it_name = p.object_value().find("name"); |
|
|
|
|
if (it_name == p.object_value().end()) { |
|
|
|
|
auto it_cds = p.object_value().find("cds"); |
|
|
|
|
auto it_weighted_target = p.object_value().find("weighted_target"); |
|
|
|
|
if (it_cds == p.object_value().end() && it_weighted_target == p.object_value().end()) { |
|
|
|
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"field:actions error: each action needs a name")); |
|
|
|
|
"field:actions error: each action needs to be either cds or weighted target")); |
|
|
|
|
} |
|
|
|
|
auto it_name = (it_cds == p.object_value().end() ? it_weighted_target : it_cds); |
|
|
|
|
auto it_child_policy = p.object_value().find("child_policy"); |
|
|
|
|
if (it_child_policy == p.object_value().end()) { |
|
|
|
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"field:actions error: each action needs child policies")); |
|
|
|
|
} |
|
|
|
|
RdsLbConfig::ChildConfig child_config; |
|
|
|
|
XdsRoutingLbConfig::ChildConfig child_config; |
|
|
|
|
std::vector<grpc_error*> child_errors = |
|
|
|
|
ParseChildConfig(it_child_policy->second, &child_config); |
|
|
|
|
if (!child_errors.empty()) { |
|
|
|
@ -593,15 +596,15 @@ class RdsLbFactory : public LoadBalancingPolicyFactory { |
|
|
|
|
} |
|
|
|
|
if (!error_list.empty()) { |
|
|
|
|
*error = GRPC_ERROR_CREATE_FROM_VECTOR( |
|
|
|
|
"rds_experimental LB policy config", &error_list); |
|
|
|
|
"xds_routing_experimental LB policy config", &error_list); |
|
|
|
|
return nullptr; |
|
|
|
|
} |
|
|
|
|
return MakeRefCounted<RdsLbConfig>(std::move(action_map)); |
|
|
|
|
return MakeRefCounted<XdsRoutingLbConfig>(std::move(action_map)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
static std::vector<grpc_error*> ParseChildConfig( |
|
|
|
|
const Json& json, RdsLbConfig::ChildConfig* child_config) { |
|
|
|
|
const Json& json, XdsRoutingLbConfig::ChildConfig* child_config) { |
|
|
|
|
std::vector<grpc_error*> error_list; |
|
|
|
|
if (json.type() != Json::Type::ARRAY) { |
|
|
|
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
@ -631,10 +634,10 @@ class RdsLbFactory : public LoadBalancingPolicyFactory { |
|
|
|
|
// Plugin registration
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
void grpc_lb_policy_rds_init() { |
|
|
|
|
void grpc_lb_policy_xds_routing_init() { |
|
|
|
|
grpc_core::LoadBalancingPolicyRegistry::Builder:: |
|
|
|
|
RegisterLoadBalancingPolicyFactory( |
|
|
|
|
absl::make_unique<grpc_core::RdsLbFactory>()); |
|
|
|
|
absl::make_unique<grpc_core::XdsRoutingLbFactory>()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_lb_policy_rds_shutdown() {} |
|
|
|
|
void grpc_lb_policy_xds_routing_shutdown() {} |