From 6f6f0df2baeabfb7563da6fffcac6bcd0b04d149 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Tue, 5 Mar 2024 12:36:46 -0800 Subject: [PATCH] Make TestServiceSignaller more generically useful There are tests in another change I'm working on for which we need to do things like: 1) waiting for N RPCs to concurrently pile up on a server 2) doing 1) multiple times sequentially on the same server PiperOrigin-RevId: 612934305 --- test/cpp/end2end/end2end_test.cc | 4 +-- test/cpp/end2end/test_service_impl.h | 49 ++++++++++++++++++++++------ 2 files changed, 41 insertions(+), 12 deletions(-) diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 673ce88315a..6647c331fd9 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -1206,9 +1206,9 @@ TEST_P(End2endTest, CancelRpcAfterStart) { s = stub_->Echo(&context, request, &response); }); if (!GetParam().callback_server()) { - service_.ClientWaitUntilRpcStarted(); + EXPECT_EQ(service_.ClientWaitUntilNRpcsStarted(1), 1); } else { - callback_service_.ClientWaitUntilRpcStarted(); + EXPECT_EQ(callback_service_.ClientWaitUntilNRpcsStarted(1), 1); } context.TryCancel(); diff --git a/test/cpp/end2end/test_service_impl.h b/test/cpp/end2end/test_service_impl.h index c6c9edd6541..9415b88c199 100644 --- a/test/cpp/end2end/test_service_impl.h +++ b/test/cpp/end2end/test_service_impl.h @@ -89,11 +89,27 @@ void ServerTryCancel(ServerContext* context); class TestServiceSignaller { public: - void ClientWaitUntilRpcStarted() { - gpr_log(GPR_DEBUG, "*** enter ClientWaitUntilRpcStarted ***"); + // Waits for at least *desired_rpcs* to to be waiting for a server + // continue notification. + // Returns when *desired_rpcs* reaches that amount, or when we've + // surpassed the timeout, whichever happens first. The return value + // is whatever the number of RPCs waiting for server notification is + // at that time. + int ClientWaitUntilNRpcsStarted(int desired_rpcs, absl::Duration timeout) { + gpr_log(GPR_DEBUG, "*** enter ClientWaitUntilNRpcsStarted ***"); + absl::Time deadline = absl::Now() + timeout; + std::chrono::system_clock::time_point chrono_deadline = + absl::ToChronoTime(deadline); std::unique_lock lock(mu_); - cv_rpc_started_.wait(lock, [this] { return rpc_started_; }); - gpr_log(GPR_DEBUG, "*** leave ClientWaitUntilRpcStarted ***"); + cv_rpc_started_.wait_until(lock, chrono_deadline, [this, desired_rpcs] { + gpr_log( + GPR_DEBUG, + "*** desired_rpcs: %d rpcs_waiting_for_server_to_continue_: %d ***", + desired_rpcs, rpcs_waiting_for_server_to_continue_); + return rpcs_waiting_for_server_to_continue_ >= desired_rpcs; + }); + gpr_log(GPR_DEBUG, "*** leave ClientWaitUntilNRpcsStarted ***"); + return rpcs_waiting_for_server_to_continue_; } void ServerWaitToContinue() { gpr_log(GPR_DEBUG, "*** enter ServerWaitToContinue ***"); @@ -104,20 +120,25 @@ class TestServiceSignaller { void SignalClientThatRpcStarted() { gpr_log(GPR_DEBUG, "*** SignalClientThatRpcStarted ***"); std::unique_lock lock(mu_); - rpc_started_ = true; - cv_rpc_started_.notify_one(); + ++rpcs_waiting_for_server_to_continue_; + cv_rpc_started_.notify_all(); } void SignalServerToContinue() { gpr_log(GPR_DEBUG, "*** SignalServerToContinue ***"); std::unique_lock lock(mu_); server_should_continue_ = true; - cv_server_continue_.notify_one(); + cv_server_continue_.notify_all(); + } + void Reset() { + std::unique_lock lock(mu_); + rpcs_waiting_for_server_to_continue_ = 0; + server_should_continue_ = false; } private: std::mutex mu_; std::condition_variable cv_rpc_started_; - bool rpc_started_ /* GUARDED_BY(mu_) */ = false; + int rpcs_waiting_for_server_to_continue_ /* GUARDED_BY(mu_) */ = 0; std::condition_variable cv_server_continue_; bool server_should_continue_ /* GUARDED_BY(mu_) */ = false; }; @@ -451,8 +472,12 @@ class TestMultipleServiceImpl : public RpcService { std::unique_lock lock(mu_); return signal_client_; } - void ClientWaitUntilRpcStarted() { signaller_.ClientWaitUntilRpcStarted(); } + int ClientWaitUntilNRpcsStarted(int desired_rpcs, + absl::Duration timeout = absl::Minutes(1)) { + return signaller_.ClientWaitUntilNRpcsStarted(desired_rpcs, timeout); + } void SignalServerToContinue() { signaller_.SignalServerToContinue(); } + void ResetSignaller() { signaller_.Reset(); } uint64_t RpcsWaitingForClientCancel() { std::unique_lock lock(mu_); return rpcs_waiting_for_client_cancel_; @@ -495,8 +520,12 @@ class CallbackTestServiceImpl std::unique_lock lock(mu_); return signal_client_; } - void ClientWaitUntilRpcStarted() { signaller_.ClientWaitUntilRpcStarted(); } + int ClientWaitUntilNRpcsStarted(int desired_rpcs, + absl::Duration timeout = absl::Minutes(1)) { + return signaller_.ClientWaitUntilNRpcsStarted(desired_rpcs, timeout); + } void SignalServerToContinue() { signaller_.SignalServerToContinue(); } + void ResetSignaller() { signaller_.Reset(); } private: bool signal_client_;