diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index ed848ccc2a9..e28472cad44 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -607,6 +607,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstBackOffMinReconnect) { // sure we are hitting the codepath that waits for the min reconnect backoff. ConnectionDelayInjector delay_injector( grpc_core::Duration::Milliseconds(kMinReconnectBackOffMs * 1.10)); + delay_injector.Start(); const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC); channel->WaitForConnected( grpc_timeout_milliseconds_to_deadline(kMinReconnectBackOffMs * 2)); diff --git a/test/cpp/end2end/connection_delay_injector.cc b/test/cpp/end2end/connection_delay_injector.cc index 30ce66eb8f7..75e45870175 100644 --- a/test/cpp/end2end/connection_delay_injector.cc +++ b/test/cpp/end2end/connection_delay_injector.cc @@ -14,12 +14,13 @@ #include "test/cpp/end2end/connection_delay_injector.h" -#include #include #include "absl/memory/memory.h" #include "absl/utility/utility.h" +#include "src/core/lib/gprpp/sync.h" + // defined in tcp_client.cc extern grpc_tcp_client_vtable* grpc_tcp_client_impl; @@ -33,21 +34,23 @@ namespace testing { namespace { grpc_tcp_client_vtable* g_original_vtable = nullptr; -std::atomic g_injector{nullptr}; + +grpc_core::Mutex* g_mu = nullptr; +ConnectionAttemptInjector* g_injector ABSL_GUARDED_BY(*g_mu) = nullptr; void TcpConnectWithDelay(grpc_closure* closure, grpc_endpoint** ep, grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args, const grpc_resolved_address* addr, grpc_core::Timestamp deadline) { - ConnectionAttemptInjector* injector = g_injector.load(); - if (injector == nullptr) { + grpc_core::MutexLock lock(g_mu); + if (g_injector == nullptr) { g_original_vtable->connect(closure, ep, interested_parties, channel_args, addr, deadline); return; } - injector->HandleConnection(closure, ep, interested_parties, channel_args, - addr, deadline); + g_injector->HandleConnection(closure, ep, interested_parties, channel_args, + addr, deadline); } grpc_tcp_client_vtable kDelayedConnectVTable = {TcpConnectWithDelay}; @@ -55,16 +58,20 @@ grpc_tcp_client_vtable kDelayedConnectVTable = {TcpConnectWithDelay}; } // namespace void ConnectionAttemptInjector::Init() { + g_mu = new grpc_core::Mutex(); g_original_vtable = grpc_tcp_client_impl; grpc_tcp_client_impl = &kDelayedConnectVTable; } -ConnectionAttemptInjector::ConnectionAttemptInjector() { - GPR_ASSERT(g_injector.exchange(this) == nullptr); +ConnectionAttemptInjector::~ConnectionAttemptInjector() { + grpc_core::MutexLock lock(g_mu); + g_injector = nullptr; } -ConnectionAttemptInjector::~ConnectionAttemptInjector() { - g_injector.store(nullptr); +void ConnectionAttemptInjector::Start() { + grpc_core::MutexLock lock(g_mu); + GPR_ASSERT(g_injector == nullptr); + g_injector = this; } void ConnectionAttemptInjector::AttemptConnection( diff --git a/test/cpp/end2end/connection_delay_injector.h b/test/cpp/end2end/connection_delay_injector.h index 6de089ce52d..cbe09f1c36a 100644 --- a/test/cpp/end2end/connection_delay_injector.h +++ b/test/cpp/end2end/connection_delay_injector.h @@ -33,6 +33,7 @@ namespace testing { // // // When an injection is desired. // ConnectionDelayInjector delay_injector(grpc_core::Duration::Seconds(10)); +// delay_injector.Start(); // // The injection is global, so there must be only one ConnectionAttemptInjector // object at any one time. @@ -42,9 +43,11 @@ class ConnectionAttemptInjector { // Must be called exactly once before any TCP connections are established. static void Init(); - ConnectionAttemptInjector(); virtual ~ConnectionAttemptInjector(); + // Must be called after instantiation. + void Start(); + // Invoked for every TCP connection attempt. // Implementations must eventually either invoke the closure // themselves or delegate to the iomgr implementation by calling diff --git a/test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc b/test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc index 8bd0207de50..983128a5240 100644 --- a/test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc @@ -559,6 +559,7 @@ TEST_P(AggregateClusterTest, FallBackWithConnectivityChurn) { }; ConnectionInjector connection_attempt_injector(backends_[0]->port(), backends_[1]->port()); + connection_attempt_injector.Start(); // Wait for P0 backend. // Increase timeout to account for subchannel connection delays. WaitForBackend(0, WaitForBackendOptions(), RpcOptions().set_timeout_ms(2000)); diff --git a/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc b/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc index 5d979f0b354..5951ea9f70a 100644 --- a/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc @@ -212,6 +212,7 @@ TEST_P(RingHashTest, // Inject connection delay to make this act more realistically. ConnectionDelayInjector delay_injector( grpc_core::Duration::Milliseconds(500) * grpc_test_slowdown_factor()); + delay_injector.Start(); // Send RPC. Need the timeout to be long enough to account for the // subchannel connection delays. CheckRpcSendOk(1, RpcOptions().set_timeout_ms(5000)); @@ -670,6 +671,7 @@ TEST_P(RingHashTest, ContinuesConnectingWithoutPicks) { bool seen_port_ ABSL_GUARDED_BY(mu_) = false; }; ConnectionInjector connection_injector(non_existant_endpoint.port); + connection_injector.Start(); // A long-running RPC, just used to send the RPC in another thread. LongRunningRpc rpc; std::vector> metadata = { @@ -855,6 +857,7 @@ TEST_P(RingHashTest, ContinuesConnectingWithoutPicksOneSubchannelAtATime) { ConnectionInjector connection_injector( non_existant_endpoint0.port, non_existant_endpoint1.port, non_existant_endpoint2.port, backends_[0]->port()); + connection_injector.Start(); // A long-running RPC, just used to send the RPC in another thread. LongRunningRpc rpc; std::vector> metadata = {