diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 4312f597b29..5a27fff09a4 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -33,6 +33,7 @@ #include #include +#include #include #include #include @@ -64,7 +65,8 @@ namespace testing { class AsyncQpsServerTest : public Server { public: AsyncQpsServerTest(const ServerConfig &config, int port) - : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) { + : srv_cq_(), async_service_(&srv_cq_), server_(nullptr), + shutdown_(false) { char *server_address = NULL; gpr_join_host_port(&server_address, "::", port); @@ -100,7 +102,9 @@ class AsyncQpsServerTest : public Server { // The tag is a pointer to an RPC context to invoke if (ctx->RunNextState(ok) == false) { // this RPC context is done, so refresh it - ctx->Reset(); + std::lock_guard g(shutdown_mutex_); + if (!shutdown_) + ctx->Reset(); } } return; @@ -109,7 +113,11 @@ class AsyncQpsServerTest : public Server { } ~AsyncQpsServerTest() { server_->Shutdown(); - srv_cq_.Shutdown(); + { + std::lock_guard g(shutdown_mutex_); + shutdown_ = true; + srv_cq_.Shutdown(); + } for (auto &thr : threads_) { thr.join(); } @@ -195,7 +203,7 @@ class AsyncQpsServerTest : public Server { class ServerRpcContextStreamingImpl GRPC_FINAL : public ServerRpcContext { public: ServerRpcContextStreamingImpl( - std::function *, void *)> request_method, std::function @@ -228,7 +236,7 @@ class AsyncQpsServerTest : public Server { next_state_ = &ServerRpcContextStreamingImpl::read_done; return true; } - + bool read_done(bool ok) { if (ok) { // invoke the method @@ -291,6 +299,9 @@ class AsyncQpsServerTest : public Server { SimpleResponse,SimpleRequest> *, void *)> request_streaming_; std::forward_list contexts_; + + std::mutex shutdown_mutex_; + bool shutdown_; }; std::unique_ptr CreateAsyncServer(const ServerConfig &config,