From 7e9d52530d0145fe8202d2fd35621407745e91ab Mon Sep 17 00:00:00 2001
From: Sree Kuchibhotla <sreek@google.com>
Date: Mon, 9 Jul 2018 14:53:54 -0700
Subject: [PATCH] Move executor implementation into GrpcExecutor class

---
 src/core/lib/iomgr/executor.cc | 297 ++++++++++++++++++---------------
 src/core/lib/iomgr/executor.h  |  56 +++++--
 2 files changed, 211 insertions(+), 142 deletions(-)

diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc
index db4b45d1a98..e9e6d0a4bb2 100644
--- a/src/core/lib/iomgr/executor.cc
+++ b/src/core/lib/iomgr/executor.cc
@@ -21,6 +21,7 @@
 #include "src/core/lib/iomgr/executor.h"
 
 #include <string.h>
+#include <inttypes.h>
 
 #include <grpc/support/alloc.h>
 #include <grpc/support/cpu.h>
@@ -28,52 +29,41 @@
 #include <grpc/support/log.h>
 #include <grpc/support/sync.h>
 
 #include "src/core/lib/debug/stats.h"
-#include "src/core/lib/gpr/spinlock.h"
 #include "src/core/lib/gpr/tls.h"
 #include "src/core/lib/gpr/useful.h"
-#include "src/core/lib/gprpp/thd.h"
 #include "src/core/lib/iomgr/exec_ctx.h"
 
 #define MAX_DEPTH 2
 
-typedef struct {
-  gpr_mu mu;
-  gpr_cv cv;
-  grpc_closure_list elems;
-  size_t depth;
-  bool shutdown;
-  bool queued_long_job;
-  grpc_core::Thread thd;
-} thread_state;
-
-static thread_state* g_thread_state;
-static size_t g_max_threads;
-static gpr_atm g_cur_threads;
-static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER;
+#define EXECUTOR_TRACE(format, ...)         \
+  if (executor_trace.enabled()) {           \
+    gpr_log(GPR_INFO, format, __VA_ARGS__); \
+  }
+
+grpc_core::TraceFlag executor_trace(false, "executor");
 
 GPR_TLS_DECL(g_this_thread_state);
 
-grpc_core::TraceFlag executor_trace(false, "executor");
+GrpcExecutor::GrpcExecutor(const char* executor_name) : name(executor_name) {
+  adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER;
+  gpr_atm_no_barrier_store(&num_threads, 0);
+}
 
-static void executor_thread(void* arg);
+void GrpcExecutor::Init() { SetThreading(true); }
 
-static size_t run_closures(grpc_closure_list list) {
+size_t GrpcExecutor::RunClosures(grpc_closure_list list) {
   size_t n = 0;
 
   grpc_closure* c = list.head;
   while (c != nullptr) {
     grpc_closure* next = c->next_data.next;
     grpc_error* error = c->error_data.error;
-    if (executor_trace.enabled()) {
-#ifndef NDEBUG
-      gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c,
-              c->file_created, c->line_created);
-#else
-      gpr_log(GPR_INFO, "EXECUTOR: run %p", c);
-#endif
-    }
 #ifndef NDEBUG
+    EXECUTOR_TRACE("EXECUTOR: run %p [created by %s:%d]", c, c->file_created,
+                   c->line_created);
     c->scheduled = false;
+#else
+    EXECUTOR_TRACE("EXECUTOR: run %p", c);
 #endif
     c->cb(c->cb_arg, error);
     GRPC_ERROR_UNREF(error);
@@ -85,63 +75,69 @@ static size_t run_closures(grpc_closure_list list) {
   return n;
 }
 
-bool grpc_executor_is_threaded() {
-  return gpr_atm_no_barrier_load(&g_cur_threads) > 0;
+bool GrpcExecutor::IsThreaded() {
+  return gpr_atm_no_barrier_load(&num_threads) > 0;
 }
 
-void grpc_executor_set_threading(bool threading) {
-  gpr_atm cur_threads = gpr_atm_no_barrier_load(&g_cur_threads);
+void GrpcExecutor::SetThreading(bool threading) {
+  gpr_atm curr_num_threads = gpr_atm_no_barrier_load(&num_threads);
+
   if (threading) {
-    if (cur_threads > 0) return;
-    g_max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores());
-    gpr_atm_no_barrier_store(&g_cur_threads, 1);
+    if (curr_num_threads > 0) return;
+
+    // TODO (sreek): max_threads initialization can be moved into the
+    // constructor
+    max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores());
+    gpr_atm_no_barrier_store(&num_threads, 1);
     gpr_tls_init(&g_this_thread_state);
-    g_thread_state = static_cast<thread_state*>(
-        gpr_zalloc(sizeof(thread_state) * g_max_threads));
-    for (size_t i = 0; i < g_max_threads; i++) {
-      gpr_mu_init(&g_thread_state[i].mu);
-      gpr_cv_init(&g_thread_state[i].cv);
-      g_thread_state[i].thd = grpc_core::Thread();
-      g_thread_state[i].elems = GRPC_CLOSURE_LIST_INIT;
+    thd_state = static_cast<thread_state*>(
+        gpr_zalloc(sizeof(thread_state) * max_threads));
+
+    for (size_t i = 0; i < max_threads; i++) {
+      gpr_mu_init(&thd_state[i].mu);
+      gpr_cv_init(&thd_state[i].cv);
+      thd_state[i].id = i;
+      thd_state[i].thd = grpc_core::Thread();
+      thd_state[i].elems = GRPC_CLOSURE_LIST_INIT;
     }
-    g_thread_state[0].thd =
-        grpc_core::Thread("grpc_executor", executor_thread, &g_thread_state[0]);
-    g_thread_state[0].thd.Start();
+
+    thd_state[0].thd =
+        grpc_core::Thread(name, &GrpcExecutor::ThreadMain, &thd_state[0]);
+    thd_state[0].thd.Start();
   } else {
-    if (cur_threads == 0) return;
-    for (size_t i = 0; i < g_max_threads; i++) {
-      gpr_mu_lock(&g_thread_state[i].mu);
-      g_thread_state[i].shutdown = true;
-      gpr_cv_signal(&g_thread_state[i].cv);
-      gpr_mu_unlock(&g_thread_state[i].mu);
+    if (curr_num_threads == 0) return;
+
+    for (size_t i = 0; i < max_threads; i++) {
+      gpr_mu_lock(&thd_state[i].mu);
+      thd_state[i].shutdown = true;
+      gpr_cv_signal(&thd_state[i].cv);
+      gpr_mu_unlock(&thd_state[i].mu);
     }
-    /* ensure no thread is adding a new thread... once this is past, then
-       no thread will try to add a new one either (since shutdown is true) */
-    gpr_spinlock_lock(&g_adding_thread_lock);
-    gpr_spinlock_unlock(&g_adding_thread_lock);
-    for (gpr_atm i = 0; i < g_cur_threads; i++) {
-      g_thread_state[i].thd.Join();
+
+    /* Ensure no thread is adding a new thread. Once this is past, then no
+     * thread will try to add a new one either (since shutdown is true) */
+    gpr_spinlock_lock(&adding_thread_lock);
+    gpr_spinlock_unlock(&adding_thread_lock);
+
+    for (gpr_atm i = 0; i < num_threads; i++) {
+      thd_state[i].thd.Join();
    }
-    gpr_atm_no_barrier_store(&g_cur_threads, 0);
-    for (size_t i = 0; i < g_max_threads; i++) {
-      gpr_mu_destroy(&g_thread_state[i].mu);
-      gpr_cv_destroy(&g_thread_state[i].cv);
-      run_closures(g_thread_state[i].elems);
+
+    gpr_atm_no_barrier_store(&num_threads, 0);
+    for (size_t i = 0; i < max_threads; i++) {
+      gpr_mu_destroy(&thd_state[i].mu);
+      gpr_cv_destroy(&thd_state[i].cv);
+      RunClosures(thd_state[i].elems);
    }
-    gpr_free(g_thread_state);
+
+    gpr_free(thd_state);
     gpr_tls_destroy(&g_this_thread_state);
   }
 }
 
-void grpc_executor_init() {
-  gpr_atm_no_barrier_store(&g_cur_threads, 0);
-  grpc_executor_set_threading(true);
-}
-
-void grpc_executor_shutdown() { grpc_executor_set_threading(false); }
+void GrpcExecutor::Shutdown() { SetThreading(false); }
 
-static void executor_thread(void* arg) {
+void GrpcExecutor::ThreadMain(void* arg) {
   thread_state* ts = static_cast<thread_state*>(arg);
   gpr_tls_set(&g_this_thread_state, (intptr_t)ts);
 
@@ -149,153 +145,190 @@
   size_t subtract_depth = 0;
   for (;;) {
-    if (executor_trace.enabled()) {
-      gpr_log(GPR_INFO, "EXECUTOR[%d]: step (sub_depth=%" PRIdPTR ")",
-              static_cast<int>(ts - g_thread_state), subtract_depth);
-    }
+    EXECUTOR_TRACE("EXECUTOR[%ld]: step (sub_depth=%" PRIdPTR ")", ts->id,
+                   subtract_depth);
+
     gpr_mu_lock(&ts->mu);
     ts->depth -= subtract_depth;
+    // Wait for closures to be enqueued or for the executor to be shut down
     while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
       ts->queued_long_job = false;
       gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
     }
+
     if (ts->shutdown) {
-      if (executor_trace.enabled()) {
-        gpr_log(GPR_INFO, "EXECUTOR[%d]: shutdown",
-                static_cast<int>(ts - g_thread_state));
-      }
+      EXECUTOR_TRACE("EXECUTOR[%ld]: shutdown", ts->id);
       gpr_mu_unlock(&ts->mu);
       break;
     }
+
     GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED();
-    grpc_closure_list exec = ts->elems;
+    grpc_closure_list closures = ts->elems;
     ts->elems = GRPC_CLOSURE_LIST_INIT;
     gpr_mu_unlock(&ts->mu);
-    if (executor_trace.enabled()) {
-      gpr_log(GPR_INFO, "EXECUTOR[%d]: execute",
-              static_cast<int>(ts - g_thread_state));
-    }
+
+    EXECUTOR_TRACE("EXECUTOR[%ld]: execute", ts->id);
 
     grpc_core::ExecCtx::Get()->InvalidateNow();
-    subtract_depth = run_closures(exec);
+    subtract_depth = RunClosures(closures);
   }
 }
 
-static void executor_push(grpc_closure* closure, grpc_error* error,
-                          bool is_short) {
+void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
+                           bool is_short) {
   bool retry_push;
   if (is_short) {
     GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS();
   } else {
     GRPC_STATS_INC_EXECUTOR_SCHEDULED_LONG_ITEMS();
   }
+
   do {
     retry_push = false;
     size_t cur_thread_count =
-        static_cast<size_t>(gpr_atm_no_barrier_load(&g_cur_threads));
+        static_cast<size_t>(gpr_atm_no_barrier_load(&num_threads));
+
+    // If the number of threads is zero (i.e. either the executor is not
+    // threaded or already shut down), then queue the closure on the exec
+    // context itself
     if (cur_thread_count == 0) {
-      if (executor_trace.enabled()) {
 #ifndef NDEBUG
-        gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p (created %s:%d) inline",
-                closure, closure->file_created, closure->line_created);
+      EXECUTOR_TRACE("EXECUTOR: schedule %p (created %s:%d) inline", closure,
+                     closure->file_created, closure->line_created);
 #else
-        gpr_log(GPR_INFO, "EXECUTOR: schedule %p inline", closure);
+      EXECUTOR_TRACE("EXECUTOR: schedule %p inline", closure);
 #endif
-      }
       grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(),
                                closure, error);
       return;
     }
+
     thread_state* ts = (thread_state*)gpr_tls_get(&g_this_thread_state);
     if (ts == nullptr) {
-      ts = &g_thread_state[GPR_HASH_POINTER(grpc_core::ExecCtx::Get(),
-                                            cur_thread_count)];
+      ts = &thd_state[GPR_HASH_POINTER(grpc_core::ExecCtx::Get(),
+                                       cur_thread_count)];
     } else {
       GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF();
     }
+
     thread_state* orig_ts = ts;
     bool try_new_thread;
     for (;;) {
-      if (executor_trace.enabled()) {
 #ifndef NDEBUG
-        gpr_log(
-            GPR_DEBUG,
-            "EXECUTOR: try to schedule %p (%s) (created %s:%d) to thread %d",
-            closure, is_short ? "short" : "long", closure->file_created,
-            closure->line_created, static_cast<int>(ts - g_thread_state));
+      EXECUTOR_TRACE(
+          "EXECUTOR: try to schedule %p (%s) (created %s:%d) to thread %ld",
+          closure, is_short ? "short" : "long", closure->file_created,
+          closure->line_created, ts->id);
 #else
-        gpr_log(GPR_INFO, "EXECUTOR: try to schedule %p (%s) to thread %d",
-                closure, is_short ? "short" : "long",
-                (int)(ts - g_thread_state));
+      EXECUTOR_TRACE("EXECUTOR: try to schedule %p (%s) to thread %ld", closure,
+                     is_short ? "short" : "long", ts->id);
 #endif
-      }
+
       gpr_mu_lock(&ts->mu);
       if (ts->queued_long_job) {
         // if there's a long job queued, we never queue anything else to this
         // queue (since long jobs can take 'infinite' time and we need to
-        // guarantee no starvation)
-        // ... spin through queues and try again
+        // guarantee no starvation). Spin through queues and try again
         gpr_mu_unlock(&ts->mu);
-        size_t idx = static_cast<size_t>(ts - g_thread_state);
-        ts = &g_thread_state[(idx + 1) % cur_thread_count];
+        size_t idx = ts->id;
+        ts = &thd_state[(idx + 1) % cur_thread_count];
         if (ts == orig_ts) {
+          // We cycled through all the threads. Retry enqueue again (by
+          // creating a new thread)
           retry_push = true;
+          // TODO (sreek): What if the executor is shutdown OR if
+          // cur_thread_count is already equal to max_threads? (Currently, as
+          // of July 2018, we do not run into this issue because there is only
+          // one instance of a long job in gRPC. This has to be fixed soon)
           try_new_thread = true;
           break;
         }
+
         continue;
       }
+
+      // == Found the thread state (i.e. thread) to enqueue this closure! ==
+
+      // Also, if this thread has been waiting for closures, wake it up.
+      // - If grpc_closure_list_empty() is true and the Executor is not
+      //   shutdown, it means that the thread must be waiting in ThreadMain()
+      // - Note that gpr_cv_signal() won't immediately wake up the thread. That
+      //   happens after we release the mutex &ts->mu a few lines below
       if (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
         GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED();
         gpr_cv_signal(&ts->cv);
       }
+
       grpc_closure_list_append(&ts->elems, closure, error);
+
+      // If we already queued more than MAX_DEPTH number of closures on this
+      // thread, use this as a hint to create more threads
       ts->depth++;
       try_new_thread = ts->depth > MAX_DEPTH &&
-                       cur_thread_count < g_max_threads && !ts->shutdown;
-      if (!is_short) ts->queued_long_job = true;
+                       cur_thread_count < max_threads && !ts->shutdown;
+
+      ts->queued_long_job = !is_short;
+
       gpr_mu_unlock(&ts->mu);
       break;
     }
-    if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) {
+
+    if (try_new_thread && gpr_spinlock_trylock(&adding_thread_lock)) {
       cur_thread_count =
-          static_cast<size_t>(gpr_atm_no_barrier_load(&g_cur_threads));
-      if (cur_thread_count < g_max_threads) {
-        gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1);
-
-        g_thread_state[cur_thread_count].thd =
-            grpc_core::Thread("grpc_executor", executor_thread,
-                              &g_thread_state[cur_thread_count]);
-        g_thread_state[cur_thread_count].thd.Start();
+          static_cast<size_t>(gpr_atm_no_barrier_load(&num_threads));
+      if (cur_thread_count < max_threads) {
+        // Increment num_threads (Safe to do a no_barrier_store instead of a
+        // cas because we always increment num_threads under the
+        // 'adding_thread_lock')
+        gpr_atm_no_barrier_store(&num_threads, cur_thread_count + 1);
+
+        thd_state[cur_thread_count].thd = grpc_core::Thread(
+            name, &GrpcExecutor::ThreadMain, &thd_state[cur_thread_count]);
+        thd_state[cur_thread_count].thd.Start();
      }
-      gpr_spinlock_unlock(&g_adding_thread_lock);
+      gpr_spinlock_unlock(&adding_thread_lock);
    }
+
     if (retry_push) {
       GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES();
     }
   } while (retry_push);
 }
 
-static void executor_push_short(grpc_closure* closure, grpc_error* error) {
-  executor_push(closure, error, true);
+static GrpcExecutor g_global_executor("grpc-executor");
+
+void enqueue_long(grpc_closure* closure, grpc_error* error) {
+  g_global_executor.Enqueue(closure, error, false);
 }
 
-static void executor_push_long(grpc_closure* closure, grpc_error* error) {
-  executor_push(closure, error, false);
+void enqueue_short(grpc_closure* closure, grpc_error* error) {
+  g_global_executor.Enqueue(closure, error, true);
 }
 
-static const grpc_closure_scheduler_vtable executor_vtable_short = {
-    executor_push_short, executor_push_short, "executor"};
-static grpc_closure_scheduler executor_scheduler_short = {
-    &executor_vtable_short};
+// Short-job executor scheduler
+static const grpc_closure_scheduler_vtable global_executor_vtable_short = {
+    enqueue_short, enqueue_short, "executor-short"};
+static grpc_closure_scheduler global_scheduler_short = {
+    &global_executor_vtable_short};
+
+// Long-job executor scheduler
+static const grpc_closure_scheduler_vtable global_executor_vtable_long = {
+    enqueue_long, enqueue_long, "executor-long"};
+static grpc_closure_scheduler global_scheduler_long = {
+    &global_executor_vtable_long};
+
+void grpc_executor_init() { g_global_executor.Init(); }
 
-static const grpc_closure_scheduler_vtable executor_vtable_long = {
-    executor_push_long, executor_push_long, "executor"};
-static grpc_closure_scheduler executor_scheduler_long = {&executor_vtable_long};
+void grpc_executor_shutdown() { g_global_executor.Shutdown(); }
+
+bool grpc_executor_is_threaded() { return g_global_executor.IsThreaded(); }
+
+void grpc_executor_set_threading(bool enable) {
+  g_global_executor.SetThreading(enable);
+}
 
 grpc_closure_scheduler* grpc_executor_scheduler(
     grpc_executor_job_length length) {
-  return length == GRPC_EXECUTOR_SHORT ? &executor_scheduler_short
-                                       : &executor_scheduler_long;
+  return length == GRPC_EXECUTOR_SHORT ? &global_scheduler_short
+                                       : &global_scheduler_long;
 }
diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h
index 68d540af557..cafe47decb9 100644
--- a/src/core/lib/iomgr/executor.h
+++ b/src/core/lib/iomgr/executor.h
@@ -21,30 +21,66 @@
 
 #include <grpc/support/port_platform.h>
 
+#include "src/core/lib/gpr/spinlock.h"
+#include "src/core/lib/gprpp/thd.h"
 #include "src/core/lib/iomgr/closure.h"
 
+typedef struct {
+  gpr_mu mu;
+  size_t id;  // For debugging purposes
+  gpr_cv cv;
+  grpc_closure_list elems;
+  size_t depth;  // Number of closures in the closure list
+  bool shutdown;
+  bool queued_long_job;
+  grpc_core::Thread thd;
+} thread_state;
+
 typedef enum { GRPC_EXECUTOR_SHORT, GRPC_EXECUTOR_LONG } grpc_executor_job_length;
 
-/** Initialize the global executor.
- *
- * This mechanism is meant to outsource work (grpc_closure instances) to a
- * thread, for those cases where blocking isn't an option but there isn't a
- * non-blocking solution available. */
+class GrpcExecutor {
+ public:
+  GrpcExecutor(const char* executor_name);
+
+  void Init();
+
+  /** Is the executor multi-threaded? */
+  bool IsThreaded();
+
+  /* Enable/disable threading - must be called after Init() and before
+   * Shutdown() */
+  void SetThreading(bool threading);
+
+  /** Shutdown the executor, running all pending work as part of the call */
+  void Shutdown();
+
+  /** Enqueue the closure onto the executor. is_short is true if the closure
+   * is a short job (i.e. expected to not block and complete quickly) */
+  void Enqueue(grpc_closure* closure, grpc_error* error, bool is_short);
+
+ private:
+  static size_t RunClosures(grpc_closure_list list);
+  static void ThreadMain(void* arg);
+
+  const char* name;
+  thread_state* thd_state;
+  size_t max_threads;
+  gpr_atm num_threads;
+  gpr_spinlock adding_thread_lock;
+};
+
+// == Global executor functions ==
+
 void grpc_executor_init();
 
-grpc_closure_scheduler* grpc_executor_scheduler(grpc_executor_job_length);
+grpc_closure_scheduler* grpc_executor_scheduler(
+    grpc_executor_job_length length);
 
-/** Shutdown the executor, running all pending work as part of the call */
 void grpc_executor_shutdown();
 
-/** Is the executor multi-threaded? */
 bool grpc_executor_is_threaded();
 
-/* enable/disable threading - must be called after grpc_executor_init and
-   before grpc_executor_shutdown */
 void grpc_executor_set_threading(bool enable);
 
 #endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */
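
For reference, a minimal sketch of how callers schedule work through the two global schedulers above. It assumes the closure macros of the same era (GRPC_CLOSURE_INIT / GRPC_CLOSURE_SCHED) and an active grpc_core::ExecCtx on the calling thread; on_work_done and schedule_example are illustrative names, not part of the patch:

// Illustrative callback; matches the (void* arg, grpc_error* error) shape
// expected by grpc_closure callbacks.
static void on_work_done(void* arg, grpc_error* error) {
  gpr_log(GPR_INFO, "ran on an executor thread (arg=%p)", arg);
}

static void schedule_example() {
  grpc_core::ExecCtx exec_ctx;  // Required: when the executor has no threads,
                                // Enqueue falls back to this ExecCtx's
                                // closure list
  static grpc_closure closure;
  // GRPC_EXECUTOR_SHORT picks global_scheduler_short; GRPC_EXECUTOR_LONG
  // would route through global_scheduler_long instead.
  GRPC_CLOSURE_INIT(&closure, on_work_done, nullptr,
                    grpc_executor_scheduler(GRPC_EXECUTOR_SHORT));
  GRPC_CLOSURE_SCHED(&closure, GRPC_ERROR_NONE);
}

Both schedulers forward to the single static g_global_executor instance; the short/long distinction only changes queueing policy. A long job sets the target thread's queued_long_job flag, so Enqueue skips that thread for subsequent closures until the job completes.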