C++ end2end tests: refactor ConnectionAttemptInjector code (#30148)

* C++ end2end tests: refactor ConnectionAttemptInjector code

* clang-format
pull/30149/head^2
Mark D. Roth 3 years ago committed by GitHub
parent b1b59eb508
commit d379e811be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 136
      test/cpp/end2end/client_lb_end2end_test.cc
  2. 112
      test/cpp/end2end/connection_delay_injector.cc
  3. 62
      test/cpp/end2end/connection_delay_injector.h
  4. 120
      test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc
  5. 260
      test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc

@ -83,132 +83,6 @@ namespace {
constexpr char kRequestMessage[] = "Live long and prosper.";
// A connection attempt injector that allows us to control timing of
// connection attempts.
class ConnectionInjector : public ConnectionAttemptInjector {
private:
grpc_core::Mutex mu_; // Needs to be declared up front.
public:
class Hold {
public:
Hold(ConnectionInjector* injector, int port, bool intercept_completion)
: injector_(injector),
port_(port),
intercept_completion_(intercept_completion) {}
void Wait() {
gpr_log(GPR_INFO,
"=== WAITING FOR CONNECTION ATTEMPT ON PORT %d ===", port_);
grpc_core::MutexLock lock(&injector_->mu_);
while (queued_attempt_ == nullptr) {
start_cv_.Wait(&injector_->mu_);
}
gpr_log(GPR_INFO, "=== CONNECTION ATTEMPT STARTED ON PORT %d ===", port_);
}
void Resume() {
gpr_log(GPR_INFO,
"=== RESUMING CONNECTION ATTEMPT ON PORT %d ===", port_);
grpc_core::ExecCtx exec_ctx;
std::unique_ptr<QueuedAttempt> attempt;
{
grpc_core::MutexLock lock(&injector_->mu_);
attempt = std::move(queued_attempt_);
}
attempt->Resume();
}
void Fail(grpc_error_handle error) {
gpr_log(GPR_INFO, "=== FAILING CONNECTION ATTEMPT ON PORT %d ===", port_);
grpc_core::ExecCtx exec_ctx;
std::unique_ptr<QueuedAttempt> attempt;
{
grpc_core::MutexLock lock(&injector_->mu_);
attempt = std::move(queued_attempt_);
}
attempt->Fail(error);
}
void WaitForCompletion() {
gpr_log(GPR_INFO,
"=== WAITING FOR CONNECTION COMPLETION ON PORT %d ===", port_);
grpc_core::MutexLock lock(&injector_->mu_);
while (original_on_complete_ != nullptr) {
complete_cv_.Wait(&injector_->mu_);
}
gpr_log(GPR_INFO, "=== CONNECTION COMPLETED ON PORT %d ===", port_);
}
private:
friend class ConnectionInjector;
static void OnComplete(void* arg, grpc_error_handle error) {
auto* self = static_cast<Hold*>(arg);
grpc_closure* on_complete;
{
grpc_core::MutexLock lock(&self->injector_->mu_);
on_complete = self->original_on_complete_;
self->original_on_complete_ = nullptr;
self->complete_cv_.Signal();
}
grpc_core::Closure::Run(DEBUG_LOCATION, on_complete,
GRPC_ERROR_REF(error));
}
ConnectionInjector* injector_;
const int port_;
const bool intercept_completion_;
std::unique_ptr<QueuedAttempt> queued_attempt_
ABSL_GUARDED_BY(&ConnectionInjector::mu_);
grpc_core::CondVar start_cv_;
grpc_closure on_complete_;
grpc_closure* original_on_complete_;
grpc_core::CondVar complete_cv_;
};
std::unique_ptr<Hold> AddHold(int port, bool intercept_completion = false) {
grpc_core::MutexLock lock(&mu_);
auto hold = absl::make_unique<Hold>(this, port, intercept_completion);
holds_.push_back(hold.get());
return hold;
}
void HandleConnection(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) override {
const int port = grpc_sockaddr_get_port(addr);
gpr_log(GPR_INFO, "==> HandleConnection(): port=%d", port);
{
grpc_core::MutexLock lock(&mu_);
for (auto it = holds_.begin(); it != holds_.end(); ++it) {
Hold* hold = *it;
if (port == hold->port_) {
gpr_log(GPR_INFO, "*** INTERCEPTING CONNECTION ATTEMPT");
if (hold->intercept_completion_) {
hold->original_on_complete_ = closure;
closure = GRPC_CLOSURE_INIT(&hold->on_complete_, Hold::OnComplete,
hold, nullptr);
}
hold->queued_attempt_ = absl::make_unique<QueuedAttempt>(
closure, ep, interested_parties, channel_args, addr, deadline);
hold->start_cv_.Signal();
holds_.erase(it);
return;
}
}
}
// Anything we're not holding should proceed normally.
AttemptConnection(closure, ep, interested_parties, channel_args, addr,
deadline);
}
private:
std::vector<Hold*> holds_;
};
// Subclass of TestServiceImpl that increments a request counter for
// every call to the Echo RPC.
class MyTestServiceImpl : public TestServiceImpl {
@ -875,7 +749,7 @@ TEST_F(PickFirstTest, ResetConnectionBackoff) {
TEST_F(ClientLbEnd2endTest,
ResetConnectionBackoffNextAttemptStartsImmediately) {
// Start connection injector.
ConnectionInjector injector;
ConnectionHoldInjector injector;
injector.Start();
// Create client.
const int port = grpc_pick_unused_port_or_die();
@ -922,7 +796,7 @@ TEST_F(
PickFirstTest,
TriesAllSubchannelsBeforeReportingTransientFailureWithSubchannelSharing) {
// Start connection injector.
ConnectionInjector injector;
ConnectionHoldInjector injector;
injector.Start();
// Get 5 unused ports. Each channel will have 2 unique ports followed
// by a common port.
@ -1707,7 +1581,7 @@ TEST_F(RoundRobinTest, TransientFailureAtStartup) {
TEST_F(RoundRobinTest, StaysInTransientFailureInSubsequentConnecting) {
// Start connection injector.
ConnectionInjector injector;
ConnectionHoldInjector injector;
injector.Start();
// Get port.
const int port = grpc_pick_unused_port_or_die();
@ -1747,7 +1621,7 @@ TEST_F(RoundRobinTest, StaysInTransientFailureInSubsequentConnecting) {
TEST_F(RoundRobinTest, ReportsLatestStatusInTransientFailure) {
// Start connection injector.
ConnectionInjector injector;
ConnectionHoldInjector injector;
injector.Start();
// Get port.
const std::vector<int> ports = {grpc_pick_unused_port_or_die(),
@ -1798,7 +1672,7 @@ TEST_F(RoundRobinTest, ReportsLatestStatusInTransientFailure) {
TEST_F(RoundRobinTest, DoesNotFailRpcsUponDisconnection) {
// Start connection injector.
ConnectionInjector injector;
ConnectionHoldInjector injector;
injector.Start();
// Start server.
StartServers(1);

@ -19,6 +19,7 @@
#include "absl/memory/memory.h"
#include "absl/utility/utility.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/gprpp/sync.h"
// defined in tcp_client.cc
@ -129,5 +130,116 @@ void ConnectionDelayInjector::HandleConnection(
addr, deadline);
}
//
// ConnectionHoldInjector::Hold
//
ConnectionHoldInjector::Hold::Hold(ConnectionHoldInjector* injector, int port,
bool intercept_completion)
: injector_(injector),
port_(port),
intercept_completion_(intercept_completion) {}
void ConnectionHoldInjector::Hold::Wait() {
gpr_log(GPR_INFO, "=== WAITING FOR CONNECTION ATTEMPT ON PORT %d ===", port_);
grpc_core::MutexLock lock(&injector_->mu_);
while (queued_attempt_ == nullptr) {
start_cv_.Wait(&injector_->mu_);
}
gpr_log(GPR_INFO, "=== CONNECTION ATTEMPT STARTED ON PORT %d ===", port_);
}
void ConnectionHoldInjector::Hold::Resume() {
gpr_log(GPR_INFO, "=== RESUMING CONNECTION ATTEMPT ON PORT %d ===", port_);
grpc_core::ExecCtx exec_ctx;
std::unique_ptr<QueuedAttempt> attempt;
{
grpc_core::MutexLock lock(&injector_->mu_);
attempt = std::move(queued_attempt_);
}
attempt->Resume();
}
void ConnectionHoldInjector::Hold::Fail(grpc_error_handle error) {
gpr_log(GPR_INFO, "=== FAILING CONNECTION ATTEMPT ON PORT %d ===", port_);
grpc_core::ExecCtx exec_ctx;
std::unique_ptr<QueuedAttempt> attempt;
{
grpc_core::MutexLock lock(&injector_->mu_);
attempt = std::move(queued_attempt_);
}
attempt->Fail(error);
}
void ConnectionHoldInjector::Hold::WaitForCompletion() {
gpr_log(GPR_INFO,
"=== WAITING FOR CONNECTION COMPLETION ON PORT %d ===", port_);
grpc_core::MutexLock lock(&injector_->mu_);
while (original_on_complete_ != nullptr) {
complete_cv_.Wait(&injector_->mu_);
}
gpr_log(GPR_INFO, "=== CONNECTION COMPLETED ON PORT %d ===", port_);
}
bool ConnectionHoldInjector::Hold::IsStarted() {
grpc_core::MutexLock lock(&injector_->mu_);
return !start_cv_.WaitWithDeadline(&injector_->mu_, absl::Now());
}
void ConnectionHoldInjector::Hold::OnComplete(void* arg,
grpc_error_handle error) {
auto* self = static_cast<Hold*>(arg);
grpc_closure* on_complete;
{
grpc_core::MutexLock lock(&self->injector_->mu_);
on_complete = self->original_on_complete_;
self->original_on_complete_ = nullptr;
self->complete_cv_.Signal();
}
grpc_core::Closure::Run(DEBUG_LOCATION, on_complete, GRPC_ERROR_REF(error));
}
//
// ConnectionHoldInjector
//
std::unique_ptr<ConnectionHoldInjector::Hold> ConnectionHoldInjector::AddHold(
int port, bool intercept_completion) {
grpc_core::MutexLock lock(&mu_);
auto hold = absl::make_unique<Hold>(this, port, intercept_completion);
holds_.push_back(hold.get());
return hold;
}
void ConnectionHoldInjector::HandleConnection(
grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args,
const grpc_resolved_address* addr, grpc_core::Timestamp deadline) {
const int port = grpc_sockaddr_get_port(addr);
gpr_log(GPR_INFO, "==> HandleConnection(): port=%d", port);
{
grpc_core::MutexLock lock(&mu_);
for (auto it = holds_.begin(); it != holds_.end(); ++it) {
Hold* hold = *it;
if (port == hold->port_) {
gpr_log(GPR_INFO, "*** INTERCEPTING CONNECTION ATTEMPT");
if (hold->intercept_completion_) {
hold->original_on_complete_ = closure;
closure = GRPC_CLOSURE_INIT(&hold->on_complete_, Hold::OnComplete,
hold, nullptr);
}
hold->queued_attempt_ = absl::make_unique<QueuedAttempt>(
closure, ep, interested_parties, channel_args, addr, deadline);
hold->start_cv_.Signal();
holds_.erase(it);
return;
}
}
}
// Anything we're not holding should proceed normally.
AttemptConnection(closure, ep, interested_parties, channel_args, addr,
deadline);
}
} // namespace testing
} // namespace grpc

@ -152,6 +152,68 @@ class ConnectionDelayInjector : public ConnectionAttemptInjector {
grpc_core::Duration duration_;
};
// A concrete implementation that allows injecting holds for individual
// connection attemps, one at a time.
class ConnectionHoldInjector : public ConnectionAttemptInjector {
private:
grpc_core::Mutex mu_; // Needs to be declared up front.
public:
class Hold {
public:
// Do not instantiate directly -- must be created via AddHold().
Hold(ConnectionHoldInjector* injector, int port, bool intercept_completion);
// Waits for the connection attempt to start.
// After this returns, exactly one of Resume() or Fail() must be called.
void Wait();
// Resumes a connection attempt. Must be called after Wait().
void Resume();
// Fails a connection attempt. Must be called after Wait().
void Fail(grpc_error_handle error);
// If the hold was created with intercept_completion=true, then this
// can be called after Resume() to wait for the connection attempt
// to complete.
void WaitForCompletion();
// Returns true if the connection attempt has been started.
bool IsStarted();
private:
friend class ConnectionHoldInjector;
static void OnComplete(void* arg, grpc_error_handle error);
ConnectionHoldInjector* injector_;
const int port_;
const bool intercept_completion_;
std::unique_ptr<QueuedAttempt> queued_attempt_
ABSL_GUARDED_BY(&ConnectionHoldInjector::mu_);
grpc_core::CondVar start_cv_;
grpc_closure on_complete_;
grpc_closure* original_on_complete_;
grpc_core::CondVar complete_cv_;
};
// Adds a hold for a given port. The caller may then use Wait() on
// the resulting Hold object to wait for the connection attempt to start.
// If intercept_completion is true, the caller can use WaitForCompletion()
// on the resulting Hold object.
std::unique_ptr<Hold> AddHold(int port, bool intercept_completion = false);
private:
void HandleConnection(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) override;
std::vector<Hold*> holds_;
};
} // namespace testing
} // namespace grpc

@ -433,104 +433,42 @@ TEST_P(AggregateClusterTest, FallBackWithConnectivityChurn) {
cluster_config.add_clusters(kClusterName2);
custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
balancer_->ads_service()->SetCdsResource(cluster);
// This class injects itself into all TCP connection attempts made
// against iomgr. It intercepts the attempts for the P0 and P1
// backends and allows them to proceed as desired to simulate the case
// being tested.
class ConnectionInjector : public ConnectionAttemptInjector {
public:
ConnectionInjector(int p0_port, int p1_port)
: p0_port_(p0_port), p1_port_(p1_port) {}
void HandleConnection(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) override {
{
grpc_core::MutexLock lock(&mu_);
const int port = grpc_sockaddr_get_port(addr);
gpr_log(GPR_INFO, "==> HandleConnection(): state_=%d, port=%d", state_,
port);
switch (state_) {
case kInit:
// Make P0 report TF, which should trigger us to try to connect to
// P1.
if (port == p0_port_) {
gpr_log(GPR_INFO, "*** INJECTING FAILURE FOR P0 ENDPOINT");
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure,
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"injected connection failure"));
state_ = kP0Failed;
return;
}
break;
case kP0Failed:
// Hold connection attempt to P1 so that it stays in CONNECTING.
if (port == p1_port_) {
gpr_log(GPR_INFO,
"*** DELAYING CONNECTION ATTEMPT FOR P1 ENDPOINT");
queued_p1_attempt_ = absl::make_unique<QueuedAttempt>(
closure, ep, interested_parties, channel_args, addr,
deadline);
state_ = kDone;
return;
}
break;
case kDone:
// P0 should attempt reconnection. Log it to make the test
// easier to debug, but allow it to complete, so that the
// priority policy deactivates P1.
if (port == p0_port_) {
gpr_log(GPR_INFO,
"*** INTERCEPTING CONNECTION ATTEMPT FOR P0 ENDPOINT");
}
break;
}
}
AttemptConnection(closure, ep, interested_parties, channel_args, addr,
deadline);
}
// Invoked by the test when the RPC to the P0 backend has succeeded
// and it's ready to allow the P1 connection attempt to proceed.
void CompletePriority1Connection() {
grpc_core::ExecCtx exec_ctx;
std::unique_ptr<QueuedAttempt> attempt;
{
grpc_core::MutexLock lock(&mu_);
GPR_ASSERT(state_ == kDone);
attempt = std::move(queued_p1_attempt_);
}
attempt->Resume();
}
private:
const int p0_port_;
const int p1_port_;
grpc_core::Mutex mu_;
enum {
kInit,
kP0Failed,
kDone,
} state_ ABSL_GUARDED_BY(mu_) = kInit;
std::unique_ptr<QueuedAttempt> queued_p1_attempt_ ABSL_GUARDED_BY(mu_);
};
ConnectionInjector connection_attempt_injector(backends_[0]->port(),
backends_[1]->port());
connection_attempt_injector.Start();
// Wait for P0 backend.
// Start connection injector.
ConnectionHoldInjector injector;
injector.Start();
auto hold0 = injector.AddHold(backends_[0]->port());
auto hold1 = injector.AddHold(backends_[1]->port());
// Start long-running RPC in the background.
// This will trigger the channel to start connecting.
// Increase timeout to account for subchannel connection delays.
WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
WaitForBackendOptions(), RpcOptions().set_timeout_ms(2000));
LongRunningRpc rpc;
rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(2000));
// Tell channel to start connecting.
channel_->GetState(/*try_to_connect=*/true);
// Wait for backend 0 connection attempt to start, then fail it.
hold0->Wait();
hold0->Fail(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("injected connection failure"));
// The channel should trigger a connection attempt for backend 1 now,
// but we've added a hold for that, so it will not complete yet.
// Meanwhile, the channel will also start a second attempt for backend
// 0, which we have NOT held, so it will complete normally, and the
// RPC will finish on backend 0.
gpr_log(GPR_INFO, "=== WAITING FOR RPC TO FINISH === ");
Status status = rpc.GetStatus();
gpr_log(GPR_INFO, "=== RPC FINISHED === ");
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
EXPECT_EQ(1UL, backends_[0]->backend_service()->request_count());
// Wait for backend 1 connection attempt to start.
hold1->Wait();
// Send GOAWAY from the P0 backend.
// We don't actually shut it down here to avoid flakiness caused by
// failing an RPC after the client has already sent it but before the
// server finished processing it.
backends_[0]->StopListeningAndSendGoaways();
// Allow the connection attempt to the P1 backend to resume.
connection_attempt_injector.CompletePriority1Connection();
hold1->Resume();
// Wait for P1 backend to start getting traffic.
WaitForBackend(DEBUG_LOCATION, 1);
}

@ -681,64 +681,11 @@ TEST_P(RingHashTest, ContinuesConnectingWithoutPicks) {
hash_policy->mutable_header()->set_header_name("address_hash");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
// A connection injector that cancels the RPC after seeing the
// connection attempt for the non-existant endpoint.
class ConnectionInjector : public ConnectionAttemptInjector {
public:
explicit ConnectionInjector(int port) : port_(port) {}
void HandleConnection(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) override {
{
grpc_core::MutexLock lock(&mu_);
const int port = grpc_sockaddr_get_port(addr);
gpr_log(GPR_INFO, "==> HandleConnection(): seen_port_=%d, port=%d",
seen_port_, port);
if (!seen_port_ && port == port_) {
gpr_log(GPR_INFO, "*** SEEN P0 CONNECTION ATTEMPT");
queued_p0_attempt_ = absl::make_unique<QueuedAttempt>(
closure, ep, interested_parties, channel_args, addr, deadline);
seen_port_ = true;
cond_.Signal();
return;
}
}
AttemptConnection(closure, ep, interested_parties, channel_args, addr,
deadline);
}
void WaitForP0ConnectionAttempt() {
grpc_core::MutexLock lock(&mu_);
while (!seen_port_) {
cond_.Wait(&mu_);
}
}
// Invoked by the test when the RPC has been cancelled and it's ready
// to allow the connection attempt to proceed.
void CompleteP0ConnectionAttempt() {
grpc_core::ExecCtx exec_ctx;
std::unique_ptr<QueuedAttempt> attempt;
{
grpc_core::MutexLock lock(&mu_);
attempt = std::move(queued_p0_attempt_);
}
attempt->Resume();
}
private:
const int port_;
grpc_core::Mutex mu_;
grpc_core::CondVar cond_;
bool seen_port_ ABSL_GUARDED_BY(mu_) = false;
std::unique_ptr<QueuedAttempt> queued_p0_attempt_ ABSL_GUARDED_BY(mu_);
};
ConnectionInjector connection_injector(non_existant_endpoint.port);
connection_injector.Start();
// Start connection attempt injector and add a hold for the P0
// connection attempt.
ConnectionHoldInjector injector;
injector.Start();
auto hold = injector.AddHold(non_existant_endpoint.port);
// A long-running RPC, just used to send the RPC in another thread.
LongRunningRpc rpc;
std::vector<std::pair<std::string, std::string>> metadata = {
@ -748,10 +695,10 @@ TEST_P(RingHashTest, ContinuesConnectingWithoutPicks) {
std::move(metadata)));
// Wait for the RPC to trigger the P0 connection attempt, then cancel it,
// and then allow the connection attempt to complete.
connection_injector.WaitForP0ConnectionAttempt();
hold->Wait();
rpc.CancelRpc();
EXPECT_EQ(StatusCode::CANCELLED, rpc.GetStatus().error_code());
connection_injector.CompleteP0ConnectionAttempt();
hold->Resume();
// Wait for channel to become connected without any pending RPC.
EXPECT_TRUE(channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(5)));
// Make sure the backend did not get any requests.
@ -781,151 +728,13 @@ TEST_P(RingHashTest, ContinuesConnectingWithoutPicksOneSubchannelAtATime) {
hash_policy->mutable_header()->set_header_name("address_hash");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
// A connection injector that ensures that only one subchannel is
// connecting at a time.
class ConnectionInjector : public ConnectionAttemptInjector {
public:
ConnectionInjector(int port0, int port1, int port2, int good_port)
: port0_(port0), port1_(port1), port2_(port2), good_port_(good_port) {}
void HandleConnection(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) override {
{
grpc_core::MutexLock lock(&mu_);
const int port = grpc_sockaddr_get_port(addr);
gpr_log(GPR_INFO, "==> HandleConnection(): state_=%d, port=%d", state_,
port);
switch (state_) {
case kInit:
EXPECT_NE(port, port1_);
EXPECT_NE(port, port2_);
EXPECT_NE(port, good_port_);
if (port == port0_) {
gpr_log(GPR_INFO, "*** DELAYING ENDPOINT 0");
new DelayedAttempt(this, closure, ep, interested_parties,
channel_args, addr, deadline);
state_ = kDelayedEndpoint0;
cond_.Signal();
return;
}
break;
case kResumedEndpoint0:
EXPECT_NE(port, port0_);
EXPECT_NE(port, port2_);
EXPECT_NE(port, good_port_);
if (port == port1_) {
gpr_log(GPR_INFO, "*** DELAYING ENDPOINT 1");
new DelayedAttempt(this, closure, ep, interested_parties,
channel_args, addr, deadline);
state_ = kDelayedEndpoint1;
return;
} else {
gpr_log(GPR_INFO, "*** UNEXPECTED PORT");
}
break;
case kResumedEndpoint1:
EXPECT_NE(port, port0_);
EXPECT_NE(port, port1_);
EXPECT_NE(port, good_port_);
if (port == port2_) {
gpr_log(GPR_INFO, "*** DELAYING ENDPOINT 2");
new DelayedAttempt(this, closure, ep, interested_parties,
channel_args, addr, deadline);
state_ = kDelayedEndpoint2;
return;
} else {
gpr_log(GPR_INFO, "*** UNEXPECTED PORT");
}
break;
case kResumedEndpoint2:
EXPECT_NE(port, port0_);
EXPECT_NE(port, port1_);
EXPECT_NE(port, port2_);
if (port == good_port_) {
gpr_log(GPR_INFO, "*** DONE WITH ALL UNREACHABLE ENDPOINTS");
state_ = kDone;
}
break;
case kDelayedEndpoint0:
case kDelayedEndpoint1:
case kDelayedEndpoint2:
ASSERT_THAT(port, ::testing::AllOf(::testing::Ne(port0_),
::testing::Ne(port1_),
::testing::Ne(port2_),
::testing::Ne(good_port_)))
<< "started second connection attempt in parallel";
break;
case kDone:
break;
}
}
AttemptConnection(closure, ep, interested_parties, channel_args, addr,
deadline);
}
void WaitForFirstPortSeen() {
grpc_core::MutexLock lock(&mu_);
while (state_ == kInit) {
cond_.Wait(&mu_);
}
}
private:
class DelayedAttempt : public InjectedDelay {
public:
DelayedAttempt(ConnectionInjector* parent, grpc_closure* closure,
grpc_endpoint** ep, grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline)
: InjectedDelay(
grpc_core::Duration::Seconds(1 * grpc_test_slowdown_factor()),
closure, ep, interested_parties, channel_args, addr, deadline),
parent_(parent) {}
private:
void BeforeResumingAction() override {
grpc_core::MutexLock lock(&parent_->mu_);
if (parent_->state_ == kDelayedEndpoint0) {
gpr_log(GPR_INFO, "*** RESUMING ENDPOINT 0");
parent_->state_ = kResumedEndpoint0;
} else if (parent_->state_ == kDelayedEndpoint1) {
gpr_log(GPR_INFO, "*** RESUMING ENDPOINT 1");
parent_->state_ = kResumedEndpoint1;
} else if (parent_->state_ == kDelayedEndpoint2) {
gpr_log(GPR_INFO, "*** RESUMING ENDPOINT 2");
parent_->state_ = kResumedEndpoint2;
}
}
ConnectionInjector* parent_;
};
const int port0_;
const int port1_;
const int port2_;
const int good_port_;
grpc_core::Mutex mu_;
grpc_core::CondVar cond_;
enum {
kInit,
kDelayedEndpoint0,
kResumedEndpoint0,
kDelayedEndpoint1,
kResumedEndpoint1,
kDelayedEndpoint2,
kResumedEndpoint2,
kDone,
} state_ ABSL_GUARDED_BY(mu_) = kInit;
};
ConnectionInjector connection_injector(
non_existant_endpoint0.port, non_existant_endpoint1.port,
non_existant_endpoint2.port, backends_[0]->port());
connection_injector.Start();
// Start connection attempt injector.
ConnectionHoldInjector injector;
injector.Start();
auto hold_non_existant0 = injector.AddHold(non_existant_endpoint0.port);
auto hold_non_existant1 = injector.AddHold(non_existant_endpoint1.port);
auto hold_non_existant2 = injector.AddHold(non_existant_endpoint2.port);
auto hold_good = injector.AddHold(backends_[0]->port());
// A long-running RPC, just used to send the RPC in another thread.
LongRunningRpc rpc;
std::vector<std::pair<std::string, std::string>> metadata = {
@ -933,11 +742,48 @@ TEST_P(RingHashTest, ContinuesConnectingWithoutPicksOneSubchannelAtATime) {
non_existant_endpoint0.port)}};
rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(0).set_metadata(
std::move(metadata)));
// Wait for the RPC to trigger the first connection attempt, then cancel it.
connection_injector.WaitForFirstPortSeen();
// Wait for the RPC to trigger a connection attempt to the first address,
// then cancel the RPC. No other connection attempts should be started yet.
hold_non_existant0->Wait();
rpc.CancelRpc();
EXPECT_FALSE(hold_non_existant1->IsStarted());
EXPECT_FALSE(hold_non_existant2->IsStarted());
EXPECT_FALSE(hold_good->IsStarted());
// Allow the connection attempt to the first address to resume and wait
// for the attempt for the second address. No other connection
// attempts should be started yet.
auto hold_non_existant0_again = injector.AddHold(non_existant_endpoint0.port);
hold_non_existant0->Resume();
hold_non_existant1->Wait();
EXPECT_FALSE(hold_non_existant0_again->IsStarted());
EXPECT_FALSE(hold_non_existant2->IsStarted());
EXPECT_FALSE(hold_good->IsStarted());
// Allow the connection attempt to the second address to resume and wait
// for the attempt for the third address. No other connection
// attempts should be started yet.
auto hold_non_existant1_again = injector.AddHold(non_existant_endpoint1.port);
hold_non_existant1->Resume();
hold_non_existant2->Wait();
EXPECT_FALSE(hold_non_existant0_again->IsStarted());
EXPECT_FALSE(hold_non_existant1_again->IsStarted());
EXPECT_FALSE(hold_good->IsStarted());
// Allow the connection attempt to the third address to resume and wait
// for the attempt for the final address. No other connection
// attempts should be started yet.
auto hold_non_existant2_again = injector.AddHold(non_existant_endpoint2.port);
hold_non_existant2->Resume();
hold_good->Wait();
EXPECT_FALSE(hold_non_existant0_again->IsStarted());
EXPECT_FALSE(hold_non_existant1_again->IsStarted());
EXPECT_FALSE(hold_non_existant2_again->IsStarted());
// Allow the final attempt to resume.
hold_good->Resume();
// Wait for channel to become connected without any pending RPC.
EXPECT_TRUE(channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(10)));
// No other connection attempts should have been started.
EXPECT_FALSE(hold_non_existant0_again->IsStarted());
EXPECT_FALSE(hold_non_existant1_again->IsStarted());
EXPECT_FALSE(hold_non_existant2_again->IsStarted());
// RPC should have been cancelled.
EXPECT_EQ(StatusCode::CANCELLED, rpc.GetStatus().error_code());
// Make sure the backend did not get any requests.

Loading…
Cancel
Save