diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 6506dc99d61..e80e473f338 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -211,7 +211,7 @@ void PickFirst::StartPickingLocked() { if (subchannel_list_ != nullptr) { for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) { if (subchannel_list_->subchannel(i)->subchannel() != nullptr) { - subchannel_list_->subchannel(i)->StartOrRenewConnectivityWatchLocked(); + subchannel_list_->subchannel(i)->StartConnectivityWatchLocked(); break; } } @@ -315,7 +315,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { // If we've started picking, start trying to connect to the first // subchannel in the new list. if (started_picking_) { - subchannel_list_->subchannel(0)->StartOrRenewConnectivityWatchLocked(); + subchannel_list_->subchannel(0)->StartConnectivityWatchLocked(); } } else { // We do have a selected subchannel. @@ -337,7 +337,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { selected_ = sd; subchannel_list_ = std::move(subchannel_list); DestroyUnselectedSubchannelsLocked(); - sd->StartOrRenewConnectivityWatchLocked(); + sd->StartConnectivityWatchLocked(); // If there was a previously pending update (which may or may // not have contained the currently selected subchannel), drop // it, so that it doesn't override what we've done here. @@ -363,7 +363,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { // subchannel in the new list. if (started_picking_) { latest_pending_subchannel_list_->subchannel(0) - ->StartOrRenewConnectivityWatchLocked(); + ->StartConnectivityWatchLocked(); } } } @@ -420,7 +420,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( grpc_connectivity_state_set(&p->state_tracker_, connectivity_state(), GRPC_ERROR_REF(error), "selected_changed"); // Renew notification. - StartOrRenewConnectivityWatchLocked(); + RenewConnectivityWatchLocked(); } } GRPC_ERROR_UNREF(error); @@ -467,7 +467,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); } // Renew notification. - StartOrRenewConnectivityWatchLocked(); + RenewConnectivityWatchLocked(); break; } case GRPC_CHANNEL_TRANSIENT_FAILURE: { @@ -485,7 +485,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( &p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "connecting_transient_failure"); } - sd->StartOrRenewConnectivityWatchLocked(); + sd->StartConnectivityWatchLocked(); break; } case GRPC_CHANNEL_CONNECTING: @@ -497,7 +497,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( "connecting_changed"); } // Renew notification. - StartOrRenewConnectivityWatchLocked(); + RenewConnectivityWatchLocked(); break; } case GRPC_CHANNEL_SHUTDOWN: diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index a4bf3e7398d..a84e29bb071 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -444,7 +444,7 @@ void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() { // Start connectivity watch for each subchannel. for (size_t i = 0; i < num_subchannels(); i++) { if (subchannel(i)->subchannel() != nullptr) { - subchannel(i)->StartOrRenewConnectivityWatchLocked(); + subchannel(i)->StartConnectivityWatchLocked(); } } } @@ -584,7 +584,7 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( // If we've started watching, update overall state and renew notification. if (subchannel_list()->started_watching()) { subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked(); - StartOrRenewConnectivityWatchLocked(); + RenewConnectivityWatchLocked(); } GRPC_ERROR_UNREF(error); } diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 5fb92e22f4e..f5799b7c611 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -111,8 +111,9 @@ class SubchannelData { // Synchronously checks the subchannel's connectivity state. Calls // ProcessConnectivityChangeLocked() if the state has changed. // Must not be called while there is a connectivity notification - // pending (i.e., between calling StartOrRenewConnectivityWatchLocked() - // and the resulting invocation of ProcessConnectivityChangeLocked()). + // pending (i.e., between calling StartConnectivityWatchLocked() or + // RenewConnectivityWatchLocked() and the resulting invocation of + // ProcessConnectivityChangeLocked()). void CheckConnectivityStateLocked() { GPR_ASSERT(!connectivity_notification_pending_); grpc_error* error = GRPC_ERROR_NONE; @@ -133,18 +134,22 @@ class SubchannelData { // being unreffed. virtual void UnrefSubchannelLocked(const char* reason); - // Starts or renewes watching the connectivity state of the subchannel. + // Starts watching the connectivity state of the subchannel. // ProcessConnectivityChangeLocked() will be called when the // connectivity state changes. - void StartOrRenewConnectivityWatchLocked(); + void StartConnectivityWatchLocked(); + + // Renews watching the connectivity state of the subchannel. + void RenewConnectivityWatchLocked(); // Stops watching the connectivity state of the subchannel. void StopConnectivityWatchLocked(); // Cancels watching the connectivity state of the subchannel. // Must be called only while there is a connectivity notification - // pending (i.e., between calling StartOrRenewConnectivityWatchLocked() - // and the resulting invocation of ProcessConnectivityChangeLocked()). + // pending (i.e., between calling StartConnectivityWatchLocked() or + // RenewConnectivityWatchLocked() and the resulting invocation of + // ProcessConnectivityChangeLocked()). // From within ProcessConnectivityChangeLocked(), use // StopConnectivityWatchLocked() instead. void CancelConnectivityWatchLocked(const char* reason); @@ -162,12 +167,13 @@ class SubchannelData { virtual ~SubchannelData(); - // After StartOrRenewConnectivityWatchLocked() is called, this method - // will be invoked when the subchannel's connectivity state changes. + // After StartConnectivityWatchLocked() or RenewConnectivityWatchLocked() + // is called, this method will be invoked when the subchannel's connectivity + // state changes. // Implementations can use connectivity_state() to get the new // connectivity state. - // Implementations must invoke either StopConnectivityWatch() or again - // call StartOrRenewConnectivityWatch() before returning. + // Implementations must invoke either RenewConnectivityWatchLocked() or + // StopConnectivityWatchLocked() before returning. virtual void ProcessConnectivityChangeLocked(grpc_error* error) GRPC_ABSTRACT; private: @@ -252,6 +258,8 @@ class SubchannelList TraceFlag* tracer_; + grpc_combiner* combiner_; + // The list of subchannels. SubchannelVector subchannels_; @@ -313,21 +321,39 @@ void SubchannelData:: template void SubchannelData::StartOrRenewConnectivityWatchLocked() { + SubchannelDataType>::StartConnectivityWatchLocked() { if (subchannel_list_->tracer()->enabled()) { gpr_log(GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR - " (subchannel %p): requesting connectivity change " + " (subchannel %p): starting watch: requesting connectivity change " "notification (from %s)", subchannel_list_->tracer()->name(), subchannel_list_->policy(), subchannel_list_, Index(), subchannel_list_->num_subchannels(), subchannel_, grpc_connectivity_state_name(pending_connectivity_state_unsafe_)); } - if (!connectivity_notification_pending_) { - subchannel_list()->Ref(DEBUG_LOCATION, "connectivity_watch").release(); - connectivity_notification_pending_ = true; + GPR_ASSERT(!connectivity_notification_pending_); + connectivity_notification_pending_ = true; + subchannel_list()->Ref(DEBUG_LOCATION, "connectivity_watch").release(); + grpc_subchannel_notify_on_state_change( + subchannel_, subchannel_list_->policy()->interested_parties(), + &pending_connectivity_state_unsafe_, &connectivity_changed_closure_); +} + +template +void SubchannelData::RenewConnectivityWatchLocked() { + if (subchannel_list_->tracer()->enabled()) { + gpr_log(GPR_DEBUG, + "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): renewing watch: requesting connectivity change " + "notification (from %s)", + subchannel_list_->tracer()->name(), subchannel_list_->policy(), + subchannel_list_, Index(), subchannel_list_->num_subchannels(), + subchannel_, + grpc_connectivity_state_name(pending_connectivity_state_unsafe_)); } + GPR_ASSERT(connectivity_notification_pending_); grpc_subchannel_notify_on_state_change( subchannel_, subchannel_list_->policy()->interested_parties(), &pending_connectivity_state_unsafe_, &connectivity_changed_closure_); @@ -360,6 +386,7 @@ void SubchannelData:: subchannel_list_, Index(), subchannel_list_->num_subchannels(), subchannel_, reason); } + GPR_ASSERT(connectivity_notification_pending_); grpc_subchannel_notify_on_state_change(subchannel_, nullptr, nullptr, &connectivity_changed_closure_); } @@ -427,7 +454,7 @@ void SubchannelData:: // Get or release ref to connected subchannel. if (!sd->UpdateConnectedSubchannelLocked()) { // We don't want to report this connectivity state, so renew the watch. - sd->StartOrRenewConnectivityWatchLocked(); + sd->RenewConnectivityWatchLocked(); return; } // Now that we're inside the combiner, copy the pending connectivity @@ -462,7 +489,8 @@ SubchannelList::SubchannelList( const grpc_channel_args& args) : InternallyRefCountedWithTracing(tracer), policy_(policy), - tracer_(tracer) { + tracer_(tracer), + combiner_(GRPC_COMBINER_REF(combiner, "subchannel_list")) { if (tracer_->enabled()) { gpr_log(GPR_DEBUG, "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels", @@ -523,6 +551,7 @@ SubchannelList::~SubchannelList() { gpr_log(GPR_DEBUG, "[%s %p] Destroying subchannel_list %p", tracer_->name(), policy_, this); } + GRPC_COMBINER_UNREF(combiner_, "subchannel_list"); } template