From 4c47f6543c2fdcd7383c8b1d97e84614e575cb76 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Tue, 14 Jan 2020 17:31:14 -0800 Subject: [PATCH] Xds Reviewer comments --- .../client_channel/lb_policy/xds/xds.cc | 110 ++++++++---------- 1 file changed, 51 insertions(+), 59 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index 94c2d93cff6..b39ca325a0c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -261,7 +261,7 @@ class XdsLb : public LoadBalancingPolicy { const grpc_channel_args* args); static void OnDelayedRemovalTimer(void* arg, grpc_error* error); - static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error); + void OnDelayedRemovalTimerLocked(grpc_error* error); XdsLb* xds_policy() const { return locality_map_->xds_policy(); } @@ -312,8 +312,8 @@ class XdsLb : public LoadBalancingPolicy { void UpdateConnectivityStateLocked(); static void OnDelayedRemovalTimer(void* arg, grpc_error* error); static void OnFailoverTimer(void* arg, grpc_error* error); - static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error); - static void OnFailoverTimerLocked(void* arg, grpc_error* error); + void OnDelayedRemovalTimerLocked(grpc_error* error); + void OnFailoverTimerLocked(grpc_error* error); PriorityList* priority_list() const { return &xds_policy_->priority_list_; @@ -386,7 +386,7 @@ class XdsLb : public LoadBalancingPolicy { // Methods for dealing with fallback state. void MaybeCancelFallbackAtStartupChecks(); static void OnFallbackTimer(void* arg, grpc_error* error); - static void OnFallbackTimerLocked(void* arg, grpc_error* error); + void OnFallbackTimerLocked(grpc_error* error); void UpdateFallbackPolicyLocked(); OrphanablePtr CreateFallbackPolicyLocked( const char* name, const grpc_channel_args* args); @@ -699,6 +699,9 @@ XdsLb::XdsLb(Args args) gpr_log(GPR_INFO, "[xdslb %p] Using xds client %p from channel", this, xds_client_from_channel_.get()); } + // Closure Initialization + GRPC_CLOSURE_INIT(&lb_on_fallback_, &XdsLb::OnFallbackTimer, this, + grpc_schedule_on_exec_ctx); // Record server name. const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI); const char* server_uri = grpc_channel_arg_get_string(arg); @@ -812,8 +815,6 @@ void XdsLb::UpdateLocked(UpdateArgs args) { // Start fallback-at-startup checks. grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_; Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Held by closure - GRPC_CLOSURE_INIT(&lb_on_fallback_, &XdsLb::OnFallbackTimer, this, - grpc_schedule_on_exec_ctx); fallback_at_startup_checks_pending_ = true; grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_); } @@ -870,28 +871,26 @@ void XdsLb::MaybeCancelFallbackAtStartupChecks() { void XdsLb::OnFallbackTimer(void* arg, grpc_error* error) { XdsLb* xdslb_policy = static_cast(arg); + GRPC_ERROR_REF(error); // ref owned by lambda xdslb_policy->logical_thread()->Run( - Closure::ToFunction(GRPC_CLOSURE_INIT(&xdslb_policy->lb_on_fallback_, - &XdsLb::OnFallbackTimerLocked, - xdslb_policy, nullptr), - GRPC_ERROR_REF(error)), + [xdslb_policy, error]() { xdslb_policy->OnFallbackTimerLocked(error); }, DEBUG_LOCATION); } -void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) { - XdsLb* xdslb_policy = static_cast(arg); +void XdsLb::OnFallbackTimerLocked(grpc_error* error) { // If some fallback-at-startup check is done after the timer fires but before // this callback actually runs, don't fall back. - if (xdslb_policy->fallback_at_startup_checks_pending_ && - !xdslb_policy->shutting_down_ && error == GRPC_ERROR_NONE) { + if (fallback_at_startup_checks_pending_ && !shutting_down_ && + error == GRPC_ERROR_NONE) { gpr_log(GPR_INFO, "[xdslb %p] Child policy not ready after fallback timeout; " "entering fallback mode", - xdslb_policy); - xdslb_policy->fallback_at_startup_checks_pending_ = false; - xdslb_policy->UpdateFallbackPolicyLocked(); + this); + fallback_at_startup_checks_pending_ = false; + UpdateFallbackPolicyLocked(); } - xdslb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer"); + Unref(DEBUG_LOCATION, "on_fallback_timer"); + GRPC_ERROR_UNREF(error); } void XdsLb::UpdateFallbackPolicyLocked() { @@ -1158,7 +1157,9 @@ XdsLb::PriorityList::LocalityMap::LocalityMap(RefCountedPtr xds_policy, gpr_log(GPR_INFO, "[xdslb %p] Creating priority %" PRIu32, xds_policy_.get(), priority_); } - + // Closure Initialization + GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this, + grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&on_failover_timer_, OnFailoverTimer, this, grpc_schedule_on_exec_ctx); // Start the failover timer. @@ -1276,8 +1277,6 @@ void XdsLb::PriorityList::LocalityMap::DeactivateLocked() { xds_policy(), priority_, xds_policy()->locality_retention_interval_ms_); } - GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this, - grpc_schedule_on_exec_ctx); grpc_timer_init( &delayed_removal_timer_, ExecCtx::Get()->Now() + xds_policy()->locality_retention_interval_ms_, @@ -1409,22 +1408,18 @@ void XdsLb::PriorityList::LocalityMap::UpdateConnectivityStateLocked() { void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimer( void* arg, grpc_error* error) { LocalityMap* self = static_cast(arg); + GRPC_ERROR_REF(error); // ref owned by lambda self->xds_policy_->logical_thread()->Run( - Closure::ToFunction( - GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_, - OnDelayedRemovalTimerLocked, self, nullptr), - GRPC_ERROR_REF(error)), + [self, error]() { self->OnDelayedRemovalTimerLocked(error); }, DEBUG_LOCATION); } void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimerLocked( - void* arg, grpc_error* error) { - LocalityMap* self = static_cast(arg); - self->delayed_removal_timer_callback_pending_ = false; - if (error == GRPC_ERROR_NONE && !self->xds_policy_->shutting_down_) { - auto* priority_list = self->priority_list(); - const bool keep = self->priority_list_update().Contains(self->priority_) && - self->priority_ <= priority_list->current_priority(); + grpc_error* error) { + delayed_removal_timer_callback_pending_ = false; + if (error == GRPC_ERROR_NONE && !xds_policy_->shutting_down_) { + const bool keep = priority_list_update().Contains(priority_) && + priority_ <= priority_list()->current_priority(); if (!keep) { // This check is to make sure we always delete the locality maps from // the lowest priority even if the closures of the back-to-back timers @@ -1433,39 +1428,37 @@ void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimerLocked( // deactivated locality maps when out-of-order closures are run. // TODO(juanlishen): Check the timer implementation to see if this // defense is necessary. - if (self->priority_ == priority_list->LowestPriority()) { - priority_list->priorities_.pop_back(); + if (priority_ == priority_list()->LowestPriority()) { + priority_list()->priorities_.pop_back(); } else { gpr_log(GPR_ERROR, "[xdslb %p] Priority %" PRIu32 " is not the lowest priority (highest numeric value) but is " "attempted to be deleted.", - self->xds_policy(), self->priority_); + xds_policy(), priority_); } } } - self->Unref(DEBUG_LOCATION, "LocalityMap+timer"); + Unref(DEBUG_LOCATION, "LocalityMap+timer"); + GRPC_ERROR_UNREF(error); } void XdsLb::PriorityList::LocalityMap::OnFailoverTimer(void* arg, grpc_error* error) { LocalityMap* self = static_cast(arg); + GRPC_ERROR_REF(error); // ref owned by lambda self->xds_policy_->logical_thread()->Run( - Closure::ToFunction( - GRPC_CLOSURE_INIT(&self->on_failover_timer_, OnFailoverTimerLocked, - self, nullptr), - GRPC_ERROR_REF(error)), - DEBUG_LOCATION); + [self, error]() { self->OnFailoverTimerLocked(error); }, DEBUG_LOCATION); } void XdsLb::PriorityList::LocalityMap::OnFailoverTimerLocked( - void* arg, grpc_error* error) { - LocalityMap* self = static_cast(arg); - self->failover_timer_callback_pending_ = false; - if (error == GRPC_ERROR_NONE && !self->xds_policy_->shutting_down_) { - self->priority_list()->FailoverOnConnectionFailureLocked(); + grpc_error* error) { + failover_timer_callback_pending_ = false; + if (error == GRPC_ERROR_NONE && !xds_policy_->shutting_down_) { + priority_list()->FailoverOnConnectionFailureLocked(); } - self->Unref(DEBUG_LOCATION, "LocalityMap+OnFailoverTimerLocked"); + Unref(DEBUG_LOCATION, "LocalityMap+OnFailoverTimerLocked"); + GRPC_ERROR_UNREF(error); } // @@ -1480,6 +1473,9 @@ XdsLb::PriorityList::LocalityMap::Locality::Locality( gpr_log(GPR_INFO, "[xdslb %p] created Locality %p for %s", xds_policy(), this, name_->AsHumanReadableString()); } + // Closure Initialization + GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this, + grpc_schedule_on_exec_ctx); } XdsLb::PriorityList::LocalityMap::Locality::~Locality() { @@ -1696,8 +1692,6 @@ void XdsLb::PriorityList::LocalityMap::Locality::DeactivateLocked() { weight_ = 0; // Start a timer to delete the locality. Ref(DEBUG_LOCATION, "Locality+timer").release(); - GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this, - grpc_schedule_on_exec_ctx); grpc_timer_init( &delayed_removal_timer_, ExecCtx::Get()->Now() + xds_policy()->locality_retention_interval_ms_, @@ -1708,22 +1702,20 @@ void XdsLb::PriorityList::LocalityMap::Locality::DeactivateLocked() { void XdsLb::PriorityList::LocalityMap::Locality::OnDelayedRemovalTimer( void* arg, grpc_error* error) { Locality* self = static_cast(arg); + GRPC_ERROR_REF(error); // ref owned by lambda self->xds_policy()->logical_thread()->Run( - Closure::ToFunction( - GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_, - OnDelayedRemovalTimerLocked, self, nullptr), - GRPC_ERROR_REF(error)), + [self, error]() { self->OnDelayedRemovalTimerLocked(error); }, DEBUG_LOCATION); } void XdsLb::PriorityList::LocalityMap::Locality::OnDelayedRemovalTimerLocked( - void* arg, grpc_error* error) { - Locality* self = static_cast(arg); - self->delayed_removal_timer_callback_pending_ = false; - if (error == GRPC_ERROR_NONE && !self->shutdown_ && self->weight_ == 0) { - self->locality_map_->localities_.erase(self->name_); + grpc_error* error) { + delayed_removal_timer_callback_pending_ = false; + if (error == GRPC_ERROR_NONE && !shutdown_ && weight_ == 0) { + locality_map_->localities_.erase(name_); } - self->Unref(DEBUG_LOCATION, "Locality+timer"); + Unref(DEBUG_LOCATION, "Locality+timer"); + GRPC_ERROR_UNREF(error); } //