diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 3a0937015e7..dc504a5a399 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -104,11 +104,6 @@ class RoundRobin : public LoadBalancingPolicy { void* user_data() const { return user_data_; } - grpc_connectivity_state CheckConnectivityStateLocked() override { - prev_connectivity_state_ = SubchannelData::CheckConnectivityStateLocked(); - return prev_connectivity_state_; - } - private: const grpc_lb_user_data_vtable* user_data_vtable_; void* user_data_ = nullptr; @@ -125,23 +120,27 @@ class RoundRobin : public LoadBalancingPolicy { grpc_client_channel_factory* client_channel_factory, const grpc_channel_args& args) : SubchannelList(policy, tracer, addresses, combiner, - client_channel_factory, args), - num_idle_(num_subchannels()) {} + client_channel_factory, args) {} void RefForConnectivityWatch(const char* reason); void UnrefForConnectivityWatch(const char* reason); + void StartWatchingLocked(); + void UpdateStateCountersLocked(grpc_connectivity_state old_state, grpc_connectivity_state new_state); - size_t num_ready() const { return num_ready_; } - size_t num_transient_failure() const { return num_transient_failure_; } - size_t num_idle() const { return num_idle_; } + void UpdateConnectivityStateLocked(); + + void UpdateOverallStateLocked(); + + bool initialized() const { return initialized_; } private: + bool initialized_ = false; size_t num_ready_ = 0; + size_t num_connecting_ = 0; size_t num_transient_failure_ = 0; - size_t num_idle_; }; void ShutdownLocked() override; @@ -149,9 +148,8 @@ class RoundRobin : public LoadBalancingPolicy { void StartPickingLocked(); size_t GetNextReadySubchannelIndexLocked(); bool DoPickLocked(PickState* pick); + void DrainPendingPicksLocked(); void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index); - void UpdateConnectivityStateLocked(grpc_connectivity_state state, - grpc_error* error); /** list of subchannels */ RefCountedPtr subchannel_list_; @@ -170,7 +168,7 @@ class RoundRobin : public LoadBalancingPolicy { /** our connectivity state tracker */ grpc_connectivity_state_tracker state_tracker_; /** Index into subchannel_list_ for last pick. */ - size_t last_ready_subchannel_index_ = 0; + size_t last_ready_subchannel_index_ = 0; // FIXME: set to -1? }; RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) { @@ -338,12 +336,7 @@ void RoundRobin::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, void RoundRobin::StartPickingLocked() { started_picking_ = true; - for (size_t i = 0; i < subchannel_list_->num_subchannels(); i++) { - if (subchannel_list_->subchannel(i)->subchannel() != nullptr) { - subchannel_list_->RefForConnectivityWatch("connectivity_watch"); - subchannel_list_->subchannel(i)->StartConnectivityWatchLocked(); - } - } + subchannel_list_->StartWatchingLocked(); } void RoundRobin::ExitIdleLocked() { @@ -377,6 +370,15 @@ bool RoundRobin::DoPickLocked(PickState* pick) { return false; } +void RoundRobin::DrainPendingPicksLocked() { + PickState* pick; + while ((pick = pending_picks_)) { + pending_picks_ = pick->next; + GPR_ASSERT(DoPickLocked(pick)); + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); + } +} + bool RoundRobin::PickLocked(PickState* pick) { if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Trying to pick (shutdown: %d)", this, @@ -395,44 +397,6 @@ bool RoundRobin::PickLocked(PickState* pick) { return false; } -/** Sets the policy's connectivity status based on that of the passed-in \a sd - * (the grpc_lb_subchannel_data associated with the updated subchannel) and the - * subchannel list \a sd belongs to (sd->subchannel_list). \a error will be used - * only if the policy transitions to state TRANSIENT_FAILURE. */ -void RoundRobin::UpdateConnectivityStateLocked(grpc_connectivity_state state, - grpc_error* error) { - /* In priority order. The first rule to match terminates the search (ie, if we - * are on rule n, all previous rules were unfulfilled). - * - * 1) RULE: ANY subchannel is READY => policy is READY. - * CHECK: subchannel_list->num_ready > 0. - * - * 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING. - * CHECK: sd->curr_connectivity_state == CONNECTING. - * - * 3) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is - * TRANSIENT_FAILURE. - * CHECK: subchannel_list->num_transient_failures == - * subchannel_list->num_subchannels. - */ - if (subchannel_list_->num_ready() > 0) { - /* 1) READY */ - grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_READY, - GRPC_ERROR_NONE, "rr_ready"); - } else if (state == GRPC_CHANNEL_CONNECTING) { - /* 2) CONNECTING */ - grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_CONNECTING, - GRPC_ERROR_NONE, "rr_connecting"); - } else if (subchannel_list_->num_transient_failure() == - subchannel_list_->num_subchannels()) { - /* 3) TRANSIENT_FAILURE */ - grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), - "rr_exhausted_subchannels"); - } - GRPC_ERROR_UNREF(error); -} - void RoundRobin::RoundRobinSubchannelList::RefForConnectivityWatch( const char* reason) { // TODO(roth): We currently track these refs manually. Once the new @@ -454,6 +418,24 @@ void RoundRobin::RoundRobinSubchannelList::UnrefForConnectivityWatch( Unref(DEBUG_LOCATION, reason); } +void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() { +// FIXME: consider moving this to SubchannelList ctor +// FIXME: add explanatory comment +gpr_log(GPR_INFO, "BEFORE: CheckConnectivityStateLocked loop"); + for (size_t i = 0; i < num_subchannels(); ++i) { + subchannel(i)->CheckConnectivityStateLocked(); + } +gpr_log(GPR_INFO, "AFTER: CheckConnectivityStateLocked loop"); + initialized_ = true; + UpdateOverallStateLocked(); + for (size_t i = 0; i < num_subchannels(); i++) { + if (subchannel(i)->subchannel() != nullptr) { + RefForConnectivityWatch("connectivity_watch"); + subchannel(i)->StartConnectivityWatchLocked(); + } + } +} + void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked( grpc_connectivity_state old_state, grpc_connectivity_state new_state) { GPR_ASSERT(old_state != GRPC_CHANNEL_SHUTDOWN); @@ -461,19 +443,94 @@ void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked( if (old_state == GRPC_CHANNEL_READY) { GPR_ASSERT(num_ready_ > 0); --num_ready_; + } else if (old_state == GRPC_CHANNEL_CONNECTING) { + GPR_ASSERT(num_connecting_ > 0); + --num_connecting_; } else if (old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { GPR_ASSERT(num_transient_failure_ > 0); --num_transient_failure_; - } else if (old_state == GRPC_CHANNEL_IDLE) { - GPR_ASSERT(num_idle_ > 0); - --num_idle_; } if (new_state == GRPC_CHANNEL_READY) { ++num_ready_; + } else if (new_state == GRPC_CHANNEL_CONNECTING) { + ++num_connecting_; } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { ++num_transient_failure_; - } else if (new_state == GRPC_CHANNEL_IDLE) { - ++num_idle_; + } +} + +/** Sets the policy's connectivity status based on that of the passed-in \a sd + * (the grpc_lb_subchannel_data associated with the updated subchannel) and the + * subchannel list \a sd belongs to (sd->subchannel_list). \a error will be used + * only if the policy transitions to state TRANSIENT_FAILURE. */ +void RoundRobin::RoundRobinSubchannelList::UpdateConnectivityStateLocked() { + RoundRobin* p = static_cast(policy()); + if (p->subchannel_list_ != this) return; + /* In priority order. The first rule to match terminates the search (ie, if we + * are on rule n, all previous rules were unfulfilled). + * + * 1) RULE: ANY subchannel is READY => policy is READY. + * CHECK: subchannel_list->num_ready > 0. + * + * 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING. + * CHECK: sd->curr_connectivity_state == CONNECTING. + * + * 3) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is + * TRANSIENT_FAILURE. + * CHECK: subchannel_list->num_transient_failures == + * subchannel_list->num_subchannels. + */ + if (num_ready_ > 0) { + /* 1) READY */ + grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY, + GRPC_ERROR_NONE, "rr_ready"); + } else if (num_connecting_ > 0) { + /* 2) CONNECTING */ + grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_CONNECTING, + GRPC_ERROR_NONE, "rr_connecting"); + } else if (num_transient_failure_ == num_subchannels()) { + /* 3) TRANSIENT_FAILURE */ + grpc_connectivity_state_set(&p->state_tracker_, + GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_NONE, // FIXME: GRPC_ERROR_REF(error), + "rr_exhausted_subchannels"); + } +// FIXME: GRPC_ERROR_UNREF(error); +} + +void RoundRobin::RoundRobinSubchannelList::UpdateOverallStateLocked() { + RoundRobin* p = static_cast(policy()); + if (num_ready_ > 0) { + if (p->subchannel_list_ != this) { + // Promote this list to p->subchannel_list_. + // This list must be p->latest_pending_subchannel_list_, because we + // any previous update would have been shut down already and + // therefore weeded out in ProcessConnectivityChangeLocked(). + GPR_ASSERT(p->latest_pending_subchannel_list_ == this); + GPR_ASSERT(!shutting_down()); + if (grpc_lb_round_robin_trace.enabled()) { + const size_t old_num_subchannels = + p->subchannel_list_ != nullptr + ? p->subchannel_list_->num_subchannels() + : 0; + gpr_log(GPR_DEBUG, + "[RR %p] phasing out subchannel list %p (size %" PRIuPTR + ") in favor of %p (size %" PRIuPTR ")", + p, p->subchannel_list_.get(), old_num_subchannels, this, + num_subchannels()); + } + if (p->subchannel_list_ != nullptr) { + // Dispose of the current subchannel_list. + p->subchannel_list_->ShutdownLocked("sl_phase_out_shutdown"); + } + p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); + } + // Drain pending picks. + p->DrainPendingPicksLocked(); + } + // Only update connectivity based on the selected subchannel list. + if (p->subchannel_list_ == this) { + UpdateConnectivityStateLocked(); } } @@ -494,6 +551,8 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( grpc_error_string(error)); } GPR_ASSERT(subchannel() != nullptr); +// FIXME: this check may not be needed, because subchannel_list should +// always be shutting down if policy is shutting down // If the policy is shutting down, unref and return. if (p->shutdown_) { StopConnectivityWatchLocked(); @@ -508,59 +567,28 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( subchannel_list()->UnrefForConnectivityWatch("rr_sl_shutdown"); return; } - GPR_ASSERT(connectivity_state() != GRPC_CHANNEL_SHUTDOWN); - // If we're still here, the notification must be for a subchannel in - // either the current or latest pending subchannel lists. - GPR_ASSERT(p->subchannel_list_ == subchannel_list() || - p->latest_pending_subchannel_list_ == subchannel_list()); - // If the sd's new state is TRANSIENT_FAILURE, unref the *connected* - // subchannel, if any. + // Process the state change. switch (connectivity_state()) { case GRPC_CHANNEL_TRANSIENT_FAILURE: { - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, - "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. " - "Requesting re-resolution", - p, subchannel()); + // Only re-resolve if we're being called for a state update, not + // for initialization. Otherwise, if the subchannel was already + // in state TRANSIENT_FAILURE when the subchannel list was + // created, we'd wind up in a constant loop of re-resolution. + if (subchannel_list()->initialized()) { + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_DEBUG, + "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. " + "Requesting re-resolution", + p, subchannel()); + } + p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE); } - p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE); break; } case GRPC_CHANNEL_READY: { if (connected_subchannel() == nullptr) { SetConnectedSubchannelFromSubchannelLocked(); } - if (p->subchannel_list_ != subchannel_list()) { - // promote subchannel_list() to p->subchannel_list_. - // subchannel_list() must be equal to - // p->latest_pending_subchannel_list_ because we have already filtered - // for subchannels belonging to outdated subchannel lists. - GPR_ASSERT(p->latest_pending_subchannel_list_ == subchannel_list()); - GPR_ASSERT(!subchannel_list()->shutting_down()); - if (grpc_lb_round_robin_trace.enabled()) { - const size_t num_subchannels = - p->subchannel_list_ != nullptr - ? p->subchannel_list_->num_subchannels() - : 0; - gpr_log(GPR_DEBUG, - "[RR %p] phasing out subchannel list %p (size %" PRIuPTR - ") in favor of %p (size %" PRIuPTR ")", - p, p->subchannel_list_.get(), num_subchannels, - subchannel_list(), subchannel_list()->num_subchannels()); - } - if (p->subchannel_list_ != nullptr) { - // dispose of the current subchannel_list - p->subchannel_list_->ShutdownLocked("sl_phase_out_shutdown"); - } - p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); - } - // Drain pending picks. - PickState* pick; - while ((pick = p->pending_picks_)) { - p->pending_picks_ = pick->next; - GPR_ASSERT(p->DoPickLocked(pick)); - GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); - } break; } case GRPC_CHANNEL_SHUTDOWN: @@ -572,13 +600,11 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( subchannel_list()->UpdateStateCountersLocked(prev_connectivity_state_, connectivity_state()); prev_connectivity_state_ = connectivity_state(); - // Only update connectivity based on the selected subchannel list. - if (p->subchannel_list_ == subchannel_list()) { - p->UpdateConnectivityStateLocked(connectivity_state(), - GRPC_ERROR_REF(error)); + // If not initializing, update overall state and renew notification. + if (subchannel_list()->initialized()) { + subchannel_list()->UpdateOverallStateLocked(); + StartConnectivityWatchLocked(); } - // Renew notification. - StartConnectivityWatchLocked(); } grpc_connectivity_state RoundRobin::CheckConnectivityLocked( @@ -621,75 +647,41 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args) { } return; } - grpc_lb_addresses* addresses = (grpc_lb_addresses*)arg->value.pointer.p; + grpc_lb_addresses* addresses = + static_cast(arg->value.pointer.p); if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIuPTR " addresses", this, addresses->num_addresses); } - auto subchannel_list = MakeRefCounted( - this, &grpc_lb_round_robin_trace, addresses, combiner(), - client_channel_factory(), args); - if (subchannel_list->num_subchannels() == 0) { - grpc_connectivity_state_set( - &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), - "rr_update_empty"); - if (subchannel_list_ != nullptr) { - subchannel_list_->ShutdownLocked("sl_shutdown_empty_update"); + // Replace latest_pending_subchannel_list_. + if (latest_pending_subchannel_list_ != nullptr) { + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_DEBUG, + "[RR %p] Shutting down previous pending subchannel list %p", + this, latest_pending_subchannel_list_.get()); } - subchannel_list_ = std::move(subchannel_list); // empty list - return; + latest_pending_subchannel_list_->ShutdownLocked("sl_outdated"); } - if (started_picking_) { - for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { -// FIXME: this is wrong, because we should not reset -// curr_connectivity_state_ or pending_connectivity_state_unsafe_ unless -// the new state is TRANSIENT_FAILURE - const grpc_connectivity_state subchannel_state = - subchannel_list->subchannel(i)->CheckConnectivityStateLocked(); - // Override the default setting of IDLE for connectivity notification - // purposes if the subchannel is already in transient failure. Otherwise - // we'd be immediately notified of the IDLE-TRANSIENT_FAILURE - // discrepancy, attempt to re-resolve, and end up here again. -// FIXME: do this - // TODO(roth): As part of C++-ifying the subchannel_list API, design a - // better API for notifying the LB policy of subchannel states, which can - // be used both for the subchannel's initial state and for subsequent - // state changes. This will allow us to handle this more generally instead - // of special-casing TRANSIENT_FAILURE (e.g., we can also distribute any - // pending picks across all READY subchannels rather than sending them all - // to the first one). - if (subchannel_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - subchannel_list->UpdateStateCountersLocked(GRPC_CHANNEL_IDLE, - subchannel_state); - } - } - for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { - /* Watch every new subchannel. A subchannel list becomes active the - * moment one of its subchannels is READY. At that moment, we swap - * p->subchannel_list for sd->subchannel_list, provided the subchannel - * list is still valid (ie, isn't shutting down) */ - subchannel_list->RefForConnectivityWatch("connectivity_watch"); - subchannel_list->subchannel(i)->StartConnectivityWatchLocked(); - } - if (latest_pending_subchannel_list_ != nullptr) { - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, - "[RR %p] Shutting down latest pending subchannel list %p, " - "about to be replaced by newer latest %p", - this, latest_pending_subchannel_list_.get(), - subchannel_list.get()); - } - latest_pending_subchannel_list_->ShutdownLocked("sl_outdated"); + latest_pending_subchannel_list_ = MakeRefCounted( + this, &grpc_lb_round_robin_trace, addresses, combiner(), + client_channel_factory(), args); + // If we haven't started picking yet or the new list is empty, + // immediately promote the new list to the current list. + if (!started_picking_ || + latest_pending_subchannel_list_->num_subchannels() == 0) { + if (latest_pending_subchannel_list_->num_subchannels() == 0) { + grpc_connectivity_state_set( + &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), + "rr_update_empty"); } - latest_pending_subchannel_list_ = std::move(subchannel_list); - } else { - // The policy isn't picking yet. Save the update for later, disposing of - // previous version if any. if (subchannel_list_ != nullptr) { - subchannel_list_->ShutdownLocked("rr_update_before_started_picking"); + subchannel_list_->ShutdownLocked("sl_shutdown_replace_on_update"); } - subchannel_list_ = std::move(subchannel_list); + subchannel_list_ = std::move(latest_pending_subchannel_list_); + } else { + // If we've started picking, start watching the new list. + latest_pending_subchannel_list_->StartWatchingLocked(); } } diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 0824fe373c6..c643f4cb4b4 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -79,11 +79,15 @@ class SubchannelData { return curr_connectivity_state_; } - virtual grpc_connectivity_state CheckConnectivityStateLocked() { + void CheckConnectivityStateLocked() { + GPR_ASSERT(!connectivity_notification_pending_); + grpc_error* error = GRPC_ERROR_NONE; pending_connectivity_state_unsafe_ = - grpc_subchannel_check_connectivity(subchannel(), nullptr); - curr_connectivity_state_ = pending_connectivity_state_unsafe_; - return curr_connectivity_state_; + grpc_subchannel_check_connectivity(subchannel(), &error); + if (pending_connectivity_state_unsafe_ != curr_connectivity_state_) { + curr_connectivity_state_ = pending_connectivity_state_unsafe_; + ProcessConnectivityChangeLocked(error); + } } // Unrefs the subchannel.