subchannel: fix reset backoff when connection attempt is in flight (#29792)

* subchannel: fix reset backoff when connection attempt is in flight

* clang-tidy
pull/29802/head
Mark D. Roth 3 years ago committed by GitHub
parent c34a006801
commit 9c23d7999c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/core/ext/filters/client_channel/subchannel.cc
  2. 124
      test/cpp/end2end/client_lb_end2end_test.cc

@ -806,6 +806,8 @@ void Subchannel::ResetBackoff() {
backoff_.Reset();
if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE) {
grpc_timer_cancel(&retry_timer_);
} else if (state_ == GRPC_CHANNEL_CONNECTING) {
next_attempt_time_ = ExecCtx::Get()->Now();
}
}

@ -733,50 +733,108 @@ TEST_F(PickFirstTest, ResetConnectionBackoff) {
TEST_F(ClientLbEnd2endTest,
ResetConnectionBackoffNextAttemptStartsImmediately) {
// A connection attempt injector that allows us to control timing of a
// connection attempt.
class ConnectionInjector : public ConnectionAttemptInjector {
public:
explicit ConnectionInjector(int port) : port_(port) {}
void InterceptNextAttempt(grpc_core::CondVar* cv) {
grpc_core::MutexLock lock(&mu_);
cv_ = cv;
}
void WaitForAttemptToStart(grpc_core::CondVar* cv) {
grpc_core::MutexLock lock(&mu_);
while (queued_attempt_ == nullptr) {
cv->Wait(&mu_);
}
}
void ResumeAttempt() {
grpc_core::ExecCtx exec_ctx;
std::unique_ptr<QueuedAttempt> attempt;
{
grpc_core::MutexLock lock(&mu_);
attempt = std::move(queued_attempt_);
}
attempt->Resume();
}
void HandleConnection(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) override {
const int port = grpc_sockaddr_get_port(addr);
gpr_log(GPR_INFO, "==> HandleConnection(): port=%d", port);
if (port == port_) {
grpc_core::MutexLock lock(&mu_);
if (cv_ != nullptr) {
gpr_log(GPR_INFO, "*** INTERCEPTING CONNECTION ATTEMPT");
GPR_ASSERT(queued_attempt_ == nullptr);
queued_attempt_ = absl::make_unique<QueuedAttempt>(
closure, ep, interested_parties, channel_args, addr, deadline);
cv_->Signal();
cv_ = nullptr;
return;
}
}
AttemptConnection(closure, ep, interested_parties, channel_args, addr,
deadline);
}
private:
const int port_;
grpc_core::Mutex mu_;
grpc_core::CondVar* cv_ = nullptr;
std::unique_ptr<QueuedAttempt> queued_attempt_ ABSL_GUARDED_BY(mu_);
};
// Get an unused port and start connection injector.
const int port = grpc_pick_unused_port_or_die();
ConnectionInjector injector(port);
injector.Start();
// Create client.
ChannelArguments args;
constexpr int kInitialBackOffMs = 1000;
const int kInitialBackOffMs = 5000 * grpc_test_slowdown_factor();
args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
auto response_generator = BuildResolverResponseGenerator();
auto channel = BuildChannel("pick_first", response_generator, args);
auto stub = BuildStub(channel);
response_generator.SetNextResolution(ports);
// Wait for connect, which should fail ~immediately, because the server
// is not up.
gpr_log(GPR_INFO, "=== INITIAL CONNECTION ATTEMPT");
EXPECT_FALSE(
channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
// Reset connection backoff.
// Note that the time at which the third attempt will be started is
// actually computed at this point, so we record the start time here.
response_generator.SetNextResolution({port});
// Intercept initial connection attempt.
grpc_core::CondVar cv1;
injector.InterceptNextAttempt(&cv1);
gpr_log(GPR_INFO, "=== TRIGGERING INITIAL CONNECTION ATTEMPT");
EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(/*try_to_connect=*/true));
injector.WaitForAttemptToStart(&cv1);
EXPECT_EQ(GRPC_CHANNEL_CONNECTING,
channel->GetState(/*try_to_connect=*/false));
// Reset backoff.
gpr_log(GPR_INFO, "=== RESETTING BACKOFF");
const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
experimental::ChannelResetConnectionBackoff(channel.get());
// Trigger a second connection attempt. This should also fail
// ~immediately, but the retry should be scheduled for
// kInitialBackOffMs instead of applying the multiplier.
gpr_log(GPR_INFO, "=== POLLING FOR SECOND CONNECTION ATTEMPT");
EXPECT_FALSE(
channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
// Bring up a server on the chosen port.
gpr_log(GPR_INFO, "=== STARTING BACKEND");
StartServers(1, ports);
// Wait for connect. Should happen within kInitialBackOffMs.
// Give an extra 100ms to account for the time spent in the second and
// third connection attempts themselves (since what we really want to
// measure is the time between the two). As long as this is less than
// the 1.6x increase we would see if the backoff state was not reset
// properly, the test is still proving that the backoff was reset.
constexpr int kWaitMs = kInitialBackOffMs + 100;
gpr_log(GPR_INFO, "=== POLLING FOR THIRD CONNECTION ATTEMPT");
EXPECT_TRUE(channel->WaitForConnected(
grpc_timeout_milliseconds_to_deadline(kWaitMs)));
// Intercept next attempt. Do this before resuming the first attempt,
// just in case the client makes progress faster than this thread.
grpc_core::CondVar cv2;
injector.InterceptNextAttempt(&cv2);
// Fail current attempt and wait for next one to start.
gpr_log(GPR_INFO, "=== RESUMING INITIAL ATTEMPT");
const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
injector.ResumeAttempt();
gpr_log(GPR_INFO, "=== WAITING FOR SECOND ATTEMPT");
// This WaitForStateChange() call just makes sure we're doing some polling.
EXPECT_TRUE(channel->WaitForStateChange(GRPC_CHANNEL_CONNECTING,
grpc_timeout_seconds_to_deadline(1)));
injector.WaitForAttemptToStart(&cv2);
const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_log(GPR_INFO, "=== RESUMING SECOND ATTEMPT");
injector.ResumeAttempt();
// Elapsed time should be very short, much less than kInitialBackOffMs.
const grpc_core::Duration waited =
grpc_core::Duration::FromTimespec(gpr_time_sub(t1, t0));
gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited.millis());
// We should have waited less than kInitialBackOffMs.
EXPECT_LT(waited.millis(), kWaitMs);
EXPECT_LT(waited.millis(), 1000 * grpc_test_slowdown_factor());
}
TEST_F(PickFirstTest, Updates) {

Loading…
Cancel
Save