|
|
|
@ -68,7 +68,9 @@ |
|
|
|
|
#include <grpc/support/string_util.h> |
|
|
|
|
#include <grpc/support/time.h> |
|
|
|
|
|
|
|
|
|
#include "include/grpc/support/alloc.h" |
|
|
|
|
#include "src/core/ext/filters/client_channel/client_channel.h" |
|
|
|
|
#include "src/core/ext/filters/client_channel/lb_policy.h" |
|
|
|
|
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h" |
|
|
|
|
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h" |
|
|
|
|
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h" |
|
|
|
@ -85,6 +87,7 @@ |
|
|
|
|
#include "src/core/lib/gpr/host_port.h" |
|
|
|
|
#include "src/core/lib/gpr/string.h" |
|
|
|
|
#include "src/core/lib/gprpp/manual_constructor.h" |
|
|
|
|
#include "src/core/lib/gprpp/map.h" |
|
|
|
|
#include "src/core/lib/gprpp/memory.h" |
|
|
|
|
#include "src/core/lib/gprpp/mutex_lock.h" |
|
|
|
|
#include "src/core/lib/gprpp/orphanable.h" |
|
|
|
@ -114,6 +117,7 @@ TraceFlag grpc_lb_xds_trace(false, "xds"); |
|
|
|
|
namespace { |
|
|
|
|
|
|
|
|
|
constexpr char kXds[] = "xds_experimental"; |
|
|
|
|
constexpr char kDefaultLocalityName[] = "xds_default_locality"; |
|
|
|
|
|
|
|
|
|
class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
public: |
|
|
|
@ -128,6 +132,9 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
channelz::ChildRefsList* child_channels) override; |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
struct LocalityServerlistEntry; |
|
|
|
|
using LocalityList = InlinedVector<UniquePtr<LocalityServerlistEntry>, 1>; |
|
|
|
|
|
|
|
|
|
/// Contains a channel to the LB server and all the data related to the
|
|
|
|
|
/// channel.
|
|
|
|
|
class BalancerChannelState |
|
|
|
@ -266,25 +273,88 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
RefCountedPtr<XdsLbClientStats> client_stats_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
class Helper : public ChannelControlHelper { |
|
|
|
|
class LocalityMap { |
|
|
|
|
public: |
|
|
|
|
explicit Helper(RefCountedPtr<XdsLb> parent) : parent_(std::move(parent)) {} |
|
|
|
|
class LocalityEntry : public InternallyRefCounted<LocalityEntry> { |
|
|
|
|
public: |
|
|
|
|
explicit LocalityEntry(RefCountedPtr<XdsLb> parent) |
|
|
|
|
: parent_(std::move(parent)) { |
|
|
|
|
gpr_mu_init(&child_policy_mu_); |
|
|
|
|
} |
|
|
|
|
~LocalityEntry() { gpr_mu_destroy(&child_policy_mu_); } |
|
|
|
|
|
|
|
|
|
void UpdateLocked(xds_grpclb_serverlist* serverlist, |
|
|
|
|
LoadBalancingPolicy::Config* child_policy_config, |
|
|
|
|
const grpc_channel_args* args); |
|
|
|
|
void ShutdownLocked(); |
|
|
|
|
void ResetBackoffLocked(); |
|
|
|
|
void FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels, |
|
|
|
|
channelz::ChildRefsList* child_channels); |
|
|
|
|
void Orphan() override; |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
class Helper : public ChannelControlHelper { |
|
|
|
|
public: |
|
|
|
|
explicit Helper(RefCountedPtr<LocalityEntry> entry) |
|
|
|
|
: entry_(std::move(entry)) {} |
|
|
|
|
|
|
|
|
|
Subchannel* CreateSubchannel(const grpc_channel_args& args) override; |
|
|
|
|
grpc_channel* CreateChannel(const char* target, |
|
|
|
|
const grpc_channel_args& args) override; |
|
|
|
|
void UpdateState(grpc_connectivity_state state, grpc_error* state_error, |
|
|
|
|
UniquePtr<SubchannelPicker> picker) override; |
|
|
|
|
void RequestReresolution() override; |
|
|
|
|
void set_child(LoadBalancingPolicy* child) { child_ = child; } |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
bool CalledByPendingChild() const; |
|
|
|
|
bool CalledByCurrentChild() const; |
|
|
|
|
|
|
|
|
|
RefCountedPtr<LocalityEntry> entry_; |
|
|
|
|
LoadBalancingPolicy* child_ = nullptr; |
|
|
|
|
}; |
|
|
|
|
// Methods for dealing with the child policy.
|
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked( |
|
|
|
|
const char* name, const grpc_channel_args* args); |
|
|
|
|
grpc_channel_args* CreateChildPolicyArgsLocked( |
|
|
|
|
const grpc_channel_args* args); |
|
|
|
|
|
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> child_policy_; |
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> pending_child_policy_; |
|
|
|
|
// Lock held when modifying the value of child_policy_ or
|
|
|
|
|
// pending_child_policy_.
|
|
|
|
|
gpr_mu child_policy_mu_; |
|
|
|
|
RefCountedPtr<XdsLb> parent_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
Subchannel* CreateSubchannel(const grpc_channel_args& args) override; |
|
|
|
|
grpc_channel* CreateChannel(const char* target, |
|
|
|
|
const grpc_channel_args& args) override; |
|
|
|
|
void UpdateState(grpc_connectivity_state state, grpc_error* state_error, |
|
|
|
|
UniquePtr<SubchannelPicker> picker) override; |
|
|
|
|
void RequestReresolution() override; |
|
|
|
|
LocalityMap() { gpr_mu_init(&child_refs_mu_); } |
|
|
|
|
~LocalityMap() { gpr_mu_destroy(&child_refs_mu_); } |
|
|
|
|
|
|
|
|
|
void set_child(LoadBalancingPolicy* child) { child_ = child; } |
|
|
|
|
void UpdateLocked(const LocalityList& locality_list, |
|
|
|
|
LoadBalancingPolicy::Config* child_policy_config, |
|
|
|
|
const grpc_channel_args* args, XdsLb* parent); |
|
|
|
|
void ShutdownLocked(); |
|
|
|
|
void ResetBackoffLocked(); |
|
|
|
|
void FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels, |
|
|
|
|
channelz::ChildRefsList* child_channels); |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
bool CalledByPendingChild() const; |
|
|
|
|
bool CalledByCurrentChild() const; |
|
|
|
|
void PruneLocalities(const LocalityList& locality_list); |
|
|
|
|
Map<UniquePtr<char>, OrphanablePtr<LocalityEntry>, StringLess> map_; |
|
|
|
|
// Lock held while filling child refs for all localities
|
|
|
|
|
// inside the map
|
|
|
|
|
gpr_mu child_refs_mu_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
RefCountedPtr<XdsLb> parent_; |
|
|
|
|
LoadBalancingPolicy* child_ = nullptr; |
|
|
|
|
struct LocalityServerlistEntry { |
|
|
|
|
~LocalityServerlistEntry() { |
|
|
|
|
gpr_free(locality_name); |
|
|
|
|
xds_grpclb_destroy_serverlist(serverlist); |
|
|
|
|
} |
|
|
|
|
char* locality_name; |
|
|
|
|
// The deserialized response from the balancer. May be nullptr until one
|
|
|
|
|
// such response has arrived.
|
|
|
|
|
xds_grpclb_serverlist* serverlist; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
~XdsLb(); |
|
|
|
@ -309,12 +379,6 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
// Callback to enter fallback mode.
|
|
|
|
|
static void OnFallbackTimerLocked(void* arg, grpc_error* error); |
|
|
|
|
|
|
|
|
|
// Methods for dealing with the child policy.
|
|
|
|
|
void CreateOrUpdateChildPolicyLocked(); |
|
|
|
|
grpc_channel_args* CreateChildPolicyArgsLocked(); |
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked( |
|
|
|
|
const char* name, const grpc_channel_args* args); |
|
|
|
|
|
|
|
|
|
// Who the client is trying to communicate with.
|
|
|
|
|
const char* server_name_ = nullptr; |
|
|
|
|
|
|
|
|
@ -338,10 +402,6 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
// Timeout in milliseconds for the LB call. 0 means no deadline.
|
|
|
|
|
int lb_call_timeout_ms_ = 0; |
|
|
|
|
|
|
|
|
|
// The deserialized response from the balancer. May be nullptr until one
|
|
|
|
|
// such response has arrived.
|
|
|
|
|
xds_grpclb_serverlist* serverlist_ = nullptr; |
|
|
|
|
|
|
|
|
|
// Timeout in milliseconds for before using fallback backend addresses.
|
|
|
|
|
// 0 means not using fallback.
|
|
|
|
|
RefCountedPtr<Config> fallback_policy_config_; |
|
|
|
@ -355,11 +415,12 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
|
|
|
|
|
// The policy to use for the backends.
|
|
|
|
|
RefCountedPtr<Config> child_policy_config_; |
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> child_policy_; |
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> pending_child_policy_; |
|
|
|
|
// Lock held when modifying the value of child_policy_ or
|
|
|
|
|
// pending_child_policy_.
|
|
|
|
|
gpr_mu child_policy_mu_; |
|
|
|
|
// Map of policies to use in the backend
|
|
|
|
|
LocalityMap locality_map_; |
|
|
|
|
LocalityList locality_serverlist_; |
|
|
|
|
// TODO(mhaidry) : Add a pending locality map that may be swapped with the
|
|
|
|
|
// the current one when new localities in the pending map are ready
|
|
|
|
|
// to accept connections
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
@ -378,105 +439,6 @@ XdsLb::PickResult XdsLb::Picker::Pick(PickArgs* pick, grpc_error** error) { |
|
|
|
|
return result; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// XdsLb::Helper
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
bool XdsLb::Helper::CalledByPendingChild() const { |
|
|
|
|
GPR_ASSERT(child_ != nullptr); |
|
|
|
|
return child_ == parent_->pending_child_policy_.get(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool XdsLb::Helper::CalledByCurrentChild() const { |
|
|
|
|
GPR_ASSERT(child_ != nullptr); |
|
|
|
|
return child_ == parent_->child_policy_.get(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Subchannel* XdsLb::Helper::CreateSubchannel(const grpc_channel_args& args) { |
|
|
|
|
if (parent_->shutting_down_ || |
|
|
|
|
(!CalledByPendingChild() && !CalledByCurrentChild())) { |
|
|
|
|
return nullptr; |
|
|
|
|
} |
|
|
|
|
return parent_->channel_control_helper()->CreateSubchannel(args); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_channel* XdsLb::Helper::CreateChannel(const char* target, |
|
|
|
|
const grpc_channel_args& args) { |
|
|
|
|
if (parent_->shutting_down_ || |
|
|
|
|
(!CalledByPendingChild() && !CalledByCurrentChild())) { |
|
|
|
|
return nullptr; |
|
|
|
|
} |
|
|
|
|
return parent_->channel_control_helper()->CreateChannel(target, args); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::Helper::UpdateState(grpc_connectivity_state state, |
|
|
|
|
grpc_error* state_error, |
|
|
|
|
UniquePtr<SubchannelPicker> picker) { |
|
|
|
|
if (parent_->shutting_down_) { |
|
|
|
|
GRPC_ERROR_UNREF(state_error); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// If this request is from the pending child policy, ignore it until
|
|
|
|
|
// it reports READY, at which point we swap it into place.
|
|
|
|
|
if (CalledByPendingChild()) { |
|
|
|
|
if (grpc_lb_xds_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p helper %p] pending child policy %p reports state=%s", |
|
|
|
|
parent_.get(), this, parent_->pending_child_policy_.get(), |
|
|
|
|
grpc_connectivity_state_name(state)); |
|
|
|
|
} |
|
|
|
|
if (state != GRPC_CHANNEL_READY) { |
|
|
|
|
GRPC_ERROR_UNREF(state_error); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
grpc_pollset_set_del_pollset_set( |
|
|
|
|
parent_->child_policy_->interested_parties(), |
|
|
|
|
parent_->interested_parties()); |
|
|
|
|
MutexLock lock(&parent_->child_policy_mu_); |
|
|
|
|
parent_->child_policy_ = std::move(parent_->pending_child_policy_); |
|
|
|
|
} else if (!CalledByCurrentChild()) { |
|
|
|
|
// This request is from an outdated child, so ignore it.
|
|
|
|
|
GRPC_ERROR_UNREF(state_error); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// TODO(juanlishen): When in fallback mode, pass the child picker
|
|
|
|
|
// through without wrapping it. (Or maybe use a different helper for
|
|
|
|
|
// the fallback policy?)
|
|
|
|
|
GPR_ASSERT(parent_->lb_chand_ != nullptr); |
|
|
|
|
RefCountedPtr<XdsLbClientStats> client_stats = |
|
|
|
|
parent_->lb_chand_->lb_calld() == nullptr |
|
|
|
|
? nullptr |
|
|
|
|
: parent_->lb_chand_->lb_calld()->client_stats(); |
|
|
|
|
parent_->channel_control_helper()->UpdateState( |
|
|
|
|
state, state_error, |
|
|
|
|
UniquePtr<SubchannelPicker>( |
|
|
|
|
New<Picker>(std::move(picker), std::move(client_stats)))); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::Helper::RequestReresolution() { |
|
|
|
|
if (parent_->shutting_down_) return; |
|
|
|
|
// If there is a pending child policy, ignore re-resolution requests
|
|
|
|
|
// from the current child policy (or any outdated child).
|
|
|
|
|
if (parent_->pending_child_policy_ != nullptr && !CalledByPendingChild()) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
if (grpc_lb_xds_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p] Re-resolution requested from the internal RR policy " |
|
|
|
|
"(%p).", |
|
|
|
|
parent_.get(), parent_->child_policy_.get()); |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(parent_->lb_chand_ != nullptr); |
|
|
|
|
// If we are talking to a balancer, we expect to get updated addresses
|
|
|
|
|
// from the balancer, so we can ignore the re-resolution request from
|
|
|
|
|
// the child policy. Otherwise, pass the re-resolution request up to the
|
|
|
|
|
// channel.
|
|
|
|
|
if (parent_->lb_chand_->lb_calld() == nullptr || |
|
|
|
|
!parent_->lb_chand_->lb_calld()->seen_initial_response()) { |
|
|
|
|
parent_->channel_control_helper()->RequestReresolution(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// serverlist parsing code
|
|
|
|
|
//
|
|
|
|
@ -951,7 +913,9 @@ void XdsLb::BalancerChannelState::BalancerCallState:: |
|
|
|
|
self.release(); |
|
|
|
|
lb_calld->ScheduleNextClientLoadReportLocked(); |
|
|
|
|
} |
|
|
|
|
if (xds_grpclb_serverlist_equals(xdslb_policy->serverlist_, serverlist)) { |
|
|
|
|
if (!xdslb_policy->locality_serverlist_.empty() && |
|
|
|
|
xds_grpclb_serverlist_equals( |
|
|
|
|
xdslb_policy->locality_serverlist_[0]->serverlist, serverlist)) { |
|
|
|
|
if (grpc_lb_xds_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p] Incoming server list identical to current, " |
|
|
|
@ -960,21 +924,31 @@ void XdsLb::BalancerChannelState::BalancerCallState:: |
|
|
|
|
} |
|
|
|
|
xds_grpclb_destroy_serverlist(serverlist); |
|
|
|
|
} else { /* new serverlist */ |
|
|
|
|
if (xdslb_policy->serverlist_ != nullptr) { |
|
|
|
|
if (!xdslb_policy->locality_serverlist_.empty()) { |
|
|
|
|
/* dispose of the old serverlist */ |
|
|
|
|
xds_grpclb_destroy_serverlist(xdslb_policy->serverlist_); |
|
|
|
|
xds_grpclb_destroy_serverlist( |
|
|
|
|
xdslb_policy->locality_serverlist_[0]->serverlist); |
|
|
|
|
} else { |
|
|
|
|
/* or dispose of the fallback */ |
|
|
|
|
xdslb_policy->fallback_backend_addresses_.reset(); |
|
|
|
|
if (xdslb_policy->fallback_timer_callback_pending_) { |
|
|
|
|
grpc_timer_cancel(&xdslb_policy->lb_fallback_timer_); |
|
|
|
|
} |
|
|
|
|
/* Initialize locality serverlist, currently the list only handles
|
|
|
|
|
* one child */ |
|
|
|
|
xdslb_policy->locality_serverlist_.emplace_back( |
|
|
|
|
MakeUnique<LocalityServerlistEntry>()); |
|
|
|
|
xdslb_policy->locality_serverlist_[0]->locality_name = |
|
|
|
|
static_cast<char*>(gpr_strdup(kDefaultLocalityName)); |
|
|
|
|
} |
|
|
|
|
// and update the copy in the XdsLb instance. This
|
|
|
|
|
// serverlist instance will be destroyed either upon the next
|
|
|
|
|
// update or when the XdsLb instance is destroyed.
|
|
|
|
|
xdslb_policy->serverlist_ = serverlist; |
|
|
|
|
xdslb_policy->CreateOrUpdateChildPolicyLocked(); |
|
|
|
|
xdslb_policy->locality_serverlist_[0]->serverlist = serverlist; |
|
|
|
|
xdslb_policy->locality_map_.UpdateLocked( |
|
|
|
|
xdslb_policy->locality_serverlist_, |
|
|
|
|
xdslb_policy->child_policy_config_.get(), xdslb_policy->args_, |
|
|
|
|
xdslb_policy); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
if (grpc_lb_xds_trace.enabled()) { |
|
|
|
@ -1112,9 +1086,11 @@ grpc_channel_args* BuildBalancerChannelArgs(const grpc_channel_args* args) { |
|
|
|
|
// ctor and dtor
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
XdsLb::XdsLb(Args args) : LoadBalancingPolicy(std::move(args)) { |
|
|
|
|
XdsLb::XdsLb(Args args) |
|
|
|
|
: LoadBalancingPolicy(std::move(args)), |
|
|
|
|
locality_map_(), |
|
|
|
|
locality_serverlist_() { |
|
|
|
|
gpr_mu_init(&lb_chand_mu_); |
|
|
|
|
gpr_mu_init(&child_policy_mu_); |
|
|
|
|
// Record server name.
|
|
|
|
|
const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI); |
|
|
|
|
const char* server_uri = grpc_channel_arg_get_string(arg); |
|
|
|
@ -1141,10 +1117,7 @@ XdsLb::~XdsLb() { |
|
|
|
|
gpr_mu_destroy(&lb_chand_mu_); |
|
|
|
|
gpr_free((void*)server_name_); |
|
|
|
|
grpc_channel_args_destroy(args_); |
|
|
|
|
if (serverlist_ != nullptr) { |
|
|
|
|
xds_grpclb_destroy_serverlist(serverlist_); |
|
|
|
|
} |
|
|
|
|
gpr_mu_destroy(&child_policy_mu_); |
|
|
|
|
locality_serverlist_.clear(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::ShutdownLocked() { |
|
|
|
@ -1152,19 +1125,7 @@ void XdsLb::ShutdownLocked() { |
|
|
|
|
if (fallback_timer_callback_pending_) { |
|
|
|
|
grpc_timer_cancel(&lb_fallback_timer_); |
|
|
|
|
} |
|
|
|
|
if (child_policy_ != nullptr) { |
|
|
|
|
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), |
|
|
|
|
interested_parties()); |
|
|
|
|
} |
|
|
|
|
if (pending_child_policy_ != nullptr) { |
|
|
|
|
grpc_pollset_set_del_pollset_set( |
|
|
|
|
pending_child_policy_->interested_parties(), interested_parties()); |
|
|
|
|
} |
|
|
|
|
{ |
|
|
|
|
MutexLock lock(&child_policy_mu_); |
|
|
|
|
child_policy_.reset(); |
|
|
|
|
pending_child_policy_.reset(); |
|
|
|
|
} |
|
|
|
|
locality_map_.ShutdownLocked(); |
|
|
|
|
// We destroy the LB channel here instead of in our destructor because
|
|
|
|
|
// destroying the channel triggers a last callback to
|
|
|
|
|
// OnBalancerChannelConnectivityChangedLocked(), and we need to be
|
|
|
|
@ -1187,30 +1148,13 @@ void XdsLb::ResetBackoffLocked() { |
|
|
|
|
if (pending_lb_chand_ != nullptr) { |
|
|
|
|
grpc_channel_reset_connect_backoff(pending_lb_chand_->channel()); |
|
|
|
|
} |
|
|
|
|
if (child_policy_ != nullptr) { |
|
|
|
|
child_policy_->ResetBackoffLocked(); |
|
|
|
|
} |
|
|
|
|
if (pending_child_policy_ != nullptr) { |
|
|
|
|
pending_child_policy_->ResetBackoffLocked(); |
|
|
|
|
} |
|
|
|
|
locality_map_.ResetBackoffLocked(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels, |
|
|
|
|
channelz::ChildRefsList* child_channels) { |
|
|
|
|
{ |
|
|
|
|
// Delegate to the child_policy_ to fill the children subchannels.
|
|
|
|
|
// This must be done holding child_policy_mu_, since this method does not
|
|
|
|
|
// run in the combiner.
|
|
|
|
|
MutexLock lock(&child_policy_mu_); |
|
|
|
|
if (child_policy_ != nullptr) { |
|
|
|
|
child_policy_->FillChildRefsForChannelz(child_subchannels, |
|
|
|
|
child_channels); |
|
|
|
|
} |
|
|
|
|
if (pending_child_policy_ != nullptr) { |
|
|
|
|
pending_child_policy_->FillChildRefsForChannelz(child_subchannels, |
|
|
|
|
child_channels); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Delegate to the child_policy_ to fill the children subchannels.
|
|
|
|
|
locality_map_.FillChildRefsForChannelz(child_subchannels, child_channels); |
|
|
|
|
MutexLock lock(&lb_chand_mu_); |
|
|
|
|
if (lb_chand_ != nullptr) { |
|
|
|
|
grpc_core::channelz::ChannelNode* channel_node = |
|
|
|
@ -1314,10 +1258,11 @@ void XdsLb::UpdateLocked(UpdateArgs args) { |
|
|
|
|
// have been created from a serverlist.
|
|
|
|
|
// TODO(vpowar): Handle the fallback_address changes when we add support for
|
|
|
|
|
// fallback in xDS.
|
|
|
|
|
if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked(); |
|
|
|
|
locality_map_.UpdateLocked(locality_serverlist_, child_policy_config_.get(), |
|
|
|
|
args_, this); |
|
|
|
|
// If this is the initial update, start the fallback timer.
|
|
|
|
|
if (is_initial_update) { |
|
|
|
|
if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr && |
|
|
|
|
if (lb_fallback_timeout_ms_ > 0 && locality_serverlist_.empty() && |
|
|
|
|
!fallback_timer_callback_pending_) { |
|
|
|
|
grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_; |
|
|
|
|
Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Held by closure
|
|
|
|
@ -1341,8 +1286,8 @@ void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) { |
|
|
|
|
xdslb_policy->fallback_timer_callback_pending_ = false; |
|
|
|
|
// If we receive a serverlist after the timer fires but before this callback
|
|
|
|
|
// actually runs, don't fall back.
|
|
|
|
|
if (xdslb_policy->serverlist_ == nullptr && !xdslb_policy->shutting_down_ && |
|
|
|
|
error == GRPC_ERROR_NONE) { |
|
|
|
|
if (xdslb_policy->locality_serverlist_.empty() && |
|
|
|
|
!xdslb_policy->shutting_down_ && error == GRPC_ERROR_NONE) { |
|
|
|
|
if (grpc_lb_xds_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p] Fallback timer fired. Not using fallback backends", |
|
|
|
@ -1352,11 +1297,70 @@ void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) { |
|
|
|
|
xdslb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// code for interacting with the child policy
|
|
|
|
|
//
|
|
|
|
|
void XdsLb::LocalityMap::PruneLocalities(const LocalityList& locality_list) { |
|
|
|
|
for (auto iter = map_.begin(); iter != map_.end();) { |
|
|
|
|
bool found = false; |
|
|
|
|
for (size_t i = 0; i < locality_list.size(); i++) { |
|
|
|
|
if (!gpr_stricmp(locality_list[i]->locality_name, iter->first.get())) { |
|
|
|
|
found = true; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (!found) { // Remove entries not present in the locality list
|
|
|
|
|
MutexLock lock(&child_refs_mu_); |
|
|
|
|
iter = map_.erase(iter); |
|
|
|
|
} else |
|
|
|
|
iter++; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::LocalityMap::UpdateLocked( |
|
|
|
|
const LocalityList& locality_serverlist, |
|
|
|
|
LoadBalancingPolicy::Config* child_policy_config, |
|
|
|
|
const grpc_channel_args* args, XdsLb* parent) { |
|
|
|
|
if (parent->shutting_down_) return; |
|
|
|
|
for (size_t i = 0; i < locality_serverlist.size(); i++) { |
|
|
|
|
UniquePtr<char> locality_name( |
|
|
|
|
gpr_strdup(locality_serverlist[i]->locality_name)); |
|
|
|
|
auto iter = map_.find(locality_name); |
|
|
|
|
if (iter == map_.end()) { |
|
|
|
|
OrphanablePtr<LocalityEntry> new_entry = |
|
|
|
|
MakeOrphanable<LocalityEntry>(parent->Ref()); |
|
|
|
|
MutexLock lock(&child_refs_mu_); |
|
|
|
|
iter = map_.emplace(std::move(locality_name), std::move(new_entry)).first; |
|
|
|
|
} |
|
|
|
|
// Don't create new child policies if not directed to
|
|
|
|
|
xds_grpclb_serverlist* serverlist = |
|
|
|
|
parent->locality_serverlist_[i]->serverlist; |
|
|
|
|
iter->second->UpdateLocked(serverlist, child_policy_config, args); |
|
|
|
|
} |
|
|
|
|
PruneLocalities(locality_serverlist); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_core::XdsLb::LocalityMap::ShutdownLocked() { |
|
|
|
|
MutexLock lock(&child_refs_mu_); |
|
|
|
|
map_.clear(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_core::XdsLb::LocalityMap::ResetBackoffLocked() { |
|
|
|
|
for (auto iter = map_.begin(); iter != map_.end(); iter++) { |
|
|
|
|
iter->second->ResetBackoffLocked(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_core::XdsLb::LocalityMap::FillChildRefsForChannelz( |
|
|
|
|
channelz::ChildRefsList* child_subchannels, |
|
|
|
|
channelz::ChildRefsList* child_channels) { |
|
|
|
|
MutexLock lock(&child_refs_mu_); |
|
|
|
|
for (auto iter = map_.begin(); iter != map_.end(); iter++) { |
|
|
|
|
iter->second->FillChildRefsForChannelz(child_subchannels, child_channels); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() { |
|
|
|
|
// Locality Entry child policy methods
|
|
|
|
|
|
|
|
|
|
grpc_channel_args* |
|
|
|
|
XdsLb::LocalityMap::LocalityEntry::CreateChildPolicyArgsLocked( |
|
|
|
|
const grpc_channel_args* args_in) { |
|
|
|
|
const grpc_arg args_to_add[] = { |
|
|
|
|
// A channel arg indicating if the target is a backend inferred from a
|
|
|
|
|
// grpclb load balancer.
|
|
|
|
@ -1368,15 +1372,16 @@ grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() { |
|
|
|
|
grpc_channel_arg_integer_create( |
|
|
|
|
const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1), |
|
|
|
|
}; |
|
|
|
|
return grpc_channel_args_copy_and_add(args_, args_to_add, |
|
|
|
|
return grpc_channel_args_copy_and_add(args_in, args_to_add, |
|
|
|
|
GPR_ARRAY_SIZE(args_to_add)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> XdsLb::CreateChildPolicyLocked( |
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> |
|
|
|
|
XdsLb::LocalityMap::LocalityEntry::CreateChildPolicyLocked( |
|
|
|
|
const char* name, const grpc_channel_args* args) { |
|
|
|
|
Helper* helper = New<Helper>(Ref()); |
|
|
|
|
Helper* helper = New<Helper>(this->Ref()); |
|
|
|
|
LoadBalancingPolicy::Args lb_policy_args; |
|
|
|
|
lb_policy_args.combiner = combiner(); |
|
|
|
|
lb_policy_args.combiner = parent_->combiner(); |
|
|
|
|
lb_policy_args.args = args; |
|
|
|
|
lb_policy_args.channel_control_helper = |
|
|
|
|
UniquePtr<ChannelControlHelper>(helper); |
|
|
|
@ -1397,22 +1402,27 @@ OrphanablePtr<LoadBalancingPolicy> XdsLb::CreateChildPolicyLocked( |
|
|
|
|
// child policy. This will make the child policy progress upon activity on xDS
|
|
|
|
|
// LB, which in turn is tied to the application's call.
|
|
|
|
|
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), |
|
|
|
|
interested_parties()); |
|
|
|
|
parent_->interested_parties()); |
|
|
|
|
return lb_policy; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::CreateOrUpdateChildPolicyLocked() { |
|
|
|
|
if (shutting_down_) return; |
|
|
|
|
void XdsLb::LocalityMap::LocalityEntry::UpdateLocked( |
|
|
|
|
xds_grpclb_serverlist* serverlist, |
|
|
|
|
LoadBalancingPolicy::Config* child_policy_config, |
|
|
|
|
const grpc_channel_args* args_in) { |
|
|
|
|
if (parent_->shutting_down_) return; |
|
|
|
|
// This should never be invoked if we do not have serverlist_, as fallback
|
|
|
|
|
// mode is disabled for xDS plugin.
|
|
|
|
|
// TODO(juanlishen): Change this as part of implementing fallback mode.
|
|
|
|
|
GPR_ASSERT(serverlist_ != nullptr); |
|
|
|
|
GPR_ASSERT(serverlist_->num_servers > 0); |
|
|
|
|
GPR_ASSERT(serverlist != nullptr); |
|
|
|
|
GPR_ASSERT(serverlist->num_servers > 0); |
|
|
|
|
// Construct update args.
|
|
|
|
|
UpdateArgs update_args; |
|
|
|
|
update_args.addresses = ProcessServerlist(serverlist_); |
|
|
|
|
update_args.config = child_policy_config_; |
|
|
|
|
update_args.args = CreateChildPolicyArgsLocked(); |
|
|
|
|
update_args.addresses = ProcessServerlist(serverlist); |
|
|
|
|
update_args.config = |
|
|
|
|
child_policy_config == nullptr ? nullptr : child_policy_config->Ref(); |
|
|
|
|
update_args.args = CreateChildPolicyArgsLocked(args_in); |
|
|
|
|
|
|
|
|
|
// If the child policy name changes, we need to create a new child
|
|
|
|
|
// policy. When this happens, we leave child_policy_ as-is and store
|
|
|
|
|
// the new child policy in pending_child_policy_. Once the new child
|
|
|
|
@ -1464,9 +1474,9 @@ void XdsLb::CreateOrUpdateChildPolicyLocked() { |
|
|
|
|
// when the new child transitions into state READY.
|
|
|
|
|
// TODO(juanlishen): If the child policy is not configured via service config,
|
|
|
|
|
// use whatever algorithm is specified by the balancer.
|
|
|
|
|
const char* child_policy_name = child_policy_config_ == nullptr |
|
|
|
|
const char* child_policy_name = child_policy_config == nullptr |
|
|
|
|
? "round_robin" |
|
|
|
|
: child_policy_config_->name(); |
|
|
|
|
: child_policy_config->name(); |
|
|
|
|
const bool create_policy = |
|
|
|
|
// case 1
|
|
|
|
|
child_policy_ == nullptr || |
|
|
|
@ -1512,6 +1522,145 @@ void XdsLb::CreateOrUpdateChildPolicyLocked() { |
|
|
|
|
policy_to_update->UpdateLocked(std::move(update_args)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::LocalityMap::LocalityEntry::ShutdownLocked() { |
|
|
|
|
// Remove the child policy's interested_parties pollset_set from the
|
|
|
|
|
// xDS policy.
|
|
|
|
|
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), |
|
|
|
|
parent_->interested_parties()); |
|
|
|
|
if (pending_child_policy_ != nullptr) { |
|
|
|
|
grpc_pollset_set_del_pollset_set( |
|
|
|
|
pending_child_policy_->interested_parties(), |
|
|
|
|
parent_->interested_parties()); |
|
|
|
|
} |
|
|
|
|
{ |
|
|
|
|
MutexLock lock(&child_policy_mu_); |
|
|
|
|
child_policy_.reset(); |
|
|
|
|
pending_child_policy_.reset(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::LocalityMap::LocalityEntry::ResetBackoffLocked() { |
|
|
|
|
child_policy_->ResetBackoffLocked(); |
|
|
|
|
if (pending_child_policy_ != nullptr) { |
|
|
|
|
pending_child_policy_->ResetBackoffLocked(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::LocalityMap::LocalityEntry::FillChildRefsForChannelz( |
|
|
|
|
channelz::ChildRefsList* child_subchannels, |
|
|
|
|
channelz::ChildRefsList* child_channels) { |
|
|
|
|
MutexLock lock(&child_policy_mu_); |
|
|
|
|
child_policy_->FillChildRefsForChannelz(child_subchannels, child_channels); |
|
|
|
|
if (pending_child_policy_ != nullptr) { |
|
|
|
|
pending_child_policy_->FillChildRefsForChannelz(child_subchannels, |
|
|
|
|
child_channels); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::LocalityMap::LocalityEntry::Orphan() { |
|
|
|
|
ShutdownLocked(); |
|
|
|
|
Unref(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// LocalityEntry::Helper implementation
|
|
|
|
|
//
|
|
|
|
|
bool XdsLb::LocalityMap::LocalityEntry::Helper::CalledByPendingChild() const { |
|
|
|
|
GPR_ASSERT(child_ != nullptr); |
|
|
|
|
return child_ == entry_->pending_child_policy_.get(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool XdsLb::LocalityMap::LocalityEntry::Helper::CalledByCurrentChild() const { |
|
|
|
|
GPR_ASSERT(child_ != nullptr); |
|
|
|
|
return child_ == entry_->child_policy_.get(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Subchannel* XdsLb::LocalityMap::LocalityEntry::Helper::CreateSubchannel( |
|
|
|
|
const grpc_channel_args& args) { |
|
|
|
|
if (entry_->parent_->shutting_down_ || |
|
|
|
|
(!CalledByPendingChild() && !CalledByCurrentChild())) { |
|
|
|
|
return nullptr; |
|
|
|
|
} |
|
|
|
|
return entry_->parent_->channel_control_helper()->CreateSubchannel(args); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_channel* XdsLb::LocalityMap::LocalityEntry::Helper::CreateChannel( |
|
|
|
|
const char* target, const grpc_channel_args& args) { |
|
|
|
|
if (entry_->parent_->shutting_down_ || |
|
|
|
|
(!CalledByPendingChild() && !CalledByCurrentChild())) { |
|
|
|
|
return nullptr; |
|
|
|
|
} |
|
|
|
|
return entry_->parent_->channel_control_helper()->CreateChannel(target, args); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::LocalityMap::LocalityEntry::Helper::UpdateState( |
|
|
|
|
grpc_connectivity_state state, grpc_error* state_error, |
|
|
|
|
UniquePtr<SubchannelPicker> picker) { |
|
|
|
|
if (entry_->parent_->shutting_down_) { |
|
|
|
|
GRPC_ERROR_UNREF(state_error); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// If this request is from the pending child policy, ignore it until
|
|
|
|
|
// it reports READY, at which point we swap it into place.
|
|
|
|
|
if (CalledByPendingChild()) { |
|
|
|
|
if (grpc_lb_xds_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p helper %p] pending child policy %p reports state=%s", |
|
|
|
|
entry_->parent_.get(), this, entry_->pending_child_policy_.get(), |
|
|
|
|
grpc_connectivity_state_name(state)); |
|
|
|
|
} |
|
|
|
|
if (state != GRPC_CHANNEL_READY) { |
|
|
|
|
GRPC_ERROR_UNREF(state_error); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
grpc_pollset_set_del_pollset_set( |
|
|
|
|
entry_->child_policy_->interested_parties(), |
|
|
|
|
entry_->parent_->interested_parties()); |
|
|
|
|
MutexLock lock(&entry_->child_policy_mu_); |
|
|
|
|
entry_->child_policy_ = std::move(entry_->pending_child_policy_); |
|
|
|
|
} else if (!CalledByCurrentChild()) { |
|
|
|
|
// This request is from an outdated child, so ignore it.
|
|
|
|
|
GRPC_ERROR_UNREF(state_error); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// TODO(juanlishen): When in fallback mode, pass the child picker
|
|
|
|
|
// through without wrapping it. (Or maybe use a different helper for
|
|
|
|
|
// the fallback policy?)
|
|
|
|
|
GPR_ASSERT(entry_->parent_->lb_chand_ != nullptr); |
|
|
|
|
RefCountedPtr<XdsLbClientStats> client_stats = |
|
|
|
|
entry_->parent_->lb_chand_->lb_calld() == nullptr |
|
|
|
|
? nullptr |
|
|
|
|
: entry_->parent_->lb_chand_->lb_calld()->client_stats(); |
|
|
|
|
entry_->parent_->channel_control_helper()->UpdateState( |
|
|
|
|
state, state_error, |
|
|
|
|
UniquePtr<SubchannelPicker>( |
|
|
|
|
New<Picker>(std::move(picker), std::move(client_stats)))); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::LocalityMap::LocalityEntry::Helper::RequestReresolution() { |
|
|
|
|
if (entry_->parent_->shutting_down_) return; |
|
|
|
|
// If there is a pending child policy, ignore re-resolution requests
|
|
|
|
|
// from the current child policy (or any outdated child).
|
|
|
|
|
if (entry_->pending_child_policy_ != nullptr && !CalledByPendingChild()) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
if (grpc_lb_xds_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p] Re-resolution requested from the internal RR policy " |
|
|
|
|
"(%p).", |
|
|
|
|
entry_->parent_.get(), entry_->child_policy_.get()); |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(entry_->parent_->lb_chand_ != nullptr); |
|
|
|
|
// If we are talking to a balancer, we expect to get updated addresses
|
|
|
|
|
// from the balancer, so we can ignore the re-resolution request from
|
|
|
|
|
// the child policy. Otherwise, pass the re-resolution request up to the
|
|
|
|
|
// channel.
|
|
|
|
|
if (entry_->parent_->lb_chand_->lb_calld() == nullptr || |
|
|
|
|
!entry_->parent_->lb_chand_->lb_calld()->seen_initial_response()) { |
|
|
|
|
entry_->parent_->channel_control_helper()->RequestReresolution(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// factory
|
|
|
|
|
//
|
|
|
|
|