Make stream responses togglable via context

pull/11557/head
ncteisen 8 years ago
parent e9089624fe
commit 9bee3086c3
  1. 46
      test/cpp/end2end/end2end_test.cc
  2. 6
      test/cpp/end2end/hybrid_end2end_test.cc
  3. 8
      test/cpp/end2end/test_service_impl.cc
  4. 4
      test/cpp/end2end/test_service_impl.h
  5. 6
      test/cpp/util/grpc_tool_test.cc

@@ -437,7 +437,7 @@ class End2endServerTryCancelTest : public End2endTest {
 auto stream = stub_->ResponseStream(&context, request);
 int num_msgs_read = 0;
-while (num_msgs_read < kNumResponseStreamsMsgs) {
+while (num_msgs_read < kServerDefaultResponseStreamsToSend) {
 if (!stream->Read(&response)) {
 break;
 }
@@ -463,14 +463,14 @@ class End2endServerTryCancelTest : public End2endTest {
 case CANCEL_DURING_PROCESSING:
 // Server cancelled while writing messages. Client must have read less
 // than or equal to the expected number of messages
-EXPECT_LE(num_msgs_read, kNumResponseStreamsMsgs);
+EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend);
 break;
 case CANCEL_AFTER_PROCESSING:
 // Even though the Server cancelled after writing all messages, the RPC
 // may be cancelled before the Client got a chance to read all the
 // messages.
-EXPECT_LE(num_msgs_read, kNumResponseStreamsMsgs);
+EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend);
 break;
 default: {
@@ -743,12 +743,10 @@ TEST_P(End2endTest, ResponseStream) {
 request.set_message("hello");
 auto stream = stub_->ResponseStream(&context, request);
-EXPECT_TRUE(stream->Read(&response));
-EXPECT_EQ(response.message(), request.message() + "0");
-EXPECT_TRUE(stream->Read(&response));
-EXPECT_EQ(response.message(), request.message() + "1");
-EXPECT_TRUE(stream->Read(&response));
-EXPECT_EQ(response.message(), request.message() + "2");
+for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) {
+EXPECT_TRUE(stream->Read(&response));
+EXPECT_EQ(response.message(), request.message() + grpc::to_string(i));
+}
 EXPECT_FALSE(stream->Read(&response));
 Status s = stream->Finish();
@@ -764,12 +762,10 @@ TEST_P(End2endTest, ResponseStreamWithCoalescingApi) {
 context.AddMetadata(kServerUseCoalescingApi, "1");
 auto stream = stub_->ResponseStream(&context, request);
-EXPECT_TRUE(stream->Read(&response));
-EXPECT_EQ(response.message(), request.message() + "0");
-EXPECT_TRUE(stream->Read(&response));
-EXPECT_EQ(response.message(), request.message() + "1");
-EXPECT_TRUE(stream->Read(&response));
-EXPECT_EQ(response.message(), request.message() + "2");
+for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) {
+EXPECT_TRUE(stream->Read(&response));
+EXPECT_EQ(response.message(), request.message() + grpc::to_string(i));
+}
 EXPECT_FALSE(stream->Read(&response));
 Status s = stream->Finish();
@@ -785,20 +781,12 @@ TEST_P(End2endTest, BidiStream) {
 auto stream = stub_->BidiStream(&context);
-request.set_message(msg + "0");
-EXPECT_TRUE(stream->Write(request));
-EXPECT_TRUE(stream->Read(&response));
-EXPECT_EQ(response.message(), request.message());
-request.set_message(msg + "1");
-EXPECT_TRUE(stream->Write(request));
-EXPECT_TRUE(stream->Read(&response));
-EXPECT_EQ(response.message(), request.message());
-request.set_message(msg + "2");
-EXPECT_TRUE(stream->Write(request));
-EXPECT_TRUE(stream->Read(&response));
-EXPECT_EQ(response.message(), request.message());
+for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) {
+request.set_message(msg + grpc::to_string(i));
+EXPECT_TRUE(stream->Write(request));
+EXPECT_TRUE(stream->Read(&response));
+EXPECT_EQ(response.message(), request.message());
+}
 stream->WritesDone();
 EXPECT_FALSE(stream->Read(&response));

@@ -521,7 +521,7 @@ class SplitResponseStreamDupPkg
 stream->NextMessageSize(&next_msg_sz);
 gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz);
 GPR_ASSERT(stream->Read(&req));
-for (int i = 0; i < kNumResponseStreamsMsgs; i++) {
+for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
 resp.set_message(req.message() + grpc::to_string(i) + "_dup");
 GPR_ASSERT(stream->Write(resp));
 }
@@ -561,7 +561,7 @@ class FullySplitStreamedDupPkg
 stream->NextMessageSize(&next_msg_sz);
 gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz);
 GPR_ASSERT(stream->Read(&req));
-for (int i = 0; i < kNumResponseStreamsMsgs; i++) {
+for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
 resp.set_message(req.message() + grpc::to_string(i) + "_dup");
 GPR_ASSERT(stream->Write(resp));
 }
@@ -613,7 +613,7 @@ class FullyStreamedDupPkg : public duplicate::EchoTestService::StreamedService {
 stream->NextMessageSize(&next_msg_sz);
 gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz);
 GPR_ASSERT(stream->Read(&req));
-for (int i = 0; i < kNumResponseStreamsMsgs; i++) {
+for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
 resp.set_message(req.message() + grpc::to_string(i) + "_dup");
 GPR_ASSERT(stream->Write(resp));
 }

@@ -239,6 +239,10 @@ Status TestServiceImpl::ResponseStream(ServerContext* context,
 int server_coalescing_api = GetIntValueFromMetadata(
 kServerUseCoalescingApi, context->client_metadata(), 0);
+int server_responses_to_send = GetIntValueFromMetadata(
+kServerResponseStreamsToSend, context->client_metadata(),
+kServerDefaultResponseStreamsToSend);
 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
 ServerTryCancel(context);
 return Status::CANCELLED;
@@ -251,9 +255,9 @@ Status TestServiceImpl::ResponseStream(ServerContext* context,
 new std::thread(&TestServiceImpl::ServerTryCancel, this, context);
 }
-for (int i = 0; i < kNumResponseStreamsMsgs; i++) {
+for (int i = 0; i < server_responses_to_send; i++) {
 response.set_message(request->message() + grpc::to_string(i));
-if (i == kNumResponseStreamsMsgs - 1 && server_coalescing_api != 0) {
+if (i == server_responses_to_send - 1 && server_coalescing_api != 0) {
 writer->WriteLast(response, WriteOptions());
 } else {
 writer->Write(response);

@@ -29,7 +29,9 @@
 namespace grpc {
 namespace testing {
-const int kNumResponseStreamsMsgs = 3;
+// const int kNumResponseStreamsMsgs = 3;
+const int kServerDefaultResponseStreamsToSend = 3;
+const char* const kServerResponseStreamsToSend = "server_responses_to_send";
 const char* const kServerCancelAfterReads = "cancel_after_reads";
 const char* const kServerTryCancelRequest = "server_try_cancel";
 const char* const kDebugInfoTrailerKey = "debug-info-bin";

@@ -87,7 +87,7 @@ DECLARE_bool(l);
 namespace {
-const int kNumResponseStreamsMsgs = 3;
+const int kServerDefaultResponseStreamsToSend = 3;
 class TestCliCredentials final : public grpc::testing::CliCredentials {
 public:
@@ -159,7 +159,7 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
 context->AddTrailingMetadata("trailing_key", "trailing_value");
 EchoResponse response;
-for (int i = 0; i < kNumResponseStreamsMsgs; i++) {
+for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
 response.set_message(request->message() + grpc::to_string(i));
 writer->Write(response);
 }
@@ -463,7 +463,7 @@ TEST_F(GrpcToolTest, CallCommandResponseStream) {
 std::placeholders::_1)));
 // Expected output: "message: \"Hello{n}\""
-for (int i = 0; i < kNumResponseStreamsMsgs; i++) {
+for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
 grpc::string expected_response_text =
 "message: \"Hello" + grpc::to_string(i) + "\"\n";
 EXPECT_TRUE(NULL != strstr(output_stream.str().c_str(),

Loading…
Cancel
Save