Refine async_execution_lock interface, implement exec_ctx-based task switching on starvation

pull/6407/head
Craig Tiller 9 years ago
parent a729f60cd3
commit cf600c9f25
4 changed files:
  1. src/core/ext/transport/chttp2/transport/internal.h (2 lines changed)
  2. src/core/lib/iomgr/async_execution_lock.c (81 lines changed)
  3. src/core/lib/iomgr/async_execution_lock.h (16 lines changed)
  4. test/core/iomgr/async_execution_lock_test.c (19 lines changed)
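
At a glance (a reading of the hunks below, not wording from the commit itself): the caller-embedded lock with its init/destroy pair becomes an opaque, heap-allocated handle with create/destroy, the single locked counter becomes a packed state word, and the old spinning drain loop is replaced by re-scheduling a continue_finishing closure on the exec_ctx (or the optional workqueue) whenever the queue looks transiently empty.

/* Interface before and after, as exercised by the updated tests further down:
 *
 *   // before
 *   grpc_aelock lock;
 *   grpc_aelock_init(&lock, NULL);
 *   ...
 *   grpc_aelock_destroy(&lock);
 *
 *   // after
 *   grpc_aelock *lock = grpc_aelock_create(NULL);
 *   ...
 *   grpc_aelock_destroy(lock);
 */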

@@ -291,7 +291,7 @@ struct grpc_chttp2_transport_parsing {
   int64_t outgoing_window;
 };
 
-typedef void (*grpc_chttp2_locked_action)(grpc_exec_ctx *ctx,
+typedef void (*grpc_chttp2_locked_action)(grpc_exec_ctx *exec_ctx,
                                           grpc_chttp2_transport *t,
                                           grpc_chttp2_stream *s, void *arg);

@@ -38,35 +38,88 @@
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
 
-#define NO_CONSUMER ((gpr_atm)1)
+typedef struct grpc_aelock_qnode {
+  gpr_mpscq_node mpscq_node;
+  grpc_aelock_action action;
+  void *arg;
+} grpc_aelock_qnode;
 
-void grpc_aelock_init(grpc_aelock *lock, grpc_workqueue *optional_workqueue) {
+struct grpc_aelock {
+  grpc_workqueue *optional_workqueue;
+  gpr_mpscq queue;
+  // state is:
+  // lower bit - zero if orphaned
+  // other bits - number of items queued on the lock
+  gpr_atm state;
+  grpc_closure continue_finishing;
+};
+
+static void continue_finishing(grpc_exec_ctx *exec_ctx, void *arg,
+                               bool success);
+
+grpc_aelock *grpc_aelock_create(grpc_workqueue *optional_workqueue) {
+  grpc_aelock *lock = gpr_malloc(sizeof(*lock));
   lock->optional_workqueue = optional_workqueue;
-  gpr_atm_no_barrier_store(&lock->locked, 0);
+  gpr_atm_no_barrier_store(&lock->state, 1);
   gpr_mpscq_init(&lock->queue);
+  grpc_closure_init(&lock->continue_finishing, continue_finishing, lock);
+  return lock;
 }
 
-void grpc_aelock_destroy(grpc_aelock *lock) {
-  GPR_ASSERT(gpr_atm_no_barrier_load(&lock->locked) == 0);
+static void really_destroy(grpc_aelock *lock) {
+  GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0);
   gpr_mpscq_destroy(&lock->queue);
 }
 
+void grpc_aelock_destroy(grpc_aelock *lock) {
+  if (gpr_atm_full_fetch_add(&lock->state, -1) == 1) {
+    really_destroy(lock);
+  }
+}
+
+static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_aelock *lock) {
+  gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue);
+  if (n == NULL) {
+    // queue is in an inconsistent state: use this as a cue that we should
+    // go off and do something else for a while (and come back later)
+    grpc_exec_ctx_enqueue(exec_ctx, &lock->continue_finishing, true,
+                          lock->optional_workqueue);
+    return false;
+  }
+  grpc_aelock_qnode *ln = (grpc_aelock_qnode *)n;
+  ln->action(exec_ctx, ln->arg);
+  gpr_free(ln);
+  return true;
+}
+
 static void finish(grpc_exec_ctx *exec_ctx, grpc_aelock *lock) {
-  while (gpr_atm_full_fetch_add(&lock->locked, -1) != 1) {
-    gpr_mpscq_node *n;
-    while ((n = gpr_mpscq_pop(&lock->queue)) == NULL) {
-      // TODO(ctiller): find something to fill in the time
+  do {
+    switch (gpr_atm_full_fetch_add(&lock->state, -2)) {
+      case 3:  // had one count, one unorphaned --> unlocked unorphaned
+        return;
+      case 2:  // and one count, one orphaned --> unlocked and orphaned
+        really_destroy(lock);
+        return;
+      case 1:
+      case 0:
+        // these values are illegal - representing an already unlocked or
+        // deleted lock
+        GPR_UNREACHABLE_CODE(return );
     }
-    grpc_aelock_qnode *ln = (grpc_aelock_qnode *)n;
-    ln->action(exec_ctx, ln->arg);
-    gpr_free(ln);
-  }
+  } while (maybe_finish_one(exec_ctx, lock));
+}
+
+static void continue_finishing(grpc_exec_ctx *exec_ctx, void *arg,
+                               bool success) {
+  if (maybe_finish_one(exec_ctx, arg)) finish(exec_ctx, arg);
 }
 
 void grpc_aelock_execute(grpc_exec_ctx *exec_ctx, grpc_aelock *lock,
                          grpc_aelock_action action, void *arg,
                          size_t sizeof_arg) {
-  if (gpr_atm_full_fetch_add(&lock->locked, 1) == 0) {
+  gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2);
+  GPR_ASSERT(last & 1);  // ensure lock has not been destroyed
+  if (last == 1) {
     action(exec_ctx, arg);
     finish(exec_ctx, lock);
   } else {
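
The packed state word drives everything above. Here is a worked trace of the counter values, inferred from the new code; the else branch that queues a node is truncated in the hunk above, so its behavior is an assumption based on grpc_aelock_qnode and maybe_finish_one:

/* state = (2 * items on the lock) | (1 while not yet orphaned)
 *
 *   grpc_aelock_create():    state = 1   -- alive, nothing pending
 *   execute() on thread A:   fetch_add(+2) returns 1 -> A runs its action
 *                            inline, then calls finish()
 *   execute() on thread B:   fetch_add(+2) returns 3 -> not 1, so B pushes a
 *                            grpc_aelock_qnode onto the mpscq and returns
 *   finish() on thread A:    fetch_add(-2) returns 5 -> falls past the switch,
 *                            maybe_finish_one() pops and runs B's action;
 *                            fetch_add(-2) returns 3 -> case 3: unlocked and
 *                            still alive, return
 *   grpc_aelock_destroy():   fetch_add(-1) returns 1 -> really_destroy()
 */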

@@ -40,6 +40,8 @@
 #include "src/core/lib/iomgr/exec_ctx.h"
 #include "src/core/lib/support/mpscq.h"
 
+typedef struct grpc_aelock grpc_aelock;
+
 // Provides serialized access to some resource.
 // Each action queued on an aelock is executed serially in a borrowed thread.
 // The actual thread executing actions may change over time (but there will only
@@ -47,21 +49,9 @@
 typedef void (*grpc_aelock_action)(grpc_exec_ctx *exec_ctx, void *arg);
 
-typedef struct grpc_aelock_qnode {
-  gpr_mpscq_node mpscq_node;
-  grpc_aelock_action action;
-  void *arg;
-} grpc_aelock_qnode;
-
-typedef struct grpc_aelock {
-  grpc_workqueue *optional_workqueue;
-  gpr_mpscq queue;
-  gpr_atm locked;
-} grpc_aelock;
-
 // Initialize the lock, with an optional workqueue to shift load to when
 // necessary
-void grpc_aelock_init(grpc_aelock *lock, grpc_workqueue *optional_workqueue);
+grpc_aelock *grpc_aelock_create(grpc_workqueue *optional_workqueue);
 
 // Destroy the lock
 void grpc_aelock_destroy(grpc_aelock *lock);
 
 // Execute \a action within the lock. \a arg is the argument to pass to \a
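
A minimal usage sketch of the refined interface, assuming only the declarations above; it mirrors the updated tests that follow, and the example() wrapper and do_something callback are illustrative names, not part of the commit:

#include "src/core/lib/iomgr/async_execution_lock.h"

// Action run under the lock; matches the grpc_aelock_action signature.
static void do_something(grpc_exec_ctx *exec_ctx, void *arg) {
  *(bool *)arg = true;
}

static void example(void) {
  grpc_aelock *lock = grpc_aelock_create(NULL);  // no workqueue to offload to
  bool done = false;
  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
  // Either runs inline (lock uncontended) or queues behind the current holder.
  grpc_aelock_execute(&exec_ctx, lock, do_something, &done, 0);
  grpc_exec_ctx_finish(&exec_ctx);
  grpc_aelock_destroy(lock);
}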

@@ -42,10 +42,7 @@
 static void test_no_op(void) {
   gpr_log(GPR_DEBUG, "test_no_op");
-  grpc_aelock lock;
-  grpc_aelock_init(&lock, NULL);
-  grpc_aelock_destroy(&lock);
+  grpc_aelock_destroy(grpc_aelock_create(NULL));
 }
 
 static void set_bool_to_true(grpc_exec_ctx *exec_ctx, void *value) {
@@ -55,14 +52,13 @@ static void set_bool_to_true(grpc_exec_ctx *exec_ctx, void *value) {
 
 static void test_execute_one(void) {
   gpr_log(GPR_DEBUG, "test_execute_one");
-  grpc_aelock lock;
-  grpc_aelock_init(&lock, NULL);
+  grpc_aelock *lock = grpc_aelock_create(NULL);
   bool done = false;
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-  grpc_aelock_execute(&exec_ctx, &lock, set_bool_to_true, &done, 0);
+  grpc_aelock_execute(&exec_ctx, lock, set_bool_to_true, &done, 0);
   grpc_exec_ctx_finish(&exec_ctx);
   GPR_ASSERT(done);
-  grpc_aelock_destroy(&lock);
+  grpc_aelock_destroy(lock);
 }
 
 typedef struct {
@@ -100,21 +96,20 @@ static void execute_many_loop(void *a) {
 
 static void test_execute_many(void) {
   gpr_log(GPR_DEBUG, "test_execute_many");
-  grpc_aelock lock;
+  grpc_aelock *lock = grpc_aelock_create(NULL);
   gpr_thd_id thds[100];
   thd_args ta[GPR_ARRAY_SIZE(thds)];
-  grpc_aelock_init(&lock, NULL);
   for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
     gpr_thd_options options = gpr_thd_options_default();
     gpr_thd_options_set_joinable(&options);
     ta[i].ctr = 0;
-    ta[i].lock = &lock;
+    ta[i].lock = lock;
     GPR_ASSERT(gpr_thd_new(&thds[i], execute_many_loop, &ta[i], &options));
   }
   for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
     gpr_thd_join(thds[i]);
   }
-  grpc_aelock_destroy(&lock);
+  grpc_aelock_destroy(lock);
 }
 
 int main(int argc, char **argv) {
