From 460502e5ce71cb132e3be4e932b63cfd4e7f2349 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 13 Oct 2016 10:02:08 -0700 Subject: [PATCH] Expand documentation --- src/core/lib/iomgr/ev_epoll_linux.c | 13 +++++++++++++ src/core/lib/iomgr/exec_ctx.h | 10 ++++++++++ 2 files changed, 23 insertions(+) diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 98fe2defea8..8de42bb7a9d 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -165,6 +165,7 @@ static void fd_global_shutdown(void); #endif /* !defined(GPRC_PI_REF_COUNT_DEBUG) */ +/* This is also used as grpc_workqueue (by directly casing it) */ typedef struct polling_island { gpr_mu mu; /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement @@ -184,10 +185,16 @@ typedef struct polling_island { * (except mu and ref_count) are invalid and must be ignored. */ gpr_atm merged_to; + /* Number of threads currently polling on this island */ gpr_atm poller_count; + /* Mutex guarding the read end of the workqueue (must be held to pop from + * workqueue_items) */ gpr_mu workqueue_read_mu; + /* Queue of closures to be executed */ gpr_mpscq workqueue_items; + /* Count of items in workqueue_items */ gpr_atm workqueue_item_count; + /* Wakeup fd used to wake pollers to check the contents of workqueue_items */ grpc_wakeup_fd workqueue_wakeup_fd; /* The fd of the underlying epoll set */ @@ -1396,6 +1403,9 @@ static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx, grpc_closure_run(exec_ctx, c, c->error_data.error); return true; } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) { + /* n == NULL might mean there's work but it's not available to be popped + * yet - try to ensure another workqueue wakes up to check shortly if so + */ workqueue_maybe_wakeup(pi); } } @@ -1457,6 +1467,9 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, PI_ADD_REF(pi, "ps_work"); gpr_mu_unlock(&pollset->mu); + /* If we get some workqueue work to do, it might end up completing an item on + the completion queue, so there's no need to poll... so we skip that and + redo the complete loop to verify */ if (!maybe_do_workqueue_work(exec_ctx, pi)) { gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1); g_current_thread_polling_island = pi; diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 4744e21c5ec..7e50cb9825d 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -66,10 +66,20 @@ typedef struct grpc_combiner grpc_combiner; #ifndef GRPC_EXECUTION_CONTEXT_SANITIZER struct grpc_exec_ctx { grpc_closure_list closure_list; + /** The workqueue we're stealing work from. + As items are queued to the execution context, we try to steal one + workqueue item and execute it inline (assuming the exec_ctx is not + finished) - doing so does not invalidate the workqueue's contract, and + provides a small latency win in cases where we get a hit */ grpc_workqueue *stealing_from_workqueue; + /** The workqueue item that was stolen from the workqueue above. When new + items are scheduled to be offloaded to that workqueue, we need to update + this like a 1-deep fifo to maintain the invariant that workqueue items + queued by one thread are started in order */ grpc_closure *stolen_closure; /** currently active combiner: updated only via combiner.c */ grpc_combiner *active_combiner; + /** last active combiner in the active combiner list */ grpc_combiner *last_combiner; bool cached_ready_to_finish; void *check_ready_to_finish_arg;