|
|
|
@ -109,6 +109,7 @@ |
|
|
|
|
#define GRPC_XDS_RECONNECT_JITTER 0.2 |
|
|
|
|
#define GRPC_XDS_DEFAULT_FALLBACK_TIMEOUT_MS 10000 |
|
|
|
|
#define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000 |
|
|
|
|
#define GRPC_XDS_DEFAULT_LOCALITY_RETENTION_INTERVAL_MS (15 * 60 * 1000) |
|
|
|
|
|
|
|
|
|
namespace grpc_core { |
|
|
|
|
|
|
|
|
@ -452,15 +453,15 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
class LocalityEntry : public InternallyRefCounted<LocalityEntry> { |
|
|
|
|
public: |
|
|
|
|
LocalityEntry(RefCountedPtr<XdsLb> parent, |
|
|
|
|
RefCountedPtr<XdsLocalityName> name, |
|
|
|
|
uint32_t locality_weight); |
|
|
|
|
RefCountedPtr<XdsLocalityName> name); |
|
|
|
|
~LocalityEntry(); |
|
|
|
|
|
|
|
|
|
void UpdateLocked(ServerAddressList serverlist, |
|
|
|
|
void UpdateLocked(uint32_t locality_weight, ServerAddressList serverlist, |
|
|
|
|
LoadBalancingPolicy::Config* child_policy_config, |
|
|
|
|
const grpc_channel_args* args); |
|
|
|
|
void ShutdownLocked(); |
|
|
|
|
void ResetBackoffLocked(); |
|
|
|
|
void DeactivateLocked(); |
|
|
|
|
void Orphan() override; |
|
|
|
|
|
|
|
|
|
grpc_connectivity_state connectivity_state() const { |
|
|
|
@ -504,6 +505,8 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
grpc_channel_args* CreateChildPolicyArgsLocked( |
|
|
|
|
const grpc_channel_args* args); |
|
|
|
|
|
|
|
|
|
static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error); |
|
|
|
|
|
|
|
|
|
RefCountedPtr<XdsLb> parent_; |
|
|
|
|
RefCountedPtr<XdsLocalityName> name_; |
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> child_policy_; |
|
|
|
@ -511,20 +514,22 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
RefCountedPtr<PickerWrapper> picker_wrapper_; |
|
|
|
|
grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE; |
|
|
|
|
uint32_t locality_weight_; |
|
|
|
|
grpc_closure on_delayed_removal_timer_; |
|
|
|
|
grpc_timer delayed_removal_timer_; |
|
|
|
|
bool delayed_removal_timer_callback_pending_ = false; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
explicit LocalityMap(XdsLb* xds_policy) : xds_policy_(xds_policy) {} |
|
|
|
|
|
|
|
|
|
void UpdateLocked(const XdsLocalityList& locality_list, |
|
|
|
|
LoadBalancingPolicy::Config* child_policy_config, |
|
|
|
|
const grpc_channel_args* args, XdsLb* parent); |
|
|
|
|
const grpc_channel_args* args, XdsLb* parent, |
|
|
|
|
bool is_initial_update = false); |
|
|
|
|
void UpdateXdsPickerLocked(); |
|
|
|
|
void ShutdownLocked(); |
|
|
|
|
void ResetBackoffLocked(); |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
void PruneLocalities(const XdsLocalityList& locality_list); |
|
|
|
|
|
|
|
|
|
XdsLb* xds_policy_; |
|
|
|
|
Map<RefCountedPtr<XdsLocalityName>, OrphanablePtr<LocalityEntry>, |
|
|
|
|
XdsLocalityName::Less> |
|
|
|
@ -602,6 +607,7 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
|
|
|
|
|
// The policy to use for the backends.
|
|
|
|
|
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_config_; |
|
|
|
|
const grpc_millis locality_retention_interval_ms_; |
|
|
|
|
// Map of policies to use in the backend
|
|
|
|
|
LocalityMap locality_map_; |
|
|
|
|
// TODO(mhaidry) : Add support for multiple maps of localities
|
|
|
|
@ -1711,6 +1717,9 @@ XdsLb::XdsLb(Args args) |
|
|
|
|
lb_fallback_timeout_ms_(grpc_channel_args_find_integer( |
|
|
|
|
args.args, GRPC_ARG_XDS_FALLBACK_TIMEOUT_MS, |
|
|
|
|
{GRPC_XDS_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX})), |
|
|
|
|
locality_retention_interval_ms_(grpc_channel_args_find_integer( |
|
|
|
|
args.args, GRPC_ARG_LOCALITY_RETENTION_INTERVAL_MS, |
|
|
|
|
{GRPC_XDS_DEFAULT_LOCALITY_RETENTION_INTERVAL_MS, 0, INT_MAX})), |
|
|
|
|
locality_map_(this) { |
|
|
|
|
// Record server name.
|
|
|
|
|
const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI); |
|
|
|
@ -1837,7 +1846,7 @@ void XdsLb::UpdateLocked(UpdateArgs args) { |
|
|
|
|
} |
|
|
|
|
ProcessAddressesAndChannelArgsLocked(std::move(args.addresses), *args.args); |
|
|
|
|
locality_map_.UpdateLocked(locality_list_, child_policy_config_.get(), args_, |
|
|
|
|
this); |
|
|
|
|
this, is_initial_update); |
|
|
|
|
// Update the existing fallback policy. The fallback policy config and/or the
|
|
|
|
|
// fallback addresses may be new.
|
|
|
|
|
if (fallback_policy_ != nullptr) UpdateFallbackPolicyLocked(); |
|
|
|
@ -2035,27 +2044,12 @@ void XdsLb::MaybeExitFallbackMode() { |
|
|
|
|
// XdsLb::LocalityMap
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
void XdsLb::LocalityMap::PruneLocalities(const XdsLocalityList& locality_list) { |
|
|
|
|
for (auto iter = map_.begin(); iter != map_.end();) { |
|
|
|
|
bool found = false; |
|
|
|
|
for (size_t i = 0; i < locality_list.size(); i++) { |
|
|
|
|
if (*locality_list[i].locality_name == *iter->first) { |
|
|
|
|
found = true; |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (!found) { // Remove entries not present in the locality list.
|
|
|
|
|
iter = map_.erase(iter); |
|
|
|
|
} else |
|
|
|
|
iter++; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::LocalityMap::UpdateLocked( |
|
|
|
|
const XdsLocalityList& locality_list, |
|
|
|
|
LoadBalancingPolicy::Config* child_policy_config, |
|
|
|
|
const grpc_channel_args* args, XdsLb* parent) { |
|
|
|
|
const grpc_channel_args* args, XdsLb* parent, bool is_initial_update) { |
|
|
|
|
if (parent->shutting_down_) return; |
|
|
|
|
// Add or update the localities in locality_list.
|
|
|
|
|
for (size_t i = 0; i < locality_list.size(); i++) { |
|
|
|
|
auto& locality_name = locality_list[i].locality_name; |
|
|
|
|
auto iter = map_.find(locality_name); |
|
|
|
@ -2063,19 +2057,35 @@ void XdsLb::LocalityMap::UpdateLocked( |
|
|
|
|
// locality list.
|
|
|
|
|
if (iter == map_.end()) { |
|
|
|
|
OrphanablePtr<LocalityEntry> new_entry = MakeOrphanable<LocalityEntry>( |
|
|
|
|
parent->Ref(DEBUG_LOCATION, "LocalityEntry"), locality_name, |
|
|
|
|
locality_list[i].lb_weight); |
|
|
|
|
parent->Ref(DEBUG_LOCATION, "LocalityEntry"), locality_name); |
|
|
|
|
iter = map_.emplace(locality_name, std::move(new_entry)).first; |
|
|
|
|
} |
|
|
|
|
// Keep a copy of serverlist in locality_list_ so that we can compare it
|
|
|
|
|
// with the future ones.
|
|
|
|
|
iter->second->UpdateLocked(locality_list[i].serverlist, child_policy_config, |
|
|
|
|
iter->second->UpdateLocked(locality_list[i].lb_weight, |
|
|
|
|
locality_list[i].serverlist, child_policy_config, |
|
|
|
|
args); |
|
|
|
|
} |
|
|
|
|
PruneLocalities(locality_list); |
|
|
|
|
// Remove (later) the localities not in locality_list.
|
|
|
|
|
for (auto& p : map_) { |
|
|
|
|
const XdsLocalityName* locality_name = p.first.get(); |
|
|
|
|
LocalityEntry* locality_entry = p.second.get(); |
|
|
|
|
bool in_locality_list = false; |
|
|
|
|
for (size_t i = 0; i < locality_list.size(); ++i) { |
|
|
|
|
if (*locality_list[i].locality_name == *locality_name) { |
|
|
|
|
in_locality_list = true; |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (!in_locality_list) locality_entry->DeactivateLocked(); |
|
|
|
|
} |
|
|
|
|
// Generate a new xds picker immediately.
|
|
|
|
|
if (!is_initial_update) UpdateXdsPickerLocked(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::LocalityMap::UpdateXdsPickerLocked() { |
|
|
|
|
// If we are in fallback mode, don't generate an xds picker from localities.
|
|
|
|
|
if (xds_policy_->fallback_policy_ != nullptr) return; |
|
|
|
|
// Construct a new xds picker which maintains a map of all locality pickers
|
|
|
|
|
// that are ready. Each locality is represented by a portion of the range
|
|
|
|
|
// proportional to its weight, such that the total range is the sum of the
|
|
|
|
@ -2086,23 +2096,8 @@ void XdsLb::LocalityMap::UpdateXdsPickerLocked() { |
|
|
|
|
size_t num_transient_failures = 0; |
|
|
|
|
Picker::PickerList pickers; |
|
|
|
|
for (auto& p : map_) { |
|
|
|
|
// TODO(juanlishen): We should prune a locality (and kill its stats) after
|
|
|
|
|
// we know we won't pick from it. We need to improve our update logic to
|
|
|
|
|
// make that easier. Consider the following situation: the current map has
|
|
|
|
|
// two READY localities A and B, and the update only contains B with the
|
|
|
|
|
// same addresses as before. Without the following hack, we will generate
|
|
|
|
|
// the same picker containing A and B because we haven't pruned A when the
|
|
|
|
|
// update happens. Remove the for loop below once we implement the locality
|
|
|
|
|
// map update.
|
|
|
|
|
bool in_locality_list = false; |
|
|
|
|
for (size_t i = 0; i < xds_policy_->locality_list_.size(); ++i) { |
|
|
|
|
if (*xds_policy_->locality_list_[i].locality_name == *p.first) { |
|
|
|
|
in_locality_list = true; |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (!in_locality_list) continue; |
|
|
|
|
const LocalityEntry* entry = p.second.get(); |
|
|
|
|
if (entry->locality_weight() == 0) continue; |
|
|
|
|
switch (entry->connectivity_state()) { |
|
|
|
|
case GRPC_CHANNEL_READY: { |
|
|
|
|
end += entry->locality_weight(); |
|
|
|
@ -2121,10 +2116,8 @@ void XdsLb::LocalityMap::UpdateXdsPickerLocked() { |
|
|
|
|
num_transient_failures++; |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
default: { |
|
|
|
|
gpr_log(GPR_ERROR, "Invalid locality connectivity state - %d", |
|
|
|
|
entry->connectivity_state()); |
|
|
|
|
} |
|
|
|
|
default: |
|
|
|
|
GPR_UNREACHABLE_CODE(return ); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Pass on the constructed xds picker if it has any ready pickers in their map
|
|
|
|
@ -2148,11 +2141,9 @@ void XdsLb::LocalityMap::UpdateXdsPickerLocked() { |
|
|
|
|
UniquePtr<SubchannelPicker>( |
|
|
|
|
New<QueuePicker>(xds_policy_->Ref(DEBUG_LOCATION, "QueuePicker")))); |
|
|
|
|
} else { |
|
|
|
|
GPR_ASSERT(num_transient_failures == |
|
|
|
|
xds_policy_->locality_map_.map_.size()); |
|
|
|
|
grpc_error* error = |
|
|
|
|
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"connections to all localities failing"), |
|
|
|
|
"connections to all active localities failing"), |
|
|
|
|
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); |
|
|
|
|
xds_policy_->channel_control_helper()->UpdateState( |
|
|
|
|
GRPC_CHANNEL_TRANSIENT_FAILURE, |
|
|
|
@ -2173,15 +2164,14 @@ void XdsLb::LocalityMap::ResetBackoffLocked() { |
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
XdsLb::LocalityMap::LocalityEntry::LocalityEntry( |
|
|
|
|
RefCountedPtr<XdsLb> parent, RefCountedPtr<XdsLocalityName> name, |
|
|
|
|
uint32_t locality_weight) |
|
|
|
|
: parent_(std::move(parent)), |
|
|
|
|
name_(std::move(name)), |
|
|
|
|
locality_weight_(locality_weight) { |
|
|
|
|
RefCountedPtr<XdsLb> parent, RefCountedPtr<XdsLocalityName> name) |
|
|
|
|
: parent_(std::move(parent)), name_(std::move(name)) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[xdslb %p] created LocalityEntry %p for %s", |
|
|
|
|
parent_.get(), this, name_->AsHumanReadableString()); |
|
|
|
|
} |
|
|
|
|
GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimerLocked, |
|
|
|
|
this, grpc_combiner_scheduler(parent_->combiner())); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
XdsLb::LocalityMap::LocalityEntry::~LocalityEntry() { |
|
|
|
@ -2245,10 +2235,15 @@ XdsLb::LocalityMap::LocalityEntry::CreateChildPolicyLocked( |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::LocalityMap::LocalityEntry::UpdateLocked( |
|
|
|
|
ServerAddressList serverlist, |
|
|
|
|
uint32_t locality_weight, ServerAddressList serverlist, |
|
|
|
|
LoadBalancingPolicy::Config* child_policy_config, |
|
|
|
|
const grpc_channel_args* args_in) { |
|
|
|
|
if (parent_->shutting_down_) return; |
|
|
|
|
// Update locality weight.
|
|
|
|
|
locality_weight_ = locality_weight; |
|
|
|
|
if (delayed_removal_timer_callback_pending_) { |
|
|
|
|
grpc_timer_cancel(&delayed_removal_timer_); |
|
|
|
|
} |
|
|
|
|
// Construct update args.
|
|
|
|
|
UpdateArgs update_args; |
|
|
|
|
update_args.addresses = std::move(serverlist); |
|
|
|
@ -2373,6 +2368,9 @@ void XdsLb::LocalityMap::LocalityEntry::ShutdownLocked() { |
|
|
|
|
// Drop our ref to the child's picker, in case it's holding a ref to
|
|
|
|
|
// the child.
|
|
|
|
|
picker_wrapper_.reset(); |
|
|
|
|
if (delayed_removal_timer_callback_pending_) { |
|
|
|
|
grpc_timer_cancel(&delayed_removal_timer_); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::LocalityMap::LocalityEntry::ResetBackoffLocked() { |
|
|
|
@ -2387,6 +2385,36 @@ void XdsLb::LocalityMap::LocalityEntry::Orphan() { |
|
|
|
|
Unref(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::LocalityMap::LocalityEntry::DeactivateLocked() { |
|
|
|
|
// If locality retaining is disabled, delete the locality immediately.
|
|
|
|
|
if (parent_->locality_retention_interval_ms_ == 0) { |
|
|
|
|
parent_->locality_map_.map_.erase(name_); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// If already deactivated, don't do that again.
|
|
|
|
|
if (locality_weight_ == 0) return; |
|
|
|
|
// Set the locality weight to 0 so that future xds picker won't contain this
|
|
|
|
|
// locality.
|
|
|
|
|
locality_weight_ = 0; |
|
|
|
|
// Start a timer to delete the locality.
|
|
|
|
|
Ref(DEBUG_LOCATION, "LocalityEntry+timer").release(); |
|
|
|
|
grpc_timer_init( |
|
|
|
|
&delayed_removal_timer_, |
|
|
|
|
ExecCtx::Get()->Now() + parent_->locality_retention_interval_ms_, |
|
|
|
|
&on_delayed_removal_timer_); |
|
|
|
|
delayed_removal_timer_callback_pending_ = true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::LocalityMap::LocalityEntry::OnDelayedRemovalTimerLocked( |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
LocalityEntry* self = static_cast<LocalityEntry*>(arg); |
|
|
|
|
self->delayed_removal_timer_callback_pending_ = false; |
|
|
|
|
if (error == GRPC_ERROR_NONE && self->locality_weight_ == 0) { |
|
|
|
|
self->parent_->locality_map_.map_.erase(self->name_); |
|
|
|
|
} |
|
|
|
|
self->Unref(DEBUG_LOCATION, "LocalityEntry+timer"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// XdsLb::LocalityEntry::Helper
|
|
|
|
|
//
|
|
|
|
@ -2446,8 +2474,6 @@ void XdsLb::LocalityMap::LocalityEntry::Helper::UpdateState( |
|
|
|
|
entry_->parent_->MaybeCancelFallbackAtStartupChecks(); |
|
|
|
|
entry_->parent_->MaybeExitFallbackMode(); |
|
|
|
|
} |
|
|
|
|
// If we are in fallback mode, ignore update request from the child policy.
|
|
|
|
|
if (entry_->parent_->fallback_policy_ != nullptr) return; |
|
|
|
|
GPR_ASSERT(entry_->parent_->lb_chand_ != nullptr); |
|
|
|
|
// Cache the picker and its state in the entry.
|
|
|
|
|
entry_->picker_wrapper_ = MakeRefCounted<PickerWrapper>( |
|
|
|
|