diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index fa1a799f1b5..d0510ec67ad 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -140,7 +140,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { class AsyncClient : public Client { public: explicit AsyncClient(const ClientConfig& config, - std::function setup_ctx) : Client(config) { for (int i = 0; i < config.async_client_threads(); i++) { @@ -158,18 +158,22 @@ class AsyncClient : public Client { if (!closed_loop_) { for (auto channel = channels_.begin(); channel != channels_.end(); channel++) { - channel_rpc_count_lock.emplace_back(); + channel_rpc_lock_.emplace_back(); rpcs_outstanding_.push_back(0); } } - else { - int t = 0; - for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { - for (auto channel = channels_.begin(); channel != channels_.end(); - channel++) { - auto* cq = cli_cqs_[t].get(); - t = (t + 1) % cli_cqs_.size(); - setup_ctx(cq, channel->get_stub(), request_); + + int t = 0; + for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { + for (auto channel = channels_.begin(); channel != channels_.end(); + channel++) { + auto* cq = cli_cqs_[t].get(); + t = (t + 1) % cli_cqs_.size(); + ClientRpcContext *ctx = setup_ctx(cq, channel->get_stub(), request_); + if (closed_loop_) { + // only relevant for closed_loop unary, but harmless for + // closed_loop streaming + ctx->Start(); } } } @@ -222,12 +226,13 @@ class AsyncClient : public Client { } issue_allowed_[thread_idx] = true; // may be ok now even if it hadn't been } - if (issue_allowed && grpc_time_source::now() >= next_issue_[thread_idx]) { + if (issue_allowed_[thread_idx] && + grpc_time_source::now() >= next_issue_[thread_idx]) { // Attempt to issue bool issued = false; for (int num_attempts = 0; num_attempts < channel_count_ && !issued; num_attempts++, next_channel_[thread_idx] = (next_channel_[thread_idx]+1)%channel_count_) { - std::lock_guard g(channel_rpc_count_lock_[next_channel_[thread_idx]]); + std::lock_guard g(channel_rpc_lock_[next_channel_[thread_idx]]); if (rpcs_outstanding[next_channel_[thread_idx]] < max_outstanding_per_channel_) { // do the work to issue rpcs_outstanding[next_channel_[thread_idx]]++; @@ -247,7 +252,7 @@ class AsyncClient : public Client { std::vector issue_allowed_; // may this thread attempt to issue std::vector next_issue_; // when should it issue? - std::vector channel_rpc_count_lock_; + std::vector channel_rpc_lock_; std::vector rpcs_outstanding_; // per-channel vector int max_outstanding_per_channel_; int channel_count_; @@ -261,15 +266,15 @@ class AsyncUnaryClient GRPC_FINAL : public AsyncClient { } ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); } private: - static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub, + static ClientRpcContext *SetupCtx(CompletionQueue* cq, TestService::Stub* stub, const SimpleRequest& req) { auto check_done = [](grpc::Status s, SimpleResponse* response) {}; auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx, const SimpleRequest& request) { return stub->AsyncUnaryCall(ctx, request, cq); }; - new ClientRpcContextUnaryImpl( - stub, req, start_req, check_done); + return new ClientRpcContextUnaryImpl( + stub, req, start_req, check_done); } }; @@ -350,7 +355,7 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient { ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } private: - static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub, + static ClientRpcContext *SetupCtx(CompletionQueue* cq, TestService::Stub* stub, const SimpleRequest& req) { auto check_done = [](grpc::Status s, SimpleResponse* response) {}; auto start_req = [cq](TestService::Stub *stub, grpc::ClientContext *ctx, @@ -358,7 +363,7 @@ private: auto stream = stub->AsyncStreamingCall(ctx, cq, tag); return stream; }; - new ClientRpcContextStreamingImpl( + return new ClientRpcContextStreamingImpl( stub, req, start_req, check_done); } };