diff --git a/src/proto/grpc/testing/echo_messages.proto b/src/proto/grpc/testing/echo_messages.proto index 2c773ea0aaa..ef99cfc657e 100644 --- a/src/proto/grpc/testing/echo_messages.proto +++ b/src/proto/grpc/testing/echo_messages.proto @@ -47,9 +47,10 @@ message RequestParams { bool server_die = 12; // Server should not see a request with this set. string binary_error_details = 13; ErrorStatus expected_error = 14; - int32 server_sleep_us = 15; // Amount to sleep when invoking server + int32 server_sleep_us = 15; // sleep when invoking server for deadline tests int32 backend_channel_idx = 16; // which backend to send request to bool echo_metadata_initially = 17; + bool server_notify_started = 18; } message EchoRequest { diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 114d6ccfd21..4f8d6c7d937 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -1131,8 +1131,6 @@ TEST_P(End2endTest, CancelRpcBeforeStart) { } } -// TODO(https://github.com/grpc/grpc/issues/21263): stop using timed sleeps to -// synchronize cancellation semantics. TEST_P(End2endTest, CancelDelayedRpc) { MAYBE_SKIP_TEST; ResetStub(); @@ -1140,15 +1138,27 @@ TEST_P(End2endTest, CancelDelayedRpc) { EchoResponse response; ClientContext context; request.set_message("hello"); - request.mutable_param()->set_server_sleep_us(100 * 1000); + request.mutable_param()->set_server_notify_started(true); request.mutable_param()->set_skip_cancelled_check(true); Status s; std::thread echo_thread([this, &s, &context, &request, &response] { s = stub_->Echo(&context, request, &response); EXPECT_EQ(StatusCode::CANCELLED, s.error_code()); }); - std::this_thread::sleep_for(std::chrono::microseconds(10 * 1000)); + if (!GetParam().callback_server) { + service_.ClientWaitRpcStarted(); + } else { + callback_service_.ClientWaitRpcStarted(); + } + context.TryCancel(); + + if (!GetParam().callback_server) { + service_.SignalServerToContinue(); + } else { + callback_service_.SignalServerToContinue(); + } + echo_thread.join(); EXPECT_EQ("", response.message()); EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index c50f8d8b5f3..5ac6ad28e4e 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -127,6 +127,11 @@ void ServerTryCancelNonblocking(experimental::CallbackServerContext* context) { Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request, EchoResponse* response) { + if (request->has_param() && request->param().server_notify_started()) { + signaller_.SignalClientRpcStarted(); + signaller_.ServerWaitToContinue(); + } + // A bit of sleep to make sure that short deadline tests fail if (request->has_param() && request->param().server_sleep_us() > 0) { gpr_sleep_until( @@ -416,19 +421,37 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo( : service_(service), ctx_(ctx), req_(request), resp_(response) { // It should be safe to call IsCancelled here, even though we don't know // the result. Call it asynchronously to see if we trigger any data races. + // Join it in OnDone (technically that could be blocking but shouldn't be + // for very long). async_cancel_check_ = std::thread([this] { (void)ctx_->IsCancelled(); }); - if (request->has_param() && request->param().server_sleep_us() > 0) { + started_ = true; + + if (request->has_param() && request->param().server_notify_started()) { + service->signaller_.SignalClientRpcStarted(); + // Block on the "wait to continue" decision in a different thread since + // we can't tie up an EM thread with blocking events. We can join it in + // OnDone since it would definitely be done by then. + rpc_wait_thread_ = std::thread([this] { + service_->signaller_.ServerWaitToContinue(); + StartRpc(); + }); + } else { + StartRpc(); + } + } + + void StartRpc() { + if (req_->has_param() && req_->param().server_sleep_us() > 0) { // Set an alarm for that much time alarm_.experimental().Set( gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_micros( - request->param().server_sleep_us(), GPR_TIMESPAN)), + gpr_time_from_micros(req_->param().server_sleep_us(), + GPR_TIMESPAN)), [this](bool ok) { NonDelayed(ok); }); } else { NonDelayed(true); } - started_ = true; } void OnSendInitialMetadataDone(bool ok) override { EXPECT_TRUE(ok); @@ -448,6 +471,9 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo( } EXPECT_EQ(ctx_->IsCancelled(), on_cancel_invoked_); async_cancel_check_.join(); + if (rpc_wait_thread_.joinable()) { + rpc_wait_thread_.join(); + } delete this; } @@ -575,6 +601,7 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo( bool started_{false}; bool on_cancel_invoked_{false}; std::thread async_cancel_check_; + std::thread rpc_wait_thread_; }; return new Reactor(this, context, request, response); diff --git a/test/cpp/end2end/test_service_impl.h b/test/cpp/end2end/test_service_impl.h index 5673f0fa1c6..4b385ecbb0d 100644 --- a/test/cpp/end2end/test_service_impl.h +++ b/test/cpp/end2end/test_service_impl.h @@ -18,6 +18,7 @@ #ifndef GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H #define GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H +#include #include #include @@ -46,6 +47,35 @@ typedef enum { CANCEL_AFTER_PROCESSING } ServerTryCancelRequestPhase; +class TestServiceSignaller { + public: + void ClientWaitRpcStarted() { + std::unique_lock lock(mu_); + cv_rpc_started_.wait(lock, [this] { return rpc_started_; }); + } + void ServerWaitToContinue() { + std::unique_lock lock(mu_); + cv_server_continue_.wait(lock, [this] { return server_should_continue_; }); + } + void SignalClientRpcStarted() { + std::unique_lock lock(mu_); + rpc_started_ = true; + cv_rpc_started_.notify_one(); + } + void SignalServerToContinue() { + std::unique_lock lock(mu_); + server_should_continue_ = true; + cv_server_continue_.notify_one(); + } + + private: + std::mutex mu_; + std::condition_variable cv_rpc_started_; + bool rpc_started_ /* GUARDED_BY(mu_) */ = false; + std::condition_variable cv_server_continue_; + bool server_should_continue_ /* GUARDED_BY(mu_) */ = false; +}; + class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { public: TestServiceImpl() : signal_client_(false), host_() {} @@ -76,10 +106,13 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { std::unique_lock lock(mu_); return signal_client_; } + void ClientWaitRpcStarted() { signaller_.ClientWaitRpcStarted(); } + void SignalServerToContinue() { signaller_.SignalServerToContinue(); } private: bool signal_client_; std::mutex mu_; + TestServiceSignaller signaller_; std::unique_ptr host_; }; @@ -114,10 +147,13 @@ class CallbackTestServiceImpl std::unique_lock lock(mu_); return signal_client_; } + void ClientWaitRpcStarted() { signaller_.ClientWaitRpcStarted(); } + void SignalServerToContinue() { signaller_.SignalServerToContinue(); } private: bool signal_client_; std::mutex mu_; + TestServiceSignaller signaller_; std::unique_ptr host_; };