From 7fa9d6f4c83766f9efc7a187527ee3ac242ea842 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Thu, 28 Jan 2016 17:32:48 -0800 Subject: [PATCH 1/8] TryCancel() on ServerContext() --- include/grpc++/impl/codegen/server_context.h | 3 +++ src/cpp/server/server_context.cc | 16 ++++++++++++---- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h index 2af9fdaa34d..868b02882d7 100644 --- a/include/grpc++/impl/codegen/server_context.h +++ b/include/grpc++/impl/codegen/server_context.h @@ -105,6 +105,9 @@ class ServerContext { bool IsCancelled() const; + // Best-effort API to cancel the call from the server. + void TryCancel() const; + const std::multimap& client_metadata() { return client_metadata_; } diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 3732c1f090c..4a5ac5af926 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -33,14 +33,14 @@ #include -#include -#include -#include -#include #include #include #include #include +#include +#include +#include +#include #include "src/core/channel/compress_filter.h" #include "src/cpp/common/create_auth_context.h" @@ -173,6 +173,14 @@ void ServerContext::AddTrailingMetadata(const grpc::string& key, trailing_metadata_.insert(std::make_pair(key, value)); } +void ServerContext::TryCancel() const { + grpc_call_error err = grpc_call_cancel_with_status( + call_, GRPC_STATUS_CANCELLED, "Cancelled on the server side", NULL); + if (err != GRPC_CALL_OK) { + gpr_log(GPR_INFO, "TryCancel failed with: %d", err); + } +} + bool ServerContext::IsCancelled() const { return completion_op_ && completion_op_->CheckCancelled(cq_); } From 944f4cf14ece078f33d7c84d95521d55d646cb61 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Wed, 27 Jan 2016 14:37:26 -0800 Subject: [PATCH 2/8] Sync server end2end and async_end2end tests --- test/cpp/end2end/async_end2end_test.cc | 365 +++++++++++++++++++++++- test/cpp/end2end/end2end_test.cc | 368 +++++++++++++++++++++++-- 2 files changed, 703 insertions(+), 30 deletions(-) diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 0616cc07eeb..0179894ec09 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -32,6 +32,7 @@ */ #include +#include #include #include @@ -104,7 +105,10 @@ class Verifier : public PollingCheckRegion { expectations_[tag(i)] = expect_ok; return *this; } - void Verify(CompletionQueue* cq) { + + void Verify(CompletionQueue* cq) { Verify(cq, false); } + + void Verify(CompletionQueue* cq, bool ignore_ok) { GPR_ASSERT(!expectations_.empty()); while (!expectations_.empty()) { bool ok; @@ -122,7 +126,9 @@ class Verifier : public PollingCheckRegion { } auto it = expectations_.find(got_tag); EXPECT_TRUE(it != expectations_.end()); - EXPECT_EQ(it->second, ok); + if (!ignore_ok) { + EXPECT_EQ(it->second, ok); + } expectations_.erase(it); } } @@ -217,7 +223,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam { grpc::ServerAsyncResponseWriter response_writer(&srv_ctx); send_request.set_message("Hello"); - std::unique_ptr > response_reader( + std::unique_ptr> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), @@ -270,7 +276,7 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) { grpc::ServerAsyncResponseWriter response_writer(&srv_ctx); send_request.set_message("Hello"); - std::unique_ptr > response_reader( + std::unique_ptr> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); std::chrono::system_clock::time_point time_now( @@ -315,7 +321,7 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) { ServerAsyncReader srv_stream(&srv_ctx); send_request.set_message("Hello"); - std::unique_ptr > cli_stream( + std::unique_ptr> cli_stream( stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1))); service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), @@ -368,7 +374,7 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) { ServerAsyncWriter srv_stream(&srv_ctx); send_request.set_message("Hello"); - std::unique_ptr > cli_stream( + std::unique_ptr> cli_stream( stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, @@ -418,7 +424,7 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) { ServerAsyncReaderWriter srv_stream(&srv_ctx); send_request.set_message("Hello"); - std::unique_ptr > + std::unique_ptr> cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), @@ -476,7 +482,7 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) { cli_ctx.AddMetadata(meta1.first, meta1.second); cli_ctx.AddMetadata(meta2.first, meta2.second); - std::unique_ptr > response_reader( + std::unique_ptr> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), @@ -519,7 +525,7 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) { std::pair meta1("key1", "val1"); std::pair meta2("key2", "val2"); - std::unique_ptr > response_reader( + std::unique_ptr> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), @@ -568,7 +574,7 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) { std::pair meta1("key1", "val1"); std::pair meta2("key2", "val2"); - std::unique_ptr > response_reader( + std::unique_ptr> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), @@ -629,7 +635,7 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) { cli_ctx.AddMetadata(meta1.first, meta1.second); cli_ctx.AddMetadata(meta2.first, meta2.second); - std::unique_ptr > response_reader( + std::unique_ptr> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), @@ -690,7 +696,7 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) { grpc::ServerAsyncResponseWriter response_writer(&srv_ctx); send_request.set_message("Hello"); - std::unique_ptr > response_reader( + std::unique_ptr> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); srv_ctx.AsyncNotifyWhenDone(tag(5)); @@ -725,7 +731,7 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) { grpc::ServerAsyncResponseWriter response_writer(&srv_ctx); send_request.set_message("Hello"); - std::unique_ptr > response_reader( + std::unique_ptr> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); srv_ctx.AsyncNotifyWhenDone(tag(5)); @@ -759,7 +765,7 @@ TEST_P(AsyncEnd2endTest, UnimplementedRpc) { ClientContext cli_ctx; send_request.set_message("Hello"); - std::unique_ptr > response_reader( + std::unique_ptr> response_reader( stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get())); response_reader->Finish(&recv_response, &recv_status, tag(4)); @@ -769,8 +775,339 @@ TEST_P(AsyncEnd2endTest, UnimplementedRpc) { EXPECT_EQ("", recv_status.error_message()); } +class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { + protected: + typedef enum { + DO_NOT_CANCEL = 0, + CANCEL_BEFORE_PROCESSING, + CANCEL_DURING_PROCESSING, + CANCEL_AFTER_PROCESSING + } ServerTryCancelRequestPhase; + + void ServerTryCancel(ServerContext* context) { + EXPECT_FALSE(context->IsCancelled()); + context->TryCancel(); + gpr_log(GPR_INFO, "Server called TryCancel()"); + EXPECT_TRUE(context->IsCancelled()); + } + + void TestClientStreamingServerCancel( + ServerTryCancelRequestPhase server_try_cancel) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + + CompletionQueue cli_cq; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncReader srv_stream(&srv_ctx); + + // Initiate the 'RequestStream' call on client + std::unique_ptr> cli_stream( + stub_->AsyncRequestStream(&cli_ctx, &recv_response, &cli_cq, tag(1))); + Verifier(GetParam()).Expect(1, true).Verify(&cli_cq); + + // On the server, request to be notified of 'RequestStream' calls + // and receive the 'RequestStream' call just made by the client + service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); + Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + + // Client sends 3 messages (tags 3, 4 and 5) + for (int tag_idx = 3; tag_idx <= 5; tag_idx++) { + send_request.set_message("Ping " + std::to_string(tag_idx)); + cli_stream->Write(send_request, tag(tag_idx)); + Verifier(GetParam()).Expect(tag_idx, true).Verify(&cli_cq); + } + cli_stream->WritesDone(tag(6)); + Verifier(GetParam()).Expect(6, true).Verify(&cli_cq); + + bool expected_server_cq_result = true; + bool ignore_cq_result = false; + + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { + ServerTryCancel(&srv_ctx); + + // Since cancellation is done before server reads any results, we know + // for sure that all cq results will return false from this point forward + expected_server_cq_result = false; + } + + std::thread* server_try_cancel_thd = NULL; + if (server_try_cancel == CANCEL_DURING_PROCESSING) { + server_try_cancel_thd = new std::thread( + &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx); + // Server will cancel the RPC in a parallel thread while reading the + // requests from the client. Since the cancellation can happen at anytime, + // some of the cq results (i.e those until cancellation) might be true but + // its non deterministic. So better to ignore the cq results + ignore_cq_result = true; + } + + // Server reads 3 messages (tags 6, 7 and 8) + for (int tag_idx = 6; tag_idx <= 8; tag_idx++) { + srv_stream.Read(&recv_request, tag(tag_idx)); + Verifier(GetParam()) + .Expect(tag_idx, expected_server_cq_result) + .Verify(cq_.get(), ignore_cq_result); + } + + if (server_try_cancel_thd != NULL) { + server_try_cancel_thd->join(); + delete server_try_cancel_thd; + } + + if (server_try_cancel == CANCEL_AFTER_PROCESSING) { + ServerTryCancel(&srv_ctx); + } + + // Note: The RPC has been cancelled at this point for sure. So, from this + // point forward, we know that cq results are supposed to return false on + // server. + + send_response.set_message("Pong"); + srv_stream.Finish(send_response, Status::CANCELLED, tag(9)); + Verifier(GetParam()).Expect(9, false).Verify(cq_.get()); + + cli_stream->Finish(&recv_status, tag(10)); + // TODO: sreek: The expectation here should be true. This seems like a bug. + // Investigating + Verifier(GetParam()).Expect(10, false).Verify(&cli_cq); + EXPECT_FALSE(recv_status.ok()); + EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code()); + } + + void TestServerStreamingServerCancel( + ServerTryCancelRequestPhase server_try_cancel) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + ServerContext srv_ctx; + CompletionQueue cli_cq; + ServerAsyncWriter srv_stream(&srv_ctx); + + send_request.set_message("Ping"); + // Initiate the 'ResponseStream' call on the client + std::unique_ptr> cli_stream( + stub_->AsyncResponseStream(&cli_ctx, send_request, &cli_cq, tag(1))); + Verifier(GetParam()).Expect(1, true).Verify(&cli_cq); + + // On the server, request to be notified of 'ResponseStream' calls and + // receive the call just made by the client + service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, + cq_.get(), cq_.get(), tag(2)); + Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + EXPECT_EQ(send_request.message(), recv_request.message()); + + bool expected_cq_result = true; + bool ignore_cq_result = false; + + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { + ServerTryCancel(&srv_ctx); + + // We know for sure that all cq results will be false from this point + // since the server cancelled the RPC + expected_cq_result = false; + } + + std::thread* server_try_cancel_thd = NULL; + if (server_try_cancel == CANCEL_DURING_PROCESSING) { + server_try_cancel_thd = new std::thread( + &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx); + + // Server will cancel the RPC in a parallel thread while writing responses + // to the client. Since the cancellation can happen at anytime, some of + // the cq results (i.e those until cancellation) might be true but + // its non deterministic. So better to ignore the cq results + ignore_cq_result = true; + } + + // Server sends three messages (tags 3, 4 and 5) + for (int tag_idx = 3; tag_idx <= 5; tag_idx++) { + send_response.set_message("Pong " + std::to_string(tag_idx)); + srv_stream.Write(send_response, tag(tag_idx)); + Verifier(GetParam()) + .Expect(tag_idx, expected_cq_result) + .Verify(cq_.get(), ignore_cq_result); + } + + if (server_try_cancel == CANCEL_AFTER_PROCESSING) { + ServerTryCancel(&srv_ctx); + } + + // Client attemts to read the three messages + for (int tag_idx = 6; tag_idx <= 8; tag_idx++) { + cli_stream->Read(&recv_response, tag(tag_idx)); + Verifier(GetParam()) + .Expect(tag_idx, expected_cq_result) + .Verify(&cli_cq, ignore_cq_result); + } + + if (server_try_cancel_thd != NULL) { + server_try_cancel_thd->join(); + delete server_try_cancel_thd; + } + + // Note: At this point, we know that server has cancelled the request for + // sure. + + // Server finishes the stream + srv_stream.Finish(Status::CANCELLED, tag(9)); + Verifier(GetParam()).Expect(9, false).Verify(cq_.get()); + + // Client receives the cancellation + cli_stream->Finish(&recv_status, tag(10)); + Verifier(GetParam()).Expect(10, true).Verify(&cli_cq); + EXPECT_FALSE(recv_status.ok()); + EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code()); + } + + void TestBidiStreamingServerCancel( + ServerTryCancelRequestPhase server_try_cancel) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + CompletionQueue cli_cq; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncReaderWriter srv_stream(&srv_ctx); + + // Initiate the call from the client side + std::unique_ptr> + cli_stream(stub_->AsyncBidiStream(&cli_ctx, &cli_cq, tag(1))); + Verifier(GetParam()).Expect(1, true).Verify(&cli_cq); + + // On the server, request to be notified of the 'BidiStream' call and + // receive the call just made by the client + service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); + Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + + send_request.set_message("Ping"); + cli_stream->Write(send_request, tag(3)); + Verifier(GetParam()).Expect(3, true).Verify(&cli_cq); + + bool expected_cq_result = true; + bool ignore_cq_result = false; + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { + ServerTryCancel(&srv_ctx); + + // We know for sure that all cq results will be false from this point + // since the server cancelled the RPC + expected_cq_result = false; + } + + std::thread* server_try_cancel_thd = NULL; + if (server_try_cancel == CANCEL_DURING_PROCESSING) { + server_try_cancel_thd = new std::thread( + &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx); + + // Since server is going to cancel the RPC in a parallel thread, some of + // the cq results (i.e those until the cancellation) might be true. Since + // that number is non-deterministic, it is better to ignore the cq results + ignore_cq_result = true; + } + + srv_stream.Read(&recv_request, tag(4)); + Verifier(GetParam()) + .Expect(4, expected_cq_result) + .Verify(cq_.get(), ignore_cq_result); + + send_response.set_message("Pong"); + srv_stream.Write(send_response, tag(5)); + Verifier(GetParam()) + .Expect(5, expected_cq_result) + .Verify(cq_.get(), ignore_cq_result); + + cli_stream->Read(&recv_response, tag(6)); + Verifier(GetParam()) + .Expect(6, expected_cq_result) + .Verify(&cli_cq, ignore_cq_result); + + // This is expected to succeed in all cases + cli_stream->WritesDone(tag(7)); + Verifier(GetParam()).Expect(7, true).Verify(&cli_cq); + + // This is expected to fail in all cases (Either there are no more msgs from + // the client or the RPC is cancelled on the server) + srv_stream.Read(&recv_request, tag(8)); + Verifier(GetParam()).Expect(8, false).Verify(cq_.get()); + + if (server_try_cancel_thd != NULL) { + server_try_cancel_thd->join(); + delete server_try_cancel_thd; + } + + if (server_try_cancel == CANCEL_AFTER_PROCESSING) { + ServerTryCancel(&srv_ctx); + } + + // At this point, we know that the server cancelled the request for sure + + srv_stream.Finish(Status::CANCELLED, tag(9)); + Verifier(GetParam()).Expect(9, false).Verify(cq_.get()); + + cli_stream->Finish(&recv_status, tag(10)); + Verifier(GetParam()).Expect(10, true).Verify(&cli_cq); + EXPECT_FALSE(recv_status.ok()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code()); + } +}; + +TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelBefore) { + TestClientStreamingServerCancel(CANCEL_BEFORE_PROCESSING); +} + +TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelDuring) { + TestClientStreamingServerCancel(CANCEL_DURING_PROCESSING); +} + +TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelAfter) { + TestClientStreamingServerCancel(CANCEL_AFTER_PROCESSING); +} + +TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelBefore) { + TestServerStreamingServerCancel(CANCEL_BEFORE_PROCESSING); +} + +TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelDuring) { + TestServerStreamingServerCancel(CANCEL_DURING_PROCESSING); +} + +TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelAfter) { + TestServerStreamingServerCancel(CANCEL_AFTER_PROCESSING); +} + +TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelBefore) { + TestBidiStreamingServerCancel(CANCEL_BEFORE_PROCESSING); +} + +TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelDuring) { + TestBidiStreamingServerCancel(CANCEL_DURING_PROCESSING); +} + +TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelAfter) { + TestBidiStreamingServerCancel(CANCEL_AFTER_PROCESSING); +} + INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest, ::testing::Values(false, true)); +INSTANTIATE_TEST_CASE_P(AsyncEnd2endServerTryCancel, + AsyncEnd2endServerTryCancelTest, + ::testing::Values(false)); } // namespace } // namespace testing diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 5a414ebc86c..48838901d27 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -65,6 +65,14 @@ namespace testing { namespace { const char* kServerCancelAfterReads = "cancel_after_reads"; +const char* kServerTryCancelRequest = "server_try_cancel"; +typedef enum { + DO_NOT_CANCEL = 0, + CANCEL_BEFORE_PROCESSING, + CANCEL_DURING_PROCESSING, + CANCEL_AFTER_PROCESSING +} ServerTryCancelRequestPhase; +const int kNumResponseStreamsMsgs = 3; // When echo_deadline is requested, deadline seen in the ServerContext is set in // the response in seconds. @@ -218,8 +226,37 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { explicit TestServiceImpl(const grpc::string& host) : signal_client_(false), host_(new grpc::string(host)) {} + int GetIntValueFromMetadata( + const char* key, + const std::multimap& metadata, + int default_value) { + if (metadata.find(key) != metadata.end()) { + std::istringstream iss(ToString(metadata.find(key)->second)); + iss >> default_value; + gpr_log(GPR_INFO, "%s : %d", key, default_value); + } + + return default_value; + } + + void ServerTryCancel(ServerContext* context) { + EXPECT_FALSE(context->IsCancelled()); + context->TryCancel(); + gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request"); + EXPECT_TRUE(context->IsCancelled()); + } + Status Echo(ServerContext* context, const EchoRequest* request, EchoResponse* response) GRPC_OVERRIDE { + int server_try_cancel = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + if (server_try_cancel > DO_NOT_CANCEL) { + // For unary RPC, the actual value of server_try_cancel does not matter + // (as long as it is greater than DO_NOT_CANCEL) + ServerTryCancel(context); + return Status::CANCELLED; + } + response->set_message(request->message()); MaybeEchoDeadline(context, request, response); if (host_) { @@ -283,17 +320,25 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { EchoResponse* response) GRPC_OVERRIDE { EchoRequest request; response->set_message(""); - int cancel_after_reads = 0; - const std::multimap& - client_initial_metadata = context->client_metadata(); - if (client_initial_metadata.find(kServerCancelAfterReads) != - client_initial_metadata.end()) { - std::istringstream iss(ToString( - client_initial_metadata.find(kServerCancelAfterReads)->second)); - iss >> cancel_after_reads; - gpr_log(GPR_INFO, "cancel_after_reads %d", cancel_after_reads); + int cancel_after_reads = GetIntValueFromMetadata( + kServerCancelAfterReads, context->client_metadata(), 0); + int server_try_cancel = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } + + std::thread* server_try_cancel_thd = NULL; + if (server_try_cancel == CANCEL_DURING_PROCESSING) { + server_try_cancel_thd = + new std::thread(&TestServiceImpl::ServerTryCancel, this, context); } + + int num_msgs_read = 0; while (reader->Read(&request)) { + num_msgs_read++; if (cancel_after_reads == 1) { gpr_log(GPR_INFO, "return cancel status"); return Status::CANCELLED; @@ -302,20 +347,56 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { } response->mutable_message()->append(request.message()); } + gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read); + + if (server_try_cancel_thd != NULL) { + server_try_cancel_thd->join(); + delete server_try_cancel_thd; + return Status::CANCELLED; + } + + if (server_try_cancel == CANCEL_AFTER_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } + return Status::OK; } - // Return 3 messages. + // Return 'kNumResponseStreamMsgs' messages. // TODO(yangg) make it generic by adding a parameter into EchoRequest Status ResponseStream(ServerContext* context, const EchoRequest* request, ServerWriter* writer) GRPC_OVERRIDE { + int server_try_cancel = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } + EchoResponse response; - response.set_message(request->message() + "0"); - writer->Write(response); - response.set_message(request->message() + "1"); - writer->Write(response); - response.set_message(request->message() + "2"); - writer->Write(response); + std::thread* server_try_cancel_thd = NULL; + if (server_try_cancel == CANCEL_DURING_PROCESSING) { + server_try_cancel_thd = + new std::thread(&TestServiceImpl::ServerTryCancel, this, context); + } + + for (int i = 0; i < kNumResponseStreamsMsgs; i++) { + response.set_message(request->message() + std::to_string(i)); + writer->Write(response); + } + + if (server_try_cancel_thd != NULL) { + server_try_cancel_thd->join(); + delete server_try_cancel_thd; + return Status::CANCELLED; + } + + if (server_try_cancel == CANCEL_AFTER_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } return Status::OK; } @@ -325,11 +406,38 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { GRPC_OVERRIDE { EchoRequest request; EchoResponse response; + + int server_try_cancel = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } + + std::thread* server_try_cancel_thd = NULL; + if (server_try_cancel == CANCEL_DURING_PROCESSING) { + server_try_cancel_thd = + new std::thread(&TestServiceImpl::ServerTryCancel, this, context); + } + while (stream->Read(&request)) { gpr_log(GPR_INFO, "recv msg %s", request.message().c_str()); response.set_message(request.message()); stream->Write(response); } + + if (server_try_cancel_thd != NULL) { + server_try_cancel_thd->join(); + delete server_try_cancel_thd; + return Status::CANCELLED; + } + + if (server_try_cancel == CANCEL_AFTER_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } + return Status::OK; } @@ -466,6 +574,231 @@ static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) { } } +// == Tests for cancelling RPC from server side == + +class End2endServerTryCancelTest : public End2endTest { + protected: + // Tests for Client streaming + void TestRequestStreamServerCancel( + ServerTryCancelRequestPhase server_try_cancel, int num_msgs_to_send) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + context.AddMetadata(kServerTryCancelRequest, + std::to_string(server_try_cancel)); + + auto stream = stub_->RequestStream(&context, &response); + int num_msgs_sent = 0; + while (num_msgs_sent < num_msgs_to_send) { + request.set_message("hello"); + if (!stream->Write(request)) { + break; + } + num_msgs_sent++; + } + gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent); + stream->WritesDone(); + Status s = stream->Finish(); + + switch (server_try_cancel) { + case CANCEL_BEFORE_PROCESSING: + case CANCEL_DURING_PROCESSING: + EXPECT_LE(num_msgs_sent, num_msgs_to_send); + break; + case CANCEL_AFTER_PROCESSING: + EXPECT_EQ(num_msgs_sent, num_msgs_to_send); + break; + default: + gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d", + server_try_cancel); + EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL && + server_try_cancel <= CANCEL_AFTER_PROCESSING); + break; + } + + EXPECT_FALSE(s.ok()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + } + + // Test for server streaming + void TestResponseStreamServerCancel( + ServerTryCancelRequestPhase server_try_cancel) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + context.AddMetadata(kServerTryCancelRequest, + std::to_string(server_try_cancel)); + request.set_message("hello"); + auto stream = stub_->ResponseStream(&context, request); + + int num_msgs_read = 0; + while (num_msgs_read < kNumResponseStreamsMsgs) { + if (!stream->Read(&response)) { + break; + } + EXPECT_EQ(response.message(), + request.message() + std::to_string(num_msgs_read)); + num_msgs_read++; + } + gpr_log(GPR_INFO, "Read %d messages", num_msgs_read); + + Status s = stream->Finish(); + + switch (server_try_cancel) { + case CANCEL_BEFORE_PROCESSING: { + EXPECT_EQ(num_msgs_read, 0); + break; + } + case CANCEL_DURING_PROCESSING: { + EXPECT_LE(num_msgs_read, kNumResponseStreamsMsgs); + break; + } + case CANCEL_AFTER_PROCESSING: { + EXPECT_EQ(num_msgs_read, kNumResponseStreamsMsgs); + } + default: { + gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d", + server_try_cancel); + EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL && + server_try_cancel <= CANCEL_AFTER_PROCESSING); + break; + } + } + + EXPECT_FALSE(s.ok()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + } + + void TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel, + int num_messages) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + context.AddMetadata(kServerTryCancelRequest, + std::to_string(server_try_cancel)); + + auto stream = stub_->BidiStream(&context); + + int num_msgs_read = 0; + int num_msgs_sent = 0; + while (num_msgs_sent < num_messages) { + request.set_message("hello " + std::to_string(num_msgs_sent)); + if (!stream->Write(request)) { + break; + } + num_msgs_sent++; + + if (!stream->Read(&response)) { + break; + } + num_msgs_read++; + + EXPECT_EQ(response.message(), request.message()); + } + gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent); + gpr_log(GPR_INFO, "Read %d messages", num_msgs_read); + + stream->WritesDone(); + Status s = stream->Finish(); + + switch (server_try_cancel) { + case CANCEL_BEFORE_PROCESSING: { + EXPECT_EQ(num_msgs_read, 0); + break; + } + case CANCEL_DURING_PROCESSING: { + EXPECT_LE(num_msgs_sent, num_messages); + EXPECT_LE(num_msgs_read, num_msgs_sent); + break; + } + case CANCEL_AFTER_PROCESSING: { + EXPECT_EQ(num_msgs_sent, num_messages); + EXPECT_EQ(num_msgs_read, num_msgs_sent); + } + default: { + gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d", + server_try_cancel); + EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL && + server_try_cancel <= CANCEL_AFTER_PROCESSING); + break; + } + } + + EXPECT_FALSE(s.ok()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + } +}; + +TEST_P(End2endServerTryCancelTest, RequestEchoServerCancel) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + context.AddMetadata(kServerTryCancelRequest, + std::to_string(CANCEL_BEFORE_PROCESSING)); + Status s = stub_->Echo(&context, request, &response); + EXPECT_FALSE(s.ok()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); +} + +// Server to cancel before doing reading the request +TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelBeforeReads) { + TestRequestStreamServerCancel(CANCEL_BEFORE_PROCESSING, 1); +} + +// Server to cancel while reading a request from the stream in parallel +TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelDuringRead) { + TestRequestStreamServerCancel(CANCEL_DURING_PROCESSING, 10); +} + +// Server to cancel after reading all the requests but before returning to the +// client +TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelAfterReads) { + TestRequestStreamServerCancel(CANCEL_AFTER_PROCESSING, 4); +} + +// Server to cancel before sending any response messages +TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelBefore) { + TestResponseStreamServerCancel(CANCEL_BEFORE_PROCESSING); +} + +// Server to cancel while writing a response to the stream in parallel +TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelDuring) { + TestResponseStreamServerCancel(CANCEL_DURING_PROCESSING); +} + +// Server to cancel after writing all the respones to the stream but before +// returning to the client +TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelAfter) { + TestResponseStreamServerCancel(CANCEL_AFTER_PROCESSING); +} + +// Server to cancel before reading/writing any requests/responses on the stream +TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelBefore) { + TestBidiStreamServerCancel(CANCEL_BEFORE_PROCESSING, 2); +} + +// Server to cancel while reading/writing requests/responses on the stream in +// parallel +TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelDuring) { + TestBidiStreamServerCancel(CANCEL_DURING_PROCESSING, 10); +} + +// Server to cancel after reading/writing all requests/responses on the stream +// but before returning to the client +TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelAfter) { + TestBidiStreamServerCancel(CANCEL_AFTER_PROCESSING, 5); +} + +// ===== + TEST_P(End2endTest, RequestStreamOneRequest) { ResetStub(); EchoRequest request; @@ -1195,6 +1528,9 @@ INSTANTIATE_TEST_CASE_P(End2end, End2endTest, ::testing::Values(TestScenario(false, false), TestScenario(false, true))); +INSTANTIATE_TEST_CASE_P(End2endServerTryCancel, End2endServerTryCancelTest, + ::testing::Values(TestScenario(false, false))); + INSTANTIATE_TEST_CASE_P(ProxyEnd2end, ProxyEnd2endTest, ::testing::Values(TestScenario(false, false), TestScenario(false, true), From f25c6ba229d5ed18d16606e4b910232e774ed8a7 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Thu, 28 Jan 2016 18:04:03 -0800 Subject: [PATCH 3/8] Comments and format --- include/grpc++/impl/codegen/server_context.h | 10 +++++++++- test/cpp/end2end/end2end_test.cc | 2 -- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h index 868b02882d7..d3086aef0a0 100644 --- a/include/grpc++/impl/codegen/server_context.h +++ b/include/grpc++/impl/codegen/server_context.h @@ -105,7 +105,15 @@ class ServerContext { bool IsCancelled() const; - // Best-effort API to cancel the call from the server. + // Cancel the Call from the server. This is a best-effort API and depending on + // when this is called, the Call may still appear successful to the client. + // For example, if called on a separate thread, it might race with the + // server handler which might return success to the client before TryCancel() + // was called. + // + // It is the caller's responsibility to prevent such races and ensure that the + // serverhandler returns Status::CANCELLED if TryCancel() is called (unless + // the serverhandler is already returning an error code) void TryCancel() const; const std::multimap& client_metadata() { diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 48838901d27..5fae056b63b 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -797,8 +797,6 @@ TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelAfter) { TestBidiStreamServerCancel(CANCEL_AFTER_PROCESSING, 5); } -// ===== - TEST_P(End2endTest, RequestStreamOneRequest) { ResetStub(); EchoRequest request; From 4fb590852fc3ed0203a11dbfde3781b8489742ca Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Fri, 29 Jan 2016 11:16:24 -0800 Subject: [PATCH 4/8] Use the same completion queue for both client and server --- test/cpp/end2end/async_end2end_test.cc | 33 ++++++++++++-------------- 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 0179894ec09..8800a873450 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -801,15 +801,14 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { EchoResponse recv_response; Status recv_status; - CompletionQueue cli_cq; ClientContext cli_ctx; ServerContext srv_ctx; ServerAsyncReader srv_stream(&srv_ctx); // Initiate the 'RequestStream' call on client std::unique_ptr> cli_stream( - stub_->AsyncRequestStream(&cli_ctx, &recv_response, &cli_cq, tag(1))); - Verifier(GetParam()).Expect(1, true).Verify(&cli_cq); + stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1))); + Verifier(GetParam()).Expect(1, true).Verify(cq_.get()); // On the server, request to be notified of 'RequestStream' calls // and receive the 'RequestStream' call just made by the client @@ -821,10 +820,10 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { for (int tag_idx = 3; tag_idx <= 5; tag_idx++) { send_request.set_message("Ping " + std::to_string(tag_idx)); cli_stream->Write(send_request, tag(tag_idx)); - Verifier(GetParam()).Expect(tag_idx, true).Verify(&cli_cq); + Verifier(GetParam()).Expect(tag_idx, true).Verify(cq_.get()); } cli_stream->WritesDone(tag(6)); - Verifier(GetParam()).Expect(6, true).Verify(&cli_cq); + Verifier(GetParam()).Expect(6, true).Verify(cq_.get()); bool expected_server_cq_result = true; bool ignore_cq_result = false; @@ -876,7 +875,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { cli_stream->Finish(&recv_status, tag(10)); // TODO: sreek: The expectation here should be true. This seems like a bug. // Investigating - Verifier(GetParam()).Expect(10, false).Verify(&cli_cq); + Verifier(GetParam()).Expect(10, false).Verify(cq_.get()); EXPECT_FALSE(recv_status.ok()); EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code()); } @@ -892,14 +891,13 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { Status recv_status; ClientContext cli_ctx; ServerContext srv_ctx; - CompletionQueue cli_cq; ServerAsyncWriter srv_stream(&srv_ctx); send_request.set_message("Ping"); // Initiate the 'ResponseStream' call on the client std::unique_ptr> cli_stream( - stub_->AsyncResponseStream(&cli_ctx, send_request, &cli_cq, tag(1))); - Verifier(GetParam()).Expect(1, true).Verify(&cli_cq); + stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); + Verifier(GetParam()).Expect(1, true).Verify(cq_.get()); // On the server, request to be notified of 'ResponseStream' calls and // receive the call just made by the client @@ -949,7 +947,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { cli_stream->Read(&recv_response, tag(tag_idx)); Verifier(GetParam()) .Expect(tag_idx, expected_cq_result) - .Verify(&cli_cq, ignore_cq_result); + .Verify(cq_.get(), ignore_cq_result); } if (server_try_cancel_thd != NULL) { @@ -966,7 +964,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // Client receives the cancellation cli_stream->Finish(&recv_status, tag(10)); - Verifier(GetParam()).Expect(10, true).Verify(&cli_cq); + Verifier(GetParam()).Expect(10, true).Verify(cq_.get()); EXPECT_FALSE(recv_status.ok()); EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code()); } @@ -980,15 +978,14 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { EchoResponse send_response; EchoResponse recv_response; Status recv_status; - CompletionQueue cli_cq; ClientContext cli_ctx; ServerContext srv_ctx; ServerAsyncReaderWriter srv_stream(&srv_ctx); // Initiate the call from the client side std::unique_ptr> - cli_stream(stub_->AsyncBidiStream(&cli_ctx, &cli_cq, tag(1))); - Verifier(GetParam()).Expect(1, true).Verify(&cli_cq); + cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); + Verifier(GetParam()).Expect(1, true).Verify(cq_.get()); // On the server, request to be notified of the 'BidiStream' call and // receive the call just made by the client @@ -998,7 +995,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { send_request.set_message("Ping"); cli_stream->Write(send_request, tag(3)); - Verifier(GetParam()).Expect(3, true).Verify(&cli_cq); + Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); bool expected_cq_result = true; bool ignore_cq_result = false; @@ -1035,11 +1032,11 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { cli_stream->Read(&recv_response, tag(6)); Verifier(GetParam()) .Expect(6, expected_cq_result) - .Verify(&cli_cq, ignore_cq_result); + .Verify(cq_.get(), ignore_cq_result); // This is expected to succeed in all cases cli_stream->WritesDone(tag(7)); - Verifier(GetParam()).Expect(7, true).Verify(&cli_cq); + Verifier(GetParam()).Expect(7, true).Verify(cq_.get()); // This is expected to fail in all cases (Either there are no more msgs from // the client or the RPC is cancelled on the server) @@ -1061,7 +1058,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { Verifier(GetParam()).Expect(9, false).Verify(cq_.get()); cli_stream->Finish(&recv_status, tag(10)); - Verifier(GetParam()).Expect(10, true).Verify(&cli_cq); + Verifier(GetParam()).Expect(10, true).Verify(cq_.get()); EXPECT_FALSE(recv_status.ok()); EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code()); } From 0f242acb9ddb04100cd234ec986e2092cc088d7f Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Fri, 29 Jan 2016 18:12:19 -0800 Subject: [PATCH 5/8] Comments and a minor fix --- test/cpp/end2end/async_end2end_test.cc | 91 ++++++++++---- test/cpp/end2end/end2end_test.cc | 158 ++++++++++++++++++++----- 2 files changed, 201 insertions(+), 48 deletions(-) diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 8800a873450..0deb2eff95b 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -775,6 +775,8 @@ TEST_P(AsyncEnd2endTest, UnimplementedRpc) { EXPECT_EQ("", recv_status.error_message()); } +// This class is for testing scenarios where RPCs are cancelled on the server +// by calling ServerContext::TryCancel() class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { protected: typedef enum { @@ -791,6 +793,18 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { EXPECT_TRUE(context->IsCancelled()); } + // Helper for testing client-streaming RPCs which are cancelled on the server. + // Depending on the value of server_try_cancel parameter, this will test one + // of the following three scenarios: + // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading + // any messages from the client + // + // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading + // messages from the client + // + // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all + // messages from the client (but before sending any status back to the + // client) void TestClientStreamingServerCancel( ServerTryCancelRequestPhase server_try_cancel) { ResetStub(); @@ -864,22 +878,37 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { ServerTryCancel(&srv_ctx); } - // Note: The RPC has been cancelled at this point for sure. So, from this - // point forward, we know that cq results are supposed to return false on - // server. + // The RPC has been cancelled at this point for sure (i.e irrespective of + // the value of `server_try_cancel` is). So, from this point forward, we + // know that cq results are supposed to return false on server. + // Server sends the final message and cancelled status (but the RPC is + // already cancelled at this point. So we expect the operation to fail) send_response.set_message("Pong"); srv_stream.Finish(send_response, Status::CANCELLED, tag(9)); Verifier(GetParam()).Expect(9, false).Verify(cq_.get()); + // Client will see the cancellation cli_stream->Finish(&recv_status, tag(10)); - // TODO: sreek: The expectation here should be true. This seems like a bug. - // Investigating + // TODO: sreek: The expectation here should be true. This is a bug (github + // issue #4972) Verifier(GetParam()).Expect(10, false).Verify(cq_.get()); EXPECT_FALSE(recv_status.ok()); EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code()); } + // Helper for testing server-streaming RPCs which are cancelled on the server. + // Depending on the value of server_try_cancel parameter, this will test one + // of the following three scenarios: + // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before sending + // any messages to the client + // + // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while sending + // messages to the client + // + // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after sending all + // messages to the client (but before sending any status back to the + // client) void TestServerStreamingServerCancel( ServerTryCancelRequestPhase server_try_cancel) { ResetStub(); @@ -898,7 +927,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { std::unique_ptr> cli_stream( stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); Verifier(GetParam()).Expect(1, true).Verify(cq_.get()); - // On the server, request to be notified of 'ResponseStream' calls and // receive the call just made by the client service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, @@ -924,8 +952,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // Server will cancel the RPC in a parallel thread while writing responses // to the client. Since the cancellation can happen at anytime, some of - // the cq results (i.e those until cancellation) might be true but - // its non deterministic. So better to ignore the cq results + // the cq results (i.e those until cancellation) might be true but it is + // non deterministic. So better to ignore the cq results ignore_cq_result = true; } @@ -938,11 +966,16 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { .Verify(cq_.get(), ignore_cq_result); } + if (server_try_cancel_thd != NULL) { + server_try_cancel_thd->join(); + delete server_try_cancel_thd; + } + if (server_try_cancel == CANCEL_AFTER_PROCESSING) { ServerTryCancel(&srv_ctx); } - // Client attemts to read the three messages + // Client attemts to read the three messages from the server for (int tag_idx = 6; tag_idx <= 8; tag_idx++) { cli_stream->Read(&recv_response, tag(tag_idx)); Verifier(GetParam()) @@ -950,25 +983,35 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { .Verify(cq_.get(), ignore_cq_result); } - if (server_try_cancel_thd != NULL) { - server_try_cancel_thd->join(); - delete server_try_cancel_thd; - } - - // Note: At this point, we know that server has cancelled the request for - // sure. + // The RPC has been cancelled at this point for sure (i.e irrespective of + // the value of `server_try_cancel` is). So, from this point forward, we + // know that cq results are supposed to return false on server. - // Server finishes the stream + // Server finishes the stream (but the RPC is already cancelled) srv_stream.Finish(Status::CANCELLED, tag(9)); Verifier(GetParam()).Expect(9, false).Verify(cq_.get()); - // Client receives the cancellation + // Client will see the cancellation cli_stream->Finish(&recv_status, tag(10)); Verifier(GetParam()).Expect(10, true).Verify(cq_.get()); EXPECT_FALSE(recv_status.ok()); EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code()); } + // Helper for testing bidirectinal-streaming RPCs which are cancelled on the + // server. + // + // Depending on the value of server_try_cancel parameter, this will + // test one of the following three scenarios: + // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/ + // writing any messages from/to the client + // + // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading + // messages from the client + // + // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all + // messages from the client (but before sending any status back to the + // client) void TestBidiStreamingServerCancel( ServerTryCancelRequestPhase server_try_cancel) { ResetStub(); @@ -993,12 +1036,14 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { tag(2)); Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); + // Client sends the first and the only message send_request.set_message("Ping"); cli_stream->Write(send_request, tag(3)); Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); bool expected_cq_result = true; bool ignore_cq_result = false; + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { ServerTryCancel(&srv_ctx); @@ -1038,8 +1083,10 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { cli_stream->WritesDone(tag(7)); Verifier(GetParam()).Expect(7, true).Verify(cq_.get()); - // This is expected to fail in all cases (Either there are no more msgs from - // the client or the RPC is cancelled on the server) + // This is expected to fail in all cases i.e for all values of + // server_try_cancel. This is becasue at this point, either there are no + // more msgs from the client (because client called WritesDone) or the RPC + // is cancelled on the server srv_stream.Read(&recv_request, tag(8)); Verifier(GetParam()).Expect(8, false).Verify(cq_.get()); @@ -1052,7 +1099,9 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { ServerTryCancel(&srv_ctx); } - // At this point, we know that the server cancelled the request for sure + // The RPC has been cancelled at this point for sure (i.e irrespective of + // the value of `server_try_cancel` is). So, from this point forward, we + // know that cq results are supposed to return false on server. srv_stream.Finish(Status::CANCELLED, tag(9)); Verifier(GetParam()).Expect(9, false).Verify(cq_.get()); diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 5fae056b63b..107e46f4381 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -251,8 +251,10 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { int server_try_cancel = GetIntValueFromMetadata( kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); if (server_try_cancel > DO_NOT_CANCEL) { - // For unary RPC, the actual value of server_try_cancel does not matter - // (as long as it is greater than DO_NOT_CANCEL) + // Since this is a unary RPC, by the time this server handler is called, + // the 'request' message is already read from the client. So the scenarios + // in server_try_cancel don't make much sense. Just cancel the RPC as long + // as server_try_cancel is not DO_NOT_CANCEL ServerTryCancel(context); return Status::CANCELLED; } @@ -318,13 +320,27 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { Status RequestStream(ServerContext* context, ServerReader* reader, EchoResponse* response) GRPC_OVERRIDE { - EchoRequest request; - response->set_message(""); - int cancel_after_reads = GetIntValueFromMetadata( - kServerCancelAfterReads, context->client_metadata(), 0); + // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by + // the server by calling ServerContext::TryCancel() depending on the value: + // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads + // any message from the client + // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is + // reading messages from the client + // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads + // all the messages from the client int server_try_cancel = GetIntValueFromMetadata( kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + // If 'cancel_after_reads' is set in the metadata AND non-zero, the server + // will cancel the RPC (by just returning Status::CANCELLED - doesn't call + // ServerContext::TryCancel()) after reading the number of records specified + // by the 'cancel_after_reads' value set in the metadata. + int cancel_after_reads = GetIntValueFromMetadata( + kServerCancelAfterReads, context->client_metadata(), 0); + + EchoRequest request; + response->set_message(""); + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { ServerTryCancel(context); return Status::CANCELLED; @@ -367,6 +383,14 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { // TODO(yangg) make it generic by adding a parameter into EchoRequest Status ResponseStream(ServerContext* context, const EchoRequest* request, ServerWriter* writer) GRPC_OVERRIDE { + // If server_try_cancel is set in the metadata, the RPC is cancelled by the + // server by calling ServerContext::TryCancel() depending on the value: + // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes + // any messages to the client + // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is + // writing messages to the client + // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes + // all the messages to the client int server_try_cancel = GetIntValueFromMetadata( kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); @@ -404,12 +428,20 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { Status BidiStream(ServerContext* context, ServerReaderWriter* stream) GRPC_OVERRIDE { - EchoRequest request; - EchoResponse response; - + // If server_try_cancel is set in the metadata, the RPC is cancelled by the + // server by calling ServerContext::TryCancel() depending on the value: + // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/ + // writes any messages from/to the client + // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is + // reading/writing messages from/to the client + // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server + // reads/writes all messages from/to the client int server_try_cancel = GetIntValueFromMetadata( kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + EchoRequest request; + EchoResponse response; + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { ServerTryCancel(context); return Status::CANCELLED; @@ -574,11 +606,23 @@ static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) { } } -// == Tests for cancelling RPC from server side == - +// This class is for testing scenarios where RPCs are cancelled on the server +// by calling ServerContext::TryCancel() class End2endServerTryCancelTest : public End2endTest { protected: - // Tests for Client streaming + // Helper for testing client-streaming RPCs which are cancelled on the server. + // Depending on the value of server_try_cancel parameter, this will test one + // of the following three scenarios: + // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading + // any messages from the client + // + // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading + // messages from the client + // + // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all + // the messages from the client + // + // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL. void TestRequestStreamServerCancel( ServerTryCancelRequestPhase server_try_cancel, int num_msgs_to_send) { ResetStub(); @@ -586,10 +630,12 @@ class End2endServerTryCancelTest : public End2endTest { EchoResponse response; ClientContext context; + // Send server_try_cancel value in the client metadata context.AddMetadata(kServerTryCancelRequest, std::to_string(server_try_cancel)); auto stream = stub_->RequestStream(&context, &response); + int num_msgs_sent = 0; while (num_msgs_sent < num_msgs_to_send) { request.set_message("hello"); @@ -599,17 +645,32 @@ class End2endServerTryCancelTest : public End2endTest { num_msgs_sent++; } gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent); + stream->WritesDone(); Status s = stream->Finish(); + // At this point, we know for sure that RPC was cancelled by the server + // since we passed server_try_cancel value in the metadata. Depending on the + // value of server_try_cancel, the RPC might have been cancelled by the + // server at different stages. The following validates our expectations of + // number of messages sent in various cancellation scenarios: + switch (server_try_cancel) { case CANCEL_BEFORE_PROCESSING: case CANCEL_DURING_PROCESSING: + // If the RPC is cancelled by server before / during messages from the + // client, it means that the client most likely did not get a chance to + // send all the messages it wanted to send. i.e num_msgs_sent <= + // num_msgs_to_send EXPECT_LE(num_msgs_sent, num_msgs_to_send); break; + case CANCEL_AFTER_PROCESSING: + // If the RPC was cancelled after all messages were read by the server, + // the client did get a chance to send all its messages EXPECT_EQ(num_msgs_sent, num_msgs_to_send); break; + default: gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d", server_try_cancel); @@ -622,7 +683,19 @@ class End2endServerTryCancelTest : public End2endTest { EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); } - // Test for server streaming + // Helper for testing server-streaming RPCs which are cancelled on the server. + // Depending on the value of server_try_cancel parameter, this will test one + // of the following three scenarios: + // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before writing + // any messages to the client + // + // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while writing + // messages to the client + // + // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after writing all + // the messages to the client + // + // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL. void TestResponseStreamServerCancel( ServerTryCancelRequestPhase server_try_cancel) { ResetStub(); @@ -630,8 +703,10 @@ class End2endServerTryCancelTest : public End2endTest { EchoResponse response; ClientContext context; + // Send server_try_cancel in the client metadata context.AddMetadata(kServerTryCancelRequest, std::to_string(server_try_cancel)); + request.set_message("hello"); auto stream = stub_->ResponseStream(&context, request); @@ -648,18 +723,29 @@ class End2endServerTryCancelTest : public End2endTest { Status s = stream->Finish(); + // Depending on the value of server_try_cancel, the RPC might have been + // cancelled by the server at different stages. The following validates our + // expectations of number of messages read in various cancellation + // scenarios: switch (server_try_cancel) { - case CANCEL_BEFORE_PROCESSING: { + case CANCEL_BEFORE_PROCESSING: + // Server cancelled before sending any messages. Which means the client + // wouldn't have read any EXPECT_EQ(num_msgs_read, 0); break; - } - case CANCEL_DURING_PROCESSING: { + + case CANCEL_DURING_PROCESSING: + // Server cancelled while writing messages. Client must have read less + // than or equal to the expected number of messages EXPECT_LE(num_msgs_read, kNumResponseStreamsMsgs); break; - } - case CANCEL_AFTER_PROCESSING: { + + case CANCEL_AFTER_PROCESSING: + // Server cancelled after writing all messages. Client must have read + // all messages EXPECT_EQ(num_msgs_read, kNumResponseStreamsMsgs); - } + break; + default: { gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d", server_try_cancel); @@ -673,6 +759,19 @@ class End2endServerTryCancelTest : public End2endTest { EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); } + // Helper for testing bidirectional-streaming RPCs which are cancelled on the + // server. Depending on the value of server_try_cancel parameter, this will + // test one of the following three scenarios: + // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/ + // writing any messages from/to the client + // + // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading/ + // writing messages from/to the client + // + // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading/writing + // all the messages from/to the client + // + // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL. void TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel, int num_messages) { ResetStub(); @@ -680,6 +779,7 @@ class End2endServerTryCancelTest : public End2endTest { EchoResponse response; ClientContext context; + // Send server_try_cancel in the client metadata context.AddMetadata(kServerTryCancelRequest, std::to_string(server_try_cancel)); @@ -707,27 +807,31 @@ class End2endServerTryCancelTest : public End2endTest { stream->WritesDone(); Status s = stream->Finish(); + // Depending on the value of server_try_cancel, the RPC might have been + // cancelled by the server at different stages. The following validates our + // expectations of number of messages read in various cancellation + // scenarios: switch (server_try_cancel) { - case CANCEL_BEFORE_PROCESSING: { + case CANCEL_BEFORE_PROCESSING: EXPECT_EQ(num_msgs_read, 0); break; - } - case CANCEL_DURING_PROCESSING: { + + case CANCEL_DURING_PROCESSING: EXPECT_LE(num_msgs_sent, num_messages); EXPECT_LE(num_msgs_read, num_msgs_sent); break; - } - case CANCEL_AFTER_PROCESSING: { + + case CANCEL_AFTER_PROCESSING: EXPECT_EQ(num_msgs_sent, num_messages); EXPECT_EQ(num_msgs_read, num_msgs_sent); - } - default: { + break; + + default: gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d", server_try_cancel); EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL && server_try_cancel <= CANCEL_AFTER_PROCESSING); break; - } } EXPECT_FALSE(s.ok()); From 3075c810280b797b5c932c38e0382633cfe3774c Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Fri, 29 Jan 2016 18:25:22 -0800 Subject: [PATCH 6/8] Update comment on TryCancel() API --- include/grpc++/impl/codegen/server_context.h | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h index d3086aef0a0..ad08b8210d6 100644 --- a/include/grpc++/impl/codegen/server_context.h +++ b/include/grpc++/impl/codegen/server_context.h @@ -106,14 +106,16 @@ class ServerContext { bool IsCancelled() const; // Cancel the Call from the server. This is a best-effort API and depending on - // when this is called, the Call may still appear successful to the client. - // For example, if called on a separate thread, it might race with the - // server handler which might return success to the client before TryCancel() - // was called. + // when it is called, the RPC may still appear successful to the client. + // For example, if TryCancel() is called on a separate thread, it might race + // with the server handler which might return success to the client before + // TryCancel() was even started by the thread. // - // It is the caller's responsibility to prevent such races and ensure that the - // serverhandler returns Status::CANCELLED if TryCancel() is called (unless - // the serverhandler is already returning an error code) + // It is the caller's responsibility to prevent such races and ensure that if + // TryCancel() is called, the serverhandler must return Status::CANCELLED. The + // only exception is that if the serverhandler is already returning an error + // status code, it is ok to not return Status::CANCELLED even if TryCancel() + // was called. void TryCancel() const; const std::multimap& client_metadata() { From 369a04ace686d2db7fff8a39473680566da6700a Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Mon, 1 Feb 2016 10:53:13 -0800 Subject: [PATCH 7/8] Address code review comments --- src/cpp/server/server_context.cc | 2 +- test/cpp/end2end/async_end2end_test.cc | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 4a5ac5af926..e205a1969b3 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -177,7 +177,7 @@ void ServerContext::TryCancel() const { grpc_call_error err = grpc_call_cancel_with_status( call_, GRPC_STATUS_CANCELLED, "Cancelled on the server side", NULL); if (err != GRPC_CALL_OK) { - gpr_log(GPR_INFO, "TryCancel failed with: %d", err); + gpr_log(GPR_ERROR, "TryCancel failed with: %d", err); } } diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 0deb2eff95b..252bda37988 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -884,13 +884,12 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // Server sends the final message and cancelled status (but the RPC is // already cancelled at this point. So we expect the operation to fail) - send_response.set_message("Pong"); srv_stream.Finish(send_response, Status::CANCELLED, tag(9)); Verifier(GetParam()).Expect(9, false).Verify(cq_.get()); // Client will see the cancellation cli_stream->Finish(&recv_status, tag(10)); - // TODO: sreek: The expectation here should be true. This is a bug (github + // TODO(sreek): The expectation here should be true. This is a bug (github // issue #4972) Verifier(GetParam()).Expect(10, false).Verify(cq_.get()); EXPECT_FALSE(recv_status.ok()); From 4aadcb4ebf503b50cf55a91f2cdd113c6e0d92da Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Wed, 3 Feb 2016 09:44:27 -0800 Subject: [PATCH 8/8] Run tools/distrib/clang_format_code.sh --- test/cpp/end2end/test_service_impl.cc | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index ac97cb435ca..66d11d0dfce 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -127,9 +127,10 @@ Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request, if (request->has_param() && request->param().echo_metadata()) { const std::multimap& client_metadata = context->client_metadata(); - for (std::multimap::const_iterator - iter = client_metadata.begin(); - iter != client_metadata.end(); ++iter) { + for ( + std::multimap::const_iterator iter = + client_metadata.begin(); + iter != client_metadata.end(); ++iter) { context->AddTrailingMetadata(ToString(iter->first), ToString(iter->second)); }