Make TestServiceSignaller more generically useful

There are tests in another change I'm working on for which we need to do things like:

1) waiting for N RPCs to concurrently pile up on a server

2) doing 1) multiple times sequentially on the same server

PiperOrigin-RevId: 612934305
pull/35913/head
Alexander Polcyn 11 months ago committed by Copybara-Service
parent db9ca10b35
commit 6f6f0df2ba
  1. 4
      test/cpp/end2end/end2end_test.cc
  2. 49
      test/cpp/end2end/test_service_impl.h

@ -1206,9 +1206,9 @@ TEST_P(End2endTest, CancelRpcAfterStart) {
s = stub_->Echo(&context, request, &response);
});
if (!GetParam().callback_server()) {
service_.ClientWaitUntilRpcStarted();
EXPECT_EQ(service_.ClientWaitUntilNRpcsStarted(1), 1);
} else {
callback_service_.ClientWaitUntilRpcStarted();
EXPECT_EQ(callback_service_.ClientWaitUntilNRpcsStarted(1), 1);
}
context.TryCancel();

@ -89,11 +89,27 @@ void ServerTryCancel(ServerContext* context);
class TestServiceSignaller {
public:
void ClientWaitUntilRpcStarted() {
gpr_log(GPR_DEBUG, "*** enter ClientWaitUntilRpcStarted ***");
// Waits for at least *desired_rpcs* to to be waiting for a server
// continue notification.
// Returns when *desired_rpcs* reaches that amount, or when we've
// surpassed the timeout, whichever happens first. The return value
// is whatever the number of RPCs waiting for server notification is
// at that time.
int ClientWaitUntilNRpcsStarted(int desired_rpcs, absl::Duration timeout) {
gpr_log(GPR_DEBUG, "*** enter ClientWaitUntilNRpcsStarted ***");
absl::Time deadline = absl::Now() + timeout;
std::chrono::system_clock::time_point chrono_deadline =
absl::ToChronoTime(deadline);
std::unique_lock<std::mutex> lock(mu_);
cv_rpc_started_.wait(lock, [this] { return rpc_started_; });
gpr_log(GPR_DEBUG, "*** leave ClientWaitUntilRpcStarted ***");
cv_rpc_started_.wait_until(lock, chrono_deadline, [this, desired_rpcs] {
gpr_log(
GPR_DEBUG,
"*** desired_rpcs: %d rpcs_waiting_for_server_to_continue_: %d ***",
desired_rpcs, rpcs_waiting_for_server_to_continue_);
return rpcs_waiting_for_server_to_continue_ >= desired_rpcs;
});
gpr_log(GPR_DEBUG, "*** leave ClientWaitUntilNRpcsStarted ***");
return rpcs_waiting_for_server_to_continue_;
}
void ServerWaitToContinue() {
gpr_log(GPR_DEBUG, "*** enter ServerWaitToContinue ***");
@ -104,20 +120,25 @@ class TestServiceSignaller {
void SignalClientThatRpcStarted() {
gpr_log(GPR_DEBUG, "*** SignalClientThatRpcStarted ***");
std::unique_lock<std::mutex> lock(mu_);
rpc_started_ = true;
cv_rpc_started_.notify_one();
++rpcs_waiting_for_server_to_continue_;
cv_rpc_started_.notify_all();
}
void SignalServerToContinue() {
gpr_log(GPR_DEBUG, "*** SignalServerToContinue ***");
std::unique_lock<std::mutex> lock(mu_);
server_should_continue_ = true;
cv_server_continue_.notify_one();
cv_server_continue_.notify_all();
}
void Reset() {
std::unique_lock<std::mutex> lock(mu_);
rpcs_waiting_for_server_to_continue_ = 0;
server_should_continue_ = false;
}
private:
std::mutex mu_;
std::condition_variable cv_rpc_started_;
bool rpc_started_ /* GUARDED_BY(mu_) */ = false;
int rpcs_waiting_for_server_to_continue_ /* GUARDED_BY(mu_) */ = 0;
std::condition_variable cv_server_continue_;
bool server_should_continue_ /* GUARDED_BY(mu_) */ = false;
};
@ -451,8 +472,12 @@ class TestMultipleServiceImpl : public RpcService {
std::unique_lock<std::mutex> lock(mu_);
return signal_client_;
}
void ClientWaitUntilRpcStarted() { signaller_.ClientWaitUntilRpcStarted(); }
int ClientWaitUntilNRpcsStarted(int desired_rpcs,
absl::Duration timeout = absl::Minutes(1)) {
return signaller_.ClientWaitUntilNRpcsStarted(desired_rpcs, timeout);
}
void SignalServerToContinue() { signaller_.SignalServerToContinue(); }
void ResetSignaller() { signaller_.Reset(); }
uint64_t RpcsWaitingForClientCancel() {
std::unique_lock<std::mutex> lock(mu_);
return rpcs_waiting_for_client_cancel_;
@ -495,8 +520,12 @@ class CallbackTestServiceImpl
std::unique_lock<std::mutex> lock(mu_);
return signal_client_;
}
void ClientWaitUntilRpcStarted() { signaller_.ClientWaitUntilRpcStarted(); }
int ClientWaitUntilNRpcsStarted(int desired_rpcs,
absl::Duration timeout = absl::Minutes(1)) {
return signaller_.ClientWaitUntilNRpcsStarted(desired_rpcs, timeout);
}
void SignalServerToContinue() { signaller_.SignalServerToContinue(); }
void ResetSignaller() { signaller_.Reset(); }
private:
bool signal_client_;

Loading…
Cancel
Save