Fix race between READY notification and reffing connected subchannel.

reviewable/pr14886/r6
Mark D. Roth 7 years ago
parent 75d9edab09
commit 542bceb573
  1. 3
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  2. 41
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  3. 43
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h

@ -350,6 +350,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
subchannel_list->num_subchannels()); subchannel_list->num_subchannels());
} }
if (selected_->connected_subchannel() != nullptr) { if (selected_->connected_subchannel() != nullptr) {
// FIXME: restructure to work more like RR?
sd->SetConnectedSubchannelFromLocked(selected_); sd->SetConnectedSubchannelFromLocked(selected_);
} }
selected_ = sd; selected_ = sd;
@ -433,6 +434,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
grpc_connectivity_state_name(connectivity_state()), p->shutdown_, grpc_connectivity_state_name(connectivity_state()), p->shutdown_,
subchannel_list()->shutting_down(), grpc_error_string(error)); subchannel_list()->shutting_down(), grpc_error_string(error));
} }
// FIXME: move this to SubchannelData::OnConnectivityChangedLocked()
// If the subchannel list is shutting down, stop watching. // If the subchannel list is shutting down, stop watching.
if (subchannel_list()->shutting_down() || error == GRPC_ERROR_CANCELLED) { if (subchannel_list()->shutting_down() || error == GRPC_ERROR_CANCELLED) {
StopConnectivityWatchLocked(); StopConnectivityWatchLocked();
@ -502,7 +504,6 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
case GRPC_CHANNEL_READY: { case GRPC_CHANNEL_READY: {
// Case 2. Promote p->latest_pending_subchannel_list_ to // Case 2. Promote p->latest_pending_subchannel_list_ to
// p->subchannel_list_. // p->subchannel_list_.
SetConnectedSubchannelFromSubchannelLocked();
if (p->latest_pending_subchannel_list_ == subchannel_list()) { if (p->latest_pending_subchannel_list_ == subchannel_list()) {
GPR_ASSERT(p->subchannel_list_ != nullptr); GPR_ASSERT(p->subchannel_list_ != nullptr);
p->subchannel_list_->ShutdownLocked("finish_update"); p->subchannel_list_->ShutdownLocked("finish_update");

@ -597,6 +597,7 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
subchannel_list()->shutting_down(), grpc_error_string(error)); subchannel_list()->shutting_down(), grpc_error_string(error));
} }
GPR_ASSERT(subchannel() != nullptr); GPR_ASSERT(subchannel() != nullptr);
// FIXME: move this to SubchannelData::OnConnectivityChangedLocked()
// If the subchannel list is shutting down, stop watching. // If the subchannel list is shutting down, stop watching.
if (subchannel_list()->shutting_down() || error == GRPC_ERROR_CANCELLED) { if (subchannel_list()->shutting_down() || error == GRPC_ERROR_CANCELLED) {
StopConnectivityWatchLocked(); StopConnectivityWatchLocked();
@ -605,34 +606,20 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
return; return;
} }
// Process the state change. // If the new state is TRANSIENT_FAILURE, re-resolve.
switch (connectivity_state()) { // Only do this if we've started watching, not at startup time.
case GRPC_CHANNEL_TRANSIENT_FAILURE: { // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
// Only re-resolve if we've started watching, not at startup time. // when the subchannel list was created, we'd wind up in a constant
// Otherwise, if the subchannel was already in state TRANSIENT_FAILURE // loop of re-resolution.
// when the subchannel list was created, we'd wind up in a constant if (connectivity_state() == GRPC_CHANNEL_TRANSIENT_FAILURE &&
// loop of re-resolution. subchannel_list()->started_watching()) {
if (subchannel_list()->started_watching()) { if (grpc_lb_round_robin_trace.enabled()) {
if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG,
gpr_log(GPR_DEBUG, "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
"[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. " "Requesting re-resolution",
"Requesting re-resolution", p, subchannel());
p, subchannel());
}
p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE);
}
break;
}
case GRPC_CHANNEL_READY: {
if (connected_subchannel() == nullptr) {
SetConnectedSubchannelFromSubchannelLocked();
}
break;
} }
case GRPC_CHANNEL_SHUTDOWN: p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE);
GPR_UNREACHABLE_CODE(return );
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:; // fallthrough
} }
// Update state counters. // Update state counters.
subchannel_list()->UpdateStateCountersLocked( subchannel_list()->UpdateStateCountersLocked(

@ -94,12 +94,7 @@ class SubchannelData {
return curr_connectivity_state_; return curr_connectivity_state_;
} }
// Sets the connected subchannel from the subchannel. // FIXME: remove
void SetConnectedSubchannelFromSubchannelLocked() {
connected_subchannel_ =
grpc_subchannel_get_connected_subchannel(subchannel_);
}
// An alternative to SetConnectedSubchannelFromSubchannelLocked() for // An alternative to SetConnectedSubchannelFromSubchannelLocked() for
// cases where we are retaining a connected subchannel from a previous // cases where we are retaining a connected subchannel from a previous
// subchannel list. This is slightly more efficient than getting the // subchannel list. This is slightly more efficient than getting the
@ -191,10 +186,16 @@ class SubchannelData {
// OnConnectivityChangedLocked(). // OnConnectivityChangedLocked().
grpc_connectivity_state pending_connectivity_state_unsafe_; grpc_connectivity_state pending_connectivity_state_unsafe_;
// Current connectivity state. // Current connectivity state.
// FIXME: move this into RR, not needed in PF because connectivity_state
// is only used in ProcessConnectivityChangeLocked()
// (maybe pass it as a param and eliminate the accessor method?)
grpc_connectivity_state curr_connectivity_state_; grpc_connectivity_state curr_connectivity_state_;
}; };
// A list of subchannels. // A list of subchannels.
// FIXME: make this InternallyRefCounted, and have Orphan() do
// ShutdownLocked()?
// (also, maybe we don't need to take a ref to the LB policy anymore?)
template <typename SubchannelListType, typename SubchannelDataType> template <typename SubchannelListType, typename SubchannelDataType>
class SubchannelList : public RefCountedWithTracing<SubchannelListType> { class SubchannelList : public RefCountedWithTracing<SubchannelListType> {
public: public:
@ -348,14 +349,36 @@ template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType, SubchannelDataType>:: void SubchannelData<SubchannelListType, SubchannelDataType>::
OnConnectivityChangedLocked(void* arg, grpc_error* error) { OnConnectivityChangedLocked(void* arg, grpc_error* error) {
SubchannelData* sd = static_cast<SubchannelData*>(arg); SubchannelData* sd = static_cast<SubchannelData*>(arg);
// FIXME: add trace logging
// If the subchannel is READY, get a ref to the connected subchannel.
if (sd->pending_connectivity_state_unsafe_ == GRPC_CHANNEL_READY) {
sd->connected_subchannel_ =
grpc_subchannel_get_connected_subchannel(sd->subchannel_);
// If the subchannel became disconnected between the time that this
// callback was scheduled and the time that it was actually run in the
// combiner, then the connected subchannel may have disappeared out from
// under us. In that case, instead of propagating the READY notification,
// we simply renew our watch and wait for the next notification.
// Note that we start the renewed watch from IDLE to make sure we
// get a notification for the next state, even if that state is
// READY again (e.g., if the subchannel has transitioned back to
// READY before the callback gets scheduled).
if (sd->connected_subchannel_ == nullptr) {
sd->pending_connectivity_state_unsafe_ = GRPC_CHANNEL_IDLE;
sd->StartConnectivityWatchLocked();
return;
}
}
// If we get TRANSIENT_FAILURE, unref the connected subchannel.
else if (sd->pending_connectivity_state_unsafe_ ==
GRPC_CHANNEL_TRANSIENT_FAILURE) {
sd->connected_subchannel_.reset();
}
// Now that we're inside the combiner, copy the pending connectivity // Now that we're inside the combiner, copy the pending connectivity
// state (which was set by the connectivity state watcher) to // state (which was set by the connectivity state watcher) to
// curr_connectivity_state_, which is what we use inside of the combiner. // curr_connectivity_state_, which is what we use inside of the combiner.
sd->curr_connectivity_state_ = sd->pending_connectivity_state_unsafe_; sd->curr_connectivity_state_ = sd->pending_connectivity_state_unsafe_;
// If we get TRANSIENT_FAILURE, unref the connected subchannel. // Call the subclass's ProcessConnectivityChangeLocked() method.
if (sd->curr_connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE) {
sd->connected_subchannel_.reset();
}
sd->ProcessConnectivityChangeLocked(GRPC_ERROR_REF(error)); sd->ProcessConnectivityChangeLocked(GRPC_ERROR_REF(error));
} }

Loading…
Cancel
Save