From 542bceb573b4c52a883f95ff240c6aba473790bc Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 12 Apr 2018 15:08:36 -0700 Subject: [PATCH] Fix race between READY notification and reffing connected subchannel. --- .../lb_policy/pick_first/pick_first.cc | 3 +- .../lb_policy/round_robin/round_robin.cc | 41 ++++++------------ .../lb_policy/subchannel_list.h | 43 ++++++++++++++----- 3 files changed, 49 insertions(+), 38 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 03e5c892813..24a0c83b1a6 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -350,6 +350,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { subchannel_list->num_subchannels()); } if (selected_->connected_subchannel() != nullptr) { +// FIXME: restructure to work more like RR? sd->SetConnectedSubchannelFromLocked(selected_); } selected_ = sd; @@ -433,6 +434,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( grpc_connectivity_state_name(connectivity_state()), p->shutdown_, subchannel_list()->shutting_down(), grpc_error_string(error)); } +// FIXME: move this to SubchannelData::OnConnectivityChangedLocked() // If the subchannel list is shutting down, stop watching. if (subchannel_list()->shutting_down() || error == GRPC_ERROR_CANCELLED) { StopConnectivityWatchLocked(); @@ -502,7 +504,6 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( case GRPC_CHANNEL_READY: { // Case 2. Promote p->latest_pending_subchannel_list_ to // p->subchannel_list_. - SetConnectedSubchannelFromSubchannelLocked(); if (p->latest_pending_subchannel_list_ == subchannel_list()) { GPR_ASSERT(p->subchannel_list_ != nullptr); p->subchannel_list_->ShutdownLocked("finish_update"); diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index f9bd0c0eb4e..889616e0561 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -597,6 +597,7 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( subchannel_list()->shutting_down(), grpc_error_string(error)); } GPR_ASSERT(subchannel() != nullptr); +// FIXME: move this to SubchannelData::OnConnectivityChangedLocked() // If the subchannel list is shutting down, stop watching. if (subchannel_list()->shutting_down() || error == GRPC_ERROR_CANCELLED) { StopConnectivityWatchLocked(); @@ -605,34 +606,20 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( GRPC_ERROR_UNREF(error); return; } - // Process the state change. - switch (connectivity_state()) { - case GRPC_CHANNEL_TRANSIENT_FAILURE: { - // Only re-resolve if we've started watching, not at startup time. - // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE - // when the subchannel list was created, we'd wind up in a constant - // loop of re-resolution. - if (subchannel_list()->started_watching()) { - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, - "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. " - "Requesting re-resolution", - p, subchannel()); - } - p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE); - } - break; - } - case GRPC_CHANNEL_READY: { - if (connected_subchannel() == nullptr) { - SetConnectedSubchannelFromSubchannelLocked(); - } - break; + // If the new state is TRANSIENT_FAILURE, re-resolve. + // Only do this if we've started watching, not at startup time. + // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE + // when the subchannel list was created, we'd wind up in a constant + // loop of re-resolution. + if (connectivity_state() == GRPC_CHANNEL_TRANSIENT_FAILURE && + subchannel_list()->started_watching()) { + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_DEBUG, + "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. " + "Requesting re-resolution", + p, subchannel()); } - case GRPC_CHANNEL_SHUTDOWN: - GPR_UNREACHABLE_CODE(return ); - case GRPC_CHANNEL_CONNECTING: - case GRPC_CHANNEL_IDLE:; // fallthrough + p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE); } // Update state counters. subchannel_list()->UpdateStateCountersLocked( diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index b3fc5fefe9c..e13504313d6 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -94,12 +94,7 @@ class SubchannelData { return curr_connectivity_state_; } - // Sets the connected subchannel from the subchannel. - void SetConnectedSubchannelFromSubchannelLocked() { - connected_subchannel_ = - grpc_subchannel_get_connected_subchannel(subchannel_); - } - +// FIXME: remove // An alternative to SetConnectedSubchannelFromSubchannelLocked() for // cases where we are retaining a connected subchannel from a previous // subchannel list. This is slightly more efficient than getting the @@ -191,10 +186,16 @@ class SubchannelData { // OnConnectivityChangedLocked(). grpc_connectivity_state pending_connectivity_state_unsafe_; // Current connectivity state. +// FIXME: move this into RR, not needed in PF because connectivity_state +// is only used in ProcessConnectivityChangeLocked() +// (maybe pass it as a param and eliminate the accessor method?) grpc_connectivity_state curr_connectivity_state_; }; // A list of subchannels. +// FIXME: make this InternallyRefCounted, and have Orphan() do +// ShutdownLocked()? +// (also, maybe we don't need to take a ref to the LB policy anymore?) template class SubchannelList : public RefCountedWithTracing { public: @@ -348,14 +349,36 @@ template void SubchannelData:: OnConnectivityChangedLocked(void* arg, grpc_error* error) { SubchannelData* sd = static_cast(arg); +// FIXME: add trace logging + // If the subchannel is READY, get a ref to the connected subchannel. + if (sd->pending_connectivity_state_unsafe_ == GRPC_CHANNEL_READY) { + sd->connected_subchannel_ = + grpc_subchannel_get_connected_subchannel(sd->subchannel_); + // If the subchannel became disconnected between the time that this + // callback was scheduled and the time that it was actually run in the + // combiner, then the connected subchannel may have disappeared out from + // under us. In that case, instead of propagating the READY notification, + // we simply renew our watch and wait for the next notification. + // Note that we start the renewed watch from IDLE to make sure we + // get a notification for the next state, even if that state is + // READY again (e.g., if the subchannel has transitioned back to + // READY before the callback gets scheduled). + if (sd->connected_subchannel_ == nullptr) { + sd->pending_connectivity_state_unsafe_ = GRPC_CHANNEL_IDLE; + sd->StartConnectivityWatchLocked(); + return; + } + } + // If we get TRANSIENT_FAILURE, unref the connected subchannel. + else if (sd->pending_connectivity_state_unsafe_ == + GRPC_CHANNEL_TRANSIENT_FAILURE) { + sd->connected_subchannel_.reset(); + } // Now that we're inside the combiner, copy the pending connectivity // state (which was set by the connectivity state watcher) to // curr_connectivity_state_, which is what we use inside of the combiner. sd->curr_connectivity_state_ = sd->pending_connectivity_state_unsafe_; - // If we get TRANSIENT_FAILURE, unref the connected subchannel. - if (sd->curr_connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE) { - sd->connected_subchannel_.reset(); - } + // Call the subclass's ProcessConnectivityChangeLocked() method. sd->ProcessConnectivityChangeLocked(GRPC_ERROR_REF(error)); }