connection delay injector: fix tsan problems (#29445)

pull/29449/head
Mark D. Roth 3 years ago committed by GitHub
parent be1b4a6d3a
commit 8ba275291a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      test/cpp/end2end/client_lb_end2end_test.cc
  2. 27
      test/cpp/end2end/connection_delay_injector.cc
  3. 5
      test/cpp/end2end/connection_delay_injector.h
  4. 1
      test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc
  5. 3
      test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc

@ -607,6 +607,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstBackOffMinReconnect) {
// sure we are hitting the codepath that waits for the min reconnect backoff. // sure we are hitting the codepath that waits for the min reconnect backoff.
ConnectionDelayInjector delay_injector( ConnectionDelayInjector delay_injector(
grpc_core::Duration::Milliseconds(kMinReconnectBackOffMs * 1.10)); grpc_core::Duration::Milliseconds(kMinReconnectBackOffMs * 1.10));
delay_injector.Start();
const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC); const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
channel->WaitForConnected( channel->WaitForConnected(
grpc_timeout_milliseconds_to_deadline(kMinReconnectBackOffMs * 2)); grpc_timeout_milliseconds_to_deadline(kMinReconnectBackOffMs * 2));

@ -14,12 +14,13 @@
#include "test/cpp/end2end/connection_delay_injector.h" #include "test/cpp/end2end/connection_delay_injector.h"
#include <atomic>
#include <memory> #include <memory>
#include "absl/memory/memory.h" #include "absl/memory/memory.h"
#include "absl/utility/utility.h" #include "absl/utility/utility.h"
#include "src/core/lib/gprpp/sync.h"
// defined in tcp_client.cc // defined in tcp_client.cc
extern grpc_tcp_client_vtable* grpc_tcp_client_impl; extern grpc_tcp_client_vtable* grpc_tcp_client_impl;
@ -33,21 +34,23 @@ namespace testing {
namespace { namespace {
grpc_tcp_client_vtable* g_original_vtable = nullptr; grpc_tcp_client_vtable* g_original_vtable = nullptr;
std::atomic<ConnectionAttemptInjector*> g_injector{nullptr};
grpc_core::Mutex* g_mu = nullptr;
ConnectionAttemptInjector* g_injector ABSL_GUARDED_BY(*g_mu) = nullptr;
void TcpConnectWithDelay(grpc_closure* closure, grpc_endpoint** ep, void TcpConnectWithDelay(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties, grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args, const grpc_channel_args* channel_args,
const grpc_resolved_address* addr, const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) { grpc_core::Timestamp deadline) {
ConnectionAttemptInjector* injector = g_injector.load(); grpc_core::MutexLock lock(g_mu);
if (injector == nullptr) { if (g_injector == nullptr) {
g_original_vtable->connect(closure, ep, interested_parties, channel_args, g_original_vtable->connect(closure, ep, interested_parties, channel_args,
addr, deadline); addr, deadline);
return; return;
} }
injector->HandleConnection(closure, ep, interested_parties, channel_args, g_injector->HandleConnection(closure, ep, interested_parties, channel_args,
addr, deadline); addr, deadline);
} }
grpc_tcp_client_vtable kDelayedConnectVTable = {TcpConnectWithDelay}; grpc_tcp_client_vtable kDelayedConnectVTable = {TcpConnectWithDelay};
@ -55,16 +58,20 @@ grpc_tcp_client_vtable kDelayedConnectVTable = {TcpConnectWithDelay};
} // namespace } // namespace
void ConnectionAttemptInjector::Init() { void ConnectionAttemptInjector::Init() {
g_mu = new grpc_core::Mutex();
g_original_vtable = grpc_tcp_client_impl; g_original_vtable = grpc_tcp_client_impl;
grpc_tcp_client_impl = &kDelayedConnectVTable; grpc_tcp_client_impl = &kDelayedConnectVTable;
} }
ConnectionAttemptInjector::ConnectionAttemptInjector() { ConnectionAttemptInjector::~ConnectionAttemptInjector() {
GPR_ASSERT(g_injector.exchange(this) == nullptr); grpc_core::MutexLock lock(g_mu);
g_injector = nullptr;
} }
ConnectionAttemptInjector::~ConnectionAttemptInjector() { void ConnectionAttemptInjector::Start() {
g_injector.store(nullptr); grpc_core::MutexLock lock(g_mu);
GPR_ASSERT(g_injector == nullptr);
g_injector = this;
} }
void ConnectionAttemptInjector::AttemptConnection( void ConnectionAttemptInjector::AttemptConnection(

@ -33,6 +33,7 @@ namespace testing {
// //
// // When an injection is desired. // // When an injection is desired.
// ConnectionDelayInjector delay_injector(grpc_core::Duration::Seconds(10)); // ConnectionDelayInjector delay_injector(grpc_core::Duration::Seconds(10));
// delay_injector.Start();
// //
// The injection is global, so there must be only one ConnectionAttemptInjector // The injection is global, so there must be only one ConnectionAttemptInjector
// object at any one time. // object at any one time.
@ -42,9 +43,11 @@ class ConnectionAttemptInjector {
// Must be called exactly once before any TCP connections are established. // Must be called exactly once before any TCP connections are established.
static void Init(); static void Init();
ConnectionAttemptInjector();
virtual ~ConnectionAttemptInjector(); virtual ~ConnectionAttemptInjector();
// Must be called after instantiation.
void Start();
// Invoked for every TCP connection attempt. // Invoked for every TCP connection attempt.
// Implementations must eventually either invoke the closure // Implementations must eventually either invoke the closure
// themselves or delegate to the iomgr implementation by calling // themselves or delegate to the iomgr implementation by calling

@ -559,6 +559,7 @@ TEST_P(AggregateClusterTest, FallBackWithConnectivityChurn) {
}; };
ConnectionInjector connection_attempt_injector(backends_[0]->port(), ConnectionInjector connection_attempt_injector(backends_[0]->port(),
backends_[1]->port()); backends_[1]->port());
connection_attempt_injector.Start();
// Wait for P0 backend. // Wait for P0 backend.
// Increase timeout to account for subchannel connection delays. // Increase timeout to account for subchannel connection delays.
WaitForBackend(0, WaitForBackendOptions(), RpcOptions().set_timeout_ms(2000)); WaitForBackend(0, WaitForBackendOptions(), RpcOptions().set_timeout_ms(2000));

@ -212,6 +212,7 @@ TEST_P(RingHashTest,
// Inject connection delay to make this act more realistically. // Inject connection delay to make this act more realistically.
ConnectionDelayInjector delay_injector( ConnectionDelayInjector delay_injector(
grpc_core::Duration::Milliseconds(500) * grpc_test_slowdown_factor()); grpc_core::Duration::Milliseconds(500) * grpc_test_slowdown_factor());
delay_injector.Start();
// Send RPC. Need the timeout to be long enough to account for the // Send RPC. Need the timeout to be long enough to account for the
// subchannel connection delays. // subchannel connection delays.
CheckRpcSendOk(1, RpcOptions().set_timeout_ms(5000)); CheckRpcSendOk(1, RpcOptions().set_timeout_ms(5000));
@ -670,6 +671,7 @@ TEST_P(RingHashTest, ContinuesConnectingWithoutPicks) {
bool seen_port_ ABSL_GUARDED_BY(mu_) = false; bool seen_port_ ABSL_GUARDED_BY(mu_) = false;
}; };
ConnectionInjector connection_injector(non_existant_endpoint.port); ConnectionInjector connection_injector(non_existant_endpoint.port);
connection_injector.Start();
// A long-running RPC, just used to send the RPC in another thread. // A long-running RPC, just used to send the RPC in another thread.
LongRunningRpc rpc; LongRunningRpc rpc;
std::vector<std::pair<std::string, std::string>> metadata = { std::vector<std::pair<std::string, std::string>> metadata = {
@ -855,6 +857,7 @@ TEST_P(RingHashTest, ContinuesConnectingWithoutPicksOneSubchannelAtATime) {
ConnectionInjector connection_injector( ConnectionInjector connection_injector(
non_existant_endpoint0.port, non_existant_endpoint1.port, non_existant_endpoint0.port, non_existant_endpoint1.port,
non_existant_endpoint2.port, backends_[0]->port()); non_existant_endpoint2.port, backends_[0]->port());
connection_injector.Start();
// A long-running RPC, just used to send the RPC in another thread. // A long-running RPC, just used to send the RPC in another thread.
LongRunningRpc rpc; LongRunningRpc rpc;
std::vector<std::pair<std::string, std::string>> metadata = { std::vector<std::pair<std::string, std::string>> metadata = {

Loading…
Cancel
Save