|
|
|
@ -261,7 +261,7 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
const grpc_channel_args* args); |
|
|
|
|
|
|
|
|
|
static void OnDelayedRemovalTimer(void* arg, grpc_error* error); |
|
|
|
|
static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error); |
|
|
|
|
void OnDelayedRemovalTimerLocked(grpc_error* error); |
|
|
|
|
|
|
|
|
|
XdsLb* xds_policy() const { return locality_map_->xds_policy(); } |
|
|
|
|
|
|
|
|
@ -312,8 +312,8 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
void UpdateConnectivityStateLocked(); |
|
|
|
|
static void OnDelayedRemovalTimer(void* arg, grpc_error* error); |
|
|
|
|
static void OnFailoverTimer(void* arg, grpc_error* error); |
|
|
|
|
static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error); |
|
|
|
|
static void OnFailoverTimerLocked(void* arg, grpc_error* error); |
|
|
|
|
void OnDelayedRemovalTimerLocked(grpc_error* error); |
|
|
|
|
void OnFailoverTimerLocked(grpc_error* error); |
|
|
|
|
|
|
|
|
|
PriorityList* priority_list() const { |
|
|
|
|
return &xds_policy_->priority_list_; |
|
|
|
@ -386,7 +386,7 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
// Methods for dealing with fallback state.
|
|
|
|
|
void MaybeCancelFallbackAtStartupChecks(); |
|
|
|
|
static void OnFallbackTimer(void* arg, grpc_error* error); |
|
|
|
|
static void OnFallbackTimerLocked(void* arg, grpc_error* error); |
|
|
|
|
void OnFallbackTimerLocked(grpc_error* error); |
|
|
|
|
void UpdateFallbackPolicyLocked(); |
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> CreateFallbackPolicyLocked( |
|
|
|
|
const char* name, const grpc_channel_args* args); |
|
|
|
@ -699,6 +699,9 @@ XdsLb::XdsLb(Args args) |
|
|
|
|
gpr_log(GPR_INFO, "[xdslb %p] Using xds client %p from channel", this, |
|
|
|
|
xds_client_from_channel_.get()); |
|
|
|
|
} |
|
|
|
|
// Closure Initialization
|
|
|
|
|
GRPC_CLOSURE_INIT(&lb_on_fallback_, &XdsLb::OnFallbackTimer, this, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
// Record server name.
|
|
|
|
|
const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI); |
|
|
|
|
const char* server_uri = grpc_channel_arg_get_string(arg); |
|
|
|
@ -812,8 +815,6 @@ void XdsLb::UpdateLocked(UpdateArgs args) { |
|
|
|
|
// Start fallback-at-startup checks.
|
|
|
|
|
grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_; |
|
|
|
|
Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Held by closure
|
|
|
|
|
GRPC_CLOSURE_INIT(&lb_on_fallback_, &XdsLb::OnFallbackTimer, this, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
fallback_at_startup_checks_pending_ = true; |
|
|
|
|
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_); |
|
|
|
|
} |
|
|
|
@ -870,28 +871,26 @@ void XdsLb::MaybeCancelFallbackAtStartupChecks() { |
|
|
|
|
|
|
|
|
|
void XdsLb::OnFallbackTimer(void* arg, grpc_error* error) { |
|
|
|
|
XdsLb* xdslb_policy = static_cast<XdsLb*>(arg); |
|
|
|
|
GRPC_ERROR_REF(error); // ref owned by lambda
|
|
|
|
|
xdslb_policy->logical_thread()->Run( |
|
|
|
|
Closure::ToFunction(GRPC_CLOSURE_INIT(&xdslb_policy->lb_on_fallback_, |
|
|
|
|
&XdsLb::OnFallbackTimerLocked, |
|
|
|
|
xdslb_policy, nullptr), |
|
|
|
|
GRPC_ERROR_REF(error)), |
|
|
|
|
[xdslb_policy, error]() { xdslb_policy->OnFallbackTimerLocked(error); }, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) { |
|
|
|
|
XdsLb* xdslb_policy = static_cast<XdsLb*>(arg); |
|
|
|
|
void XdsLb::OnFallbackTimerLocked(grpc_error* error) { |
|
|
|
|
// If some fallback-at-startup check is done after the timer fires but before
|
|
|
|
|
// this callback actually runs, don't fall back.
|
|
|
|
|
if (xdslb_policy->fallback_at_startup_checks_pending_ && |
|
|
|
|
!xdslb_policy->shutting_down_ && error == GRPC_ERROR_NONE) { |
|
|
|
|
if (fallback_at_startup_checks_pending_ && !shutting_down_ && |
|
|
|
|
error == GRPC_ERROR_NONE) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p] Child policy not ready after fallback timeout; " |
|
|
|
|
"entering fallback mode", |
|
|
|
|
xdslb_policy); |
|
|
|
|
xdslb_policy->fallback_at_startup_checks_pending_ = false; |
|
|
|
|
xdslb_policy->UpdateFallbackPolicyLocked(); |
|
|
|
|
this); |
|
|
|
|
fallback_at_startup_checks_pending_ = false; |
|
|
|
|
UpdateFallbackPolicyLocked(); |
|
|
|
|
} |
|
|
|
|
xdslb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer"); |
|
|
|
|
Unref(DEBUG_LOCATION, "on_fallback_timer"); |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::UpdateFallbackPolicyLocked() { |
|
|
|
@ -1158,7 +1157,9 @@ XdsLb::PriorityList::LocalityMap::LocalityMap(RefCountedPtr<XdsLb> xds_policy, |
|
|
|
|
gpr_log(GPR_INFO, "[xdslb %p] Creating priority %" PRIu32, |
|
|
|
|
xds_policy_.get(), priority_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Closure Initialization
|
|
|
|
|
GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
GRPC_CLOSURE_INIT(&on_failover_timer_, OnFailoverTimer, this, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
// Start the failover timer.
|
|
|
|
@ -1276,8 +1277,6 @@ void XdsLb::PriorityList::LocalityMap::DeactivateLocked() { |
|
|
|
|
xds_policy(), priority_, |
|
|
|
|
xds_policy()->locality_retention_interval_ms_); |
|
|
|
|
} |
|
|
|
|
GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
grpc_timer_init( |
|
|
|
|
&delayed_removal_timer_, |
|
|
|
|
ExecCtx::Get()->Now() + xds_policy()->locality_retention_interval_ms_, |
|
|
|
@ -1409,22 +1408,18 @@ void XdsLb::PriorityList::LocalityMap::UpdateConnectivityStateLocked() { |
|
|
|
|
void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimer( |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
LocalityMap* self = static_cast<LocalityMap*>(arg); |
|
|
|
|
GRPC_ERROR_REF(error); // ref owned by lambda
|
|
|
|
|
self->xds_policy_->logical_thread()->Run( |
|
|
|
|
Closure::ToFunction( |
|
|
|
|
GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_, |
|
|
|
|
OnDelayedRemovalTimerLocked, self, nullptr), |
|
|
|
|
GRPC_ERROR_REF(error)), |
|
|
|
|
[self, error]() { self->OnDelayedRemovalTimerLocked(error); }, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimerLocked( |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
LocalityMap* self = static_cast<LocalityMap*>(arg); |
|
|
|
|
self->delayed_removal_timer_callback_pending_ = false; |
|
|
|
|
if (error == GRPC_ERROR_NONE && !self->xds_policy_->shutting_down_) { |
|
|
|
|
auto* priority_list = self->priority_list(); |
|
|
|
|
const bool keep = self->priority_list_update().Contains(self->priority_) && |
|
|
|
|
self->priority_ <= priority_list->current_priority(); |
|
|
|
|
grpc_error* error) { |
|
|
|
|
delayed_removal_timer_callback_pending_ = false; |
|
|
|
|
if (error == GRPC_ERROR_NONE && !xds_policy_->shutting_down_) { |
|
|
|
|
const bool keep = priority_list_update().Contains(priority_) && |
|
|
|
|
priority_ <= priority_list()->current_priority(); |
|
|
|
|
if (!keep) { |
|
|
|
|
// This check is to make sure we always delete the locality maps from
|
|
|
|
|
// the lowest priority even if the closures of the back-to-back timers
|
|
|
|
@ -1433,39 +1428,37 @@ void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimerLocked( |
|
|
|
|
// deactivated locality maps when out-of-order closures are run.
|
|
|
|
|
// TODO(juanlishen): Check the timer implementation to see if this
|
|
|
|
|
// defense is necessary.
|
|
|
|
|
if (self->priority_ == priority_list->LowestPriority()) { |
|
|
|
|
priority_list->priorities_.pop_back(); |
|
|
|
|
if (priority_ == priority_list()->LowestPriority()) { |
|
|
|
|
priority_list()->priorities_.pop_back(); |
|
|
|
|
} else { |
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
"[xdslb %p] Priority %" PRIu32 |
|
|
|
|
" is not the lowest priority (highest numeric value) but is " |
|
|
|
|
"attempted to be deleted.", |
|
|
|
|
self->xds_policy(), self->priority_); |
|
|
|
|
xds_policy(), priority_); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
self->Unref(DEBUG_LOCATION, "LocalityMap+timer"); |
|
|
|
|
Unref(DEBUG_LOCATION, "LocalityMap+timer"); |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::PriorityList::LocalityMap::OnFailoverTimer(void* arg, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
LocalityMap* self = static_cast<LocalityMap*>(arg); |
|
|
|
|
GRPC_ERROR_REF(error); // ref owned by lambda
|
|
|
|
|
self->xds_policy_->logical_thread()->Run( |
|
|
|
|
Closure::ToFunction( |
|
|
|
|
GRPC_CLOSURE_INIT(&self->on_failover_timer_, OnFailoverTimerLocked, |
|
|
|
|
self, nullptr), |
|
|
|
|
GRPC_ERROR_REF(error)), |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
|
[self, error]() { self->OnFailoverTimerLocked(error); }, DEBUG_LOCATION); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::PriorityList::LocalityMap::OnFailoverTimerLocked( |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
LocalityMap* self = static_cast<LocalityMap*>(arg); |
|
|
|
|
self->failover_timer_callback_pending_ = false; |
|
|
|
|
if (error == GRPC_ERROR_NONE && !self->xds_policy_->shutting_down_) { |
|
|
|
|
self->priority_list()->FailoverOnConnectionFailureLocked(); |
|
|
|
|
grpc_error* error) { |
|
|
|
|
failover_timer_callback_pending_ = false; |
|
|
|
|
if (error == GRPC_ERROR_NONE && !xds_policy_->shutting_down_) { |
|
|
|
|
priority_list()->FailoverOnConnectionFailureLocked(); |
|
|
|
|
} |
|
|
|
|
self->Unref(DEBUG_LOCATION, "LocalityMap+OnFailoverTimerLocked"); |
|
|
|
|
Unref(DEBUG_LOCATION, "LocalityMap+OnFailoverTimerLocked"); |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
@ -1480,6 +1473,9 @@ XdsLb::PriorityList::LocalityMap::Locality::Locality( |
|
|
|
|
gpr_log(GPR_INFO, "[xdslb %p] created Locality %p for %s", xds_policy(), |
|
|
|
|
this, name_->AsHumanReadableString()); |
|
|
|
|
} |
|
|
|
|
// Closure Initialization
|
|
|
|
|
GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
XdsLb::PriorityList::LocalityMap::Locality::~Locality() { |
|
|
|
@ -1696,8 +1692,6 @@ void XdsLb::PriorityList::LocalityMap::Locality::DeactivateLocked() { |
|
|
|
|
weight_ = 0; |
|
|
|
|
// Start a timer to delete the locality.
|
|
|
|
|
Ref(DEBUG_LOCATION, "Locality+timer").release(); |
|
|
|
|
GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
grpc_timer_init( |
|
|
|
|
&delayed_removal_timer_, |
|
|
|
|
ExecCtx::Get()->Now() + xds_policy()->locality_retention_interval_ms_, |
|
|
|
@ -1708,22 +1702,20 @@ void XdsLb::PriorityList::LocalityMap::Locality::DeactivateLocked() { |
|
|
|
|
void XdsLb::PriorityList::LocalityMap::Locality::OnDelayedRemovalTimer( |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
Locality* self = static_cast<Locality*>(arg); |
|
|
|
|
GRPC_ERROR_REF(error); // ref owned by lambda
|
|
|
|
|
self->xds_policy()->logical_thread()->Run( |
|
|
|
|
Closure::ToFunction( |
|
|
|
|
GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_, |
|
|
|
|
OnDelayedRemovalTimerLocked, self, nullptr), |
|
|
|
|
GRPC_ERROR_REF(error)), |
|
|
|
|
[self, error]() { self->OnDelayedRemovalTimerLocked(error); }, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::PriorityList::LocalityMap::Locality::OnDelayedRemovalTimerLocked( |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
Locality* self = static_cast<Locality*>(arg); |
|
|
|
|
self->delayed_removal_timer_callback_pending_ = false; |
|
|
|
|
if (error == GRPC_ERROR_NONE && !self->shutdown_ && self->weight_ == 0) { |
|
|
|
|
self->locality_map_->localities_.erase(self->name_); |
|
|
|
|
grpc_error* error) { |
|
|
|
|
delayed_removal_timer_callback_pending_ = false; |
|
|
|
|
if (error == GRPC_ERROR_NONE && !shutdown_ && weight_ == 0) { |
|
|
|
|
locality_map_->localities_.erase(name_); |
|
|
|
|
} |
|
|
|
|
self->Unref(DEBUG_LOCATION, "Locality+timer"); |
|
|
|
|
Unref(DEBUG_LOCATION, "Locality+timer"); |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|