From b1c1309bfcbe7d8834965169d9e527528d477c2e Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 25 Apr 2018 12:39:45 -0700 Subject: [PATCH] Clean up refcounting. --- .../lb_policy/pick_first/pick_first.cc | 66 ++++--------------- .../lb_policy/round_robin/round_robin.cc | 40 +---------- .../lb_policy/subchannel_list.h | 29 +++++--- 3 files changed, 33 insertions(+), 102 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 24a0c83b1a6..0883a4ed6c3 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -90,9 +90,6 @@ class PickFirst : public LoadBalancingPolicy { const grpc_channel_args& args) : SubchannelList(policy, tracer, addresses, combiner, client_channel_factory, args) {} - - void RefForConnectivityWatch(const char* reason); - void UnrefForConnectivityWatch(const char* reason); }; void ShutdownLocked() override; @@ -220,9 +217,7 @@ void PickFirst::StartPickingLocked() { if (subchannel_list_ != nullptr) { for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) { if (subchannel_list_->subchannel(i)->subchannel() != nullptr) { - subchannel_list_->RefForConnectivityWatch( - "connectivity_watch+start_picking"); - subchannel_list_->subchannel(i)->StartConnectivityWatchLocked(); + subchannel_list_->subchannel(i)->StartOrRenewConnectivityWatchLocked(); break; } } @@ -332,8 +327,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { // If we've started picking, start trying to connect to the first // subchannel in the new list. if (started_picking_) { - subchannel_list_->RefForConnectivityWatch("connectivity_watch+update"); - subchannel_list_->subchannel(0)->StartConnectivityWatchLocked(); + subchannel_list_->subchannel(0)->StartOrRenewConnectivityWatchLocked(); } } else { // We do have a selected subchannel. @@ -359,9 +353,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { } subchannel_list_ = std::move(subchannel_list); DestroyUnselectedSubchannelsLocked(); - subchannel_list_->RefForConnectivityWatch( - "connectivity_watch+replace_selected"); - sd->StartConnectivityWatchLocked(); + sd->StartOrRenewConnectivityWatchLocked(); // If there was a previously pending update (which may or may // not have contained the currently selected subchannel), drop // it, so that it doesn't override what we've done here. @@ -391,35 +383,12 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { // If we've started picking, start trying to connect to the first // subchannel in the new list. if (started_picking_) { - latest_pending_subchannel_list_->RefForConnectivityWatch( - "connectivity_watch+update"); latest_pending_subchannel_list_->subchannel(0) - ->StartConnectivityWatchLocked(); + ->StartOrRenewConnectivityWatchLocked(); } } } -void PickFirst::PickFirstSubchannelList::RefForConnectivityWatch( - const char* reason) { - // TODO(roth): We currently track these refs manually. Once the new - // ClosureRef API is ready, find a way to pass the RefCountedPtr<> - // along with the closures instead of doing this manually. - // Ref subchannel list. - Ref(DEBUG_LOCATION, reason).release(); - // Ref LB policy. - PickFirst* p = static_cast(policy()); - p->Ref(DEBUG_LOCATION, reason).release(); -} - -void PickFirst::PickFirstSubchannelList::UnrefForConnectivityWatch( - const char* reason) { - // Unref LB policy. - PickFirst* p = static_cast(policy()); - p->Unref(DEBUG_LOCATION, reason); - // Unref subchannel list. - Unref(DEBUG_LOCATION, reason); -} - void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( grpc_error* error) { PickFirst* p = static_cast(subchannel_list()->policy()); @@ -434,17 +403,8 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( grpc_connectivity_state_name(connectivity_state()), p->shutdown_, subchannel_list()->shutting_down(), grpc_error_string(error)); } -// FIXME: move this to SubchannelData::OnConnectivityChangedLocked() - // If the subchannel list is shutting down, stop watching. - if (subchannel_list()->shutting_down() || error == GRPC_ERROR_CANCELLED) { - StopConnectivityWatchLocked(); - UnrefSubchannelLocked("pf_sl_shutdown"); - subchannel_list()->UnrefForConnectivityWatch("pf_sl_shutdown"); - GRPC_ERROR_UNREF(error); - return; - } - // If we're still here, the notification must be for a subchannel in - // either the current or latest pending subchannel lists. + // The notification must be for a subchannel in either the current or + // latest pending subchannel lists. GPR_ASSERT(p->subchannel_list_ == subchannel_list() || p->latest_pending_subchannel_list_ == subchannel_list()); // Handle updates for the currently selected subchannel. @@ -455,8 +415,6 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( p->latest_pending_subchannel_list_ != nullptr) { p->selected_ = nullptr; StopConnectivityWatchLocked(); - subchannel_list()->UnrefForConnectivityWatch( - "selected_not_ready+switch_to_update"); subchannel_list()->ShutdownLocked("selected_not_ready+switch_to_update"); p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); grpc_connectivity_state_set( @@ -478,14 +436,13 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE); // In transient failure. Rely on re-resolution to recover. p->selected_ = nullptr; - StopConnectivityWatchLocked(); - subchannel_list()->UnrefForConnectivityWatch("pf_selected_shutdown"); UnrefSubchannelLocked("pf_selected_shutdown"); + StopConnectivityWatchLocked(); } else { grpc_connectivity_state_set(&p->state_tracker_, connectivity_state(), GRPC_ERROR_REF(error), "selected_changed"); // Renew notification. - StartConnectivityWatchLocked(); + StartOrRenewConnectivityWatchLocked(); } } GRPC_ERROR_UNREF(error); @@ -533,7 +490,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); } // Renew notification. - StartConnectivityWatchLocked(); + StartOrRenewConnectivityWatchLocked(); break; } case GRPC_CHANNEL_TRANSIENT_FAILURE: { @@ -551,8 +508,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( &p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "connecting_transient_failure"); } - // Reuses the connectivity refs from the previous watch. - sd->StartConnectivityWatchLocked(); + sd->StartOrRenewConnectivityWatchLocked(); break; } case GRPC_CHANNEL_CONNECTING: @@ -564,7 +520,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( "connecting_changed"); } // Renew notification. - StartConnectivityWatchLocked(); + StartOrRenewConnectivityWatchLocked(); break; } case GRPC_CHANNEL_SHUTDOWN: diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 889616e0561..2b2c0e51327 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -134,10 +134,6 @@ class RoundRobin : public LoadBalancingPolicy { GRPC_ERROR_UNREF(last_transient_failure_error_); } - // Manages references for connectivity watches. - void RefForConnectivityWatch(const char* reason); - void UnrefForConnectivityWatch(const char* reason); - // Starts watching the subchannels in this list. void StartWatchingLocked(); @@ -377,6 +373,7 @@ bool RoundRobin::DoPickLocked(PickState* pick) { /* readily available, report right away */ RoundRobinSubchannelData* sd = subchannel_list_->subchannel(next_ready_index); + GPR_ASSERT(sd->connected_subchannel() != nullptr); pick->connected_subchannel = sd->connected_subchannel()->Ref(); if (pick->user_data != nullptr) { *pick->user_data = sd->user_data(); @@ -422,27 +419,6 @@ bool RoundRobin::PickLocked(PickState* pick) { return false; } -void RoundRobin::RoundRobinSubchannelList::RefForConnectivityWatch( - const char* reason) { - // TODO(roth): We currently track these refs manually. Once the new - // ClosureRef API is ready, find a way to pass the RefCountedPtr<> - // along with the closures instead of doing this manually. - // Ref subchannel list. - Ref(DEBUG_LOCATION, reason).release(); - // Ref LB policy. - RoundRobin* p = static_cast(policy()); - p->Ref(DEBUG_LOCATION, reason).release(); -} - -void RoundRobin::RoundRobinSubchannelList::UnrefForConnectivityWatch( - const char* reason) { - // Unref LB policy. - RoundRobin* p = static_cast(policy()); - p->Unref(DEBUG_LOCATION, reason); - // Unref subchannel list. - Unref(DEBUG_LOCATION, reason); -} - void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() { if (num_subchannels() == 0) return; // Check current state of each subchannel synchronously, since any @@ -474,8 +450,7 @@ void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() { // Start connectivity watch for each subchannel. for (size_t i = 0; i < num_subchannels(); i++) { if (subchannel(i)->subchannel() != nullptr) { - RefForConnectivityWatch("connectivity_watch"); - subchannel(i)->StartConnectivityWatchLocked(); + subchannel(i)->StartOrRenewConnectivityWatchLocked(); } } } @@ -597,15 +572,6 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( subchannel_list()->shutting_down(), grpc_error_string(error)); } GPR_ASSERT(subchannel() != nullptr); -// FIXME: move this to SubchannelData::OnConnectivityChangedLocked() - // If the subchannel list is shutting down, stop watching. - if (subchannel_list()->shutting_down() || error == GRPC_ERROR_CANCELLED) { - StopConnectivityWatchLocked(); - UnrefSubchannelLocked("rr_sl_shutdown"); - subchannel_list()->UnrefForConnectivityWatch("rr_sl_shutdown"); - GRPC_ERROR_UNREF(error); - return; - } // If the new state is TRANSIENT_FAILURE, re-resolve. // Only do this if we've started watching, not at startup time. // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE @@ -628,7 +594,7 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( // If we've started watching, update overall state and renew notification. if (subchannel_list()->started_watching()) { subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked(); - StartConnectivityWatchLocked(); + StartOrRenewConnectivityWatchLocked(); } GRPC_ERROR_UNREF(error); } diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index b88719b7473..8b843b16c00 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -109,8 +109,8 @@ class SubchannelData { // Synchronously checks the subchannel's connectivity state. Calls // ProcessConnectivityChangeLocked() if the state has changed. // Must not be called while there is a connectivity notification - // pending (i.e., between calling StartConnectivityWatchLocked() and - // the resulting invocation of ProcessConnectivityChangeLocked()). + // pending (i.e., between calling StartOrRenewConnectivityWatchLocked() + // and the resulting invocation of ProcessConnectivityChangeLocked()). void CheckConnectivityStateLocked() { GPR_ASSERT(!connectivity_notification_pending_); grpc_error* error = GRPC_ERROR_NONE; @@ -133,15 +133,15 @@ class SubchannelData { // Starts or renewes watching the connectivity state of the subchannel. // ProcessConnectivityChangeLocked() will be called when the // connectivity state changes. - void StartConnectivityWatchLocked(); + void StartOrRenewConnectivityWatchLocked(); // Stops watching the connectivity state of the subchannel. void StopConnectivityWatchLocked(); // Cancels watching the connectivity state of the subchannel. // Must be called only while there is a connectivity notification - // pending (i.e., between calling StartConnectivityWatchLocked() and - // the resulting invocation of ProcessConnectivityChangeLocked()). + // pending (i.e., between calling StartOrRenewConnectivityWatchLocked() + // and the resulting invocation of ProcessConnectivityChangeLocked()). // From within ProcessConnectivityChangeLocked(), use // StopConnectivityWatchLocked() instead. void CancelConnectivityWatchLocked(const char* reason); @@ -159,8 +159,8 @@ class SubchannelData { virtual ~SubchannelData(); - // After StartConnectivityWatchLocked() is called, this method will be - // invoked when the subchannel's connectivity state changes. + // After StartOrRenewConnectivityWatchLocked() is called, this method + // will be invoked when the subchannel's connectivity state changes. // Implementations can use connectivity_state() to get the new // connectivity state. // Implementations must invoke either StopConnectivityWatch() or again @@ -302,7 +302,7 @@ void SubchannelData:: template void SubchannelData::StartConnectivityWatchLocked() { + SubchannelDataType>::StartOrRenewConnectivityWatchLocked() { if (subchannel_list_->tracer()->enabled()) { gpr_log(GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR @@ -313,7 +313,10 @@ void SubchannelDataRef(DEBUG_LOCATION, "connectivity_watch").release(); + connectivity_notification_pending_ = true; + } grpc_subchannel_notify_on_state_change( subchannel_, subchannel_list_->policy()->interested_parties(), &pending_connectivity_state_unsafe_, &connectivity_changed_closure_); @@ -332,6 +335,7 @@ void SubchannelDataUnref(DEBUG_LOCATION, "connectivity_watch"); } template @@ -387,9 +391,14 @@ void SubchannelData:: OnConnectivityChangedLocked(void* arg, grpc_error* error) { SubchannelData* sd = static_cast(arg); // FIXME: add trace logging + if (sd->subchannel_list()->shutting_down() || error == GRPC_ERROR_CANCELLED) { + sd->UnrefSubchannelLocked("connectivity_shutdown"); + sd->StopConnectivityWatchLocked(); + return; + } if (!sd->UpdateConnectedSubchannelLocked()) { // We don't want to report this connectivity state, so renew the watch. - sd->StartConnectivityWatchLocked(); + sd->StartOrRenewConnectivityWatchLocked(); return; } // Now that we're inside the combiner, copy the pending connectivity