diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 7a8858ef194..6876961e210 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -179,10 +179,13 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen { grpc::mutex mu_; bool started_; bool shutdown_; + bool shutdown_notified_; // The number of threads which are running callbacks. int num_running_cb_; grpc::condition_variable callback_cv_; + grpc::condition_variable shutdown_cv_; + std::shared_ptr global_callbacks_; std::list* sync_methods_; diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index fb4c68ebe49..af04fd4ca64 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -281,6 +281,7 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, : max_message_size_(max_message_size), started_(false), shutdown_(false), + shutdown_notified_(false), num_running_cb_(0), sync_methods_(new std::list), has_generic_service_(false), @@ -462,13 +463,16 @@ void Server::ShutdownInternal(gpr_timespec deadline) { while (num_running_cb_ != 0) { callback_cv_.wait(lock); } + + shutdown_notified_ = true; + shutdown_cv_.notify_all(); } } void Server::Wait() { grpc::unique_lock lock(mu_); - while (num_running_cb_ != 0) { - callback_cv_.wait(lock); + while (started_ && !shutdown_notified_) { + shutdown_cv_.wait(lock); } } diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 6c7eae53a40..4a8936d2810 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -345,6 +345,31 @@ TEST_P(AsyncEnd2endTest, SequentialRpcs) { SendRpc(10); } +// We do not need to protect notify because the use is synchronized. +void ServerWait(Server* server, int* notify) { + server->Wait(); + *notify = 1; +} +TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) { + int notify = 0; + std::thread* wait_thread = + new std::thread(&ServerWait, server_.get(), ¬ify); + ResetStub(); + SendRpc(1); + EXPECT_EQ(0, notify); + server_->Shutdown(); + wait_thread->join(); + EXPECT_EQ(1, notify); + delete wait_thread; +} + +TEST_P(AsyncEnd2endTest, ShutdownThenWait) { + ResetStub(); + SendRpc(1); + server_->Shutdown(); + server_->Wait(); +} + // Test a simple RPC using the async version of Next TEST_P(AsyncEnd2endTest, AsyncNextRpc) { ResetStub();