diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index dc98a921788..1fecdebccf2 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -76,7 +76,8 @@ class PickFirst : public LoadBalancingPolicy { : SubchannelData(subchannel_list, user_data_vtable, address, subchannel, combiner) {} - void ProcessConnectivityChangeLocked(grpc_error* error) override; + void ProcessConnectivityChangeLocked( + grpc_connectivity_state connectivity_state, grpc_error* error) override; }; class PickFirstSubchannelList @@ -369,7 +370,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { } void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( - grpc_error* error) { + grpc_connectivity_state connectivity_state, grpc_error* error) { PickFirst* p = static_cast(subchannel_list()->policy()); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, @@ -379,7 +380,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( "sd->subchannel_list->shutting_down=%d error=%s", p, subchannel(), Index(), subchannel_list()->num_subchannels(), subchannel_list(), - grpc_connectivity_state_name(connectivity_state()), p->shutdown_, + grpc_connectivity_state_name(connectivity_state), p->shutdown_, subchannel_list()->shutting_down(), grpc_error_string(error)); } // The notification must be for a subchannel in either the current or @@ -390,7 +391,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( if (p->selected_ == this) { // If the new state is anything other than READY and there is a // pending update, switch to the pending update. - if (connectivity_state() != GRPC_CHANNEL_READY && + if (connectivity_state != GRPC_CHANNEL_READY && p->latest_pending_subchannel_list_ != nullptr) { p->selected_ = nullptr; StopConnectivityWatchLocked(); @@ -404,8 +405,8 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( // re-resolution is introduced. But we need to investigate whether we // really want to take any action instead of waiting for the selected // subchannel reconnecting. - GPR_ASSERT(connectivity_state() != GRPC_CHANNEL_SHUTDOWN); - if (connectivity_state() == GRPC_CHANNEL_TRANSIENT_FAILURE) { + GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN); + if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { // If the selected channel goes bad, request a re-resolution. grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, @@ -417,7 +418,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( UnrefSubchannelLocked("pf_selected_shutdown"); StopConnectivityWatchLocked(); } else { - grpc_connectivity_state_set(&p->state_tracker_, connectivity_state(), + grpc_connectivity_state_set(&p->state_tracker_, connectivity_state, GRPC_ERROR_REF(error), "selected_changed"); // Renew notification. RenewConnectivityWatchLocked(); @@ -435,7 +436,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( // for a subchannel in p->latest_pending_subchannel_list_. The // goal here is to find a subchannel from the update that we can // select in place of the current one. - switch (connectivity_state()) { + switch (connectivity_state) { case GRPC_CHANNEL_READY: { // Case 2. Promote p->latest_pending_subchannel_list_ to // p->subchannel_list_. diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index d6a738ffc98..764d2c8ad76 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -109,12 +109,20 @@ class RoundRobin : public LoadBalancingPolicy { void* user_data() const { return user_data_; } + grpc_connectivity_state connectivity_state() const { + return last_connectivity_state_; + } + + void UpdateConnectivityStateLocked( + grpc_connectivity_state connectivity_state, grpc_error* error); + private: - void ProcessConnectivityChangeLocked(grpc_error* error) override; + void ProcessConnectivityChangeLocked( + grpc_connectivity_state connectivity_state, grpc_error* error) override; const grpc_lb_user_data_vtable* user_data_vtable_; void* user_data_ = nullptr; - grpc_connectivity_state prev_connectivity_state_ = GRPC_CHANNEL_IDLE; + grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE; }; // A list of subchannels. @@ -137,9 +145,6 @@ class RoundRobin : public LoadBalancingPolicy { // Starts watching the subchannels in this list. void StartWatchingLocked(); - // Returns true if we have started watching. - bool started_watching() const { return started_watching_; } - // Updates the counters of subchannels in each state when a // subchannel transitions from old_state to new_state. // transient_failure_error is the error that is reported when @@ -158,7 +163,6 @@ class RoundRobin : public LoadBalancingPolicy { void UpdateRoundRobinStateFromSubchannelStateCountsLocked(); private: - bool started_watching_ = false; size_t num_ready_ = 0; size_t num_connecting_ = 0; size_t num_transient_failure_ = 0; @@ -416,29 +420,16 @@ void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() { if (num_subchannels() == 0) return; // Check current state of each subchannel synchronously, since any // subchannel already used by some other channel may have a non-IDLE - // state. This will invoke ProcessConnectivityChangeLocked() for each - // subchannel whose state is not IDLE. However, because started_watching_ - // is still false, the code there will do two special things: - // - // - It will skip re-resolution for any subchannel in state - // TRANSIENT_FAILURE, since doing this at start-watching-time would - // cause us to enter an endless loop of re-resolution (i.e., - // re-resolution would cause a new update, and the new update would - // immediately trigger a new re-resolution). - // - // - It will not call UpdateRoundRobinStateFromSubchannelStateCountsLocked(); - // instead, we call that here after all subchannels have been checked. - // This allows us to act more intelligently based on the state of all - // subchannels, rather than just acting on the first one. For example, - // if there is more than one pending pick, this allows us to spread the - // picks across all READY subchannels rather than sending them all to - // the first subchannel that reports READY. + // state. for (size_t i = 0; i < num_subchannels(); ++i) { - subchannel(i)->CheckConnectivityStateLocked(); + grpc_error* error = GRPC_ERROR_NONE; + grpc_connectivity_state state = + subchannel(i)->CheckConnectivityStateLocked(&error); + if (state != GRPC_CHANNEL_IDLE) { + subchannel(i)->UpdateConnectivityStateLocked(state, error); + } } - // Now set started_watching_ to true and call - // UpdateRoundRobinStateFromSubchannelStateCountsLocked(). - started_watching_ = true; + // Now set the LB policy's state based on the subchannels' states. UpdateRoundRobinStateFromSubchannelStateCountsLocked(); // Start connectivity watch for each subchannel. for (size_t i = 0; i < num_subchannels(); i++) { @@ -544,8 +535,15 @@ void RoundRobin::RoundRobinSubchannelList:: MaybeUpdateRoundRobinConnectivityStateLocked(); } +void RoundRobin::RoundRobinSubchannelData::UpdateConnectivityStateLocked( + grpc_connectivity_state connectivity_state, grpc_error* error) { + subchannel_list()->UpdateStateCountersLocked( + last_connectivity_state_, connectivity_state, GRPC_ERROR_REF(error)); + last_connectivity_state_ = connectivity_state; +} + void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( - grpc_error* error) { + grpc_connectivity_state connectivity_state, grpc_error* error) { RoundRobin* p = static_cast(subchannel_list()->policy()); if (grpc_lb_round_robin_trace.enabled()) { gpr_log( @@ -556,8 +554,8 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( "p->shutdown=%d sd->subchannel_list->shutting_down=%d error=%s", p, subchannel(), subchannel_list(), Index(), subchannel_list()->num_subchannels(), - grpc_connectivity_state_name(prev_connectivity_state_), - grpc_connectivity_state_name(connectivity_state()), p->shutdown_, + grpc_connectivity_state_name(last_connectivity_state_), + grpc_connectivity_state_name(connectivity_state), p->shutdown_, subchannel_list()->shutting_down(), grpc_error_string(error)); } GPR_ASSERT(subchannel() != nullptr); @@ -566,8 +564,7 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE // when the subchannel list was created, we'd wind up in a constant // loop of re-resolution. - if (connectivity_state() == GRPC_CHANNEL_TRANSIENT_FAILURE && - subchannel_list()->started_watching()) { + if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_INFO, "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. " @@ -577,15 +574,10 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE); } // Update state counters. - subchannel_list()->UpdateStateCountersLocked( - prev_connectivity_state_, connectivity_state(), GRPC_ERROR_REF(error)); - prev_connectivity_state_ = connectivity_state(); - // If we've started watching, update overall state and renew notification. - if (subchannel_list()->started_watching()) { - subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked(); - RenewConnectivityWatchLocked(); - } - GRPC_ERROR_UNREF(error); + UpdateConnectivityStateLocked(connectivity_state, error); + // Update overall state and renew notification. + subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked(); + RenewConnectivityWatchLocked(); } grpc_connectivity_state RoundRobin::CheckConnectivityLocked( diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index bae3f0ba71d..bad50c461cc 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -49,7 +49,8 @@ class MySubchannelList; // Forward declaration. class MySubchannelData : public SubchannelData { public: - void ProcessConnectivityChangeLocked(grpc_error* error) override { + void ProcessConnectivityChangeLocked( + grpc_connectivity_state connectivity_state, grpc_error* error) override { // ...code to handle connectivity changes... } }; @@ -88,13 +89,7 @@ class SubchannelData { return connected_subchannel_.get(); } - // The current connectivity state. - // May be called from ProcessConnectivityChangeLocked() to determine - // the state that the subchannel has transitioned into. - grpc_connectivity_state connectivity_state() const { - return curr_connectivity_state_; - } - +// FIXME: remove // Used to set the connected subchannel in cases where we are retaining a // subchannel from a previous subchannel list. This is slightly more // efficient than getting the connected subchannel from the subchannel, @@ -108,25 +103,17 @@ class SubchannelData { connected_subchannel_ = other->connected_subchannel_; // Adds ref. } - // Synchronously checks the subchannel's connectivity state. Calls - // ProcessConnectivityChangeLocked() if the state has changed. + // Synchronously checks the subchannel's connectivity state. // Must not be called while there is a connectivity notification // pending (i.e., between calling StartConnectivityWatchLocked() or // RenewConnectivityWatchLocked() and the resulting invocation of // ProcessConnectivityChangeLocked()). - void CheckConnectivityStateLocked() { + grpc_connectivity_state CheckConnectivityStateLocked(grpc_error** error) { GPR_ASSERT(!connectivity_notification_pending_); - grpc_error* error = GRPC_ERROR_NONE; pending_connectivity_state_unsafe_ = - grpc_subchannel_check_connectivity(subchannel(), &error); + grpc_subchannel_check_connectivity(subchannel(), error); UpdateConnectedSubchannelLocked(); -// FIXME: move the rest of this into RR - if (pending_connectivity_state_unsafe_ != curr_connectivity_state_) { - curr_connectivity_state_ = pending_connectivity_state_unsafe_; - ProcessConnectivityChangeLocked(error); - } else { - GRPC_ERROR_UNREF(error); - } + return pending_connectivity_state_unsafe_; } // Unrefs the subchannel. May be used if an individual subchannel is @@ -170,11 +157,11 @@ class SubchannelData { // After StartConnectivityWatchLocked() or RenewConnectivityWatchLocked() // is called, this method will be invoked when the subchannel's connectivity // state changes. - // Implementations can use connectivity_state() to get the new - // connectivity state. // Implementations must invoke either RenewConnectivityWatchLocked() or // StopConnectivityWatchLocked() before returning. - virtual void ProcessConnectivityChangeLocked(grpc_error* error) GRPC_ABSTRACT; + virtual void ProcessConnectivityChangeLocked( + grpc_connectivity_state connectivity_state, + grpc_error* error) GRPC_ABSTRACT; private: // Updates connected_subchannel_ based on pending_connectivity_state_unsafe_. @@ -196,14 +183,8 @@ class SubchannelData { bool connectivity_notification_pending_ = false; // Connectivity state to be updated by // grpc_subchannel_notify_on_state_change(), not guarded by - // the combiner. Will be copied to curr_connectivity_state_ by - // OnConnectivityChangedLocked(). + // the combiner. grpc_connectivity_state pending_connectivity_state_unsafe_; - // Current connectivity state. -// FIXME: move this into RR, not needed in PF because connectivity_state -// is only used in ProcessConnectivityChangeLocked() -// (maybe pass it as a param and eliminate the accessor method?) - grpc_connectivity_state curr_connectivity_state_; }; // A list of subchannels. @@ -287,8 +268,7 @@ SubchannelData::SubchannelData( subchannel_(subchannel), // We assume that the current state is IDLE. If not, we'll get a // callback telling us that. - pending_connectivity_state_unsafe_(GRPC_CHANNEL_IDLE), - curr_connectivity_state_(GRPC_CHANNEL_IDLE) { + pending_connectivity_state_unsafe_(GRPC_CHANNEL_IDLE) { GRPC_CLOSURE_INIT( &connectivity_changed_closure_, (&SubchannelData:: sd->RenewConnectivityWatchLocked(); return; } - // Now that we're inside the combiner, copy the pending connectivity - // state (which was set by the connectivity state watcher) to - // curr_connectivity_state_, which is what we use inside of the combiner. - sd->curr_connectivity_state_ = sd->pending_connectivity_state_unsafe_; // Call the subclass's ProcessConnectivityChangeLocked() method. - sd->ProcessConnectivityChangeLocked(GRPC_ERROR_REF(error)); + sd->ProcessConnectivityChangeLocked(sd->pending_connectivity_state_unsafe_, + GRPC_ERROR_REF(error)); } template