diff --git a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc index 84afb0bbc40..7b7c271efef 100644 --- a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc +++ b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc @@ -270,6 +270,12 @@ class RingHash : public LoadBalancingPolicy { // The index of the subchannel currently doing an internally // triggered connection attempt, if any. absl::optional internally_triggered_connection_index_; + + // TODO(roth): If we ever change the helper UpdateState() API to not + // need the status reported for TRANSIENT_FAILURE state (because + // it's not currently actually used for anything outside of the picker), + // then we will no longer need this data member. + absl::Status last_failure_; }; class Ring : public RefCounted { @@ -646,8 +652,17 @@ void RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked( state = GRPC_CHANNEL_TRANSIENT_FAILURE; start_connection_attempt = true; } - // Pass along status only in TRANSIENT_FAILURE. - if (state != GRPC_CHANNEL_TRANSIENT_FAILURE) status = absl::OkStatus(); + // In TRANSIENT_FAILURE, report the last reported failure. + // Otherwise, report OK. + if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + if (!status.ok()) { + last_failure_ = absl::UnavailableError(absl::StrCat( + "no reachable subchannels; last error: ", status.ToString())); + } + status = last_failure_; + } else { + status = absl::OkStatus(); + } // Generate new picker and return it to the channel. // Note that we use our own picker regardless of connectivity state. p->channel_control_helper()->UpdateState( @@ -755,9 +770,7 @@ void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked( // Update the RH policy's connectivity state, creating new picker and new // ring. subchannel_list()->UpdateRingHashConnectivityStateLocked( - Index(), connection_attempt_complete, - absl::UnavailableError(absl::StrCat( - "no reachable subchannels; last error: ", status.ToString()))); + Index(), connection_attempt_complete, status); } // diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index e99997e1bc9..305ba26f08e 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -101,9 +101,8 @@ class RoundRobin : public LoadBalancingPolicy { absl::optional old_state, grpc_connectivity_state new_state) override; - // Updates the logical connectivity state. Returns true if the - // state has changed. - bool UpdateLogicalConnectivityStateLocked( + // Updates the logical connectivity state. + void UpdateLogicalConnectivityStateLocked( grpc_connectivity_state connectivity_state); // The logical connectivity state of the subchannel. @@ -161,6 +160,8 @@ class RoundRobin : public LoadBalancingPolicy { size_t num_ready_ = 0; size_t num_connecting_ = 0; size_t num_transient_failure_ = 0; + + absl::Status last_failure_; }; class Picker : public SubchannelPicker { @@ -397,9 +398,14 @@ void RoundRobin::RoundRobinSubchannelList:: "[RR %p] reporting TRANSIENT_FAILURE with subchannel list %p: %s", p, this, status_for_tf.ToString().c_str()); } + if (!status_for_tf.ok()) { + last_failure_ = absl::UnavailableError( + absl::StrCat("connections to all backends failing; last error: ", + status_for_tf.ToString())); + } p->channel_control_helper()->UpdateState( - GRPC_CHANNEL_TRANSIENT_FAILURE, status_for_tf, - absl::make_unique(status_for_tf)); + GRPC_CHANNEL_TRANSIENT_FAILURE, last_failure_, + absl::make_unique(last_failure_)); } } @@ -434,16 +440,13 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( subchannel()->RequestConnection(); } // Update logical connectivity state. - // If it changed, update the policy state. - if (UpdateLogicalConnectivityStateLocked(new_state)) { - subchannel_list()->MaybeUpdateRoundRobinConnectivityStateLocked( - absl::UnavailableError( - absl::StrCat("connections to all backends failing; last error: ", - connectivity_status().ToString()))); - } + UpdateLogicalConnectivityStateLocked(new_state); + // Update the policy state. + subchannel_list()->MaybeUpdateRoundRobinConnectivityStateLocked( + connectivity_status()); } -bool RoundRobin::RoundRobinSubchannelData::UpdateLogicalConnectivityStateLocked( +void RoundRobin::RoundRobinSubchannelData::UpdateLogicalConnectivityStateLocked( grpc_connectivity_state connectivity_state) { RoundRobin* p = static_cast(subchannel_list()->policy()); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { @@ -464,7 +467,7 @@ bool RoundRobin::RoundRobinSubchannelData::UpdateLogicalConnectivityStateLocked( if (logical_connectivity_state_.has_value() && *logical_connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE && connectivity_state != GRPC_CHANNEL_READY) { - return false; + return; } // If the new state is IDLE, treat it as CONNECTING, since it will // immediately transition into CONNECTING anyway. @@ -481,13 +484,12 @@ bool RoundRobin::RoundRobinSubchannelData::UpdateLogicalConnectivityStateLocked( // If no change, return false. if (logical_connectivity_state_.has_value() && *logical_connectivity_state_ == connectivity_state) { - return false; + return; } // Otherwise, update counters and logical state. subchannel_list()->UpdateStateCountersLocked(logical_connectivity_state_, connectivity_state); logical_connectivity_state_ = connectivity_state; - return true; } // diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index 036a6a2c988..8992efa160d 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -119,6 +119,17 @@ class ConnectionInjector : public ConnectionAttemptInjector { attempt->Resume(); } + void Fail(grpc_error_handle error) { + gpr_log(GPR_INFO, "=== FAILING CONNECTION ATTEMPT ON PORT %d ===", port_); + grpc_core::ExecCtx exec_ctx; + std::unique_ptr attempt; + { + grpc_core::MutexLock lock(&injector_->mu_); + attempt = std::move(queued_attempt_); + } + attempt->Fail(error); + } + void WaitForCompletion() { gpr_log(GPR_INFO, "=== WAITING FOR CONNECTION COMPLETION ON PORT %d ===", port_); @@ -1734,6 +1745,57 @@ TEST_F(RoundRobinTest, StaysInTransientFailureInSubsequentConnecting) { hold->Resume(); } +TEST_F(RoundRobinTest, ReportsLatestStatusInTransientFailure) { + // Start connection injector. + ConnectionInjector injector; + injector.Start(); + // Get port. + const std::vector ports = {grpc_pick_unused_port_or_die(), + grpc_pick_unused_port_or_die()}; + // Create channel. + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("round_robin", response_generator); + auto stub = BuildStub(channel); + response_generator.SetNextResolution(ports); + // Allow first connection attempts to fail normally, and check that + // the RPC fails with the right status message. + CheckRpcSendFailure( + DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE, + "connections to all backends failing; last error: " + "(UNKNOWN: Failed to connect to remote host: Connection refused|" + "UNAVAILABLE: Failed to connect to remote host: FD shutdown)"); + // Now intercept the next connection attempt for each port. + auto hold1 = injector.AddHold(ports[0]); + auto hold2 = injector.AddHold(ports[1]); + hold1->Wait(); + hold2->Wait(); + // Inject a custom failure message. + hold1->Wait(); + hold1->Fail(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Survey says... Bzzzzt!")); + // Wait until RPC fails with the right message. + absl::Time deadline = + absl::Now() + (absl::Seconds(5) * grpc_test_slowdown_factor()); + while (true) { + Status status = SendRpc(stub); + EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code()); + if (status.error_message() == + "connections to all backends failing; last error: " + "UNKNOWN: Survey says... Bzzzzt!") { + break; + } + EXPECT_THAT( + status.error_message(), + ::testing::MatchesRegex( + "connections to all backends failing; last error: " + "(UNKNOWN: Failed to connect to remote host: Connection refused|" + "UNAVAILABLE: Failed to connect to remote host: FD shutdown)")); + EXPECT_LT(absl::Now(), deadline); + if (absl::Now() >= deadline) break; + } + // Clean up. + hold2->Resume(); +} + TEST_F(RoundRobinTest, DoesNotFailRpcsUponDisconnection) { // Start connection injector. ConnectionInjector injector;