diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index eb5ad634bd5..831bdb4aff8 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -176,8 +176,10 @@ static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { gpr_log(GPR_DEBUG, "C:%p maybe_finish_one n=%p", lock, n)); GPR_ASSERT(exec_ctx->active_combiner == lock); if (n == NULL) { - // queue is in an inconsistant state: use this as a cue that we should - // go off and do something else for a while (and come back later) + // Queue is in an transiently inconsistent state: a new item is being queued + // but is not visible to this thread yet. + // Use this as a cue that we should go off and do something else for a while + // (and come back later) grpc_closure_init(&lock->continue_finishing, continue_finishing_mainline, lock); grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE, @@ -204,6 +206,9 @@ static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { "C:%p finish[%d] old_state=%" PRIdPTR, lock, loops, old_state)); switch (old_state) { + default: + // we have multiple queued work items: just continue executing them + break; case 5: // we're down to one queued item: if it's the final list we case 4: // should do that if (!grpc_closure_list_empty(lock->final_list)) { diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h index 57df8f0ba82..08acbb7441c 100644 --- a/src/core/lib/iomgr/combiner.h +++ b/src/core/lib/iomgr/combiner.h @@ -41,7 +41,7 @@ #include "src/core/lib/support/mpscq.h" // Provides serialized access to some resource. -// Each action queued on an aelock is executed serially in a borrowed thread. +// Each action queued on a combiner is executed serially in a borrowed thread. // The actual thread executing actions may change over time (but there will only // every be one at a time). diff --git a/src/core/lib/support/mpscq.c b/src/core/lib/support/mpscq.c index 25b055b172f..cdd6335f82b 100644 --- a/src/core/lib/support/mpscq.c +++ b/src/core/lib/support/mpscq.c @@ -38,7 +38,7 @@ void gpr_mpscq_init(gpr_mpscq *q) { gpr_atm_no_barrier_store(&q->head, (gpr_atm)&q->stub); q->tail = &q->stub; - gpr_atm_no_barrier_store(&q->stub.next, 0); + gpr_atm_no_barrier_store(&q->stub.next, (gpr_atm)NULL); } void gpr_mpscq_destroy(gpr_mpscq *q) { @@ -47,16 +47,17 @@ void gpr_mpscq_destroy(gpr_mpscq *q) { } void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) { - gpr_atm_no_barrier_store(&n->next, 0); + gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL); gpr_mpscq_node *prev = (gpr_mpscq_node *)gpr_atm_full_xchg(&q->head, (gpr_atm)n); - gpr_atm_rel_store(&prev->next, (gpr_atm)n); + gpr_atm_no_barrier_store(&prev->next, (gpr_atm)n); } gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) { gpr_mpscq_node *tail = q->tail; gpr_mpscq_node *next = (gpr_mpscq_node *)gpr_atm_acq_load(&tail->next); if (tail == &q->stub) { + // indicates the list is actually (ephemerally) empty if (next == NULL) return NULL; q->tail = next; tail = next; @@ -68,7 +69,8 @@ gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) { } gpr_mpscq_node *head = (gpr_mpscq_node *)gpr_atm_acq_load(&q->head); if (tail != head) { - return 0; + // indicates a retry is in order: we're still adding + return NULL; } gpr_mpscq_push(q, &q->stub); next = (gpr_mpscq_node *)gpr_atm_acq_load(&tail->next); @@ -76,5 +78,6 @@ gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) { q->tail = next; return tail; } + // indicates a retry is in order: we're still adding return NULL; } diff --git a/src/core/lib/support/mpscq.h b/src/core/lib/support/mpscq.h index 1201edceb1a..977a1179529 100644 --- a/src/core/lib/support/mpscq.h +++ b/src/core/lib/support/mpscq.h @@ -41,8 +41,8 @@ // implementation from Dmitry Vyukov here: // http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue -// List node (include this in a data structure and dangle the rest of the -// interesting bits off the end) +// List node (include this in a data structure at the top, and add application +// fields after it - to simulate inheritance) typedef struct gpr_mpscq_node { gpr_atm next; } gpr_mpscq_node; // Actual queue type diff --git a/test/core/iomgr/combiner_test.c b/test/core/iomgr/combiner_test.c index 7cf016d82cc..197998c1e54 100644 --- a/test/core/iomgr/combiner_test.c +++ b/test/core/iomgr/combiner_test.c @@ -80,7 +80,6 @@ typedef struct { static void check_one(grpc_exec_ctx *exec_ctx, void *a, grpc_error *error) { ex_args *args = a; - // gpr_log(GPR_DEBUG, "*%p=%d; step %d", args->ctr, *args->ctr, args->value); GPR_ASSERT(*args->ctr == args->value - 1); *args->ctr = args->value; gpr_free(a); @@ -99,6 +98,8 @@ static void execute_many_loop(void *a) { grpc_closure_create(check_one, c), GRPC_ERROR_NONE); grpc_exec_ctx_flush(&exec_ctx); } + // sleep for a little bit, to test a combiner draining and another thread + // picking it up gpr_sleep_until(GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100)); } grpc_exec_ctx_finish(&exec_ctx);