From 8e3684518d1f320d4cc0c10ac25bb17bd7181012 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Tue, 25 Apr 2017 15:45:02 -0700 Subject: [PATCH] Fix Tsan failures --- src/core/lib/surface/completion_queue.c | 48 ++++++++++++++----------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 11f2e09003b..266af30cfff 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -496,6 +496,10 @@ void check_tag_in_cq(grpc_completion_queue *cc, void *tag, bool lock_cq) { void check_tag_in_cq(grpc_completion_queue *cc, void *tag, bool lock_cq) {} #endif +/* Forward declaration */ +static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cc); + /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion * type of GRPC_CQ_NEXT) */ void grpc_cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, @@ -533,9 +537,9 @@ void grpc_cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); int shutdown = gpr_unref(&cqd->pending_events); - if (!shutdown) { - gpr_mu_lock(cqd->mu); + gpr_mu_lock(cqd->mu); + if (!shutdown) { grpc_error *kick_error = cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), NULL); gpr_mu_unlock(cqd->mu); @@ -546,14 +550,7 @@ void grpc_cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, GRPC_ERROR_UNREF(kick_error); } } else { - GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); - GPR_ASSERT(cqd->shutdown_called); - - gpr_atm_no_barrier_store(&cqd->shutdown, 1); - - gpr_mu_lock(cqd->mu); - cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc), - &cqd->pollset_shutdown_done); + cq_finish_shutdown(exec_ctx, cc); gpr_mu_unlock(cqd->mu); } @@ -624,11 +621,7 @@ void grpc_cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(kick_error); } } else { - GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); - GPR_ASSERT(cqd->shutdown_called); - gpr_atm_no_barrier_store(&cqd->shutdown, 1); - cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc), - &cqd->pollset_shutdown_done); + cq_finish_shutdown(exec_ctx, cc); gpr_mu_unlock(cqd->mu); } @@ -959,7 +952,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, } prev = c; } - if (cqd->shutdown) { + if (gpr_atm_no_barrier_load(&cqd->shutdown)) { gpr_mu_unlock(cqd->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_SHUTDOWN; @@ -1026,6 +1019,24 @@ done: return ret; } +/* Finishes the completion queue shutdown. This means that there are no more + completion events / tags expected from the completion queue + - Must be called under completion queue lock + - Must be called only once in completion queue's lifetime + - grpc_completion_queue_shutdown() MUST have been called before calling this + function */ +static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cc) { + cq_data *cqd = &cc->data; + + GPR_ASSERT(cqd->shutdown_called); + GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); + gpr_atm_no_barrier_store(&cqd->shutdown, 1); + + cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc), + &cqd->pollset_shutdown_done); +} + /* Shutdown simply drops a ref that we reserved at creation time; if we drop to zero here, then enter shutdown mode and wake up any waiters */ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { @@ -1042,10 +1053,7 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { } cqd->shutdown_called = 1; if (gpr_unref(&cqd->pending_events)) { - GPR_ASSERT(!cqd->shutdown); - cqd->shutdown = 1; - cc->poller_vtable->shutdown(&exec_ctx, POLLSET_FROM_CQ(cc), - &cqd->pollset_shutdown_done); + cq_finish_shutdown(&exec_ctx, cc); } gpr_mu_unlock(cqd->mu); grpc_exec_ctx_finish(&exec_ctx);