diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index e2649c2826e..0d5db046df4 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -83,10 +83,30 @@ void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request, class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service { public: + TestServiceImpl() : signal_client_(false) {} + Status Echo(ServerContext* context, const EchoRequest* request, EchoResponse* response) GRPC_OVERRIDE { response->set_message(request->message()); MaybeEchoDeadline(context, request, response); + if (request->has_param() && request->param().client_cancel_after_us()) { + { + std::unique_lock lock(mu_); + signal_client_ = true; + } + while (!context->IsCancelled()) { + std::this_thread::sleep_for(std::chrono::microseconds( + request->param().client_cancel_after_us())); + } + return Status::Cancelled; + } else if (request->has_param() && + request->param().server_cancel_after_us()) { + std::this_thread::sleep_for( + std::chrono::microseconds(request->param().server_cancel_after_us())); + return Status::Cancelled; + } else { + EXPECT_FALSE(context->IsCancelled()); + } return Status::OK; } @@ -130,6 +150,15 @@ class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service { } return Status::OK; } + + bool signal_client() { + std::unique_lock lock(mu_); + return signal_client_; + } + + private: + bool signal_client_; + std::mutex mu_; }; class TestServiceImplDupPkg @@ -151,7 +180,8 @@ class End2endTest : public ::testing::Test { server_address_ << "localhost:" << port; // Setup server ServerBuilder builder; - builder.AddListeningPort(server_address_.str(), InsecureServerCredentials()); + builder.AddListeningPort(server_address_.str(), + InsecureServerCredentials()); builder.RegisterService(&service_); builder.RegisterService(&dup_pkg_service_); builder.SetThreadPool(&thread_pool_); @@ -423,6 +453,44 @@ TEST_F(End2endTest, BadCredentials) { EXPECT_EQ("Rpc sent on a lame channel.", s.details()); } +void CancelRpc(ClientContext* context, int delay_us, TestServiceImpl* service) { + std::this_thread::sleep_for(std::chrono::microseconds(delay_us)); + while (!service->signal_client()) { + } + context->TryCancel(); +} + +// Client cancels rpc after 10ms +TEST_F(End2endTest, ClientCancelsRpc) { + ResetStub(); + EchoRequest request; + EchoResponse response; + request.set_message("Hello"); + const int kCancelDelayUs = 10 * 1000; + request.mutable_param()->set_client_cancel_after_us(kCancelDelayUs); + + ClientContext context; + std::thread cancel_thread(CancelRpc, &context, kCancelDelayUs, &service_); + Status s = stub_->Echo(&context, request, &response); + cancel_thread.join(); + EXPECT_EQ(StatusCode::CANCELLED, s.code()); + EXPECT_TRUE(s.details().empty()); +} + +// Server cancels rpc after 1ms +TEST_F(End2endTest, ServerCancelsRpc) { + ResetStub(); + EchoRequest request; + EchoResponse response; + request.set_message("Hello"); + request.mutable_param()->set_server_cancel_after_us(1000); + + ClientContext context; + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(StatusCode::CANCELLED, s.code()); + EXPECT_TRUE(s.details().empty()); +} + } // namespace testing } // namespace grpc diff --git a/test/cpp/util/messages.proto b/test/cpp/util/messages.proto index 9c27f6869ec..a79bce1f306 100644 --- a/test/cpp/util/messages.proto +++ b/test/cpp/util/messages.proto @@ -34,6 +34,8 @@ package grpc.cpp.test.util; message RequestParams { optional bool echo_deadline = 1; + optional int32 client_cancel_after_us = 2; + optional int32 server_cancel_after_us = 3; } message EchoRequest {