Merge branch 'direct-calls' into buffer_pools_for_realsies

reviewable/pr8239/r2
Craig Tiller 9 years ago
commit 7f6b0e7d7a
  1. 6
      include/grpc/support/alloc.h
  2. 57
      src/core/lib/iomgr/combiner.c
  3. 10
      src/core/lib/iomgr/exec_ctx.c

@ -48,7 +48,11 @@ typedef struct gpr_allocation_functions {
void (*free_fn)(void *ptr); void (*free_fn)(void *ptr);
} gpr_allocation_functions; } gpr_allocation_functions;
/* malloc, never returns NULL */ /* malloc.
* If size==0, always returns NULL. Otherwise this function never returns NULL.
* The pointer returned is suitably aligned for any kind of variable it could
* contain.
*/
GPRAPI void *gpr_malloc(size_t size); GPRAPI void *gpr_malloc(size_t size);
/* free */ /* free */
GPRAPI void gpr_free(void *ptr); GPRAPI void gpr_free(void *ptr);

@ -50,17 +50,20 @@ int grpc_combiner_trace = 0;
} \ } \
} while (0) } while (0)
#define STATE_UNORPHANED 1
#define STATE_ELEM_COUNT_LOW_BIT 2
struct grpc_combiner { struct grpc_combiner {
grpc_combiner *next_combiner_on_this_exec_ctx; grpc_combiner *next_combiner_on_this_exec_ctx;
grpc_workqueue *optional_workqueue; grpc_workqueue *optional_workqueue;
gpr_mpscq queue; gpr_mpscq queue;
// state is: // state is:
// lower bit - zero if orphaned // lower bit - zero if orphaned (STATE_UNORPHANED)
// other bits - number of items queued on the lock // other bits - number of items queued on the lock (STATE_ELEM_COUNT_LOW_BIT)
gpr_atm state; gpr_atm state;
// number of elements in the list that are covered by a poller: if >0, we can // number of elements in the list that are covered by a poller: if >0, we can
// offload safely // offload safely
gpr_atm covered_by_poller; gpr_atm elements_covered_by_poller;
bool time_to_execute_final_list; bool time_to_execute_final_list;
bool final_list_covered_by_poller; bool final_list_covered_by_poller;
grpc_closure_list final_list; grpc_closure_list final_list;
@ -84,7 +87,7 @@ static error_data unpack_error_data(uintptr_t p) {
static bool is_covered_by_poller(grpc_combiner *lock) { static bool is_covered_by_poller(grpc_combiner *lock) {
return lock->final_list_covered_by_poller || return lock->final_list_covered_by_poller ||
gpr_atm_acq_load(&lock->covered_by_poller) > 0; gpr_atm_acq_load(&lock->elements_covered_by_poller) > 0;
} }
grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
@ -93,8 +96,8 @@ grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
lock->time_to_execute_final_list = false; lock->time_to_execute_final_list = false;
lock->optional_workqueue = optional_workqueue; lock->optional_workqueue = optional_workqueue;
lock->final_list_covered_by_poller = false; lock->final_list_covered_by_poller = false;
gpr_atm_no_barrier_store(&lock->state, 1); gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
gpr_atm_no_barrier_store(&lock->covered_by_poller, 0); gpr_atm_no_barrier_store(&lock->elements_covered_by_poller, 0);
gpr_mpscq_init(&lock->queue); gpr_mpscq_init(&lock->queue);
grpc_closure_list_init(&lock->final_list); grpc_closure_list_init(&lock->final_list);
grpc_closure_init(&lock->offload, offload, lock); grpc_closure_init(&lock->offload, offload, lock);
@ -111,7 +114,7 @@ static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
} }
void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -1); gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_UNORPHANED);
GRPC_COMBINER_TRACE(gpr_log( GRPC_COMBINER_TRACE(gpr_log(
GPR_DEBUG, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state)); GPR_DEBUG, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state));
if (old_state == 1) { if (old_state == 1) {
@ -143,20 +146,20 @@ void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure *cl, grpc_error *error, grpc_closure *cl, grpc_error *error,
bool covered_by_poller) { bool covered_by_poller) {
GPR_TIMER_BEGIN("combiner.execute", 0); GPR_TIMER_BEGIN("combiner.execute", 0);
gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2); gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT);
GRPC_COMBINER_TRACE(gpr_log( GRPC_COMBINER_TRACE(gpr_log(
GPR_DEBUG, "C:%p grpc_combiner_execute c=%p cov=%d last=%" PRIdPTR, lock, GPR_DEBUG, "C:%p grpc_combiner_execute c=%p cov=%d last=%" PRIdPTR, lock,
cl, covered_by_poller, last)); cl, covered_by_poller, last));
GPR_ASSERT(last & 1); // ensure lock has not been destroyed GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed
cl->error_data.scratch = cl->error_data.scratch =
pack_error_data((error_data){error, covered_by_poller}); pack_error_data((error_data){error, covered_by_poller});
if (covered_by_poller) { if (covered_by_poller) {
gpr_atm_no_barrier_fetch_add(&lock->covered_by_poller, 1); gpr_atm_no_barrier_fetch_add(&lock->elements_covered_by_poller, 1);
} }
gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next); gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next);
if (last == 1) { if (last == 1) {
// code will be written when the exec_ctx calls // first element on this list: add it to the list of combiner locks
// grpc_combiner_continue_exec_ctx // executing within this exec_ctx
push_last_on_exec_ctx(exec_ctx, lock); push_last_on_exec_ctx(exec_ctx, lock);
} }
GPR_TIMER_END("combiner.execute", 0); GPR_TIMER_END("combiner.execute", 0);
@ -219,7 +222,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
GRPC_COMBINER_TRACE( GRPC_COMBINER_TRACE(
gpr_log(GPR_DEBUG, "C:%p maybe_finish_one n=%p", lock, n)); gpr_log(GPR_DEBUG, "C:%p maybe_finish_one n=%p", lock, n));
if (n == NULL) { if (n == NULL) {
// queue is in an inconsistant state: use this as a cue that we should // queue is in an inconsistent state: use this as a cue that we should
// go off and do something else for a while (and come back later) // go off and do something else for a while (and come back later)
GPR_TIMER_MARK("delay_busy", 0); GPR_TIMER_MARK("delay_busy", 0);
if (lock->optional_workqueue != NULL && is_covered_by_poller(lock)) { if (lock->optional_workqueue != NULL && is_covered_by_poller(lock)) {
@ -233,7 +236,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
error_data err = unpack_error_data(cl->error_data.scratch); error_data err = unpack_error_data(cl->error_data.scratch);
cl->cb(exec_ctx, cl->cb_arg, err.error); cl->cb(exec_ctx, cl->cb_arg, err.error);
if (err.covered_by_poller) { if (err.covered_by_poller) {
gpr_atm_no_barrier_fetch_add(&lock->covered_by_poller, -1); gpr_atm_no_barrier_fetch_add(&lock->elements_covered_by_poller, -1);
} }
GRPC_ERROR_UNREF(err.error); GRPC_ERROR_UNREF(err.error);
GPR_TIMER_END("combiner.exec1", 0); GPR_TIMER_END("combiner.exec1", 0);
@ -259,28 +262,38 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
GPR_TIMER_MARK("unref", 0); GPR_TIMER_MARK("unref", 0);
move_next(exec_ctx); move_next(exec_ctx);
lock->time_to_execute_final_list = false; lock->time_to_execute_final_list = false;
gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -2); gpr_atm old_state =
gpr_atm_full_fetch_add(&lock->state, -STATE_ELEM_COUNT_LOW_BIT);
GRPC_COMBINER_TRACE( GRPC_COMBINER_TRACE(
gpr_log(GPR_DEBUG, "C:%p finish old_state=%" PRIdPTR, lock, old_state)); gpr_log(GPR_DEBUG, "C:%p finish old_state=%" PRIdPTR, lock, old_state));
// Define a macro to ease readability of the following switch statement.
#define OLD_STATE_WAS(orphaned, elem_count) \
(((orphaned) ? 0 : STATE_UNORPHANED) | \
((elem_count)*STATE_ELEM_COUNT_LOW_BIT))
// Depending on what the previous state was, we need to perform different
// actions.
switch (old_state) { switch (old_state) {
default: default:
// we have multiple queued work items: just continue executing them // we have multiple queued work items: just continue executing them
break; break;
case 5: // we're down to one queued item: if it's the final list we case OLD_STATE_WAS(false, 2):
case 4: // should do that case OLD_STATE_WAS(true, 2):
// we're down to one queued item: if it's the final list we should do that
if (!grpc_closure_list_empty(lock->final_list)) { if (!grpc_closure_list_empty(lock->final_list)) {
lock->time_to_execute_final_list = true; lock->time_to_execute_final_list = true;
} }
break; break;
case 3: // had one count, one unorphaned --> unlocked unorphaned case OLD_STATE_WAS(false, 1):
// had one count, one unorphaned --> unlocked unorphaned
GPR_TIMER_END("combiner.continue_exec_ctx", 0); GPR_TIMER_END("combiner.continue_exec_ctx", 0);
return true; return true;
case 2: // and one count, one orphaned --> unlocked and orphaned case OLD_STATE_WAS(true, 1):
// and one count, one orphaned --> unlocked and orphaned
really_destroy(exec_ctx, lock); really_destroy(exec_ctx, lock);
GPR_TIMER_END("combiner.continue_exec_ctx", 0); GPR_TIMER_END("combiner.continue_exec_ctx", 0);
return true; return true;
case 1: case OLD_STATE_WAS(false, 0):
case 0: case OLD_STATE_WAS(true, 0):
// these values are illegal - representing an already unlocked or // these values are illegal - representing an already unlocked or
// deleted lock // deleted lock
GPR_TIMER_END("combiner.continue_exec_ctx", 0); GPR_TIMER_END("combiner.continue_exec_ctx", 0);
@ -314,7 +327,7 @@ void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
} }
if (grpc_closure_list_empty(lock->final_list)) { if (grpc_closure_list_empty(lock->final_list)) {
gpr_atm_full_fetch_add(&lock->state, 2); gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT);
} }
if (covered_by_poller) { if (covered_by_poller) {
lock->final_list_covered_by_poller = true; lock->final_list_covered_by_poller = true;

@ -71,12 +71,9 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
grpc_closure_run(exec_ctx, c, c->error_data.error); grpc_closure_run(exec_ctx, c, c->error_data.error);
c = next; c = next;
} }
continue; } else if (!grpc_combiner_continue_exec_ctx(exec_ctx)) {
break;
} }
if (grpc_combiner_continue_exec_ctx(exec_ctx)) {
continue;
}
break;
} }
GPR_ASSERT(exec_ctx->active_combiner == NULL); GPR_ASSERT(exec_ctx->active_combiner == NULL);
if (exec_ctx->stealing_from_workqueue != NULL) { if (exec_ctx->stealing_from_workqueue != NULL) {
@ -100,8 +97,7 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
GPR_TIMER_END("grpc_exec_ctx_flush.stolen_cb", 0); GPR_TIMER_END("grpc_exec_ctx_flush.stolen_cb", 0);
grpc_exec_ctx_flush(exec_ctx); grpc_exec_ctx_flush(exec_ctx);
GPR_TIMER_END("grpc_exec_ctx_flush", 0); did_something = true;
return true;
} }
} }
GPR_TIMER_END("grpc_exec_ctx_flush", 0); GPR_TIMER_END("grpc_exec_ctx_flush", 0);

Loading…
Cancel
Save