Merge pull request #23653 from vjpai/bm_cq

Properly follow callback CQ API guarantees rather than existing behavior in ubm
pull/23727/head
Vijay Pai 4 years ago committed by GitHub
commit cd20a80a1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 67
      test/cpp/microbenchmarks/bm_cq.cc

@ -69,6 +69,11 @@ BENCHMARK(BM_CreateDestroyCore);
static void DoneWithCompletionOnStack(void* /*arg*/,
grpc_cq_completion* /*completion*/) {}
static void DoneWithCompletionOnHeap(void* /*arg*/,
grpc_cq_completion* completion) {
delete completion;
}
class DummyTag final : public internal::CompletionQueueTag {
public:
bool FinalizeResult(void** /*tag*/, bool* /*status*/) override {
@ -205,8 +210,20 @@ static void BM_Callback_CQ_Pass1Core(benchmark::State& state) {
gpr_cv_init(&shutdown_cv);
bool got_shutdown = false;
ShutdownCallback shutdown_cb(&got_shutdown);
grpc_completion_queue* cc =
grpc_completion_queue_create_for_callback(&shutdown_cb, nullptr);
// This test with stack-allocated completions only works for non-polling or
// EM-polling callback core CQs because otherwise the callback could execute
// on another thread after the stack objects here go out of scope. An
// alternative would be to synchronize between the benchmark loop and the
// callback, but then it would be measuring the overhead of synchronization
// rather than the overhead of the completion queue.
// For generality, test here with non-polling.
grpc_completion_queue_attributes attr;
attr.version = 2;
attr.cq_completion_type = GRPC_CQ_CALLBACK;
attr.cq_polling_type = GRPC_CQ_NON_POLLING;
attr.cq_shutdown_cb = &shutdown_cb;
grpc_completion_queue* cc = grpc_completion_queue_create(
grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
for (auto _ : state) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
@ -240,7 +257,53 @@ static void BM_Callback_CQ_Pass1Core(benchmark::State& state) {
gpr_cv_destroy(&shutdown_cv);
gpr_mu_destroy(&shutdown_mu);
}
static void BM_Callback_CQ_Pass1CoreHeapCompletion(benchmark::State& state) {
TrackCounters track_counters;
int iteration = 0, current_iterations = 0;
TagCallback tag_cb(&iteration);
gpr_mu_init(&mu);
gpr_cv_init(&cv);
gpr_mu_init(&shutdown_mu);
gpr_cv_init(&shutdown_cv);
bool got_shutdown = false;
ShutdownCallback shutdown_cb(&got_shutdown);
grpc_completion_queue* cc =
grpc_completion_queue_create_for_callback(&shutdown_cb, nullptr);
for (auto _ : state) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_cq_completion* completion = new grpc_cq_completion;
GPR_ASSERT(grpc_cq_begin_op(cc, &tag_cb));
grpc_cq_end_op(cc, &tag_cb, GRPC_ERROR_NONE, DoneWithCompletionOnHeap,
nullptr, completion);
}
shutdown_and_destroy(cc);
gpr_mu_lock(&mu);
current_iterations = static_cast<int>(state.iterations());
while (current_iterations != iteration) {
// Wait for all the callbacks to complete.
gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME));
}
gpr_mu_unlock(&mu);
gpr_mu_lock(&shutdown_mu);
while (!got_shutdown) {
// Wait for the shutdown callback to complete.
gpr_cv_wait(&shutdown_cv, &shutdown_mu, gpr_inf_future(GPR_CLOCK_REALTIME));
}
gpr_mu_unlock(&shutdown_mu);
GPR_ASSERT(got_shutdown);
GPR_ASSERT(iteration == static_cast<int>(state.iterations()));
track_counters.Finish(state);
gpr_cv_destroy(&cv);
gpr_mu_destroy(&mu);
gpr_cv_destroy(&shutdown_cv);
gpr_mu_destroy(&shutdown_mu);
}
BENCHMARK(BM_Callback_CQ_Pass1Core);
BENCHMARK(BM_Callback_CQ_Pass1CoreHeapCompletion);
} // namespace testing
} // namespace grpc

Loading…
Cancel
Save