From fd7166d264b4abd9dab4215cdc60a0c9dc635c8e Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 19 May 2015 10:23:03 -0700 Subject: [PATCH] Add response streams to server crash test, fix them --- src/core/surface/call.c | 5 ++-- test/cpp/end2end/server_crash_test.cc | 29 ++++++++++++++++++-- test/cpp/end2end/server_crash_test_client.cc | 29 ++++++++++++++------ 3 files changed, 49 insertions(+), 14 deletions(-) diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 7a30de5f332..8b1ef0f2be0 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -536,9 +536,8 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, switch ((grpc_ioreq_op)i) { case GRPC_IOREQ_RECV_MESSAGE: case GRPC_IOREQ_SEND_MESSAGE: - if (master->success) { - call->request_set[i] = REQSET_EMPTY; - } else { + call->request_set[i] = REQSET_EMPTY; + if (!master->success) { call->write_state = WRITE_STATE_WRITE_CLOSED; } break; diff --git a/test/cpp/end2end/server_crash_test.cc b/test/cpp/end2end/server_crash_test.cc index 8f3954f9211..11d73aec7d6 100644 --- a/test/cpp/end2end/server_crash_test.cc +++ b/test/cpp/end2end/server_crash_test.cc @@ -79,6 +79,20 @@ class ServiceImpl GRPC_FINAL : public ::grpc::cpp::test::util::TestService::Serv gpr_log(GPR_INFO, "recv msg %s", request.message().c_str()); response.set_message(request.message()); stream->Write(response); + gpr_sleep_until(gpr_time_add(gpr_now(), gpr_time_from_seconds(1))); + } + return Status::OK; + } + + Status ResponseStream(ServerContext* context, const EchoRequest* request, + ServerWriter* writer) GRPC_OVERRIDE { + EchoResponse response; + for (int i = 0;; i++) { + std::ostringstream msg; + msg << "Hello " << i; + response.set_message(msg.str()); + if (!writer->Write(response)) break; + gpr_sleep_until(gpr_time_add(gpr_now(), gpr_time_from_seconds(1))); } return Status::OK; } @@ -89,7 +103,7 @@ class CrashTest : public ::testing::Test { CrashTest() {} std::unique_ptr - CreateServerAndClient() { + CreateServerAndClient(const std::string& mode) { auto port = grpc_pick_unused_port_or_die(); std::ostringstream addr_stream; addr_stream << "localhost:" << port; @@ -97,6 +111,7 @@ class CrashTest : public ::testing::Test { client_.reset(new SubProcess({ g_root + "/server_crash_test_client", "--address=" + addr, + "--mode=" + mode })); GPR_ASSERT(client_); @@ -115,8 +130,16 @@ class CrashTest : public ::testing::Test { ServiceImpl service_; }; -TEST_F(CrashTest, Kill) { - auto server = CreateServerAndClient(); +TEST_F(CrashTest, ResponseStream) { + auto server = CreateServerAndClient("response"); + + gpr_sleep_until(gpr_time_add(gpr_now(), gpr_time_from_seconds(5))); + KillClient(); + server->Shutdown(); +} + +TEST_F(CrashTest, BidiStream) { + auto server = CreateServerAndClient("bidi"); gpr_sleep_until(gpr_time_add(gpr_now(), gpr_time_from_seconds(5))); KillClient(); diff --git a/test/cpp/end2end/server_crash_test_client.cc b/test/cpp/end2end/server_crash_test_client.cc index d4273b8f587..497ccb4cb2b 100644 --- a/test/cpp/end2end/server_crash_test_client.cc +++ b/test/cpp/end2end/server_crash_test_client.cc @@ -46,6 +46,7 @@ #include "test/cpp/util/echo.grpc.pb.h" DEFINE_string(address, "", "Address to connect to"); +DEFINE_string(mode, "", "Test mode to use"); using grpc::cpp::test::util::EchoRequest; using grpc::cpp::test::util::EchoResponse; @@ -66,14 +67,26 @@ int main(int argc, char** argv) { EchoResponse response; grpc::ClientContext context; - auto stream = stub->BidiStream(&context); - for (int i = 0;; i++) { - std::ostringstream msg; - msg << "Hello " << i; - request.set_message(msg.str()); - GPR_ASSERT(stream->Write(request)); - GPR_ASSERT(stream->Read(&response)); - GPR_ASSERT(response.message() == request.message()); + if (FLAGS_mode == "bidi") { + auto stream = stub->BidiStream(&context); + for (int i = 0;; i++) { + std::ostringstream msg; + msg << "Hello " << i; + request.set_message(msg.str()); + GPR_ASSERT(stream->Write(request)); + GPR_ASSERT(stream->Read(&response)); + GPR_ASSERT(response.message() == request.message()); + } + } else if (FLAGS_mode == "response") { + EchoRequest request; + request.set_message("Hello"); + auto stream = stub->ResponseStream(&context, request); + for (;;) { + GPR_ASSERT(stream->Read(&response)); + } + } else { + gpr_log(GPR_ERROR, "invalid test mode '%s'", FLAGS_mode.c_str()); + return 1; } return 0;