|
|
|
@ -333,6 +333,8 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
explicit FallbackHelper(RefCountedPtr<XdsLb> parent) |
|
|
|
|
: parent_(std::move(parent)) {} |
|
|
|
|
|
|
|
|
|
~FallbackHelper() { parent_.reset(DEBUG_LOCATION, "FallbackHelper"); } |
|
|
|
|
|
|
|
|
|
RefCountedPtr<SubchannelInterface> CreateSubchannel( |
|
|
|
|
const grpc_channel_args& args) override; |
|
|
|
|
grpc_channel* CreateChannel(const char* target, |
|
|
|
@ -377,19 +379,30 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
strcmp(subzone_.get(), other.subzone_.get()) == 0; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const char* AsHumanReadableString() { |
|
|
|
|
if (human_readable_string_ == nullptr) { |
|
|
|
|
char* tmp; |
|
|
|
|
gpr_asprintf(&tmp, "{region=\"%s\", zone=\"%s\", subzone=\"%s\"}", |
|
|
|
|
region_.get(), zone_.get(), subzone_.get()); |
|
|
|
|
human_readable_string_.reset(tmp); |
|
|
|
|
} |
|
|
|
|
return human_readable_string_.get(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
UniquePtr<char> region_; |
|
|
|
|
UniquePtr<char> zone_; |
|
|
|
|
UniquePtr<char> subzone_; |
|
|
|
|
UniquePtr<char> human_readable_string_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
class LocalityMap { |
|
|
|
|
public: |
|
|
|
|
class LocalityEntry : public InternallyRefCounted<LocalityEntry> { |
|
|
|
|
public: |
|
|
|
|
LocalityEntry(RefCountedPtr<XdsLb> parent, uint32_t locality_weight) |
|
|
|
|
: parent_(std::move(parent)), locality_weight_(locality_weight) {} |
|
|
|
|
~LocalityEntry() = default; |
|
|
|
|
LocalityEntry(RefCountedPtr<XdsLb> parent, |
|
|
|
|
RefCountedPtr<LocalityName> name, uint32_t locality_weight); |
|
|
|
|
~LocalityEntry(); |
|
|
|
|
|
|
|
|
|
void UpdateLocked(xds_grpclb_serverlist* serverlist, |
|
|
|
|
LoadBalancingPolicy::Config* child_policy_config, |
|
|
|
@ -404,6 +417,8 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
explicit Helper(RefCountedPtr<LocalityEntry> entry) |
|
|
|
|
: entry_(std::move(entry)) {} |
|
|
|
|
|
|
|
|
|
~Helper() { entry_.reset(DEBUG_LOCATION, "Helper"); } |
|
|
|
|
|
|
|
|
|
RefCountedPtr<SubchannelInterface> CreateSubchannel( |
|
|
|
|
const grpc_channel_args& args) override; |
|
|
|
|
grpc_channel* CreateChannel(const char* target, |
|
|
|
@ -428,9 +443,10 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
grpc_channel_args* CreateChildPolicyArgsLocked( |
|
|
|
|
const grpc_channel_args* args); |
|
|
|
|
|
|
|
|
|
RefCountedPtr<XdsLb> parent_; |
|
|
|
|
RefCountedPtr<LocalityName> name_; |
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> child_policy_; |
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> pending_child_policy_; |
|
|
|
|
RefCountedPtr<XdsLb> parent_; |
|
|
|
|
RefCountedPtr<PickerRef> picker_ref_; |
|
|
|
|
grpc_connectivity_state connectivity_state_; |
|
|
|
|
uint32_t locality_weight_; |
|
|
|
@ -743,7 +759,7 @@ ServerAddressList ProcessServerlist(const xds_grpclb_serverlist* serverlist) { |
|
|
|
|
|
|
|
|
|
XdsLb::BalancerChannelState::BalancerChannelState( |
|
|
|
|
const char* balancer_name, const grpc_channel_args& args, |
|
|
|
|
grpc_core::RefCountedPtr<grpc_core::XdsLb> parent_xdslb_policy) |
|
|
|
|
RefCountedPtr<XdsLb> parent_xdslb_policy) |
|
|
|
|
: InternallyRefCounted<BalancerChannelState>(&grpc_lb_xds_trace), |
|
|
|
|
xdslb_policy_(std::move(parent_xdslb_policy)), |
|
|
|
|
lb_call_backoff_( |
|
|
|
@ -763,6 +779,7 @@ XdsLb::BalancerChannelState::BalancerChannelState( |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
XdsLb::BalancerChannelState::~BalancerChannelState() { |
|
|
|
|
xdslb_policy_.reset(DEBUG_LOCATION, "BalancerChannelState"); |
|
|
|
|
grpc_channel_destroy(channel_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1414,12 +1431,18 @@ XdsLb::XdsLb(Args args) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
XdsLb::~XdsLb() { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[xdslb %p] destroying xds LB policy", this); |
|
|
|
|
} |
|
|
|
|
gpr_free((void*)server_name_); |
|
|
|
|
grpc_channel_args_destroy(args_); |
|
|
|
|
locality_serverlist_.clear(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::ShutdownLocked() { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[xdslb %p] shutting down", this); |
|
|
|
|
} |
|
|
|
|
shutting_down_ = true; |
|
|
|
|
if (fallback_at_startup_checks_pending_) { |
|
|
|
|
grpc_timer_cancel(&lb_fallback_timer_); |
|
|
|
@ -1486,8 +1509,9 @@ void XdsLb::ProcessAddressesAndChannelArgsLocked( |
|
|
|
|
} |
|
|
|
|
if (create_lb_channel) { |
|
|
|
|
OrphanablePtr<BalancerChannelState> lb_chand = |
|
|
|
|
MakeOrphanable<BalancerChannelState>(balancer_name_.get(), |
|
|
|
|
*lb_channel_args, Ref()); |
|
|
|
|
MakeOrphanable<BalancerChannelState>( |
|
|
|
|
balancer_name_.get(), *lb_channel_args, |
|
|
|
|
Ref(DEBUG_LOCATION, "BalancerChannelState")); |
|
|
|
|
if (lb_chand_ == nullptr || !lb_chand_->HasActiveCall()) { |
|
|
|
|
GPR_ASSERT(pending_lb_chand_ == nullptr); |
|
|
|
|
// If we do not have a working LB channel yet, use the newly created one.
|
|
|
|
@ -1676,7 +1700,8 @@ void XdsLb::UpdateFallbackPolicyLocked() { |
|
|
|
|
|
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> XdsLb::CreateFallbackPolicyLocked( |
|
|
|
|
const char* name, const grpc_channel_args* args) { |
|
|
|
|
FallbackHelper* helper = New<FallbackHelper>(Ref()); |
|
|
|
|
FallbackHelper* helper = |
|
|
|
|
New<FallbackHelper>(Ref(DEBUG_LOCATION, "FallbackHelper")); |
|
|
|
|
LoadBalancingPolicy::Args lb_policy_args; |
|
|
|
|
lb_policy_args.combiner = combiner(); |
|
|
|
|
lb_policy_args.args = args; |
|
|
|
@ -1739,7 +1764,9 @@ void XdsLb::LocalityMap::UpdateLocked( |
|
|
|
|
auto iter = map_.find(locality_serverlist[i]->locality_name); |
|
|
|
|
if (iter == map_.end()) { |
|
|
|
|
OrphanablePtr<LocalityEntry> new_entry = MakeOrphanable<LocalityEntry>( |
|
|
|
|
parent->Ref(), locality_serverlist[i]->locality_weight); |
|
|
|
|
parent->Ref(DEBUG_LOCATION, "LocalityEntry"), |
|
|
|
|
locality_serverlist[i]->locality_name, |
|
|
|
|
locality_serverlist[i]->locality_weight); |
|
|
|
|
iter = map_.emplace(locality_serverlist[i]->locality_name, |
|
|
|
|
std::move(new_entry)) |
|
|
|
|
.first; |
|
|
|
@ -1764,6 +1791,27 @@ void XdsLb::LocalityMap::ResetBackoffLocked() { |
|
|
|
|
// XdsLb::LocalityMap::LocalityEntry
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
XdsLb::LocalityMap::LocalityEntry::LocalityEntry( |
|
|
|
|
RefCountedPtr<XdsLb> parent, RefCountedPtr<LocalityName> name, |
|
|
|
|
uint32_t locality_weight) |
|
|
|
|
: parent_(std::move(parent)), |
|
|
|
|
name_(std::move(name)), |
|
|
|
|
locality_weight_(locality_weight) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[xdslb %p] created LocalityEntry %p for %s", |
|
|
|
|
parent_.get(), this, name_->AsHumanReadableString()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
XdsLb::LocalityMap::LocalityEntry::~LocalityEntry() { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p] LocalityEntry %p %s: destroying locality entry", |
|
|
|
|
parent_.get(), this, name_->AsHumanReadableString()); |
|
|
|
|
} |
|
|
|
|
parent_.reset(DEBUG_LOCATION, "LocalityEntry"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_channel_args* |
|
|
|
|
XdsLb::LocalityMap::LocalityEntry::CreateChildPolicyArgsLocked( |
|
|
|
|
const grpc_channel_args* args_in) { |
|
|
|
@ -1785,7 +1833,7 @@ XdsLb::LocalityMap::LocalityEntry::CreateChildPolicyArgsLocked( |
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> |
|
|
|
|
XdsLb::LocalityMap::LocalityEntry::CreateChildPolicyLocked( |
|
|
|
|
const char* name, const grpc_channel_args* args) { |
|
|
|
|
Helper* helper = New<Helper>(this->Ref()); |
|
|
|
|
Helper* helper = New<Helper>(this->Ref(DEBUG_LOCATION, "Helper")); |
|
|
|
|
LoadBalancingPolicy::Args lb_policy_args; |
|
|
|
|
lb_policy_args.combiner = parent_->combiner(); |
|
|
|
|
lb_policy_args.args = args; |
|
|
|
@ -1795,13 +1843,16 @@ XdsLb::LocalityMap::LocalityEntry::CreateChildPolicyLocked( |
|
|
|
|
LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( |
|
|
|
|
name, std::move(lb_policy_args)); |
|
|
|
|
if (GPR_UNLIKELY(lb_policy == nullptr)) { |
|
|
|
|
gpr_log(GPR_ERROR, "[xdslb %p] Failure creating child policy %s", this, |
|
|
|
|
name); |
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
"[xdslb %p] LocalityEntry %p %s: failure creating child policy %s", |
|
|
|
|
parent_.get(), this, name_->AsHumanReadableString(), name); |
|
|
|
|
return nullptr; |
|
|
|
|
} |
|
|
|
|
helper->set_child(lb_policy.get()); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[xdslb %p] Created new child policy %s (%p)", this, name, |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p] LocalityEntry %p %s: Created new child policy %s (%p)", |
|
|
|
|
parent_.get(), this, name_->AsHumanReadableString(), name, |
|
|
|
|
lb_policy.get()); |
|
|
|
|
} |
|
|
|
|
// Add the xDS's interested_parties pollset_set to that of the newly created
|
|
|
|
@ -1892,7 +1943,9 @@ void XdsLb::LocalityMap::LocalityEntry::UpdateLocked( |
|
|
|
|
// If child_policy_ is null, we set it (case 1), else we set
|
|
|
|
|
// pending_child_policy_ (cases 2b and 3b).
|
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[xdslb %p] Creating new %schild policy %s", this, |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p] LocalityEntry %p %s: Creating new %schild policy %s", |
|
|
|
|
parent_.get(), this, name_->AsHumanReadableString(), |
|
|
|
|
child_policy_ == nullptr ? "" : "pending ", child_policy_name); |
|
|
|
|
} |
|
|
|
|
auto& lb_policy = |
|
|
|
@ -1910,7 +1963,9 @@ void XdsLb::LocalityMap::LocalityEntry::UpdateLocked( |
|
|
|
|
GPR_ASSERT(policy_to_update != nullptr); |
|
|
|
|
// Update the policy.
|
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[xdslb %p] Updating %schild policy %p", this, |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p] LocalityEntry %p %s: Updating %schild policy %p", |
|
|
|
|
parent_.get(), this, name_->AsHumanReadableString(), |
|
|
|
|
policy_to_update == pending_child_policy_.get() ? "pending " : "", |
|
|
|
|
policy_to_update); |
|
|
|
|
} |
|
|
|
@ -1918,18 +1973,23 @@ void XdsLb::LocalityMap::LocalityEntry::UpdateLocked( |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::LocalityMap::LocalityEntry::ShutdownLocked() { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p] LocalityEntry %p %s: shutting down locality entry", |
|
|
|
|
parent_.get(), this, name_->AsHumanReadableString()); |
|
|
|
|
} |
|
|
|
|
// Remove the child policy's interested_parties pollset_set from the
|
|
|
|
|
// xDS policy.
|
|
|
|
|
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), |
|
|
|
|
parent_->interested_parties()); |
|
|
|
|
child_policy_.reset(); |
|
|
|
|
if (pending_child_policy_ != nullptr) { |
|
|
|
|
grpc_pollset_set_del_pollset_set( |
|
|
|
|
pending_child_policy_->interested_parties(), |
|
|
|
|
parent_->interested_parties()); |
|
|
|
|
} |
|
|
|
|
child_policy_.reset(); |
|
|
|
|
pending_child_policy_.reset(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::LocalityMap::LocalityEntry::ResetBackoffLocked() { |
|
|
|
|
child_policy_->ResetBackoffLocked(); |
|
|
|
@ -2061,11 +2121,13 @@ void XdsLb::LocalityMap::LocalityEntry::Helper::UpdateState( |
|
|
|
|
} else if (num_connecting > 0) { |
|
|
|
|
entry_->parent_->channel_control_helper()->UpdateState( |
|
|
|
|
GRPC_CHANNEL_CONNECTING, |
|
|
|
|
UniquePtr<SubchannelPicker>(New<QueuePicker>(this->entry_->parent_))); |
|
|
|
|
UniquePtr<SubchannelPicker>(New<QueuePicker>( |
|
|
|
|
this->entry_->parent_->Ref(DEBUG_LOCATION, "QueuePicker")))); |
|
|
|
|
} else if (num_idle > 0) { |
|
|
|
|
entry_->parent_->channel_control_helper()->UpdateState( |
|
|
|
|
GRPC_CHANNEL_IDLE, |
|
|
|
|
UniquePtr<SubchannelPicker>(New<QueuePicker>(this->entry_->parent_))); |
|
|
|
|
UniquePtr<SubchannelPicker>(New<QueuePicker>( |
|
|
|
|
this->entry_->parent_->Ref(DEBUG_LOCATION, "QueuePicker")))); |
|
|
|
|
} else { |
|
|
|
|
GPR_ASSERT(num_transient_failures == locality_map.size()); |
|
|
|
|
grpc_error* error = |
|
|
|
|