diff --git a/test/cpp/microbenchmarks/callback_streaming_ping_pong.h b/test/cpp/microbenchmarks/callback_streaming_ping_pong.h index 7861aa67780..af2db947800 100644 --- a/test/cpp/microbenchmarks/callback_streaming_ping_pong.h +++ b/test/cpp/microbenchmarks/callback_streaming_ping_pong.h @@ -51,15 +51,8 @@ static void BM_CallbackBidiStreaming(benchmark::State& state) { } if (state.KeepRunning()) { GPR_TIMER_SCOPE("BenchmarkCycle", 0); - std::mutex mu; - std::condition_variable cv; - bool done = false; - BidiClient test{&state, stub_.get(), &request, &response, &mu, &cv, &done}; - test.StartNewRpc(); - std::unique_lock<std::mutex> l(mu); - while (!done) { - cv.wait(l); - } + BidiClient test{&state, stub_.get(), &request, &response}; + test.Await(); } fixture->Finish(state); fixture.reset(); diff --git a/test/cpp/microbenchmarks/callback_test_service.h b/test/cpp/microbenchmarks/callback_test_service.h index 0d371c4c740..b7b76fd806c 100644 --- a/test/cpp/microbenchmarks/callback_test_service.h +++ b/test/cpp/microbenchmarks/callback_test_service.h @@ -49,17 +49,11 @@ class BidiClient : public grpc::experimental::ClientBidiReactor<EchoRequest, EchoResponse> { public: BidiClient(benchmark::State* state, EchoTestService::Stub* stub, - EchoRequest* request, EchoResponse* response, std::mutex* mu, - std::condition_variable* cv, bool* done) - : state_{state}, - stub_{stub}, - request_{request}, - response_{response}, - mu_{mu}, - cv_{cv}, - done_(done) { + EchoRequest* request, EchoResponse* response) + : state_{state}, stub_{stub}, request_{request}, response_{response} { msgs_size_ = state->range(0); msgs_to_send_ = state->range(1); + StartNewRpc(); } void OnReadDone(bool ok) override { @@ -82,27 +76,33 @@ class BidiClient void OnDone(const Status& s) override { GPR_ASSERT(s.ok()); if (state_->KeepRunning()) { - BidiClient* test = - new BidiClient(state_, stub_, request_, response_, mu_, cv_, done_); - test->StartNewRpc(); + writes_complete_ = 0; + StartNewRpc(); } else { - std::unique_lock<std::mutex> l(*mu_); - *done_ = true; - cv_->notify_one(); + std::unique_lock<std::mutex> l(mu); + done = true; + cv.notify_one(); } - delete cli_ctx_; } void StartNewRpc() { - cli_ctx_ = new ClientContext(); - cli_ctx_->AddMetadata(kServerFinishAfterNReads, - grpc::to_string(msgs_to_send_)); - cli_ctx_->AddMetadata(kServerMessageSize, grpc::to_string(msgs_size_)); - stub_->experimental_async()->BidiStream(cli_ctx_, this); + cli_ctx_.reset(new ClientContext); + cli_ctx_.get()->AddMetadata(kServerFinishAfterNReads, + grpc::to_string(msgs_to_send_)); + cli_ctx_.get()->AddMetadata(kServerMessageSize, + grpc::to_string(msgs_size_)); + stub_->experimental_async()->BidiStream(cli_ctx_.get(), this); MaybeWrite(); StartCall(); } + void Await() { + std::unique_lock<std::mutex> l(mu); + while (!done) { + cv.wait(l); + } + } + private: void MaybeWrite() { if (writes_complete_ < msgs_to_send_) { @@ -112,7 +112,7 @@ class BidiClient } } - ClientContext* cli_ctx_; + std::unique_ptr<ClientContext> cli_ctx_; benchmark::State* state_; EchoTestService::Stub* stub_; EchoRequest* request_; @@ -120,9 +120,9 @@ class BidiClient int writes_complete_{0}; int msgs_to_send_; int msgs_size_; - std::mutex* mu_; - std::condition_variable* cv_; - bool* done_; + std::mutex mu; + std::condition_variable cv; + bool done; }; } // namespace testing