Almost working...

pull/6407/head
Craig Tiller 9 years ago
parent a26637fdca
commit 2743ba94f9
  1. 64
      src/core/lib/iomgr/async_execution_lock.c
  2. 2
      src/core/lib/iomgr/async_execution_lock.h
  3. 29
      test/core/iomgr/async_execution_lock_test.c

@ -40,11 +40,16 @@
#define NO_CONSUMER ((gpr_atm)1)
// Sentinel action for the tombstone queue node. The tombstone is a dummy
// node used to mark the queue boundary (see grpc_aelock_init below, which
// installs this as tombstone.action); it must never actually be executed,
// so reaching this function aborts via GPR_UNREACHABLE_CODE.
// NOTE(review): this span comes from a rendered commit diff; lines shown
// here appear to be pure additions (post-commit) — confirm against the
// real source file.
static void bad_action(grpc_exec_ctx *exec_ctx, void *arg) {
GPR_UNREACHABLE_CODE(return );
}
// Initialize an async-execution lock: records the (optional) workqueue,
// marks the lock as having no active consumer (head = NO_CONSUMER), and
// points tail at a sentinel node whose action aborts if ever run.
//
// NOTE(review): this span is a rendered diff with pre- and post-commit
// lines interleaved (no +/- markers survived the scrape): `tail` is
// assigned twice — first &lock->stub (old version), then &lock->tombstone
// (new version) — and both stub.next and tombstone.next are cleared.
// The committed function presumably contains only the tombstone lines;
// verify against the actual post-commit source before treating this text
// as compilable code.
void grpc_aelock_init(grpc_aelock *lock, grpc_workqueue *optional_workqueue) {
lock->optional_workqueue = optional_workqueue;
gpr_atm_no_barrier_store(&lock->head, NO_CONSUMER);
lock->tail = &lock->stub;
gpr_atm_no_barrier_store(&lock->stub.next, 0);
gpr_atm_no_barrier_store(&lock->tombstone.next, 0);
lock->tombstone.action = bad_action;
lock->tail = &lock->tombstone;
}
void grpc_aelock_destroy(grpc_aelock *lock) {
@ -56,15 +61,35 @@ static void finish(grpc_exec_ctx *exec_ctx, grpc_aelock *lock) {
grpc_aelock_qnode *tail = lock->tail;
grpc_aelock_qnode *next =
(grpc_aelock_qnode *)gpr_atm_acq_load(&tail->next);
if (next == NULL) {
if (gpr_atm_rel_cas(&lock->head, (gpr_atm)&lock->stub, NO_CONSUMER)) {
return;
if (tail == &lock->tombstone) {
if (next == NULL) {
if (gpr_atm_rel_cas(&lock->head, (gpr_atm)&lock->tombstone,
NO_CONSUMER)) {
return;
}
} else {
lock->tail = next;
tail = next;
next = (grpc_aelock_qnode *)gpr_atm_acq_load(&tail->next);
}
} else {
}
if (next != NULL) {
lock->tail = next;
next->action(exec_ctx, next->arg);
gpr_free(next);
tail->action(exec_ctx, tail->arg);
gpr_free(tail);
} else {
grpc_aelock_qnode *head =
(grpc_aelock_qnode *)gpr_atm_acq_load(&lock->head);
if (head != tail) {
// TODO(ctiller): consider sleeping?
continue;
}
gpr_atm_no_barrier_store(&lock->tombstone.next, 0);
while (!gpr_atm_rel_cas(&lock->head, (gpr_atm)head,
(gpr_atm)&lock->tombstone)) {
head = (grpc_aelock_qnode *)gpr_atm_acq_load(&lock->head);
}
gpr_atm_rel_store(&head->next, (gpr_atm)&lock->tombstone);
}
}
}
@ -72,11 +97,11 @@ static void finish(grpc_exec_ctx *exec_ctx, grpc_aelock *lock) {
void grpc_aelock_execute(grpc_exec_ctx *exec_ctx, grpc_aelock *lock,
grpc_aelock_action action, void *arg,
size_t sizeof_arg) {
gpr_atm cur;
gpr_atm head;
retry_top:
cur = gpr_atm_acq_load(&lock->head);
if (cur == NO_CONSUMER) {
if (!gpr_atm_rel_cas(&lock->head, NO_CONSUMER, (gpr_atm)&lock->stub)) {
head = gpr_atm_acq_load(&lock->head);
if (head == NO_CONSUMER) {
if (!gpr_atm_rel_cas(&lock->head, NO_CONSUMER, (gpr_atm)&lock->tombstone)) {
goto retry_top;
}
action(exec_ctx, arg);
@ -86,18 +111,19 @@ retry_top:
grpc_aelock_qnode *n = gpr_malloc(sizeof(*n) + sizeof_arg);
n->action = action;
gpr_atm_no_barrier_store(&n->next, 0);
if (sizeof_arg > 0) {
memcpy(n + 1, arg, sizeof_arg);
n->arg = n + 1;
} else {
n->arg = arg;
}
while (!gpr_atm_rel_cas(&lock->head, cur, (gpr_atm)n)) {
gpr_atm_no_barrier_store(&n->next, 0);
while (!gpr_atm_rel_cas(&lock->head, head, (gpr_atm)n)) {
retry_queue_load:
cur = gpr_atm_acq_load(&lock->head);
if (cur == NO_CONSUMER) {
if (!gpr_atm_rel_cas(&lock->head, NO_CONSUMER, (gpr_atm)&lock->stub)) {
head = gpr_atm_acq_load(&lock->head);
if (head == NO_CONSUMER) {
if (!gpr_atm_rel_cas(&lock->head, NO_CONSUMER,
(gpr_atm)&lock->tombstone)) {
goto retry_queue_load;
}
gpr_free(n);
@ -106,5 +132,5 @@ retry_top:
return; // early out
}
}
gpr_atm_no_barrier_store(&((grpc_aelock_qnode *)cur)->next, (gpr_atm)n);
gpr_atm_rel_store(&((grpc_aelock_qnode *)head)->next, (gpr_atm)n);
}

@ -52,7 +52,7 @@ typedef struct grpc_aelock {
// grpc_aelock_qnode*
gpr_atm head;
grpc_aelock_qnode *tail;
grpc_aelock_qnode stub;
grpc_aelock_qnode tombstone;
} grpc_aelock;
void grpc_aelock_init(grpc_aelock *lock, grpc_workqueue *optional_workqueue);

@ -65,6 +65,11 @@ static void test_execute_one(void) {
grpc_aelock_destroy(&lock);
}
typedef struct {
size_t ctr;
grpc_aelock *lock;
} thd_args;
typedef struct {
size_t *ctr;
size_t value;
@ -72,31 +77,39 @@ typedef struct {
static void check_one(grpc_exec_ctx *exec_ctx, void *a) {
ex_args *args = a;
// gpr_log(GPR_DEBUG, "*%p=%d; step %d", args->ctr, *args->ctr, args->value);
GPR_ASSERT(*args->ctr == args->value - 1);
*args->ctr = args->value;
}
static void execute_many_loop(void *lock) {
static void execute_many_loop(void *a) {
thd_args *args = a;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
for (size_t i = 0; i < 100; i++) {
size_t ctr = 0;
for (size_t j = 1; j <= 1000; j++) {
ex_args args = {&ctr, j};
grpc_aelock_execute(&exec_ctx, lock, check_one, &args, sizeof(args));
size_t n = 1;
for (size_t i = 0; i < 10; i++) {
for (size_t j = 0; j < 1000; j++) {
ex_args c = {&args->ctr, n++};
grpc_aelock_execute(&exec_ctx, args->lock, check_one, &c, sizeof(c));
grpc_exec_ctx_flush(&exec_ctx);
}
gpr_sleep_until(GRPC_TIMEOUT_MILLIS_TO_DEADLINE(1));
gpr_sleep_until(GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100));
}
grpc_exec_ctx_finish(&exec_ctx);
}
static void test_execute_many(void) {
gpr_log(GPR_DEBUG, "test_execute_many");
grpc_aelock lock;
gpr_thd_id thds[100];
thd_args ta[GPR_ARRAY_SIZE(thds)];
grpc_aelock_init(&lock, NULL);
for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
gpr_thd_options options = gpr_thd_options_default();
gpr_thd_options_set_joinable(&options);
GPR_ASSERT(gpr_thd_new(&thds[i], execute_many_loop, &lock, &options));
ta[i].ctr = 0;
ta[i].lock = &lock;
GPR_ASSERT(gpr_thd_new(&thds[i], execute_many_loop, &ta[i], &options));
}
for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
gpr_thd_join(thds[i]);

Loading…
Cancel
Save