|
|
|
@ -50,17 +50,20 @@ int grpc_combiner_trace = 0; |
|
|
|
|
} \
|
|
|
|
|
} while (0) |
|
|
|
|
|
|
|
|
|
#define STATE_UNORPHANED 1 |
|
|
|
|
#define STATE_ELEM_COUNT_LOW_BIT 2 |
|
|
|
|
|
|
|
|
|
struct grpc_combiner { |
|
|
|
|
grpc_combiner *next_combiner_on_this_exec_ctx; |
|
|
|
|
grpc_workqueue *optional_workqueue; |
|
|
|
|
gpr_mpscq queue; |
|
|
|
|
// state is:
|
|
|
|
|
// lower bit - zero if orphaned
|
|
|
|
|
// other bits - number of items queued on the lock
|
|
|
|
|
// lower bit - zero if orphaned (STATE_UNORPHANED)
|
|
|
|
|
// other bits - number of items queued on the lock (STATE_ELEM_COUNT_LOW_BIT)
|
|
|
|
|
gpr_atm state; |
|
|
|
|
// number of elements in the list that are covered by a poller: if >0, we can
|
|
|
|
|
// offload safely
|
|
|
|
|
gpr_atm covered_by_poller; |
|
|
|
|
gpr_atm elements_covered_by_poller; |
|
|
|
|
bool time_to_execute_final_list; |
|
|
|
|
bool final_list_covered_by_poller; |
|
|
|
|
grpc_closure_list final_list; |
|
|
|
@ -84,7 +87,7 @@ static error_data unpack_error_data(uintptr_t p) { |
|
|
|
|
|
|
|
|
|
static bool is_covered_by_poller(grpc_combiner *lock) { |
|
|
|
|
return lock->final_list_covered_by_poller || |
|
|
|
|
gpr_atm_acq_load(&lock->covered_by_poller) > 0; |
|
|
|
|
gpr_atm_acq_load(&lock->elements_covered_by_poller) > 0; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { |
|
|
|
@ -93,8 +96,8 @@ grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { |
|
|
|
|
lock->time_to_execute_final_list = false; |
|
|
|
|
lock->optional_workqueue = optional_workqueue; |
|
|
|
|
lock->final_list_covered_by_poller = false; |
|
|
|
|
gpr_atm_no_barrier_store(&lock->state, 1); |
|
|
|
|
gpr_atm_no_barrier_store(&lock->covered_by_poller, 0); |
|
|
|
|
gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED); |
|
|
|
|
gpr_atm_no_barrier_store(&lock->elements_covered_by_poller, 0); |
|
|
|
|
gpr_mpscq_init(&lock->queue); |
|
|
|
|
grpc_closure_list_init(&lock->final_list); |
|
|
|
|
grpc_closure_init(&lock->offload, offload, lock); |
|
|
|
@ -111,7 +114,7 @@ static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { |
|
|
|
|
gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -1); |
|
|
|
|
gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_UNORPHANED); |
|
|
|
|
GRPC_COMBINER_TRACE(gpr_log( |
|
|
|
|
GPR_DEBUG, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state)); |
|
|
|
|
if (old_state == 1) { |
|
|
|
@ -143,20 +146,20 @@ void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, |
|
|
|
|
grpc_closure *cl, grpc_error *error, |
|
|
|
|
bool covered_by_poller) { |
|
|
|
|
GPR_TIMER_BEGIN("combiner.execute", 0); |
|
|
|
|
gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2); |
|
|
|
|
gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT); |
|
|
|
|
GRPC_COMBINER_TRACE(gpr_log( |
|
|
|
|
GPR_DEBUG, "C:%p grpc_combiner_execute c=%p cov=%d last=%" PRIdPTR, lock, |
|
|
|
|
cl, covered_by_poller, last)); |
|
|
|
|
GPR_ASSERT(last & 1); // ensure lock has not been destroyed
|
|
|
|
|
GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed
|
|
|
|
|
cl->error_data.scratch = |
|
|
|
|
pack_error_data((error_data){error, covered_by_poller}); |
|
|
|
|
if (covered_by_poller) { |
|
|
|
|
gpr_atm_no_barrier_fetch_add(&lock->covered_by_poller, 1); |
|
|
|
|
gpr_atm_no_barrier_fetch_add(&lock->elements_covered_by_poller, 1); |
|
|
|
|
} |
|
|
|
|
gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next); |
|
|
|
|
if (last == 1) { |
|
|
|
|
// code will be written when the exec_ctx calls
|
|
|
|
|
// grpc_combiner_continue_exec_ctx
|
|
|
|
|
// first element on this list: add it to the list of combiner locks
|
|
|
|
|
// executing within this exec_ctx
|
|
|
|
|
push_last_on_exec_ctx(exec_ctx, lock); |
|
|
|
|
} |
|
|
|
|
GPR_TIMER_END("combiner.execute", 0); |
|
|
|
@ -219,7 +222,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { |
|
|
|
|
GRPC_COMBINER_TRACE( |
|
|
|
|
gpr_log(GPR_DEBUG, "C:%p maybe_finish_one n=%p", lock, n)); |
|
|
|
|
if (n == NULL) { |
|
|
|
|
// queue is in an inconsistant state: use this as a cue that we should
|
|
|
|
|
// queue is in an inconsistent state: use this as a cue that we should
|
|
|
|
|
// go off and do something else for a while (and come back later)
|
|
|
|
|
GPR_TIMER_MARK("delay_busy", 0); |
|
|
|
|
if (lock->optional_workqueue != NULL && is_covered_by_poller(lock)) { |
|
|
|
@ -233,7 +236,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { |
|
|
|
|
error_data err = unpack_error_data(cl->error_data.scratch); |
|
|
|
|
cl->cb(exec_ctx, cl->cb_arg, err.error); |
|
|
|
|
if (err.covered_by_poller) { |
|
|
|
|
gpr_atm_no_barrier_fetch_add(&lock->covered_by_poller, -1); |
|
|
|
|
gpr_atm_no_barrier_fetch_add(&lock->elements_covered_by_poller, -1); |
|
|
|
|
} |
|
|
|
|
GRPC_ERROR_UNREF(err.error); |
|
|
|
|
GPR_TIMER_END("combiner.exec1", 0); |
|
|
|
@ -259,27 +262,31 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { |
|
|
|
|
GPR_TIMER_MARK("unref", 0); |
|
|
|
|
move_next(exec_ctx); |
|
|
|
|
lock->time_to_execute_final_list = false; |
|
|
|
|
gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -2); |
|
|
|
|
gpr_atm old_state = |
|
|
|
|
gpr_atm_full_fetch_add(&lock->state, -STATE_ELEM_COUNT_LOW_BIT); |
|
|
|
|
GRPC_COMBINER_TRACE( |
|
|
|
|
gpr_log(GPR_DEBUG, "C:%p finish old_state=%" PRIdPTR, lock, old_state)); |
|
|
|
|
switch (old_state) { |
|
|
|
|
default: |
|
|
|
|
// we have multiple queued work items: just continue executing them
|
|
|
|
|
break; |
|
|
|
|
case 5: // we're down to one queued item: if it's the final list we
|
|
|
|
|
case 4: // should do that
|
|
|
|
|
case STATE_UNORPHANED | (2 * STATE_ELEM_COUNT_LOW_BIT): |
|
|
|
|
case 0 | (2 * STATE_ELEM_COUNT_LOW_BIT): |
|
|
|
|
// we're down to one queued item: if it's the final list we should do that
|
|
|
|
|
if (!grpc_closure_list_empty(lock->final_list)) { |
|
|
|
|
lock->time_to_execute_final_list = true; |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
case 3: // had one count, one unorphaned --> unlocked unorphaned
|
|
|
|
|
case STATE_UNORPHANED | STATE_ELEM_COUNT_LOW_BIT: |
|
|
|
|
// had one count, one unorphaned --> unlocked unorphaned
|
|
|
|
|
GPR_TIMER_END("combiner.continue_exec_ctx", 0); |
|
|
|
|
return true; |
|
|
|
|
case 2: // and one count, one orphaned --> unlocked and orphaned
|
|
|
|
|
case 0 | STATE_ELEM_COUNT_LOW_BIT: |
|
|
|
|
// and one count, one orphaned --> unlocked and orphaned
|
|
|
|
|
really_destroy(exec_ctx, lock); |
|
|
|
|
GPR_TIMER_END("combiner.continue_exec_ctx", 0); |
|
|
|
|
return true; |
|
|
|
|
case 1: |
|
|
|
|
case STATE_UNORPHANED: |
|
|
|
|
case 0: |
|
|
|
|
// these values are illegal - representing an already unlocked or
|
|
|
|
|
// deleted lock
|
|
|
|
@ -314,7 +321,7 @@ void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (grpc_closure_list_empty(lock->final_list)) { |
|
|
|
|
gpr_atm_full_fetch_add(&lock->state, 2); |
|
|
|
|
gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT); |
|
|
|
|
} |
|
|
|
|
if (covered_by_poller) { |
|
|
|
|
lock->final_list_covered_by_poller = true; |
|
|
|
|