Revert "Add the ability to run some action when the lock becomes idle"

This reverts commit c9d4b81dab.
pull/7644/head
Craig Tiller 9 years ago
parent 3f417b7e73
commit 765c538d72
  1. 94
      src/core/lib/iomgr/async_execution_lock.c
  2. 4
      src/core/lib/iomgr/async_execution_lock.h
  3. 22
      test/core/iomgr/async_execution_lock_test.c

@ -38,9 +38,6 @@
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#define STATE_BIT_ALIVE 1
#define STATE_BIT_REFS 2
typedef struct grpc_aelock_qnode { typedef struct grpc_aelock_qnode {
gpr_mpscq_node mpscq_node; gpr_mpscq_node mpscq_node;
grpc_aelock_action action; grpc_aelock_action action;
@ -53,24 +50,17 @@ struct grpc_aelock {
// state is: // state is:
// lower bit - zero if orphaned // lower bit - zero if orphaned
// other bits - number of items queued on the lock // other bits - number of items queued on the lock
// see: STATE_BIT_xxx
gpr_atm state; gpr_atm state;
grpc_aelock_action before_idle_action;
void *before_idle_action_arg;
grpc_closure continue_finishing; grpc_closure continue_finishing;
}; };
static void continue_finishing(grpc_exec_ctx *exec_ctx, void *arg, static void continue_finishing(grpc_exec_ctx *exec_ctx, void *arg,
bool success); bool success);
grpc_aelock *grpc_aelock_create(grpc_workqueue *optional_workqueue, grpc_aelock *grpc_aelock_create(grpc_workqueue *optional_workqueue) {
grpc_aelock_action before_idle_action,
void *before_idle_action_arg) {
grpc_aelock *lock = gpr_malloc(sizeof(*lock)); grpc_aelock *lock = gpr_malloc(sizeof(*lock));
lock->before_idle_action = before_idle_action;
lock->before_idle_action_arg = before_idle_action_arg;
lock->optional_workqueue = optional_workqueue; lock->optional_workqueue = optional_workqueue;
gpr_atm_no_barrier_store(&lock->state, STATE_BIT_ALIVE); gpr_atm_no_barrier_store(&lock->state, 1);
gpr_mpscq_init(&lock->queue); gpr_mpscq_init(&lock->queue);
grpc_closure_init(&lock->continue_finishing, continue_finishing, lock); grpc_closure_init(&lock->continue_finishing, continue_finishing, lock);
return lock; return lock;
@ -83,8 +73,7 @@ static void really_destroy(grpc_aelock *lock) {
} }
void grpc_aelock_destroy(grpc_aelock *lock) { void grpc_aelock_destroy(grpc_aelock *lock) {
if (gpr_atm_full_fetch_add(&lock->state, -STATE_BIT_ALIVE) == if (gpr_atm_full_fetch_add(&lock->state, -1) == 1) {
STATE_BIT_ALIVE) {
really_destroy(lock); really_destroy(lock);
} }
} }
@ -92,6 +81,10 @@ void grpc_aelock_destroy(grpc_aelock *lock) {
static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_aelock *lock) { static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_aelock *lock) {
gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue); gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue);
if (n == NULL) { if (n == NULL) {
// queue is in an inconsistent state: use this as a cue that we should
// go off and do something else for a while (and come back later)
grpc_exec_ctx_enqueue(exec_ctx, &lock->continue_finishing, true,
lock->optional_workqueue);
return false; return false;
} }
grpc_aelock_qnode *ln = (grpc_aelock_qnode *)n; grpc_aelock_qnode *ln = (grpc_aelock_qnode *)n;
@ -101,89 +94,36 @@ static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_aelock *lock) {
} }
static void finish(grpc_exec_ctx *exec_ctx, grpc_aelock *lock) { static void finish(grpc_exec_ctx *exec_ctx, grpc_aelock *lock) {
for (;;) { do {
gpr_atm last_state = gpr_atm_full_fetch_add(&lock->state, -STATE_BIT_REFS); switch (gpr_atm_full_fetch_add(&lock->state, -2)) {
switch (last_state) { case 3: // had one count, one unorphaned --> unlocked unorphaned
default:
perform_one_step:
gpr_log(GPR_DEBUG, "ls=%d execute", last_state);
if (!maybe_finish_one(exec_ctx, lock)) {
// perform the idle action before going off to do something else
lock->before_idle_action(exec_ctx, lock->before_idle_action_arg);
// quick peek to see if we can immediately resume
if (!maybe_finish_one(exec_ctx, lock)) {
// queue is in an inconsistent state: use this as a cue that we
// should
// go off and do something else for a while (and come back later)
grpc_exec_ctx_enqueue(exec_ctx, &lock->continue_finishing, true,
lock->optional_workqueue);
return;
}
}
break;
case STATE_BIT_ALIVE | (2 * STATE_BIT_REFS):
gpr_log(GPR_DEBUG, "ls=%d final", last_state);
lock->before_idle_action(exec_ctx, lock->before_idle_action_arg);
switch (gpr_atm_full_fetch_add(&lock->state, -STATE_BIT_REFS)) {
case STATE_BIT_ALIVE | STATE_BIT_REFS:
return;
case STATE_BIT_REFS:
really_destroy(lock);
return;
default:
gpr_log(GPR_DEBUG, "retry");
// oops: did the before action, but something else came in
// better add another ref so we remember to do this again
gpr_atm_full_fetch_add(&lock->state, STATE_BIT_REFS);
goto perform_one_step;
}
break;
case STATE_BIT_ALIVE | STATE_BIT_REFS:
gpr_log(GPR_DEBUG, "ls=%d unlock", last_state);
return; return;
case 2 * STATE_BIT_REFS: case 2: // and one count, one orphaned --> unlocked and orphaned
gpr_log(GPR_DEBUG, "ls=%d idle", last_state);
lock->before_idle_action(exec_ctx, lock->before_idle_action_arg);
GPR_ASSERT(gpr_atm_full_fetch_add(&lock->state, -STATE_BIT_REFS) ==
STATE_BIT_REFS);
case STATE_BIT_REFS:
gpr_log(GPR_DEBUG, "ls=%d destroy", last_state);
really_destroy(lock); really_destroy(lock);
return; return;
case STATE_BIT_ALIVE: case 1:
case 0: case 0:
// these values are illegal - representing an already unlocked or // these values are illegal - representing an already unlocked or
// deleted lock // deleted lock
GPR_UNREACHABLE_CODE(return ); GPR_UNREACHABLE_CODE(return );
} }
} } while (maybe_finish_one(exec_ctx, lock));
// while (maybe_finish_one(exec_ctx, lock));
} }
static void continue_finishing(grpc_exec_ctx *exec_ctx, void *arg, static void continue_finishing(grpc_exec_ctx *exec_ctx, void *arg,
bool success) { bool success) {
grpc_aelock *lock = arg; if (maybe_finish_one(exec_ctx, arg)) finish(exec_ctx, arg);
if (maybe_finish_one(exec_ctx, lock)) {
finish(exec_ctx, lock);
} else {
// queue is in an inconsistent state: use this as a cue that we should
// go off and do something else for a while (and come back later)
grpc_exec_ctx_enqueue(exec_ctx, &lock->continue_finishing, true,
lock->optional_workqueue);
}
} }
void grpc_aelock_execute(grpc_exec_ctx *exec_ctx, grpc_aelock *lock, void grpc_aelock_execute(grpc_exec_ctx *exec_ctx, grpc_aelock *lock,
grpc_aelock_action action, void *arg, grpc_aelock_action action, void *arg,
size_t sizeof_arg) { size_t sizeof_arg) {
gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2 * STATE_BIT_REFS); gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2);
GPR_ASSERT(last & STATE_BIT_ALIVE); // ensure lock has not been destroyed GPR_ASSERT(last & 1); // ensure lock has not been destroyed
if (last == STATE_BIT_ALIVE) { if (last == 1) {
action(exec_ctx, arg); action(exec_ctx, arg);
finish(exec_ctx, lock); finish(exec_ctx, lock);
} else { } else {
gpr_atm_full_fetch_add(&lock->state, -STATE_BIT_REFS);
grpc_aelock_qnode *n = gpr_malloc(sizeof(*n) + sizeof_arg); grpc_aelock_qnode *n = gpr_malloc(sizeof(*n) + sizeof_arg);
n->action = action; n->action = action;
if (sizeof_arg > 0) { if (sizeof_arg > 0) {

@ -51,9 +51,7 @@ typedef void (*grpc_aelock_action)(grpc_exec_ctx *exec_ctx, void *arg);
// Initialize the lock, with an optional workqueue to shift load to when // Initialize the lock, with an optional workqueue to shift load to when
// necessary // necessary
grpc_aelock *grpc_aelock_create(grpc_workqueue *optional_workqueue, grpc_aelock *grpc_aelock_create(grpc_workqueue *optional_workqueue);
grpc_aelock_action before_idle_action,
void *before_idle_action_arg);
// Destroy the lock // Destroy the lock
void grpc_aelock_destroy(grpc_aelock *lock); void grpc_aelock_destroy(grpc_aelock *lock);
// Execute \a action within the lock. \a arg is the argument to pass to \a // Execute \a action within the lock. \a arg is the argument to pass to \a

@ -40,34 +40,25 @@
#include "test/core/util/test_config.h" #include "test/core/util/test_config.h"
static void do_nothing_action(grpc_exec_ctx *exec_ctx, void *ignored) {}
static void test_no_op(void) { static void test_no_op(void) {
gpr_log(GPR_DEBUG, "test_no_op"); gpr_log(GPR_DEBUG, "test_no_op");
grpc_aelock_destroy(grpc_aelock_create(NULL, do_nothing_action, NULL)); grpc_aelock_destroy(grpc_aelock_create(NULL));
} }
static void set_bool_to_true(grpc_exec_ctx *exec_ctx, void *value) { static void set_bool_to_true(grpc_exec_ctx *exec_ctx, void *value) {
*(bool *)value = true; *(bool *)value = true;
} }
static void increment_atomic(grpc_exec_ctx *exec_ctx, void *value) {
gpr_atm_full_fetch_add((gpr_atm *)value, 1);
}
static void test_execute_one(void) { static void test_execute_one(void) {
gpr_log(GPR_DEBUG, "test_execute_one"); gpr_log(GPR_DEBUG, "test_execute_one");
gpr_atm idles; grpc_aelock *lock = grpc_aelock_create(NULL);
gpr_atm_no_barrier_store(&idles, 0);
grpc_aelock *lock = grpc_aelock_create(NULL, increment_atomic, &idles);
bool done = false; bool done = false;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_aelock_execute(&exec_ctx, lock, set_bool_to_true, &done, 0); grpc_aelock_execute(&exec_ctx, lock, set_bool_to_true, &done, 0);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(done); GPR_ASSERT(done);
grpc_aelock_destroy(lock); grpc_aelock_destroy(lock);
GPR_ASSERT(gpr_atm_no_barrier_load(&idles) == 1);
} }
typedef struct { typedef struct {
@ -92,7 +83,7 @@ static void execute_many_loop(void *a) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
size_t n = 1; size_t n = 1;
for (size_t i = 0; i < 10; i++) { for (size_t i = 0; i < 10; i++) {
for (size_t j = 0; j < 100; j++) { for (size_t j = 0; j < 10000; j++) {
ex_args c = {&args->ctr, n++}; ex_args c = {&args->ctr, n++};
grpc_aelock_execute(&exec_ctx, args->lock, check_one, &c, sizeof(c)); grpc_aelock_execute(&exec_ctx, args->lock, check_one, &c, sizeof(c));
grpc_exec_ctx_flush(&exec_ctx); grpc_exec_ctx_flush(&exec_ctx);
@ -105,10 +96,7 @@ static void execute_many_loop(void *a) {
static void test_execute_many(void) { static void test_execute_many(void) {
gpr_log(GPR_DEBUG, "test_execute_many"); gpr_log(GPR_DEBUG, "test_execute_many");
gpr_atm idles; grpc_aelock *lock = grpc_aelock_create(NULL);
gpr_atm_no_barrier_store(&idles, 0);
grpc_aelock *lock = grpc_aelock_create(NULL, increment_atomic, &idles);
gpr_thd_id thds[100]; gpr_thd_id thds[100];
thd_args ta[GPR_ARRAY_SIZE(thds)]; thd_args ta[GPR_ARRAY_SIZE(thds)];
for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) { for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
@ -122,8 +110,6 @@ static void test_execute_many(void) {
gpr_thd_join(thds[i]); gpr_thd_join(thds[i]);
} }
grpc_aelock_destroy(lock); grpc_aelock_destroy(lock);
gpr_log(GPR_DEBUG, "idles: %d", gpr_atm_no_barrier_load(&idles));
} }
int main(int argc, char **argv) { int main(int argc, char **argv) {

Loading…
Cancel
Save