Address review comments

pull/7856/head^2
Craig Tiller 8 years ago
parent 23aff2e0cd
commit f6b6d29841
  1. 9
      src/core/lib/iomgr/combiner.c
  2. 2
      src/core/lib/iomgr/combiner.h
  3. 11
      src/core/lib/support/mpscq.c
  4. 4
      src/core/lib/support/mpscq.h
  5. 3
      test/core/iomgr/combiner_test.c

@ -176,8 +176,10 @@ static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
gpr_log(GPR_DEBUG, "C:%p maybe_finish_one n=%p", lock, n));
GPR_ASSERT(exec_ctx->active_combiner == lock);
if (n == NULL) {
// queue is in an inconsistant state: use this as a cue that we should
// go off and do something else for a while (and come back later)
// Queue is in an transiently inconsistent state: a new item is being queued
// but is not visible to this thread yet.
// Use this as a cue that we should go off and do something else for a while
// (and come back later)
grpc_closure_init(&lock->continue_finishing, continue_finishing_mainline,
lock);
grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE,
@ -204,6 +206,9 @@ static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
"C:%p finish[%d] old_state=%" PRIdPTR, lock,
loops, old_state));
switch (old_state) {
default:
// we have multiple queued work items: just continue executing them
break;
case 5: // we're down to one queued item: if it's the final list we
case 4: // should do that
if (!grpc_closure_list_empty(lock->final_list)) {

@ -41,7 +41,7 @@
#include "src/core/lib/support/mpscq.h"
// Provides serialized access to some resource.
// Each action queued on an aelock is executed serially in a borrowed thread.
// Each action queued on a combiner is executed serially in a borrowed thread.
// The actual thread executing actions may change over time (but there will only
// every be one at a time).

@ -38,7 +38,7 @@
void gpr_mpscq_init(gpr_mpscq *q) {
gpr_atm_no_barrier_store(&q->head, (gpr_atm)&q->stub);
q->tail = &q->stub;
gpr_atm_no_barrier_store(&q->stub.next, 0);
gpr_atm_no_barrier_store(&q->stub.next, (gpr_atm)NULL);
}
void gpr_mpscq_destroy(gpr_mpscq *q) {
@ -47,16 +47,17 @@ void gpr_mpscq_destroy(gpr_mpscq *q) {
}
void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) {
gpr_atm_no_barrier_store(&n->next, 0);
gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL);
gpr_mpscq_node *prev =
(gpr_mpscq_node *)gpr_atm_full_xchg(&q->head, (gpr_atm)n);
gpr_atm_rel_store(&prev->next, (gpr_atm)n);
gpr_atm_no_barrier_store(&prev->next, (gpr_atm)n);
}
gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) {
gpr_mpscq_node *tail = q->tail;
gpr_mpscq_node *next = (gpr_mpscq_node *)gpr_atm_acq_load(&tail->next);
if (tail == &q->stub) {
// indicates the list is actually (ephemerally) empty
if (next == NULL) return NULL;
q->tail = next;
tail = next;
@ -68,7 +69,8 @@ gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) {
}
gpr_mpscq_node *head = (gpr_mpscq_node *)gpr_atm_acq_load(&q->head);
if (tail != head) {
return 0;
// indicates a retry is in order: we're still adding
return NULL;
}
gpr_mpscq_push(q, &q->stub);
next = (gpr_mpscq_node *)gpr_atm_acq_load(&tail->next);
@ -76,5 +78,6 @@ gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) {
q->tail = next;
return tail;
}
// indicates a retry is in order: we're still adding
return NULL;
}

@ -41,8 +41,8 @@
// implementation from Dmitry Vyukov here:
// http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
// List node (include this in a data structure and dangle the rest of the
// interesting bits off the end)
// List node (include this in a data structure at the top, and add application
// fields after it - to simulate inheritance)
typedef struct gpr_mpscq_node { gpr_atm next; } gpr_mpscq_node;
// Actual queue type

@ -80,7 +80,6 @@ typedef struct {
static void check_one(grpc_exec_ctx *exec_ctx, void *a, grpc_error *error) {
ex_args *args = a;
// gpr_log(GPR_DEBUG, "*%p=%d; step %d", args->ctr, *args->ctr, args->value);
GPR_ASSERT(*args->ctr == args->value - 1);
*args->ctr = args->value;
gpr_free(a);
@ -99,6 +98,8 @@ static void execute_many_loop(void *a) {
grpc_closure_create(check_one, c), GRPC_ERROR_NONE);
grpc_exec_ctx_flush(&exec_ctx);
}
// sleep for a little bit, to test a combiner draining and another thread
// picking it up
gpr_sleep_until(GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100));
}
grpc_exec_ctx_finish(&exec_ctx);

Loading…
Cancel
Save