|
|
|
@ -449,7 +449,7 @@ grpc_completion_queue *grpc_completion_queue_create_internal( |
|
|
|
|
static void cq_init_next(void *ptr) { |
|
|
|
|
cq_next_data *cqd = (cq_next_data *)ptr; |
|
|
|
|
/* Initial count is dropped by grpc_completion_queue_shutdown */ |
|
|
|
|
gpr_atm_rel_store(&cqd->pending_events, 1); |
|
|
|
|
gpr_atm_no_barrier_store(&cqd->pending_events, 1); |
|
|
|
|
cqd->shutdown_called = false; |
|
|
|
|
gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); |
|
|
|
|
cq_event_queue_init(&cqd->queue); |
|
|
|
@ -464,7 +464,7 @@ static void cq_destroy_next(void *ptr) { |
|
|
|
|
static void cq_init_pluck(void *ptr) { |
|
|
|
|
cq_pluck_data *cqd = (cq_pluck_data *)ptr; |
|
|
|
|
/* Initial count is dropped by grpc_completion_queue_shutdown */ |
|
|
|
|
gpr_atm_rel_store(&cqd->pending_events, 1); |
|
|
|
|
gpr_atm_no_barrier_store(&cqd->pending_events, 1); |
|
|
|
|
cqd->completed_tail = &cqd->completed_head; |
|
|
|
|
cqd->completed_head.next = (uintptr_t)cqd->completed_tail; |
|
|
|
|
gpr_atm_no_barrier_store(&cqd->shutdown, 0); |
|
|
|
@ -643,6 +643,11 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, |
|
|
|
|
/* Add the completion to the queue */ |
|
|
|
|
bool is_first = cq_event_queue_push(&cqd->queue, storage); |
|
|
|
|
gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); |
|
|
|
|
|
|
|
|
|
/* Since we do not hold the cq lock here, it is important to do an 'acquire'
|
|
|
|
|
load here (instead of a 'no_barrier' load) to match with the release store |
|
|
|
|
(done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next |
|
|
|
|
*/ |
|
|
|
|
bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1; |
|
|
|
|
|
|
|
|
|
if (!will_definitely_shutdown) { |
|
|
|
|