|
|
|
@ -251,8 +251,10 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { |
|
|
|
|
int server_try_cancel = GetIntValueFromMetadata( |
|
|
|
|
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); |
|
|
|
|
if (server_try_cancel > DO_NOT_CANCEL) { |
|
|
|
|
// For unary RPC, the actual value of server_try_cancel does not matter
|
|
|
|
|
// (as long as it is greater than DO_NOT_CANCEL)
|
|
|
|
|
// Since this is a unary RPC, by the time this server handler is called,
|
|
|
|
|
// the 'request' message is already read from the client. So the scenarios
|
|
|
|
|
// in server_try_cancel don't make much sense. Just cancel the RPC as long
|
|
|
|
|
// as server_try_cancel is not DO_NOT_CANCEL
|
|
|
|
|
ServerTryCancel(context); |
|
|
|
|
return Status::CANCELLED; |
|
|
|
|
} |
|
|
|
@ -318,13 +320,27 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { |
|
|
|
|
Status RequestStream(ServerContext* context, |
|
|
|
|
ServerReader<EchoRequest>* reader, |
|
|
|
|
EchoResponse* response) GRPC_OVERRIDE { |
|
|
|
|
EchoRequest request; |
|
|
|
|
response->set_message(""); |
|
|
|
|
int cancel_after_reads = GetIntValueFromMetadata( |
|
|
|
|
kServerCancelAfterReads, context->client_metadata(), 0); |
|
|
|
|
// If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
|
|
|
|
|
// the server by calling ServerContext::TryCancel() depending on the value:
|
|
|
|
|
// CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads
|
|
|
|
|
// any message from the client
|
|
|
|
|
// CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
|
|
|
|
|
// reading messages from the client
|
|
|
|
|
// CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
|
|
|
|
|
// all the messages from the client
|
|
|
|
|
int server_try_cancel = GetIntValueFromMetadata( |
|
|
|
|
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); |
|
|
|
|
|
|
|
|
|
// If 'cancel_after_reads' is set in the metadata AND non-zero, the server
|
|
|
|
|
// will cancel the RPC (by just returning Status::CANCELLED - doesn't call
|
|
|
|
|
// ServerContext::TryCancel()) after reading the number of records specified
|
|
|
|
|
// by the 'cancel_after_reads' value set in the metadata.
|
|
|
|
|
int cancel_after_reads = GetIntValueFromMetadata( |
|
|
|
|
kServerCancelAfterReads, context->client_metadata(), 0); |
|
|
|
|
|
|
|
|
|
EchoRequest request; |
|
|
|
|
response->set_message(""); |
|
|
|
|
|
|
|
|
|
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { |
|
|
|
|
ServerTryCancel(context); |
|
|
|
|
return Status::CANCELLED; |
|
|
|
@ -367,6 +383,14 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { |
|
|
|
|
// TODO(yangg) make it generic by adding a parameter into EchoRequest
|
|
|
|
|
Status ResponseStream(ServerContext* context, const EchoRequest* request, |
|
|
|
|
ServerWriter<EchoResponse>* writer) GRPC_OVERRIDE { |
|
|
|
|
// If server_try_cancel is set in the metadata, the RPC is cancelled by the
|
|
|
|
|
// server by calling ServerContext::TryCancel() depending on the value:
|
|
|
|
|
// CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes
|
|
|
|
|
// any messages to the client
|
|
|
|
|
// CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
|
|
|
|
|
// writing messages to the client
|
|
|
|
|
// CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes
|
|
|
|
|
// all the messages to the client
|
|
|
|
|
int server_try_cancel = GetIntValueFromMetadata( |
|
|
|
|
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); |
|
|
|
|
|
|
|
|
@ -404,12 +428,20 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { |
|
|
|
|
Status BidiStream(ServerContext* context, |
|
|
|
|
ServerReaderWriter<EchoResponse, EchoRequest>* stream) |
|
|
|
|
GRPC_OVERRIDE { |
|
|
|
|
EchoRequest request; |
|
|
|
|
EchoResponse response; |
|
|
|
|
|
|
|
|
|
// If server_try_cancel is set in the metadata, the RPC is cancelled by the
|
|
|
|
|
// server by calling ServerContext::TryCancel() depending on the value:
|
|
|
|
|
// CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/
|
|
|
|
|
// writes any messages from/to the client
|
|
|
|
|
// CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
|
|
|
|
|
// reading/writing messages from/to the client
|
|
|
|
|
// CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server
|
|
|
|
|
// reads/writes all messages from/to the client
|
|
|
|
|
int server_try_cancel = GetIntValueFromMetadata( |
|
|
|
|
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); |
|
|
|
|
|
|
|
|
|
EchoRequest request; |
|
|
|
|
EchoResponse response; |
|
|
|
|
|
|
|
|
|
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { |
|
|
|
|
ServerTryCancel(context); |
|
|
|
|
return Status::CANCELLED; |
|
|
|
@ -574,11 +606,23 @@ static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// == Tests for cancelling RPC from server side ==
|
|
|
|
|
|
|
|
|
|
// This class is for testing scenarios where RPCs are cancelled on the server
|
|
|
|
|
// by calling ServerContext::TryCancel()
|
|
|
|
|
class End2endServerTryCancelTest : public End2endTest { |
|
|
|
|
protected: |
|
|
|
|
// Tests for Client streaming
|
|
|
|
|
// Helper for testing client-streaming RPCs which are cancelled on the server.
|
|
|
|
|
// Depending on the value of server_try_cancel parameter, this will test one
|
|
|
|
|
// of the following three scenarios:
|
|
|
|
|
// CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
|
|
|
|
|
// any messages from the client
|
|
|
|
|
//
|
|
|
|
|
// CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
|
|
|
|
|
// messages from the client
|
|
|
|
|
//
|
|
|
|
|
// CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
|
|
|
|
|
// the messages from the client
|
|
|
|
|
//
|
|
|
|
|
// NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
|
|
|
|
|
void TestRequestStreamServerCancel( |
|
|
|
|
ServerTryCancelRequestPhase server_try_cancel, int num_msgs_to_send) { |
|
|
|
|
ResetStub(); |
|
|
|
@ -586,10 +630,12 @@ class End2endServerTryCancelTest : public End2endTest { |
|
|
|
|
EchoResponse response; |
|
|
|
|
ClientContext context; |
|
|
|
|
|
|
|
|
|
// Send server_try_cancel value in the client metadata
|
|
|
|
|
context.AddMetadata(kServerTryCancelRequest, |
|
|
|
|
std::to_string(server_try_cancel)); |
|
|
|
|
|
|
|
|
|
auto stream = stub_->RequestStream(&context, &response); |
|
|
|
|
|
|
|
|
|
int num_msgs_sent = 0; |
|
|
|
|
while (num_msgs_sent < num_msgs_to_send) { |
|
|
|
|
request.set_message("hello"); |
|
|
|
@ -599,17 +645,32 @@ class End2endServerTryCancelTest : public End2endTest { |
|
|
|
|
num_msgs_sent++; |
|
|
|
|
} |
|
|
|
|
gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent); |
|
|
|
|
|
|
|
|
|
stream->WritesDone(); |
|
|
|
|
Status s = stream->Finish(); |
|
|
|
|
|
|
|
|
|
// At this point, we know for sure that RPC was cancelled by the server
|
|
|
|
|
// since we passed server_try_cancel value in the metadata. Depending on the
|
|
|
|
|
// value of server_try_cancel, the RPC might have been cancelled by the
|
|
|
|
|
// server at different stages. The following validates our expectations of
|
|
|
|
|
// number of messages sent in various cancellation scenarios:
|
|
|
|
|
|
|
|
|
|
switch (server_try_cancel) { |
|
|
|
|
case CANCEL_BEFORE_PROCESSING: |
|
|
|
|
case CANCEL_DURING_PROCESSING: |
|
|
|
|
// If the RPC is cancelled by server before / during messages from the
|
|
|
|
|
// client, it means that the client most likely did not get a chance to
|
|
|
|
|
// send all the messages it wanted to send. i.e num_msgs_sent <=
|
|
|
|
|
// num_msgs_to_send
|
|
|
|
|
EXPECT_LE(num_msgs_sent, num_msgs_to_send); |
|
|
|
|
break; |
|
|
|
|
|
|
|
|
|
case CANCEL_AFTER_PROCESSING: |
|
|
|
|
// If the RPC was cancelled after all messages were read by the server,
|
|
|
|
|
// the client did get a chance to send all its messages
|
|
|
|
|
EXPECT_EQ(num_msgs_sent, num_msgs_to_send); |
|
|
|
|
break; |
|
|
|
|
|
|
|
|
|
default: |
|
|
|
|
gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d", |
|
|
|
|
server_try_cancel); |
|
|
|
@ -622,7 +683,19 @@ class End2endServerTryCancelTest : public End2endTest { |
|
|
|
|
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Test for server streaming
|
|
|
|
|
// Helper for testing server-streaming RPCs which are cancelled on the server.
|
|
|
|
|
// Depending on the value of server_try_cancel parameter, this will test one
|
|
|
|
|
// of the following three scenarios:
|
|
|
|
|
// CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before writing
|
|
|
|
|
// any messages to the client
|
|
|
|
|
//
|
|
|
|
|
// CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while writing
|
|
|
|
|
// messages to the client
|
|
|
|
|
//
|
|
|
|
|
// CANCEL_AFTER PROCESSING: Rpc is cancelled by server after writing all
|
|
|
|
|
// the messages to the client
|
|
|
|
|
//
|
|
|
|
|
// NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
|
|
|
|
|
void TestResponseStreamServerCancel( |
|
|
|
|
ServerTryCancelRequestPhase server_try_cancel) { |
|
|
|
|
ResetStub(); |
|
|
|
@ -630,8 +703,10 @@ class End2endServerTryCancelTest : public End2endTest { |
|
|
|
|
EchoResponse response; |
|
|
|
|
ClientContext context; |
|
|
|
|
|
|
|
|
|
// Send server_try_cancel in the client metadata
|
|
|
|
|
context.AddMetadata(kServerTryCancelRequest, |
|
|
|
|
std::to_string(server_try_cancel)); |
|
|
|
|
|
|
|
|
|
request.set_message("hello"); |
|
|
|
|
auto stream = stub_->ResponseStream(&context, request); |
|
|
|
|
|
|
|
|
@ -648,18 +723,29 @@ class End2endServerTryCancelTest : public End2endTest { |
|
|
|
|
|
|
|
|
|
Status s = stream->Finish(); |
|
|
|
|
|
|
|
|
|
// Depending on the value of server_try_cancel, the RPC might have been
|
|
|
|
|
// cancelled by the server at different stages. The following validates our
|
|
|
|
|
// expectations of number of messages read in various cancellation
|
|
|
|
|
// scenarios:
|
|
|
|
|
switch (server_try_cancel) { |
|
|
|
|
case CANCEL_BEFORE_PROCESSING: { |
|
|
|
|
case CANCEL_BEFORE_PROCESSING: |
|
|
|
|
// Server cancelled before sending any messages. Which means the client
|
|
|
|
|
// wouldn't have read any
|
|
|
|
|
EXPECT_EQ(num_msgs_read, 0); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
case CANCEL_DURING_PROCESSING: { |
|
|
|
|
|
|
|
|
|
case CANCEL_DURING_PROCESSING: |
|
|
|
|
// Server cancelled while writing messages. Client must have read less
|
|
|
|
|
// than or equal to the expected number of messages
|
|
|
|
|
EXPECT_LE(num_msgs_read, kNumResponseStreamsMsgs); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
case CANCEL_AFTER_PROCESSING: { |
|
|
|
|
|
|
|
|
|
case CANCEL_AFTER_PROCESSING: |
|
|
|
|
// Server cancelled after writing all messages. Client must have read
|
|
|
|
|
// all messages
|
|
|
|
|
EXPECT_EQ(num_msgs_read, kNumResponseStreamsMsgs); |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
|
|
|
|
|
default: { |
|
|
|
|
gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d", |
|
|
|
|
server_try_cancel); |
|
|
|
@ -673,6 +759,19 @@ class End2endServerTryCancelTest : public End2endTest { |
|
|
|
|
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Helper for testing bidirectional-streaming RPCs which are cancelled on the
|
|
|
|
|
// server. Depending on the value of server_try_cancel parameter, this will
|
|
|
|
|
// test one of the following three scenarios:
|
|
|
|
|
// CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
|
|
|
|
|
// writing any messages from/to the client
|
|
|
|
|
//
|
|
|
|
|
// CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading/
|
|
|
|
|
// writing messages from/to the client
|
|
|
|
|
//
|
|
|
|
|
// CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading/writing
|
|
|
|
|
// all the messages from/to the client
|
|
|
|
|
//
|
|
|
|
|
// NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
|
|
|
|
|
void TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel, |
|
|
|
|
int num_messages) { |
|
|
|
|
ResetStub(); |
|
|
|
@ -680,6 +779,7 @@ class End2endServerTryCancelTest : public End2endTest { |
|
|
|
|
EchoResponse response; |
|
|
|
|
ClientContext context; |
|
|
|
|
|
|
|
|
|
// Send server_try_cancel in the client metadata
|
|
|
|
|
context.AddMetadata(kServerTryCancelRequest, |
|
|
|
|
std::to_string(server_try_cancel)); |
|
|
|
|
|
|
|
|
@ -707,27 +807,31 @@ class End2endServerTryCancelTest : public End2endTest { |
|
|
|
|
stream->WritesDone(); |
|
|
|
|
Status s = stream->Finish(); |
|
|
|
|
|
|
|
|
|
// Depending on the value of server_try_cancel, the RPC might have been
|
|
|
|
|
// cancelled by the server at different stages. The following validates our
|
|
|
|
|
// expectations of number of messages read in various cancellation
|
|
|
|
|
// scenarios:
|
|
|
|
|
switch (server_try_cancel) { |
|
|
|
|
case CANCEL_BEFORE_PROCESSING: { |
|
|
|
|
case CANCEL_BEFORE_PROCESSING: |
|
|
|
|
EXPECT_EQ(num_msgs_read, 0); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
case CANCEL_DURING_PROCESSING: { |
|
|
|
|
|
|
|
|
|
case CANCEL_DURING_PROCESSING: |
|
|
|
|
EXPECT_LE(num_msgs_sent, num_messages); |
|
|
|
|
EXPECT_LE(num_msgs_read, num_msgs_sent); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
case CANCEL_AFTER_PROCESSING: { |
|
|
|
|
|
|
|
|
|
case CANCEL_AFTER_PROCESSING: |
|
|
|
|
EXPECT_EQ(num_msgs_sent, num_messages); |
|
|
|
|
EXPECT_EQ(num_msgs_read, num_msgs_sent); |
|
|
|
|
} |
|
|
|
|
default: { |
|
|
|
|
break; |
|
|
|
|
|
|
|
|
|
default: |
|
|
|
|
gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d", |
|
|
|
|
server_try_cancel); |
|
|
|
|
EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL && |
|
|
|
|
server_try_cancel <= CANCEL_AFTER_PROCESSING); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
EXPECT_FALSE(s.ok()); |
|
|
|
|