diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 0a6d9beeca6..1b039b3dc17 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -130,39 +130,26 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { response_reader_; }; -class AsyncUnaryClient GRPC_FINAL : public Client { +class AsyncClient : public Client { public: - explicit AsyncUnaryClient(const ClientConfig& config) : Client(config) { + explicit AsyncClient(const ClientConfig& config, + void (*setup_ctx)(CompletionQueue*, TestService::Stub*, + const SimpleRequest&)) : + Client(config) { for (int i = 0; i < config.async_client_threads(); i++) { cli_cqs_.emplace_back(new CompletionQueue); } - - auto check_done = [](grpc::Status s, SimpleResponse* response) {}; - int t = 0; for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { for (auto channel = channels_.begin(); channel != channels_.end(); channel++) { auto* cq = cli_cqs_[t].get(); t = (t + 1) % cli_cqs_.size(); - auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx, - const SimpleRequest& request, void* tag) { - return stub->AsyncUnaryCall(ctx, request, cq, tag); - }; - - TestService::Stub* stub = channel->get_stub(); - const SimpleRequest& request = request_; - new ClientRpcContextUnaryImpl( - stub, request, start_req, check_done); + (*setup_ctx)(cq, channel->get_stub(), request_); } } - - StartThreads(config.async_client_threads()); } - - ~AsyncUnaryClient() GRPC_OVERRIDE { - EndThreads(); - + virtual ~AsyncClient() { for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) { (*cq)->Shutdown(); void* got_tag; @@ -173,10 +160,13 @@ class AsyncUnaryClient GRPC_FINAL : public Client { } } - bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { + bool ThreadFunc(Histogram* histogram, size_t thread_idx) + GRPC_OVERRIDE GRPC_FINAL { void* got_tag; bool ok; - switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, std::chrono::system_clock::now() + std::chrono::seconds(1))) { + switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, + std::chrono::system_clock::now() + + std::chrono::seconds(1))) { case CompletionQueue::SHUTDOWN: return false; case CompletionQueue::TIMEOUT: return true; case CompletionQueue::GOT_EVENT: break; @@ -192,10 +182,30 @@ class AsyncUnaryClient GRPC_FINAL : public Client { return true; } - + private: std::vector> cli_cqs_; }; +class AsyncUnaryClient GRPC_FINAL : public AsyncClient { + public: + explicit AsyncUnaryClient(const ClientConfig& config) : + AsyncClient(config, SetupCtx) { + StartThreads(config.async_client_threads()); + } + ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); } +private: + static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub, + const SimpleRequest& req) { + auto check_done = [](grpc::Status s, SimpleResponse* response) {}; + auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx, + const SimpleRequest& request, void* tag) { + return stub->AsyncUnaryCall(ctx, request, cq, tag); + }; + new ClientRpcContextUnaryImpl( + stub, req, start_req, check_done); + } +}; + template class ClientRpcContextStreamingImpl : public ClientRpcContext { public: @@ -241,7 +251,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { return(false); } next_state_ = &ClientRpcContextStreamingImpl::ReadDone; - stream_->Read(&response_, ClientRpcContext::tag(this)); + stream_->Read(&response_, ClientRpcContext::tag(this)); return true; } bool ReadDone(bool ok, Histogram *hist) { @@ -263,71 +273,26 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { stream_; }; -class AsyncStreamingClient GRPC_FINAL : public Client { +class AsyncStreamingClient GRPC_FINAL : public AsyncClient { public: - explicit AsyncStreamingClient(const ClientConfig &config) : Client(config) { - for (int i = 0; i < config.async_client_threads(); i++) { - cli_cqs_.emplace_back(new CompletionQueue); - } - - auto check_done = [](grpc::Status s, SimpleResponse* response) {}; - - int t = 0; - for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { - for (auto channel = channels_.begin(); channel != channels_.end(); - channel++) { - auto* cq = cli_cqs_[t].get(); - t = (t + 1) % cli_cqs_.size(); - auto start_req = [cq](TestService::Stub *stub, grpc::ClientContext *ctx, - void *tag) { - auto stream = stub->AsyncStreamingCall(ctx, cq, tag); - return stream; - }; - - TestService::Stub *stub = channel->get_stub(); - const SimpleRequest &request = request_; - new ClientRpcContextStreamingImpl( - stub, request, start_req, check_done); - } - } - + explicit AsyncStreamingClient(const ClientConfig &config) : + AsyncClient(config, SetupCtx) { StartThreads(config.async_client_threads()); } - ~AsyncStreamingClient() GRPC_OVERRIDE { - EndThreads(); - - for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) { - (*cq)->Shutdown(); - void *got_tag; - bool ok; - while ((*cq)->Next(&got_tag, &ok)) { - delete ClientRpcContext::detag(got_tag); - } - } - } - - bool ThreadFunc(Histogram *histogram, size_t thread_idx) GRPC_OVERRIDE { - void *got_tag; - bool ok; - switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, std::chrono::system_clock::now() + std::chrono::seconds(1))) { - case CompletionQueue::SHUTDOWN: return false; - case CompletionQueue::TIMEOUT: return true; - case CompletionQueue::GOT_EVENT: break; - } - - ClientRpcContext *ctx = ClientRpcContext::detag(got_tag); - if (ctx->RunNextState(ok, histogram) == false) { - // call the callback and then delete it - ctx->RunNextState(ok, histogram); - ctx->StartNewClone(); - delete ctx; - } - - return true; + ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } +private: + static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub, + const SimpleRequest& req) { + auto check_done = [](grpc::Status s, SimpleResponse* response) {}; + auto start_req = [cq](TestService::Stub *stub, grpc::ClientContext *ctx, + void *tag) { + auto stream = stub->AsyncStreamingCall(ctx, cq, tag); + return stream; + }; + new ClientRpcContextStreamingImpl( + stub, req, start_req, check_done); } - - std::vector> cli_cqs_; }; std::unique_ptr CreateAsyncUnaryClient(const ClientConfig& args) {