Merge pull request #4958 from sreecha/server_try_cancel_api

Implement a TryCancel() API on ServerContext to cancel the API from the server side
pull/5048/head
Yang Gao 9 years ago
commit e51db34255
  1. 13
      include/grpc++/impl/codegen/server_context.h
  2. 16
      src/cpp/server/server_context.cc
  3. 410
      test/cpp/end2end/async_end2end_test.cc
  4. 298
      test/cpp/end2end/end2end_test.cc
  5. 165
      test/cpp/end2end/test_service_impl.cc
  6. 17
      test/cpp/end2end/test_service_impl.h

@ -105,6 +105,19 @@ class ServerContext {
bool IsCancelled() const;
// Cancel the Call from the server. This is a best-effort API and depending on
// when it is called, the RPC may still appear successful to the client.
// For example, if TryCancel() is called on a separate thread, it might race
// with the server handler which might return success to the client before
// TryCancel() was even started by the thread.
//
// It is the caller's responsibility to prevent such races and ensure that if
// TryCancel() is called, the serverhandler must return Status::CANCELLED. The
// only exception is that if the serverhandler is already returning an error
// status code, it is ok to not return Status::CANCELLED even if TryCancel()
// was called.
void TryCancel() const;
const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata() {
return client_metadata_;
}

@ -33,14 +33,14 @@
#include <grpc++/server_context.h>
#include <grpc/compression.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc++/completion_queue.h>
#include <grpc++/impl/call.h>
#include <grpc++/impl/sync.h>
#include <grpc++/support/time.h>
#include <grpc/compression.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/channel/compress_filter.h"
#include "src/cpp/common/create_auth_context.h"
@ -173,6 +173,14 @@ void ServerContext::AddTrailingMetadata(const grpc::string& key,
trailing_metadata_.insert(std::make_pair(key, value));
}
void ServerContext::TryCancel() const {
grpc_call_error err = grpc_call_cancel_with_status(
call_, GRPC_STATUS_CANCELLED, "Cancelled on the server side", NULL);
if (err != GRPC_CALL_OK) {
gpr_log(GPR_ERROR, "TryCancel failed with: %d", err);
}
}
bool ServerContext::IsCancelled() const {
return completion_op_ && completion_op_->CheckCancelled(cq_);
}

@ -32,6 +32,7 @@
*/
#include <memory>
#include <thread>
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
@ -104,7 +105,10 @@ class Verifier : public PollingCheckRegion {
expectations_[tag(i)] = expect_ok;
return *this;
}
void Verify(CompletionQueue* cq) {
void Verify(CompletionQueue* cq) { Verify(cq, false); }
void Verify(CompletionQueue* cq, bool ignore_ok) {
GPR_ASSERT(!expectations_.empty());
while (!expectations_.empty()) {
bool ok;
@ -122,7 +126,9 @@ class Verifier : public PollingCheckRegion {
}
auto it = expectations_.find(got_tag);
EXPECT_TRUE(it != expectations_.end());
EXPECT_EQ(it->second, ok);
if (!ignore_ok) {
EXPECT_EQ(it->second, ok);
}
expectations_.erase(it);
}
}
@ -217,7 +223,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<bool> {
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
send_request.set_message("Hello");
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
@ -270,7 +276,7 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
send_request.set_message("Hello");
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
std::chrono::system_clock::time_point time_now(
@ -315,7 +321,7 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
send_request.set_message("Hello");
std::unique_ptr<ClientAsyncWriter<EchoRequest> > cli_stream(
std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
@ -368,7 +374,7 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
send_request.set_message("Hello");
std::unique_ptr<ClientAsyncReader<EchoResponse> > cli_stream(
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
@ -418,7 +424,7 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
send_request.set_message("Hello");
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse> >
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
@ -476,7 +482,7 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
cli_ctx.AddMetadata(meta1.first, meta1.second);
cli_ctx.AddMetadata(meta2.first, meta2.second);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
@ -519,7 +525,7 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2("key2", "val2");
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
@ -568,7 +574,7 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2("key2", "val2");
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
@ -629,7 +635,7 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
cli_ctx.AddMetadata(meta1.first, meta1.second);
cli_ctx.AddMetadata(meta2.first, meta2.second);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
@ -690,7 +696,7 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
send_request.set_message("Hello");
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
srv_ctx.AsyncNotifyWhenDone(tag(5));
@ -725,7 +731,7 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) {
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
send_request.set_message("Hello");
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
srv_ctx.AsyncNotifyWhenDone(tag(5));
@ -759,7 +765,7 @@ TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
ClientContext cli_ctx;
send_request.set_message("Hello");
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get()));
response_reader->Finish(&recv_response, &recv_status, tag(4));
@ -769,8 +775,384 @@ TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
EXPECT_EQ("", recv_status.error_message());
}
// This class is for testing scenarios where RPCs are cancelled on the server
// by calling ServerContext::TryCancel()
class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
protected:
typedef enum {
DO_NOT_CANCEL = 0,
CANCEL_BEFORE_PROCESSING,
CANCEL_DURING_PROCESSING,
CANCEL_AFTER_PROCESSING
} ServerTryCancelRequestPhase;
void ServerTryCancel(ServerContext* context) {
EXPECT_FALSE(context->IsCancelled());
context->TryCancel();
gpr_log(GPR_INFO, "Server called TryCancel()");
EXPECT_TRUE(context->IsCancelled());
}
// Helper for testing client-streaming RPCs which are cancelled on the server.
// Depending on the value of server_try_cancel parameter, this will test one
// of the following three scenarios:
// CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
// any messages from the client
//
// CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
// messages from the client
//
// CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
// messages from the client (but before sending any status back to the
// client)
void TestClientStreamingServerCancel(
ServerTryCancelRequestPhase server_try_cancel) {
ResetStub();
EchoRequest send_request;
EchoRequest recv_request;
EchoResponse send_response;
EchoResponse recv_response;
Status recv_status;
ClientContext cli_ctx;
ServerContext srv_ctx;
ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
// Initiate the 'RequestStream' call on client
std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
// On the server, request to be notified of 'RequestStream' calls
// and receive the 'RequestStream' call just made by the client
service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
// Client sends 3 messages (tags 3, 4 and 5)
for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
send_request.set_message("Ping " + std::to_string(tag_idx));
cli_stream->Write(send_request, tag(tag_idx));
Verifier(GetParam()).Expect(tag_idx, true).Verify(cq_.get());
}
cli_stream->WritesDone(tag(6));
Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
bool expected_server_cq_result = true;
bool ignore_cq_result = false;
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
ServerTryCancel(&srv_ctx);
// Since cancellation is done before server reads any results, we know
// for sure that all cq results will return false from this point forward
expected_server_cq_result = false;
}
std::thread* server_try_cancel_thd = NULL;
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd = new std::thread(
&AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx);
// Server will cancel the RPC in a parallel thread while reading the
// requests from the client. Since the cancellation can happen at anytime,
// some of the cq results (i.e those until cancellation) might be true but
// its non deterministic. So better to ignore the cq results
ignore_cq_result = true;
}
// Server reads 3 messages (tags 6, 7 and 8)
for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
srv_stream.Read(&recv_request, tag(tag_idx));
Verifier(GetParam())
.Expect(tag_idx, expected_server_cq_result)
.Verify(cq_.get(), ignore_cq_result);
}
if (server_try_cancel_thd != NULL) {
server_try_cancel_thd->join();
delete server_try_cancel_thd;
}
if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
ServerTryCancel(&srv_ctx);
}
// The RPC has been cancelled at this point for sure (i.e irrespective of
// the value of `server_try_cancel` is). So, from this point forward, we
// know that cq results are supposed to return false on server.
// Server sends the final message and cancelled status (but the RPC is
// already cancelled at this point. So we expect the operation to fail)
srv_stream.Finish(send_response, Status::CANCELLED, tag(9));
Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
// Client will see the cancellation
cli_stream->Finish(&recv_status, tag(10));
// TODO(sreek): The expectation here should be true. This is a bug (github
// issue #4972)
Verifier(GetParam()).Expect(10, false).Verify(cq_.get());
EXPECT_FALSE(recv_status.ok());
EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
}
// Helper for testing server-streaming RPCs which are cancelled on the server.
// Depending on the value of server_try_cancel parameter, this will test one
// of the following three scenarios:
// CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before sending
// any messages to the client
//
// CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while sending
// messages to the client
//
// CANCEL_AFTER PROCESSING: Rpc is cancelled by server after sending all
// messages to the client (but before sending any status back to the
// client)
void TestServerStreamingServerCancel(
ServerTryCancelRequestPhase server_try_cancel) {
ResetStub();
EchoRequest send_request;
EchoRequest recv_request;
EchoResponse send_response;
EchoResponse recv_response;
Status recv_status;
ClientContext cli_ctx;
ServerContext srv_ctx;
ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
send_request.set_message("Ping");
// Initiate the 'ResponseStream' call on the client
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
// On the server, request to be notified of 'ResponseStream' calls and
// receive the call just made by the client
service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2));
Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
bool expected_cq_result = true;
bool ignore_cq_result = false;
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
ServerTryCancel(&srv_ctx);
// We know for sure that all cq results will be false from this point
// since the server cancelled the RPC
expected_cq_result = false;
}
std::thread* server_try_cancel_thd = NULL;
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd = new std::thread(
&AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx);
// Server will cancel the RPC in a parallel thread while writing responses
// to the client. Since the cancellation can happen at anytime, some of
// the cq results (i.e those until cancellation) might be true but it is
// non deterministic. So better to ignore the cq results
ignore_cq_result = true;
}
// Server sends three messages (tags 3, 4 and 5)
for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
send_response.set_message("Pong " + std::to_string(tag_idx));
srv_stream.Write(send_response, tag(tag_idx));
Verifier(GetParam())
.Expect(tag_idx, expected_cq_result)
.Verify(cq_.get(), ignore_cq_result);
}
if (server_try_cancel_thd != NULL) {
server_try_cancel_thd->join();
delete server_try_cancel_thd;
}
if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
ServerTryCancel(&srv_ctx);
}
// Client attemts to read the three messages from the server
for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
cli_stream->Read(&recv_response, tag(tag_idx));
Verifier(GetParam())
.Expect(tag_idx, expected_cq_result)
.Verify(cq_.get(), ignore_cq_result);
}
// The RPC has been cancelled at this point for sure (i.e irrespective of
// the value of `server_try_cancel` is). So, from this point forward, we
// know that cq results are supposed to return false on server.
// Server finishes the stream (but the RPC is already cancelled)
srv_stream.Finish(Status::CANCELLED, tag(9));
Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
// Client will see the cancellation
cli_stream->Finish(&recv_status, tag(10));
Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
EXPECT_FALSE(recv_status.ok());
EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
}
// Helper for testing bidirectinal-streaming RPCs which are cancelled on the
// server.
//
// Depending on the value of server_try_cancel parameter, this will
// test one of the following three scenarios:
// CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
// writing any messages from/to the client
//
// CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
// messages from the client
//
// CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
// messages from the client (but before sending any status back to the
// client)
void TestBidiStreamingServerCancel(
ServerTryCancelRequestPhase server_try_cancel) {
ResetStub();
EchoRequest send_request;
EchoRequest recv_request;
EchoResponse send_response;
EchoResponse recv_response;
Status recv_status;
ClientContext cli_ctx;
ServerContext srv_ctx;
ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
// Initiate the call from the client side
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
// On the server, request to be notified of the 'BidiStream' call and
// receive the call just made by the client
service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
// Client sends the first and the only message
send_request.set_message("Ping");
cli_stream->Write(send_request, tag(3));
Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
bool expected_cq_result = true;
bool ignore_cq_result = false;
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
ServerTryCancel(&srv_ctx);
// We know for sure that all cq results will be false from this point
// since the server cancelled the RPC
expected_cq_result = false;
}
std::thread* server_try_cancel_thd = NULL;
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd = new std::thread(
&AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx);
// Since server is going to cancel the RPC in a parallel thread, some of
// the cq results (i.e those until the cancellation) might be true. Since
// that number is non-deterministic, it is better to ignore the cq results
ignore_cq_result = true;
}
srv_stream.Read(&recv_request, tag(4));
Verifier(GetParam())
.Expect(4, expected_cq_result)
.Verify(cq_.get(), ignore_cq_result);
send_response.set_message("Pong");
srv_stream.Write(send_response, tag(5));
Verifier(GetParam())
.Expect(5, expected_cq_result)
.Verify(cq_.get(), ignore_cq_result);
cli_stream->Read(&recv_response, tag(6));
Verifier(GetParam())
.Expect(6, expected_cq_result)
.Verify(cq_.get(), ignore_cq_result);
// This is expected to succeed in all cases
cli_stream->WritesDone(tag(7));
Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
// This is expected to fail in all cases i.e for all values of
// server_try_cancel. This is becasue at this point, either there are no
// more msgs from the client (because client called WritesDone) or the RPC
// is cancelled on the server
srv_stream.Read(&recv_request, tag(8));
Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
if (server_try_cancel_thd != NULL) {
server_try_cancel_thd->join();
delete server_try_cancel_thd;
}
if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
ServerTryCancel(&srv_ctx);
}
// The RPC has been cancelled at this point for sure (i.e irrespective of
// the value of `server_try_cancel` is). So, from this point forward, we
// know that cq results are supposed to return false on server.
srv_stream.Finish(Status::CANCELLED, tag(9));
Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
cli_stream->Finish(&recv_status, tag(10));
Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
EXPECT_FALSE(recv_status.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code());
}
};
TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelBefore) {
TestClientStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
}
TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelDuring) {
TestClientStreamingServerCancel(CANCEL_DURING_PROCESSING);
}
TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelAfter) {
TestClientStreamingServerCancel(CANCEL_AFTER_PROCESSING);
}
TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelBefore) {
TestServerStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
}
TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelDuring) {
TestServerStreamingServerCancel(CANCEL_DURING_PROCESSING);
}
TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelAfter) {
TestServerStreamingServerCancel(CANCEL_AFTER_PROCESSING);
}
TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelBefore) {
TestBidiStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
}
TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelDuring) {
TestBidiStreamingServerCancel(CANCEL_DURING_PROCESSING);
}
TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelAfter) {
TestBidiStreamingServerCancel(CANCEL_AFTER_PROCESSING);
}
INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest,
::testing::Values(false, true));
INSTANTIATE_TEST_CASE_P(AsyncEnd2endServerTryCancel,
AsyncEnd2endServerTryCancelTest,
::testing::Values(false));
} // namespace
} // namespace testing

@ -306,6 +306,301 @@ static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,
}
}
// This class is for testing scenarios where RPCs are cancelled on the server
// by calling ServerContext::TryCancel()
class End2endServerTryCancelTest : public End2endTest {
protected:
// Helper for testing client-streaming RPCs which are cancelled on the server.
// Depending on the value of server_try_cancel parameter, this will test one
// of the following three scenarios:
// CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
// any messages from the client
//
// CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
// messages from the client
//
// CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
// the messages from the client
//
// NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
void TestRequestStreamServerCancel(
ServerTryCancelRequestPhase server_try_cancel, int num_msgs_to_send) {
ResetStub();
EchoRequest request;
EchoResponse response;
ClientContext context;
// Send server_try_cancel value in the client metadata
context.AddMetadata(kServerTryCancelRequest,
std::to_string(server_try_cancel));
auto stream = stub_->RequestStream(&context, &response);
int num_msgs_sent = 0;
while (num_msgs_sent < num_msgs_to_send) {
request.set_message("hello");
if (!stream->Write(request)) {
break;
}
num_msgs_sent++;
}
gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent);
stream->WritesDone();
Status s = stream->Finish();
// At this point, we know for sure that RPC was cancelled by the server
// since we passed server_try_cancel value in the metadata. Depending on the
// value of server_try_cancel, the RPC might have been cancelled by the
// server at different stages. The following validates our expectations of
// number of messages sent in various cancellation scenarios:
switch (server_try_cancel) {
case CANCEL_BEFORE_PROCESSING:
case CANCEL_DURING_PROCESSING:
// If the RPC is cancelled by server before / during messages from the
// client, it means that the client most likely did not get a chance to
// send all the messages it wanted to send. i.e num_msgs_sent <=
// num_msgs_to_send
EXPECT_LE(num_msgs_sent, num_msgs_to_send);
break;
case CANCEL_AFTER_PROCESSING:
// If the RPC was cancelled after all messages were read by the server,
// the client did get a chance to send all its messages
EXPECT_EQ(num_msgs_sent, num_msgs_to_send);
break;
default:
gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
server_try_cancel);
EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
server_try_cancel <= CANCEL_AFTER_PROCESSING);
break;
}
EXPECT_FALSE(s.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
}
// Helper for testing server-streaming RPCs which are cancelled on the server.
// Depending on the value of server_try_cancel parameter, this will test one
// of the following three scenarios:
// CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before writing
// any messages to the client
//
// CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while writing
// messages to the client
//
// CANCEL_AFTER PROCESSING: Rpc is cancelled by server after writing all
// the messages to the client
//
// NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
void TestResponseStreamServerCancel(
ServerTryCancelRequestPhase server_try_cancel) {
ResetStub();
EchoRequest request;
EchoResponse response;
ClientContext context;
// Send server_try_cancel in the client metadata
context.AddMetadata(kServerTryCancelRequest,
std::to_string(server_try_cancel));
request.set_message("hello");
auto stream = stub_->ResponseStream(&context, request);
int num_msgs_read = 0;
while (num_msgs_read < kNumResponseStreamsMsgs) {
if (!stream->Read(&response)) {
break;
}
EXPECT_EQ(response.message(),
request.message() + std::to_string(num_msgs_read));
num_msgs_read++;
}
gpr_log(GPR_INFO, "Read %d messages", num_msgs_read);
Status s = stream->Finish();
// Depending on the value of server_try_cancel, the RPC might have been
// cancelled by the server at different stages. The following validates our
// expectations of number of messages read in various cancellation
// scenarios:
switch (server_try_cancel) {
case CANCEL_BEFORE_PROCESSING:
// Server cancelled before sending any messages. Which means the client
// wouldn't have read any
EXPECT_EQ(num_msgs_read, 0);
break;
case CANCEL_DURING_PROCESSING:
// Server cancelled while writing messages. Client must have read less
// than or equal to the expected number of messages
EXPECT_LE(num_msgs_read, kNumResponseStreamsMsgs);
break;
case CANCEL_AFTER_PROCESSING:
// Server cancelled after writing all messages. Client must have read
// all messages
EXPECT_EQ(num_msgs_read, kNumResponseStreamsMsgs);
break;
default: {
gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
server_try_cancel);
EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
server_try_cancel <= CANCEL_AFTER_PROCESSING);
break;
}
}
EXPECT_FALSE(s.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
}
// Helper for testing bidirectional-streaming RPCs which are cancelled on the
// server. Depending on the value of server_try_cancel parameter, this will
// test one of the following three scenarios:
// CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
// writing any messages from/to the client
//
// CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading/
// writing messages from/to the client
//
// CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading/writing
// all the messages from/to the client
//
// NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
void TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,
int num_messages) {
ResetStub();
EchoRequest request;
EchoResponse response;
ClientContext context;
// Send server_try_cancel in the client metadata
context.AddMetadata(kServerTryCancelRequest,
std::to_string(server_try_cancel));
auto stream = stub_->BidiStream(&context);
int num_msgs_read = 0;
int num_msgs_sent = 0;
while (num_msgs_sent < num_messages) {
request.set_message("hello " + std::to_string(num_msgs_sent));
if (!stream->Write(request)) {
break;
}
num_msgs_sent++;
if (!stream->Read(&response)) {
break;
}
num_msgs_read++;
EXPECT_EQ(response.message(), request.message());
}
gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent);
gpr_log(GPR_INFO, "Read %d messages", num_msgs_read);
stream->WritesDone();
Status s = stream->Finish();
// Depending on the value of server_try_cancel, the RPC might have been
// cancelled by the server at different stages. The following validates our
// expectations of number of messages read in various cancellation
// scenarios:
switch (server_try_cancel) {
case CANCEL_BEFORE_PROCESSING:
EXPECT_EQ(num_msgs_read, 0);
break;
case CANCEL_DURING_PROCESSING:
EXPECT_LE(num_msgs_sent, num_messages);
EXPECT_LE(num_msgs_read, num_msgs_sent);
break;
case CANCEL_AFTER_PROCESSING:
EXPECT_EQ(num_msgs_sent, num_messages);
EXPECT_EQ(num_msgs_read, num_msgs_sent);
break;
default:
gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
server_try_cancel);
EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
server_try_cancel <= CANCEL_AFTER_PROCESSING);
break;
}
EXPECT_FALSE(s.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
}
};
TEST_P(End2endServerTryCancelTest, RequestEchoServerCancel) {
ResetStub();
EchoRequest request;
EchoResponse response;
ClientContext context;
context.AddMetadata(kServerTryCancelRequest,
std::to_string(CANCEL_BEFORE_PROCESSING));
Status s = stub_->Echo(&context, request, &response);
EXPECT_FALSE(s.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
}
// Server to cancel before doing reading the request
TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelBeforeReads) {
TestRequestStreamServerCancel(CANCEL_BEFORE_PROCESSING, 1);
}
// Server to cancel while reading a request from the stream in parallel
TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelDuringRead) {
TestRequestStreamServerCancel(CANCEL_DURING_PROCESSING, 10);
}
// Server to cancel after reading all the requests but before returning to the
// client
TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelAfterReads) {
TestRequestStreamServerCancel(CANCEL_AFTER_PROCESSING, 4);
}
// Server to cancel before sending any response messages
TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelBefore) {
TestResponseStreamServerCancel(CANCEL_BEFORE_PROCESSING);
}
// Server to cancel while writing a response to the stream in parallel
TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelDuring) {
TestResponseStreamServerCancel(CANCEL_DURING_PROCESSING);
}
// Server to cancel after writing all the respones to the stream but before
// returning to the client
TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelAfter) {
TestResponseStreamServerCancel(CANCEL_AFTER_PROCESSING);
}
// Server to cancel before reading/writing any requests/responses on the stream
TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelBefore) {
TestBidiStreamServerCancel(CANCEL_BEFORE_PROCESSING, 2);
}
// Server to cancel while reading/writing requests/responses on the stream in
// parallel
TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelDuring) {
TestBidiStreamServerCancel(CANCEL_DURING_PROCESSING, 10);
}
// Server to cancel after reading/writing all requests/responses on the stream
// but before returning to the client
TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelAfter) {
TestBidiStreamServerCancel(CANCEL_AFTER_PROCESSING, 5);
}
TEST_P(End2endTest, MultipleRpcsWithVariedBinaryMetadataValue) {
ResetStub();
std::vector<std::thread*> threads;
@ -1059,6 +1354,9 @@ INSTANTIATE_TEST_CASE_P(End2end, End2endTest,
::testing::Values(TestScenario(false, false),
TestScenario(false, true)));
INSTANTIATE_TEST_CASE_P(End2endServerTryCancel, End2endServerTryCancelTest,
::testing::Values(TestScenario(false, false)));
INSTANTIATE_TEST_CASE_P(ProxyEnd2end, ProxyEnd2endTest,
::testing::Values(TestScenario(false, false),
TestScenario(false, true),

@ -33,6 +33,8 @@
#include "test/cpp/end2end/test_service_impl.h"
#include <thread>
#include <grpc++/security/credentials.h>
#include <grpc++/server_context.h>
#include <grpc/grpc.h>
@ -82,6 +84,17 @@ void CheckServerAuthContext(const ServerContext* context,
Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) {
int server_try_cancel = GetIntValueFromMetadata(
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
if (server_try_cancel > DO_NOT_CANCEL) {
// Since this is a unary RPC, by the time this server handler is called,
// the 'request' message is already read from the client. So the scenarios
// in server_try_cancel don't make much sense. Just cancel the RPC as long
// as server_try_cancel is not DO_NOT_CANCEL
ServerTryCancel(context);
return Status::CANCELLED;
}
response->set_message(request->message());
MaybeEchoDeadline(context, request, response);
if (host_) {
@ -143,18 +156,39 @@ Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request,
Status TestServiceImpl::RequestStream(ServerContext* context,
ServerReader<EchoRequest>* reader,
EchoResponse* response) {
// If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
// the server by calling ServerContext::TryCancel() depending on the value:
// CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads
// any message from the client
// CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
// reading messages from the client
// CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
// all the messages from the client
int server_try_cancel = GetIntValueFromMetadata(
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
// If 'cancel_after_reads' is set in the metadata AND non-zero, the server
// will cancel the RPC (by just returning Status::CANCELLED - doesn't call
// ServerContext::TryCancel()) after reading the number of records specified
// by the 'cancel_after_reads' value set in the metadata.
int cancel_after_reads = GetIntValueFromMetadata(
kServerCancelAfterReads, context->client_metadata(), 0);
EchoRequest request;
response->set_message("");
int cancel_after_reads = 0;
const std::multimap<grpc::string_ref, grpc::string_ref>&
client_initial_metadata = context->client_metadata();
if (client_initial_metadata.find(kServerCancelAfterReads) !=
client_initial_metadata.end()) {
std::istringstream iss(ToString(
client_initial_metadata.find(kServerCancelAfterReads)->second));
iss >> cancel_after_reads;
gpr_log(GPR_INFO, "cancel_after_reads %d", cancel_after_reads);
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
ServerTryCancel(context);
return Status::CANCELLED;
}
std::thread* server_try_cancel_thd = NULL;
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
new std::thread(&TestServiceImpl::ServerTryCancel, this, context);
}
int num_msgs_read = 0;
while (reader->Read(&request)) {
if (cancel_after_reads == 1) {
gpr_log(GPR_INFO, "return cancel status");
@ -164,21 +198,65 @@ Status TestServiceImpl::RequestStream(ServerContext* context,
}
response->mutable_message()->append(request.message());
}
gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read);
if (server_try_cancel_thd != NULL) {
server_try_cancel_thd->join();
delete server_try_cancel_thd;
return Status::CANCELLED;
}
if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
ServerTryCancel(context);
return Status::CANCELLED;
}
return Status::OK;
}
// Return 3 messages.
// Return 'kNumResponseStreamMsgs' messages.
// TODO(yangg) make it generic by adding a parameter into EchoRequest
Status TestServiceImpl::ResponseStream(ServerContext* context,
const EchoRequest* request,
ServerWriter<EchoResponse>* writer) {
// If server_try_cancel is set in the metadata, the RPC is cancelled by the
// server by calling ServerContext::TryCancel() depending on the value:
// CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes
// any messages to the client
// CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
// writing messages to the client
// CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes
// all the messages to the client
int server_try_cancel = GetIntValueFromMetadata(
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
ServerTryCancel(context);
return Status::CANCELLED;
}
EchoResponse response;
response.set_message(request->message() + "0");
writer->Write(response);
response.set_message(request->message() + "1");
writer->Write(response);
response.set_message(request->message() + "2");
writer->Write(response);
std::thread* server_try_cancel_thd = NULL;
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
new std::thread(&TestServiceImpl::ServerTryCancel, this, context);
}
for (int i = 0; i < kNumResponseStreamsMsgs; i++) {
response.set_message(request->message() + std::to_string(i));
writer->Write(response);
}
if (server_try_cancel_thd != NULL) {
server_try_cancel_thd->join();
delete server_try_cancel_thd;
return Status::CANCELLED;
}
if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
ServerTryCancel(context);
return Status::CANCELLED;
}
return Status::OK;
}
@ -186,15 +264,70 @@ Status TestServiceImpl::ResponseStream(ServerContext* context,
Status TestServiceImpl::BidiStream(
ServerContext* context,
ServerReaderWriter<EchoResponse, EchoRequest>* stream) {
// If server_try_cancel is set in the metadata, the RPC is cancelled by the
// server by calling ServerContext::TryCancel() depending on the value:
// CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/
// writes any messages from/to the client
// CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
// reading/writing messages from/to the client
// CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server
// reads/writes all messages from/to the client
int server_try_cancel = GetIntValueFromMetadata(
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
EchoRequest request;
EchoResponse response;
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
ServerTryCancel(context);
return Status::CANCELLED;
}
std::thread* server_try_cancel_thd = NULL;
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
new std::thread(&TestServiceImpl::ServerTryCancel, this, context);
}
while (stream->Read(&request)) {
gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
response.set_message(request.message());
stream->Write(response);
}
if (server_try_cancel_thd != NULL) {
server_try_cancel_thd->join();
delete server_try_cancel_thd;
return Status::CANCELLED;
}
if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
ServerTryCancel(context);
return Status::CANCELLED;
}
return Status::OK;
}
int TestServiceImpl::GetIntValueFromMetadata(
const char* key,
const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
int default_value) {
if (metadata.find(key) != metadata.end()) {
std::istringstream iss(ToString(metadata.find(key)->second));
iss >> default_value;
gpr_log(GPR_INFO, "%s : %d", key, default_value);
}
return default_value;
}
void TestServiceImpl::ServerTryCancel(ServerContext* context) {
EXPECT_FALSE(context->IsCancelled());
context->TryCancel();
gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request");
EXPECT_TRUE(context->IsCancelled());
}
} // namespace testing
} // namespace grpc

@ -44,7 +44,16 @@
namespace grpc {
namespace testing {
const int kNumResponseStreamsMsgs = 3;
const char* const kServerCancelAfterReads = "cancel_after_reads";
const char* const kServerTryCancelRequest = "server_try_cancel";
typedef enum {
DO_NOT_CANCEL = 0,
CANCEL_BEFORE_PROCESSING,
CANCEL_DURING_PROCESSING,
CANCEL_AFTER_PROCESSING
} ServerTryCancelRequestPhase;
class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
public:
@ -73,6 +82,14 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
return signal_client_;
}
private:
int GetIntValueFromMetadata(
const char* key,
const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
int default_value);
void ServerTryCancel(ServerContext* context);
private:
bool signal_client_;
std::mutex mu_;

Loading…
Cancel
Save