@@ -60,6 +60,25 @@ typedef struct {
   void *tag;
 } plucker;
 
+/* Queue that holds the cq_completion_events. This internally uses gpr_mpscq
+ * queue (a lockfree multiproducer single consumer queue). However this queue
+ * supports multiple consumers too. As such, it uses the queue_mu to serialize
+ * consumer access (but no locks for producer access).
+ *
+ * Currently this is only used in completion queues whose completion_type is
+ * GRPC_CQ_NEXT */
+typedef struct grpc_cq_event_queue {
+  /* Mutex to serialize consumers i.e. pop() operations */
+  gpr_mu queue_mu;
+
+  gpr_mpscq queue;
+
+  /* A lazy counter indicating the number of items in the queue. This is NOT
+     atomically incremented/decremented along with push/pop operations and
+     hence is only eventually consistent */
+  gpr_atm num_queue_items;
+} grpc_cq_event_queue;
+
 /* Completion queue structure */
 struct grpc_completion_queue {
   /** Owned by pollset */
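
Taken together with the helper functions introduced further down in this patch, the intended usage of the new type is roughly the sketch below. This is a condensed illustration for orientation, not code from the patch: it assumes it lives inside completion_queue.c (the helpers are static to that file), and the storage argument stands in for a grpc_cq_completion owned by the caller.

static void cq_event_queue_lifecycle_sketch(grpc_cq_completion *storage) {
  grpc_cq_event_queue q;
  cq_event_queue_init(&q); /* done by grpc_completion_queue_create_internal */

  /* Producer side: lock-free push, no mutex taken (grpc_cq_end_op_for_next). */
  cq_event_queue_push(&q, storage);

  /* Consumer side: pop() takes queue_mu because gpr_mpscq itself only
     tolerates a single concurrent consumer (grpc_completion_queue_next). */
  grpc_cq_completion *c = cq_event_queue_pop(&q);
  (void)c;

  /* The lazy counter is only a hint, used for sanity checks. */
  GPR_ASSERT(cq_event_queue_num_items(&q) == 0);
  cq_event_queue_destroy(&q); /* done by grpc_cq_internal_unref */
}
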
@@ -68,24 +87,14 @@ struct grpc_completion_queue {
   grpc_cq_completion_type completion_type;
   grpc_cq_polling_type polling_type;
 
-  /** TODO: sreek - We should be moving the 'completed events' to a different
-   * structure (co-allocated with cq) which can change depending on the type
-   * of completion queue. */
-
   /** Completed events (Only relevant if the completion_type is NOT
    * GRPC_CQ_NEXT) */
   grpc_cq_completion completed_head;
   grpc_cq_completion *completed_tail;
 
   /** Completed events for completion-queues of type GRPC_CQ_NEXT are stored in
-     a lockfree queue multi-producer/single-consumer queue.
-
-     So if the completion queue has more than one thread concurrently calling
-     grpc_completion_queue_next(), we need a mutex (i.e queue_mu) to serialize
-     those calls */
-  gpr_mu queue_mu;
-  gpr_mpscq queue;
-  gpr_atm num_queue_items; /* Count of items in the queue */
+   * this queue */
+  grpc_cq_event_queue queue;
 
   /** Number of pending events (+1 if we're not shutdown) */
   gpr_refcount pending_events;
@@ -131,6 +140,39 @@ int grpc_cq_event_timeout_trace;
 static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc,
                                      grpc_error *error);
 
+static void cq_event_queue_init(grpc_cq_event_queue *q) {
+  gpr_mpscq_init(&q->queue);
+  gpr_mu_init(&q->queue_mu);
+  gpr_atm_no_barrier_store(&q->num_queue_items, 0);
+}
+
+static void cq_event_queue_destroy(grpc_cq_event_queue *q) {
+  gpr_mpscq_destroy(&q->queue);
+  gpr_mu_destroy(&q->queue_mu);
+}
+
+static void cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) {
+  gpr_mpscq_push(&q->queue, (gpr_mpscq_node *)c);
+  gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1);
+}
+
+static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) {
+  gpr_mu_lock(&q->queue_mu);
+  grpc_cq_completion *c = (grpc_cq_completion *)gpr_mpscq_pop(&q->queue);
+  gpr_mu_unlock(&q->queue_mu);
+  if (c) {
+    gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1);
+  }
+
+  return c;
+}
+
+/* Note: The counter is not incremented/decremented atomically with push/pop.
+ * The count is only eventually consistent */
+static long cq_event_queue_num_items(grpc_cq_event_queue *q) {
+  return gpr_atm_no_barrier_load(&q->num_queue_items);
+}
+
 grpc_completion_queue *grpc_completion_queue_create_internal(
     grpc_cq_completion_type completion_type,
     grpc_cq_polling_type polling_type) {
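
The push/pop helpers keep num_queue_items deliberately loose: the counter update is a separate no-barrier atomic, so the count can lag the queue in either direction, and gpr_mpscq_pop() itself may transiently return NULL on a non-empty queue. Later in this patch, grpc_completion_queue_next() copes with both quirks by re-checking the counter before concluding the queue is empty. A minimal sketch of that consumer pattern, assuming it sits next to the helpers above (drain_cq_sketch is a hypothetical name, not part of the patch):

static void drain_cq_sketch(grpc_cq_event_queue *q) {
  for (;;) {
    grpc_cq_completion *c = cq_event_queue_pop(q);
    if (c != NULL) {
      /* ... hand the completion to the caller ... */
      continue;
    }
    /* Pop returned NULL: the queue is either empty or momentarily
       inconsistent. Only stop once the lazy counter also says "empty". */
    if (cq_event_queue_num_items(q) == 0) {
      break;
    }
    /* Otherwise retry; the real code falls back to polling instead of
       spinning here. */
  }
}
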
@@ -168,10 +210,7 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
 #ifndef NDEBUG
   cc->outstanding_tag_count = 0;
 #endif
-  gpr_mpscq_init(&cc->queue);
-  gpr_mu_init(&cc->queue_mu);
-  gpr_atm_no_barrier_store(&cc->num_queue_items, 0);
-
+  cq_event_queue_init(&cc->queue);
   grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc,
                     grpc_schedule_on_exec_ctx);
 
@@ -216,7 +255,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) {
   if (gpr_unref(&cc->owning_refs)) {
     GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head);
     grpc_pollset_destroy(POLLSET_FROM_CQ(cc));
-    gpr_mpscq_destroy(&cc->queue);
+    cq_event_queue_destroy(&cc->queue);
 #ifndef NDEBUG
     gpr_free(cc->outstanding_tags);
 #endif
@@ -283,9 +322,8 @@ void grpc_cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
   check_tag_in_cq(cc, tag, true); /* Used in debug builds only */
 
   /* Add the completion to the queue */
-  gpr_mpscq_push(&cc->queue, (gpr_mpscq_node *)storage);
+  cq_event_queue_push(&cc->queue, storage);
   gpr_atm_no_barrier_fetch_add(&cc->things_queued_ever, 1);
-  gpr_atm_no_barrier_fetch_add(&cc->num_queue_items, 1);
 
   int shutdown = gpr_unref(&cc->pending_events);
   if (!shutdown) {
@@ -433,8 +471,10 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
     * might return NULL in some cases even if the queue is not empty; but that
     * is ok and doesn't affect correctness. Might affect the tail latencies a
     * bit) */
-    a->stolen_completion = (grpc_cq_completion *)gpr_mpscq_pop(&cq->queue);
-    return true;
+    a->stolen_completion = cq_event_queue_pop(&cq->queue);
+    if (a->stolen_completion != NULL) {
+      return true;
+    }
   }
 
   return !a->first_loop &&
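
Besides switching to the new helper, this hunk tightens the "steal" path: previously the function returned true even when gpr_mpscq_pop() came back NULL, so the caller could act as if it had a completion while stolen_completion was still NULL. Now success is reported only when something was actually popped. The same check-the-hint-then-pop shape, reduced to a standalone sketch (the function name and out-parameter are illustrative assumptions, not part of the patch):

static bool try_steal_completion_sketch(grpc_cq_event_queue *q,
                                        grpc_cq_completion **out) {
  if (cq_event_queue_num_items(q) > 0) {
    /* The lazy counter is only a hint; the pop below may still return NULL. */
    *out = cq_event_queue_pop(q);
    if (*out != NULL) {
      return true; /* genuinely stole a completion */
    }
  }
  return false; /* nothing stolen; the caller keeps polling */
}
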
@@ -515,18 +555,13 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
       break;
     }
 
-    gpr_mu_lock(&cc->queue_mu);
-    grpc_cq_completion *c = (grpc_cq_completion *)gpr_mpscq_pop(&cc->queue);
-    gpr_mu_unlock(&cc->queue_mu);
+    grpc_cq_completion *c = cq_event_queue_pop(&cc->queue);
 
     /* TODO: sreek - If c == NULL it means either the queue is empty OR in a
        transient inconsistent state. Consider doing a 0-timeout poll if
        (cc->num_queue_items > 0 and c == NULL) so that the thread comes back
        quickly from poll to make a second attempt at popping */
-
     if (c != NULL) {
-      gpr_atm_no_barrier_fetch_add(&cc->num_queue_items, -1);
-
       ret.type = GRPC_OP_COMPLETE;
       ret.success = c->next & 1u;
       ret.tag = c->tag;
@@ -538,7 +573,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
     /* Before returning, check if the queue has any items left over (since
        gpr_mpscq_pop() can sometimes return NULL even if the queue is not
        empty). If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */
-    if (gpr_atm_no_barrier_load(&cc->num_queue_items) > 0) {
+    if (cq_event_queue_num_items(&cc->queue) > 0) {
       /* Go to the beginning of the loop. No point doing a poll because
          (cc->shutdown == true) is only possible when there is no pending work
          (i.e. cc->pending_events == 0) and any outstanding grpc_cq_completion
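
For orientation, the public entry point being reworked in these hunks is consumed by applications in a plain event loop. The snippet below is ordinary gRPC C-core usage rather than code from this patch; it shows where the ret.success / ret.tag values assembled above end up.

#include <grpc/grpc.h>

/* Drive a GRPC_CQ_NEXT completion queue until it is shut down. Each tag is
   whatever pointer the application supplied when it started the operation. */
static void consume_cq(grpc_completion_queue *cq) {
  for (;;) {
    grpc_event ev = grpc_completion_queue_next(
        cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
    switch (ev.type) {
      case GRPC_OP_COMPLETE:
        /* ev.tag identifies the operation; ev.success is the bit that the
           code above reconstructs from c->next & 1u. */
        break;
      case GRPC_QUEUE_SHUTDOWN:
        return; /* grpc_completion_queue_shutdown() was called and the queue
                   is fully drained */
      case GRPC_QUEUE_TIMEOUT:
        break; /* not reachable with an infinite deadline */
    }
  }
}
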
@@ -822,7 +857,7 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
   grpc_completion_queue_shutdown(cc);
 
   if (cc->completion_type == GRPC_CQ_NEXT) {
-    GPR_ASSERT(gpr_atm_no_barrier_load(&cc->num_queue_items) == 0);
+    GPR_ASSERT(cq_event_queue_num_items(&cc->queue) == 0);
   }
 
   GRPC_CQ_INTERNAL_UNREF(cc, "destroy");