ConnectionAttemptInjector: fix tsan failures (#30730)

pull/30737/head
Mark D. Roth 2 years ago committed by GitHub
parent 24bc7c455f
commit 03b6b01043
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      CMakeLists.txt
  2. 16
      build_autogenerated.yaml
  3. 8
      test/cpp/end2end/BUILD
  4. 21
      test/cpp/end2end/client_lb_end2end_test.cc
  5. 208
      test/cpp/end2end/connection_attempt_injector.cc
  6. 224
      test/cpp/end2end/connection_attempt_injector.h
  7. 6
      test/cpp/end2end/xds/BUILD
  8. 5
      test/cpp/end2end/xds/xds_cluster_end2end_test.cc
  9. 5
      test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc
  10. 17
      test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc

8
CMakeLists.txt generated

@ -7952,7 +7952,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
src/cpp/server/orca/orca_service.cc
test/core/util/test_lb_policies.cc
test/cpp/end2end/client_lb_end2end_test.cc
test/cpp/end2end/connection_delay_injector.cc
test/cpp/end2end/connection_attempt_injector.cc
test/cpp/end2end/test_service_impl.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
@ -19869,7 +19869,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.grpc.pb.h
test/cpp/end2end/connection_delay_injector.cc
test/cpp/end2end/connection_attempt_injector.cc
test/cpp/end2end/test_service_impl.cc
test/cpp/end2end/xds/xds_cluster_end2end_test.cc
test/cpp/end2end/xds/xds_end2end_test_lib.cc
@ -20047,7 +20047,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.grpc.pb.h
test/cpp/end2end/connection_delay_injector.cc
test/cpp/end2end/connection_attempt_injector.cc
test/cpp/end2end/test_service_impl.cc
test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc
test/cpp/end2end/xds/xds_end2end_test_lib.cc
@ -21480,7 +21480,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.grpc.pb.h
test/cpp/end2end/connection_delay_injector.cc
test/cpp/end2end/connection_attempt_injector.cc
test/cpp/end2end/test_service_impl.cc
test/cpp/end2end/xds/xds_end2end_test_lib.cc
test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc

@ -5059,7 +5059,7 @@ targets:
headers:
- src/cpp/server/orca/orca_interceptor.h
- test/core/util/test_lb_policies.h
- test/cpp/end2end/connection_delay_injector.h
- test/cpp/end2end/connection_attempt_injector.h
- test/cpp/end2end/test_service_impl.h
src:
- src/proto/grpc/testing/duplicate/echo_duplicate.proto
@ -5071,7 +5071,7 @@ targets:
- src/cpp/server/orca/orca_service.cc
- test/core/util/test_lb_policies.cc
- test/cpp/end2end/client_lb_end2end_test.cc
- test/cpp/end2end/connection_delay_injector.cc
- test/cpp/end2end/connection_attempt_injector.cc
- test/cpp/end2end/test_service_impl.cc
deps:
- grpc++_test_util
@ -10915,7 +10915,7 @@ targets:
run: false
language: c++
headers:
- test/cpp/end2end/connection_delay_injector.h
- test/cpp/end2end/connection_attempt_injector.h
- test/cpp/end2end/counted_service.h
- test/cpp/end2end/test_service_impl.h
- test/cpp/end2end/xds/xds_end2end_test_lib.h
@ -10955,7 +10955,7 @@ targets:
- src/proto/grpc/testing/xds/v3/route.proto
- src/proto/grpc/testing/xds/v3/router.proto
- src/proto/grpc/testing/xds/v3/string.proto
- test/cpp/end2end/connection_delay_injector.cc
- test/cpp/end2end/connection_attempt_injector.cc
- test/cpp/end2end/test_service_impl.cc
- test/cpp/end2end/xds/xds_cluster_end2end_test.cc
- test/cpp/end2end/xds/xds_end2end_test_lib.cc
@ -10973,7 +10973,7 @@ targets:
run: false
language: c++
headers:
- test/cpp/end2end/connection_delay_injector.h
- test/cpp/end2end/connection_attempt_injector.h
- test/cpp/end2end/counted_service.h
- test/cpp/end2end/test_service_impl.h
- test/cpp/end2end/xds/xds_end2end_test_lib.h
@ -11014,7 +11014,7 @@ targets:
- src/proto/grpc/testing/xds/v3/route.proto
- src/proto/grpc/testing/xds/v3/router.proto
- src/proto/grpc/testing/xds/v3/string.proto
- test/cpp/end2end/connection_delay_injector.cc
- test/cpp/end2end/connection_attempt_injector.cc
- test/cpp/end2end/test_service_impl.cc
- test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc
- test/cpp/end2end/xds/xds_end2end_test_lib.cc
@ -11463,7 +11463,7 @@ targets:
run: false
language: c++
headers:
- test/cpp/end2end/connection_delay_injector.h
- test/cpp/end2end/connection_attempt_injector.h
- test/cpp/end2end/counted_service.h
- test/cpp/end2end/test_service_impl.h
- test/cpp/end2end/xds/xds_end2end_test_lib.h
@ -11504,7 +11504,7 @@ targets:
- src/proto/grpc/testing/xds/v3/route.proto
- src/proto/grpc/testing/xds/v3/router.proto
- src/proto/grpc/testing/xds/v3/string.proto
- test/cpp/end2end/connection_delay_injector.cc
- test/cpp/end2end/connection_attempt_injector.cc
- test/cpp/end2end/test_service_impl.cc
- test/cpp/end2end/xds/xds_end2end_test_lib.cc
- test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc

@ -58,10 +58,10 @@ grpc_cc_library(
)
grpc_cc_library(
name = "connection_delay_injector",
name = "connection_attempt_injector",
testonly = True,
srcs = ["connection_delay_injector.cc"],
hdrs = ["connection_delay_injector.h"],
srcs = ["connection_attempt_injector.cc"],
hdrs = ["connection_attempt_injector.h"],
deps = [
"//:grpc",
],
@ -490,7 +490,7 @@ grpc_cc_test(
flaky = True, # TODO(b/151315347)
tags = ["no_windows"], # TODO(jtattermusch): fix test on windows
deps = [
":connection_delay_injector",
":connection_attempt_injector",
":test_service_impl",
"//:gpr",
"//:grpc",

@ -71,7 +71,7 @@
#include "test/core/util/resolve_localhost_ip46.h"
#include "test/core/util/test_config.h"
#include "test/core/util/test_lb_policies.h"
#include "test/cpp/end2end/connection_delay_injector.h"
#include "test/cpp/end2end/connection_attempt_injector.h"
#include "test/cpp/end2end/test_service_impl.h"
using grpc::testing::EchoRequest;
@ -798,9 +798,9 @@ TEST_F(PickFirstTest, BackOffMinReconnect) {
response_generator.SetNextResolution(ports);
// Make connection delay a 10% longer than it's willing to in order to make
// sure we are hitting the codepath that waits for the min reconnect backoff.
ConnectionDelayInjector delay_injector(
ConnectionAttemptInjector injector;
injector.SetDelay(
grpc_core::Duration::Milliseconds(kMinReconnectBackOffMs * 1.10));
delay_injector.Start();
const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
channel->WaitForConnected(
grpc_timeout_milliseconds_to_deadline(kMinReconnectBackOffMs * 2));
@ -850,8 +850,7 @@ TEST_F(PickFirstTest, ResetConnectionBackoff) {
TEST_F(ClientLbEnd2endTest,
ResetConnectionBackoffNextAttemptStartsImmediately) {
// Start connection injector.
ConnectionHoldInjector injector;
injector.Start();
ConnectionAttemptInjector injector;
// Create client.
const int port = grpc_pick_unused_port_or_die();
ChannelArguments args;
@ -897,8 +896,7 @@ TEST_F(
PickFirstTest,
TriesAllSubchannelsBeforeReportingTransientFailureWithSubchannelSharing) {
// Start connection injector.
ConnectionHoldInjector injector;
injector.Start();
ConnectionAttemptInjector injector;
// Get 5 unused ports. Each channel will have 2 unique ports followed
// by a common port.
std::vector<int> ports1 = {grpc_pick_unused_port_or_die(),
@ -1683,8 +1681,7 @@ TEST_F(RoundRobinTest, TransientFailureAtStartup) {
TEST_F(RoundRobinTest, StaysInTransientFailureInSubsequentConnecting) {
// Start connection injector.
ConnectionHoldInjector injector;
injector.Start();
ConnectionAttemptInjector injector;
// Get port.
const int port = grpc_pick_unused_port_or_die();
// Create channel.
@ -1723,8 +1720,7 @@ TEST_F(RoundRobinTest, StaysInTransientFailureInSubsequentConnecting) {
TEST_F(RoundRobinTest, ReportsLatestStatusInTransientFailure) {
// Start connection injector.
ConnectionHoldInjector injector;
injector.Start();
ConnectionAttemptInjector injector;
// Get port.
const std::vector<int> ports = {grpc_pick_unused_port_or_die(),
grpc_pick_unused_port_or_die()};
@ -1774,8 +1770,7 @@ TEST_F(RoundRobinTest, ReportsLatestStatusInTransientFailure) {
TEST_F(RoundRobinTest, DoesNotFailRpcsUponDisconnection) {
// Start connection injector.
ConnectionHoldInjector injector;
injector.Start();
ConnectionAttemptInjector injector;
// Start server.
StartServers(1);
// Create channel.

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "test/cpp/end2end/connection_delay_injector.h"
#include "test/cpp/end2end/connection_attempt_injector.h"
#include <memory>
@ -29,7 +29,7 @@ namespace grpc {
namespace testing {
//
// ConnectionAttemptInjector
// ConnectionAttemptInjector static setup
//
namespace {
@ -39,17 +39,30 @@ grpc_tcp_client_vtable* g_original_vtable = nullptr;
grpc_core::Mutex* g_mu = nullptr;
ConnectionAttemptInjector* g_injector ABSL_GUARDED_BY(*g_mu) = nullptr;
int64_t TcpConnectWithDelay(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) {
} // namespace
grpc_tcp_client_vtable ConnectionAttemptInjector::kDelayedConnectVTable = {
ConnectionAttemptInjector::TcpConnect,
ConnectionAttemptInjector::TcpConnectCancel};
void ConnectionAttemptInjector::Init() {
g_mu = new grpc_core::Mutex();
g_original_vtable = grpc_tcp_client_impl;
grpc_tcp_client_impl = &kDelayedConnectVTable;
}
int64_t ConnectionAttemptInjector::TcpConnect(
grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args,
const grpc_resolved_address* addr, grpc_core::Timestamp deadline) {
grpc_core::MutexLock lock(g_mu);
// If there's no injector, use the original vtable.
if (g_injector == nullptr) {
g_original_vtable->connect(closure, ep, interested_parties, channel_args,
addr, deadline);
return 0;
}
// Otherwise, use the injector.
g_injector->HandleConnection(closure, ep, interested_parties, channel_args,
addr, deadline);
return 0;
@ -60,25 +73,16 @@ int64_t TcpConnectWithDelay(grpc_closure* closure, grpc_endpoint** ep,
// g_original_vtable->cancel_connect(). If the attempt has not actually been
// started, it should mark the connect request as cancelled, so that when the
// request is resumed, it will not actually proceed.
bool TcpConnectCancel(int64_t /*connection_handle*/) { return false; }
grpc_tcp_client_vtable kDelayedConnectVTable = {TcpConnectWithDelay,
TcpConnectCancel};
} // namespace
void ConnectionAttemptInjector::Init() {
g_mu = new grpc_core::Mutex();
g_original_vtable = grpc_tcp_client_impl;
grpc_tcp_client_impl = &kDelayedConnectVTable;
bool ConnectionAttemptInjector::TcpConnectCancel(
int64_t /*connection_handle*/) {
return false;
}
ConnectionAttemptInjector::~ConnectionAttemptInjector() {
grpc_core::MutexLock lock(g_mu);
g_injector = nullptr;
}
//
// ConnectionAttemptInjector instance
//
void ConnectionAttemptInjector::Start() {
ConnectionAttemptInjector::ConnectionAttemptInjector() {
// Fail if ConnectionAttemptInjector::Init() was not called after
// grpc_init() to inject the vtable.
GPR_ASSERT(grpc_tcp_client_impl == &kDelayedConnectVTable);
@ -87,14 +91,95 @@ void ConnectionAttemptInjector::Start() {
g_injector = this;
}
void ConnectionAttemptInjector::AttemptConnection(
ConnectionAttemptInjector::~ConnectionAttemptInjector() {
grpc_core::MutexLock lock(g_mu);
g_injector = nullptr;
}
std::unique_ptr<ConnectionAttemptInjector::Hold>
ConnectionAttemptInjector::AddHold(int port, bool intercept_completion) {
grpc_core::MutexLock lock(&mu_);
auto hold = absl::make_unique<Hold>(this, port, intercept_completion);
holds_.push_back(hold.get());
return hold;
}
void ConnectionAttemptInjector::SetDelay(grpc_core::Duration delay) {
grpc_core::MutexLock lock(&mu_);
delay_ = delay;
}
void ConnectionAttemptInjector::HandleConnection(
grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args,
const grpc_resolved_address* addr, grpc_core::Timestamp deadline) {
const int port = grpc_sockaddr_get_port(addr);
gpr_log(GPR_INFO, "==> HandleConnection(): port=%d", port);
{
grpc_core::MutexLock lock(&mu_);
// First, check if there's a hold request for this port.
for (auto it = holds_.begin(); it != holds_.end(); ++it) {
Hold* hold = *it;
if (port == hold->port_) {
gpr_log(GPR_INFO, "*** INTERCEPTING CONNECTION ATTEMPT");
if (hold->intercept_completion_) {
hold->original_on_complete_ = closure;
closure = GRPC_CLOSURE_INIT(&hold->on_complete_, Hold::OnComplete,
hold, nullptr);
}
hold->queued_attempt_ = absl::make_unique<QueuedAttempt>(
closure, ep, interested_parties, channel_args, addr, deadline);
hold->start_cv_.Signal();
holds_.erase(it);
return;
}
}
// Otherwise, if there's a configured delay, impose it.
if (delay_.has_value()) {
new InjectedDelay(*delay_, closure, ep, interested_parties, channel_args,
addr, deadline);
return;
}
}
// Anything we're not holding or delaying should proceed normally.
g_original_vtable->connect(closure, ep, interested_parties, channel_args,
addr, deadline);
}
//
// ConnectionAttemptInjector::QueuedAttempt
//
ConnectionAttemptInjector::QueuedAttempt::QueuedAttempt(
grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args,
const grpc_resolved_address* addr, grpc_core::Timestamp deadline)
: closure_(closure),
endpoint_(ep),
interested_parties_(interested_parties),
channel_args_(grpc_channel_args_copy(channel_args)),
deadline_(deadline) {
memcpy(&address_, addr, sizeof(address_));
}
ConnectionAttemptInjector::QueuedAttempt::~QueuedAttempt() {
GPR_ASSERT(closure_ == nullptr);
grpc_channel_args_destroy(channel_args_);
}
void ConnectionAttemptInjector::QueuedAttempt::Resume() {
GPR_ASSERT(closure_ != nullptr);
g_original_vtable->connect(closure_, endpoint_, interested_parties_,
channel_args_, &address_, deadline_);
closure_ = nullptr;
}
void ConnectionAttemptInjector::QueuedAttempt::Fail(grpc_error_handle error) {
GPR_ASSERT(closure_ != nullptr);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure_, error);
closure_ = nullptr;
}
//
// ConnectionAttemptInjector::InjectedDelay
//
@ -113,34 +198,21 @@ ConnectionAttemptInjector::InjectedDelay::InjectedDelay(
void ConnectionAttemptInjector::InjectedDelay::TimerCallback(
void* arg, grpc_error_handle /*error*/) {
auto* self = static_cast<InjectedDelay*>(arg);
self->BeforeResumingAction();
self->attempt_.Resume();
delete self;
}
//
// ConnectionDelayInjector
// ConnectionAttemptInjector::Hold
//
void ConnectionDelayInjector::HandleConnection(
grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args,
const grpc_resolved_address* addr, grpc_core::Timestamp deadline) {
new InjectedDelay(duration_, closure, ep, interested_parties, channel_args,
addr, deadline);
}
//
// ConnectionHoldInjector::Hold
//
ConnectionHoldInjector::Hold::Hold(ConnectionHoldInjector* injector, int port,
bool intercept_completion)
ConnectionAttemptInjector::Hold::Hold(ConnectionAttemptInjector* injector,
int port, bool intercept_completion)
: injector_(injector),
port_(port),
intercept_completion_(intercept_completion) {}
void ConnectionHoldInjector::Hold::Wait() {
void ConnectionAttemptInjector::Hold::Wait() {
gpr_log(GPR_INFO, "=== WAITING FOR CONNECTION ATTEMPT ON PORT %d ===", port_);
grpc_core::MutexLock lock(&injector_->mu_);
while (queued_attempt_ == nullptr) {
@ -149,7 +221,7 @@ void ConnectionHoldInjector::Hold::Wait() {
gpr_log(GPR_INFO, "=== CONNECTION ATTEMPT STARTED ON PORT %d ===", port_);
}
void ConnectionHoldInjector::Hold::Resume() {
void ConnectionAttemptInjector::Hold::Resume() {
gpr_log(GPR_INFO, "=== RESUMING CONNECTION ATTEMPT ON PORT %d ===", port_);
grpc_core::ExecCtx exec_ctx;
std::unique_ptr<QueuedAttempt> attempt;
@ -160,7 +232,7 @@ void ConnectionHoldInjector::Hold::Resume() {
attempt->Resume();
}
void ConnectionHoldInjector::Hold::Fail(grpc_error_handle error) {
void ConnectionAttemptInjector::Hold::Fail(grpc_error_handle error) {
gpr_log(GPR_INFO, "=== FAILING CONNECTION ATTEMPT ON PORT %d ===", port_);
grpc_core::ExecCtx exec_ctx;
std::unique_ptr<QueuedAttempt> attempt;
@ -171,7 +243,7 @@ void ConnectionHoldInjector::Hold::Fail(grpc_error_handle error) {
attempt->Fail(error);
}
void ConnectionHoldInjector::Hold::WaitForCompletion() {
void ConnectionAttemptInjector::Hold::WaitForCompletion() {
gpr_log(GPR_INFO,
"=== WAITING FOR CONNECTION COMPLETION ON PORT %d ===", port_);
grpc_core::MutexLock lock(&injector_->mu_);
@ -181,13 +253,13 @@ void ConnectionHoldInjector::Hold::WaitForCompletion() {
gpr_log(GPR_INFO, "=== CONNECTION COMPLETED ON PORT %d ===", port_);
}
bool ConnectionHoldInjector::Hold::IsStarted() {
bool ConnectionAttemptInjector::Hold::IsStarted() {
grpc_core::MutexLock lock(&injector_->mu_);
return !start_cv_.WaitWithDeadline(&injector_->mu_, absl::Now());
}
void ConnectionHoldInjector::Hold::OnComplete(void* arg,
grpc_error_handle error) {
void ConnectionAttemptInjector::Hold::OnComplete(void* arg,
grpc_error_handle error) {
auto* self = static_cast<Hold*>(arg);
grpc_closure* on_complete;
{
@ -199,47 +271,5 @@ void ConnectionHoldInjector::Hold::OnComplete(void* arg,
grpc_core::Closure::Run(DEBUG_LOCATION, on_complete, GRPC_ERROR_REF(error));
}
//
// ConnectionHoldInjector
//
std::unique_ptr<ConnectionHoldInjector::Hold> ConnectionHoldInjector::AddHold(
int port, bool intercept_completion) {
grpc_core::MutexLock lock(&mu_);
auto hold = absl::make_unique<Hold>(this, port, intercept_completion);
holds_.push_back(hold.get());
return hold;
}
void ConnectionHoldInjector::HandleConnection(
grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args,
const grpc_resolved_address* addr, grpc_core::Timestamp deadline) {
const int port = grpc_sockaddr_get_port(addr);
gpr_log(GPR_INFO, "==> HandleConnection(): port=%d", port);
{
grpc_core::MutexLock lock(&mu_);
for (auto it = holds_.begin(); it != holds_.end(); ++it) {
Hold* hold = *it;
if (port == hold->port_) {
gpr_log(GPR_INFO, "*** INTERCEPTING CONNECTION ATTEMPT");
if (hold->intercept_completion_) {
hold->original_on_complete_ = closure;
closure = GRPC_CLOSURE_INIT(&hold->on_complete_, Hold::OnComplete,
hold, nullptr);
}
hold->queued_attempt_ = absl::make_unique<QueuedAttempt>(
closure, ep, interested_parties, channel_args, addr, deadline);
hold->start_cv_.Signal();
holds_.erase(it);
return;
}
}
}
// Anything we're not holding should proceed normally.
AttemptConnection(closure, ep, interested_parties, channel_args, addr,
deadline);
}
} // namespace testing
} // namespace grpc

@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_TEST_CPP_END2END_CONNECTION_DELAY_INJECTOR_H
#define GRPC_TEST_CPP_END2END_CONNECTION_DELAY_INJECTOR_H
#ifndef GRPC_TEST_CPP_END2END_CONNECTION_ATTEMPT_INJECTOR_H
#define GRPC_TEST_CPP_END2END_CONNECTION_ATTEMPT_INJECTOR_H
#include <memory>
@ -31,36 +31,92 @@ namespace testing {
// // At grpc_init() time.
// ConnectionAttemptInjector::Init();
//
// // When an injection is desired.
// ConnectionDelayInjector delay_injector(grpc_core::Duration::Seconds(10));
// delay_injector.Start();
// // Instantiate when injection is desired.
// ConnectionAttemptInjector injector;
//
// // To inject a hold for the next connection attempt for a given port.
// auto hold = injector.AddHold(port);
// hold.Wait();
// // ...do stuff...
// hold.Resume(); // Or hold.Fail() if you want to force a failure.
//
// // Inject a fixed delay for all connection attempts.
// injector.SetDelay(grpc_core::Duration::Seconds(10));
//
// The injection is global, so there must be only one ConnectionAttemptInjector
// object at any one time.
class ConnectionAttemptInjector {
//
// Note: This must be "final" to avoid tsan problems in both the ctor
// and dtor related to initializing the vtable.
class ConnectionAttemptInjector final {
private:
// Forward declarations.
class QueuedAttempt;
grpc_core::Mutex mu_;
public:
class Hold {
public:
// Do not instantiate directly -- must be created via AddHold().
Hold(ConnectionAttemptInjector* injector, int port,
bool intercept_completion);
// Waits for the connection attempt to start.
// After this returns, exactly one of Resume() or Fail() must be called.
void Wait();
// Resumes a connection attempt. Must be called after Wait().
void Resume();
// Fails a connection attempt. Must be called after Wait().
void Fail(grpc_error_handle error);
// If the hold was created with intercept_completion=true, then this
// can be called after Resume() to wait for the connection attempt
// to complete.
void WaitForCompletion();
// Returns true if the connection attempt has been started.
bool IsStarted();
private:
friend class ConnectionAttemptInjector;
static void OnComplete(void* arg, grpc_error_handle error);
ConnectionAttemptInjector* injector_;
const int port_;
const bool intercept_completion_;
std::unique_ptr<QueuedAttempt> queued_attempt_
ABSL_GUARDED_BY(&ConnectionAttemptInjector::mu_);
grpc_core::CondVar start_cv_;
grpc_closure on_complete_;
grpc_closure* original_on_complete_;
grpc_core::CondVar complete_cv_;
};
// Global initializer. Replaces the iomgr TCP client vtable.
// Must be called exactly once after grpc_init() but before any TCP
// connections are established.
static void Init();
virtual ~ConnectionAttemptInjector();
ConnectionAttemptInjector();
~ConnectionAttemptInjector();
// Must be called after instantiation.
void Start();
// Adds a hold for a given port. The caller may then use Wait() on
// the resulting Hold object to wait for the connection attempt to start.
// If intercept_completion is true, the caller can use WaitForCompletion()
// on the resulting Hold object.
std::unique_ptr<Hold> AddHold(int port, bool intercept_completion = false);
// Invoked for every TCP connection attempt.
// Implementations must eventually either invoke the closure
// themselves or delegate to the iomgr implementation by calling
// AttemptConnection(). QueuedAttempt may be used to queue an attempt
// for asynchronous processing.
virtual void HandleConnection(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) = 0;
// Set a fixed delay for all RPCs. Will be used only if there is no
// hold for the connection attempt.
void SetDelay(grpc_core::Duration delay);
private:
static grpc_tcp_client_vtable kDelayedConnectVTable;
protected:
// Represents a queued attempt.
// The caller must invoke either Resume() or Fail() before destroying.
class QueuedAttempt {
@ -69,34 +125,14 @@ class ConnectionAttemptInjector {
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline)
: closure_(closure),
endpoint_(ep),
interested_parties_(interested_parties),
channel_args_(grpc_channel_args_copy(channel_args)),
deadline_(deadline) {
memcpy(&address_, addr, sizeof(address_));
}
~QueuedAttempt() {
GPR_ASSERT(closure_ == nullptr);
grpc_channel_args_destroy(channel_args_);
}
grpc_core::Timestamp deadline);
~QueuedAttempt();
// Caller must invoke this from a thread with an ExecCtx.
void Resume() {
GPR_ASSERT(closure_ != nullptr);
AttemptConnection(closure_, endpoint_, interested_parties_, channel_args_,
&address_, deadline_);
closure_ = nullptr;
}
void Resume();
// Caller must invoke this from a thread with an ExecCtx.
void Fail(grpc_error_handle error) {
GPR_ASSERT(closure_ != nullptr);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure_, error);
closure_ = nullptr;
}
void Fail(grpc_error_handle error);
private:
grpc_closure* closure_;
@ -119,9 +155,6 @@ class ConnectionAttemptInjector {
grpc_core::Timestamp deadline);
private:
// Subclasses can override to perform an action when the attempt resumes.
virtual void BeforeResumingAction() {}
static void TimerCallback(void* arg, grpc_error_handle /*error*/);
QueuedAttempt attempt_;
@ -129,92 +162,33 @@ class ConnectionAttemptInjector {
grpc_closure timer_callback_;
};
static void AttemptConnection(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline);
};
// A concrete implementation that injects a fixed delay.
class ConnectionDelayInjector : public ConnectionAttemptInjector {
public:
explicit ConnectionDelayInjector(grpc_core::Duration duration)
: duration_(duration) {}
// Invoked for every TCP connection attempt.
void HandleConnection(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) override;
private:
grpc_core::Duration duration_;
};
// A concrete implementation that allows injecting holds for individual
// connection attemps, one at a time.
class ConnectionHoldInjector : public ConnectionAttemptInjector {
private:
grpc_core::Mutex mu_; // Needs to be declared up front.
public:
class Hold {
public:
// Do not instantiate directly -- must be created via AddHold().
Hold(ConnectionHoldInjector* injector, int port, bool intercept_completion);
grpc_core::Timestamp deadline);
// Waits for the connection attempt to start.
// After this returns, exactly one of Resume() or Fail() must be called.
void Wait();
// Resumes a connection attempt. Must be called after Wait().
void Resume();
// Fails a connection attempt. Must be called after Wait().
void Fail(grpc_error_handle error);
// If the hold was created with intercept_completion=true, then this
// can be called after Resume() to wait for the connection attempt
// to complete.
void WaitForCompletion();
// Returns true if the connection attempt has been started.
bool IsStarted();
private:
friend class ConnectionHoldInjector;
static void OnComplete(void* arg, grpc_error_handle error);
ConnectionHoldInjector* injector_;
const int port_;
const bool intercept_completion_;
std::unique_ptr<QueuedAttempt> queued_attempt_
ABSL_GUARDED_BY(&ConnectionHoldInjector::mu_);
grpc_core::CondVar start_cv_;
grpc_closure on_complete_;
grpc_closure* original_on_complete_;
grpc_core::CondVar complete_cv_;
};
// Adds a hold for a given port. The caller may then use Wait() on
// the resulting Hold object to wait for the connection attempt to start.
// If intercept_completion is true, the caller can use WaitForCompletion()
// on the resulting Hold object.
std::unique_ptr<Hold> AddHold(int port, bool intercept_completion = false);
private:
void HandleConnection(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) override;
static void AttemptConnection(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline);
std::vector<Hold*> holds_;
// Replacement iomgr tcp_connect vtable functions that use the current
// ConnectionAttemptInjector object.
static int64_t TcpConnect(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline);
static bool TcpConnectCancel(int64_t connection_handle);
std::vector<Hold*> holds_ ABSL_GUARDED_BY(&mu_);
absl::optional<grpc_core::Duration> delay_ ABSL_GUARDED_BY(&mu_);
};
} // namespace testing
} // namespace grpc
#endif // GRPC_TEST_CPP_END2END_CONNECTION_DELAY_INJECTOR_H
#endif // GRPC_TEST_CPP_END2END_CONNECTION_ATTEMPT_INJECTOR_H

@ -166,7 +166,7 @@ grpc_cc_test(
"//:grpc",
"//:grpc++",
"//test/core/util:grpc_test_util",
"//test/cpp/end2end:connection_delay_injector",
"//test/cpp/end2end:connection_attempt_injector",
],
)
@ -191,7 +191,7 @@ grpc_cc_test(
"//:grpc_resolver_fake",
"//src/proto/grpc/testing/xds/v3:aggregate_cluster_proto",
"//test/core/util:grpc_test_util",
"//test/cpp/end2end:connection_delay_injector",
"//test/cpp/end2end:connection_attempt_injector",
],
)
@ -322,7 +322,7 @@ grpc_cc_test(
"//src/proto/grpc/testing/xds/v3:listener_proto",
"//src/proto/grpc/testing/xds/v3:route_proto",
"//test/core/util:grpc_test_util",
"//test/cpp/end2end:connection_delay_injector",
"//test/cpp/end2end:connection_attempt_injector",
],
)

@ -25,7 +25,7 @@
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "test/cpp/end2end/connection_delay_injector.h"
#include "test/cpp/end2end/connection_attempt_injector.h"
#include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
namespace grpc {
@ -1155,8 +1155,7 @@ TEST_P(FailoverTest, ReportsConnectingDuringFailover) {
{"locality1", CreateEndpointsForBackends(), kDefaultLocalityWeight, 1},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
ConnectionHoldInjector injector;
injector.Start();
ConnectionAttemptInjector injector;
auto hold = injector.AddHold(backends_[0]->port());
// Start an RPC in the background, which should cause the channel to
// try to connect.

@ -28,7 +28,7 @@
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/resolver/server_address.h"
#include "src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.h"
#include "test/cpp/end2end/connection_delay_injector.h"
#include "test/cpp/end2end/connection_attempt_injector.h"
#include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
namespace grpc {
@ -435,8 +435,7 @@ TEST_P(AggregateClusterTest, FallBackWithConnectivityChurn) {
custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
balancer_->ads_service()->SetCdsResource(cluster);
// Start connection injector.
ConnectionHoldInjector injector;
injector.Start();
ConnectionAttemptInjector injector;
auto hold0 = injector.AddHold(backends_[0]->port());
auto hold1 = injector.AddHold(backends_[1]->port());
// Start long-running RPC in the background.

@ -29,7 +29,7 @@
#include "src/core/lib/gpr/env.h"
#include "src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.h"
#include "src/proto/grpc/testing/xds/v3/cluster.grpc.pb.h"
#include "test/cpp/end2end/connection_delay_injector.h"
#include "test/cpp/end2end/connection_attempt_injector.h"
#include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
namespace grpc {
@ -212,9 +212,9 @@ TEST_P(RingHashTest,
std::move(result));
}
// Inject connection delay to make this act more realistically.
ConnectionDelayInjector delay_injector(
grpc_core::Duration::Milliseconds(500) * grpc_test_slowdown_factor());
delay_injector.Start();
ConnectionAttemptInjector injector;
injector.SetDelay(grpc_core::Duration::Milliseconds(500) *
grpc_test_slowdown_factor());
// Send RPC. Need the timeout to be long enough to account for the
// subchannel connection delays.
CheckRpcSendOk(DEBUG_LOCATION, 1, RpcOptions().set_timeout_ms(5000));
@ -280,8 +280,7 @@ TEST_P(RingHashTest,
std::move(result));
}
// Set up connection attempt injector.
ConnectionHoldInjector injector;
injector.Start();
ConnectionAttemptInjector injector;
auto hold = injector.AddHold(backends_[0]->port());
// Increase subchannel backoff time, so that subchannels stay in
// TRANSIENT_FAILURE for long enough to trigger potential problems.
@ -789,8 +788,7 @@ TEST_P(RingHashTest, ContinuesConnectingWithoutPicks) {
new_route_config);
// Start connection attempt injector and add a hold for the P0
// connection attempt.
ConnectionHoldInjector injector;
injector.Start();
ConnectionAttemptInjector injector;
auto hold = injector.AddHold(non_existant_endpoint.port);
// A long-running RPC, just used to send the RPC in another thread.
LongRunningRpc rpc;
@ -835,8 +833,7 @@ TEST_P(RingHashTest, ContinuesConnectingWithoutPicksOneSubchannelAtATime) {
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
// Start connection attempt injector.
ConnectionHoldInjector injector;
injector.Start();
ConnectionAttemptInjector injector;
auto hold_non_existant0 = injector.AddHold(non_existant_endpoint0.port);
auto hold_non_existant1 = injector.AddHold(non_existant_endpoint1.port);
auto hold_non_existant2 = injector.AddHold(non_existant_endpoint2.port);

Loading…
Cancel
Save