diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 17b97f6ecff..2a5a7fe5ebd 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -324,6 +324,7 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { *tag = tag_; + bool orig_status = *status; if (*status && request_) { if (payload_) { *status = DeserializeProto(payload_, request_); @@ -343,7 +344,9 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { } ctx_->call_ = call_; Call call(call_, server_, cq_); - ctx_->BeginCompletionOp(&call); + if (orig_status && call_) { + ctx_->BeginCompletionOp(&call); + } // just the pointers inside call are copied here stream_->BindCall(&call); delete this; diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index c797d8af963..c006262fc34 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -143,21 +143,24 @@ class AsyncQpsServerTest { delete contexts_.front(); contexts_.pop_front(); } + for (auto& thr: threads_) { + thr.join(); + } } void ServeRpcs(int num_threads) { - std::vector threads; for (int i = 0; i < num_threads; i++) { - threads.push_back(std::thread([=]() { + threads_.push_back(std::thread([=]() { // Wait until work is available or we are shutting down bool ok; void *got_tag; while (srv_cq_.Next(&got_tag, &ok)) { - EXPECT_EQ(ok, true); - ServerRpcContext *ctx = detag(got_tag); - // The tag is a pointer to an RPC context to invoke - if (ctx->RunNextState() == false) { - // this RPC context is done, so refresh it - ctx->Reset(); + if (ok) { + ServerRpcContext *ctx = detag(got_tag); + // The tag is a pointer to an RPC context to invoke + if (ctx->RunNextState() == false) { + // this RPC context is done, so refresh it + ctx->Reset(); + } } } return; @@ -260,6 +263,7 @@ class AsyncQpsServerTest { } CompletionQueue srv_cq_; TestService::AsyncService async_service_; + std::vector threads_; std::unique_ptr server_; std::function *, void *)>