|
|
|
@ -65,6 +65,14 @@ namespace testing { |
|
|
|
|
namespace { |
|
|
|
|
|
|
|
|
|
const char* kServerCancelAfterReads = "cancel_after_reads"; |
|
|
|
|
const char* kServerTryCancelRequest = "server_try_cancel"; |
|
|
|
|
typedef enum { |
|
|
|
|
DO_NOT_CANCEL = 0, |
|
|
|
|
CANCEL_BEFORE_PROCESSING, |
|
|
|
|
CANCEL_DURING_PROCESSING, |
|
|
|
|
CANCEL_AFTER_PROCESSING |
|
|
|
|
} ServerTryCancelRequestPhase; |
|
|
|
|
const int kNumResponseStreamsMsgs = 3; |
|
|
|
|
|
|
|
|
|
// When echo_deadline is requested, deadline seen in the ServerContext is set in
|
|
|
|
|
// the response in seconds.
|
|
|
|
@ -218,8 +226,37 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { |
|
|
|
|
explicit TestServiceImpl(const grpc::string& host) |
|
|
|
|
: signal_client_(false), host_(new grpc::string(host)) {} |
|
|
|
|
|
|
|
|
|
int GetIntValueFromMetadata( |
|
|
|
|
const char* key, |
|
|
|
|
const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, |
|
|
|
|
int default_value) { |
|
|
|
|
if (metadata.find(key) != metadata.end()) { |
|
|
|
|
std::istringstream iss(ToString(metadata.find(key)->second)); |
|
|
|
|
iss >> default_value; |
|
|
|
|
gpr_log(GPR_INFO, "%s : %d", key, default_value); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return default_value; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ServerTryCancel(ServerContext* context) { |
|
|
|
|
EXPECT_FALSE(context->IsCancelled()); |
|
|
|
|
context->TryCancel(); |
|
|
|
|
gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request"); |
|
|
|
|
EXPECT_TRUE(context->IsCancelled()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Status Echo(ServerContext* context, const EchoRequest* request, |
|
|
|
|
EchoResponse* response) GRPC_OVERRIDE { |
|
|
|
|
int server_try_cancel = GetIntValueFromMetadata( |
|
|
|
|
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); |
|
|
|
|
if (server_try_cancel > DO_NOT_CANCEL) { |
|
|
|
|
// For unary RPC, the actual value of server_try_cancel does not matter
|
|
|
|
|
// (as long as it is greater than DO_NOT_CANCEL)
|
|
|
|
|
ServerTryCancel(context); |
|
|
|
|
return Status::CANCELLED; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
response->set_message(request->message()); |
|
|
|
|
MaybeEchoDeadline(context, request, response); |
|
|
|
|
if (host_) { |
|
|
|
@ -283,17 +320,25 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { |
|
|
|
|
EchoResponse* response) GRPC_OVERRIDE { |
|
|
|
|
EchoRequest request; |
|
|
|
|
response->set_message(""); |
|
|
|
|
int cancel_after_reads = 0; |
|
|
|
|
const std::multimap<grpc::string_ref, grpc::string_ref>& |
|
|
|
|
client_initial_metadata = context->client_metadata(); |
|
|
|
|
if (client_initial_metadata.find(kServerCancelAfterReads) != |
|
|
|
|
client_initial_metadata.end()) { |
|
|
|
|
std::istringstream iss(ToString( |
|
|
|
|
client_initial_metadata.find(kServerCancelAfterReads)->second)); |
|
|
|
|
iss >> cancel_after_reads; |
|
|
|
|
gpr_log(GPR_INFO, "cancel_after_reads %d", cancel_after_reads); |
|
|
|
|
int cancel_after_reads = GetIntValueFromMetadata( |
|
|
|
|
kServerCancelAfterReads, context->client_metadata(), 0); |
|
|
|
|
int server_try_cancel = GetIntValueFromMetadata( |
|
|
|
|
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); |
|
|
|
|
|
|
|
|
|
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { |
|
|
|
|
ServerTryCancel(context); |
|
|
|
|
return Status::CANCELLED; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
std::thread* server_try_cancel_thd = NULL; |
|
|
|
|
if (server_try_cancel == CANCEL_DURING_PROCESSING) { |
|
|
|
|
server_try_cancel_thd = |
|
|
|
|
new std::thread(&TestServiceImpl::ServerTryCancel, this, context); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int num_msgs_read = 0; |
|
|
|
|
while (reader->Read(&request)) { |
|
|
|
|
num_msgs_read++; |
|
|
|
|
if (cancel_after_reads == 1) { |
|
|
|
|
gpr_log(GPR_INFO, "return cancel status"); |
|
|
|
|
return Status::CANCELLED; |
|
|
|
@ -302,20 +347,56 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { |
|
|
|
|
} |
|
|
|
|
response->mutable_message()->append(request.message()); |
|
|
|
|
} |
|
|
|
|
gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read); |
|
|
|
|
|
|
|
|
|
if (server_try_cancel_thd != NULL) { |
|
|
|
|
server_try_cancel_thd->join(); |
|
|
|
|
delete server_try_cancel_thd; |
|
|
|
|
return Status::CANCELLED; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (server_try_cancel == CANCEL_AFTER_PROCESSING) { |
|
|
|
|
ServerTryCancel(context); |
|
|
|
|
return Status::CANCELLED; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return Status::OK; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Return 3 messages.
|
|
|
|
|
// Return 'kNumResponseStreamMsgs' messages.
|
|
|
|
|
// TODO(yangg) make it generic by adding a parameter into EchoRequest
|
|
|
|
|
Status ResponseStream(ServerContext* context, const EchoRequest* request, |
|
|
|
|
ServerWriter<EchoResponse>* writer) GRPC_OVERRIDE { |
|
|
|
|
int server_try_cancel = GetIntValueFromMetadata( |
|
|
|
|
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); |
|
|
|
|
|
|
|
|
|
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { |
|
|
|
|
ServerTryCancel(context); |
|
|
|
|
return Status::CANCELLED; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
EchoResponse response; |
|
|
|
|
response.set_message(request->message() + "0"); |
|
|
|
|
writer->Write(response); |
|
|
|
|
response.set_message(request->message() + "1"); |
|
|
|
|
writer->Write(response); |
|
|
|
|
response.set_message(request->message() + "2"); |
|
|
|
|
std::thread* server_try_cancel_thd = NULL; |
|
|
|
|
if (server_try_cancel == CANCEL_DURING_PROCESSING) { |
|
|
|
|
server_try_cancel_thd = |
|
|
|
|
new std::thread(&TestServiceImpl::ServerTryCancel, this, context); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for (int i = 0; i < kNumResponseStreamsMsgs; i++) { |
|
|
|
|
response.set_message(request->message() + std::to_string(i)); |
|
|
|
|
writer->Write(response); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (server_try_cancel_thd != NULL) { |
|
|
|
|
server_try_cancel_thd->join(); |
|
|
|
|
delete server_try_cancel_thd; |
|
|
|
|
return Status::CANCELLED; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (server_try_cancel == CANCEL_AFTER_PROCESSING) { |
|
|
|
|
ServerTryCancel(context); |
|
|
|
|
return Status::CANCELLED; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return Status::OK; |
|
|
|
|
} |
|
|
|
@ -325,11 +406,38 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { |
|
|
|
|
GRPC_OVERRIDE { |
|
|
|
|
EchoRequest request; |
|
|
|
|
EchoResponse response; |
|
|
|
|
|
|
|
|
|
int server_try_cancel = GetIntValueFromMetadata( |
|
|
|
|
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); |
|
|
|
|
|
|
|
|
|
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { |
|
|
|
|
ServerTryCancel(context); |
|
|
|
|
return Status::CANCELLED; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
std::thread* server_try_cancel_thd = NULL; |
|
|
|
|
if (server_try_cancel == CANCEL_DURING_PROCESSING) { |
|
|
|
|
server_try_cancel_thd = |
|
|
|
|
new std::thread(&TestServiceImpl::ServerTryCancel, this, context); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
while (stream->Read(&request)) { |
|
|
|
|
gpr_log(GPR_INFO, "recv msg %s", request.message().c_str()); |
|
|
|
|
response.set_message(request.message()); |
|
|
|
|
stream->Write(response); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (server_try_cancel_thd != NULL) { |
|
|
|
|
server_try_cancel_thd->join(); |
|
|
|
|
delete server_try_cancel_thd; |
|
|
|
|
return Status::CANCELLED; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (server_try_cancel == CANCEL_AFTER_PROCESSING) { |
|
|
|
|
ServerTryCancel(context); |
|
|
|
|
return Status::CANCELLED; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return Status::OK; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -466,6 +574,231 @@ static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// == Tests for cancelling RPC from server side ==
|
|
|
|
|
|
|
|
|
|
class End2endServerTryCancelTest : public End2endTest { |
|
|
|
|
protected: |
|
|
|
|
// Tests for Client streaming
|
|
|
|
|
void TestRequestStreamServerCancel( |
|
|
|
|
ServerTryCancelRequestPhase server_try_cancel, int num_msgs_to_send) { |
|
|
|
|
ResetStub(); |
|
|
|
|
EchoRequest request; |
|
|
|
|
EchoResponse response; |
|
|
|
|
ClientContext context; |
|
|
|
|
|
|
|
|
|
context.AddMetadata(kServerTryCancelRequest, |
|
|
|
|
std::to_string(server_try_cancel)); |
|
|
|
|
|
|
|
|
|
auto stream = stub_->RequestStream(&context, &response); |
|
|
|
|
int num_msgs_sent = 0; |
|
|
|
|
while (num_msgs_sent < num_msgs_to_send) { |
|
|
|
|
request.set_message("hello"); |
|
|
|
|
if (!stream->Write(request)) { |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
num_msgs_sent++; |
|
|
|
|
} |
|
|
|
|
gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent); |
|
|
|
|
stream->WritesDone(); |
|
|
|
|
Status s = stream->Finish(); |
|
|
|
|
|
|
|
|
|
switch (server_try_cancel) { |
|
|
|
|
case CANCEL_BEFORE_PROCESSING: |
|
|
|
|
case CANCEL_DURING_PROCESSING: |
|
|
|
|
EXPECT_LE(num_msgs_sent, num_msgs_to_send); |
|
|
|
|
break; |
|
|
|
|
case CANCEL_AFTER_PROCESSING: |
|
|
|
|
EXPECT_EQ(num_msgs_sent, num_msgs_to_send); |
|
|
|
|
break; |
|
|
|
|
default: |
|
|
|
|
gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d", |
|
|
|
|
server_try_cancel); |
|
|
|
|
EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL && |
|
|
|
|
server_try_cancel <= CANCEL_AFTER_PROCESSING); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
EXPECT_FALSE(s.ok()); |
|
|
|
|
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Test for server streaming
|
|
|
|
|
void TestResponseStreamServerCancel( |
|
|
|
|
ServerTryCancelRequestPhase server_try_cancel) { |
|
|
|
|
ResetStub(); |
|
|
|
|
EchoRequest request; |
|
|
|
|
EchoResponse response; |
|
|
|
|
ClientContext context; |
|
|
|
|
|
|
|
|
|
context.AddMetadata(kServerTryCancelRequest, |
|
|
|
|
std::to_string(server_try_cancel)); |
|
|
|
|
request.set_message("hello"); |
|
|
|
|
auto stream = stub_->ResponseStream(&context, request); |
|
|
|
|
|
|
|
|
|
int num_msgs_read = 0; |
|
|
|
|
while (num_msgs_read < kNumResponseStreamsMsgs) { |
|
|
|
|
if (!stream->Read(&response)) { |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
EXPECT_EQ(response.message(), |
|
|
|
|
request.message() + std::to_string(num_msgs_read)); |
|
|
|
|
num_msgs_read++; |
|
|
|
|
} |
|
|
|
|
gpr_log(GPR_INFO, "Read %d messages", num_msgs_read); |
|
|
|
|
|
|
|
|
|
Status s = stream->Finish(); |
|
|
|
|
|
|
|
|
|
switch (server_try_cancel) { |
|
|
|
|
case CANCEL_BEFORE_PROCESSING: { |
|
|
|
|
EXPECT_EQ(num_msgs_read, 0); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
case CANCEL_DURING_PROCESSING: { |
|
|
|
|
EXPECT_LE(num_msgs_read, kNumResponseStreamsMsgs); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
case CANCEL_AFTER_PROCESSING: { |
|
|
|
|
EXPECT_EQ(num_msgs_read, kNumResponseStreamsMsgs); |
|
|
|
|
} |
|
|
|
|
default: { |
|
|
|
|
gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d", |
|
|
|
|
server_try_cancel); |
|
|
|
|
EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL && |
|
|
|
|
server_try_cancel <= CANCEL_AFTER_PROCESSING); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
EXPECT_FALSE(s.ok()); |
|
|
|
|
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel, |
|
|
|
|
int num_messages) { |
|
|
|
|
ResetStub(); |
|
|
|
|
EchoRequest request; |
|
|
|
|
EchoResponse response; |
|
|
|
|
ClientContext context; |
|
|
|
|
|
|
|
|
|
context.AddMetadata(kServerTryCancelRequest, |
|
|
|
|
std::to_string(server_try_cancel)); |
|
|
|
|
|
|
|
|
|
auto stream = stub_->BidiStream(&context); |
|
|
|
|
|
|
|
|
|
int num_msgs_read = 0; |
|
|
|
|
int num_msgs_sent = 0; |
|
|
|
|
while (num_msgs_sent < num_messages) { |
|
|
|
|
request.set_message("hello " + std::to_string(num_msgs_sent)); |
|
|
|
|
if (!stream->Write(request)) { |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
num_msgs_sent++; |
|
|
|
|
|
|
|
|
|
if (!stream->Read(&response)) { |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
num_msgs_read++; |
|
|
|
|
|
|
|
|
|
EXPECT_EQ(response.message(), request.message()); |
|
|
|
|
} |
|
|
|
|
gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent); |
|
|
|
|
gpr_log(GPR_INFO, "Read %d messages", num_msgs_read); |
|
|
|
|
|
|
|
|
|
stream->WritesDone(); |
|
|
|
|
Status s = stream->Finish(); |
|
|
|
|
|
|
|
|
|
switch (server_try_cancel) { |
|
|
|
|
case CANCEL_BEFORE_PROCESSING: { |
|
|
|
|
EXPECT_EQ(num_msgs_read, 0); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
case CANCEL_DURING_PROCESSING: { |
|
|
|
|
EXPECT_LE(num_msgs_sent, num_messages); |
|
|
|
|
EXPECT_LE(num_msgs_read, num_msgs_sent); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
case CANCEL_AFTER_PROCESSING: { |
|
|
|
|
EXPECT_EQ(num_msgs_sent, num_messages); |
|
|
|
|
EXPECT_EQ(num_msgs_read, num_msgs_sent); |
|
|
|
|
} |
|
|
|
|
default: { |
|
|
|
|
gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d", |
|
|
|
|
server_try_cancel); |
|
|
|
|
EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL && |
|
|
|
|
server_try_cancel <= CANCEL_AFTER_PROCESSING); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
EXPECT_FALSE(s.ok()); |
|
|
|
|
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
TEST_P(End2endServerTryCancelTest, RequestEchoServerCancel) { |
|
|
|
|
ResetStub(); |
|
|
|
|
EchoRequest request; |
|
|
|
|
EchoResponse response; |
|
|
|
|
ClientContext context; |
|
|
|
|
|
|
|
|
|
context.AddMetadata(kServerTryCancelRequest, |
|
|
|
|
std::to_string(CANCEL_BEFORE_PROCESSING)); |
|
|
|
|
Status s = stub_->Echo(&context, request, &response); |
|
|
|
|
EXPECT_FALSE(s.ok()); |
|
|
|
|
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Server to cancel before doing reading the request
|
|
|
|
|
TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelBeforeReads) { |
|
|
|
|
TestRequestStreamServerCancel(CANCEL_BEFORE_PROCESSING, 1); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Server to cancel while reading a request from the stream in parallel
|
|
|
|
|
TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelDuringRead) { |
|
|
|
|
TestRequestStreamServerCancel(CANCEL_DURING_PROCESSING, 10); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Server to cancel after reading all the requests but before returning to the
|
|
|
|
|
// client
|
|
|
|
|
TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelAfterReads) { |
|
|
|
|
TestRequestStreamServerCancel(CANCEL_AFTER_PROCESSING, 4); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Server to cancel before sending any response messages
|
|
|
|
|
TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelBefore) { |
|
|
|
|
TestResponseStreamServerCancel(CANCEL_BEFORE_PROCESSING); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Server to cancel while writing a response to the stream in parallel
|
|
|
|
|
TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelDuring) { |
|
|
|
|
TestResponseStreamServerCancel(CANCEL_DURING_PROCESSING); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Server to cancel after writing all the respones to the stream but before
|
|
|
|
|
// returning to the client
|
|
|
|
|
TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelAfter) { |
|
|
|
|
TestResponseStreamServerCancel(CANCEL_AFTER_PROCESSING); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Server to cancel before reading/writing any requests/responses on the stream
|
|
|
|
|
TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelBefore) { |
|
|
|
|
TestBidiStreamServerCancel(CANCEL_BEFORE_PROCESSING, 2); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Server to cancel while reading/writing requests/responses on the stream in
|
|
|
|
|
// parallel
|
|
|
|
|
TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelDuring) { |
|
|
|
|
TestBidiStreamServerCancel(CANCEL_DURING_PROCESSING, 10); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Server to cancel after reading/writing all requests/responses on the stream
|
|
|
|
|
// but before returning to the client
|
|
|
|
|
TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelAfter) { |
|
|
|
|
TestBidiStreamServerCancel(CANCEL_AFTER_PROCESSING, 5); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// =====
|
|
|
|
|
|
|
|
|
|
TEST_P(End2endTest, RequestStreamOneRequest) { |
|
|
|
|
ResetStub(); |
|
|
|
|
EchoRequest request; |
|
|
|
@ -1195,6 +1528,9 @@ INSTANTIATE_TEST_CASE_P(End2end, End2endTest, |
|
|
|
|
::testing::Values(TestScenario(false, false), |
|
|
|
|
TestScenario(false, true))); |
|
|
|
|
|
|
|
|
|
INSTANTIATE_TEST_CASE_P(End2endServerTryCancel, End2endServerTryCancelTest, |
|
|
|
|
::testing::Values(TestScenario(false, false))); |
|
|
|
|
|
|
|
|
|
INSTANTIATE_TEST_CASE_P(ProxyEnd2end, ProxyEnd2endTest, |
|
|
|
|
::testing::Values(TestScenario(false, false), |
|
|
|
|
TestScenario(false, true), |
|
|
|
|