From 70fa55155beade94670661f033e91b55f43c9dee Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 24 Mar 2022 15:49:15 -0700 Subject: [PATCH] priority LB: avoid possibility of rescheduling a timer before it fires (#29188) * priority LB: avoid possibility of rescheduling a timer before it fires * clang-format * fix memory leak * small change, just to be paranoid * inline StartFailoverTimerLocked() * initialize timer_pending_ to true * don't check shutting_down_ in timer callbacks --- .../lb_policy/priority/priority.cc | 314 ++++++++++-------- 1 file changed, 172 insertions(+), 142 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc b/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc index 257c36b4bea..d6816732a45 100644 --- a/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc +++ b/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc @@ -106,7 +106,6 @@ class PriorityLb : public LoadBalancingPolicy { void ResetBackoffLocked(); void DeactivateLocked(); void MaybeReactivateLocked(); - void MaybeCancelFailoverTimerLocked(); void Orphan() override; @@ -122,9 +121,7 @@ class PriorityLb : public LoadBalancingPolicy { return connectivity_status_; } - bool failover_timer_callback_pending() const { - return failover_timer_callback_pending_; - } + bool FailoverTimerPending() const { return failover_timer_ != nullptr; } private: // A simple wrapper for ref-counting a picker from the child policy. @@ -170,6 +167,38 @@ class PriorityLb : public LoadBalancingPolicy { RefCountedPtr priority_; }; + class DeactivationTimer : public InternallyRefCounted { + public: + explicit DeactivationTimer(RefCountedPtr child_priority); + + void Orphan() override; + + private: + static void OnTimer(void* arg, grpc_error_handle error); + void OnTimerLocked(grpc_error_handle); + + RefCountedPtr child_priority_; + grpc_timer timer_; + grpc_closure on_timer_; + bool timer_pending_ = true; + }; + + class FailoverTimer : public InternallyRefCounted { + public: + explicit FailoverTimer(RefCountedPtr child_priority); + + void Orphan() override; + + private: + static void OnTimer(void* arg, grpc_error_handle error); + void OnTimerLocked(grpc_error_handle); + + RefCountedPtr child_priority_; + grpc_timer timer_; + grpc_closure on_timer_; + bool timer_pending_ = true; + }; + // Methods for dealing with the child policy. OrphanablePtr CreateChildPolicyLocked( const grpc_channel_args* args); @@ -178,13 +207,6 @@ class PriorityLb : public LoadBalancingPolicy { grpc_connectivity_state state, const absl::Status& status, std::unique_ptr picker); - void StartFailoverTimerLocked(); - - static void OnFailoverTimer(void* arg, grpc_error_handle error); - void OnFailoverTimerLocked(grpc_error_handle error); - static void OnDeactivationTimer(void* arg, grpc_error_handle error); - void OnDeactivationTimerLocked(grpc_error_handle error); - RefCountedPtr priority_policy_; const std::string name_; bool ignore_reresolution_requests_ = false; @@ -195,15 +217,8 @@ class PriorityLb : public LoadBalancingPolicy { absl::Status connectivity_status_; RefCountedPtr picker_wrapper_; - // States for delayed removal. - grpc_timer deactivation_timer_; - grpc_closure on_deactivation_timer_; - bool deactivation_timer_callback_pending_ = false; - - // States of failover. - grpc_timer failover_timer_; - grpc_closure on_failover_timer_; - bool failover_timer_callback_pending_ = false; + OrphanablePtr deactivation_timer_; + OrphanablePtr failover_timer_; }; ~PriorityLb() override; @@ -451,7 +466,7 @@ void PriorityLb::TryNextPriorityLocked(bool report_connecting) { } // Child is not READY or IDLE. // If its failover timer is still pending, give it time to fire. - if (child->failover_timer_callback_pending()) { + if (child->FailoverTimerPending()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { gpr_log(GPR_INFO, "[priority_lb %p] priority %u, child %s: child still " @@ -501,6 +516,132 @@ void PriorityLb::SelectPriorityLocked(uint32_t priority) { child->GetPicker()); } +// +// PriorityLb::ChildPriority::DeactivationTimer +// + +PriorityLb::ChildPriority::DeactivationTimer::DeactivationTimer( + RefCountedPtr child_priority) + : child_priority_(std::move(child_priority)) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { + gpr_log(GPR_INFO, + "[priority_lb %p] child %s (%p): deactivating -- will remove in " + "%" PRId64 "ms", + child_priority_->priority_policy_.get(), + child_priority_->name_.c_str(), child_priority_.get(), + kChildRetentionInterval.millis()); + } + GRPC_CLOSURE_INIT(&on_timer_, OnTimer, this, nullptr); + Ref(DEBUG_LOCATION, "Timer").release(); + grpc_timer_init(&timer_, ExecCtx::Get()->Now() + kChildRetentionInterval, + &on_timer_); +} + +void PriorityLb::ChildPriority::DeactivationTimer::Orphan() { + if (timer_pending_) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { + gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): reactivating", + child_priority_->priority_policy_.get(), + child_priority_->name_.c_str(), child_priority_.get()); + } + timer_pending_ = false; + grpc_timer_cancel(&timer_); + } + Unref(); +} + +void PriorityLb::ChildPriority::DeactivationTimer::OnTimer( + void* arg, grpc_error_handle error) { + auto* self = static_cast(arg); + (void)GRPC_ERROR_REF(error); // ref owned by lambda + self->child_priority_->priority_policy_->work_serializer()->Run( + [self, error]() { self->OnTimerLocked(error); }, DEBUG_LOCATION); +} + +void PriorityLb::ChildPriority::DeactivationTimer::OnTimerLocked( + grpc_error_handle error) { + if (error == GRPC_ERROR_NONE && timer_pending_) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { + gpr_log(GPR_INFO, + "[priority_lb %p] child %s (%p): deactivation timer fired, " + "deleting child", + child_priority_->priority_policy_.get(), + child_priority_->name_.c_str(), child_priority_.get()); + } + timer_pending_ = false; + child_priority_->priority_policy_->DeleteChild(child_priority_.get()); + } + Unref(DEBUG_LOCATION, "Timer"); + GRPC_ERROR_UNREF(error); +} + +// +// PriorityLb::ChildPriority::FailoverTimer +// + +PriorityLb::ChildPriority::FailoverTimer::FailoverTimer( + RefCountedPtr child_priority) + : child_priority_(std::move(child_priority)) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { + gpr_log( + GPR_INFO, + "[priority_lb %p] child %s (%p): starting failover timer for %" PRId64 + "ms", + child_priority_->priority_policy_.get(), child_priority_->name_.c_str(), + child_priority_.get(), + child_priority_->priority_policy_->child_failover_timeout_.millis()); + } + GRPC_CLOSURE_INIT(&on_timer_, OnTimer, this, nullptr); + Ref(DEBUG_LOCATION, "Timer").release(); + grpc_timer_init( + &timer_, + ExecCtx::Get()->Now() + + child_priority_->priority_policy_->child_failover_timeout_, + &on_timer_); +} + +void PriorityLb::ChildPriority::FailoverTimer::Orphan() { + if (timer_pending_) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { + gpr_log(GPR_INFO, + "[priority_lb %p] child %s (%p): cancelling failover timer", + child_priority_->priority_policy_.get(), + child_priority_->name_.c_str(), child_priority_.get()); + } + timer_pending_ = false; + grpc_timer_cancel(&timer_); + } + Unref(); +} + +void PriorityLb::ChildPriority::FailoverTimer::OnTimer( + void* arg, grpc_error_handle error) { + auto* self = static_cast(arg); + (void)GRPC_ERROR_REF(error); // ref owned by lambda + self->child_priority_->priority_policy_->work_serializer()->Run( + [self, error]() { self->OnTimerLocked(error); }, DEBUG_LOCATION); +} + +void PriorityLb::ChildPriority::FailoverTimer::OnTimerLocked( + grpc_error_handle error) { + if (error == GRPC_ERROR_NONE && timer_pending_) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { + gpr_log(GPR_INFO, + "[priority_lb %p] child %s (%p): failover timer fired, " + "reporting TRANSIENT_FAILURE", + child_priority_->priority_policy_.get(), + child_priority_->name_.c_str(), child_priority_.get()); + } + timer_pending_ = false; + child_priority_->OnConnectivityStateUpdateLocked( + GRPC_CHANNEL_TRANSIENT_FAILURE, + absl::Status(absl::StatusCode::kUnavailable, "failover timer fired"), + nullptr); + } + Unref(DEBUG_LOCATION, "Timer"); + GRPC_ERROR_UNREF(error); +} + // // PriorityLb::ChildPriority // @@ -512,12 +653,8 @@ PriorityLb::ChildPriority::ChildPriority( gpr_log(GPR_INFO, "[priority_lb %p] creating child %s (%p)", priority_policy_.get(), name_.c_str(), this); } - GRPC_CLOSURE_INIT(&on_failover_timer_, OnFailoverTimer, this, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&on_deactivation_timer_, OnDeactivationTimer, this, - grpc_schedule_on_exec_ctx); // Start the failover timer. - StartFailoverTimerLocked(); + failover_timer_ = MakeOrphanable(Ref()); } void PriorityLb::ChildPriority::Orphan() { @@ -525,10 +662,8 @@ void PriorityLb::ChildPriority::Orphan() { gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): orphaned", priority_policy_.get(), name_.c_str(), this); } - MaybeCancelFailoverTimerLocked(); - if (deactivation_timer_callback_pending_) { - grpc_timer_cancel(&deactivation_timer_); - } + failover_timer_.reset(); + deactivation_timer_.reset(); // Remove the child policy's interested_parties pollset_set from the // xDS policy. grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), @@ -537,9 +672,6 @@ void PriorityLb::ChildPriority::Orphan() { // Drop our ref to the child's picker, in case it's holding a ref to // the child. picker_wrapper_.reset(); - if (deactivation_timer_callback_pending_) { - grpc_timer_cancel(&deactivation_timer_); - } Unref(DEBUG_LOCATION, "ChildPriority+Orphan"); } @@ -600,9 +732,8 @@ PriorityLb::ChildPriority::CreateChildPolicyLocked( } void PriorityLb::ChildPriority::ExitIdleLocked() { - if (connectivity_state_ == GRPC_CHANNEL_IDLE && - !failover_timer_callback_pending_) { - StartFailoverTimerLocked(); + if (connectivity_state_ == GRPC_CHANNEL_IDLE && failover_timer_ == nullptr) { + failover_timer_ = MakeOrphanable(Ref()); } child_policy_->ExitIdleLocked(); } @@ -628,122 +759,21 @@ void PriorityLb::ChildPriority::OnConnectivityStateUpdateLocked( // If READY or IDLE or TRANSIENT_FAILURE, cancel failover timer. if (state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_IDLE || state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - MaybeCancelFailoverTimerLocked(); + failover_timer_.reset(); } // Notify the parent policy. priority_policy_->HandleChildConnectivityStateChangeLocked(this); } -void PriorityLb::ChildPriority::StartFailoverTimerLocked() { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { - gpr_log( - GPR_INFO, - "[priority_lb %p] child %s (%p): starting failover timer for %" PRId64 - "ms", - priority_policy_.get(), name_.c_str(), this, - priority_policy_->child_failover_timeout_.millis()); - } - Ref(DEBUG_LOCATION, "ChildPriority+OnFailoverTimerLocked").release(); - grpc_timer_init( - &failover_timer_, - ExecCtx::Get()->Now() + priority_policy_->child_failover_timeout_, - &on_failover_timer_); - failover_timer_callback_pending_ = true; -} - -void PriorityLb::ChildPriority::MaybeCancelFailoverTimerLocked() { - if (failover_timer_callback_pending_) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { - gpr_log(GPR_INFO, - "[priority_lb %p] child %s (%p): cancelling failover timer", - priority_policy_.get(), name_.c_str(), this); - } - grpc_timer_cancel(&failover_timer_); - failover_timer_callback_pending_ = false; - } -} - -void PriorityLb::ChildPriority::OnFailoverTimer(void* arg, - grpc_error_handle error) { - ChildPriority* self = static_cast(arg); - (void)GRPC_ERROR_REF(error); // ref owned by lambda - self->priority_policy_->work_serializer()->Run( - [self, error]() { self->OnFailoverTimerLocked(error); }, DEBUG_LOCATION); -} - -void PriorityLb::ChildPriority::OnFailoverTimerLocked(grpc_error_handle error) { - if (error == GRPC_ERROR_NONE && failover_timer_callback_pending_ && - !priority_policy_->shutting_down_) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { - gpr_log(GPR_INFO, - "[priority_lb %p] child %s (%p): failover timer fired, " - "reporting TRANSIENT_FAILURE", - priority_policy_.get(), name_.c_str(), this); - } - failover_timer_callback_pending_ = false; - OnConnectivityStateUpdateLocked( - GRPC_CHANNEL_TRANSIENT_FAILURE, - absl::Status(absl::StatusCode::kUnavailable, "failover timer fired"), - nullptr); - } - Unref(DEBUG_LOCATION, "ChildPriority+OnFailoverTimerLocked"); - GRPC_ERROR_UNREF(error); -} - void PriorityLb::ChildPriority::DeactivateLocked() { // If already deactivated, don't do it again. - if (deactivation_timer_callback_pending_) return; - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { - gpr_log(GPR_INFO, - "[priority_lb %p] child %s (%p): deactivating -- will remove in " - "%" PRId64 "ms.", - priority_policy_.get(), name_.c_str(), this, - kChildRetentionInterval.millis()); - } - MaybeCancelFailoverTimerLocked(); - // Start a timer to delete the child. - Ref(DEBUG_LOCATION, "ChildPriority+timer").release(); - grpc_timer_init(&deactivation_timer_, - ExecCtx::Get()->Now() + kChildRetentionInterval, - &on_deactivation_timer_); - deactivation_timer_callback_pending_ = true; + if (deactivation_timer_ != nullptr) return; + failover_timer_.reset(); + deactivation_timer_ = MakeOrphanable(Ref()); } void PriorityLb::ChildPriority::MaybeReactivateLocked() { - if (deactivation_timer_callback_pending_) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { - gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): reactivating", - priority_policy_.get(), name_.c_str(), this); - } - deactivation_timer_callback_pending_ = false; - grpc_timer_cancel(&deactivation_timer_); - } -} - -void PriorityLb::ChildPriority::OnDeactivationTimer(void* arg, - grpc_error_handle error) { - ChildPriority* self = static_cast(arg); - (void)GRPC_ERROR_REF(error); // ref owned by lambda - self->priority_policy_->work_serializer()->Run( - [self, error]() { self->OnDeactivationTimerLocked(error); }, - DEBUG_LOCATION); -} - -void PriorityLb::ChildPriority::OnDeactivationTimerLocked( - grpc_error_handle error) { - if (error == GRPC_ERROR_NONE && deactivation_timer_callback_pending_ && - !priority_policy_->shutting_down_) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { - gpr_log(GPR_INFO, - "[priority_lb %p] child %s (%p): deactivation timer fired, " - "deleting child", - priority_policy_.get(), name_.c_str(), this); - } - deactivation_timer_callback_pending_ = false; - priority_policy_->DeleteChild(this); - } - Unref(DEBUG_LOCATION, "ChildPriority+timer"); - GRPC_ERROR_UNREF(error); + deactivation_timer_.reset(); } //