diff --git a/CMakeLists.txt b/CMakeLists.txt index 6ae63032179..e0fb16f0fd7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -7952,7 +7952,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) src/cpp/server/orca/orca_service.cc test/core/util/test_lb_policies.cc test/cpp/end2end/client_lb_end2end_test.cc - test/cpp/end2end/connection_delay_injector.cc + test/cpp/end2end/connection_attempt_injector.cc test/cpp/end2end/test_service_impl.cc third_party/googletest/googletest/src/gtest-all.cc third_party/googletest/googlemock/src/gmock-all.cc @@ -19869,7 +19869,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.grpc.pb.h - test/cpp/end2end/connection_delay_injector.cc + test/cpp/end2end/connection_attempt_injector.cc test/cpp/end2end/test_service_impl.cc test/cpp/end2end/xds/xds_cluster_end2end_test.cc test/cpp/end2end/xds/xds_end2end_test_lib.cc @@ -20047,7 +20047,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.grpc.pb.h - test/cpp/end2end/connection_delay_injector.cc + test/cpp/end2end/connection_attempt_injector.cc test/cpp/end2end/test_service_impl.cc test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc test/cpp/end2end/xds/xds_end2end_test_lib.cc @@ -21480,7 +21480,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.grpc.pb.h - test/cpp/end2end/connection_delay_injector.cc + test/cpp/end2end/connection_attempt_injector.cc test/cpp/end2end/test_service_impl.cc test/cpp/end2end/xds/xds_end2end_test_lib.cc test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index f04589c8da8..783ee84bab9 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -5059,7 +5059,7 @@ targets: headers: - src/cpp/server/orca/orca_interceptor.h - test/core/util/test_lb_policies.h - - test/cpp/end2end/connection_delay_injector.h + - test/cpp/end2end/connection_attempt_injector.h - test/cpp/end2end/test_service_impl.h src: - src/proto/grpc/testing/duplicate/echo_duplicate.proto @@ -5071,7 +5071,7 @@ targets: - src/cpp/server/orca/orca_service.cc - test/core/util/test_lb_policies.cc - test/cpp/end2end/client_lb_end2end_test.cc - - test/cpp/end2end/connection_delay_injector.cc + - test/cpp/end2end/connection_attempt_injector.cc - test/cpp/end2end/test_service_impl.cc deps: - grpc++_test_util @@ -10915,7 +10915,7 @@ targets: run: false language: c++ headers: - - test/cpp/end2end/connection_delay_injector.h + - test/cpp/end2end/connection_attempt_injector.h - test/cpp/end2end/counted_service.h - test/cpp/end2end/test_service_impl.h - test/cpp/end2end/xds/xds_end2end_test_lib.h @@ -10955,7 +10955,7 @@ targets: - src/proto/grpc/testing/xds/v3/route.proto - src/proto/grpc/testing/xds/v3/router.proto - src/proto/grpc/testing/xds/v3/string.proto - - test/cpp/end2end/connection_delay_injector.cc + - test/cpp/end2end/connection_attempt_injector.cc - test/cpp/end2end/test_service_impl.cc - test/cpp/end2end/xds/xds_cluster_end2end_test.cc - test/cpp/end2end/xds/xds_end2end_test_lib.cc @@ -10973,7 +10973,7 @@ targets: run: false language: c++ headers: - - test/cpp/end2end/connection_delay_injector.h + - test/cpp/end2end/connection_attempt_injector.h - test/cpp/end2end/counted_service.h - test/cpp/end2end/test_service_impl.h - test/cpp/end2end/xds/xds_end2end_test_lib.h @@ -11014,7 +11014,7 @@ targets: - src/proto/grpc/testing/xds/v3/route.proto - src/proto/grpc/testing/xds/v3/router.proto - src/proto/grpc/testing/xds/v3/string.proto - - test/cpp/end2end/connection_delay_injector.cc + - test/cpp/end2end/connection_attempt_injector.cc - test/cpp/end2end/test_service_impl.cc - test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc - test/cpp/end2end/xds/xds_end2end_test_lib.cc @@ -11463,7 +11463,7 @@ targets: run: false language: c++ headers: - - test/cpp/end2end/connection_delay_injector.h + - test/cpp/end2end/connection_attempt_injector.h - test/cpp/end2end/counted_service.h - test/cpp/end2end/test_service_impl.h - test/cpp/end2end/xds/xds_end2end_test_lib.h @@ -11504,7 +11504,7 @@ targets: - src/proto/grpc/testing/xds/v3/route.proto - src/proto/grpc/testing/xds/v3/router.proto - src/proto/grpc/testing/xds/v3/string.proto - - test/cpp/end2end/connection_delay_injector.cc + - test/cpp/end2end/connection_attempt_injector.cc - test/cpp/end2end/test_service_impl.cc - test/cpp/end2end/xds/xds_end2end_test_lib.cc - test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index 80b87010dc6..a66aa4c9ba8 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -58,10 +58,10 @@ grpc_cc_library( ) grpc_cc_library( - name = "connection_delay_injector", + name = "connection_attempt_injector", testonly = True, - srcs = ["connection_delay_injector.cc"], - hdrs = ["connection_delay_injector.h"], + srcs = ["connection_attempt_injector.cc"], + hdrs = ["connection_attempt_injector.h"], deps = [ "//:grpc", ], @@ -490,7 +490,7 @@ grpc_cc_test( flaky = True, # TODO(b/151315347) tags = ["no_windows"], # TODO(jtattermusch): fix test on windows deps = [ - ":connection_delay_injector", + ":connection_attempt_injector", ":test_service_impl", "//:gpr", "//:grpc", diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index 4ee6f697552..e3a03e650be 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -71,7 +71,7 @@ #include "test/core/util/resolve_localhost_ip46.h" #include "test/core/util/test_config.h" #include "test/core/util/test_lb_policies.h" -#include "test/cpp/end2end/connection_delay_injector.h" +#include "test/cpp/end2end/connection_attempt_injector.h" #include "test/cpp/end2end/test_service_impl.h" using grpc::testing::EchoRequest; @@ -798,9 +798,9 @@ TEST_F(PickFirstTest, BackOffMinReconnect) { response_generator.SetNextResolution(ports); // Make connection delay a 10% longer than it's willing to in order to make // sure we are hitting the codepath that waits for the min reconnect backoff. - ConnectionDelayInjector delay_injector( + ConnectionAttemptInjector injector; + injector.SetDelay( grpc_core::Duration::Milliseconds(kMinReconnectBackOffMs * 1.10)); - delay_injector.Start(); const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC); channel->WaitForConnected( grpc_timeout_milliseconds_to_deadline(kMinReconnectBackOffMs * 2)); @@ -850,8 +850,7 @@ TEST_F(PickFirstTest, ResetConnectionBackoff) { TEST_F(ClientLbEnd2endTest, ResetConnectionBackoffNextAttemptStartsImmediately) { // Start connection injector. - ConnectionHoldInjector injector; - injector.Start(); + ConnectionAttemptInjector injector; // Create client. const int port = grpc_pick_unused_port_or_die(); ChannelArguments args; @@ -897,8 +896,7 @@ TEST_F( PickFirstTest, TriesAllSubchannelsBeforeReportingTransientFailureWithSubchannelSharing) { // Start connection injector. - ConnectionHoldInjector injector; - injector.Start(); + ConnectionAttemptInjector injector; // Get 5 unused ports. Each channel will have 2 unique ports followed // by a common port. std::vector ports1 = {grpc_pick_unused_port_or_die(), @@ -1683,8 +1681,7 @@ TEST_F(RoundRobinTest, TransientFailureAtStartup) { TEST_F(RoundRobinTest, StaysInTransientFailureInSubsequentConnecting) { // Start connection injector. - ConnectionHoldInjector injector; - injector.Start(); + ConnectionAttemptInjector injector; // Get port. const int port = grpc_pick_unused_port_or_die(); // Create channel. @@ -1723,8 +1720,7 @@ TEST_F(RoundRobinTest, StaysInTransientFailureInSubsequentConnecting) { TEST_F(RoundRobinTest, ReportsLatestStatusInTransientFailure) { // Start connection injector. - ConnectionHoldInjector injector; - injector.Start(); + ConnectionAttemptInjector injector; // Get port. const std::vector ports = {grpc_pick_unused_port_or_die(), grpc_pick_unused_port_or_die()}; @@ -1774,8 +1770,7 @@ TEST_F(RoundRobinTest, ReportsLatestStatusInTransientFailure) { TEST_F(RoundRobinTest, DoesNotFailRpcsUponDisconnection) { // Start connection injector. - ConnectionHoldInjector injector; - injector.Start(); + ConnectionAttemptInjector injector; // Start server. StartServers(1); // Create channel. diff --git a/test/cpp/end2end/connection_delay_injector.cc b/test/cpp/end2end/connection_attempt_injector.cc similarity index 70% rename from test/cpp/end2end/connection_delay_injector.cc rename to test/cpp/end2end/connection_attempt_injector.cc index 2d97972334f..30d045ade03 100644 --- a/test/cpp/end2end/connection_delay_injector.cc +++ b/test/cpp/end2end/connection_attempt_injector.cc @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "test/cpp/end2end/connection_delay_injector.h" +#include "test/cpp/end2end/connection_attempt_injector.h" #include @@ -29,7 +29,7 @@ namespace grpc { namespace testing { // -// ConnectionAttemptInjector +// ConnectionAttemptInjector static setup // namespace { @@ -39,17 +39,30 @@ grpc_tcp_client_vtable* g_original_vtable = nullptr; grpc_core::Mutex* g_mu = nullptr; ConnectionAttemptInjector* g_injector ABSL_GUARDED_BY(*g_mu) = nullptr; -int64_t TcpConnectWithDelay(grpc_closure* closure, grpc_endpoint** ep, - grpc_pollset_set* interested_parties, - const grpc_channel_args* channel_args, - const grpc_resolved_address* addr, - grpc_core::Timestamp deadline) { +} // namespace + +grpc_tcp_client_vtable ConnectionAttemptInjector::kDelayedConnectVTable = { + ConnectionAttemptInjector::TcpConnect, + ConnectionAttemptInjector::TcpConnectCancel}; + +void ConnectionAttemptInjector::Init() { + g_mu = new grpc_core::Mutex(); + g_original_vtable = grpc_tcp_client_impl; + grpc_tcp_client_impl = &kDelayedConnectVTable; +} + +int64_t ConnectionAttemptInjector::TcpConnect( + grpc_closure* closure, grpc_endpoint** ep, + grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args, + const grpc_resolved_address* addr, grpc_core::Timestamp deadline) { grpc_core::MutexLock lock(g_mu); + // If there's no injector, use the original vtable. if (g_injector == nullptr) { g_original_vtable->connect(closure, ep, interested_parties, channel_args, addr, deadline); return 0; } + // Otherwise, use the injector. g_injector->HandleConnection(closure, ep, interested_parties, channel_args, addr, deadline); return 0; @@ -60,25 +73,16 @@ int64_t TcpConnectWithDelay(grpc_closure* closure, grpc_endpoint** ep, // g_original_vtable->cancel_connect(). If the attempt has not actually been // started, it should mark the connect request as cancelled, so that when the // request is resumed, it will not actually proceed. -bool TcpConnectCancel(int64_t /*connection_handle*/) { return false; } - -grpc_tcp_client_vtable kDelayedConnectVTable = {TcpConnectWithDelay, - TcpConnectCancel}; - -} // namespace - -void ConnectionAttemptInjector::Init() { - g_mu = new grpc_core::Mutex(); - g_original_vtable = grpc_tcp_client_impl; - grpc_tcp_client_impl = &kDelayedConnectVTable; +bool ConnectionAttemptInjector::TcpConnectCancel( + int64_t /*connection_handle*/) { + return false; } -ConnectionAttemptInjector::~ConnectionAttemptInjector() { - grpc_core::MutexLock lock(g_mu); - g_injector = nullptr; -} +// +// ConnectionAttemptInjector instance +// -void ConnectionAttemptInjector::Start() { +ConnectionAttemptInjector::ConnectionAttemptInjector() { // Fail if ConnectionAttemptInjector::Init() was not called after // grpc_init() to inject the vtable. GPR_ASSERT(grpc_tcp_client_impl == &kDelayedConnectVTable); @@ -87,14 +91,95 @@ void ConnectionAttemptInjector::Start() { g_injector = this; } -void ConnectionAttemptInjector::AttemptConnection( +ConnectionAttemptInjector::~ConnectionAttemptInjector() { + grpc_core::MutexLock lock(g_mu); + g_injector = nullptr; +} + +std::unique_ptr +ConnectionAttemptInjector::AddHold(int port, bool intercept_completion) { + grpc_core::MutexLock lock(&mu_); + auto hold = absl::make_unique(this, port, intercept_completion); + holds_.push_back(hold.get()); + return hold; +} + +void ConnectionAttemptInjector::SetDelay(grpc_core::Duration delay) { + grpc_core::MutexLock lock(&mu_); + delay_ = delay; +} + +void ConnectionAttemptInjector::HandleConnection( grpc_closure* closure, grpc_endpoint** ep, grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args, const grpc_resolved_address* addr, grpc_core::Timestamp deadline) { + const int port = grpc_sockaddr_get_port(addr); + gpr_log(GPR_INFO, "==> HandleConnection(): port=%d", port); + { + grpc_core::MutexLock lock(&mu_); + // First, check if there's a hold request for this port. + for (auto it = holds_.begin(); it != holds_.end(); ++it) { + Hold* hold = *it; + if (port == hold->port_) { + gpr_log(GPR_INFO, "*** INTERCEPTING CONNECTION ATTEMPT"); + if (hold->intercept_completion_) { + hold->original_on_complete_ = closure; + closure = GRPC_CLOSURE_INIT(&hold->on_complete_, Hold::OnComplete, + hold, nullptr); + } + hold->queued_attempt_ = absl::make_unique( + closure, ep, interested_parties, channel_args, addr, deadline); + hold->start_cv_.Signal(); + holds_.erase(it); + return; + } + } + // Otherwise, if there's a configured delay, impose it. + if (delay_.has_value()) { + new InjectedDelay(*delay_, closure, ep, interested_parties, channel_args, + addr, deadline); + return; + } + } + // Anything we're not holding or delaying should proceed normally. g_original_vtable->connect(closure, ep, interested_parties, channel_args, addr, deadline); } +// +// ConnectionAttemptInjector::QueuedAttempt +// + +ConnectionAttemptInjector::QueuedAttempt::QueuedAttempt( + grpc_closure* closure, grpc_endpoint** ep, + grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args, + const grpc_resolved_address* addr, grpc_core::Timestamp deadline) + : closure_(closure), + endpoint_(ep), + interested_parties_(interested_parties), + channel_args_(grpc_channel_args_copy(channel_args)), + deadline_(deadline) { + memcpy(&address_, addr, sizeof(address_)); +} + +ConnectionAttemptInjector::QueuedAttempt::~QueuedAttempt() { + GPR_ASSERT(closure_ == nullptr); + grpc_channel_args_destroy(channel_args_); +} + +void ConnectionAttemptInjector::QueuedAttempt::Resume() { + GPR_ASSERT(closure_ != nullptr); + g_original_vtable->connect(closure_, endpoint_, interested_parties_, + channel_args_, &address_, deadline_); + closure_ = nullptr; +} + +void ConnectionAttemptInjector::QueuedAttempt::Fail(grpc_error_handle error) { + GPR_ASSERT(closure_ != nullptr); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure_, error); + closure_ = nullptr; +} + // // ConnectionAttemptInjector::InjectedDelay // @@ -113,34 +198,21 @@ ConnectionAttemptInjector::InjectedDelay::InjectedDelay( void ConnectionAttemptInjector::InjectedDelay::TimerCallback( void* arg, grpc_error_handle /*error*/) { auto* self = static_cast(arg); - self->BeforeResumingAction(); self->attempt_.Resume(); delete self; } // -// ConnectionDelayInjector +// ConnectionAttemptInjector::Hold // -void ConnectionDelayInjector::HandleConnection( - grpc_closure* closure, grpc_endpoint** ep, - grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args, - const grpc_resolved_address* addr, grpc_core::Timestamp deadline) { - new InjectedDelay(duration_, closure, ep, interested_parties, channel_args, - addr, deadline); -} - -// -// ConnectionHoldInjector::Hold -// - -ConnectionHoldInjector::Hold::Hold(ConnectionHoldInjector* injector, int port, - bool intercept_completion) +ConnectionAttemptInjector::Hold::Hold(ConnectionAttemptInjector* injector, + int port, bool intercept_completion) : injector_(injector), port_(port), intercept_completion_(intercept_completion) {} -void ConnectionHoldInjector::Hold::Wait() { +void ConnectionAttemptInjector::Hold::Wait() { gpr_log(GPR_INFO, "=== WAITING FOR CONNECTION ATTEMPT ON PORT %d ===", port_); grpc_core::MutexLock lock(&injector_->mu_); while (queued_attempt_ == nullptr) { @@ -149,7 +221,7 @@ void ConnectionHoldInjector::Hold::Wait() { gpr_log(GPR_INFO, "=== CONNECTION ATTEMPT STARTED ON PORT %d ===", port_); } -void ConnectionHoldInjector::Hold::Resume() { +void ConnectionAttemptInjector::Hold::Resume() { gpr_log(GPR_INFO, "=== RESUMING CONNECTION ATTEMPT ON PORT %d ===", port_); grpc_core::ExecCtx exec_ctx; std::unique_ptr attempt; @@ -160,7 +232,7 @@ void ConnectionHoldInjector::Hold::Resume() { attempt->Resume(); } -void ConnectionHoldInjector::Hold::Fail(grpc_error_handle error) { +void ConnectionAttemptInjector::Hold::Fail(grpc_error_handle error) { gpr_log(GPR_INFO, "=== FAILING CONNECTION ATTEMPT ON PORT %d ===", port_); grpc_core::ExecCtx exec_ctx; std::unique_ptr attempt; @@ -171,7 +243,7 @@ void ConnectionHoldInjector::Hold::Fail(grpc_error_handle error) { attempt->Fail(error); } -void ConnectionHoldInjector::Hold::WaitForCompletion() { +void ConnectionAttemptInjector::Hold::WaitForCompletion() { gpr_log(GPR_INFO, "=== WAITING FOR CONNECTION COMPLETION ON PORT %d ===", port_); grpc_core::MutexLock lock(&injector_->mu_); @@ -181,13 +253,13 @@ void ConnectionHoldInjector::Hold::WaitForCompletion() { gpr_log(GPR_INFO, "=== CONNECTION COMPLETED ON PORT %d ===", port_); } -bool ConnectionHoldInjector::Hold::IsStarted() { +bool ConnectionAttemptInjector::Hold::IsStarted() { grpc_core::MutexLock lock(&injector_->mu_); return !start_cv_.WaitWithDeadline(&injector_->mu_, absl::Now()); } -void ConnectionHoldInjector::Hold::OnComplete(void* arg, - grpc_error_handle error) { +void ConnectionAttemptInjector::Hold::OnComplete(void* arg, + grpc_error_handle error) { auto* self = static_cast(arg); grpc_closure* on_complete; { @@ -199,47 +271,5 @@ void ConnectionHoldInjector::Hold::OnComplete(void* arg, grpc_core::Closure::Run(DEBUG_LOCATION, on_complete, GRPC_ERROR_REF(error)); } -// -// ConnectionHoldInjector -// - -std::unique_ptr ConnectionHoldInjector::AddHold( - int port, bool intercept_completion) { - grpc_core::MutexLock lock(&mu_); - auto hold = absl::make_unique(this, port, intercept_completion); - holds_.push_back(hold.get()); - return hold; -} - -void ConnectionHoldInjector::HandleConnection( - grpc_closure* closure, grpc_endpoint** ep, - grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args, - const grpc_resolved_address* addr, grpc_core::Timestamp deadline) { - const int port = grpc_sockaddr_get_port(addr); - gpr_log(GPR_INFO, "==> HandleConnection(): port=%d", port); - { - grpc_core::MutexLock lock(&mu_); - for (auto it = holds_.begin(); it != holds_.end(); ++it) { - Hold* hold = *it; - if (port == hold->port_) { - gpr_log(GPR_INFO, "*** INTERCEPTING CONNECTION ATTEMPT"); - if (hold->intercept_completion_) { - hold->original_on_complete_ = closure; - closure = GRPC_CLOSURE_INIT(&hold->on_complete_, Hold::OnComplete, - hold, nullptr); - } - hold->queued_attempt_ = absl::make_unique( - closure, ep, interested_parties, channel_args, addr, deadline); - hold->start_cv_.Signal(); - holds_.erase(it); - return; - } - } - } - // Anything we're not holding should proceed normally. - AttemptConnection(closure, ep, interested_parties, channel_args, addr, - deadline); -} - } // namespace testing } // namespace grpc diff --git a/test/cpp/end2end/connection_delay_injector.h b/test/cpp/end2end/connection_attempt_injector.h similarity index 60% rename from test/cpp/end2end/connection_delay_injector.h rename to test/cpp/end2end/connection_attempt_injector.h index f5c4c11c6d1..36e6dcf88aa 100644 --- a/test/cpp/end2end/connection_delay_injector.h +++ b/test/cpp/end2end/connection_attempt_injector.h @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef GRPC_TEST_CPP_END2END_CONNECTION_DELAY_INJECTOR_H -#define GRPC_TEST_CPP_END2END_CONNECTION_DELAY_INJECTOR_H +#ifndef GRPC_TEST_CPP_END2END_CONNECTION_ATTEMPT_INJECTOR_H +#define GRPC_TEST_CPP_END2END_CONNECTION_ATTEMPT_INJECTOR_H #include @@ -31,36 +31,92 @@ namespace testing { // // At grpc_init() time. // ConnectionAttemptInjector::Init(); // -// // When an injection is desired. -// ConnectionDelayInjector delay_injector(grpc_core::Duration::Seconds(10)); -// delay_injector.Start(); +// // Instantiate when injection is desired. +// ConnectionAttemptInjector injector; +// +// // To inject a hold for the next connection attempt for a given port. +// auto hold = injector.AddHold(port); +// hold.Wait(); +// // ...do stuff... +// hold.Resume(); // Or hold.Fail() if you want to force a failure. +// +// // Inject a fixed delay for all connection attempts. +// injector.SetDelay(grpc_core::Duration::Seconds(10)); // // The injection is global, so there must be only one ConnectionAttemptInjector // object at any one time. -class ConnectionAttemptInjector { +// +// Note: This must be "final" to avoid tsan problems in both the ctor +// and dtor related to initializing the vtable. +class ConnectionAttemptInjector final { + private: + // Forward declarations. + class QueuedAttempt; + + grpc_core::Mutex mu_; + public: + class Hold { + public: + // Do not instantiate directly -- must be created via AddHold(). + Hold(ConnectionAttemptInjector* injector, int port, + bool intercept_completion); + + // Waits for the connection attempt to start. + // After this returns, exactly one of Resume() or Fail() must be called. + void Wait(); + + // Resumes a connection attempt. Must be called after Wait(). + void Resume(); + + // Fails a connection attempt. Must be called after Wait(). + void Fail(grpc_error_handle error); + + // If the hold was created with intercept_completion=true, then this + // can be called after Resume() to wait for the connection attempt + // to complete. + void WaitForCompletion(); + + // Returns true if the connection attempt has been started. + bool IsStarted(); + + private: + friend class ConnectionAttemptInjector; + + static void OnComplete(void* arg, grpc_error_handle error); + + ConnectionAttemptInjector* injector_; + const int port_; + const bool intercept_completion_; + std::unique_ptr queued_attempt_ + ABSL_GUARDED_BY(&ConnectionAttemptInjector::mu_); + grpc_core::CondVar start_cv_; + grpc_closure on_complete_; + grpc_closure* original_on_complete_; + grpc_core::CondVar complete_cv_; + }; + // Global initializer. Replaces the iomgr TCP client vtable. // Must be called exactly once after grpc_init() but before any TCP // connections are established. static void Init(); - virtual ~ConnectionAttemptInjector(); + ConnectionAttemptInjector(); + ~ConnectionAttemptInjector(); - // Must be called after instantiation. - void Start(); + // Adds a hold for a given port. The caller may then use Wait() on + // the resulting Hold object to wait for the connection attempt to start. + // If intercept_completion is true, the caller can use WaitForCompletion() + // on the resulting Hold object. + std::unique_ptr AddHold(int port, bool intercept_completion = false); - // Invoked for every TCP connection attempt. - // Implementations must eventually either invoke the closure - // themselves or delegate to the iomgr implementation by calling - // AttemptConnection(). QueuedAttempt may be used to queue an attempt - // for asynchronous processing. - virtual void HandleConnection(grpc_closure* closure, grpc_endpoint** ep, - grpc_pollset_set* interested_parties, - const grpc_channel_args* channel_args, - const grpc_resolved_address* addr, - grpc_core::Timestamp deadline) = 0; + // Set a fixed delay for all RPCs. Will be used only if there is no + // hold for the connection attempt. + void SetDelay(grpc_core::Duration delay); + + private: + static grpc_tcp_client_vtable kDelayedConnectVTable; - protected: // Represents a queued attempt. // The caller must invoke either Resume() or Fail() before destroying. class QueuedAttempt { @@ -69,34 +125,14 @@ class ConnectionAttemptInjector { grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args, const grpc_resolved_address* addr, - grpc_core::Timestamp deadline) - : closure_(closure), - endpoint_(ep), - interested_parties_(interested_parties), - channel_args_(grpc_channel_args_copy(channel_args)), - deadline_(deadline) { - memcpy(&address_, addr, sizeof(address_)); - } - - ~QueuedAttempt() { - GPR_ASSERT(closure_ == nullptr); - grpc_channel_args_destroy(channel_args_); - } + grpc_core::Timestamp deadline); + ~QueuedAttempt(); // Caller must invoke this from a thread with an ExecCtx. - void Resume() { - GPR_ASSERT(closure_ != nullptr); - AttemptConnection(closure_, endpoint_, interested_parties_, channel_args_, - &address_, deadline_); - closure_ = nullptr; - } + void Resume(); // Caller must invoke this from a thread with an ExecCtx. - void Fail(grpc_error_handle error) { - GPR_ASSERT(closure_ != nullptr); - grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure_, error); - closure_ = nullptr; - } + void Fail(grpc_error_handle error); private: grpc_closure* closure_; @@ -119,9 +155,6 @@ class ConnectionAttemptInjector { grpc_core::Timestamp deadline); private: - // Subclasses can override to perform an action when the attempt resumes. - virtual void BeforeResumingAction() {} - static void TimerCallback(void* arg, grpc_error_handle /*error*/); QueuedAttempt attempt_; @@ -129,92 +162,33 @@ class ConnectionAttemptInjector { grpc_closure timer_callback_; }; - static void AttemptConnection(grpc_closure* closure, grpc_endpoint** ep, - grpc_pollset_set* interested_parties, - const grpc_channel_args* channel_args, - const grpc_resolved_address* addr, - grpc_core::Timestamp deadline); -}; - -// A concrete implementation that injects a fixed delay. -class ConnectionDelayInjector : public ConnectionAttemptInjector { - public: - explicit ConnectionDelayInjector(grpc_core::Duration duration) - : duration_(duration) {} - + // Invoked for every TCP connection attempt. void HandleConnection(grpc_closure* closure, grpc_endpoint** ep, grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args, const grpc_resolved_address* addr, - grpc_core::Timestamp deadline) override; - - private: - grpc_core::Duration duration_; -}; - -// A concrete implementation that allows injecting holds for individual -// connection attemps, one at a time. -class ConnectionHoldInjector : public ConnectionAttemptInjector { - private: - grpc_core::Mutex mu_; // Needs to be declared up front. - - public: - class Hold { - public: - // Do not instantiate directly -- must be created via AddHold(). - Hold(ConnectionHoldInjector* injector, int port, bool intercept_completion); + grpc_core::Timestamp deadline); - // Waits for the connection attempt to start. - // After this returns, exactly one of Resume() or Fail() must be called. - void Wait(); - - // Resumes a connection attempt. Must be called after Wait(). - void Resume(); - - // Fails a connection attempt. Must be called after Wait(). - void Fail(grpc_error_handle error); - - // If the hold was created with intercept_completion=true, then this - // can be called after Resume() to wait for the connection attempt - // to complete. - void WaitForCompletion(); - - // Returns true if the connection attempt has been started. - bool IsStarted(); - - private: - friend class ConnectionHoldInjector; - - static void OnComplete(void* arg, grpc_error_handle error); - - ConnectionHoldInjector* injector_; - const int port_; - const bool intercept_completion_; - std::unique_ptr queued_attempt_ - ABSL_GUARDED_BY(&ConnectionHoldInjector::mu_); - grpc_core::CondVar start_cv_; - grpc_closure on_complete_; - grpc_closure* original_on_complete_; - grpc_core::CondVar complete_cv_; - }; - - // Adds a hold for a given port. The caller may then use Wait() on - // the resulting Hold object to wait for the connection attempt to start. - // If intercept_completion is true, the caller can use WaitForCompletion() - // on the resulting Hold object. - std::unique_ptr AddHold(int port, bool intercept_completion = false); - - private: - void HandleConnection(grpc_closure* closure, grpc_endpoint** ep, - grpc_pollset_set* interested_parties, - const grpc_channel_args* channel_args, - const grpc_resolved_address* addr, - grpc_core::Timestamp deadline) override; + static void AttemptConnection(grpc_closure* closure, grpc_endpoint** ep, + grpc_pollset_set* interested_parties, + const grpc_channel_args* channel_args, + const grpc_resolved_address* addr, + grpc_core::Timestamp deadline); - std::vector holds_; + // Replacement iomgr tcp_connect vtable functions that use the current + // ConnectionAttemptInjector object. + static int64_t TcpConnect(grpc_closure* closure, grpc_endpoint** ep, + grpc_pollset_set* interested_parties, + const grpc_channel_args* channel_args, + const grpc_resolved_address* addr, + grpc_core::Timestamp deadline); + static bool TcpConnectCancel(int64_t connection_handle); + + std::vector holds_ ABSL_GUARDED_BY(&mu_); + absl::optional delay_ ABSL_GUARDED_BY(&mu_); }; } // namespace testing } // namespace grpc -#endif // GRPC_TEST_CPP_END2END_CONNECTION_DELAY_INJECTOR_H +#endif // GRPC_TEST_CPP_END2END_CONNECTION_ATTEMPT_INJECTOR_H diff --git a/test/cpp/end2end/xds/BUILD b/test/cpp/end2end/xds/BUILD index 772d23ecfaf..55ad32fb695 100644 --- a/test/cpp/end2end/xds/BUILD +++ b/test/cpp/end2end/xds/BUILD @@ -166,7 +166,7 @@ grpc_cc_test( "//:grpc", "//:grpc++", "//test/core/util:grpc_test_util", - "//test/cpp/end2end:connection_delay_injector", + "//test/cpp/end2end:connection_attempt_injector", ], ) @@ -191,7 +191,7 @@ grpc_cc_test( "//:grpc_resolver_fake", "//src/proto/grpc/testing/xds/v3:aggregate_cluster_proto", "//test/core/util:grpc_test_util", - "//test/cpp/end2end:connection_delay_injector", + "//test/cpp/end2end:connection_attempt_injector", ], ) @@ -322,7 +322,7 @@ grpc_cc_test( "//src/proto/grpc/testing/xds/v3:listener_proto", "//src/proto/grpc/testing/xds/v3:route_proto", "//test/core/util:grpc_test_util", - "//test/cpp/end2end:connection_delay_injector", + "//test/cpp/end2end:connection_attempt_injector", ], ) diff --git a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc index 2b5c01d2381..9c73811cfb2 100644 --- a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc @@ -25,7 +25,7 @@ #include "src/core/ext/filters/client_channel/backup_poller.h" #include "src/core/lib/address_utils/sockaddr_utils.h" -#include "test/cpp/end2end/connection_delay_injector.h" +#include "test/cpp/end2end/connection_attempt_injector.h" #include "test/cpp/end2end/xds/xds_end2end_test_lib.h" namespace grpc { @@ -1155,8 +1155,7 @@ TEST_P(FailoverTest, ReportsConnectingDuringFailover) { {"locality1", CreateEndpointsForBackends(), kDefaultLocalityWeight, 1}, }); balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - ConnectionHoldInjector injector; - injector.Start(); + ConnectionAttemptInjector injector; auto hold = injector.AddHold(backends_[0]->port()); // Start an RPC in the background, which should cause the channel to // try to connect. diff --git a/test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc b/test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc index 87a03402421..165f7c776e7 100644 --- a/test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc @@ -28,7 +28,7 @@ #include "src/core/lib/gpr/env.h" #include "src/core/lib/resolver/server_address.h" #include "src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.h" -#include "test/cpp/end2end/connection_delay_injector.h" +#include "test/cpp/end2end/connection_attempt_injector.h" #include "test/cpp/end2end/xds/xds_end2end_test_lib.h" namespace grpc { @@ -435,8 +435,7 @@ TEST_P(AggregateClusterTest, FallBackWithConnectivityChurn) { custom_cluster->mutable_typed_config()->PackFrom(cluster_config); balancer_->ads_service()->SetCdsResource(cluster); // Start connection injector. - ConnectionHoldInjector injector; - injector.Start(); + ConnectionAttemptInjector injector; auto hold0 = injector.AddHold(backends_[0]->port()); auto hold1 = injector.AddHold(backends_[1]->port()); // Start long-running RPC in the background. diff --git a/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc b/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc index 0479c6f0de2..7d8dd4e9e9f 100644 --- a/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc @@ -29,7 +29,7 @@ #include "src/core/lib/gpr/env.h" #include "src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.h" #include "src/proto/grpc/testing/xds/v3/cluster.grpc.pb.h" -#include "test/cpp/end2end/connection_delay_injector.h" +#include "test/cpp/end2end/connection_attempt_injector.h" #include "test/cpp/end2end/xds/xds_end2end_test_lib.h" namespace grpc { @@ -212,9 +212,9 @@ TEST_P(RingHashTest, std::move(result)); } // Inject connection delay to make this act more realistically. - ConnectionDelayInjector delay_injector( - grpc_core::Duration::Milliseconds(500) * grpc_test_slowdown_factor()); - delay_injector.Start(); + ConnectionAttemptInjector injector; + injector.SetDelay(grpc_core::Duration::Milliseconds(500) * + grpc_test_slowdown_factor()); // Send RPC. Need the timeout to be long enough to account for the // subchannel connection delays. CheckRpcSendOk(DEBUG_LOCATION, 1, RpcOptions().set_timeout_ms(5000)); @@ -280,8 +280,7 @@ TEST_P(RingHashTest, std::move(result)); } // Set up connection attempt injector. - ConnectionHoldInjector injector; - injector.Start(); + ConnectionAttemptInjector injector; auto hold = injector.AddHold(backends_[0]->port()); // Increase subchannel backoff time, so that subchannels stay in // TRANSIENT_FAILURE for long enough to trigger potential problems. @@ -789,8 +788,7 @@ TEST_P(RingHashTest, ContinuesConnectingWithoutPicks) { new_route_config); // Start connection attempt injector and add a hold for the P0 // connection attempt. - ConnectionHoldInjector injector; - injector.Start(); + ConnectionAttemptInjector injector; auto hold = injector.AddHold(non_existant_endpoint.port); // A long-running RPC, just used to send the RPC in another thread. LongRunningRpc rpc; @@ -835,8 +833,7 @@ TEST_P(RingHashTest, ContinuesConnectingWithoutPicksOneSubchannelAtATime) { SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, new_route_config); // Start connection attempt injector. - ConnectionHoldInjector injector; - injector.Start(); + ConnectionAttemptInjector injector; auto hold_non_existant0 = injector.AddHold(non_existant_endpoint0.port); auto hold_non_existant1 = injector.AddHold(non_existant_endpoint1.port); auto hold_non_existant2 = injector.AddHold(non_existant_endpoint2.port);