Fix tsan failures in cq-pluck and cleanup cq code
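This change moves the pluck completion queue onto the same counting scheme the next-type queue already uses: pending_events becomes a plain gpr_atm that starts at 1 (the extra count stands for "not shut down"), begin_op increments it with a CAS loop only while it is non-zero, and whichever caller drops the last count performs the shutdown. The sketch below is a minimal, self-contained illustration of that pattern; it uses C11 atomics instead of gRPC's gpr_atm wrappers, and the names inc_if_nonzero and drop_count are illustrative, not actual gRPC identifiers.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

/* pending_events starts at 1: the extra count stands for "not shut down". */
static atomic_long pending_events = 1;

/* Increment only while the counter is non-zero (mirrors atm_inc_if_nonzero
 * in the diff); returns false once shutdown has dropped the count to zero. */
static bool inc_if_nonzero(atomic_long *counter) {
  long count = atomic_load_explicit(counter, memory_order_relaxed);
  while (count != 0) {
    /* CAS rather than a plain add, so a zero counter is never resurrected. */
    if (atomic_compare_exchange_weak_explicit(counter, &count, count + 1,
                                              memory_order_relaxed,
                                              memory_order_relaxed)) {
      return true;
    }
    /* On failure the CAS reloaded 'count'; loop and retry. */
  }
  return false;
}

/* Drop one count; whoever drops the last one finishes shutdown (mirrors
 * gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1 in the diff). */
static void drop_count(atomic_long *counter) {
  if (atomic_fetch_sub_explicit(counter, 1, memory_order_acq_rel) == 1) {
    printf("last count dropped: finish shutdown here\n");
  }
}

int main(void) {
  if (inc_if_nonzero(&pending_events)) { /* grpc_cq_begin_op succeeds */
    drop_count(&pending_events);         /* grpc_cq_end_op for that op */
  }
  drop_count(&pending_events);           /* shutdown drops the initial count */
  printf("begin_op after shutdown: %s\n",
         inc_if_nonzero(&pending_events) ? "accepted" : "rejected");
  return 0;
}

Note that cq_begin_op_for_pluck no longer asserts that shutdown has not been called; an operation that races with shutdown now sees a zero count and is rejected instead.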

pull/12054/head
Sree Kuchibhotla 8 years ago
parent 04034515e2
commit cfce451785
src/core/lib/surface/completion_queue.c (76 changed lines)

@@ -235,7 +235,8 @@ typedef struct cq_next_data {
   /* Number of outstanding events (+1 if not shut down) */
   gpr_atm pending_events;
 
-  int shutdown_called;
+  /** 0 initially. 1 once we initiated shutdown */
+  bool shutdown_called;
 } cq_next_data;
 
 typedef struct cq_pluck_data {
@@ -244,15 +245,20 @@ typedef struct cq_pluck_data {
   grpc_cq_completion *completed_tail;
 
   /** Number of pending events (+1 if we're not shutdown) */
-  gpr_refcount pending_events;
+  gpr_atm pending_events;
 
   /** Counter of how many things have ever been queued on this completion queue
       useful for avoiding locks to check the queue */
   gpr_atm things_queued_ever;
 
-  /** 0 initially, 1 once we've begun shutting down */
+  /** 0 initially. 1 once we've completed shutting down */
+  /* TODO: (sreek) This is not needed since (shutdown == 1) if and only if
+   * (pending_events == 0). So consider removing this in future and use
+   * pending_events */
   gpr_atm shutdown;
-  int shutdown_called;
+
+  /** 0 initially. 1 once we initiated shutdown */
+  bool shutdown_called;
 
   int num_pluckers;
   plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
@@ -436,7 +442,7 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
 
 static void cq_init_next(void *ptr) {
   cq_next_data *cqd = ptr;
-  /* Initial ref is dropped by grpc_completion_queue_shutdown */
+  /* Initial count is dropped by grpc_completion_queue_shutdown */
   gpr_atm_no_barrier_store(&cqd->pending_events, 1);
   cqd->shutdown_called = false;
   gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
@@ -451,12 +457,12 @@ static void cq_destroy_next(void *ptr) {
 
 static void cq_init_pluck(void *ptr) {
   cq_pluck_data *cqd = ptr;
-  /* Initial ref is dropped by grpc_completion_queue_shutdown */
-  gpr_ref_init(&cqd->pending_events, 1);
+  /* Initial count is dropped by grpc_completion_queue_shutdown */
+  gpr_atm_no_barrier_store(&cqd->pending_events, 1);
   cqd->completed_tail = &cqd->completed_head;
   cqd->completed_head.next = (uintptr_t)cqd->completed_tail;
   gpr_atm_no_barrier_store(&cqd->shutdown, 0);
-  cqd->shutdown_called = 0;
+  cqd->shutdown_called = false;
   cqd->num_pluckers = 0;
   gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
 }
@@ -549,24 +555,32 @@ static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
 static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {}
 #endif
 
-static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
-  cq_next_data *cqd = DATA_FROM_CQ(cq);
+/* Atomically increments a counter only if the counter is not zero. Returns
+ * true if the increment was successful; false if the counter is zero */
+static bool atm_inc_if_nonzero(gpr_atm *counter) {
   while (true) {
-    gpr_atm count = gpr_atm_no_barrier_load(&cqd->pending_events);
+    gpr_atm count = gpr_atm_no_barrier_load(counter);
+    /* If zero, we are done. If not, we must do a CAS (instead of an atomic
+     * increment) to maintain the contract: do not increment the counter if it
+     * is zero. */
     if (count == 0) {
       return false;
-    } else if (gpr_atm_no_barrier_cas(&cqd->pending_events, count, count + 1)) {
+    } else if (gpr_atm_no_barrier_cas(counter, count, count + 1)) {
       break;
     }
   }
   return true;
 }
 
+static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
+  cq_next_data *cqd = DATA_FROM_CQ(cq);
+  return atm_inc_if_nonzero(&cqd->pending_events);
+}
+
 static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
   cq_pluck_data *cqd = DATA_FROM_CQ(cq);
-  GPR_ASSERT(!cqd->shutdown_called);
-  gpr_ref(&cqd->pending_events);
-  return true;
+  return atm_inc_if_nonzero(&cqd->pending_events);
 }
 
 bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
@@ -704,8 +718,10 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
       ((uintptr_t)storage) | (1u & (uintptr_t)cqd->completed_tail->next);
   cqd->completed_tail = storage;
 
-  int shutdown = gpr_unref(&cqd->pending_events);
-  if (!shutdown) {
+  if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+    cq_finish_shutdown_pluck(exec_ctx, cq);
+    gpr_mu_unlock(cq->mu);
+  } else {
     grpc_pollset_worker *pluck_worker = NULL;
     for (int i = 0; i < cqd->num_pluckers; i++) {
       if (cqd->pluckers[i].tag == tag) {
@@ -725,9 +741,6 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
 
       GRPC_ERROR_UNREF(kick_error);
     }
-  } else {
-    cq_finish_shutdown_pluck(exec_ctx, cq);
-    gpr_mu_unlock(cq->mu);
   }
 
   GPR_TIMER_END("cq_end_op_for_pluck", 0);
@@ -952,6 +965,12 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
                              grpc_completion_queue *cq) {
   cq_next_data *cqd = DATA_FROM_CQ(cq);
 
+  /* Need an extra ref for cq here because:
+   * We call cq_finish_shutdown_next() below, which would call pollset shutdown.
+   * Pollset shutdown decrements the cq ref count which can potentially destroy
+   * the cq (if that happens to be the last ref).
+   * Creating an extra ref here prevents the cq from getting destroyed while
+   * this function is still active */
   GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
   gpr_mu_lock(cq->mu);
   if (cqd->shutdown_called) {
@@ -960,7 +979,7 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
     GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
     return;
   }
-  cqd->shutdown_called = 1;
+  cqd->shutdown_called = true;
   if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
     cq_finish_shutdown_next(exec_ctx, cq);
   }
@@ -1172,21 +1191,32 @@ static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
                         &cq->pollset_shutdown_done);
 }
 
+/* NOTE: This function is almost exactly identical to cq_shutdown_next() but
+ * merging them is a bit tricky and probably not worth it */
 static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
                               grpc_completion_queue *cq) {
   cq_pluck_data *cqd = DATA_FROM_CQ(cq);
 
+  /* Need an extra ref for cq here because:
+   * We call cq_finish_shutdown_pluck() below, which would call pollset shutdown.
+   * Pollset shutdown decrements the cq ref count which can potentially destroy
+   * the cq (if that happens to be the last ref).
+   * Creating an extra ref here prevents the cq from getting destroyed while
+   * this function is still active */
+  GRPC_CQ_INTERNAL_REF(cq, "shutting_down (pluck cq)");
   gpr_mu_lock(cq->mu);
   if (cqd->shutdown_called) {
     gpr_mu_unlock(cq->mu);
+    GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)");
     GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
     return;
   }
-  cqd->shutdown_called = 1;
-  if (gpr_unref(&cqd->pending_events)) {
+  cqd->shutdown_called = true;
+  if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
     cq_finish_shutdown_pluck(exec_ctx, cq);
   }
   gpr_mu_unlock(cq->mu);
+  GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)");
 }
 
 /* Shutdown simply drops a ref that we reserved at creation time; if we drop
