From d80a8c947c3491ea1491bc16b4f0479add3461f5 Mon Sep 17 00:00:00 2001
From: Craig Tiller
Date: Mon, 10 Oct 2016 13:19:56 -0700
Subject: [PATCH] Cleanup

---
 src/core/lib/iomgr/combiner.c | 49 ++++++++++++++++++++---------------
 1 file changed, 28 insertions(+), 21 deletions(-)

diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index 48806abc381..24ffe41d997 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -50,17 +50,20 @@ int grpc_combiner_trace = 0;
     }                            \
   } while (0)
 
+#define STATE_UNORPHANED 1
+#define STATE_ELEM_COUNT_LOW_BIT 2
+
 struct grpc_combiner {
   grpc_combiner *next_combiner_on_this_exec_ctx;
   grpc_workqueue *optional_workqueue;
   gpr_mpscq queue;
   // state is:
-  // lower bit - zero if orphaned
-  // other bits - number of items queued on the lock
+  // lower bit - zero if orphaned (STATE_UNORPHANED)
+  // other bits - number of items queued on the lock (STATE_ELEM_COUNT_LOW_BIT)
   gpr_atm state;
   // number of elements in the list that are covered by a poller: if >0, we can
   // offload safely
-  gpr_atm covered_by_poller;
+  gpr_atm elements_covered_by_poller;
   bool time_to_execute_final_list;
   bool final_list_covered_by_poller;
   grpc_closure_list final_list;
@@ -84,7 +87,7 @@ static error_data unpack_error_data(uintptr_t p) {
 
 static bool is_covered_by_poller(grpc_combiner *lock) {
   return lock->final_list_covered_by_poller ||
-         gpr_atm_acq_load(&lock->covered_by_poller) > 0;
+         gpr_atm_acq_load(&lock->elements_covered_by_poller) > 0;
 }
 
 grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
@@ -93,8 +96,8 @@ grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
   lock->time_to_execute_final_list = false;
   lock->optional_workqueue = optional_workqueue;
   lock->final_list_covered_by_poller = false;
-  gpr_atm_no_barrier_store(&lock->state, 1);
-  gpr_atm_no_barrier_store(&lock->covered_by_poller, 0);
+  gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
+  gpr_atm_no_barrier_store(&lock->elements_covered_by_poller, 0);
   gpr_mpscq_init(&lock->queue);
   grpc_closure_list_init(&lock->final_list);
   grpc_closure_init(&lock->offload, offload, lock);
@@ -111,7 +114,7 @@ static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
 }
 
 void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
-  gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -1);
+  gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_UNORPHANED);
   GRPC_COMBINER_TRACE(gpr_log(
       GPR_DEBUG, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state));
   if (old_state == 1) {
@@ -143,20 +146,20 @@ void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
                            grpc_closure *cl, grpc_error *error,
                            bool covered_by_poller) {
   GPR_TIMER_BEGIN("combiner.execute", 0);
-  gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2);
+  gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT);
   GRPC_COMBINER_TRACE(gpr_log(
       GPR_DEBUG, "C:%p grpc_combiner_execute c=%p cov=%d last=%" PRIdPTR, lock,
       cl, covered_by_poller, last));
-  GPR_ASSERT(last & 1);  // ensure lock has not been destroyed
+  GPR_ASSERT(last & STATE_UNORPHANED);  // ensure lock has not been destroyed
   cl->error_data.scratch =
       pack_error_data((error_data){error, covered_by_poller});
   if (covered_by_poller) {
-    gpr_atm_no_barrier_fetch_add(&lock->covered_by_poller, 1);
+    gpr_atm_no_barrier_fetch_add(&lock->elements_covered_by_poller, 1);
   }
   gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next);
   if (last == 1) {
-    // code will be written when the exec_ctx calls
-    // grpc_combiner_continue_exec_ctx
+    // first element on this list: add it to the list of combiner locks
+    // executing within this exec_ctx
     push_last_on_exec_ctx(exec_ctx, lock);
   }
   GPR_TIMER_END("combiner.execute", 0);
@@ -219,7 +222,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
     GRPC_COMBINER_TRACE(
         gpr_log(GPR_DEBUG, "C:%p maybe_finish_one n=%p", lock, n));
     if (n == NULL) {
-      // queue is in an inconsistant state: use this as a cue that we should
+      // queue is in an inconsistent state: use this as a cue that we should
       // go off and do something else for a while (and come back later)
       GPR_TIMER_MARK("delay_busy", 0);
       if (lock->optional_workqueue != NULL && is_covered_by_poller(lock)) {
@@ -233,7 +236,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
     error_data err = unpack_error_data(cl->error_data.scratch);
     cl->cb(exec_ctx, cl->cb_arg, err.error);
     if (err.covered_by_poller) {
-      gpr_atm_no_barrier_fetch_add(&lock->covered_by_poller, -1);
+      gpr_atm_no_barrier_fetch_add(&lock->elements_covered_by_poller, -1);
    }
     GRPC_ERROR_UNREF(err.error);
     GPR_TIMER_END("combiner.exec1", 0);
@@ -259,27 +262,31 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
   GPR_TIMER_MARK("unref", 0);
   move_next(exec_ctx);
   lock->time_to_execute_final_list = false;
-  gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -2);
+  gpr_atm old_state =
+      gpr_atm_full_fetch_add(&lock->state, -STATE_ELEM_COUNT_LOW_BIT);
   GRPC_COMBINER_TRACE(
       gpr_log(GPR_DEBUG, "C:%p finish old_state=%" PRIdPTR, lock, old_state));
   switch (old_state) {
     default:
       // we have multiple queued work items: just continue executing them
       break;
-    case 5:  // we're down to one queued item: if it's the final list we
-    case 4:  // should do that
+    case STATE_UNORPHANED | (2 * STATE_ELEM_COUNT_LOW_BIT):
+    case 0 | (2 * STATE_ELEM_COUNT_LOW_BIT):
+      // we're down to one queued item: if it's the final list we should do that
       if (!grpc_closure_list_empty(lock->final_list)) {
         lock->time_to_execute_final_list = true;
       }
       break;
-    case 3:  // had one count, one unorphaned --> unlocked unorphaned
+    case STATE_UNORPHANED | STATE_ELEM_COUNT_LOW_BIT:
+      // had one count, one unorphaned --> unlocked unorphaned
       GPR_TIMER_END("combiner.continue_exec_ctx", 0);
       return true;
-    case 2:  // and one count, one orphaned --> unlocked and orphaned
+    case 0 | STATE_ELEM_COUNT_LOW_BIT:
+      // and one count, one orphaned --> unlocked and orphaned
       really_destroy(exec_ctx, lock);
       GPR_TIMER_END("combiner.continue_exec_ctx", 0);
       return true;
-    case 1:
+    case STATE_UNORPHANED:
     case 0:
       // these values are illegal - representing an already unlocked or
       // deleted lock
@@ -314,7 +321,7 @@ void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
   }
 
   if (grpc_closure_list_empty(lock->final_list)) {
-    gpr_atm_full_fetch_add(&lock->state, 2);
+    gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT);
   }
   if (covered_by_poller) {
     lock->final_list_covered_by_poller = true;
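
Note (not part of the patch): the standalone sketch below walks through the packed state word that the new STATE_UNORPHANED / STATE_ELEM_COUNT_LOW_BIT constants name: the low bit stays set until the combiner is orphaned, and every queued element adds STATE_ELEM_COUNT_LOW_BIT. It is a minimal illustration under the assumption that plain C11 atomics can stand in for gpr_atm; the main() driver and printf messages are invented for the example and are not gRPC code.

/* Standalone sketch of the combiner's packed state word (assumption: C11
 * atomics in place of gpr_atm; not gRPC code). */
#include <stdatomic.h>
#include <stdio.h>

#define STATE_UNORPHANED 1         /* low bit: set until the combiner is orphaned */
#define STATE_ELEM_COUNT_LOW_BIT 2 /* each queued element adds this amount */

int main(void) {
  atomic_long state = STATE_UNORPHANED; /* freshly created: alive, no elements */

  /* grpc_combiner_execute: count the new element before pushing it */
  long last = atomic_fetch_add(&state, STATE_ELEM_COUNT_LOW_BIT);
  printf("first element on the lock: %s\n",
         last == STATE_UNORPHANED ? "yes" : "no");

  /* grpc_combiner_continue_exec_ctx: uncount the element after running it */
  long old_state = atomic_fetch_sub(&state, STATE_ELEM_COUNT_LOW_BIT);
  if (old_state == (STATE_UNORPHANED | STATE_ELEM_COUNT_LOW_BIT)) {
    printf("queue drained, combiner still unorphaned\n");
  } else if (old_state == (0 | STATE_ELEM_COUNT_LOW_BIT)) {
    printf("queue drained and already orphaned: really_destroy\n");
  }

  /* grpc_combiner_destroy: drop the unorphaned bit; destroy if nothing queued */
  if (atomic_fetch_sub(&state, STATE_UNORPHANED) == STATE_UNORPHANED) {
    printf("no queued elements left: destroy immediately\n");
  }
  return 0;
}

In the real code the same arithmetic runs under gpr_atm_full_fetch_add, which is what lets the switch in grpc_combiner_continue_exec_ctx decode the symbolic cases this patch introduces in place of the old magic numbers 1 through 5.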