|
|
|
@ -444,7 +444,7 @@ grpc_completion_queue *grpc_completion_queue_create_internal( |
|
|
|
|
static void cq_init_next(void *ptr) { |
|
|
|
|
cq_next_data *cqd = (cq_next_data *)ptr; |
|
|
|
|
/* Initial count is dropped by grpc_completion_queue_shutdown */ |
|
|
|
|
gpr_atm_no_barrier_store(&cqd->pending_events, 1); |
|
|
|
|
gpr_atm_rel_store(&cqd->pending_events, 1); |
|
|
|
|
cqd->shutdown_called = false; |
|
|
|
|
gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); |
|
|
|
|
cq_event_queue_init(&cqd->queue); |
|
|
|
@ -459,7 +459,7 @@ static void cq_destroy_next(void *ptr) { |
|
|
|
|
static void cq_init_pluck(void *ptr) { |
|
|
|
|
cq_pluck_data *cqd = (cq_pluck_data *)ptr; |
|
|
|
|
/* Initial count is dropped by grpc_completion_queue_shutdown */ |
|
|
|
|
gpr_atm_no_barrier_store(&cqd->pending_events, 1); |
|
|
|
|
gpr_atm_rel_store(&cqd->pending_events, 1); |
|
|
|
|
cqd->completed_tail = &cqd->completed_head; |
|
|
|
|
cqd->completed_head.next = (uintptr_t)cqd->completed_tail; |
|
|
|
|
gpr_atm_no_barrier_store(&cqd->shutdown, 0); |
|
|
|
@ -560,13 +560,13 @@ static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {} |
|
|
|
|
* true if the increment was successful; false if the counter is zero */ |
|
|
|
|
static bool atm_inc_if_nonzero(gpr_atm *counter) { |
|
|
|
|
while (true) { |
|
|
|
|
gpr_atm count = gpr_atm_no_barrier_load(counter); |
|
|
|
|
gpr_atm count = gpr_atm_acq_load(counter); |
|
|
|
|
/* If zero, we are done. If not, we must to a CAS (instead of an atomic
|
|
|
|
|
* increment) to maintain the contract: do not increment the counter if it |
|
|
|
|
* is zero. */ |
|
|
|
|
if (count == 0) { |
|
|
|
|
return false; |
|
|
|
|
} else if (gpr_atm_no_barrier_cas(counter, count, count + 1)) { |
|
|
|
|
} else if (gpr_atm_full_cas(counter, count, count + 1)) { |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -638,8 +638,7 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, |
|
|
|
|
/* Add the completion to the queue */ |
|
|
|
|
bool is_first = cq_event_queue_push(&cqd->queue, storage); |
|
|
|
|
gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); |
|
|
|
|
bool will_definitely_shutdown = |
|
|
|
|
gpr_atm_no_barrier_load(&cqd->pending_events) == 1; |
|
|
|
|
bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1; |
|
|
|
|
|
|
|
|
|
if (!will_definitely_shutdown) { |
|
|
|
|
/* Only kick if this is the first item queued */ |
|
|
|
@ -883,7 +882,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (gpr_atm_no_barrier_load(&cqd->pending_events) == 0) { |
|
|
|
|
if (gpr_atm_acq_load(&cqd->pending_events) == 0) { |
|
|
|
|
/* Before returning, check if the queue has any items left over (since
|
|
|
|
|
gpr_mpscq_pop() can sometimes return NULL even if the queue is not |
|
|
|
|
empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */ |
|
|
|
@ -929,7 +928,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (cq_event_queue_num_items(&cqd->queue) > 0 && |
|
|
|
|
gpr_atm_no_barrier_load(&cqd->pending_events) > 0) { |
|
|
|
|
gpr_atm_acq_load(&cqd->pending_events) > 0) { |
|
|
|
|
gpr_mu_lock(cq->mu); |
|
|
|
|
cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL); |
|
|
|
|
gpr_mu_unlock(cq->mu); |
|
|
|
@ -956,7 +955,7 @@ static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx, |
|
|
|
|
cq_next_data *cqd = DATA_FROM_CQ(cq); |
|
|
|
|
|
|
|
|
|
GPR_ASSERT(cqd->shutdown_called); |
|
|
|
|
GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0); |
|
|
|
|
GPR_ASSERT(gpr_atm_acq_load(&cqd->pending_events) == 0); |
|
|
|
|
|
|
|
|
|
cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq), |
|
|
|
|
&cq->pollset_shutdown_done); |
|
|
|
|