Merge pull request #10531 from sreecha/bm_cq_asan

Fix asan bug in bm_cq_multiple_threads
pull/10542/head
Sree Kuchibhotla 8 years ago committed by GitHub
commit 921b896873
  1. 46
      test/cpp/microbenchmarks/bm_cq_multiple_threads.cc

@ -36,6 +36,8 @@
#include <atomic> #include <atomic>
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "test/cpp/microbenchmarks/helpers.h" #include "test/cpp/microbenchmarks/helpers.h"
extern "C" { extern "C" {
@ -51,13 +53,10 @@ struct grpc_pollset {
namespace grpc { namespace grpc {
namespace testing { namespace testing {
static void* make_tag(int i) { return (void*)(intptr_t)i; } static void* g_tag = (void*)(intptr_t)10; // Some random number
static grpc_completion_queue* g_cq; static grpc_completion_queue* g_cq;
static grpc_event_engine_vtable g_vtable; static grpc_event_engine_vtable g_vtable;
static __thread int g_thread_idx;
static __thread grpc_cq_completion g_cq_completion;
static void pollset_shutdown(grpc_exec_ctx* exec_ctx, grpc_pollset* ps, static void pollset_shutdown(grpc_exec_ctx* exec_ctx, grpc_pollset* ps,
grpc_closure* closure) { grpc_closure* closure) {
grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
@ -76,15 +75,18 @@ static grpc_error* pollset_kick(grpc_pollset* p, grpc_pollset_worker* worker) {
/* Callback when the tag is dequeued from the completion queue. Does nothing */ /* Callback when the tag is dequeued from the completion queue. Does nothing */
static void cq_done_cb(grpc_exec_ctx* exec_ctx, void* done_arg, static void cq_done_cb(grpc_exec_ctx* exec_ctx, void* done_arg,
grpc_cq_completion* cq_completion) {} grpc_cq_completion* cq_completion) {
gpr_free(cq_completion);
}
/* Queues a completion tag. ZERO polling overhead */ /* Queues a completion tag. ZERO polling overhead */
static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* ps, static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* ps,
grpc_pollset_worker** worker, gpr_timespec now, grpc_pollset_worker** worker, gpr_timespec now,
gpr_timespec deadline) { gpr_timespec deadline) {
gpr_mu_unlock(&ps->mu); gpr_mu_unlock(&ps->mu);
grpc_cq_end_op(exec_ctx, g_cq, make_tag(g_thread_idx), GRPC_ERROR_NONE, grpc_cq_begin_op(g_cq, g_tag);
cq_done_cb, NULL, &g_cq_completion); grpc_cq_end_op(exec_ctx, g_cq, g_tag, GRPC_ERROR_NONE, cq_done_cb, NULL,
(grpc_cq_completion*)gpr_malloc(sizeof(grpc_cq_completion)));
grpc_exec_ctx_flush(exec_ctx); grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&ps->mu); gpr_mu_lock(&ps->mu);
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
@ -109,26 +111,42 @@ static void setup() {
g_cq = grpc_completion_queue_create(NULL); g_cq = grpc_completion_queue_create(NULL);
} }
static void teardown() {
grpc_completion_queue_shutdown(g_cq);
grpc_completion_queue_destroy(g_cq);
}
/* A few notes about Multi-threaded benchmarks:
Setup:
The benchmark framework ensures that none of the threads proceed beyond the
state.KeepRunning() call unless all the threads have called state.keepRunning
atleast once. So it is safe to do the initialization in one of the threads
before state.KeepRunning() is called.
Teardown:
The benchmark framework also ensures that no thread is running the benchmark
code (i.e the code between two successive calls of state.KeepRunning()) if
state.KeepRunning() returns false. So it is safe to do the teardown in one
of the threads after state.keepRunning() returns false.
*/
static void BM_Cq_Throughput(benchmark::State& state) { static void BM_Cq_Throughput(benchmark::State& state) {
TrackCounters track_counters; TrackCounters track_counters;
gpr_timespec deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
if (state.thread_index == 0) { if (state.thread_index == 0) {
setup(); setup();
} }
while (state.KeepRunning()) { while (state.KeepRunning()) {
g_thread_idx = state.thread_index; GPR_ASSERT(grpc_completion_queue_next(g_cq, deadline, NULL).type ==
void* dummy_tag = make_tag(g_thread_idx); GRPC_OP_COMPLETE);
grpc_cq_begin_op(g_cq, dummy_tag);
grpc_completion_queue_next(g_cq, deadline, NULL);
} }
state.SetItemsProcessed(state.iterations()); state.SetItemsProcessed(state.iterations());
if (state.thread_index == 0) { if (state.thread_index == 0) {
grpc_completion_queue_shutdown(g_cq); teardown();
grpc_completion_queue_destroy(g_cq);
} }
track_counters.Finish(state); track_counters.Finish(state);

Loading…
Cancel
Save