From 078a340db8b756d6381d6a9e09740bc3979d941d Mon Sep 17 00:00:00 2001
From: Sree Kuchibhotla
Date: Tue, 11 Apr 2017 21:23:03 -0700
Subject: [PATCH] completion_queue: refactor the GRPC_CQ_NEXT event queue into
 grpc_cq_event_queue and fix item-count bookkeeping

---
 src/core/lib/surface/completion_queue.c | 93 +++++++++++++++++--------
 1 file changed, 64 insertions(+), 29 deletions(-)

diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index b1a0fa6c4ab..c3c8a92d187 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -60,6 +60,25 @@ typedef struct {
   void *tag;
 } plucker;
 
+/* Queue that holds the cq_completion events. Internally it uses gpr_mpscq (a
+ * lock-free multi-producer, single-consumer queue). However, this queue
+ * supports multiple consumers too. As such, it uses queue_mu to serialize
+ * consumer access (no locks are needed for producer access).
+ *
+ * Currently this is only used in completion queues whose completion_type is
+ * GRPC_CQ_NEXT */
+typedef struct grpc_cq_event_queue {
+  /* Mutex to serialize consumers, i.e. pop() operations */
+  gpr_mu queue_mu;
+
+  gpr_mpscq queue;
+
+  /* A lazy counter of the number of items in the queue. It is NOT atomically
+     incremented/decremented along with push/pop operations and is hence only
+     eventually consistent */
+  gpr_atm num_queue_items;
+} grpc_cq_event_queue;
+
 /* Completion queue structure */
 struct grpc_completion_queue {
   /** Owned by pollset */
@@ -68,24 +87,14 @@ struct grpc_completion_queue {
   grpc_cq_completion_type completion_type;
   grpc_cq_polling_type polling_type;
 
-  /** TODO: sreek - We should be moving the 'completed events' to a different
-   * structure (co-allocated with cq) which can change depending on the type
-   * of completion queue. */
-
   /** Completed events (Only relevant if the completion_type is NOT
    * GRPC_CQ_NEXT) */
   grpc_cq_completion completed_head;
   grpc_cq_completion *completed_tail;
 
   /** Completed events for completion-queues of type GRPC_CQ_NEXT are stored in
-      a lockfree queue multi-producer/single-consumer queue.
-
-      So if the completion queue has more than one thread concurrently calling
-      grpc_completion_queue_next(), we need a mutex (i.e queue_mu) to serialize
-      those calls */
-  gpr_mu queue_mu;
-  gpr_mpscq queue;
-  gpr_atm num_queue_items; /* Count of items in the queue */
+   * this queue */
+  grpc_cq_event_queue queue;
 
   /** Number of pending events (+1 if we're not shutdown) */
   gpr_refcount pending_events;
@@ -131,6 +140,39 @@ int grpc_cq_event_timeout_trace;
 static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc,
                                      grpc_error *error);
 
+static void cq_event_queue_init(grpc_cq_event_queue *q) {
+  gpr_mpscq_init(&q->queue);
+  gpr_mu_init(&q->queue_mu);
+  gpr_atm_no_barrier_store(&q->num_queue_items, 0);
+}
+
+static void cq_event_queue_destroy(grpc_cq_event_queue *q) {
+  gpr_mpscq_destroy(&q->queue);
+  gpr_mu_destroy(&q->queue_mu);
+}
+
+static void cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) {
+  gpr_mpscq_push(&q->queue, (gpr_mpscq_node *)c);
+  gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1);
+}
+
+static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) {
+  gpr_mu_lock(&q->queue_mu);
+  grpc_cq_completion *c = (grpc_cq_completion *)gpr_mpscq_pop(&q->queue);
+  gpr_mu_unlock(&q->queue_mu);
+  if (c) {
+    gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1);
+  }
+
+  return c;
+}
+
+/* Note: The counter is not incremented/decremented atomically with push/pop.
+ * The count is only eventually consistent */
+static long cq_event_queue_num_items(grpc_cq_event_queue *q) {
+  return (long)gpr_atm_no_barrier_load(&q->num_queue_items);
+}
+
 grpc_completion_queue *grpc_completion_queue_create_internal(
     grpc_cq_completion_type completion_type,
     grpc_cq_polling_type polling_type) {
@@ -168,10 +210,7 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
 #ifndef NDEBUG
   cc->outstanding_tag_count = 0;
 #endif
-  gpr_mpscq_init(&cc->queue);
-  gpr_mu_init(&cc->queue_mu);
-  gpr_atm_no_barrier_store(&cc->num_queue_items, 0);
-
+  cq_event_queue_init(&cc->queue);
   grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc,
                     grpc_schedule_on_exec_ctx);
 
@@ -216,7 +255,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) {
   if (gpr_unref(&cc->owning_refs)) {
     GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head);
     grpc_pollset_destroy(POLLSET_FROM_CQ(cc));
-    gpr_mpscq_destroy(&cc->queue);
+    cq_event_queue_destroy(&cc->queue);
 #ifndef NDEBUG
     gpr_free(cc->outstanding_tags);
 #endif
@@ -283,9 +322,8 @@ void grpc_cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
   check_tag_in_cq(cc, tag, true); /* Used in debug builds only */
 
   /* Add the completion to the queue */
-  gpr_mpscq_push(&cc->queue, (gpr_mpscq_node *)storage);
+  cq_event_queue_push(&cc->queue, storage);
   gpr_atm_no_barrier_fetch_add(&cc->things_queued_ever, 1);
-  gpr_atm_no_barrier_fetch_add(&cc->num_queue_items, 1);
 
   int shutdown = gpr_unref(&cc->pending_events);
   if (!shutdown) {
@@ -433,8 +471,10 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
      * might return NULL in some cases even if the queue is not empty; but that
      * is ok and doesn't affect correctness. Might affect the tail latencies a
      * bit) */
-    a->stolen_completion = (grpc_cq_completion *)gpr_mpscq_pop(&cq->queue);
-    return true;
+    a->stolen_completion = cq_event_queue_pop(&cq->queue);
+    if (a->stolen_completion != NULL) {
+      return true;
+    }
   }
 
   return !a->first_loop &&
@@ -515,18 +555,13 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
       break;
     }
 
-    gpr_mu_lock(&cc->queue_mu);
-    grpc_cq_completion *c = (grpc_cq_completion *)gpr_mpscq_pop(&cc->queue);
-    gpr_mu_unlock(&cc->queue_mu);
+    grpc_cq_completion *c = cq_event_queue_pop(&cc->queue);
 
     /* TODO: sreek - If c == NULL it means either the queue is empty OR in a
        transient, inconsistent state. Consider doing a 0-timeout poll if
       (cc->num_queue_items > 0 and c == NULL) so that the thread comes back
       quickly from poll to make a second attempt at popping */
 
     if (c != NULL) {
-      gpr_atm_no_barrier_fetch_add(&cc->num_queue_items, -1);
-
       ret.type = GRPC_OP_COMPLETE;
       ret.success = c->next & 1u;
       ret.tag = c->tag;
@@ -538,7 +573,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
       /* Before returning, check if the queue has any items left over (since
          gpr_mpscq_pop() can sometimes return NULL even if the queue is not
         empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */
-      if (gpr_atm_no_barrier_load(&cc->num_queue_items) > 0) {
+      if (cq_event_queue_num_items(&cc->queue) > 0) {
        /* Go to the beginning of the loop.
          No point doing a poll because (cc->shutdown == true) is only possible
          when there is no pending work (i.e. cc->pending_events == 0) and any
          outstanding grpc_cq_completion
@@ -822,7 +857,7 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
   grpc_completion_queue_shutdown(cc);
 
   if (cc->completion_type == GRPC_CQ_NEXT) {
-    GPR_ASSERT(gpr_atm_no_barrier_load(&cc->num_queue_items) == 0);
+    GPR_ASSERT(cq_event_queue_num_items(&cc->queue) == 0);
  }
 
  GRPC_CQ_INTERNAL_UNREF(cc, "destroy");
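
Reviewer note (commentary only, not part of the patch to apply): for anyone
who wants to see the grpc_cq_event_queue idea in isolation, below is a minimal
standalone sketch of the same pattern: a Vyukov-style intrusive MPSC queue
standing in for gpr_mpscq (whose internals this patch does not show), a
consumer-side mutex so several threads may pop concurrently, and a relaxed
counter that is only eventually consistent. All names here (mpscq_*,
event_queue_*) and the use of C11 atomics plus pthreads are illustrative
assumptions for the sketch, not gRPC's actual API.

/* Standalone sketch of the grpc_cq_event_queue pattern. The mpscq_* and
 * event_queue_* names are invented for this example; only the idea (lock-free
 * producers, mutex-serialized consumers, lazy item count) mirrors the patch. */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

typedef struct mpscq_node {
  _Atomic(struct mpscq_node *) next;
} mpscq_node;

typedef struct {
  _Atomic(mpscq_node *) head; /* producers swap themselves in here */
  mpscq_node *tail;           /* consumer side; serialized by the caller */
  mpscq_node stub;            /* dummy node so the list is never truly empty */
} mpscq;

static void mpscq_init(mpscq *q) {
  atomic_init(&q->stub.next, NULL);
  atomic_init(&q->head, &q->stub);
  q->tail = &q->stub;
}

/* Lock-free push: safe from any number of producer threads. */
static void mpscq_push(mpscq *q, mpscq_node *n) {
  atomic_store(&n->next, NULL);
  mpscq_node *prev = atomic_exchange(&q->head, n);
  atomic_store(&prev->next, n);
}

/* Single-consumer pop. As the patch's comments note for gpr_mpscq_pop, this
   can return NULL even when the queue is non-empty if a push is mid-flight. */
static mpscq_node *mpscq_pop(mpscq *q) {
  mpscq_node *tail = q->tail;
  mpscq_node *next = atomic_load(&tail->next);
  if (tail == &q->stub) {
    if (next == NULL) return NULL; /* empty (or a push is in progress) */
    q->tail = next;
    tail = next;
    next = atomic_load(&tail->next);
  }
  if (next != NULL) {
    q->tail = next;
    return tail;
  }
  if (tail != atomic_load(&q->head)) return NULL; /* push in progress */
  mpscq_push(q, &q->stub); /* re-insert the stub to detach the last node */
  next = atomic_load(&tail->next);
  if (next != NULL) {
    q->tail = next;
    return tail;
  }
  return NULL;
}

/* The analogue of grpc_cq_event_queue: a mutex serializes consumers (making
   the MPSC queue usable by multiple consumers) while producers stay lock-free,
   and the item count is maintained lazily, i.e. only eventually consistent. */
typedef struct {
  pthread_mutex_t queue_mu;
  mpscq queue;
  atomic_long num_queue_items;
} event_queue;

static void event_queue_init(event_queue *q) {
  mpscq_init(&q->queue);
  pthread_mutex_init(&q->queue_mu, NULL);
  atomic_init(&q->num_queue_items, 0);
}

static void event_queue_push(event_queue *q, mpscq_node *n) {
  mpscq_push(&q->queue, n); /* no lock on the producer fast path */
  atomic_fetch_add_explicit(&q->num_queue_items, 1, memory_order_relaxed);
}

static mpscq_node *event_queue_pop(event_queue *q) {
  pthread_mutex_lock(&q->queue_mu); /* serialize competing consumers */
  mpscq_node *n = mpscq_pop(&q->queue);
  pthread_mutex_unlock(&q->queue_mu);
  if (n != NULL) {
    atomic_fetch_sub_explicit(&q->num_queue_items, 1, memory_order_relaxed);
  }
  return n;
}

int main(void) {
  event_queue q;
  mpscq_node nodes[3];
  event_queue_init(&q);
  for (int i = 0; i < 3; i++) event_queue_push(&q, &nodes[i]);
  /* Mirrors the retry logic in grpc_completion_queue_next: while the lazy
     count says items remain, a NULL pop just means "try again". */
  while (atomic_load_explicit(&q.num_queue_items, memory_order_relaxed) > 0) {
    mpscq_node *n = event_queue_pop(&q);
    if (n != NULL) printf("popped node %ld\n", (long)(n - nodes));
  }
  return 0;
}

The design point worth calling out: the producer path (grpc_cq_end_op_for_next)
never takes a lock, so completing an operation stays cheap, and the mutex cost
is paid only by competing consumers in grpc_completion_queue_next and
cq_is_next_finished. The lazy counter is what lets the shutdown path tell
"fully drained" apart from "pop transiently returned NULL", which is exactly
why grpc_completion_queue_next keeps retrying instead of returning
GRPC_QUEUE_SHUTDOWN while cq_event_queue_num_items() > 0.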