From 968e1b40a540db1d66e9cb49cad273b0c64ffd4e Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 28 Jul 2020 12:42:20 -0700 Subject: [PATCH] Properly follow callback API guarantees rather than existing behavior --- test/cpp/microbenchmarks/bm_cq.cc | 62 ++++++++++++++++++++++++++++++- 1 file changed, 60 insertions(+), 2 deletions(-) diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc index c53eb2b9413..1640c4d52f0 100644 --- a/test/cpp/microbenchmarks/bm_cq.cc +++ b/test/cpp/microbenchmarks/bm_cq.cc @@ -69,6 +69,11 @@ BENCHMARK(BM_CreateDestroyCore); static void DoneWithCompletionOnStack(void* /*arg*/, grpc_cq_completion* /*completion*/) {} +static void DoneWithCompletionOnHeap(void* /*arg*/, + grpc_cq_completion* completion) { + delete completion; +} + class DummyTag final : public internal::CompletionQueueTag { public: bool FinalizeResult(void** /*tag*/, bool* /*status*/) override { @@ -205,8 +210,15 @@ static void BM_Callback_CQ_Pass1Core(benchmark::State& state) { gpr_cv_init(&shutdown_cv); bool got_shutdown = false; ShutdownCallback shutdown_cb(&got_shutdown); - grpc_completion_queue* cc = - grpc_completion_queue_create_for_callback(&shutdown_cb, nullptr); + // This test with stack-allocated completions only works for non-polling or + // EM-polling callback core CQs. For generality, test with non-polling. + grpc_completion_queue_attributes attr; + attr.version = 2; + attr.cq_completion_type = GRPC_CQ_CALLBACK; + attr.cq_polling_type = GRPC_CQ_NON_POLLING; + attr.cq_shutdown_cb = &shutdown_cb; + grpc_completion_queue* cc = grpc_completion_queue_create( + grpc_completion_queue_factory_lookup(&attr), &attr, nullptr); for (auto _ : state) { grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; @@ -240,7 +252,53 @@ static void BM_Callback_CQ_Pass1Core(benchmark::State& state) { gpr_cv_destroy(&shutdown_cv); gpr_mu_destroy(&shutdown_mu); } +static void BM_Callback_CQ_Pass1CoreHeapCompletion(benchmark::State& state) { + TrackCounters track_counters; + int iteration = 0, current_iterations = 0; + TagCallback tag_cb(&iteration); + gpr_mu_init(&mu); + gpr_cv_init(&cv); + gpr_mu_init(&shutdown_mu); + gpr_cv_init(&shutdown_cv); + bool got_shutdown = false; + ShutdownCallback shutdown_cb(&got_shutdown); + grpc_completion_queue* cc = + grpc_completion_queue_create_for_callback(&shutdown_cb, nullptr); + for (auto _ : state) { + grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; + grpc_core::ExecCtx exec_ctx; + grpc_cq_completion* completion = new grpc_cq_completion; + GPR_ASSERT(grpc_cq_begin_op(cc, &tag_cb)); + grpc_cq_end_op(cc, &tag_cb, GRPC_ERROR_NONE, DoneWithCompletionOnHeap, + nullptr, completion); + } + shutdown_and_destroy(cc); + + gpr_mu_lock(&mu); + current_iterations = static_cast(state.iterations()); + while (current_iterations != iteration) { + // Wait for all the callbacks to complete. + gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + } + gpr_mu_unlock(&mu); + + gpr_mu_lock(&shutdown_mu); + while (!got_shutdown) { + // Wait for the shutdown callback to complete. + gpr_cv_wait(&shutdown_cv, &shutdown_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + } + gpr_mu_unlock(&shutdown_mu); + + GPR_ASSERT(got_shutdown); + GPR_ASSERT(iteration == static_cast(state.iterations())); + track_counters.Finish(state); + gpr_cv_destroy(&cv); + gpr_mu_destroy(&mu); + gpr_cv_destroy(&shutdown_cv); + gpr_mu_destroy(&shutdown_mu); +} BENCHMARK(BM_Callback_CQ_Pass1Core); +BENCHMARK(BM_Callback_CQ_Pass1CoreHeapCompletion); } // namespace testing } // namespace grpc