|
|
|
@ -165,6 +165,7 @@ static void fd_global_shutdown(void); |
|
|
|
|
|
|
|
|
|
#endif /* !defined(GPRC_PI_REF_COUNT_DEBUG) */ |
|
|
|
|
|
|
|
|
|
/* This is also used as grpc_workqueue (by directly casing it) */ |
|
|
|
|
typedef struct polling_island { |
|
|
|
|
gpr_mu mu; |
|
|
|
|
/* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
|
|
|
|
@ -184,10 +185,16 @@ typedef struct polling_island { |
|
|
|
|
* (except mu and ref_count) are invalid and must be ignored. */ |
|
|
|
|
gpr_atm merged_to; |
|
|
|
|
|
|
|
|
|
/* Number of threads currently polling on this island */ |
|
|
|
|
gpr_atm poller_count; |
|
|
|
|
/* Mutex guarding the read end of the workqueue (must be held to pop from
|
|
|
|
|
* workqueue_items) */ |
|
|
|
|
gpr_mu workqueue_read_mu; |
|
|
|
|
/* Queue of closures to be executed */ |
|
|
|
|
gpr_mpscq workqueue_items; |
|
|
|
|
/* Count of items in workqueue_items */ |
|
|
|
|
gpr_atm workqueue_item_count; |
|
|
|
|
/* Wakeup fd used to wake pollers to check the contents of workqueue_items */ |
|
|
|
|
grpc_wakeup_fd workqueue_wakeup_fd; |
|
|
|
|
|
|
|
|
|
/* The fd of the underlying epoll set */ |
|
|
|
@ -1396,6 +1403,9 @@ static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_closure_run(exec_ctx, c, c->error_data.error); |
|
|
|
|
return true; |
|
|
|
|
} else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) { |
|
|
|
|
/* n == NULL might mean there's work but it's not available to be popped
|
|
|
|
|
* yet - try to ensure another workqueue wakes up to check shortly if so |
|
|
|
|
*/ |
|
|
|
|
workqueue_maybe_wakeup(pi); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -1457,6 +1467,9 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, |
|
|
|
|
PI_ADD_REF(pi, "ps_work"); |
|
|
|
|
gpr_mu_unlock(&pollset->mu); |
|
|
|
|
|
|
|
|
|
/* If we get some workqueue work to do, it might end up completing an item on
|
|
|
|
|
the completion queue, so there's no need to poll... so we skip that and |
|
|
|
|
redo the complete loop to verify */ |
|
|
|
|
if (!maybe_do_workqueue_work(exec_ctx, pi)) { |
|
|
|
|
gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1); |
|
|
|
|
g_current_thread_polling_island = pi; |
|
|
|
|