Make sure that nothing gets added to cq after shutdown

pull/1113/head
Vijay Pai 10 years ago
parent 55bb5bdf80
commit 8ad32091e6
  1. 21
      test/cpp/qps/server_async.cc

@ -33,6 +33,7 @@
#include <forward_list> #include <forward_list>
#include <functional> #include <functional>
#include <mutex>
#include <sys/time.h> #include <sys/time.h>
#include <sys/resource.h> #include <sys/resource.h>
#include <sys/signal.h> #include <sys/signal.h>
@ -64,7 +65,8 @@ namespace testing {
class AsyncQpsServerTest : public Server { class AsyncQpsServerTest : public Server {
public: public:
AsyncQpsServerTest(const ServerConfig &config, int port) AsyncQpsServerTest(const ServerConfig &config, int port)
: srv_cq_(), async_service_(&srv_cq_), server_(nullptr) { : srv_cq_(), async_service_(&srv_cq_), server_(nullptr),
shutdown_(false) {
char *server_address = NULL; char *server_address = NULL;
gpr_join_host_port(&server_address, "::", port); gpr_join_host_port(&server_address, "::", port);
@ -100,7 +102,9 @@ class AsyncQpsServerTest : public Server {
// The tag is a pointer to an RPC context to invoke // The tag is a pointer to an RPC context to invoke
if (ctx->RunNextState(ok) == false) { if (ctx->RunNextState(ok) == false) {
// this RPC context is done, so refresh it // this RPC context is done, so refresh it
ctx->Reset(); std::lock_guard<std::mutex> g(shutdown_mutex_);
if (!shutdown_)
ctx->Reset();
} }
} }
return; return;
@ -109,7 +113,11 @@ class AsyncQpsServerTest : public Server {
} }
~AsyncQpsServerTest() { ~AsyncQpsServerTest() {
server_->Shutdown(); server_->Shutdown();
srv_cq_.Shutdown(); {
std::lock_guard<std::mutex> g(shutdown_mutex_);
shutdown_ = true;
srv_cq_.Shutdown();
}
for (auto &thr : threads_) { for (auto &thr : threads_) {
thr.join(); thr.join();
} }
@ -195,7 +203,7 @@ class AsyncQpsServerTest : public Server {
class ServerRpcContextStreamingImpl GRPC_FINAL : public ServerRpcContext { class ServerRpcContextStreamingImpl GRPC_FINAL : public ServerRpcContext {
public: public:
ServerRpcContextStreamingImpl( ServerRpcContextStreamingImpl(
std::function<void(ServerContext *, std::function<void(ServerContext *,
grpc::ServerAsyncReaderWriter<ResponseType, grpc::ServerAsyncReaderWriter<ResponseType,
RequestType> *, void *)> request_method, RequestType> *, void *)> request_method,
std::function<grpc::Status(const RequestType *, ResponseType *)> std::function<grpc::Status(const RequestType *, ResponseType *)>
@ -228,7 +236,7 @@ class AsyncQpsServerTest : public Server {
next_state_ = &ServerRpcContextStreamingImpl::read_done; next_state_ = &ServerRpcContextStreamingImpl::read_done;
return true; return true;
} }
bool read_done(bool ok) { bool read_done(bool ok) {
if (ok) { if (ok) {
// invoke the method // invoke the method
@ -291,6 +299,9 @@ class AsyncQpsServerTest : public Server {
SimpleResponse,SimpleRequest> *, void *)> SimpleResponse,SimpleRequest> *, void *)>
request_streaming_; request_streaming_;
std::forward_list<ServerRpcContext *> contexts_; std::forward_list<ServerRpcContext *> contexts_;
std::mutex shutdown_mutex_;
bool shutdown_;
}; };
std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config, std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config,

Loading…
Cancel
Save