From 8d51cabcb9e67928739650ac2e78a748d8e5be9d Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 27 Apr 2018 08:35:11 -0700 Subject: [PATCH] Track last ready subchannel index in RoundRobinSubchannelList. --- .../lb_policy/round_robin/round_robin.cc | 138 +++++++++--------- 1 file changed, 66 insertions(+), 72 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 764d2c8ad76..1b17cbdd985 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -162,11 +162,15 @@ class RoundRobin : public LoadBalancingPolicy { // subchannels in each state. void UpdateRoundRobinStateFromSubchannelStateCountsLocked(); + size_t GetNextReadySubchannelIndexLocked(); + void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index); + private: size_t num_ready_ = 0; size_t num_connecting_ = 0; size_t num_transient_failure_ = 0; grpc_error* last_transient_failure_error_ = GRPC_ERROR_NONE; + size_t last_ready_index_ = -1; // Index into list of last pick. }; void ShutdownLocked() override; @@ -174,8 +178,6 @@ class RoundRobin : public LoadBalancingPolicy { void StartPickingLocked(); bool DoPickLocked(PickState* pick); void DrainPendingPicksLocked(); - size_t GetNextReadySubchannelIndexLocked(); - void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index); /** list of subchannels */ OrphanablePtr subchannel_list_; @@ -193,8 +195,6 @@ class RoundRobin : public LoadBalancingPolicy { PickState* pending_picks_ = nullptr; /** our connectivity state tracker */ grpc_connectivity_state_tracker state_tracker_; - /** Index into subchannel_list_ for last pick. */ - size_t last_ready_subchannel_index_ = -1; }; RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) { @@ -220,68 +220,6 @@ RoundRobin::~RoundRobin() { grpc_subchannel_index_unref(); } -/** Returns the index into p->subchannel_list->subchannels of the next - * subchannel in READY state, or p->subchannel_list->num_subchannels if no - * subchannel is READY. - * - * Note that this function does *not* update p->last_ready_subchannel_index. - * The caller must do that if it returns a pick. */ -size_t RoundRobin::GetNextReadySubchannelIndexLocked() { - GPR_ASSERT(subchannel_list_ != nullptr); - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_INFO, - "[RR %p] getting next ready subchannel (out of %" PRIuPTR - "), " - "last_ready_subchannel_index=%" PRIuPTR, - this, subchannel_list_->num_subchannels(), - last_ready_subchannel_index_); - } - for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) { - const size_t index = (i + last_ready_subchannel_index_ + 1) % - subchannel_list_->num_subchannels(); - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log( - GPR_INFO, - "[RR %p] checking subchannel %p, subchannel_list %p, index %" PRIuPTR - ": state=%s", - this, subchannel_list_->subchannel(index)->subchannel(), - subchannel_list_.get(), index, - grpc_connectivity_state_name( - subchannel_list_->subchannel(index)->connectivity_state())); - } - if (subchannel_list_->subchannel(index)->connectivity_state() == - GRPC_CHANNEL_READY) { - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_INFO, - "[RR %p] found next ready subchannel (%p) at index %" PRIuPTR - " of subchannel_list %p", - this, subchannel_list_->subchannel(index)->subchannel(), index, - subchannel_list_.get()); - } - return index; - } - } - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_INFO, "[RR %p] no subchannels in ready state", this); - } - return subchannel_list_->num_subchannels(); -} - -// Sets last_ready_subchannel_index_ to last_ready_index. -void RoundRobin::UpdateLastReadySubchannelIndexLocked(size_t last_ready_index) { - GPR_ASSERT(last_ready_index < subchannel_list_->num_subchannels()); - last_ready_subchannel_index_ = last_ready_index; - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_INFO, - "[RR %p] setting last_ready_subchannel_index=%" PRIuPTR - " (SC %p, CSC %p)", - this, last_ready_index, - subchannel_list_->subchannel(last_ready_index)->subchannel(), - subchannel_list_->subchannel(last_ready_index) - ->connected_subchannel()); - } -} - void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { PickState* pick; while ((pick = pending_picks_) != nullptr) { @@ -366,7 +304,8 @@ void RoundRobin::ExitIdleLocked() { } bool RoundRobin::DoPickLocked(PickState* pick) { - const size_t next_ready_index = GetNextReadySubchannelIndexLocked(); + const size_t next_ready_index = + subchannel_list_->GetNextReadySubchannelIndexLocked(); if (next_ready_index < subchannel_list_->num_subchannels()) { /* readily available, report right away */ RoundRobinSubchannelData* sd = @@ -384,7 +323,7 @@ bool RoundRobin::DoPickLocked(PickState* pick) { sd->subchannel_list(), next_ready_index); } /* only advance the last picked pointer if the selection was used */ - UpdateLastReadySubchannelIndexLocked(next_ready_index); + subchannel_list_->UpdateLastReadySubchannelIndexLocked(next_ready_index); return true; } return false; @@ -511,7 +450,7 @@ void RoundRobin::RoundRobinSubchannelList:: // Promote this list to p->subchannel_list_. // This list must be p->latest_pending_subchannel_list_, because // any previous update would have been shut down already and - // therefore weeded out in ProcessConnectivityChangeLocked(). + // therefore we would not be receiving a notification for them. GPR_ASSERT(p->latest_pending_subchannel_list_.get() == this); GPR_ASSERT(!shutting_down()); if (grpc_lb_round_robin_trace.enabled()) { @@ -526,7 +465,6 @@ void RoundRobin::RoundRobinSubchannelList:: num_subchannels()); } p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); - p->last_ready_subchannel_index_ = -1; } // Drain pending picks. p->DrainPendingPicksLocked(); @@ -580,6 +518,62 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( RenewConnectivityWatchLocked(); } +/** Returns the index into p->subchannel_list->subchannels of the next + * subchannel in READY state, or p->subchannel_list->num_subchannels if no + * subchannel is READY. + * + * Note that this function does *not* update p->last_ready_subchannel_index. + * The caller must do that if it returns a pick. */ +size_t +RoundRobin::RoundRobinSubchannelList::GetNextReadySubchannelIndexLocked() { + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_INFO, + "[RR %p] getting next ready subchannel (out of %" PRIuPTR + "), last_ready_index=%" PRIuPTR, + policy(), num_subchannels(), last_ready_index_); + } + for (size_t i = 0; i < num_subchannels(); ++i) { + const size_t index = (i + last_ready_index_ + 1) % num_subchannels(); + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log( + GPR_INFO, + "[RR %p] checking subchannel %p, subchannel_list %p, index %" PRIuPTR + ": state=%s", + policy(), subchannel(index)->subchannel(), this, index, + grpc_connectivity_state_name( + subchannel(index)->connectivity_state())); + } + if (subchannel(index)->connectivity_state() == GRPC_CHANNEL_READY) { + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_INFO, + "[RR %p] found next ready subchannel (%p) at index %" PRIuPTR + " of subchannel_list %p", + policy(), subchannel(index)->subchannel(), index, this); + } + return index; + } + } + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_INFO, "[RR %p] no subchannels in ready state", this); + } + return num_subchannels(); +} + +// Sets last_ready_index_ to last_ready_index. +void RoundRobin::RoundRobinSubchannelList::UpdateLastReadySubchannelIndexLocked( + size_t last_ready_index) { + GPR_ASSERT(last_ready_index < num_subchannels()); + last_ready_index_ = last_ready_index; + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_INFO, + "[RR %p] setting last_ready_subchannel_index=%" PRIuPTR + " (SC %p, CSC %p)", + policy(), last_ready_index, + subchannel(last_ready_index)->subchannel(), + subchannel(last_ready_index)->connected_subchannel()); + } +} + grpc_connectivity_state RoundRobin::CheckConnectivityLocked( grpc_error** error) { return grpc_connectivity_state_get(&state_tracker_, error); @@ -593,7 +587,8 @@ void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current, void RoundRobin::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) { - const size_t next_ready_index = GetNextReadySubchannelIndexLocked(); + const size_t next_ready_index = + subchannel_list_->GetNextReadySubchannelIndexLocked(); if (next_ready_index < subchannel_list_->num_subchannels()) { RoundRobinSubchannelData* selected = subchannel_list_->subchannel(next_ready_index); @@ -648,7 +643,6 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args) { "rr_update_empty"); } subchannel_list_ = std::move(latest_pending_subchannel_list_); - last_ready_subchannel_index_ = -1; } else { // If we've started picking, start watching the new list. latest_pending_subchannel_list_->StartWatchingLocked();