|
|
|
@ -65,9 +65,9 @@ |
|
|
|
|
|
|
|
|
|
#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) |
|
|
|
|
|
|
|
|
|
#define GRPC_POLLING_TRACE(fmt, ...) \ |
|
|
|
|
#define GRPC_POLLING_TRACE(...) \ |
|
|
|
|
if (GRPC_TRACER_ON(grpc_polling_trace)) { \
|
|
|
|
|
gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
|
|
|
|
|
gpr_log(GPR_INFO, __VA_ARGS__); \
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Uncomment the following to enable extra checks on poll_object operations */ |
|
|
|
@ -732,7 +732,7 @@ static void workqueue_maybe_wakeup(polling_island *pi) { |
|
|
|
|
it right now. Note that since we do an anticipatory mpscq_pop every poll |
|
|
|
|
loop, it's ok if we miss the wakeup here, as we'll get the work item when |
|
|
|
|
the next poller enters anyway. */ |
|
|
|
|
if (current_pollers > min_current_pollers_for_wakeup) { |
|
|
|
|
if (current_pollers >= min_current_pollers_for_wakeup) { |
|
|
|
|
GRPC_LOG_IF_ERROR("workqueue_wakeup_fd", |
|
|
|
|
grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd)); |
|
|
|
|
} |
|
|
|
@ -1332,7 +1332,13 @@ static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx, |
|
|
|
|
gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items); |
|
|
|
|
gpr_mu_unlock(&pi->workqueue_read_mu); |
|
|
|
|
if (n != NULL) { |
|
|
|
|
if (gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) > 1) { |
|
|
|
|
gpr_atm remaining = |
|
|
|
|
gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) - 1; |
|
|
|
|
GRPC_POLLING_TRACE( |
|
|
|
|
"maybe_do_workqueue_work: pi: %p: got closure %p, remaining = " |
|
|
|
|
"%" PRIdPTR, |
|
|
|
|
pi, n, remaining); |
|
|
|
|
if (remaining > 0) { |
|
|
|
|
workqueue_maybe_wakeup(pi); |
|
|
|
|
} |
|
|
|
|
grpc_closure *c = (grpc_closure *)n; |
|
|
|
@ -1347,8 +1353,13 @@ static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx, |
|
|
|
|
/* n == NULL might mean there's work but it's not available to be popped
|
|
|
|
|
* yet - try to ensure another workqueue wakes up to check shortly if so |
|
|
|
|
*/ |
|
|
|
|
GRPC_POLLING_TRACE( |
|
|
|
|
"maybe_do_workqueue_work: pi: %p: more to do, but not yet", pi); |
|
|
|
|
workqueue_maybe_wakeup(pi); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
GRPC_POLLING_TRACE("maybe_do_workqueue_work: pi: %p: read already locked", |
|
|
|
|
pi); |
|
|
|
|
} |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
@ -1411,7 +1422,10 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, |
|
|
|
|
/* If we get some workqueue work to do, it might end up completing an item on
|
|
|
|
|
the completion queue, so there's no need to poll... so we skip that and |
|
|
|
|
redo the complete loop to verify */ |
|
|
|
|
GRPC_POLLING_TRACE("pollset_work: pollset: %p, worker %p, pi %p", pollset, |
|
|
|
|
worker, pi); |
|
|
|
|
if (!maybe_do_workqueue_work(exec_ctx, pi)) { |
|
|
|
|
GRPC_POLLING_TRACE("pollset_work: begins"); |
|
|
|
|
gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1); |
|
|
|
|
g_current_thread_polling_island = pi; |
|
|
|
|
|
|
|
|
@ -1472,6 +1486,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, |
|
|
|
|
|
|
|
|
|
g_current_thread_polling_island = NULL; |
|
|
|
|
gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1); |
|
|
|
|
GRPC_POLLING_TRACE("pollset_work: ends"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
GPR_ASSERT(pi != NULL); |
|
|
|
|