Clean up connectivity state tracking.

reviewable/pr14886/r6
Mark D. Roth 7 years ago
parent 0839ac6d18
commit 717c100c8c
  1. 17
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  2. 70
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  3. 51
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h

@ -76,7 +76,8 @@ class PickFirst : public LoadBalancingPolicy {
: SubchannelData(subchannel_list, user_data_vtable, address, subchannel, : SubchannelData(subchannel_list, user_data_vtable, address, subchannel,
combiner) {} combiner) {}
void ProcessConnectivityChangeLocked(grpc_error* error) override; void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) override;
}; };
class PickFirstSubchannelList class PickFirstSubchannelList
@ -369,7 +370,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
} }
void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
grpc_error* error) { grpc_connectivity_state connectivity_state, grpc_error* error) {
PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy()); PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
if (grpc_lb_pick_first_trace.enabled()) { if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
@ -379,7 +380,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
"sd->subchannel_list->shutting_down=%d error=%s", "sd->subchannel_list->shutting_down=%d error=%s",
p, subchannel(), Index(), subchannel_list()->num_subchannels(), p, subchannel(), Index(), subchannel_list()->num_subchannels(),
subchannel_list(), subchannel_list(),
grpc_connectivity_state_name(connectivity_state()), p->shutdown_, grpc_connectivity_state_name(connectivity_state), p->shutdown_,
subchannel_list()->shutting_down(), grpc_error_string(error)); subchannel_list()->shutting_down(), grpc_error_string(error));
} }
// The notification must be for a subchannel in either the current or // The notification must be for a subchannel in either the current or
@ -390,7 +391,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
if (p->selected_ == this) { if (p->selected_ == this) {
// If the new state is anything other than READY and there is a // If the new state is anything other than READY and there is a
// pending update, switch to the pending update. // pending update, switch to the pending update.
if (connectivity_state() != GRPC_CHANNEL_READY && if (connectivity_state != GRPC_CHANNEL_READY &&
p->latest_pending_subchannel_list_ != nullptr) { p->latest_pending_subchannel_list_ != nullptr) {
p->selected_ = nullptr; p->selected_ = nullptr;
StopConnectivityWatchLocked(); StopConnectivityWatchLocked();
@ -404,8 +405,8 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// re-resolution is introduced. But we need to investigate whether we // re-resolution is introduced. But we need to investigate whether we
// really want to take any action instead of waiting for the selected // really want to take any action instead of waiting for the selected
// subchannel reconnecting. // subchannel reconnecting.
GPR_ASSERT(connectivity_state() != GRPC_CHANNEL_SHUTDOWN); GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN);
if (connectivity_state() == GRPC_CHANNEL_TRANSIENT_FAILURE) { if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If the selected channel goes bad, request a re-resolution. // If the selected channel goes bad, request a re-resolution.
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_IDLE, grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE, GRPC_ERROR_NONE,
@ -417,7 +418,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
UnrefSubchannelLocked("pf_selected_shutdown"); UnrefSubchannelLocked("pf_selected_shutdown");
StopConnectivityWatchLocked(); StopConnectivityWatchLocked();
} else { } else {
grpc_connectivity_state_set(&p->state_tracker_, connectivity_state(), grpc_connectivity_state_set(&p->state_tracker_, connectivity_state,
GRPC_ERROR_REF(error), "selected_changed"); GRPC_ERROR_REF(error), "selected_changed");
// Renew notification. // Renew notification.
RenewConnectivityWatchLocked(); RenewConnectivityWatchLocked();
@ -435,7 +436,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// for a subchannel in p->latest_pending_subchannel_list_. The // for a subchannel in p->latest_pending_subchannel_list_. The
// goal here is to find a subchannel from the update that we can // goal here is to find a subchannel from the update that we can
// select in place of the current one. // select in place of the current one.
switch (connectivity_state()) { switch (connectivity_state) {
case GRPC_CHANNEL_READY: { case GRPC_CHANNEL_READY: {
// Case 2. Promote p->latest_pending_subchannel_list_ to // Case 2. Promote p->latest_pending_subchannel_list_ to
// p->subchannel_list_. // p->subchannel_list_.

@ -109,12 +109,20 @@ class RoundRobin : public LoadBalancingPolicy {
void* user_data() const { return user_data_; } void* user_data() const { return user_data_; }
grpc_connectivity_state connectivity_state() const {
return last_connectivity_state_;
}
void UpdateConnectivityStateLocked(
grpc_connectivity_state connectivity_state, grpc_error* error);
private: private:
void ProcessConnectivityChangeLocked(grpc_error* error) override; void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) override;
const grpc_lb_user_data_vtable* user_data_vtable_; const grpc_lb_user_data_vtable* user_data_vtable_;
void* user_data_ = nullptr; void* user_data_ = nullptr;
grpc_connectivity_state prev_connectivity_state_ = GRPC_CHANNEL_IDLE; grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE;
}; };
// A list of subchannels. // A list of subchannels.
@ -137,9 +145,6 @@ class RoundRobin : public LoadBalancingPolicy {
// Starts watching the subchannels in this list. // Starts watching the subchannels in this list.
void StartWatchingLocked(); void StartWatchingLocked();
// Returns true if we have started watching.
bool started_watching() const { return started_watching_; }
// Updates the counters of subchannels in each state when a // Updates the counters of subchannels in each state when a
// subchannel transitions from old_state to new_state. // subchannel transitions from old_state to new_state.
// transient_failure_error is the error that is reported when // transient_failure_error is the error that is reported when
@ -158,7 +163,6 @@ class RoundRobin : public LoadBalancingPolicy {
void UpdateRoundRobinStateFromSubchannelStateCountsLocked(); void UpdateRoundRobinStateFromSubchannelStateCountsLocked();
private: private:
bool started_watching_ = false;
size_t num_ready_ = 0; size_t num_ready_ = 0;
size_t num_connecting_ = 0; size_t num_connecting_ = 0;
size_t num_transient_failure_ = 0; size_t num_transient_failure_ = 0;
@ -416,29 +420,16 @@ void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
if (num_subchannels() == 0) return; if (num_subchannels() == 0) return;
// Check current state of each subchannel synchronously, since any // Check current state of each subchannel synchronously, since any
// subchannel already used by some other channel may have a non-IDLE // subchannel already used by some other channel may have a non-IDLE
// state. This will invoke ProcessConnectivityChangeLocked() for each // state.
// subchannel whose state is not IDLE. However, because started_watching_
// is still false, the code there will do two special things:
//
// - It will skip re-resolution for any subchannel in state
// TRANSIENT_FAILURE, since doing this at start-watching-time would
// cause us to enter an endless loop of re-resolution (i.e.,
// re-resolution would cause a new update, and the new update would
// immediately trigger a new re-resolution).
//
// - It will not call UpdateRoundRobinStateFromSubchannelStateCountsLocked();
// instead, we call that here after all subchannels have been checked.
// This allows us to act more intelligently based on the state of all
// subchannels, rather than just acting on the first one. For example,
// if there is more than one pending pick, this allows us to spread the
// picks across all READY subchannels rather than sending them all to
// the first subchannel that reports READY.
for (size_t i = 0; i < num_subchannels(); ++i) { for (size_t i = 0; i < num_subchannels(); ++i) {
subchannel(i)->CheckConnectivityStateLocked(); grpc_error* error = GRPC_ERROR_NONE;
grpc_connectivity_state state =
subchannel(i)->CheckConnectivityStateLocked(&error);
if (state != GRPC_CHANNEL_IDLE) {
subchannel(i)->UpdateConnectivityStateLocked(state, error);
}
} }
// Now set started_watching_ to true and call // Now set the LB policy's state based on the subchannels' states.
// UpdateRoundRobinStateFromSubchannelStateCountsLocked().
started_watching_ = true;
UpdateRoundRobinStateFromSubchannelStateCountsLocked(); UpdateRoundRobinStateFromSubchannelStateCountsLocked();
// Start connectivity watch for each subchannel. // Start connectivity watch for each subchannel.
for (size_t i = 0; i < num_subchannels(); i++) { for (size_t i = 0; i < num_subchannels(); i++) {
@ -544,8 +535,15 @@ void RoundRobin::RoundRobinSubchannelList::
MaybeUpdateRoundRobinConnectivityStateLocked(); MaybeUpdateRoundRobinConnectivityStateLocked();
} }
void RoundRobin::RoundRobinSubchannelData::UpdateConnectivityStateLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) {
subchannel_list()->UpdateStateCountersLocked(
last_connectivity_state_, connectivity_state, GRPC_ERROR_REF(error));
last_connectivity_state_ = connectivity_state;
}
void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
grpc_error* error) { grpc_connectivity_state connectivity_state, grpc_error* error) {
RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy()); RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
if (grpc_lb_round_robin_trace.enabled()) { if (grpc_lb_round_robin_trace.enabled()) {
gpr_log( gpr_log(
@ -556,8 +554,8 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
"p->shutdown=%d sd->subchannel_list->shutting_down=%d error=%s", "p->shutdown=%d sd->subchannel_list->shutting_down=%d error=%s",
p, subchannel(), subchannel_list(), Index(), p, subchannel(), subchannel_list(), Index(),
subchannel_list()->num_subchannels(), subchannel_list()->num_subchannels(),
grpc_connectivity_state_name(prev_connectivity_state_), grpc_connectivity_state_name(last_connectivity_state_),
grpc_connectivity_state_name(connectivity_state()), p->shutdown_, grpc_connectivity_state_name(connectivity_state), p->shutdown_,
subchannel_list()->shutting_down(), grpc_error_string(error)); subchannel_list()->shutting_down(), grpc_error_string(error));
} }
GPR_ASSERT(subchannel() != nullptr); GPR_ASSERT(subchannel() != nullptr);
@ -566,8 +564,7 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
// Otherwise, if the subchannel was already in state TRANSIENT_FAILURE // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
// when the subchannel list was created, we'd wind up in a constant // when the subchannel list was created, we'd wind up in a constant
// loop of re-resolution. // loop of re-resolution.
if (connectivity_state() == GRPC_CHANNEL_TRANSIENT_FAILURE && if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
subchannel_list()->started_watching()) {
if (grpc_lb_round_robin_trace.enabled()) { if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. " "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
@ -577,16 +574,11 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE); p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE);
} }
// Update state counters. // Update state counters.
subchannel_list()->UpdateStateCountersLocked( UpdateConnectivityStateLocked(connectivity_state, error);
prev_connectivity_state_, connectivity_state(), GRPC_ERROR_REF(error)); // Update overall state and renew notification.
prev_connectivity_state_ = connectivity_state();
// If we've started watching, update overall state and renew notification.
if (subchannel_list()->started_watching()) {
subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked(); subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked();
RenewConnectivityWatchLocked(); RenewConnectivityWatchLocked();
} }
GRPC_ERROR_UNREF(error);
}
grpc_connectivity_state RoundRobin::CheckConnectivityLocked( grpc_connectivity_state RoundRobin::CheckConnectivityLocked(
grpc_error** error) { grpc_error** error) {

@ -49,7 +49,8 @@ class MySubchannelList; // Forward declaration.
class MySubchannelData class MySubchannelData
: public SubchannelData<MySubchannelList, MySubchannelData> { : public SubchannelData<MySubchannelList, MySubchannelData> {
public: public:
void ProcessConnectivityChangeLocked(grpc_error* error) override { void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) override {
// ...code to handle connectivity changes... // ...code to handle connectivity changes...
} }
}; };
@ -88,13 +89,7 @@ class SubchannelData {
return connected_subchannel_.get(); return connected_subchannel_.get();
} }
// The current connectivity state. // FIXME: remove
// May be called from ProcessConnectivityChangeLocked() to determine
// the state that the subchannel has transitioned into.
grpc_connectivity_state connectivity_state() const {
return curr_connectivity_state_;
}
// Used to set the connected subchannel in cases where we are retaining a // Used to set the connected subchannel in cases where we are retaining a
// subchannel from a previous subchannel list. This is slightly more // subchannel from a previous subchannel list. This is slightly more
// efficient than getting the connected subchannel from the subchannel, // efficient than getting the connected subchannel from the subchannel,
@ -108,25 +103,17 @@ class SubchannelData {
connected_subchannel_ = other->connected_subchannel_; // Adds ref. connected_subchannel_ = other->connected_subchannel_; // Adds ref.
} }
// Synchronously checks the subchannel's connectivity state. Calls // Synchronously checks the subchannel's connectivity state.
// ProcessConnectivityChangeLocked() if the state has changed.
// Must not be called while there is a connectivity notification // Must not be called while there is a connectivity notification
// pending (i.e., between calling StartConnectivityWatchLocked() or // pending (i.e., between calling StartConnectivityWatchLocked() or
// RenewConnectivityWatchLocked() and the resulting invocation of // RenewConnectivityWatchLocked() and the resulting invocation of
// ProcessConnectivityChangeLocked()). // ProcessConnectivityChangeLocked()).
void CheckConnectivityStateLocked() { grpc_connectivity_state CheckConnectivityStateLocked(grpc_error** error) {
GPR_ASSERT(!connectivity_notification_pending_); GPR_ASSERT(!connectivity_notification_pending_);
grpc_error* error = GRPC_ERROR_NONE;
pending_connectivity_state_unsafe_ = pending_connectivity_state_unsafe_ =
grpc_subchannel_check_connectivity(subchannel(), &error); grpc_subchannel_check_connectivity(subchannel(), error);
UpdateConnectedSubchannelLocked(); UpdateConnectedSubchannelLocked();
// FIXME: move the rest of this into RR return pending_connectivity_state_unsafe_;
if (pending_connectivity_state_unsafe_ != curr_connectivity_state_) {
curr_connectivity_state_ = pending_connectivity_state_unsafe_;
ProcessConnectivityChangeLocked(error);
} else {
GRPC_ERROR_UNREF(error);
}
} }
// Unrefs the subchannel. May be used if an individual subchannel is // Unrefs the subchannel. May be used if an individual subchannel is
@ -170,11 +157,11 @@ class SubchannelData {
// After StartConnectivityWatchLocked() or RenewConnectivityWatchLocked() // After StartConnectivityWatchLocked() or RenewConnectivityWatchLocked()
// is called, this method will be invoked when the subchannel's connectivity // is called, this method will be invoked when the subchannel's connectivity
// state changes. // state changes.
// Implementations can use connectivity_state() to get the new
// connectivity state.
// Implementations must invoke either RenewConnectivityWatchLocked() or // Implementations must invoke either RenewConnectivityWatchLocked() or
// StopConnectivityWatchLocked() before returning. // StopConnectivityWatchLocked() before returning.
virtual void ProcessConnectivityChangeLocked(grpc_error* error) GRPC_ABSTRACT; virtual void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state,
grpc_error* error) GRPC_ABSTRACT;
private: private:
// Updates connected_subchannel_ based on pending_connectivity_state_unsafe_. // Updates connected_subchannel_ based on pending_connectivity_state_unsafe_.
@ -196,14 +183,8 @@ class SubchannelData {
bool connectivity_notification_pending_ = false; bool connectivity_notification_pending_ = false;
// Connectivity state to be updated by // Connectivity state to be updated by
// grpc_subchannel_notify_on_state_change(), not guarded by // grpc_subchannel_notify_on_state_change(), not guarded by
// the combiner. Will be copied to curr_connectivity_state_ by // the combiner.
// OnConnectivityChangedLocked().
grpc_connectivity_state pending_connectivity_state_unsafe_; grpc_connectivity_state pending_connectivity_state_unsafe_;
// Current connectivity state.
// FIXME: move this into RR, not needed in PF because connectivity_state
// is only used in ProcessConnectivityChangeLocked()
// (maybe pass it as a param and eliminate the accessor method?)
grpc_connectivity_state curr_connectivity_state_;
}; };
// A list of subchannels. // A list of subchannels.
@ -287,8 +268,7 @@ SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData(
subchannel_(subchannel), subchannel_(subchannel),
// We assume that the current state is IDLE. If not, we'll get a // We assume that the current state is IDLE. If not, we'll get a
// callback telling us that. // callback telling us that.
pending_connectivity_state_unsafe_(GRPC_CHANNEL_IDLE), pending_connectivity_state_unsafe_(GRPC_CHANNEL_IDLE) {
curr_connectivity_state_(GRPC_CHANNEL_IDLE) {
GRPC_CLOSURE_INIT( GRPC_CLOSURE_INIT(
&connectivity_changed_closure_, &connectivity_changed_closure_,
(&SubchannelData<SubchannelListType, (&SubchannelData<SubchannelListType,
@ -457,12 +437,9 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::
sd->RenewConnectivityWatchLocked(); sd->RenewConnectivityWatchLocked();
return; return;
} }
// Now that we're inside the combiner, copy the pending connectivity
// state (which was set by the connectivity state watcher) to
// curr_connectivity_state_, which is what we use inside of the combiner.
sd->curr_connectivity_state_ = sd->pending_connectivity_state_unsafe_;
// Call the subclass's ProcessConnectivityChangeLocked() method. // Call the subclass's ProcessConnectivityChangeLocked() method.
sd->ProcessConnectivityChangeLocked(GRPC_ERROR_REF(error)); sd->ProcessConnectivityChangeLocked(sd->pending_connectivity_state_unsafe_,
GRPC_ERROR_REF(error));
} }
template <typename SubchannelListType, typename SubchannelDataType> template <typename SubchannelListType, typename SubchannelDataType>

Loading…
Cancel
Save