From 1cd6e69347cbf62a012477fe184ee6fa8f25d32c Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 26 Apr 2022 15:25:47 -0700 Subject: [PATCH] subchannel: report IDLE upon existing connection failure and after backoff interval (#29428) * subchannel: report IDLE upon existing connection failure and after backoff interval * rename AttemptToConnect() to RequestConnection() * clang-format * fix unused parameter warning * fix subchannel to handle either TF or SHUTDOWN from transport * fix handling of ConnectedSubchannel failure * pass status up in IDLE state to communicate keepalive info * update comment * split pick_first and round_robin tests into their own test suites * improve log message * add test * clang-format * appease clang-tidy * fix test to do a poor man's graceful shutdown to avoid spurious RPC failures * simplify round_robin logic and fix test flakes * fix grpclb bug --- .../filters/client_channel/client_channel.cc | 2 +- .../client_channel/lb_policy/grpclb/grpclb.cc | 2 +- .../lb_policy/pick_first/pick_first.cc | 70 ++- .../lb_policy/ring_hash/ring_hash.cc | 27 +- .../lb_policy/round_robin/round_robin.cc | 313 ++++++------- .../ext/filters/client_channel/subchannel.cc | 243 +++++----- .../ext/filters/client_channel/subchannel.h | 50 +- .../client_channel/subchannel_interface.h | 10 +- src/core/lib/surface/server.cc | 9 + src/core/lib/surface/server.h | 2 + test/cpp/end2end/client_lb_end2end_test.cc | 439 ++++++++++++------ test/cpp/end2end/connection_delay_injector.cc | 3 + test/cpp/end2end/connection_delay_injector.h | 3 +- 13 files changed, 647 insertions(+), 526 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 592a1b0683e..e8cc467c9e8 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -525,7 +525,7 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface { return subchannel_->connected_subchannel(); } - void AttemptToConnect() override { subchannel_->AttemptToConnect(); } + void RequestConnection() override { subchannel_->RequestConnection(); } void ResetBackoff() override { subchannel_->ResetBackoff(); } diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index cd8e3e37136..089688fcdb8 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -1239,6 +1239,7 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked( // If the fallback-at-startup checks are pending, go into fallback mode // immediately. This short-circuits the timeout for the fallback-at-startup // case. + grpclb_policy()->lb_calld_.reset(); if (grpclb_policy()->fallback_at_startup_checks_pending_) { GPR_ASSERT(!seen_serverlist_); gpr_log(GPR_INFO, @@ -1254,7 +1255,6 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked( // This handles the fallback-after-startup case. 
grpclb_policy()->MaybeEnterFallbackModeAfterStartup(); } - grpclb_policy()->lb_calld_.reset(); GPR_ASSERT(!grpclb_policy()->shutting_down_); grpclb_policy()->channel_control_helper()->RequestReresolution(); if (seen_initial_response_) { diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 02ae76bc22d..da6ced25a8f 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -201,6 +201,10 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() { channel_control_helper()->UpdateState( GRPC_CHANNEL_TRANSIENT_FAILURE, status, absl::make_unique(status)); + // If there was a previously pending update (which may or may + // not have contained the currently selected subchannel), drop + // it, so that it doesn't override what we've done here. + latest_pending_subchannel_list_.reset(); return; } // If one of the subchannels in the new list is already in state @@ -232,7 +236,7 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() { // here, since we've already checked the initial connectivity // state of all subchannels above. subchannel_list_->subchannel(0)->StartConnectivityWatchLocked(); - subchannel_list_->subchannel(0)->subchannel()->AttemptToConnect(); + subchannel_list_->subchannel(0)->subchannel()->RequestConnection(); } else { // We do have a selected subchannel (which means it's READY), so keep // using it until one of the subchannels in the new list reports READY. @@ -255,7 +259,7 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() { ->StartConnectivityWatchLocked(); latest_pending_subchannel_list_->subchannel(0) ->subchannel() - ->AttemptToConnect(); + ->RequestConnection(); } } @@ -307,10 +311,13 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( "Pick First %p selected subchannel connectivity changed to %s", p, ConnectivityStateName(connectivity_state)); } - // If the new state is anything other than READY and there is a - // pending update, switch to the pending update. - if (connectivity_state != GRPC_CHANNEL_READY && - p->latest_pending_subchannel_list_ != nullptr) { + // We might miss a connectivity state update between calling + // CheckConnectivityStateLocked() and StartConnectivityWatchLocked(). + // If the new state is READY, just ignore it; otherwise, regardless of + // what state it is, we treat it as a failure of the existing connection. + if (connectivity_state == GRPC_CHANNEL_READY) return; + // If there is a pending update, switch to the pending update. + if (p->latest_pending_subchannel_list_ != nullptr) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { gpr_log(GPR_INFO, "Pick First %p promoting pending subchannel list %p to " @@ -335,38 +342,19 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( absl::make_unique( p->Ref(DEBUG_LOCATION, "QueuePicker"))); } - } else { - if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - // If the selected subchannel goes bad, request a re-resolution. We - // also set the channel state to IDLE. The reason is that if the new - // state is TRANSIENT_FAILURE due to a GOAWAY reception we don't want - // to connect to the re-resolved backends until we leave IDLE state. - // TODO(qianchengz): We may want to request re-resolution in - // ExitIdleLocked(). 
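
// Illustrative sketch (not part of this patch): the decision pick_first now
// makes when the *selected* subchannel reports a new state, per the change
// above. Stand-in enums are used here instead of the real gRPC internals.
#include <cassert>

enum class State { kIdle, kConnecting, kReady, kTransientFailure };
enum class Action {
  kIgnore,              // READY again: nothing to do.
  kPromotePendingList,  // Connection lost, and a pending update exists.
  kGoIdleAndReresolve,  // Connection lost, no pending update: report IDLE.
};

Action OnSelectedSubchannelState(State new_state, bool has_pending_update) {
  // A READY notification for the already-selected subchannel is ignored.
  if (new_state == State::kReady) return Action::kIgnore;
  // Anything else is treated as loss of the existing connection.
  return has_pending_update ? Action::kPromotePendingList
                            : Action::kGoIdleAndReresolve;
}

int main() {
  assert(OnSelectedSubchannelState(State::kReady, false) == Action::kIgnore);
  assert(OnSelectedSubchannelState(State::kIdle, true) ==
         Action::kPromotePendingList);
  assert(OnSelectedSubchannelState(State::kTransientFailure, false) ==
         Action::kGoIdleAndReresolve);
  return 0;
}
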
- p->idle_ = true; - p->channel_control_helper()->RequestReresolution(); - p->selected_ = nullptr; - p->subchannel_list_.reset(); - p->channel_control_helper()->UpdateState( - GRPC_CHANNEL_IDLE, absl::Status(), - absl::make_unique( - p->Ref(DEBUG_LOCATION, "QueuePicker"))); - } else { - // This is unlikely but can happen when a subchannel has been asked - // to reconnect by a different channel and this channel has dropped - // some connectivity state notifications. - if (connectivity_state == GRPC_CHANNEL_READY) { - p->channel_control_helper()->UpdateState( - GRPC_CHANNEL_READY, absl::Status(), - absl::make_unique(subchannel()->Ref())); - } else { // CONNECTING - p->channel_control_helper()->UpdateState( - connectivity_state, absl::Status(), - absl::make_unique( - p->Ref(DEBUG_LOCATION, "QueuePicker"))); - } - } + return; } + // If the selected subchannel goes bad, request a re-resolution. + // TODO(qianchengz): We may want to request re-resolution in + // ExitIdleLocked(). + p->channel_control_helper()->RequestReresolution(); + // Enter idle. + p->idle_ = true; + p->selected_ = nullptr; + p->subchannel_list_.reset(); + p->channel_control_helper()->UpdateState( + GRPC_CHANNEL_IDLE, absl::Status(), + absl::make_unique(p->Ref(DEBUG_LOCATION, "QueuePicker"))); return; } // If we get here, there are two possible cases: @@ -384,7 +372,8 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( ProcessUnselectedReadyLocked(); break; } - case GRPC_CHANNEL_TRANSIENT_FAILURE: { + case GRPC_CHANNEL_TRANSIENT_FAILURE: + case GRPC_CHANNEL_IDLE: { CancelConnectivityWatchLocked("connection attempt failed"); PickFirstSubchannelData* sd = this; size_t next_index = @@ -428,8 +417,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( sd->CheckConnectivityStateAndStartWatchingLocked(); break; } - case GRPC_CHANNEL_CONNECTING: - case GRPC_CHANNEL_IDLE: { + case GRPC_CHANNEL_CONNECTING: { // Only update connectivity state in case 1, and only if we're not // already in TRANSIENT_FAILURE. if (subchannel_list() == p->subchannel_list_.get() && @@ -499,7 +487,7 @@ void PickFirst::PickFirstSubchannelData:: if (current_state == GRPC_CHANNEL_READY) { if (p->selected_ != this) ProcessUnselectedReadyLocked(); } else { - subchannel()->AttemptToConnect(); + subchannel()->RequestConnection(); } } diff --git a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc index 9780b85479c..5b63d840437 100644 --- a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc +++ b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc @@ -273,7 +273,7 @@ class RingHash : public LoadBalancingPolicy { [self]() { if (!self->ring_hash_lb_->shutdown_) { for (auto& subchannel : self->subchannels_) { - subchannel->AttemptToConnect(); + subchannel->RequestConnection(); } } delete self; @@ -648,7 +648,7 @@ void RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked( num_subchannels()); } internally_triggered_connection_index_ = next_index; - subchannel(next_index)->subchannel()->AttemptToConnect(); + subchannel(next_index)->subchannel()->RequestConnection(); } } @@ -676,24 +676,11 @@ void RingHash::RingHashSubchannelData::UpdateConnectivityStateLocked( } // Decide what state to report for the purposes of aggregation and // picker behavior. - // If we haven't seen a failure since the last time we were in state - // READY, then we report the state change as-is. 
However, once we do see - // a failure, we report TRANSIENT_FAILURE and do not report any subsequent - // state changes until we go back into state READY. - if (last_connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE) { - // If not transitioning to READY, ignore the update, since we want - // to continue to consider ourselves in TRANSIENT_FAILURE. - if (connectivity_state != GRPC_CHANNEL_READY) return; - } else if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - // If we go from READY to TF, treat it as IDLE. - // This transition can be caused by a "normal" connection failure, such - // as the server closing the connection due to a max-age setting. In - // this case, we want to have RPCs that hash to this subchannel wait for - // the reconnection attempt rather than assuming that the subchannel is - // bad and moving on to a subsequent subchannel in the ring. - if (last_connectivity_state_ == GRPC_CHANNEL_READY) { - connectivity_state = GRPC_CHANNEL_IDLE; - } + // If the last recorded state was TRANSIENT_FAILURE, ignore the update + // unless the new state is READY. + if (last_connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE && + connectivity_state != GRPC_CHANNEL_READY) { + return; } // Update state counters used for aggregation. subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_, diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 1946a878072..0a9a0be4c93 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -75,14 +75,15 @@ class RoundRobin : public LoadBalancingPolicy { : SubchannelData(subchannel_list, address, std::move(subchannel)) {} grpc_connectivity_state connectivity_state() const { - return last_connectivity_state_; + return logical_connectivity_state_; } - bool seen_failure_since_ready() const { return seen_failure_since_ready_; } - - // Performs connectivity state updates that need to be done both when we - // first start watching and when a watcher notification is received. - void UpdateConnectivityStateLocked( + // Computes and updates the logical connectivity state of the subchannel. + // Note that the logical connectivity state may differ from the + // actual reported state in some cases (e.g., after we see + // TRANSIENT_FAILURE, we ignore any subsequent state changes until + // we see READY). Returns true if the state changed. + bool UpdateLogicalConnectivityStateLocked( grpc_connectivity_state connectivity_state); private: @@ -91,8 +92,7 @@ class RoundRobin : public LoadBalancingPolicy { void ProcessConnectivityChangeLocked( grpc_connectivity_state connectivity_state) override; - grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE; - bool seen_failure_since_ready_ = false; + grpc_connectivity_state logical_connectivity_state_ = GRPC_CHANNEL_IDLE; }; // A list of subchannels. @@ -117,23 +117,27 @@ class RoundRobin : public LoadBalancingPolicy { } // Starts watching the subchannels in this list. - void StartWatchingLocked(); + void StartWatchingLocked(absl::Status status_for_tf); // Updates the counters of subchannels in each state when a // subchannel transitions from old_state to new_state. 
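
// Illustrative sketch (not part of this patch): the per-subchannel "logical"
// connectivity-state filter that ring_hash and round_robin apply before
// aggregation, using stand-in types. The IDLE -> CONNECTING mapping is the
// round_robin variant only; ring_hash applies just the sticky-TF rule.
#include <cassert>
#include <optional>

enum class State { kIdle, kConnecting, kReady, kTransientFailure };

// Returns the new logical state, or nullopt if the update should be ignored.
std::optional<State> FilterLogicalState(State logical, State reported) {
  // Once in TRANSIENT_FAILURE, stay there until the subchannel is READY.
  if (logical == State::kTransientFailure && reported != State::kReady) {
    return std::nullopt;
  }
  // round_robin treats IDLE as CONNECTING, since the policy immediately
  // requests a new connection anyway.
  if (reported == State::kIdle) reported = State::kConnecting;
  if (reported == logical) return std::nullopt;  // No change.
  return reported;
}

int main() {
  assert(!FilterLogicalState(State::kTransientFailure, State::kConnecting)
              .has_value());
  assert(*FilterLogicalState(State::kTransientFailure, State::kReady) ==
         State::kReady);
  assert(*FilterLogicalState(State::kReady, State::kIdle) ==
         State::kConnecting);
  return 0;
}
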
void UpdateStateCountersLocked(grpc_connectivity_state old_state, grpc_connectivity_state new_state); - // If this subchannel list is the RR policy's current subchannel - // list, updates the RR policy's connectivity state based on the - // subchannel list's state counters. - void MaybeUpdateRoundRobinConnectivityStateLocked(); - - // Updates the RR policy's overall state based on the counters of - // subchannels in each state. - void UpdateRoundRobinStateFromSubchannelStateCountsLocked(); + // Ensures that the right subchannel list is used and then updates + // the RR policy's connectivity state based on the subchannel list's + // state counters. + void MaybeUpdateRoundRobinConnectivityStateLocked( + absl::Status status_for_tf); private: + std::string CountersString() const { + return absl::StrCat("num_subchannels=", num_subchannels(), + " num_ready=", num_ready_, + " num_connecting=", num_connecting_, + " num_transient_failure=", num_transient_failure_); + } + size_t num_ready_ = 0; size_t num_connecting_ = 0; size_t num_transient_failure_ = 0; @@ -238,8 +242,45 @@ void RoundRobin::ResetBackoffLocked() { } } -void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() { - if (num_subchannels() == 0) return; +void RoundRobin::UpdateLocked(UpdateArgs args) { + ServerAddressList addresses; + if (args.addresses.ok()) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { + gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses", + this, args.addresses->size()); + } + addresses = std::move(*args.addresses); + } else { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { + gpr_log(GPR_INFO, "[RR %p] received update with address error: %s", this, + args.addresses.status().ToString().c_str()); + } + // If we already have a subchannel list, then ignore the resolver + // failure and keep using the existing list. + if (subchannel_list_ != nullptr) return; + } + // Create new subchannel list, replacing the previous pending list, if any. + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace) && + latest_pending_subchannel_list_ != nullptr) { + gpr_log(GPR_INFO, "[RR %p] replacing previous pending subchannel list %p", + this, latest_pending_subchannel_list_.get()); + } + latest_pending_subchannel_list_ = MakeOrphanable( + this, &grpc_lb_round_robin_trace, std::move(addresses), *args.args); + // Start watching the new list. If appropriate, this will cause it to be + // immediately promoted to subchannel_list_ and to generate a new picker. + latest_pending_subchannel_list_->StartWatchingLocked( + args.addresses.ok() ? absl::UnavailableError(absl::StrCat( + "empty address list: ", args.resolution_note)) + : args.addresses.status()); +} + +// +// RoundRobinSubchannelList +// + +void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked( + absl::Status status_for_tf) { // Check current state of each subchannel synchronously, since any // subchannel already used by some other channel may have a non-IDLE // state. @@ -247,18 +288,18 @@ void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() { grpc_connectivity_state state = subchannel(i)->CheckConnectivityStateLocked(); if (state != GRPC_CHANNEL_IDLE) { - subchannel(i)->UpdateConnectivityStateLocked(state); + subchannel(i)->UpdateLogicalConnectivityStateLocked(state); } } // Start connectivity watch for each subchannel. 
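
// Illustrative sketch (not part of this patch): how the status threaded
// through StartWatchingLocked() is chosen in UpdateLocked() above. A
// std::vector<std::string> stands in for the real ServerAddressList.
#include <string>
#include <vector>

#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"

absl::Status StatusForTransientFailure(
    const absl::StatusOr<std::vector<std::string>>& addresses,
    const std::string& resolution_note) {
  // Resolver succeeded: the only way the new list can be wholly failing
  // right away is if it is empty.
  if (addresses.ok()) {
    return absl::UnavailableError(
        absl::StrCat("empty address list: ", resolution_note));
  }
  // Resolver failed: surface the resolver's own error.
  return addresses.status();
}
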
for (size_t i = 0; i < num_subchannels(); i++) { if (subchannel(i)->subchannel() != nullptr) { subchannel(i)->StartConnectivityWatchLocked(); - subchannel(i)->subchannel()->AttemptToConnect(); + subchannel(i)->subchannel()->RequestConnection(); } } - // Now set the LB policy's state based on the subchannels' states. - UpdateRoundRobinStateFromSubchannelStateCountsLocked(); + // Update RR connectivity state if needed. + MaybeUpdateRoundRobinConnectivityStateLocked(status_for_tf); } void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked( @@ -284,80 +325,73 @@ void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked( } } -// Sets the RR policy's connectivity state and generates a new picker based -// on the current subchannel list. void RoundRobin::RoundRobinSubchannelList:: - MaybeUpdateRoundRobinConnectivityStateLocked() { + MaybeUpdateRoundRobinConnectivityStateLocked(absl::Status status_for_tf) { RoundRobin* p = static_cast(policy()); + // If this is latest_pending_subchannel_list_, then swap it into + // subchannel_list_ in the following cases: + // - subchannel_list_ is null (i.e., this is the first update). + // - subchannel_list_ has no READY subchannels. + // - This list has at least one READY subchannel. + // - All of the subchannels in this list are in TRANSIENT_FAILURE, or + // the list is empty. (This may cause the channel to go from READY + // to TRANSIENT_FAILURE, but we're doing what the control plane told + // us to do. + if (p->latest_pending_subchannel_list_.get() == this && + (p->subchannel_list_ == nullptr || p->subchannel_list_->num_ready_ == 0 || + num_ready_ > 0 || + // Note: num_transient_failure_ and num_subchannels() may both be 0. + num_transient_failure_ == num_subchannels())) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { + const std::string old_counters_string = + p->subchannel_list_ != nullptr ? p->subchannel_list_->CountersString() + : ""; + gpr_log( + GPR_INFO, + "[RR %p] swapping out subchannel list %p (%s) in favor of %p (%s)", p, + p->subchannel_list_.get(), old_counters_string.c_str(), this, + CountersString().c_str()); + } + p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); + } // Only set connectivity state if this is the current subchannel list. if (p->subchannel_list_.get() != this) return; - // In priority order. The first rule to match terminates the search (ie, if we - // are on rule n, all previous rules were unfulfilled). - // - // 1) RULE: ANY subchannel is READY => policy is READY. - // CHECK: subchannel_list->num_ready > 0. - // - // 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING. - // CHECK: sd->curr_connectivity_state == CONNECTING. - // - // 3) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is - // TRANSIENT_FAILURE. - // CHECK: subchannel_list->num_transient_failures == - // subchannel_list->num_subchannels. + // First matching rule wins: + // 1) ANY subchannel is READY => policy is READY. + // 2) ANY subchannel is CONNECTING => policy is CONNECTING. + // 3) ALL subchannels are TRANSIENT_FAILURE => policy is TRANSIENT_FAILURE. 
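
// Illustrative sketch (not part of this patch): the promotion condition
// spelled out above, under which the pending subchannel list replaces the
// current one. Plain counters stand in for the real subchannel-list class.
#include <cstddef>

struct ListCounters {
  std::size_t num_subchannels = 0;
  std::size_t num_ready = 0;
  std::size_t num_transient_failure = 0;
};

bool ShouldPromotePendingList(const ListCounters* current,
                              const ListCounters& pending) {
  return current == nullptr ||          // First update ever.
         current->num_ready == 0 ||     // Current list has nothing READY.
         pending.num_ready > 0 ||       // New list has something READY.
         // Every subchannel in the new list has failed (or the list is
         // empty), so follow the control plane even if that drops the
         // channel from READY to TRANSIENT_FAILURE.
         pending.num_transient_failure == pending.num_subchannels;
}
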
if (num_ready_ > 0) { - // 1) READY + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { + gpr_log(GPR_INFO, "[RR %p] reporting READY with subchannel list %p", p, + this); + } p->channel_control_helper()->UpdateState( GRPC_CHANNEL_READY, absl::Status(), absl::make_unique(p, this)); } else if (num_connecting_ > 0) { - // 2) CONNECTING + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { + gpr_log(GPR_INFO, "[RR %p] reporting CONNECTING with subchannel list %p", + p, this); + } p->channel_control_helper()->UpdateState( GRPC_CHANNEL_CONNECTING, absl::Status(), absl::make_unique(p->Ref(DEBUG_LOCATION, "QueuePicker"))); } else if (num_transient_failure_ == num_subchannels()) { - // 3) TRANSIENT_FAILURE - absl::Status status = - absl::UnavailableError("connections to all backends failing"); + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { + gpr_log(GPR_INFO, + "[RR %p] reporting TRANSIENT_FAILURE with subchannel list %p: %s", + p, this, status_for_tf.ToString().c_str()); + } p->channel_control_helper()->UpdateState( - GRPC_CHANNEL_TRANSIENT_FAILURE, status, - absl::make_unique(status)); + GRPC_CHANNEL_TRANSIENT_FAILURE, status_for_tf, + absl::make_unique(status_for_tf)); } } -void RoundRobin::RoundRobinSubchannelList:: - UpdateRoundRobinStateFromSubchannelStateCountsLocked() { - RoundRobin* p = static_cast(policy()); - // If we have at least one READY subchannel, then swap to the new list. - // Also, if all of the subchannels are in TRANSIENT_FAILURE, then we know - // we've tried all of them and failed, so we go ahead and swap over - // anyway; this may cause the channel to go from READY to TRANSIENT_FAILURE, - // but we are doing what the control plane told us to do. - if (num_ready_ > 0 || num_transient_failure_ == num_subchannels()) { - if (p->subchannel_list_.get() != this) { - // Promote this list to p->subchannel_list_. - // This list must be p->latest_pending_subchannel_list_, because - // any previous update would have been shut down already and - // therefore we would not be receiving a notification for them. - GPR_ASSERT(p->latest_pending_subchannel_list_.get() == this); - GPR_ASSERT(!shutting_down()); - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { - const size_t old_num_subchannels = - p->subchannel_list_ != nullptr - ? p->subchannel_list_->num_subchannels() - : 0; - gpr_log(GPR_INFO, - "[RR %p] phasing out subchannel list %p (size %" PRIuPTR - ") in favor of %p (size %" PRIuPTR ")", - p, p->subchannel_list_.get(), old_num_subchannels, this, - num_subchannels()); - } - p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); - } - } - // Update the RR policy's connectivity state if needed. 
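
// Illustrative sketch (not part of this patch): the first-matching-rule
// aggregation applied above, expressed over the subchannel-state counters.
// Stand-in types only.
#include <cassert>
#include <cstddef>
#include <optional>

enum class State { kConnecting, kReady, kTransientFailure };

// Returns the state to report, or nullopt to leave the channel state as-is.
std::optional<State> AggregateState(std::size_t num_subchannels,
                                    std::size_t num_ready,
                                    std::size_t num_connecting,
                                    std::size_t num_transient_failure) {
  if (num_ready > 0) return State::kReady;            // Rule 1.
  if (num_connecting > 0) return State::kConnecting;  // Rule 2.
  if (num_transient_failure == num_subchannels) {
    return State::kTransientFailure;                  // Rule 3.
  }
  return std::nullopt;  // No rule matched; leave the reported state alone.
}

int main() {
  assert(*AggregateState(3, 1, 1, 1) == State::kReady);
  assert(*AggregateState(3, 0, 1, 2) == State::kConnecting);
  assert(*AggregateState(3, 0, 0, 3) == State::kTransientFailure);
  return 0;
}
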
- MaybeUpdateRoundRobinConnectivityStateLocked(); -} +// +// RoundRobinSubchannelData +// -void RoundRobin::RoundRobinSubchannelData::UpdateConnectivityStateLocked( +bool RoundRobin::RoundRobinSubchannelData::UpdateLogicalConnectivityStateLocked( grpc_connectivity_state connectivity_state) { RoundRobin* p = static_cast(subchannel_list()->policy()); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { @@ -367,115 +401,74 @@ void RoundRobin::RoundRobinSubchannelData::UpdateConnectivityStateLocked( "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s", p, subchannel(), subchannel_list(), Index(), subchannel_list()->num_subchannels(), - ConnectivityStateName(last_connectivity_state_), + ConnectivityStateName(logical_connectivity_state_), ConnectivityStateName(connectivity_state)); } // Decide what state to report for aggregation purposes. - // If we haven't seen a failure since the last time we were in state - // READY, then we report the state change as-is. However, once we do see - // a failure, we report TRANSIENT_FAILURE and do not report any subsequent - // state changes until we go back into state READY. - if (!seen_failure_since_ready_) { - if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - seen_failure_since_ready_ = true; - } - subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_, - connectivity_state); - } else { - if (connectivity_state == GRPC_CHANNEL_READY) { - seen_failure_since_ready_ = false; - subchannel_list()->UpdateStateCountersLocked( - GRPC_CHANNEL_TRANSIENT_FAILURE, connectivity_state); + // If the last logical state was TRANSIENT_FAILURE, then ignore the + // state change unless the new state is READY. + if (logical_connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE && + connectivity_state != GRPC_CHANNEL_READY) { + return false; + } + // If the new state is IDLE, treat it as CONNECTING, since it will + // immediately transition into CONNECTING anyway. + if (connectivity_state == GRPC_CHANNEL_IDLE) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { + gpr_log(GPR_INFO, + "[RR %p] subchannel %p, subchannel_list %p (index %" PRIuPTR + " of %" PRIuPTR "): treating IDLE as CONNECTING", + p, subchannel(), subchannel_list(), Index(), + subchannel_list()->num_subchannels()); } + connectivity_state = GRPC_CHANNEL_CONNECTING; } - // Record last seen connectivity state. - last_connectivity_state_ = connectivity_state; + // If no change, return false. + if (logical_connectivity_state_ == connectivity_state) return false; + // Otherwise, update counters and logical state. + subchannel_list()->UpdateStateCountersLocked(logical_connectivity_state_, + connectivity_state); + logical_connectivity_state_ = connectivity_state; + return true; } void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( grpc_connectivity_state connectivity_state) { RoundRobin* p = static_cast(subchannel_list()->policy()); GPR_ASSERT(subchannel() != nullptr); - // If the new state is TRANSIENT_FAILURE, re-resolve. + // If the new state is TRANSIENT_FAILURE or IDLE, re-resolve. // Only do this if we've started watching, not at startup time. // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE // when the subchannel list was created, we'd wind up in a constant // loop of re-resolution. // Also attempt to reconnect. 
- if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE || + connectivity_state == GRPC_CHANNEL_IDLE) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, - "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. " - "Requesting re-resolution", - p, subchannel()); + "[RR %p] Subchannel %p reported %s; requesting re-resolution", p, + subchannel(), ConnectivityStateName(connectivity_state)); } p->channel_control_helper()->RequestReresolution(); - subchannel()->AttemptToConnect(); + subchannel()->RequestConnection(); } - // Update state counters. - UpdateConnectivityStateLocked(connectivity_state); - // Update overall state and renew notification. - subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked(); -} - -void RoundRobin::UpdateLocked(UpdateArgs args) { - ServerAddressList addresses; - if (args.addresses.ok()) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { - gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses", - this, args.addresses->size()); - } - addresses = std::move(*args.addresses); - } else { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { - gpr_log(GPR_INFO, "[RR %p] received update with address error: %s", this, - args.addresses.status().ToString().c_str()); - } - // If we already have a subchannel list, then ignore the resolver - // failure and keep using the existing list. - if (subchannel_list_ != nullptr) return; - } - // Replace latest_pending_subchannel_list_. - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace) && - latest_pending_subchannel_list_ != nullptr) { - gpr_log(GPR_INFO, - "[RR %p] Shutting down previous pending subchannel list %p", this, - latest_pending_subchannel_list_.get()); - } - latest_pending_subchannel_list_ = MakeOrphanable( - this, &grpc_lb_round_robin_trace, std::move(addresses), *args.args); - if (latest_pending_subchannel_list_->num_subchannels() == 0) { - // If the new list is empty, immediately promote the new list to the - // current list and transition to TRANSIENT_FAILURE. - absl::Status status = - args.addresses.ok() ? absl::UnavailableError(absl::StrCat( - "empty address list: ", args.resolution_note)) - : args.addresses.status(); - channel_control_helper()->UpdateState( - GRPC_CHANNEL_TRANSIENT_FAILURE, status, - absl::make_unique(status)); - subchannel_list_ = std::move(latest_pending_subchannel_list_); - } else if (subchannel_list_ == nullptr) { - // If there is no current list, immediately promote the new list to - // the current list and start watching it. - subchannel_list_ = std::move(latest_pending_subchannel_list_); - subchannel_list_->StartWatchingLocked(); - } else { - // Start watching the pending list. It will get swapped into the - // current list when it reports READY. - latest_pending_subchannel_list_->StartWatchingLocked(); + // Update logical connectivity state. + // If it changed, update the policy state. 
+ if (UpdateLogicalConnectivityStateLocked(connectivity_state)) { + subchannel_list()->MaybeUpdateRoundRobinConnectivityStateLocked( + absl::UnavailableError("connections to all backends failing")); } } +// +// factory +// + class RoundRobinConfig : public LoadBalancingPolicy::Config { public: const char* name() const override { return kRoundRobin; } }; -// -// factory -// - class RoundRobinFactory : public LoadBalancingPolicyFactory { public: OrphanablePtr CreateLoadBalancingPolicy( diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 9957ae73ec2..8c09aecd999 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -308,43 +308,33 @@ class Subchannel::ConnectedSubchannelStateWatcher const absl::Status& status) override { Subchannel* c = subchannel_.get(); MutexLock lock(&c->mu_); - switch (new_state) { - case GRPC_CHANNEL_TRANSIENT_FAILURE: - case GRPC_CHANNEL_SHUTDOWN: { - if (!c->disconnected_ && c->connected_subchannel_ != nullptr) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel)) { - gpr_log(GPR_INFO, - "subchannel %p %s: Connected subchannel %p has gone into " - "%s. Attempting to reconnect.", - c, c->key_.ToString().c_str(), - c->connected_subchannel_.get(), - ConnectivityStateName(new_state)); - } - c->connected_subchannel_.reset(); - if (c->channelz_node() != nullptr) { - c->channelz_node()->SetChildSocket(nullptr); - } - // We need to construct our own status if the underlying state was - // shutdown since the accompanying status will be StatusCode::OK - // otherwise. - c->SetConnectivityStateLocked( - GRPC_CHANNEL_TRANSIENT_FAILURE, - new_state == GRPC_CHANNEL_SHUTDOWN - ? absl::Status(absl::StatusCode::kUnavailable, - "Subchannel has disconnected.") - : status); - c->backoff_begun_ = false; - c->backoff_.Reset(); - } - break; + // If we're either shutting down or have already seen this connection + // failure (i.e., c->connected_subchannel_ is null), do nothing. + // + // The transport reports TRANSIENT_FAILURE upon GOAWAY but SHUTDOWN + // upon connection close. So if the server gracefully shuts down, + // we will see TRANSIENT_FAILURE followed by SHUTDOWN, but if not, we + // will see only SHUTDOWN. Either way, we react to the first one we + // see, ignoring anything that happens after that. + if (c->connected_subchannel_ == nullptr) return; + if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE || + new_state == GRPC_CHANNEL_SHUTDOWN) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel)) { + gpr_log(GPR_INFO, + "subchannel %p %s: Connected subchannel %p reports %s: %s", c, + c->key_.ToString().c_str(), c->connected_subchannel_.get(), + ConnectivityStateName(new_state), status.ToString().c_str()); } - default: { - // In principle, this should never happen. We should not get - // a callback for READY, because that was the state we started - // this watch from. And a connected subchannel should never go - // from READY to CONNECTING or IDLE. - c->SetConnectivityStateLocked(new_state, status); + c->connected_subchannel_.reset(); + if (c->channelz_node() != nullptr) { + c->channelz_node()->SetChildSocket(nullptr); } + // Even though we're reporting IDLE instead of TRANSIENT_FAILURE here, + // pass along the status from the transport, since it may have + // keepalive info attached to it that the channel needs. + // TODO(roth): Consider whether there's a cleaner way to do this. 
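
// Illustrative sketch (not part of this patch): how the watcher above reacts
// only to the first failure notification from the transport. Per the comment
// in the patch, a graceful server shutdown yields TRANSIENT_FAILURE (GOAWAY)
// followed by SHUTDOWN (connection close); a hard close yields only SHUTDOWN.
// Stand-in types and status strings only.
#include <cassert>
#include <memory>
#include <string>

enum class State { kIdle, kConnecting, kReady, kTransientFailure, kShutdown };

struct ToyConnectedSubchannelWatcher {
  std::shared_ptr<int> connected =
      std::make_shared<int>(0);  // Stand-in for connected_subchannel_.
  State reported = State::kReady;
  std::string reported_status;

  void OnTransportState(State new_state, const std::string& status) {
    if (connected == nullptr) return;  // Already reacted to an earlier failure.
    if (new_state == State::kTransientFailure ||
        new_state == State::kShutdown) {
      connected.reset();
      reported = State::kIdle;    // Report IDLE rather than TRANSIENT_FAILURE,
      reported_status = status;   // but keep the transport's status (keepalive
    }                             // info may be attached to it).
  }
};

int main() {
  ToyConnectedSubchannelWatcher watcher;
  watcher.OnTransportState(State::kTransientFailure, "GOAWAY");
  watcher.OnTransportState(State::kShutdown, "socket closed");  // Ignored.
  assert(watcher.reported == State::kIdle);
  assert(watcher.reported_status == "GOAWAY");
  return 0;
}
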
+ c->SetConnectivityStateLocked(GRPC_CHANNEL_IDLE, status); + c->backoff_.Reset(); } } @@ -563,6 +553,25 @@ Subchannel::HealthWatcherMap::CheckConnectivityStateLocked( void Subchannel::HealthWatcherMap::ShutdownLocked() { map_.clear(); } +// +// Subchannel::ConnectivityStateWatcherInterface +// + +void Subchannel::ConnectivityStateWatcherInterface::PushConnectivityStateChange( + ConnectivityStateChange state_change) { + MutexLock lock(&mu_); + connectivity_state_queue_.push_back(std::move(state_change)); +} + +Subchannel::ConnectivityStateWatcherInterface::ConnectivityStateChange +Subchannel::ConnectivityStateWatcherInterface::PopConnectivityStateChange() { + MutexLock lock(&mu_); + GPR_ASSERT(!connectivity_state_queue_.empty()); + ConnectivityStateChange state_change = connectivity_state_queue_.front(); + connectivity_state_queue_.pop_front(); + return state_change; +} + // // Subchannel // @@ -622,21 +631,6 @@ BackOff::Options ParseArgsForBackoffValues(const grpc_channel_args* args, } // namespace -void Subchannel::ConnectivityStateWatcherInterface::PushConnectivityStateChange( - ConnectivityStateChange state_change) { - MutexLock lock(&mu_); - connectivity_state_queue_.push_back(std::move(state_change)); -} - -Subchannel::ConnectivityStateWatcherInterface::ConnectivityStateChange -Subchannel::ConnectivityStateWatcherInterface::PopConnectivityStateChange() { - MutexLock lock(&mu_); - GPR_ASSERT(!connectivity_state_queue_.empty()); - ConnectivityStateChange state_change = connectivity_state_queue_.front(); - connectivity_state_queue_.pop_front(); - return state_change; -} - Subchannel::Subchannel(SubchannelKey key, OrphanablePtr connector, const grpc_channel_args* args) @@ -650,6 +644,7 @@ Subchannel::Subchannel(SubchannelKey key, GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED(); GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, this, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this, nullptr); // Check proxy mapper to determine address to connect to and channel // args to use. address_for_connect_ = key_.address(); @@ -789,20 +784,20 @@ void Subchannel::CancelConnectivityStateWatch( } } -void Subchannel::AttemptToConnect() { +void Subchannel::RequestConnection() { MutexLock lock(&mu_); - MaybeStartConnectingLocked(); + if (state_ == GRPC_CHANNEL_IDLE) { + StartConnectingLocked(); + } else if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE) { + connection_requested_ = true; + } } void Subchannel::ResetBackoff() { MutexLock lock(&mu_); backoff_.Reset(); - if (have_retry_alarm_) { - retry_immediately_ = true; - grpc_timer_cancel(&retry_alarm_); - } else { - backoff_begun_ = false; - MaybeStartConnectingLocked(); + if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE) { + grpc_timer_cancel(&retry_timer_); } } @@ -814,8 +809,8 @@ void Subchannel::Orphan() { subchannel_pool_.reset(); } MutexLock lock(&mu_); - GPR_ASSERT(!disconnected_); - disconnected_ = true; + GPR_ASSERT(!shutdown_); + shutdown_ = true; connector_.reset(); connected_subchannel_.reset(); health_watcher_map_.ShutdownLocked(); @@ -885,77 +880,44 @@ void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state, health_watcher_map_.NotifyLocked(state, status); } -void Subchannel::MaybeStartConnectingLocked() { - if (disconnected_) { - // Don't try to connect if we're already disconnected. - return; - } - if (connecting_) { - // Already connecting: don't restart. - return; - } - if (connected_subchannel_ != nullptr) { - // Already connected: don't restart. 
- return; - } - connecting_ = true; - WeakRef(DEBUG_LOCATION, "connecting") - .release(); // ref held by pending connect - if (!backoff_begun_) { - backoff_begun_ = true; - ContinueConnectingLocked(); - } else { - GPR_ASSERT(!have_retry_alarm_); - have_retry_alarm_ = true; - const Duration time_til_next = - next_attempt_deadline_ - ExecCtx::Get()->Now(); - if (time_til_next <= Duration::Zero()) { - gpr_log(GPR_INFO, "subchannel %p %s: Retry immediately", this, - key_.ToString().c_str()); - } else { - gpr_log(GPR_INFO, "subchannel %p %s: Retry in %" PRId64 " milliseconds", - this, key_.ToString().c_str(), time_til_next.millis()); - } - GRPC_CLOSURE_INIT(&on_retry_alarm_, OnRetryAlarm, this, - grpc_schedule_on_exec_ctx); - grpc_timer_init(&retry_alarm_, next_attempt_deadline_, &on_retry_alarm_); +void Subchannel::OnRetryTimer(void* arg, grpc_error_handle /*error*/) { + WeakRefCountedPtr c(static_cast(arg)); + { + MutexLock lock(&c->mu_); + c->OnRetryTimerLocked(); } + c.reset(DEBUG_LOCATION, "RetryTimer"); } -void Subchannel::OnRetryAlarm(void* arg, grpc_error_handle error) { - WeakRefCountedPtr c(static_cast(arg)); - MutexLock lock(&c->mu_); - c->have_retry_alarm_ = false; - if (c->disconnected_) { - error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected", - &error, 1); - } else if (c->retry_immediately_) { - c->retry_immediately_ = false; - error = GRPC_ERROR_NONE; - } else { - (void)GRPC_ERROR_REF(error); - } - if (error == GRPC_ERROR_NONE) { +void Subchannel::OnRetryTimerLocked() { + if (shutdown_) return; + if (connection_requested_) { gpr_log(GPR_INFO, - "subchannel %p %s: failed to connect to channel, retrying", c.get(), - c->key_.ToString().c_str()); - c->ContinueConnectingLocked(); - // Still connecting, keep ref around. Note that this stolen ref won't - // be dropped without first acquiring c->mu_. - c.release(); + "subchannel %p %s: connection attempt requested while backoff " + "timer was pending, retrying now", + this, key_.ToString().c_str()); + connection_requested_ = false; + StartConnectingLocked(); + } else { + gpr_log(GPR_INFO, "subchannel %p %s: backoff delay elapsed, reporting IDLE", + this, key_.ToString().c_str()); + SetConnectivityStateLocked(GRPC_CHANNEL_IDLE, absl::OkStatus()); } - GRPC_ERROR_UNREF(error); } -void Subchannel::ContinueConnectingLocked() { +void Subchannel::StartConnectingLocked() { + // Set next attempt time. + const Timestamp min_deadline = min_connect_timeout_ + ExecCtx::Get()->Now(); + next_attempt_time_ = backoff_.NextAttemptTime(); + // Report CONNECTING. + SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING, absl::OkStatus()); + // Start connection attempt. SubchannelConnector::Args args; args.address = &address_for_connect_; args.interested_parties = pollset_set_; - const Timestamp min_deadline = min_connect_timeout_ + ExecCtx::Get()->Now(); - next_attempt_deadline_ = backoff_.NextAttemptTime(); - args.deadline = std::max(next_attempt_deadline_, min_deadline); + args.deadline = std::max(next_attempt_time_, min_deadline); args.channel_args = args_; - SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING, absl::Status()); + WeakRef(DEBUG_LOCATION, "Connect").release(); // Ref held by callback. 
connector_->Connect(args, &connecting_result_, &on_connecting_finished_); } @@ -965,19 +927,36 @@ void Subchannel::OnConnectingFinished(void* arg, grpc_error_handle error) { c->connecting_result_.channel_args; { MutexLock lock(&c->mu_); - c->connecting_ = false; - if (c->connecting_result_.transport != nullptr && - c->PublishTransportLocked()) { - // Do nothing, transport was published. - } else if (!c->disconnected_) { - gpr_log(GPR_INFO, "subchannel %p %s: connect failed: %s", c.get(), - c->key_.ToString().c_str(), grpc_error_std_string(error).c_str()); - c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, - grpc_error_to_absl_status(error)); - } + c->OnConnectingFinishedLocked(GRPC_ERROR_REF(error)); } grpc_channel_args_destroy(delete_channel_args); - c.reset(DEBUG_LOCATION, "connecting"); + c.reset(DEBUG_LOCATION, "Connect"); +} + +void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) { + if (shutdown_) { + (void)GRPC_ERROR_UNREF(error); + return; + } + // If we didn't get a transport or we fail to publish it, report + // TRANSIENT_FAILURE and start the retry timer. + // Note that if the connection attempt took longer than the backoff + // time, then the timer will fire immediately, and we will quickly + // transition back to IDLE. + if (connecting_result_.transport == nullptr || !PublishTransportLocked()) { + const Duration time_until_next_attempt = + next_attempt_time_ - ExecCtx::Get()->Now(); + gpr_log(GPR_INFO, + "subchannel %p %s: connect failed (%s), backing off for %" PRId64 + " ms", + this, key_.ToString().c_str(), grpc_error_std_string(error).c_str(), + time_until_next_attempt.millis()); + SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, + grpc_error_to_absl_status(error)); + WeakRef(DEBUG_LOCATION, "RetryTimer").release(); // Ref held by callback. + grpc_timer_init(&retry_timer_, next_attempt_time_, &on_retry_timer_); + } + (void)GRPC_ERROR_UNREF(error); } bool Subchannel::PublishTransportLocked() { @@ -1001,7 +980,7 @@ bool Subchannel::PublishTransportLocked() { RefCountedPtr socket = std::move(connecting_result_.socket_node); connecting_result_.Reset(); - if (disconnected_) return false; + if (shutdown_) return false; // Publish. connected_subchannel_.reset( new ConnectedSubchannel(stk->release(), args_, channelz_node_)); diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 49ee00035cc..131f3f590f5 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -253,7 +253,7 @@ class Subchannel : public DualRefCounted { } // Attempt to connect to the backend. Has no effect if already connected. - void AttemptToConnect() ABSL_LOCKS_EXCLUDED(mu_); + void RequestConnection() ABSL_LOCKS_EXCLUDED(mu_); // Resets the connection backoff of the subchannel. void ResetBackoff() ABSL_LOCKS_EXCLUDED(mu_); @@ -344,12 +344,14 @@ class Subchannel : public DualRefCounted { ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); // Methods for connection. 
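
// Illustrative sketch (not part of this patch): the connection state machine
// implemented by the methods above and declared below, using stand-in types.
// The key behavioral change is that a failed attempt parks the subchannel in
// TRANSIENT_FAILURE only for the backoff period; when the retry timer fires,
// it either retries (if RequestConnection() arrived in the meantime) or
// simply reports IDLE and waits for the next request.
#include <cassert>

enum class State { kIdle, kConnecting, kReady, kTransientFailure };

struct ToySubchannel {
  State state = State::kIdle;
  bool connection_requested = false;

  void RequestConnection() {
    if (state == State::kIdle) {
      state = State::kConnecting;     // Start an attempt now.
    } else if (state == State::kTransientFailure) {
      connection_requested = true;    // Retry as soon as backoff ends.
    }
  }

  void OnConnectAttemptDone(bool ok) {
    // On failure, the retry timer is started for the remaining backoff.
    state = ok ? State::kReady : State::kTransientFailure;
  }

  void OnRetryTimer() {
    if (state != State::kTransientFailure) return;
    if (connection_requested) {
      connection_requested = false;
      state = State::kConnecting;     // Retry immediately.
    } else {
      state = State::kIdle;           // Report IDLE; wait for a request.
    }
  }
};

int main() {
  ToySubchannel sc;
  sc.RequestConnection();
  sc.OnConnectAttemptDone(/*ok=*/false);  // -> TRANSIENT_FAILURE, timer pending
  sc.OnRetryTimer();                      // No request pending -> IDLE
  assert(sc.state == State::kIdle);
  sc.RequestConnection();                 // -> CONNECTING
  sc.OnConnectAttemptDone(/*ok=*/true);
  assert(sc.state == State::kReady);
  return 0;
}
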
- void MaybeStartConnectingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); - static void OnRetryAlarm(void* arg, grpc_error_handle error) + static void OnRetryTimer(void* arg, grpc_error_handle error) ABSL_LOCKS_EXCLUDED(mu_); - void ContinueConnectingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + void OnRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + void StartConnectingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); static void OnConnectingFinished(void* arg, grpc_error_handle error) ABSL_LOCKS_EXCLUDED(mu_); + void OnConnectingFinishedLocked(grpc_error_handle error) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); bool PublishTransportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); // The subchannel pool this subchannel is in. @@ -365,6 +367,8 @@ class Subchannel : public DualRefCounted { grpc_pollset_set* pollset_set_; // Channelz tracking. RefCountedPtr channelz_node_; + // Minimum connection timeout. + Duration min_connect_timeout_; // Connection state. OrphanablePtr connector_; @@ -374,12 +378,18 @@ class Subchannel : public DualRefCounted { // Protects the other members. Mutex mu_; - // Active connection, or null. - RefCountedPtr connected_subchannel_ ABSL_GUARDED_BY(mu_); - bool connecting_ ABSL_GUARDED_BY(mu_) = false; - bool disconnected_ ABSL_GUARDED_BY(mu_) = false; + bool shutdown_ ABSL_GUARDED_BY(mu_) = false; + + // Records if RequestConnection() was called while in backoff. + bool connection_requested_ ABSL_GUARDED_BY(mu_) = false; // Connectivity state tracking. + // Note that the connectivity state implies the state of the + // Subchannel object: + // - IDLE: no retry timer pending, can start a connection attempt at any time + // - CONNECTING: connection attempt in progress + // - READY: connection attempt succeeded, connected_subchannel_ created + // - TRANSIENT_FAILURE: connection attempt failed, retry timer pending grpc_connectivity_state state_ ABSL_GUARDED_BY(mu_) = GRPC_CHANNEL_IDLE; absl::Status status_ ABSL_GUARDED_BY(mu_); // The list of watchers without a health check service name. @@ -387,25 +397,21 @@ class Subchannel : public DualRefCounted { // The map of watchers with health check service names. HealthWatcherMap health_watcher_map_ ABSL_GUARDED_BY(mu_); - // Data provider map. - std::map data_producer_map_ - ABSL_GUARDED_BY(mu_); + // Active connection, or null. + RefCountedPtr connected_subchannel_ ABSL_GUARDED_BY(mu_); - // Minimum connect timeout - must be located before backoff_. - Duration min_connect_timeout_ ABSL_GUARDED_BY(mu_); // Backoff state. BackOff backoff_ ABSL_GUARDED_BY(mu_); - Timestamp next_attempt_deadline_ ABSL_GUARDED_BY(mu_); - bool backoff_begun_ ABSL_GUARDED_BY(mu_) = false; - - // Retry alarm. - grpc_timer retry_alarm_ ABSL_GUARDED_BY(mu_); - grpc_closure on_retry_alarm_ ABSL_GUARDED_BY(mu_); - bool have_retry_alarm_ ABSL_GUARDED_BY(mu_) = false; - // reset_backoff() was called while alarm was pending. - bool retry_immediately_ ABSL_GUARDED_BY(mu_) = false; + Timestamp next_attempt_time_ ABSL_GUARDED_BY(mu_); + grpc_timer retry_timer_ ABSL_GUARDED_BY(mu_); + grpc_closure on_retry_timer_ ABSL_GUARDED_BY(mu_); + // Keepalive time period (-1 for unset) int keepalive_time_ ABSL_GUARDED_BY(mu_) = -1; + + // Data producer map. 
+ std::map data_producer_map_ + ABSL_GUARDED_BY(mu_); }; } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/subchannel_interface.h b/src/core/ext/filters/client_channel/subchannel_interface.h index c2abd00bbd9..91694af808e 100644 --- a/src/core/ext/filters/client_channel/subchannel_interface.h +++ b/src/core/ext/filters/client_channel/subchannel_interface.h @@ -84,12 +84,12 @@ class SubchannelInterface : public RefCounted { // If the subchannel is currently in backoff delay due to a previously // failed attempt, the new connection attempt will not start until the // backoff delay has elapsed. - virtual void AttemptToConnect() = 0; + virtual void RequestConnection() = 0; - // Resets the subchannel's connection backoff state. If AttemptToConnect() + // Resets the subchannel's connection backoff state. If RequestConnection() // has been called since the subchannel entered TRANSIENT_FAILURE state, // starts a new connection attempt immediately; otherwise, a new connection - // attempt will be started as soon as AttemptToConnect() is called. + // attempt will be started as soon as RequestConnection() is called. virtual void ResetBackoff() = 0; // Registers a new data watcher. @@ -124,7 +124,9 @@ class DelegatingSubchannel : public SubchannelInterface { ConnectivityStateWatcherInterface* watcher) override { return wrapped_subchannel_->CancelConnectivityStateWatch(watcher); } - void AttemptToConnect() override { wrapped_subchannel_->AttemptToConnect(); } + void RequestConnection() override { + wrapped_subchannel_->RequestConnection(); + } void ResetBackoff() override { wrapped_subchannel_->ResetBackoff(); } const grpc_channel_args* channel_args() override { return wrapped_subchannel_->channel_args(); diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 0bb8b5fa251..97430ccf14d 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -849,6 +849,15 @@ void Server::CancelAllCalls() { GRPC_ERROR_CREATE_FROM_STATIC_STRING("Cancelling all calls")); } +void Server::SendGoaways() { + ChannelBroadcaster broadcaster; + { + MutexLock lock(&mu_global_); + broadcaster.FillChannelsLocked(GetChannelsLocked()); + } + broadcaster.BroadcastShutdown(/*send_goaway=*/true, GRPC_ERROR_NONE); +} + void Server::Orphan() { { MutexLock lock(&mu_global_); diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h index 7270feba95a..5af932d5237 100644 --- a/src/core/lib/surface/server.h +++ b/src/core/lib/surface/server.h @@ -170,6 +170,8 @@ class Server : public InternallyRefCounted, void CancelAllCalls() ABSL_LOCKS_EXCLUDED(mu_global_); + void SendGoaways() ABSL_LOCKS_EXCLUDED(mu_global_, mu_call_); + private: struct RequestedCall; diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index 613f52df015..77ed100b4a8 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -58,6 +58,7 @@ #include "src/core/lib/security/credentials/fake/fake_credentials.h" #include "src/core/lib/service_config/service_config.h" #include "src/core/lib/service_config/service_config_impl.h" +#include "src/core/lib/surface/server.h" #include "src/cpp/client/secure_credentials.h" #include "src/cpp/server/secure_server_credentials.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" @@ -324,8 +325,7 @@ class ClientLbEnd2endTest : public ::testing::Test { const bool success = SendRpc(stub, &response, 2000, &status, wait_for_ready); ASSERT_TRUE(success) << 
"From " << location.file() << ":" << location.line() - << "\n" - << "Error: " << status.error_message() << " " + << "\nError: " << status.error_message() << " " << status.error_details(); ASSERT_EQ(response.message(), kRequestMessage) << "From " << location.file() << ":" << location.line(); @@ -399,20 +399,44 @@ class ClientLbEnd2endTest : public ::testing::Test { for (const auto& server : servers_) server->service_.ResetCounters(); } - void WaitForServer( + bool SeenAllServers(size_t start_index, size_t stop_index) { + for (size_t i = start_index; i < stop_index; ++i) { + if (servers_[i]->service_.request_count() == 0) return false; + } + return true; + } + + void WaitForServers( const std::unique_ptr& stub, - size_t server_idx, const grpc_core::DebugLocation& location, - bool ignore_failure = false) { - do { + size_t start_index, size_t stop_index, + const grpc_core::DebugLocation& location, bool ignore_failure = false) { + auto deadline = + absl::Now() + (absl::Seconds(30) * grpc_test_slowdown_factor()); + gpr_log(GPR_INFO, + "========= WAITING FOR BACKENDS [%" PRIuPTR ", %" PRIuPTR + ") ==========", + start_index, stop_index); + while (!SeenAllServers(start_index, stop_index)) { if (ignore_failure) { SendRpc(stub); } else { CheckRpcSendOk(stub, location, true); } - } while (servers_[server_idx]->service_.request_count() == 0); + EXPECT_LE(absl::Now(), deadline) + << " at " << location.file() << ":" << location.line(); + if (absl::Now() >= deadline) break; + } ResetCounters(); } + void WaitForServer( + const std::unique_ptr& stub, + size_t server_index, const grpc_core::DebugLocation& location, + bool ignore_failure = false) { + WaitForServers(stub, server_index, server_index + 1, location, + ignore_failure); + } + bool WaitForChannelState( Channel* channel, const std::function& predicate, @@ -494,7 +518,42 @@ TEST_F(ClientLbEnd2endTest, ChannelStateConnectingWhenResolving) { EXPECT_TRUE(WaitForChannelReady(channel.get())); } -TEST_F(ClientLbEnd2endTest, PickFirst) { +TEST_F(ClientLbEnd2endTest, ChannelIdleness) { + // Start server. + const int kNumServers = 1; + StartServers(kNumServers); + // Set max idle time and build the channel. + ChannelArguments args; + args.SetInt(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS, 1000); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("", response_generator, args); + auto stub = BuildStub(channel); + // The initial channel state should be IDLE. + EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE); + // After sending RPC, channel state should be READY. + gpr_log(GPR_INFO, "*** SENDING RPC, CHANNEL SHOULD CONNECT ***"); + response_generator.SetNextResolution(GetServersPorts()); + CheckRpcSendOk(stub, DEBUG_LOCATION); + EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); + // After a period time not using the channel, the channel state should switch + // to IDLE. + gpr_log(GPR_INFO, "*** WAITING FOR CHANNEL TO GO IDLE ***"); + gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(1200)); + EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE); + // Sending a new RPC should awake the IDLE channel. + gpr_log(GPR_INFO, "*** SENDING ANOTHER RPC, CHANNEL SHOULD RECONNECT ***"); + response_generator.SetNextResolution(GetServersPorts()); + CheckRpcSendOk(stub, DEBUG_LOCATION); + EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); +} + +// +// pick_first tests +// + +using PickFirstTest = ClientLbEnd2endTest; + +TEST_F(PickFirstTest, Basic) { // Start servers and send one RPC per server. 
const int kNumServers = 3; StartServers(kNumServers); @@ -521,7 +580,7 @@ TEST_F(ClientLbEnd2endTest, PickFirst) { EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName()); } -TEST_F(ClientLbEnd2endTest, PickFirstProcessPending) { +TEST_F(PickFirstTest, ProcessPending) { StartServers(1); // Single server auto response_generator = BuildResolverResponseGenerator(); auto channel = BuildChannel( @@ -541,7 +600,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstProcessPending) { CheckRpcSendOk(second_stub, DEBUG_LOCATION); } -TEST_F(ClientLbEnd2endTest, PickFirstSelectsReadyAtStartup) { +TEST_F(PickFirstTest, SelectsReadyAtStartup) { ChannelArguments args; constexpr int kInitialBackOffMs = 5000; args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs); @@ -567,7 +626,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstSelectsReadyAtStartup) { EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1 /* timeout_seconds */)); } -TEST_F(ClientLbEnd2endTest, PickFirstBackOffInitialReconnect) { +TEST_F(PickFirstTest, BackOffInitialReconnect) { ChannelArguments args; constexpr int kInitialBackOffMs = 100; args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs); @@ -599,7 +658,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstBackOffInitialReconnect) { 0); } -TEST_F(ClientLbEnd2endTest, PickFirstBackOffMinReconnect) { +TEST_F(PickFirstTest, BackOffMinReconnect) { ChannelArguments args; constexpr int kMinReconnectBackOffMs = 1000; args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, kMinReconnectBackOffMs); @@ -625,7 +684,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstBackOffMinReconnect) { EXPECT_GE(waited.millis(), kMinReconnectBackOffMs - 1); } -TEST_F(ClientLbEnd2endTest, PickFirstResetConnectionBackoff) { +TEST_F(PickFirstTest, ResetConnectionBackoff) { ChannelArguments args; constexpr int kInitialBackOffMs = 1000; args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs); @@ -660,7 +719,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstResetConnectionBackoff) { } TEST_F(ClientLbEnd2endTest, - PickFirstResetConnectionBackoffNextAttemptStartsImmediately) { + ResetConnectionBackoffNextAttemptStartsImmediately) { ChannelArguments args; constexpr int kInitialBackOffMs = 1000; args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs); @@ -707,7 +766,7 @@ TEST_F(ClientLbEnd2endTest, EXPECT_LT(waited.millis(), kWaitMs); } -TEST_F(ClientLbEnd2endTest, PickFirstUpdates) { +TEST_F(PickFirstTest, Updates) { // Start servers and send one RPC per server. const int kNumServers = 3; StartServers(kNumServers); @@ -756,7 +815,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) { EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName()); } -TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) { +TEST_F(PickFirstTest, UpdateSuperset) { // Start servers and send one RPC per server. const int kNumServers = 3; StartServers(kNumServers); @@ -789,7 +848,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) { EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName()); } -TEST_F(ClientLbEnd2endTest, PickFirstUpdateToUnconnected) { +TEST_F(PickFirstTest, UpdateToUnconnected) { const int kNumServers = 2; CreateServers(kNumServers); StartServer(0); @@ -822,7 +881,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdateToUnconnected) { EXPECT_TRUE(WaitForChannelReady(channel.get())); } -TEST_F(ClientLbEnd2endTest, PickFirstGlobalSubchannelPool) { +TEST_F(PickFirstTest, GlobalSubchannelPool) { // Start one server. 
const int kNumServers = 1; StartServers(kNumServers); @@ -847,7 +906,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstGlobalSubchannelPool) { EXPECT_EQ(1UL, servers_[0]->service_.clients().size()); } -TEST_F(ClientLbEnd2endTest, PickFirstLocalSubchannelPool) { +TEST_F(PickFirstTest, LocalSubchannelPool) { // Start one server. const int kNumServers = 1; StartServers(kNumServers); @@ -874,7 +933,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstLocalSubchannelPool) { EXPECT_EQ(2UL, servers_[0]->service_.clients().size()); } -TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) { +TEST_F(PickFirstTest, ManyUpdates) { const int kNumUpdates = 1000; const int kNumServers = 3; StartServers(kNumServers); @@ -894,7 +953,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) { EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName()); } -TEST_F(ClientLbEnd2endTest, PickFirstReresolutionNoSelected) { +TEST_F(PickFirstTest, ReresolutionNoSelected) { // Prepare the ports for up servers and down servers. const int kNumServers = 3; const int kNumAliveServers = 1; @@ -926,7 +985,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstReresolutionNoSelected) { EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName()); } -TEST_F(ClientLbEnd2endTest, PickFirstReconnectWithoutNewResolverResult) { +TEST_F(PickFirstTest, ReconnectWithoutNewResolverResult) { std::vector ports = {grpc_pick_unused_port_or_die()}; StartServers(1, ports); auto response_generator = BuildResolverResponseGenerator(); @@ -943,8 +1002,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstReconnectWithoutNewResolverResult) { WaitForServer(stub, 0, DEBUG_LOCATION); } -TEST_F(ClientLbEnd2endTest, - PickFirstReconnectWithoutNewResolverResultStartsFromTopOfList) { +TEST_F(PickFirstTest, ReconnectWithoutNewResolverResultStartsFromTopOfList) { std::vector ports = {grpc_pick_unused_port_or_die(), grpc_pick_unused_port_or_die()}; CreateServers(2, ports); @@ -963,7 +1021,7 @@ TEST_F(ClientLbEnd2endTest, WaitForServer(stub, 0, DEBUG_LOCATION); } -TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) { +TEST_F(PickFirstTest, CheckStateBeforeStartWatch) { std::vector ports = {grpc_pick_unused_port_or_die()}; StartServers(1, ports); auto response_generator = BuildResolverResponseGenerator(); @@ -1003,7 +1061,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) { EXPECT_EQ("pick_first", channel_2->GetLoadBalancingPolicyName()); } -TEST_F(ClientLbEnd2endTest, PickFirstIdleOnDisconnect) { +TEST_F(PickFirstTest, IdleOnDisconnect) { // Start server, send RPC, and make sure channel is READY. const int kNumServers = 1; StartServers(kNumServers); @@ -1022,7 +1080,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstIdleOnDisconnect) { servers_.clear(); } -TEST_F(ClientLbEnd2endTest, PickFirstPendingUpdateAndSelectedSubchannelFails) { +TEST_F(PickFirstTest, PendingUpdateAndSelectedSubchannelFails) { auto response_generator = BuildResolverResponseGenerator(); auto channel = BuildChannel("", response_generator); // pick_first is the default. @@ -1071,7 +1129,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstPendingUpdateAndSelectedSubchannelFails) { WaitForServer(stub, 1, DEBUG_LOCATION, true /* ignore_failure */); } -TEST_F(ClientLbEnd2endTest, PickFirstStaysIdleUponEmptyUpdate) { +TEST_F(PickFirstTest, StaysIdleUponEmptyUpdate) { // Start server, send RPC, and make sure channel is READY. 
const int kNumServers = 1; StartServers(kNumServers); @@ -1099,8 +1157,8 @@ TEST_F(ClientLbEnd2endTest, PickFirstStaysIdleUponEmptyUpdate) { EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); } -TEST_F(ClientLbEnd2endTest, - PickFirstStaysTransientFailureOnFailedConnectionAttemptUntilReady) { +TEST_F(PickFirstTest, + StaysTransientFailureOnFailedConnectionAttemptUntilReady) { // Allocate 3 ports, with no servers running. std::vector ports = {grpc_pick_unused_port_or_die(), grpc_pick_unused_port_or_die(), @@ -1127,7 +1185,13 @@ TEST_F(ClientLbEnd2endTest, CheckRpcSendOk(stub, DEBUG_LOCATION); } -TEST_F(ClientLbEnd2endTest, RoundRobin) { +// +// round_robin tests +// + +using RoundRobinTest = ClientLbEnd2endTest; + +TEST_F(RoundRobinTest, Basic) { // Start servers and send one RPC per server. const int kNumServers = 3; StartServers(kNumServers); @@ -1156,7 +1220,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobin) { EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName()); } -TEST_F(ClientLbEnd2endTest, RoundRobinProcessPending) { +TEST_F(RoundRobinTest, ProcessPending) { StartServers(1); // Single server auto response_generator = BuildResolverResponseGenerator(); auto channel = BuildChannel("round_robin", response_generator); @@ -1175,17 +1239,16 @@ TEST_F(ClientLbEnd2endTest, RoundRobinProcessPending) { CheckRpcSendOk(second_stub, DEBUG_LOCATION); } -TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { - // Start servers and send one RPC per server. +TEST_F(RoundRobinTest, Updates) { + // Start servers. const int kNumServers = 3; StartServers(kNumServers); auto response_generator = BuildResolverResponseGenerator(); auto channel = BuildChannel("round_robin", response_generator); auto stub = BuildStub(channel); - std::vector ports; // Start with a single server. gpr_log(GPR_INFO, "*** FIRST BACKEND ***"); - ports.emplace_back(servers_[0]->port_); + std::vector ports = {servers_[0]->port_}; response_generator.SetNextResolution(ports); WaitForServer(stub, 0, DEBUG_LOCATION); // Send RPCs. They should all go servers_[0] @@ -1193,7 +1256,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { EXPECT_EQ(10, servers_[0]->service_.request_count()); EXPECT_EQ(0, servers_[1]->service_.request_count()); EXPECT_EQ(0, servers_[2]->service_.request_count()); - servers_[0]->service_.ResetCounters(); + ResetCounters(); // And now for the second server. gpr_log(GPR_INFO, "*** SECOND BACKEND ***"); ports.clear(); @@ -1207,7 +1270,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { EXPECT_EQ(0, servers_[0]->service_.request_count()); EXPECT_EQ(10, servers_[1]->service_.request_count()); EXPECT_EQ(0, servers_[2]->service_.request_count()); - servers_[1]->service_.ResetCounters(); + ResetCounters(); // ... and for the last server. gpr_log(GPR_INFO, "*** THIRD BACKEND ***"); ports.clear(); @@ -1218,7 +1281,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { EXPECT_EQ(0, servers_[0]->service_.request_count()); EXPECT_EQ(0, servers_[1]->service_.request_count()); EXPECT_EQ(10, servers_[2]->service_.request_count()); - servers_[2]->service_.ResetCounters(); + ResetCounters(); // Back to all servers. 
gpr_log(GPR_INFO, "*** ALL BACKENDS ***"); ports.clear(); @@ -1226,23 +1289,19 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { ports.emplace_back(servers_[1]->port_); ports.emplace_back(servers_[2]->port_); response_generator.SetNextResolution(ports); - WaitForServer(stub, 0, DEBUG_LOCATION); - WaitForServer(stub, 1, DEBUG_LOCATION); - WaitForServer(stub, 2, DEBUG_LOCATION); + WaitForServers(stub, 0, 3, DEBUG_LOCATION); // Send three RPCs, one per server. for (size_t i = 0; i < 3; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION); EXPECT_EQ(1, servers_[0]->service_.request_count()); EXPECT_EQ(1, servers_[1]->service_.request_count()); EXPECT_EQ(1, servers_[2]->service_.request_count()); + ResetCounters(); // An empty update will result in the channel going into TRANSIENT_FAILURE. gpr_log(GPR_INFO, "*** NO BACKENDS ***"); ports.clear(); response_generator.SetNextResolution(ports); - grpc_connectivity_state channel_state; - do { - channel_state = channel->GetState(true /* try to connect */); - } while (channel_state == GRPC_CHANNEL_READY); - ASSERT_NE(channel_state, GRPC_CHANNEL_READY); + WaitForChannelNotReady(channel.get()); + CheckRpcSendFailure(stub); servers_[0]->service_.ResetCounters(); // Next update introduces servers_[1], making the channel recover. gpr_log(GPR_INFO, "*** BACK TO SECOND BACKEND ***"); @@ -1250,13 +1309,12 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { ports.emplace_back(servers_[1]->port_); response_generator.SetNextResolution(ports); WaitForServer(stub, 1, DEBUG_LOCATION); - channel_state = channel->GetState(false /* try to connect */); - ASSERT_EQ(channel_state, GRPC_CHANNEL_READY); + EXPECT_EQ(GRPC_CHANNEL_READY, channel->GetState(/*try_to_connect=*/false)); // Check LB policy name for the channel. EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName()); } -TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) { +TEST_F(RoundRobinTest, UpdateInError) { const int kNumServers = 3; StartServers(kNumServers); auto response_generator = BuildResolverResponseGenerator(); @@ -1286,7 +1344,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) { EXPECT_EQ(0, servers_[1]->service_.request_count()); } -TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) { +TEST_F(RoundRobinTest, ManyUpdates) { // Start servers and send one RPC per server. const int kNumServers = 3; StartServers(kNumServers); @@ -1304,66 +1362,33 @@ TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) { EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName()); } -TEST_F(ClientLbEnd2endTest, RoundRobinConcurrentUpdates) { - // TODO(dgq): replicate the way internal testing exercises the concurrent - // update provisions of RR. -} - -TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) { - // Start servers and send one RPC per server. - const int kNumServers = 3; - std::vector first_ports; - std::vector second_ports; - first_ports.reserve(kNumServers); - for (int i = 0; i < kNumServers; ++i) { - first_ports.push_back(grpc_pick_unused_port_or_die()); - } - second_ports.reserve(kNumServers); - for (int i = 0; i < kNumServers; ++i) { - second_ports.push_back(grpc_pick_unused_port_or_die()); - } - StartServers(kNumServers, first_ports); +TEST_F(RoundRobinTest, ReresolveOnSubchannelConnectionFailure) { + // Start 3 servers. + StartServers(3); + // Create channel. 
   auto response_generator = BuildResolverResponseGenerator();
   auto channel = BuildChannel("round_robin", response_generator);
   auto stub = BuildStub(channel);
-  response_generator.SetNextResolution(first_ports);
-  // Send a number of RPCs, which succeed.
-  for (size_t i = 0; i < 100; ++i) {
-    CheckRpcSendOk(stub, DEBUG_LOCATION);
-  }
-  // Kill all servers
-  gpr_log(GPR_INFO, "****** ABOUT TO KILL SERVERS *******");
-  for (size_t i = 0; i < servers_.size(); ++i) {
-    servers_[i]->Shutdown();
+  // Initially, tell the channel about only the first two servers.
+  std::vector<int> ports = {servers_[0]->port_, servers_[1]->port_};
+  response_generator.SetNextResolution(ports);
+  // Wait for both servers to be seen.
+  WaitForServers(stub, 0, 2, DEBUG_LOCATION);
+  // Tell the fake resolver to send an update that adds the last server, but
+  // only when the LB policy requests re-resolution.
+  ports.push_back(servers_[2]->port_);
+  response_generator.SetNextResolutionUponError(ports);
+  // Have server 0 send a GOAWAY. This should trigger a re-resolution.
+  gpr_log(GPR_INFO, "****** SENDING GOAWAY FROM SERVER 0 *******");
+  {
+    grpc_core::ExecCtx exec_ctx;
+    grpc_core::Server::FromC(servers_[0]->server_->c_server())->SendGoaways();
   }
-  gpr_log(GPR_INFO, "****** SERVERS KILLED *******");
-  gpr_log(GPR_INFO, "****** SENDING DOOMED REQUESTS *******");
-  // Client requests should fail. Send enough to tickle all subchannels.
-  for (size_t i = 0; i < servers_.size(); ++i) CheckRpcSendFailure(stub);
-  gpr_log(GPR_INFO, "****** DOOMED REQUESTS SENT *******");
-  // Bring servers back up on a different set of ports. We need to do this to be
-  // sure that the eventual success is *not* due to subchannel reconnection
-  // attempts and that an actual re-resolution has happened as a result of the
-  // RR policy going into transient failure when all its subchannels become
-  // unavailable (in transient failure as well).
-  gpr_log(GPR_INFO, "****** RESTARTING SERVERS *******");
-  StartServers(kNumServers, second_ports);
-  // Don't notify of the update. Wait for the LB policy's re-resolution to
-  // "pull" the new ports.
-  response_generator.SetNextResolutionUponError(second_ports);
-  gpr_log(GPR_INFO, "****** SERVERS RESTARTED *******");
-  gpr_log(GPR_INFO, "****** SENDING REQUEST TO SUCCEED *******");
-  // Client request should eventually (but still fairly soon) succeed.
-  const gpr_timespec deadline = grpc_timeout_seconds_to_deadline(5);
-  gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
-  while (gpr_time_cmp(deadline, now) > 0) {
-    if (SendRpc(stub)) break;
-    now = gpr_now(GPR_CLOCK_MONOTONIC);
-  }
-  ASSERT_GT(gpr_time_cmp(deadline, now), 0);
+  // Wait for the client to see server 2.
+  WaitForServer(stub, 2, DEBUG_LOCATION);
 }
 
-TEST_F(ClientLbEnd2endTest, RoundRobinTransientFailure) {
+TEST_F(RoundRobinTest, TransientFailure) {
   // Start servers and create channel. Channel should go to READY state.
   const int kNumServers = 3;
   StartServers(kNumServers);
@@ -1391,7 +1416,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinTransientFailure) {
   EXPECT_TRUE(WaitForChannelState(channel.get(), predicate));
 }
 
-TEST_F(ClientLbEnd2endTest, RoundRobinTransientFailureAtStartup) {
+TEST_F(RoundRobinTest, TransientFailureAtStartup) {
   // Create channel and return servers that don't exist. Channel should
   // quickly transition into TRANSIENT_FAILURE.
   // TODO(roth): This test should ideally check that even when the
@@ -1420,7 +1445,146 @@ TEST_F(ClientLbEnd2endTest, RoundRobinTransientFailureAtStartup) {
   EXPECT_TRUE(WaitForChannelState(channel.get(), predicate, true));
 }
 
-TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) {
+TEST_F(RoundRobinTest, DoesNotFailRpcsUponDisconnection) {
+  // A connection attempt injector that allows us to control timing.
+  class ConnectionInjector : public ConnectionAttemptInjector {
+   public:
+    explicit ConnectionInjector(int port) : port_(port) {}
+
+    void InterceptNextAttempt() {
+      grpc_core::MutexLock lock(&mu_);
+      intercept_next_attempt_ = true;
+    }
+
+    void WaitForAttemptToStart() {
+      grpc_core::MutexLock lock(&mu_);
+      while (queued_attempt_ == nullptr) {
+        start_cond_.Wait(&mu_);
+      }
+    }
+
+    void ResumeAttempt() {
+      grpc_core::ExecCtx exec_ctx;
+      std::unique_ptr<QueuedAttempt> attempt;
+      {
+        grpc_core::MutexLock lock(&mu_);
+        attempt = std::move(queued_attempt_);
+      }
+      attempt->Resume();
+    }
+
+    void WaitForAttemptComplete() {
+      grpc_core::MutexLock lock(&mu_);
+      while (!attempt_complete_) {
+        complete_cond_.Wait(&mu_);
+      }
+    }
+
+    void HandleConnection(grpc_closure* closure, grpc_endpoint** ep,
+                          grpc_pollset_set* interested_parties,
+                          const grpc_channel_args* channel_args,
+                          const grpc_resolved_address* addr,
+                          grpc_core::Timestamp deadline) override {
+      const int port = grpc_sockaddr_get_port(addr);
+      gpr_log(GPR_INFO, "==> HandleConnection(): port=%d", port);
+      if (port == port_) {
+        grpc_core::MutexLock lock(&mu_);
+        if (intercept_next_attempt_) {
+          gpr_log(GPR_INFO, "*** INTERCEPTING CONNECTION ATTEMPT");
+          original_closure_ = closure;
+          closure = GRPC_CLOSURE_INIT(&closure_, OnComplete, this, nullptr);
+          intercept_next_attempt_ = false;
+          queued_attempt_ = absl::make_unique<QueuedAttempt>(
+              closure, ep, interested_parties, channel_args, addr, deadline);
+          start_cond_.Signal();
+          return;
+        }
+      }
+      AttemptConnection(closure, ep, interested_parties, channel_args, addr,
+                        deadline);
+    }
+
+   private:
+    static void OnComplete(void* arg, grpc_error_handle error) {
+      auto* self = static_cast<ConnectionInjector*>(arg);
+      {
+        grpc_core::MutexLock lock(&self->mu_);
+        self->attempt_complete_ = true;
+        self->complete_cond_.Signal();
+      }
+      grpc_core::Closure::Run(DEBUG_LOCATION, self->original_closure_,
+                              GRPC_ERROR_REF(error));
+    }
+
+    const int port_;
+
+    grpc_core::Mutex mu_;
+    bool intercept_next_attempt_ ABSL_GUARDED_BY(mu_) = false;
+    grpc_core::CondVar start_cond_;
+    std::unique_ptr<QueuedAttempt> queued_attempt_ ABSL_GUARDED_BY(mu_);
+    grpc_closure* original_closure_ = nullptr;
+    grpc_closure closure_;
+    grpc_core::CondVar complete_cond_;
+    bool attempt_complete_ ABSL_GUARDED_BY(mu_) = false;
+  };
+  // Start server.
+  StartServers(1);
+  ConnectionInjector injector(servers_[0]->port_);
+  injector.Start();
+  // Create channel.
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("round_robin", response_generator);
+  auto stub = BuildStub(channel);
+  response_generator.SetNextResolution(GetServersPorts());
+  // Start a thread constantly sending RPCs in a loop.
+  gpr_log(GPR_ERROR, "=== STARTING CLIENT THREAD ===");
+  std::atomic<bool> shutdown{false};
+  gpr_event ev;
+  gpr_event_init(&ev);
+  std::thread thd([&]() {
+    gpr_log(GPR_INFO, "sending first RPC");
+    CheckRpcSendOk(stub, DEBUG_LOCATION);
+    gpr_event_set(&ev, reinterpret_cast<void*>(1));
+    while (!shutdown.load()) {
+      gpr_log(GPR_INFO, "sending RPC");
+      CheckRpcSendOk(stub, DEBUG_LOCATION);
+    }
+  });
+  // Wait for first RPC to complete.
+  gpr_log(GPR_ERROR, "=== WAITING FOR FIRST RPC TO COMPLETE ===");
+  ASSERT_EQ(reinterpret_cast<void*>(1),
+            gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(1)));
+  // Channel should now be READY.
+  ASSERT_EQ(GRPC_CHANNEL_READY, channel->GetState(false));
+  // Tell injector to intercept the next connection attempt.
+  injector.InterceptNextAttempt();
+  // Now kill the server. The subchannel should report IDLE and be
+  // immediately reconnected to, but this should not cause any test
+  // failures.
+  gpr_log(GPR_ERROR, "=== SHUTTING DOWN SERVER ===");
+  {
+    grpc_core::ExecCtx exec_ctx;
+    grpc_core::Server::FromC(servers_[0]->server_->c_server())->SendGoaways();
+  }
+  gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
+  servers_[0]->Shutdown();
+  // Wait for next attempt to start.
+  gpr_log(GPR_ERROR, "=== WAITING FOR RECONNECTION ATTEMPT ===");
+  injector.WaitForAttemptToStart();
+  // Start server and allow attempt to continue.
+  gpr_log(GPR_ERROR, "=== RESTARTING SERVER ===");
+  StartServer(0);
+  injector.ResumeAttempt();
+  // Wait for next attempt to complete.
+  gpr_log(GPR_ERROR, "=== WAITING FOR RECONNECTION ATTEMPT TO COMPLETE ===");
+  injector.WaitForAttemptComplete();
+  // Now shut down the thread.
+  gpr_log(GPR_ERROR, "=== SHUTTING DOWN CLIENT THREAD ===");
+  shutdown.store(true);
+  thd.join();
+}
+
+TEST_F(RoundRobinTest, SingleReconnect) {
   const int kNumServers = 3;
   StartServers(kNumServers);
   const auto ports = GetServersPorts();
@@ -1464,8 +1628,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) {
 
   // If health checking is required by client but health checking service
   // is not running on the server, the channel should be treated as healthy.
-TEST_F(ClientLbEnd2endTest,
-       RoundRobinServersHealthCheckingUnimplementedTreatedAsHealthy) {
+TEST_F(RoundRobinTest, ServersHealthCheckingUnimplementedTreatedAsHealthy) {
   StartServers(1);  // Single server
   ChannelArguments args;
   args.SetServiceConfigJSON(
@@ -1479,7 +1642,7 @@ TEST_F(ClientLbEnd2endTest,
   CheckRpcSendOk(stub, DEBUG_LOCATION);
 }
 
-TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthChecking) {
+TEST_F(RoundRobinTest, HealthChecking) {
   EnableDefaultHealthCheckService(true);
   // Start servers.
   const int kNumServers = 3;
@@ -1553,8 +1716,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthChecking) {
   EnableDefaultHealthCheckService(false);
 }
 
-TEST_F(ClientLbEnd2endTest,
-       RoundRobinWithHealthCheckingHandlesSubchannelFailure) {
+TEST_F(RoundRobinTest, HealthCheckingHandlesSubchannelFailure) {
   EnableDefaultHealthCheckService(true);
   // Start servers.
   const int kNumServers = 3;
@@ -1581,7 +1743,7 @@ TEST_F(ClientLbEnd2endTest,
   }
 }
 
-TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingInhibitPerChannel) {
+TEST_F(RoundRobinTest, WithHealthCheckingInhibitPerChannel) {
   EnableDefaultHealthCheckService(true);
   // Start server.
   const int kNumServers = 1;
@@ -1618,7 +1780,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingInhibitPerChannel) {
   EnableDefaultHealthCheckService(false);
 }
 
-TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingServiceNamePerChannel) {
+TEST_F(RoundRobinTest, HealthCheckingServiceNamePerChannel) {
   EnableDefaultHealthCheckService(true);
   // Start server.
const int kNumServers = 1; @@ -1661,8 +1823,8 @@ TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingServiceNamePerChannel) { EnableDefaultHealthCheckService(false); } -TEST_F(ClientLbEnd2endTest, - RoundRobinWithHealthCheckingServiceNameChangesAfterSubchannelsCreated) { +TEST_F(RoundRobinTest, + HealthCheckingServiceNameChangesAfterSubchannelsCreated) { EnableDefaultHealthCheckService(true); // Start server. const int kNumServers = 1; @@ -1689,34 +1851,9 @@ TEST_F(ClientLbEnd2endTest, EnableDefaultHealthCheckService(false); } -TEST_F(ClientLbEnd2endTest, ChannelIdleness) { - // Start server. - const int kNumServers = 1; - StartServers(kNumServers); - // Set max idle time and build the channel. - ChannelArguments args; - args.SetInt(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS, 1000); - auto response_generator = BuildResolverResponseGenerator(); - auto channel = BuildChannel("", response_generator, args); - auto stub = BuildStub(channel); - // The initial channel state should be IDLE. - EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE); - // After sending RPC, channel state should be READY. - gpr_log(GPR_INFO, "*** SENDING RPC, CHANNEL SHOULD CONNECT ***"); - response_generator.SetNextResolution(GetServersPorts()); - CheckRpcSendOk(stub, DEBUG_LOCATION); - EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); - // After a period time not using the channel, the channel state should switch - // to IDLE. - gpr_log(GPR_INFO, "*** WAITING FOR CHANNEL TO GO IDLE ***"); - gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(1200)); - EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE); - // Sending a new RPC should awake the IDLE channel. - gpr_log(GPR_INFO, "*** SENDING ANOTHER RPC, CHANNEL SHOULD RECONNECT ***"); - response_generator.SetNextResolution(GetServersPorts()); - CheckRpcSendOk(stub, DEBUG_LOCATION); - EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); -} +// +// LB policy pick args +// class ClientLbPickArgsTest : public ClientLbEnd2endTest { protected: @@ -1795,6 +1932,10 @@ TEST_F(ClientLbPickArgsTest, Basic) { << ArgsSeenListString(pick_args_seen_list); } +// +// tests that LB policies can get the call's trailing metadata +// + xds::data::orca::v3::OrcaLoadReport BackendMetricDataToOrcaLoadReport( const grpc_core::LoadBalancingPolicy::BackendMetricAccessor:: BackendMetricData& backend_metric_data) { @@ -2053,6 +2194,10 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricData) { EXPECT_EQ(kNumRpcs, trailers_intercepted()); } +// +// tests that address attributes from the resolver are visible to the LB policy +// + class ClientLbAddressTest : public ClientLbEnd2endTest { protected: static const char* kAttributeKey; @@ -2131,6 +2276,10 @@ TEST_F(ClientLbAddressTest, Basic) { EXPECT_EQ(addresses_seen(), expected); } +// +// tests OOB backend metric API +// + class OobBackendMetricTest : public ClientLbEnd2endTest { protected: using BackendMetricReport = @@ -2239,7 +2388,9 @@ TEST_F(OobBackendMetricTest, Basic) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); grpc::testing::TestEnvironment env(&argc, argv); + grpc_init(); grpc::testing::ConnectionAttemptInjector::Init(); const auto result = RUN_ALL_TESTS(); + grpc_shutdown(); return result; } diff --git a/test/cpp/end2end/connection_delay_injector.cc b/test/cpp/end2end/connection_delay_injector.cc index 75e45870175..d3bd3d146d8 100644 --- a/test/cpp/end2end/connection_delay_injector.cc +++ b/test/cpp/end2end/connection_delay_injector.cc @@ -69,6 +69,9 @@ 
ConnectionAttemptInjector::~ConnectionAttemptInjector() {
 }
 
 void ConnectionAttemptInjector::Start() {
+  // Fail if ConnectionAttemptInjector::Init() was not called after
+  // grpc_init() to inject the vtable.
+  GPR_ASSERT(grpc_tcp_client_impl == &kDelayedConnectVTable);
   grpc_core::MutexLock lock(g_mu);
   GPR_ASSERT(g_injector == nullptr);
   g_injector = this;
diff --git a/test/cpp/end2end/connection_delay_injector.h b/test/cpp/end2end/connection_delay_injector.h
index cbe09f1c36a..cac171a758a 100644
--- a/test/cpp/end2end/connection_delay_injector.h
+++ b/test/cpp/end2end/connection_delay_injector.h
@@ -40,7 +40,8 @@ namespace testing {
 class ConnectionAttemptInjector {
  public:
   // Global initializer. Replaces the iomgr TCP client vtable.
-  // Must be called exactly once before any TCP connections are established.
+  // Must be called exactly once after grpc_init() but before any TCP
+  // connections are established.
   static void Init();
 
   virtual ~ConnectionAttemptInjector();
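
For readers unfamiliar with the injector API touched above, here is a minimal sketch (not part of this patch) of how a test binary might wire it up. It uses only the members visible in this diff (Init(), Start(), HandleConnection(), AttemptConnection()); LoggingInjector is a hypothetical subclass, and the point is simply the grpc_init() -> Init() -> Start() ordering that the new GPR_ASSERT in Start() enforces.

// Sketch only; assumes the same includes as client_lb_end2end_test.cc and
// connection_delay_injector.h. LoggingInjector is hypothetical.
class LoggingInjector : public grpc::testing::ConnectionAttemptInjector {
 public:
  void HandleConnection(grpc_closure* closure, grpc_endpoint** ep,
                        grpc_pollset_set* interested_parties,
                        const grpc_channel_args* channel_args,
                        const grpc_resolved_address* addr,
                        grpc_core::Timestamp deadline) override {
    // Log the attempt, then let it proceed unchanged.
    gpr_log(GPR_INFO, "connection attempt to port %d",
            grpc_sockaddr_get_port(addr));
    AttemptConnection(closure, ep, interested_parties, channel_args, addr,
                      deadline);
  }
};

int main(int argc, char** argv) {
  grpc_init();                                       // 1. bring up gRPC
  grpc::testing::ConnectionAttemptInjector::Init();  // 2. swap the TCP vtable
  {
    LoggingInjector injector;
    injector.Start();  // would trip the new GPR_ASSERT if Init() were skipped
    // ... create channels / run test logic here ...
  }
  grpc_shutdown();                                   // 3. tear down
  return 0;
}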