client_lb_end2end_test: refactor connection injectors and add test for sticky TF (#29993)

* refactor connection injectors in client_lb_end2end_test and add test for sticky TF

* Automated change: Fix sanity tests

Co-authored-by: markdroth <markdroth@users.noreply.github.com>
pull/30001/head^2
Mark D. Roth authored 2 years ago; committed by GitHub
parent d0c8b29ef7
commit 7976501534
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 438 lines changed:
    test/cpp/end2end/client_lb_end2end_test.cc

@ -83,6 +83,121 @@ namespace {
constexpr char kRequestMessage[] = "Live long and prosper.";
// A connection attempt injector that allows us to control timing of
// connection attempts.
class ConnectionInjector : public ConnectionAttemptInjector {
private:
grpc_core::Mutex mu_; // Needs to be declared up front.
public:
class Hold {
public:
Hold(ConnectionInjector* injector, int port, bool intercept_completion)
: injector_(injector),
port_(port),
intercept_completion_(intercept_completion) {}
void Wait() {
gpr_log(GPR_INFO,
"=== WAITING FOR CONNECTION ATTEMPT ON PORT %d ===", port_);
grpc_core::MutexLock lock(&injector_->mu_);
while (queued_attempt_ == nullptr) {
start_cv_.Wait(&injector_->mu_);
}
gpr_log(GPR_INFO, "=== CONNECTION ATTEMPT STARTED ON PORT %d ===", port_);
}
void Resume() {
gpr_log(GPR_INFO,
"=== RESUMING CONNECTION ATTEMPT ON PORT %d ===", port_);
grpc_core::ExecCtx exec_ctx;
std::unique_ptr<QueuedAttempt> attempt;
{
grpc_core::MutexLock lock(&injector_->mu_);
attempt = std::move(queued_attempt_);
}
attempt->Resume();
}
void WaitForCompletion() {
gpr_log(GPR_INFO,
"=== WAITING FOR CONNECTION COMPLETION ON PORT %d ===", port_);
grpc_core::MutexLock lock(&injector_->mu_);
while (original_on_complete_ != nullptr) {
complete_cv_.Wait(&injector_->mu_);
}
gpr_log(GPR_INFO, "=== CONNECTION COMPLETED ON PORT %d ===", port_);
}
private:
friend class ConnectionInjector;
static void OnComplete(void* arg, grpc_error_handle error) {
auto* self = static_cast<Hold*>(arg);
grpc_closure* on_complete;
{
grpc_core::MutexLock lock(&self->injector_->mu_);
on_complete = self->original_on_complete_;
self->original_on_complete_ = nullptr;
self->complete_cv_.Signal();
}
grpc_core::Closure::Run(DEBUG_LOCATION, on_complete,
GRPC_ERROR_REF(error));
}
ConnectionInjector* injector_;
const int port_;
const bool intercept_completion_;
std::unique_ptr<QueuedAttempt> queued_attempt_
ABSL_GUARDED_BY(&ConnectionInjector::mu_);
grpc_core::CondVar start_cv_;
grpc_closure on_complete_;
grpc_closure* original_on_complete_;
grpc_core::CondVar complete_cv_;
};
std::unique_ptr<Hold> AddHold(int port, bool intercept_completion = false) {
grpc_core::MutexLock lock(&mu_);
auto hold = absl::make_unique<Hold>(this, port, intercept_completion);
holds_.push_back(hold.get());
return hold;
}
void HandleConnection(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) override {
const int port = grpc_sockaddr_get_port(addr);
gpr_log(GPR_INFO, "==> HandleConnection(): port=%d", port);
{
grpc_core::MutexLock lock(&mu_);
for (auto it = holds_.begin(); it != holds_.end(); ++it) {
Hold* hold = *it;
if (port == hold->port_) {
gpr_log(GPR_INFO, "*** INTERCEPTING CONNECTION ATTEMPT");
if (hold->intercept_completion_) {
hold->original_on_complete_ = closure;
closure = GRPC_CLOSURE_INIT(&hold->on_complete_, Hold::OnComplete,
hold, nullptr);
}
hold->queued_attempt_ = absl::make_unique<QueuedAttempt>(
closure, ep, interested_parties, channel_args, addr, deadline);
hold->start_cv_.Signal();
holds_.erase(it);
return;
}
}
}
// Anything we're not holding should proceed normally.
AttemptConnection(closure, ep, interested_parties, channel_args, addr,
deadline);
}
private:
std::vector<Hold*> holds_;
};
// Subclass of TestServiceImpl that increments a request counter for
// every call to the Echo RPC.
class MyTestServiceImpl : public TestServiceImpl {
@ -358,7 +473,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
EXPECT_EQ(expected_status, status.error_code())
<< location.file() << ":" << location.line();
EXPECT_THAT(status.error_message(),
::testing::ContainsRegex(expected_message_regex))
::testing::MatchesRegex(expected_message_regex))
<< location.file() << ":" << location.line();
}
@ -748,69 +863,11 @@ TEST_F(PickFirstTest, ResetConnectionBackoff) {
TEST_F(ClientLbEnd2endTest,
ResetConnectionBackoffNextAttemptStartsImmediately) {
// A connection attempt injector that allows us to control timing of a
// connection attempt.
// Intercepts at most one connection attempt on a single fixed port.  The
// caller supplies a CondVar via InterceptNextAttempt(); the attempt is
// queued until ResumeAttempt() is called.
class ConnectionInjector : public ConnectionAttemptInjector {
public:
explicit ConnectionInjector(int port) : port_(port) {}
// Arms interception: the next attempt on port_ will be queued instead of
// proceeding, and cv will be signaled when that happens.
void InterceptNextAttempt(grpc_core::CondVar* cv) {
grpc_core::MutexLock lock(&mu_);
cv_ = cv;
}
// Blocks on cv until HandleConnection() has queued the attempt.  cv must
// be the same CondVar passed to InterceptNextAttempt().
void WaitForAttemptToStart(grpc_core::CondVar* cv) {
grpc_core::MutexLock lock(&mu_);
while (queued_attempt_ == nullptr) {
cv->Wait(&mu_);
}
}
// Releases the queued attempt.  The attempt is moved out under the lock
// but resumed only after the lock is released.
void ResumeAttempt() {
grpc_core::ExecCtx exec_ctx;
std::unique_ptr<QueuedAttempt> attempt;
{
grpc_core::MutexLock lock(&mu_);
attempt = std::move(queued_attempt_);
}
attempt->Resume();
}
// Called by the injector framework for every connection attempt.  Queues
// the attempt if it targets port_ and interception is armed; otherwise
// lets it proceed normally.
void HandleConnection(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) override {
const int port = grpc_sockaddr_get_port(addr);
gpr_log(GPR_INFO, "==> HandleConnection(): port=%d", port);
if (port == port_) {
grpc_core::MutexLock lock(&mu_);
if (cv_ != nullptr) {
gpr_log(GPR_INFO, "*** INTERCEPTING CONNECTION ATTEMPT");
GPR_ASSERT(queued_attempt_ == nullptr);
queued_attempt_ = absl::make_unique<QueuedAttempt>(
closure, ep, interested_parties, channel_args, addr, deadline);
// Signal the waiter and disarm so only one attempt is intercepted.
cv_->Signal();
cv_ = nullptr;
return;
}
}
AttemptConnection(closure, ep, interested_parties, channel_args, addr,
deadline);
}
private:
const int port_;
grpc_core::Mutex mu_;
// Non-owning; set by InterceptNextAttempt(), cleared once signaled.
grpc_core::CondVar* cv_ = nullptr;
std::unique_ptr<QueuedAttempt> queued_attempt_ ABSL_GUARDED_BY(mu_);
};
// Get an unused port and start connection injector.
const int port = grpc_pick_unused_port_or_die();
ConnectionInjector injector(port);
// Start connection injector.
ConnectionInjector injector;
injector.Start();
// Create client.
const int port = grpc_pick_unused_port_or_die();
ChannelArguments args;
const int kInitialBackOffMs = 5000 * grpc_test_slowdown_factor();
args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
@ -819,11 +876,10 @@ TEST_F(ClientLbEnd2endTest,
auto stub = BuildStub(channel);
response_generator.SetNextResolution({port});
// Intercept initial connection attempt.
grpc_core::CondVar cv1;
injector.InterceptNextAttempt(&cv1);
auto hold1 = injector.AddHold(port);
gpr_log(GPR_INFO, "=== TRIGGERING INITIAL CONNECTION ATTEMPT");
EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(/*try_to_connect=*/true));
injector.WaitForAttemptToStart(&cv1);
hold1->Wait();
EXPECT_EQ(GRPC_CHANNEL_CONNECTING,
channel->GetState(/*try_to_connect=*/false));
// Reset backoff.
@ -831,20 +887,19 @@ TEST_F(ClientLbEnd2endTest,
experimental::ChannelResetConnectionBackoff(channel.get());
// Intercept next attempt. Do this before resuming the first attempt,
// just in case the client makes progress faster than this thread.
grpc_core::CondVar cv2;
injector.InterceptNextAttempt(&cv2);
auto hold2 = injector.AddHold(port);
// Fail current attempt and wait for next one to start.
gpr_log(GPR_INFO, "=== RESUMING INITIAL ATTEMPT");
const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
injector.ResumeAttempt();
hold1->Resume();
gpr_log(GPR_INFO, "=== WAITING FOR SECOND ATTEMPT");
// This WaitForStateChange() call just makes sure we're doing some polling.
EXPECT_TRUE(channel->WaitForStateChange(GRPC_CHANNEL_CONNECTING,
grpc_timeout_seconds_to_deadline(1)));
injector.WaitForAttemptToStart(&cv2);
hold2->Wait();
const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_log(GPR_INFO, "=== RESUMING SECOND ATTEMPT");
injector.ResumeAttempt();
hold2->Resume();
// Elapsed time should be very short, much less than kInitialBackOffMs.
const grpc_core::Duration waited =
grpc_core::Duration::FromTimespec(gpr_time_sub(t1, t0));
@ -855,86 +910,6 @@ TEST_F(ClientLbEnd2endTest,
TEST_F(
PickFirstTest,
TriesAllSubchannelsBeforeReportingTransientFailureWithSubchannelSharing) {
// A connection attempt injector that allows us to control timing of
// connection attempts.
// Multi-port connection attempt injector: each AddHold() registers one port
// to intercept; the returned Hold lets the test wait for the attempt to
// start and then resume it.
class ConnectionInjector : public ConnectionAttemptInjector {
private:
grpc_core::Mutex mu_; // Needs to be declared up front.
public:
// Tracks one intercepted attempt.  Create only via AddHold().
class Hold {
public:
Hold(ConnectionInjector* injector, int port)
: injector_(injector), port_(port) {}
int port() const { return port_; }
// Called by HandleConnection() (under the injector's mutex) to hand over
// the queued attempt and wake any Wait() caller.
void set_queued_attempt(std::unique_ptr<QueuedAttempt> queued_attempt)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ConnectionInjector::mu_) {
queued_attempt_ = std::move(queued_attempt);
cv_.Signal();
}
// Blocks until the attempt on port_ has been intercepted and queued.
void Wait() {
grpc_core::MutexLock lock(&injector_->mu_);
while (queued_attempt_ == nullptr) {
cv_.Wait(&injector_->mu_);
}
}
// Releases the queued attempt; the attempt is resumed after the lock is
// dropped.
void Resume() {
grpc_core::ExecCtx exec_ctx;
std::unique_ptr<QueuedAttempt> attempt;
{
grpc_core::MutexLock lock(&injector_->mu_);
attempt = std::move(queued_attempt_);
}
attempt->Resume();
}
private:
ConnectionInjector* injector_;
const int port_;
std::unique_ptr<QueuedAttempt> queued_attempt_
ABSL_GUARDED_BY(&ConnectionInjector::mu_);
grpc_core::CondVar cv_;
};
// Registers interception of the next attempt on the given port.  Must be
// called before the attempt is triggered.
std::unique_ptr<Hold> AddHold(int port) {
grpc_core::MutexLock lock(&mu_);
auto hold = absl::make_unique<Hold>(this, port);
holds_.push_back(hold.get());
return hold;
}
// Queues the attempt if some registered Hold matches its port; otherwise
// lets it proceed normally.  A matching Hold fires once and is removed.
void HandleConnection(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) override {
const int port = grpc_sockaddr_get_port(addr);
gpr_log(GPR_INFO, "==> HandleConnection(): port=%d", port);
{
grpc_core::MutexLock lock(&mu_);
for (auto it = holds_.begin(); it != holds_.end(); ++it) {
Hold* hold = *it;
if (port == hold->port()) {
gpr_log(GPR_INFO, "*** INTERCEPTING CONNECTION ATTEMPT");
hold->set_queued_attempt(absl::make_unique<QueuedAttempt>(
closure, ep, interested_parties, channel_args, addr, deadline));
holds_.erase(it);
return;
}
}
}
// Anything we're not holding should proceed normally.
AttemptConnection(closure, ep, interested_parties, channel_args, addr,
deadline);
}
private:
// Non-owning pointers; the test owns the Holds via the unique_ptrs
// returned from AddHold().
std::vector<Hold*> holds_;
};
// Start connection injector.
ConnectionInjector injector;
injector.Start();
@ -1680,15 +1655,6 @@ TEST_F(RoundRobinTest, TransientFailure) {
response_generator.SetNextResolution(GetServersPorts());
EXPECT_TRUE(WaitForChannelReady(channel.get()));
// Now kill the servers. The channel should transition to TRANSIENT_FAILURE.
// TODO(roth): This test should ideally check that even when the
// subchannels are in state CONNECTING for an extended period of time,
// we will still report TRANSIENT_FAILURE. Unfortunately, we don't
// currently have a good way to get a subchannel to report CONNECTING
// for a long period of time, since the servers in this test framework
// are on the loopback interface, which will immediately return a
// "Connection refused" error, so the subchannels will only be in
// CONNECTING state very briefly. When we have time, see if we can
// find a way to fix this.
for (size_t i = 0; i < servers_.size(); ++i) {
servers_[i]->Shutdown();
}
@ -1696,20 +1662,16 @@ TEST_F(RoundRobinTest, TransientFailure) {
return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
};
EXPECT_TRUE(WaitForChannelState(channel.get(), predicate));
CheckRpcSendFailure(
DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE,
"connections to all backends failing; last error: "
"(UNKNOWN: Failed to connect to remote host: Connection refused|"
"UNAVAILABLE: Failed to connect to remote host: FD shutdown)");
}
TEST_F(RoundRobinTest, TransientFailureAtStartup) {
// Create channel and return servers that don't exist. Channel should
// quickly transition into TRANSIENT_FAILURE.
// TODO(roth): This test should ideally check that even when the
// subchannels are in state CONNECTING for an extended period of time,
// we will still report TRANSIENT_FAILURE. Unfortunately, we don't
// currently have a good way to get a subchannel to report CONNECTING
// for a long period of time, since the servers in this test framework
// are on the loopback interface, which will immediately return a
// "Connection refused" error, so the subchannels will only be in
// CONNECTING state very briefly. When we have time, see if we can
// find a way to fix this.
auto response_generator = BuildResolverResponseGenerator();
auto channel = BuildChannel("round_robin", response_generator);
auto stub = BuildStub(channel);
@ -1725,94 +1687,59 @@ TEST_F(RoundRobinTest, TransientFailureAtStartup) {
return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
};
EXPECT_TRUE(WaitForChannelState(channel.get(), predicate, true));
CheckRpcSendFailure(
DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE,
"connections to all backends failing; last error: "
"(UNKNOWN: Failed to connect to remote host: Connection refused|"
"UNAVAILABLE: Failed to connect to remote host: FD shutdown)");
}
TEST_F(RoundRobinTest, DoesNotFailRpcsUponDisconnection) {
// A connection attempt injector that allows us to control timing.
class ConnectionInjector : public ConnectionAttemptInjector {
public:
explicit ConnectionInjector(int port) : port_(port) {}
void InterceptNextAttempt() {
grpc_core::MutexLock lock(&mu_);
intercept_next_attempt_ = true;
}
void WaitForAttemptToStart() {
grpc_core::MutexLock lock(&mu_);
while (queued_attempt_ == nullptr) {
start_cond_.Wait(&mu_);
}
}
void ResumeAttempt() {
grpc_core::ExecCtx exec_ctx;
std::unique_ptr<QueuedAttempt> attempt;
{
grpc_core::MutexLock lock(&mu_);
attempt = std::move(queued_attempt_);
}
attempt->Resume();
}
void WaitForAttemptComplete() {
grpc_core::MutexLock lock(&mu_);
while (!attempt_complete_) {
complete_cond_.Wait(&mu_);
}
}
void HandleConnection(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) override {
const int port = grpc_sockaddr_get_port(addr);
gpr_log(GPR_INFO, "==> HandleConnection(): port=%d", port);
if (port == port_) {
grpc_core::MutexLock lock(&mu_);
if (intercept_next_attempt_) {
gpr_log(GPR_INFO, "*** INTERCEPTING CONNECTION ATTEMPT");
original_closure_ = closure;
closure = GRPC_CLOSURE_INIT(&closure_, OnComplete, this, nullptr);
intercept_next_attempt_ = false;
queued_attempt_ = absl::make_unique<QueuedAttempt>(
closure, ep, interested_parties, channel_args, addr, deadline);
start_cond_.Signal();
return;
}
}
AttemptConnection(closure, ep, interested_parties, channel_args, addr,
deadline);
}
private:
static void OnComplete(void* arg, grpc_error_handle error) {
auto* self = static_cast<ConnectionInjector*>(arg);
{
grpc_core::MutexLock lock(&self->mu_);
self->attempt_complete_ = true;
self->complete_cond_.Signal();
}
grpc_core::Closure::Run(DEBUG_LOCATION, self->original_closure_,
GRPC_ERROR_REF(error));
}
const int port_;
grpc_core::Mutex mu_;
bool intercept_next_attempt_ ABSL_GUARDED_BY(mu_) = false;
grpc_core::CondVar start_cond_;
std::unique_ptr<QueuedAttempt> queued_attempt_ ABSL_GUARDED_BY(mu_);
grpc_closure* original_closure_ = nullptr;
grpc_closure closure_;
grpc_core::CondVar complete_cond_;
bool attempt_complete_ ABSL_GUARDED_BY(mu_) = false;
// Verifies "sticky TF": once round_robin reports TRANSIENT_FAILURE, a later
// subchannel transition to CONNECTING must not move the channel out of TF,
// and RPCs must keep failing with the TF status.
TEST_F(RoundRobinTest, StaysInTransientFailureInSubsequentConnecting) {
// Start connection injector.
ConnectionInjector injector;
injector.Start();
// Get port.
// Note: no server is started on this port, so the first connection
// attempt fails immediately with "Connection refused".
const int port = grpc_pick_unused_port_or_die();
// Create channel.
auto response_generator = BuildResolverResponseGenerator();
auto channel = BuildChannel("round_robin", response_generator);
auto stub = BuildStub(channel);
response_generator.SetNextResolution({port});
// Allow first connection attempt to fail normally, and wait for
// channel to report TRANSIENT_FAILURE.
gpr_log(GPR_INFO, "=== WAITING FOR CHANNEL TO REPORT TF ===");
auto predicate = [](grpc_connectivity_state state) {
return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
};
EXPECT_TRUE(
WaitForChannelState(channel.get(), predicate, /*try_to_connect=*/true));
// Wait for next connection attempt to start.
// The hold is added only now, so the *second* attempt is the one that is
// intercepted and left pending (subchannel stuck in CONNECTING).
auto hold = injector.AddHold(port);
hold->Wait();
// Now the subchannel should be reporting CONNECTING. Make sure the
// channel is still in TRANSIENT_FAILURE and is still reporting the
// right status.
EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel->GetState(false));
// Send a few RPCs, just to give the channel a chance to propagate a
// new picker, in case it was going to incorrectly do so.
gpr_log(GPR_INFO, "=== EXPECTING RPCs TO FAIL ===");
for (size_t i = 0; i < 5; ++i) {
CheckRpcSendFailure(
DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE,
"connections to all backends failing; last error: "
"(UNKNOWN: Failed to connect to remote host: Connection refused|"
"UNAVAILABLE: Failed to connect to remote host: FD shutdown)");
}
// Clean up.
// Release the held attempt so the injector can shut down cleanly.
hold->Resume();
}
TEST_F(RoundRobinTest, DoesNotFailRpcsUponDisconnection) {
// Start connection injector.
ConnectionInjector injector;
injector.Start();
// Start server.
StartServers(1);
ConnectionInjector injector(servers_[0]->port_);
injector.Start();
// Create channel.
auto response_generator = BuildResolverResponseGenerator();
auto channel = BuildChannel("round_robin", response_generator);
@ -1839,7 +1766,8 @@ TEST_F(RoundRobinTest, DoesNotFailRpcsUponDisconnection) {
// Channel should now be READY.
ASSERT_EQ(GRPC_CHANNEL_READY, channel->GetState(false));
// Tell injector to intercept the next connection attempt.
injector.InterceptNextAttempt();
auto hold1 =
injector.AddHold(servers_[0]->port_, /*intercept_completion=*/true);
// Now kill the server. The subchannel should report IDLE and be
// immediately reconnected to, but this should not cause any test
// failures.
@ -1852,14 +1780,14 @@ TEST_F(RoundRobinTest, DoesNotFailRpcsUponDisconnection) {
servers_[0]->Shutdown();
// Wait for next attempt to start.
gpr_log(GPR_ERROR, "=== WAITING FOR RECONNECTION ATTEMPT ===");
injector.WaitForAttemptToStart();
hold1->Wait();
// Start server and allow attempt to continue.
gpr_log(GPR_ERROR, "=== RESTARTING SERVER ===");
StartServer(0);
injector.ResumeAttempt();
hold1->Resume();
// Wait for next attempt to complete.
gpr_log(GPR_ERROR, "=== WAITING FOR RECONNECTION ATTEMPT TO COMPLETE ===");
injector.WaitForAttemptComplete();
hold1->WaitForCompletion();
// Now shut down the thread.
gpr_log(GPR_ERROR, "=== SHUTTING DOWN CLIENT THREAD ===");
shutdown.store(true);

Loading…
Cancel
Save