From f5a3e32b9b1ec178de3d23aa6bf173aae9470eca Mon Sep 17 00:00:00 2001 From: Moiz Haidry Date: Thu, 18 Jul 2019 09:46:42 -0700 Subject: [PATCH 1/4] Take the mu_call mutex before zombifying pending calls so that there is no race between publishing new rpcs during a shutdown scenario --- src/core/lib/surface/server.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index a1c7d132ade..7fe05c10a54 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -711,8 +711,10 @@ static void maybe_finish_shutdown(grpc_server* server) { return; } + gpr_mu_lock(&server->mu_call); kill_pending_work_locked( server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); + gpr_mu_unlock(&server->mu_call); if (server->root_channel_data.next != &server->root_channel_data || server->listeners_destroyed < num_listeners(server)) { From 6d984166bd00bccd729ebbb2ae01752195b63f98 Mon Sep 17 00:00:00 2001 From: Moiz Haidry Date: Fri, 19 Jul 2019 11:45:33 -0700 Subject: [PATCH 2/4] Porting Aaron Jacob's unit test that detects the race --- test/cpp/end2end/generic_end2end_test.cc | 77 ++++++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/test/cpp/end2end/generic_end2end_test.cc b/test/cpp/end2end/generic_end2end_test.cc index c2807310aa4..d0571dc1a6f 100644 --- a/test/cpp/end2end/generic_end2end_test.cc +++ b/test/cpp/end2end/generic_end2end_test.cc @@ -92,6 +92,7 @@ class GenericEnd2endTest : public ::testing::Test { void ResetStub() { std::shared_ptr channel = grpc::CreateChannel( server_address_.str(), InsecureChannelCredentials()); + stub_ = grpc::testing::EchoTestService::NewStub(channel); generic_stub_.reset(new GenericStub(channel)); } @@ -177,6 +178,54 @@ class GenericEnd2endTest : public ::testing::Test { } } + // Return errors to up to one call that comes in on the supplied completion + // queue, until the CQ is being shut down (and therefore we can no longer + // 
enqueue further events). + void DriveCompletionQueue() { + enum class Event : uintptr_t { + kCallReceived, + kResponseSent, + }; + // Request the call, but only if the main thread hasn't beaten us to + // shutting down the CQ. + grpc::GenericServerContext server_context; + grpc::GenericServerAsyncReaderWriter reader_writer(&server_context); + + { + std::lock_guard lock(shutting_down_mu_); + if (!shutting_down_) { + generic_service_.RequestCall( + &server_context, &reader_writer, srv_cq_.get(), srv_cq_.get(), + reinterpret_cast(Event::kCallReceived)); + } + } + // Process events. + { + Event event; + bool ok; + while (srv_cq_->Next(reinterpret_cast(&event), &ok)) { + std::lock_guard lock(shutting_down_mu_); + if (shutting_down_) { + // The main thread has started shutting down. Simply continue to drain + // events. + continue; + } + + switch (event) { + case Event::kCallReceived: + reader_writer.Finish( + ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "go away"), + reinterpret_cast(Event::kResponseSent)); + break; + + case Event::kResponseSent: + // We are done. 
+ break; + } + } + } + } + CompletionQueue cli_cq_; std::unique_ptr srv_cq_; std::unique_ptr stub_; @@ -185,6 +234,8 @@ class GenericEnd2endTest : public ::testing::Test { AsyncGenericService generic_service_; const grpc::string server_host_; std::ostringstream server_address_; + bool shutting_down_; + std::mutex shutting_down_mu_; }; TEST_F(GenericEnd2endTest, SimpleRpc) { @@ -330,6 +381,32 @@ TEST_F(GenericEnd2endTest, Deadline) { gpr_time_from_seconds(10, GPR_TIMESPAN))); } +TEST_F(GenericEnd2endTest, ShortDeadline) { + ResetStub(); + + ClientContext cli_ctx; + EchoRequest request; + EchoResponse response; + + { + std::lock_guard lock(shutting_down_mu_); + shutting_down_ = false; + } + std::thread driver([=] { DriveCompletionQueue(); }); + + request.set_message(""); + cli_ctx.set_deadline(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_micros(500, GPR_TIMESPAN))); + Status s = stub_->Echo(&cli_ctx, request, &response); + EXPECT_FALSE(s.ok()); + { + std::lock_guard lock(shutting_down_mu_); + shutting_down_ = true; + } + TearDown(); + driver.join(); +} + } // namespace } // namespace testing } // namespace grpc From f4b1182a100c81eca490a614ad56e32b068750db Mon Sep 17 00:00:00 2001 From: Moiz Haidry Date: Fri, 19 Jul 2019 13:48:35 -0700 Subject: [PATCH 3/4] Addressed review comments --- test/cpp/end2end/generic_end2end_test.cc | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/test/cpp/end2end/generic_end2end_test.cc b/test/cpp/end2end/generic_end2end_test.cc index d0571dc1a6f..92654ff0de0 100644 --- a/test/cpp/end2end/generic_end2end_test.cc +++ b/test/cpp/end2end/generic_end2end_test.cc @@ -388,11 +388,8 @@ TEST_F(GenericEnd2endTest, ShortDeadline) { EchoRequest request; EchoResponse response; - { - std::lock_guard lock(shutting_down_mu_); - shutting_down_ = false; - } - std::thread driver([=] { DriveCompletionQueue(); }); + shutting_down_ = false; + std::thread driver([this] { DriveCompletionQueue(); }); request.set_message(""); 
cli_ctx.set_deadline(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), From da85cec0f2251f504227af90798c8b38258b4e00 Mon Sep 17 00:00:00 2001 From: Moiz Haidry Date: Fri, 19 Jul 2019 14:19:23 -0700 Subject: [PATCH 4/4] Add condition to avoid duplicate shutdown --- test/cpp/end2end/generic_end2end_test.cc | 28 ++++++++++++++---------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/test/cpp/end2end/generic_end2end_test.cc b/test/cpp/end2end/generic_end2end_test.cc index 92654ff0de0..f9f68bbe023 100644 --- a/test/cpp/end2end/generic_end2end_test.cc +++ b/test/cpp/end2end/generic_end2end_test.cc @@ -62,6 +62,7 @@ class GenericEnd2endTest : public ::testing::Test { GenericEnd2endTest() : server_host_("localhost") {} void SetUp() override { + shut_down_ = false; int port = grpc_pick_unused_port_or_die(); server_address_ << server_host_ << ":" << port; // Setup server @@ -77,17 +78,21 @@ class GenericEnd2endTest : public ::testing::Test { server_ = builder.BuildAndStart(); } - void TearDown() override { - server_->Shutdown(); - void* ignored_tag; - bool ignored_ok; - cli_cq_.Shutdown(); - srv_cq_->Shutdown(); - while (cli_cq_.Next(&ignored_tag, &ignored_ok)) - ; - while (srv_cq_->Next(&ignored_tag, &ignored_ok)) - ; + void ShutDownServerAndCQs() { + if (!shut_down_) { + server_->Shutdown(); + void* ignored_tag; + bool ignored_ok; + cli_cq_.Shutdown(); + srv_cq_->Shutdown(); + while (cli_cq_.Next(&ignored_tag, &ignored_ok)) + ; + while (srv_cq_->Next(&ignored_tag, &ignored_ok)) + ; + shut_down_ = true; + } } + void TearDown() override { ShutDownServerAndCQs(); } void ResetStub() { std::shared_ptr channel = grpc::CreateChannel( @@ -235,6 +240,7 @@ class GenericEnd2endTest : public ::testing::Test { const grpc::string server_host_; std::ostringstream server_address_; bool shutting_down_; + bool shut_down_; std::mutex shutting_down_mu_; }; @@ -400,7 +406,7 @@ TEST_F(GenericEnd2endTest, ShortDeadline) { std::lock_guard lock(shutting_down_mu_); shutting_down_ = true; } 
- TearDown(); + ShutDownServerAndCQs(); driver.join(); }