round_robin: update status upon each new connection failure (#30001)

* refactor connection injectors in client_lb_end2end_test and add test for sticky TF

* round_robin: update status upon each new connection failure

* fix ring_hash too

* clang-format
pull/30083/head
Mark D. Roth 3 years ago committed by GitHub
parent 86e282ba97
commit 3a8e54b005
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 23
      src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
  2. 34
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  3. 62
      test/cpp/end2end/client_lb_end2end_test.cc

@ -270,6 +270,12 @@ class RingHash : public LoadBalancingPolicy {
// The index of the subchannel currently doing an internally
// triggered connection attempt, if any.
absl::optional<size_t> internally_triggered_connection_index_;
// TODO(roth): If we ever change the helper UpdateState() API to not
// need the status reported for TRANSIENT_FAILURE state (because
// it's not currently actually used for anything outside of the picker),
// then we will no longer need this data member.
absl::Status last_failure_;
};
class Ring : public RefCounted<Ring> {
@ -646,8 +652,17 @@ void RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked(
state = GRPC_CHANNEL_TRANSIENT_FAILURE;
start_connection_attempt = true;
}
// Pass along status only in TRANSIENT_FAILURE.
if (state != GRPC_CHANNEL_TRANSIENT_FAILURE) status = absl::OkStatus();
// In TRANSIENT_FAILURE, report the last reported failure.
// Otherwise, report OK.
if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
if (!status.ok()) {
last_failure_ = absl::UnavailableError(absl::StrCat(
"no reachable subchannels; last error: ", status.ToString()));
}
status = last_failure_;
} else {
status = absl::OkStatus();
}
// Generate new picker and return it to the channel.
// Note that we use our own picker regardless of connectivity state.
p->channel_control_helper()->UpdateState(
@ -755,9 +770,7 @@ void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked(
// Update the RH policy's connectivity state, creating new picker and new
// ring.
subchannel_list()->UpdateRingHashConnectivityStateLocked(
Index(), connection_attempt_complete,
absl::UnavailableError(absl::StrCat(
"no reachable subchannels; last error: ", status.ToString())));
Index(), connection_attempt_complete, status);
}
//

@ -101,9 +101,8 @@ class RoundRobin : public LoadBalancingPolicy {
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state) override;
// Updates the logical connectivity state. Returns true if the
// state has changed.
bool UpdateLogicalConnectivityStateLocked(
// Updates the logical connectivity state.
void UpdateLogicalConnectivityStateLocked(
grpc_connectivity_state connectivity_state);
// The logical connectivity state of the subchannel.
@ -161,6 +160,8 @@ class RoundRobin : public LoadBalancingPolicy {
size_t num_ready_ = 0;
size_t num_connecting_ = 0;
size_t num_transient_failure_ = 0;
absl::Status last_failure_;
};
class Picker : public SubchannelPicker {
@ -397,9 +398,14 @@ void RoundRobin::RoundRobinSubchannelList::
"[RR %p] reporting TRANSIENT_FAILURE with subchannel list %p: %s",
p, this, status_for_tf.ToString().c_str());
}
if (!status_for_tf.ok()) {
last_failure_ = absl::UnavailableError(
absl::StrCat("connections to all backends failing; last error: ",
status_for_tf.ToString()));
}
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status_for_tf,
absl::make_unique<TransientFailurePicker>(status_for_tf));
GRPC_CHANNEL_TRANSIENT_FAILURE, last_failure_,
absl::make_unique<TransientFailurePicker>(last_failure_));
}
}
@ -434,16 +440,13 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
subchannel()->RequestConnection();
}
// Update logical connectivity state.
// If it changed, update the policy state.
if (UpdateLogicalConnectivityStateLocked(new_state)) {
subchannel_list()->MaybeUpdateRoundRobinConnectivityStateLocked(
absl::UnavailableError(
absl::StrCat("connections to all backends failing; last error: ",
connectivity_status().ToString())));
}
UpdateLogicalConnectivityStateLocked(new_state);
// Update the policy state.
subchannel_list()->MaybeUpdateRoundRobinConnectivityStateLocked(
connectivity_status());
}
bool RoundRobin::RoundRobinSubchannelData::UpdateLogicalConnectivityStateLocked(
void RoundRobin::RoundRobinSubchannelData::UpdateLogicalConnectivityStateLocked(
grpc_connectivity_state connectivity_state) {
RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
@ -464,7 +467,7 @@ bool RoundRobin::RoundRobinSubchannelData::UpdateLogicalConnectivityStateLocked(
if (logical_connectivity_state_.has_value() &&
*logical_connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
connectivity_state != GRPC_CHANNEL_READY) {
return false;
return;
}
// If the new state is IDLE, treat it as CONNECTING, since it will
// immediately transition into CONNECTING anyway.
@ -481,13 +484,12 @@ bool RoundRobin::RoundRobinSubchannelData::UpdateLogicalConnectivityStateLocked(
// If no change, return false.
if (logical_connectivity_state_.has_value() &&
*logical_connectivity_state_ == connectivity_state) {
return false;
return;
}
// Otherwise, update counters and logical state.
subchannel_list()->UpdateStateCountersLocked(logical_connectivity_state_,
connectivity_state);
logical_connectivity_state_ = connectivity_state;
return true;
}
//

@ -119,6 +119,17 @@ class ConnectionInjector : public ConnectionAttemptInjector {
attempt->Resume();
}
void Fail(grpc_error_handle error) {
gpr_log(GPR_INFO, "=== FAILING CONNECTION ATTEMPT ON PORT %d ===", port_);
grpc_core::ExecCtx exec_ctx;
std::unique_ptr<QueuedAttempt> attempt;
{
grpc_core::MutexLock lock(&injector_->mu_);
attempt = std::move(queued_attempt_);
}
attempt->Fail(error);
}
void WaitForCompletion() {
gpr_log(GPR_INFO,
"=== WAITING FOR CONNECTION COMPLETION ON PORT %d ===", port_);
@ -1734,6 +1745,57 @@ TEST_F(RoundRobinTest, StaysInTransientFailureInSubsequentConnecting) {
hold->Resume();
}
TEST_F(RoundRobinTest, ReportsLatestStatusInTransientFailure) {
// Start connection injector.
ConnectionInjector injector;
injector.Start();
// Get port.
const std::vector<int> ports = {grpc_pick_unused_port_or_die(),
grpc_pick_unused_port_or_die()};
// Create channel.
auto response_generator = BuildResolverResponseGenerator();
auto channel = BuildChannel("round_robin", response_generator);
auto stub = BuildStub(channel);
response_generator.SetNextResolution(ports);
// Allow first connection attempts to fail normally, and check that
// the RPC fails with the right status message.
CheckRpcSendFailure(
DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE,
"connections to all backends failing; last error: "
"(UNKNOWN: Failed to connect to remote host: Connection refused|"
"UNAVAILABLE: Failed to connect to remote host: FD shutdown)");
// Now intercept the next connection attempt for each port.
auto hold1 = injector.AddHold(ports[0]);
auto hold2 = injector.AddHold(ports[1]);
hold1->Wait();
hold2->Wait();
// Inject a custom failure message.
hold1->Wait();
hold1->Fail(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Survey says... Bzzzzt!"));
// Wait until RPC fails with the right message.
absl::Time deadline =
absl::Now() + (absl::Seconds(5) * grpc_test_slowdown_factor());
while (true) {
Status status = SendRpc(stub);
EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
if (status.error_message() ==
"connections to all backends failing; last error: "
"UNKNOWN: Survey says... Bzzzzt!") {
break;
}
EXPECT_THAT(
status.error_message(),
::testing::MatchesRegex(
"connections to all backends failing; last error: "
"(UNKNOWN: Failed to connect to remote host: Connection refused|"
"UNAVAILABLE: Failed to connect to remote host: FD shutdown)"));
EXPECT_LT(absl::Now(), deadline);
if (absl::Now() >= deadline) break;
}
// Clean up.
hold2->Resume();
}
TEST_F(RoundRobinTest, DoesNotFailRpcsUponDisconnection) {
// Start connection injector.
ConnectionInjector injector;

Loading…
Cancel
Save