diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc index 6ab4b083c13..edbff9c2be3 100644 --- a/test/cpp/microbenchmarks/bm_cq.cc +++ b/test/cpp/microbenchmarks/bm_cq.cc @@ -150,6 +150,9 @@ static void shutdown_and_destroy(grpc_completion_queue* cc) { grpc_completion_queue_destroy(cc); } +static gpr_mu shutdown_mu, mu; +static gpr_cv shutdown_cv, cv; + // Tag completion queue iterate times class TagCallback : public grpc_experimental_completion_queue_functor { public: @@ -158,17 +161,17 @@ class TagCallback : public grpc_experimental_completion_queue_functor { } ~TagCallback() {} static void Run(grpc_experimental_completion_queue_functor* cb, int ok) { + gpr_mu_lock(&mu); GPR_ASSERT(static_cast<bool>(ok)); *static_cast<TagCallback*>(cb)->iter_ += 1; + gpr_cv_signal(&cv); + gpr_mu_unlock(&mu); }; private: int* iter_; }; -static gpr_mu shutdown_mu; -static gpr_cv shutdown_cv; - // Check if completion queue is shut down class ShutdownCallback : public grpc_experimental_completion_queue_functor { public: @@ -189,8 +192,10 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor { static void BM_Callback_CQ_Pass1Core(benchmark::State& state) { TrackCounters track_counters; - int iteration = 0; + int iteration = 0, current_iterations = 0; TagCallback tag_cb(&iteration); + gpr_mu_init(&mu); + gpr_cv_init(&cv); gpr_mu_init(&shutdown_mu); gpr_cv_init(&shutdown_cv); bool got_shutdown = false; @@ -207,15 +212,26 @@ static void BM_Callback_CQ_Pass1Core(benchmark::State& state) { } shutdown_and_destroy(cc); + gpr_mu_lock(&mu); + current_iterations = static_cast<int>(state.iterations()); + while (current_iterations != iteration) { + // Wait for all the callbacks to complete. + gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + } + gpr_mu_unlock(&mu); + gpr_mu_lock(&shutdown_mu); while (!got_shutdown) { // Wait for the shutdown callback to complete. gpr_cv_wait(&shutdown_cv, &shutdown_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); } gpr_mu_unlock(&shutdown_mu); + GPR_ASSERT(got_shutdown); GPR_ASSERT(iteration == static_cast<int>(state.iterations())); track_counters.Finish(state); + gpr_cv_destroy(&cv); + gpr_mu_destroy(&mu); gpr_cv_destroy(&shutdown_cv); gpr_mu_destroy(&shutdown_mu); }