diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 986851b2af0..712719e9aff 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -62,7 +62,7 @@ typedef struct {
 
 /* Completion queue structure */
 struct grpc_completion_queue {
-  /** owned by pollset */
+  /** Owned by pollset */
   gpr_mu *mu;
 
   grpc_cq_completion_type completion_type;
@@ -79,17 +79,19 @@ struct grpc_completion_queue {
   /** Completed events for completion-queues of type GRPC_CQ_NEXT are stored
-      in a lockfree queue multi-producer/single-consumer queue */
+      in a lockfree multi-producer/single-consumer queue. So if the completion
+      queue has more than one thread concurrently calling
+      grpc_completion_queue_next(), we need a mutex (i.e. queue_mu) to
+      serialize those calls */
   gpr_mu queue_mu;
   gpr_mpscq queue;
+  gpr_atm num_queue_items; /* Count of items in the queue */
 
   /** Number of pending events (+1 if we're not shutdown) */
   gpr_refcount pending_events;
   /** Once owning_refs drops to zero, we will destroy the cq */
   gpr_refcount owning_refs;
-  /** counter of how many things have ever been queued on this completion queue
+  /** Counter of how many things have ever been queued on this completion queue
       useful for avoiding locks to check the queue */
   gpr_atm things_queued_ever;
   /** 0 initially, 1 once we've begun shutting down */
@@ -168,6 +170,7 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
 #endif
   gpr_mpscq_init(&cc->queue);
   gpr_mu_init(&cc->queue_mu);
+  gpr_atm_no_barrier_store(&cc->num_queue_items, 0);
 
   grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc,
                     grpc_schedule_on_exec_ctx);
@@ -237,106 +240,118 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) {
   gpr_ref(&cc->pending_events);
 }
 
-void grpc_cq_end_op_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
-                         grpc_cq_completion *storage) {
-  /* push completion */
-  gpr_mpscq_push(&cc->queue, &storage->node);
+#ifndef NDEBUG
+void check_tag_in_cq(grpc_completion_queue *cc, void *tag, bool lock_cq) {
+  int found = 0;
+  if (lock_cq) {
+    gpr_mu_lock(cc->mu);
+  }
 
-  int shutdown = gpr_unref(&cc->pending_events);
+  for (int i = 0; i < (int)cc->outstanding_tag_count; i++) {
+    if (cc->outstanding_tags[i] == tag) {
+      cc->outstanding_tag_count--;
+      GPR_SWAP(void *, cc->outstanding_tags[i],
+               cc->outstanding_tags[cc->outstanding_tag_count]);
+      found = 1;
+      break;
+    }
+  }
+
+  if (lock_cq) {
+    gpr_mu_unlock(cc->mu);
+  }
+
+  GPR_ASSERT(found);
+}
+#else
+void check_tag_in_cq(grpc_completion_queue *cc, void *tag, bool lock_cq) {}
+#endif
+
+/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
+ * completion type of GRPC_CQ_NEXT) */
+void grpc_cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
+                             grpc_completion_queue *cc, void *tag,
+                             grpc_error *error,
+                             void (*done)(grpc_exec_ctx *exec_ctx,
+                                          void *done_arg,
+                                          grpc_cq_completion *storage),
+                             void *done_arg, grpc_cq_completion *storage) {
+  storage->tag = tag;
+  storage->done = done;
+  storage->done_arg = done_arg;
+  storage->next = (uintptr_t)(error == GRPC_ERROR_NONE);
+
+  check_tag_in_cq(cc, tag, true); /* Used in debug builds only */
+
+  /* Add the completion to the queue */
+  gpr_mpscq_push(&cc->queue, (gpr_mpscq_node *)storage);
   gpr_atm_no_barrier_fetch_add(&cc->things_queued_ever, 1);
-  gpr_mu_lock(cc->mu);
+  gpr_atm_no_barrier_fetch_add(&cc->num_queue_items, 1);
+
+  int shutdown = gpr_unref(&cc->pending_events);
   if (!shutdown) {
+    gpr_mu_lock(cc->mu);
     grpc_error *kick_error = grpc_pollset_kick(POLLSET_FROM_CQ(cc), NULL);
+    gpr_mu_unlock(cc->mu);
+
     if (kick_error != GRPC_ERROR_NONE) {
       const char *msg = grpc_error_string(kick_error);
       gpr_log(GPR_ERROR, "Kick failed: %s", msg);
       GRPC_ERROR_UNREF(kick_error);
     }
-
   } else {
     GPR_ASSERT(!gpr_atm_no_barrier_load(&cc->shutdown));
     GPR_ASSERT(cc->shutdown_called);
+    gpr_atm_no_barrier_store(&cc->shutdown, 1);
+
+    gpr_mu_lock(cc->mu);
     grpc_pollset_shutdown(exec_ctx, POLLSET_FROM_CQ(cc),
                           &cc->pollset_shutdown_done);
     gpr_mu_unlock(cc->mu);
   }
-  gpr_mu_unlock(cc->mu);
-}
-
-/* Signal the end of an operation - if this is the last waiting-to-be-queued
-   event, then enter shutdown mode */
-/* Queue a GRPC_OP_COMPLETED operation */
-void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
-                    void *tag, grpc_error *error,
-                    void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
-                                 grpc_cq_completion *storage),
-                    void *done_arg, grpc_cq_completion *storage) {
-  int shutdown;
-  int i;
-  grpc_pollset_worker *pluck_worker;
-#ifndef NDEBUG
-  int found = 0;
-#endif
-  GPR_TIMER_BEGIN("grpc_cq_end_op", 0);
-  if (grpc_api_trace ||
-      (grpc_trace_operation_failures && error != GRPC_ERROR_NONE)) {
-    const char *errmsg = grpc_error_string(error);
-    GRPC_API_TRACE(
-        "grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, error=%s, done=%p, "
-        "done_arg=%p, storage=%p)",
-        7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
-    if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) {
-      gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
-    }
-  }
+  GRPC_ERROR_UNREF(error);
+}
 
+/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
+ * completion type of GRPC_CQ_PLUCK) */
+void grpc_cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
+                              grpc_completion_queue *cc, void *tag,
+                              grpc_error *error,
+                              void (*done)(grpc_exec_ctx *exec_ctx,
+                                           void *done_arg,
+                                           grpc_cq_completion *storage),
+                              void *done_arg, grpc_cq_completion *storage) {
   storage->tag = tag;
   storage->done = done;
   storage->done_arg = done_arg;
-  if (cc->completion_type == GRPC_CQ_NEXT) {
-    storage->next = (uintptr_t)(error == GRPC_ERROR_NONE);
-  } else {
-    storage->next = ((uintptr_t)&cc->completed_head) |
-                    ((uintptr_t)(error == GRPC_ERROR_NONE));
-  }
-
-  if (cc->completion_type == GRPC_CQ_NEXT) {
-    grpc_cq_end_op_next(exec_ctx, cc, storage);
-    return; /* EARLY OUT */
-  }
+  storage->next = ((uintptr_t)&cc->completed_head) |
+                  ((uintptr_t)(error == GRPC_ERROR_NONE));
 
   gpr_mu_lock(cc->mu);
-#ifndef NDEBUG
-  for (i = 0; i < (int)cc->outstanding_tag_count; i++) {
-    if (cc->outstanding_tags[i] == tag) {
-      cc->outstanding_tag_count--;
-      GPR_SWAP(void *, cc->outstanding_tags[i],
-               cc->outstanding_tags[cc->outstanding_tag_count]);
-      found = 1;
-      break;
-    }
-  }
-  GPR_ASSERT(found);
-#endif
-  shutdown = gpr_unref(&cc->pending_events);
+  check_tag_in_cq(cc, tag, false); /* Used in debug builds only */
+
+  /* Add to the list of completions */
   gpr_atm_no_barrier_fetch_add(&cc->things_queued_ever, 1);
+  cc->completed_tail->next =
+      ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next);
+  cc->completed_tail = storage;
+
+  int shutdown = gpr_unref(&cc->pending_events);
   if (!shutdown) {
-    cc->completed_tail->next =
-        ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next);
-    cc->completed_tail = storage;
-    pluck_worker = NULL;
-    for (i = 0; i < cc->num_pluckers; i++) {
+    grpc_pollset_worker *pluck_worker = NULL;
+    for (int i = 0; i < cc->num_pluckers; i++) {
      if (cc->pluckers[i].tag == tag) {
        pluck_worker = *cc->pluckers[i].worker;
        break;
      }
    }
+
     grpc_error *kick_error =
         grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker);
     gpr_mu_unlock(cc->mu);
+
     if (kick_error != GRPC_ERROR_NONE) {
       const char *msg = grpc_error_string(kick_error);
       gpr_log(GPR_ERROR, "Kick failed: %s", msg);
@@ -344,9 +359,6 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
       GRPC_ERROR_UNREF(kick_error);
     }
   } else {
-    cc->completed_tail->next =
-        ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next);
-    cc->completed_tail = storage;
     GPR_ASSERT(!gpr_atm_no_barrier_load(&cc->shutdown));
     GPR_ASSERT(cc->shutdown_called);
     gpr_atm_no_barrier_store(&cc->shutdown, 1);
@@ -355,6 +367,42 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
     gpr_mu_unlock(cc->mu);
   }
 
+  GRPC_ERROR_UNREF(error);
+}
+
+/* Signal the end of an operation - if this is the last waiting-to-be-queued
+   event, then enter shutdown mode */
+/* Queue a GRPC_OP_COMPLETED operation */
+void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
+                    void *tag, grpc_error *error,
+                    void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
+                                 grpc_cq_completion *storage),
+                    void *done_arg, grpc_cq_completion *storage) {
+  GPR_TIMER_BEGIN("grpc_cq_end_op", 0);
+
+  if (grpc_api_trace ||
+      (grpc_trace_operation_failures && error != GRPC_ERROR_NONE)) {
+    const char *errmsg = grpc_error_string(error);
+    GRPC_API_TRACE(
+        "grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, error=%s, done=%p, "
+        "done_arg=%p, storage=%p)",
+        7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
+    if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) {
+      gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
+    }
+  }
+
+  /* Call the appropriate function to queue the completion based on the
+     completion queue type. Note that both helpers take ownership of (i.e.
+     unref) 'error', so it must not be unref'd again here */
+  if (cc->completion_type == GRPC_CQ_NEXT) {
+    grpc_cq_end_op_for_next(exec_ctx, cc, tag, error, done, done_arg, storage);
+  } else if (cc->completion_type == GRPC_CQ_PLUCK) {
+    grpc_cq_end_op_for_pluck(exec_ctx, cc, tag, error, done, done_arg,
+                             storage);
+  } else {
+    gpr_log(GPR_ERROR, "Unexpected completion type %d", cc->completion_type);
+    abort();
+  }
+
   GPR_TIMER_END("grpc_cq_end_op", 0);
-
-  GRPC_ERROR_UNREF(error);
@@ -369,28 +417,25 @@ typedef struct {
   bool first_loop;
 } cq_is_finished_arg;
 
-/* TODO (sreek) FIX THIS */
 static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
   cq_is_finished_arg *a = arg;
   grpc_completion_queue *cq = a->cq;
   GPR_ASSERT(a->stolen_completion == NULL);
+
   gpr_atm current_last_seen_things_queued_ever =
       gpr_atm_no_barrier_load(&cq->things_queued_ever);
+
   if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
-    gpr_mu_lock(cq->mu);
     a->last_seen_things_queued_ever =
         gpr_atm_no_barrier_load(&cq->things_queued_ever);
-    if (cq->completed_tail != &cq->completed_head) {
-      a->stolen_completion = (grpc_cq_completion *)cq->completed_head.next;
-      cq->completed_head.next = a->stolen_completion->next & ~(uintptr_t)1;
-      if (a->stolen_completion == cq->completed_tail) {
-        cq->completed_tail = &cq->completed_head;
-      }
-      gpr_mu_unlock(cq->mu);
-      return true;
-    }
-    gpr_mu_unlock(cq->mu);
+
+    /* Pop a cq_completion from the queue (under queue_mu, since gpr_mpscq
+       allows only one consumer at a time). This returns NULL if the queue is
+       empty, and might return NULL in some cases even if the queue is not
+       empty; that is ok and doesn't affect correctness (though it might
+       affect the tail latencies a bit) */
+    gpr_mu_lock(&cq->queue_mu);
+    a->stolen_completion = (grpc_cq_completion *)gpr_mpscq_pop(&cq->queue);
+    gpr_mu_unlock(&cq->queue_mu);
+
+    if (a->stolen_completion != NULL) {
+      gpr_atm_no_barrier_fetch_add(&cq->num_queue_items, -1);
+    }
   }
+
   return !a->first_loop &&
          gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
 }
@@ -438,9 +483,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
       "deadline=gpr_timespec { tv_sec: %" PRId64
       ", tv_nsec: %d, clock_type: %d }, "
       "reserved=%p)",
-      5,
-      (cc, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
-       reserved));
+      5, (cc, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
+          reserved));
   GPR_ASSERT(!reserved);
 
   dump_pending_tags(cc);
@@ -474,7 +518,14 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
       grpc_cq_completion *c = (grpc_cq_completion *)gpr_mpscq_pop(&cc->queue);
       gpr_mu_unlock(&cc->queue_mu);
 
+      /* TODO: sreek - If c == NULL, it means the queue is either empty or in
+         a transient inconsistent state. Consider doing a 0-timeout poll if
+         (cc->num_queue_items > 0 && c == NULL) so that the thread comes back
+         quickly from poll to make a second attempt at popping */
+
       if (c != NULL) {
+        gpr_atm_no_barrier_fetch_add(&cc->num_queue_items, -1);
+
         ret.type = GRPC_OP_COMPLETE;
         ret.success = c->next & 1u;
         ret.tag = c->tag;
@@ -483,6 +534,17 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
       }
 
       if (gpr_atm_no_barrier_load(&cc->shutdown)) {
+        /* Before returning, check if the queue has any items left over (since
+           gpr_mpscq_pop() can sometimes return NULL even if the queue is not
+           empty). If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */
+        if (gpr_atm_no_barrier_load(&cc->num_queue_items) > 0) {
+          /* Go to the beginning of the loop. No point doing a poll because
+             (cc->shutdown == true) is only possible when there is no pending
+             work (i.e. cc->pending_events == 0) and any outstanding
+             grpc_cq_completion events are already queued on this cq */
+          continue;
+        }
+
         memset(&ret, 0, sizeof(ret));
         ret.type = GRPC_QUEUE_SHUTDOWN;
         break;
@@ -495,9 +557,9 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
       dump_pending_tags(cc);
       break;
     }
-    /* Check alarms - these are a global resource so we just ping
-       each time through on every pollset.
-       May update deadline to ensure timely wakeups. */
+
+    /* Check alarms - these are a global resource, so we just ping each time
+       through on every pollset. May update deadline to ensure timely
+       wakeups. */
     gpr_timespec iteration_deadline = deadline;
     if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) {
       GPR_TIMER_MARK("alarm_triggered", 0);
@@ -505,10 +567,12 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
       continue;
     }
 
+    /* The main polling work happens in grpc_pollset_work */
    gpr_mu_lock(cc->mu);
    grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), NULL,
                                        now, iteration_deadline);
    gpr_mu_unlock(cc->mu);
+
    if (err != GRPC_ERROR_NONE) {
      const char *msg = grpc_error_string(err);
      gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg);
@@ -611,9 +675,8 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
         "deadline=gpr_timespec { tv_sec: %" PRId64
         ", tv_nsec: %d, clock_type: %d }, "
         "reserved=%p)",
-        6,
-        (cc, tag, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
-         reserved));
+        6, (cc, tag, deadline.tv_sec, deadline.tv_nsec,
+            (int)deadline.clock_type, reserved));
   }
   GPR_ASSERT(!reserved);
 
@@ -756,6 +819,11 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
   GRPC_API_TRACE("grpc_completion_queue_destroy(cc=%p)", 1, (cc));
   GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0);
   grpc_completion_queue_shutdown(cc);
+
+  if (cc->completion_type == GRPC_CQ_NEXT) {
+    GPR_ASSERT(gpr_atm_no_barrier_load(&cc->num_queue_items) == 0);
+  }
+
   GRPC_CQ_INTERNAL_UNREF(cc, "destroy");
   GPR_TIMER_END("grpc_completion_queue_destroy", 0);
 }
diff --git a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
index 9d7f65d2923..3362510e5a3 100644
--- a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
+++ b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
@@ -79,10 +79,16 @@ static void cq_done_cb(grpc_exec_ctx* exec_ctx, void* done_arg,
   gpr_free(cq_completion);
 }
 
-/* Queues a completion tag. ZERO polling overhead */
+/* Queues a completion tag if deadline is > 0.
+ * Does nothing if deadline is 0 (i.e. gpr_time_0(GPR_CLOCK_MONOTONIC)) */
 static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* ps,
                                 grpc_pollset_worker** worker, gpr_timespec now,
                                 gpr_timespec deadline) {
+  if (gpr_time_cmp(deadline, gpr_time_0(GPR_CLOCK_MONOTONIC)) == 0) {
+    gpr_log(GPR_ERROR, "no-op");
+    return GRPC_ERROR_NONE;
+  }
+
   gpr_mu_unlock(&ps->mu);
   grpc_cq_begin_op(g_cq, g_tag);
   grpc_cq_end_op(exec_ctx, g_cq, g_tag, GRPC_ERROR_NONE, cq_done_cb, NULL,
@@ -113,6 +119,14 @@ static void setup() {
 
 static void teardown() {
   grpc_completion_queue_shutdown(g_cq);
+
+  /* Drain any events */
+  gpr_timespec deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
+  while (grpc_completion_queue_next(g_cq, deadline, NULL).type !=
+         GRPC_QUEUE_SHUTDOWN) {
+    /* Do nothing */
+  }
+
   grpc_completion_queue_destroy(g_cq);
 }
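
Reviewer note on the transient-NULL behavior: the retry machinery added above (num_queue_items, the continue-instead-of-GRPC_QUEUE_SHUTDOWN loop, and the comment in cq_is_next_finished) all stem from one property of the Vyukov-style intrusive MPSC queue that gpr_mpscq is based on: a pop can return NULL while a producer is mid-push, even though the queue is not empty. The sketch below is illustrative only; it uses hypothetical mpscq_* names and standard C11 atomics rather than the actual gpr_mpscq/gpr_atm code, and the main() demo mirrors (but is not) the counter-guarded drain that grpc_completion_queue_next() performs before trusting an empty-looking queue.

#include <stdatomic.h>
#include <stddef.h>
#include <stdio.h>

/* Intrusive node: embed as the first member of anything that gets queued
   (grpc_cq_completion embeds its queue node the same way). */
typedef struct mpscq_node {
  _Atomic(struct mpscq_node *) next;
} mpscq_node;

typedef struct {
  _Atomic(mpscq_node *) head; /* producers exchange new nodes in here */
  mpscq_node *tail;           /* single consumer pops from here */
  mpscq_node stub;            /* sentinel; the list is never fully empty */
} mpscq;

static void mpscq_init(mpscq *q) {
  atomic_store(&q->stub.next, NULL);
  atomic_store(&q->head, &q->stub);
  q->tail = &q->stub;
}

/* Any number of producers may call this concurrently. */
static void mpscq_push(mpscq *q, mpscq_node *n) {
  atomic_store(&n->next, NULL);
  mpscq_node *prev = atomic_exchange(&q->head, n);
  /* Window of interest: between the exchange above and the store below,
     'n' is published in head but not yet reachable from tail. A consumer
     arriving now sees an apparently empty queue. */
  atomic_store(&prev->next, n);
}

/* Single consumer only -- this is why the cq serializes callers with
   queue_mu. Returns NULL when the queue is empty OR when a producer is
   inside the window above; only an external item count can tell those two
   cases apart, which is exactly what num_queue_items is for. */
static mpscq_node *mpscq_pop(mpscq *q) {
  mpscq_node *tail = q->tail;
  mpscq_node *next = atomic_load(&tail->next);
  if (tail == &q->stub) {          /* skip over the sentinel */
    if (next == NULL) return NULL; /* empty, or a push is in progress */
    q->tail = next;
    tail = next;
    next = atomic_load(&tail->next);
  }
  if (next != NULL) {
    q->tail = next;
    return tail;
  }
  if (atomic_load(&q->head) != tail) {
    return NULL; /* a producer is mid-push; caller should retry */
  }
  /* Re-queue the sentinel so the last real node can be unlinked. */
  mpscq_push(q, &q->stub);
  next = atomic_load(&tail->next);
  if (next != NULL) {
    q->tail = next;
    return tail;
  }
  return NULL; /* raced with yet another push; retry later */
}

/* Single-threaded demo of the drain pattern used at shutdown: keep popping
   while the item counter proves the queue cannot really be empty, treating
   NULL as "retry" rather than "done". */
typedef struct {
  mpscq_node node; /* first member, so item* and mpscq_node* coincide */
  int value;
} item;

int main(void) {
  mpscq q;
  atomic_int num_items = 0;
  mpscq_init(&q);

  item items[3] = {{.value = 1}, {.value = 2}, {.value = 3}};
  for (int i = 0; i < 3; i++) {
    mpscq_push(&q, &items[i].node);
    atomic_fetch_add(&num_items, 1);
  }

  while (atomic_load(&num_items) > 0) {
    item *it = (item *)mpscq_pop(&q);
    if (it == NULL) continue; /* transient NULL: retry, as the cq does */
    atomic_fetch_add(&num_items, -1);
    printf("popped %d\n", it->value);
  }
  return 0;
}

In this single-threaded demo the NULL branch never fires; under concurrent producers it does, and dropping the counter check is precisely what would make a drain loop exit early and trip the new GPR_ASSERT in grpc_completion_queue_destroy.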