From 7e9d52530d0145fe8202d2fd35621407745e91ab Mon Sep 17 00:00:00 2001
From: Sree Kuchibhotla
Date: Mon, 9 Jul 2018 14:53:54 -0700
Subject: [PATCH 01/11] Move executor implementation into GrpcExecutor class

---
 src/core/lib/iomgr/executor.cc | 297 ++++++++++++++++++---------------
 src/core/lib/iomgr/executor.h  |  56 +++++--
 2 files changed, 211 insertions(+), 142 deletions(-)

diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc
index db4b45d1a98..e9e6d0a4bb2 100644
--- a/src/core/lib/iomgr/executor.cc
+++ b/src/core/lib/iomgr/executor.cc
@@ -21,6 +21,7 @@
 #include "src/core/lib/iomgr/executor.h"
 
 #include
+#include
 
 #include
 #include
@@ -28,52 +29,41 @@
 #include
 
 #include "src/core/lib/debug/stats.h"
-#include "src/core/lib/gpr/spinlock.h"
 #include "src/core/lib/gpr/tls.h"
 #include "src/core/lib/gpr/useful.h"
-#include "src/core/lib/gprpp/thd.h"
 #include "src/core/lib/iomgr/exec_ctx.h"
 
 #define MAX_DEPTH 2
 
-typedef struct {
-  gpr_mu mu;
-  gpr_cv cv;
-  grpc_closure_list elems;
-  size_t depth;
-  bool shutdown;
-  bool queued_long_job;
-  grpc_core::Thread thd;
-} thread_state;
-
-static thread_state* g_thread_state;
-static size_t g_max_threads;
-static gpr_atm g_cur_threads;
-static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER;
+#define EXECUTOR_TRACE(format, ...)         \
+  if (executor_trace.enabled()) {           \
+    gpr_log(GPR_INFO, format, __VA_ARGS__); \
+  }
+
+grpc_core::TraceFlag executor_trace(false, "executor");
 
 GPR_TLS_DECL(g_this_thread_state);
 
-grpc_core::TraceFlag executor_trace(false, "executor");
+GrpcExecutor::GrpcExecutor(const char* executor_name) : name(executor_name) {
+  adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER;
+  gpr_atm_no_barrier_store(&num_threads, 0);
+}
 
-static void executor_thread(void* arg);
+void GrpcExecutor::Init() { SetThreading(true); }
 
-static size_t run_closures(grpc_closure_list list) {
+size_t GrpcExecutor::RunClosures(grpc_closure_list list) {
   size_t n = 0;
 
   grpc_closure* c = list.head;
   while (c != nullptr) {
     grpc_closure* next = c->next_data.next;
     grpc_error* error = c->error_data.error;
-    if (executor_trace.enabled()) {
-#ifndef NDEBUG
-      gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c,
-              c->file_created, c->line_created);
-#else
-      gpr_log(GPR_INFO, "EXECUTOR: run %p", c);
-#endif
-    }
 #ifndef NDEBUG
+    EXECUTOR_TRACE("EXECUTOR: run %p [created by %s:%d]", c, c->file_created,
+                   c->line_created);
     c->scheduled = false;
+#else
+    EXECUTOR_TRACE("EXECUTOR: run %p", c);
 #endif
     c->cb(c->cb_arg, error);
     GRPC_ERROR_UNREF(error);
@@ -85,63 +75,69 @@ static size_t run_closures(grpc_closure_list list) {
   return n;
 }
 
-bool grpc_executor_is_threaded() {
-  return gpr_atm_no_barrier_load(&g_cur_threads) > 0;
+bool GrpcExecutor::IsThreaded() {
+  return gpr_atm_no_barrier_load(&num_threads) > 0;
 }
 
-void grpc_executor_set_threading(bool threading) {
-  gpr_atm cur_threads = gpr_atm_no_barrier_load(&g_cur_threads);
+void GrpcExecutor::SetThreading(bool threading) {
+  gpr_atm curr_num_threads = gpr_atm_no_barrier_load(&num_threads);
+
   if (threading) {
-    if (cur_threads > 0) return;
-    g_max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores());
-    gpr_atm_no_barrier_store(&g_cur_threads, 1);
+    if (curr_num_threads > 0) return;
+
+    // TODO (sreek): max_threads initialization can be moved into the
+    // constructor
+    max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores());
+    gpr_atm_no_barrier_store(&num_threads, 1);
     gpr_tls_init(&g_this_thread_state);
-    g_thread_state = static_cast<thread_state*>(
-        gpr_zalloc(sizeof(thread_state) * g_max_threads));
-    for (size_t i = 0; i < g_max_threads; i++) {
-      gpr_mu_init(&g_thread_state[i].mu);
-      gpr_cv_init(&g_thread_state[i].cv);
-      g_thread_state[i].thd = grpc_core::Thread();
-      g_thread_state[i].elems = GRPC_CLOSURE_LIST_INIT;
+    thd_state = static_cast<thread_state*>(
+        gpr_zalloc(sizeof(thread_state) * max_threads));
+
+    for (size_t i = 0; i < max_threads; i++) {
+      gpr_mu_init(&thd_state[i].mu);
+      gpr_cv_init(&thd_state[i].cv);
+      thd_state[i].id = i;
+      thd_state[i].thd = grpc_core::Thread();
+      thd_state[i].elems = GRPC_CLOSURE_LIST_INIT;
     }
 
-    g_thread_state[0].thd =
-        grpc_core::Thread("grpc_executor", executor_thread, &g_thread_state[0]);
-    g_thread_state[0].thd.Start();
+    thd_state[0].thd =
+        grpc_core::Thread(name, &GrpcExecutor::ThreadMain, &thd_state[0]);
+    thd_state[0].thd.Start();
   } else {
-    if (cur_threads == 0) return;
-    for (size_t i = 0; i < g_max_threads; i++) {
-      gpr_mu_lock(&g_thread_state[i].mu);
-      g_thread_state[i].shutdown = true;
-      gpr_cv_signal(&g_thread_state[i].cv);
-      gpr_mu_unlock(&g_thread_state[i].mu);
+    if (curr_num_threads == 0) return;
+
+    for (size_t i = 0; i < max_threads; i++) {
+      gpr_mu_lock(&thd_state[i].mu);
+      thd_state[i].shutdown = true;
+      gpr_cv_signal(&thd_state[i].cv);
+      gpr_mu_unlock(&thd_state[i].mu);
     }
-    /* ensure no thread is adding a new thread... once this is past, then
-       no thread will try to add a new one either (since shutdown is true) */
-    gpr_spinlock_lock(&g_adding_thread_lock);
-    gpr_spinlock_unlock(&g_adding_thread_lock);
-    for (gpr_atm i = 0; i < g_cur_threads; i++) {
-      g_thread_state[i].thd.Join();
+
+    /* Ensure no thread is adding a new thread. Once this is past, then no
+     * thread will try to add a new one either (since shutdown is true) */
+    gpr_spinlock_lock(&adding_thread_lock);
+    gpr_spinlock_unlock(&adding_thread_lock);
+
+    for (gpr_atm i = 0; i < num_threads; i++) {
+      thd_state[i].thd.Join();
     }
-    gpr_atm_no_barrier_store(&g_cur_threads, 0);
-    for (size_t i = 0; i < g_max_threads; i++) {
-      gpr_mu_destroy(&g_thread_state[i].mu);
-      gpr_cv_destroy(&g_thread_state[i].cv);
-      run_closures(g_thread_state[i].elems);
+
+    gpr_atm_no_barrier_store(&num_threads, 0);
+    for (size_t i = 0; i < max_threads; i++) {
+      gpr_mu_destroy(&thd_state[i].mu);
+      gpr_cv_destroy(&thd_state[i].cv);
+      RunClosures(thd_state[i].elems);
    }
-    gpr_free(g_thread_state);
+
+    gpr_free(thd_state);
     gpr_tls_destroy(&g_this_thread_state);
   }
 }
 
-void grpc_executor_init() {
-  gpr_atm_no_barrier_store(&g_cur_threads, 0);
-  grpc_executor_set_threading(true);
-}
-
-void grpc_executor_shutdown() { grpc_executor_set_threading(false); }
+void GrpcExecutor::Shutdown() { SetThreading(false); }
 
-static void executor_thread(void* arg) {
+void GrpcExecutor::ThreadMain(void* arg) {
   thread_state* ts = static_cast<thread_state*>(arg);
   gpr_tls_set(&g_this_thread_state, (intptr_t)ts);
 
@@ -149,153 +145,190 @@ static void executor_thread(void* arg) {
   size_t subtract_depth = 0;
   for (;;) {
-    if (executor_trace.enabled()) {
-      gpr_log(GPR_INFO, "EXECUTOR[%d]: step (sub_depth=%" PRIdPTR ")",
-              static_cast<int>(ts - g_thread_state), subtract_depth);
-    }
+    EXECUTOR_TRACE("EXECUTOR[%ld]: step (sub_depth=%" PRIdPTR ")", ts->id,
+                   subtract_depth);
+
     gpr_mu_lock(&ts->mu);
     ts->depth -= subtract_depth;
+    // Wait for closures to be enqueued or for the executor to be shut down
     while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
       ts->queued_long_job = false;
       gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
     }
+
     if (ts->shutdown) {
-      if (executor_trace.enabled()) {
-        gpr_log(GPR_INFO, "EXECUTOR[%d]: shutdown",
-                static_cast<int>(ts - g_thread_state));
-      }
+      EXECUTOR_TRACE("EXECUTOR[%ld]: shutdown", ts->id);
       gpr_mu_unlock(&ts->mu);
       break;
     }
+
     GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED();
-    grpc_closure_list exec = ts->elems;
+    grpc_closure_list closures = ts->elems;
     ts->elems = GRPC_CLOSURE_LIST_INIT;
     gpr_mu_unlock(&ts->mu);
-    if (executor_trace.enabled()) {
-      gpr_log(GPR_INFO, "EXECUTOR[%d]: execute",
-              static_cast<int>(ts - g_thread_state));
-    }
+
+    EXECUTOR_TRACE("EXECUTOR[%ld]: execute", ts->id);
 
     grpc_core::ExecCtx::Get()->InvalidateNow();
-    subtract_depth = run_closures(exec);
+    subtract_depth = RunClosures(closures);
   }
 }
 
-static void executor_push(grpc_closure* closure, grpc_error* error,
-                          bool is_short) {
+void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
+                           bool is_short) {
   bool retry_push;
   if (is_short) {
     GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS();
   } else {
     GRPC_STATS_INC_EXECUTOR_SCHEDULED_LONG_ITEMS();
   }
+
   do {
     retry_push = false;
     size_t cur_thread_count =
-        static_cast<size_t>(gpr_atm_no_barrier_load(&g_cur_threads));
+        static_cast<size_t>(gpr_atm_no_barrier_load(&num_threads));
+
+    // If the number of threads is zero (i.e. either the executor is not threaded
+    // or already shut down), then queue the closure on the exec context itself
     if (cur_thread_count == 0) {
-      if (executor_trace.enabled()) {
 #ifndef NDEBUG
-        gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p (created %s:%d) inline",
-                closure, closure->file_created, closure->line_created);
+      EXECUTOR_TRACE("EXECUTOR: schedule %p (created %s:%d) inline", closure,
+                     closure->file_created, closure->line_created);
 #else
-        gpr_log(GPR_INFO, "EXECUTOR: schedule %p inline", closure);
+      EXECUTOR_TRACE("EXECUTOR: schedule %p inline", closure);
 #endif
-      }
       grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(),
                                closure, error);
       return;
     }
+
     thread_state* ts = (thread_state*)gpr_tls_get(&g_this_thread_state);
     if (ts == nullptr) {
-      ts = &g_thread_state[GPR_HASH_POINTER(grpc_core::ExecCtx::Get(),
-                                            cur_thread_count)];
+      ts = &thd_state[GPR_HASH_POINTER(grpc_core::ExecCtx::Get(),
+                                       cur_thread_count)];
     } else {
      GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF();
    }
+
     thread_state* orig_ts = ts;
 
     bool try_new_thread;
     for (;;) {
-      if (executor_trace.enabled()) {
 #ifndef NDEBUG
-        gpr_log(
-            GPR_DEBUG,
-            "EXECUTOR: try to schedule %p (%s) (created %s:%d) to thread %d",
-            closure, is_short ? "short" : "long", closure->file_created,
-            closure->line_created, static_cast<int>(ts - g_thread_state));
+      EXECUTOR_TRACE(
+          "EXECUTOR: try to schedule %p (%s) (created %s:%d) to thread %ld",
+          closure, is_short ? "short" : "long", closure->file_created,
+          closure->line_created, ts->id);
 #else
-        gpr_log(GPR_INFO, "EXECUTOR: try to schedule %p (%s) to thread %d",
-                closure, is_short ? "short" : "long",
-                (int)(ts - g_thread_state));
+      EXECUTOR_TRACE("EXECUTOR: try to schedule %p (%s) to thread %ld", closure,
+                     is_short ? "short" : "long", ts->id);
 #endif
-      }
+
      gpr_mu_lock(&ts->mu);
       if (ts->queued_long_job) {
         // if there's a long job queued, we never queue anything else to this
         // queue (since long jobs can take 'infinite' time and we need to
-        // guarantee no starvation)
-        // ... spin through queues and try again
+        // guarantee no starvation). Spin through queues and try again
         gpr_mu_unlock(&ts->mu);
-        size_t idx = static_cast<size_t>(ts - g_thread_state);
-        ts = &g_thread_state[(idx + 1) % cur_thread_count];
+        size_t idx = ts->id;
+        ts = &thd_state[(idx + 1) % cur_thread_count];
         if (ts == orig_ts) {
+          // We cycled through all the threads. Retry enqueue again (by creating
+          // a new thread)
           retry_push = true;
+          // TODO (sreek): What if the executor is shut down OR if
+          // cur_thread_count is already equal to max_threads? (currently - as
+          // of July 2018, we do not run into this issue because there is only
+          // one instance of long job in gRPC. This has to be fixed soon)
           try_new_thread = true;
           break;
         }
+
         continue;
       }
+
+      // == Found the thread state (i.e. thread) to enqueue this closure! ==
+
+      // Also, if this thread has been waiting for closures, wake it up.
+      // - If grpc_closure_list_empty() is true and the Executor is not
+      // shut down, it means that the thread must be waiting in ThreadMain()
+      // - Note that gpr_cv_signal() won't immediately wake up the thread. That
+      // happens after we release the mutex &ts->mu a few lines below
       if (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
         GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED();
         gpr_cv_signal(&ts->cv);
       }
+
       grpc_closure_list_append(&ts->elems, closure, error);
+
+      // If we already queued more than MAX_DEPTH number of closures on this
+      // thread, use this as a hint to create more threads
       ts->depth++;
       try_new_thread = ts->depth > MAX_DEPTH &&
-                       cur_thread_count < g_max_threads && !ts->shutdown;
-      if (!is_short) ts->queued_long_job = true;
+                       cur_thread_count < max_threads && !ts->shutdown;
+
+      ts->queued_long_job = !is_short;
+
       gpr_mu_unlock(&ts->mu);
       break;
     }
-    if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) {
+
+    if (try_new_thread && gpr_spinlock_trylock(&adding_thread_lock)) {
       cur_thread_count =
-          static_cast<size_t>(gpr_atm_no_barrier_load(&g_cur_threads));
-      if (cur_thread_count < g_max_threads) {
-        gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1);
-
-        g_thread_state[cur_thread_count].thd =
-            grpc_core::Thread("grpc_executor", executor_thread,
-                              &g_thread_state[cur_thread_count]);
-        g_thread_state[cur_thread_count].thd.Start();
+          static_cast<size_t>(gpr_atm_no_barrier_load(&num_threads));
+      if (cur_thread_count < max_threads) {
+        // Increment num_threads (Safe to do a no_barrier_store instead of a
+        // cas because we always increment num_threads under the
+        // 'adding_thread_lock')
+        gpr_atm_no_barrier_store(&num_threads, cur_thread_count + 1);
+
+        thd_state[cur_thread_count].thd = grpc_core::Thread(
+            name, &GrpcExecutor::ThreadMain, &thd_state[cur_thread_count]);
+        thd_state[cur_thread_count].thd.Start();
       }
-      gpr_spinlock_unlock(&g_adding_thread_lock);
+      gpr_spinlock_unlock(&adding_thread_lock);
     }
+
     if (retry_push) {
       GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES();
     }
   } while (retry_push);
 }
 
-static void executor_push_short(grpc_closure* closure, grpc_error* error) {
-  executor_push(closure, error, true);
+static GrpcExecutor g_global_executor("grpc-executor");
+
+void enqueue_long(grpc_closure* closure, grpc_error* error) {
+  g_global_executor.Enqueue(closure, error, false);
 }
 
-static void executor_push_long(grpc_closure* closure, grpc_error* error) {
-  executor_push(closure, error, false);
+void enqueue_short(grpc_closure* closure, grpc_error* error) {
+  g_global_executor.Enqueue(closure, error, true);
 }
 
-static const grpc_closure_scheduler_vtable executor_vtable_short = {
-    executor_push_short, executor_push_short, "executor"};
-static grpc_closure_scheduler executor_scheduler_short = {
-    &executor_vtable_short};
+// Short-Job executor scheduler
+static const grpc_closure_scheduler_vtable global_executor_vtable_short = {
+    enqueue_short, enqueue_short, "executor-short"};
+static grpc_closure_scheduler global_scheduler_short = {
+    &global_executor_vtable_short};
+
+// Long-job executor scheduler
+static const grpc_closure_scheduler_vtable global_executor_vtable_long = {
+    enqueue_long, enqueue_long, "executor-long"};
+static grpc_closure_scheduler global_scheduler_long = {
+    &global_executor_vtable_long};
+
+void grpc_executor_init() { g_global_executor.Init(); }
 
-static const grpc_closure_scheduler_vtable executor_vtable_long = {
-    executor_push_long, executor_push_long, "executor"};
-static grpc_closure_scheduler executor_scheduler_long = {&executor_vtable_long};
+void grpc_executor_shutdown() { g_global_executor.Shutdown(); }
+
+bool grpc_executor_is_threaded() { return g_global_executor.IsThreaded(); }
+
+void grpc_executor_set_threading(bool enable) {
+  g_global_executor.SetThreading(enable);
+}
 
 grpc_closure_scheduler* grpc_executor_scheduler(
     grpc_executor_job_length length) {
-  return length == GRPC_EXECUTOR_SHORT ? &executor_scheduler_short
-                                       : &executor_scheduler_long;
+  return length == GRPC_EXECUTOR_SHORT ? &global_scheduler_short
+                                       : &global_scheduler_long;
 }
diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h
index 68d540af557..cafe47decb9 100644
--- a/src/core/lib/iomgr/executor.h
+++ b/src/core/lib/iomgr/executor.h
@@ -21,30 +21,66 @@
 
 #include
 
+#include "src/core/lib/gpr/spinlock.h"
+#include "src/core/lib/gprpp/thd.h"
 #include "src/core/lib/iomgr/closure.h"
 
+typedef struct {
+  gpr_mu mu;
+  size_t id;  // For debugging purposes
+  gpr_cv cv;
+  grpc_closure_list elems;
+  size_t depth;  // Number of closures in the closure list
+  bool shutdown;
+  bool queued_long_job;
+  grpc_core::Thread thd;
+} thread_state;
+
 typedef enum {
   GRPC_EXECUTOR_SHORT,
   GRPC_EXECUTOR_LONG
 } grpc_executor_job_length;
 
-/** Initialize the global executor.
- *
- * This mechanism is meant to outsource work (grpc_closure instances) to a
- * thread, for those cases where blocking isn't an option but there isn't a
- * non-blocking solution available. */
+class GrpcExecutor {
+ public:
+  GrpcExecutor(const char* executor_name);
+  void Init();
+
+  /** Is the executor multi-threaded? */
+  bool IsThreaded();
+
+  /* Enable/disable threading - must be called after Init and Shutdown() */
+  void SetThreading(bool threading);
+
+  /** Shutdown the executor, running all pending work as part of the call */
+  void Shutdown();
+
+  /** Enqueue the closure onto the executor. is_short is true if the closure is
+   * a short job (i.e. expected to not block and complete quickly) */
+  void Enqueue(grpc_closure* closure, grpc_error* error, bool is_short);
+
+ private:
+  static size_t RunClosures(grpc_closure_list list);
+  static void ThreadMain(void* arg);
+
+  const char* name;
+  thread_state* thd_state;
+  size_t max_threads;
+  gpr_atm num_threads;
+  gpr_spinlock adding_thread_lock;
+};
+
+// == Global executor functions ==
+
 void grpc_executor_init();
 
-grpc_closure_scheduler* grpc_executor_scheduler(grpc_executor_job_length);
+grpc_closure_scheduler* grpc_executor_scheduler(
+    grpc_executor_job_length length);
 
-/** Shutdown the executor, running all pending work as part of the call */
 void grpc_executor_shutdown();
 
-/** Is the executor multi-threaded? */
 bool grpc_executor_is_threaded();
 
-/* enable/disable threading - must be called after grpc_executor_init and before
-   grpc_executor_shutdown */
 void grpc_executor_set_threading(bool enable);
 
 #endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */
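Aside: a minimal, hypothetical sketch of how the GrpcExecutor class introduced in patch 01 is meant to be driven (not part of the series; on_run() and demo() are invented names, and the mid-2018 gRPC-core closure helpers such as GRPC_CLOSURE_CREATE are assumed). A grpc_core::ExecCtx must be live on the calling thread, since Enqueue() consults grpc_core::ExecCtx::Get():

    #include <grpc/grpc.h>
    #include <grpc/support/log.h>

    #include "src/core/lib/iomgr/executor.h"

    // Hypothetical callback: runs on an executor thread, or inline on the
    // caller's ExecCtx if the executor is not threaded.
    static void on_run(void* arg, grpc_error* error) {
      gpr_log(GPR_INFO, "closure executed");
    }

    static void demo() {
      grpc_core::ExecCtx exec_ctx;             // Enqueue() reads ExecCtx::Get()
      GrpcExecutor executor("demo-executor");  // name is used for worker threads
      executor.Init();                         // same as SetThreading(true)

      grpc_closure* c =
          GRPC_CLOSURE_CREATE(on_run, nullptr, grpc_schedule_on_exec_ctx);
      executor.Enqueue(c, GRPC_ERROR_NONE, true /* is_short */);

      executor.Shutdown();  // joins workers, runs any still-queued closures
    }

Shutdown() runs any still-queued closures as part of the call, which is why the header documents it as "running all pending work as part of the call".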
From 83d0bfa3dbc6a15abb3821f6b16a0b1535e4c880 Mon Sep 17 00:00:00 2001
From: Sree Kuchibhotla
Date: Tue, 10 Jul 2018 11:29:43 -0700
Subject: [PATCH 02/11] address code review comments

---
 src/core/lib/iomgr/executor.cc | 144 ++++++++++++++++-----------------
 src/core/lib/iomgr/executor.h  |  21 +++--
 2 files changed, 81 insertions(+), 84 deletions(-)

diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc
index e9e6d0a4bb2..f72e3944230 100644
--- a/src/core/lib/iomgr/executor.cc
+++ b/src/core/lib/iomgr/executor.cc
@@ -35,18 +35,19 @@
 #define MAX_DEPTH 2
 
-#define EXECUTOR_TRACE(format, ...)         \
-  if (executor_trace.enabled()) {           \
-    gpr_log(GPR_INFO, format, __VA_ARGS__); \
+#define EXECUTOR_TRACE(format, ...)                     \
+  if (executor_trace.enabled()) {                       \
+    gpr_log(GPR_INFO, "EXECUTOR " format, __VA_ARGS__); \
   }
 
 grpc_core::TraceFlag executor_trace(false, "executor");
 
 GPR_TLS_DECL(g_this_thread_state);
 
-GrpcExecutor::GrpcExecutor(const char* executor_name) : name(executor_name) {
-  adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER;
-  gpr_atm_no_barrier_store(&num_threads, 0);
+GrpcExecutor::GrpcExecutor(const char* executor_name) : name_(executor_name) {
+  adding_thread_lock_ = GPR_SPINLOCK_STATIC_INITIALIZER;
+  gpr_atm_no_barrier_store(&num_threads_, 0);
+  max_threads_ = GPR_MAX(1, 2 * gpr_cpu_num_cores());
 }
 
 void GrpcExecutor::Init() { SetThreading(true); }
@@ -59,11 +60,11 @@ size_t GrpcExecutor::RunClosures(grpc_closure_list list) {
     grpc_closure* next = c->next_data.next;
     grpc_error* error = c->error_data.error;
 #ifndef NDEBUG
-    EXECUTOR_TRACE("EXECUTOR: run %p [created by %s:%d]", c, c->file_created,
+    EXECUTOR_TRACE("run %p [created by %s:%d]", c, c->file_created,
                    c->line_created);
     c->scheduled = false;
 #else
-    EXECUTOR_TRACE("EXECUTOR: run %p", c);
+    EXECUTOR_TRACE("run %p", c);
 #endif
     c->cb(c->cb_arg, error);
     GRPC_ERROR_UNREF(error);
@@ -75,62 +76,60 @@ size_t GrpcExecutor::RunClosures(grpc_closure_list list) {
   return n;
 }
 
-bool GrpcExecutor::IsThreaded() {
-  return gpr_atm_no_barrier_load(&num_threads) > 0;
+bool GrpcExecutor::IsThreaded() const {
+  return gpr_atm_no_barrier_load(&num_threads_) > 0;
 }
 
 void GrpcExecutor::SetThreading(bool threading) {
-  gpr_atm curr_num_threads = gpr_atm_no_barrier_load(&num_threads);
+  const gpr_atm curr_num_threads = gpr_atm_no_barrier_load(&num_threads_);
 
   if (threading) {
     if (curr_num_threads > 0) return;
 
-    // TODO (sreek): max_threads initialization can be moved into the
-    // constructor
-    max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores());
-    gpr_atm_no_barrier_store(&num_threads, 1);
-    gpr_tls_init(&g_this_thread_state);
-    thd_state = static_cast<thread_state*>(
-        gpr_zalloc(sizeof(thread_state) * max_threads));
-
-    for (size_t i = 0; i < max_threads; i++) {
-      gpr_mu_init(&thd_state[i].mu);
-      gpr_cv_init(&thd_state[i].cv);
-      thd_state[i].id = i;
-      thd_state[i].thd = grpc_core::Thread();
-      thd_state[i].elems = GRPC_CLOSURE_LIST_INIT;
+    GPR_ASSERT(num_threads_ == 0);
+    gpr_atm_no_barrier_store(&num_threads_, 1);
+    gpr_tls_init(&g_this_thread_state_);
+    thd_state_ = static_cast<ThreadState*>(
+        gpr_zalloc(sizeof(ThreadState) * max_threads_));
+
+    for (size_t i = 0; i < max_threads_; i++) {
+      gpr_mu_init(&thd_state_[i].mu);
+      gpr_cv_init(&thd_state_[i].cv);
+      thd_state_[i].id = i;
+      thd_state_[i].thd = grpc_core::Thread();
+      thd_state_[i].elems = GRPC_CLOSURE_LIST_INIT;
     }
 
-    thd_state[0].thd =
-        grpc_core::Thread(name, &GrpcExecutor::ThreadMain, &thd_state[0]);
-    thd_state[0].thd.Start();
+    thd_state_[0].thd =
+        grpc_core::Thread(name_, &GrpcExecutor::ThreadMain, &thd_state_[0]);
+    thd_state_[0].thd.Start();
   } else {
     if (curr_num_threads == 0) return;
 
-    for (size_t i = 0; i < max_threads; i++) {
-      gpr_mu_lock(&thd_state[i].mu);
-      thd_state[i].shutdown = true;
-      gpr_cv_signal(&thd_state[i].cv);
-      gpr_mu_unlock(&thd_state[i].mu);
+    for (size_t i = 0; i < max_threads_; i++) {
+      gpr_mu_lock(&thd_state_[i].mu);
+      thd_state_[i].shutdown = true;
+      gpr_cv_signal(&thd_state_[i].cv);
+      gpr_mu_unlock(&thd_state_[i].mu);
     }
 
     /* Ensure no thread is adding a new thread. Once this is past, then no
      * thread will try to add a new one either (since shutdown is true) */
-    gpr_spinlock_lock(&adding_thread_lock);
-    gpr_spinlock_unlock(&adding_thread_lock);
+    gpr_spinlock_lock(&adding_thread_lock_);
+    gpr_spinlock_unlock(&adding_thread_lock_);
 
-    for (gpr_atm i = 0; i < num_threads; i++) {
-      thd_state[i].thd.Join();
+    for (gpr_atm i = 0; i < num_threads_; i++) {
+      thd_state_[i].thd.Join();
     }
 
-    gpr_atm_no_barrier_store(&num_threads, 0);
-    for (size_t i = 0; i < max_threads; i++) {
-      gpr_mu_destroy(&thd_state[i].mu);
-      gpr_cv_destroy(&thd_state[i].cv);
-      RunClosures(thd_state[i].elems);
+    gpr_atm_no_barrier_store(&num_threads_, 0);
+    for (size_t i = 0; i < max_threads_; i++) {
+      gpr_mu_destroy(&thd_state_[i].mu);
+      gpr_cv_destroy(&thd_state_[i].cv);
+      RunClosures(thd_state_[i].elems);
     }
 
-    gpr_free(thd_state);
+    gpr_free(thd_state_);
     gpr_tls_destroy(&g_this_thread_state);
   }
 }
@@ -138,14 +137,14 @@ void GrpcExecutor::SetThreading(bool threading) {
 void GrpcExecutor::Shutdown() { SetThreading(false); }
 
 void GrpcExecutor::ThreadMain(void* arg) {
-  thread_state* ts = static_cast<thread_state*>(arg);
+  ThreadState* ts = static_cast<ThreadState*>(arg);
   gpr_tls_set(&g_this_thread_state, (intptr_t)ts);
 
   grpc_core::ExecCtx exec_ctx(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);
 
   size_t subtract_depth = 0;
   for (;;) {
-    EXECUTOR_TRACE("EXECUTOR[%ld]: step (sub_depth=%" PRIdPTR ")", ts->id,
+    EXECUTOR_TRACE("[%" PRIdPTR "]: step (sub_depth=%" PRIdPTR ")", ts->id,
                    subtract_depth);
 
     gpr_mu_lock(&ts->mu);
@@ -157,7 +156,7 @@ void GrpcExecutor::ThreadMain(void* arg) {
     }
 
     if (ts->shutdown) {
-      EXECUTOR_TRACE("EXECUTOR[%ld]: shutdown", ts->id);
+      EXECUTOR_TRACE("[%" PRIdPTR "]: shutdown", ts->id);
       gpr_mu_unlock(&ts->mu);
       break;
     }
@@ -167,7 +166,7 @@ void GrpcExecutor::ThreadMain(void* arg) {
     ts->elems = GRPC_CLOSURE_LIST_INIT;
     gpr_mu_unlock(&ts->mu);
 
-    EXECUTOR_TRACE("EXECUTOR[%ld]: execute", ts->id);
+    EXECUTOR_TRACE("[%" PRIdPTR "]: execute", ts->id);
 
     grpc_core::ExecCtx::Get()->InvalidateNow();
     subtract_depth = RunClosures(closures);
@@ -186,41 +185,42 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
   do {
     retry_push = false;
     size_t cur_thread_count =
-        static_cast<size_t>(gpr_atm_no_barrier_load(&num_threads));
+        static_cast<size_t>(gpr_atm_no_barrier_load(&num_threads_));
 
     // If the number of threads is zero (i.e. either the executor is not threaded
     // or already shut down), then queue the closure on the exec context itself
     if (cur_thread_count == 0) {
 #ifndef NDEBUG
-      EXECUTOR_TRACE("EXECUTOR: schedule %p (created %s:%d) inline", closure,
+      EXECUTOR_TRACE("schedule %p (created %s:%d) inline", closure,
                      closure->file_created, closure->line_created);
 #else
-      EXECUTOR_TRACE("EXECUTOR: schedule %p inline", closure);
+      EXECUTOR_TRACE("schedule %p inline", closure);
 #endif
       grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(),
                                closure, error);
       return;
     }
 
-    thread_state* ts = (thread_state*)gpr_tls_get(&g_this_thread_state);
+    ThreadState* ts = (ThreadState*)gpr_tls_get(&g_this_thread_state);
     if (ts == nullptr) {
-      ts = &thd_state[GPR_HASH_POINTER(grpc_core::ExecCtx::Get(),
-                                       cur_thread_count)];
+      ts = &thd_state_[GPR_HASH_POINTER(grpc_core::ExecCtx::Get(),
+                                        cur_thread_count)];
     } else {
       GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF();
     }
 
-    thread_state* orig_ts = ts;
+    ThreadState* orig_ts = ts;
 
-    bool try_new_thread;
+    bool try_new_thread = false;
     for (;;) {
 #ifndef NDEBUG
       EXECUTOR_TRACE(
-          "EXECUTOR: try to schedule %p (%s) (created %s:%d) to thread %ld",
+          "try to schedule %p (%s) (created %s:%d) to thread "
+          "%" PRIdPTR,
          closure, is_short ? "short" : "long", closure->file_created,
          closure->line_created, ts->id);
 #else
-      EXECUTOR_TRACE("EXECUTOR: try to schedule %p (%s) to thread %ld", closure,
+      EXECUTOR_TRACE("try to schedule %p (%s) to thread %" PRIdPTR, closure,
                      is_short ? "short" : "long", ts->id);
 #endif
 
@@ -231,7 +231,7 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
         // guarantee no starvation). Spin through queues and try again
         gpr_mu_unlock(&ts->mu);
         size_t idx = ts->id;
-        ts = &thd_state[(idx + 1) % cur_thread_count];
+        ts = &thd_state_[(idx + 1) % cur_thread_count];
         if (ts == orig_ts) {
           // We cycled through all the threads. Retry enqueue again (by creating
           // a new thread)
           retry_push = true;
@@ -265,7 +265,7 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
       // thread, use this as a hint to create more threads
       ts->depth++;
       try_new_thread = ts->depth > MAX_DEPTH &&
-                       cur_thread_count < max_threads && !ts->shutdown;
+                       cur_thread_count < max_threads_ && !ts->shutdown;
 
       ts->queued_long_job = !is_short;
 
@@ -273,20 +273,20 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
       break;
     }
 
-    if (try_new_thread && gpr_spinlock_trylock(&adding_thread_lock)) {
+    if (try_new_thread && gpr_spinlock_trylock(&adding_thread_lock_)) {
       cur_thread_count =
-          static_cast<size_t>(gpr_atm_no_barrier_load(&num_threads));
-      if (cur_thread_count < max_threads) {
+          static_cast<size_t>(gpr_atm_no_barrier_load(&num_threads_));
+      if (cur_thread_count < max_threads_) {
         // Increment num_threads (Safe to do a no_barrier_store instead of a
         // cas because we always increment num_threads under the
         // 'adding_thread_lock')
-        gpr_atm_no_barrier_store(&num_threads, cur_thread_count + 1);
+        gpr_atm_no_barrier_store(&num_threads_, cur_thread_count + 1);
 
-        thd_state[cur_thread_count].thd = grpc_core::Thread(
-            name, &GrpcExecutor::ThreadMain, &thd_state[cur_thread_count]);
-        thd_state[cur_thread_count].thd.Start();
+        thd_state_[cur_thread_count].thd = grpc_core::Thread(
+            name_, &GrpcExecutor::ThreadMain, &thd_state_[cur_thread_count]);
+        thd_state_[cur_thread_count].thd.Start();
       }
-      gpr_spinlock_unlock(&adding_thread_lock);
+      gpr_spinlock_unlock(&adding_thread_lock_);
     }
 
     if (retry_push) {
@@ -298,11 +298,11 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
 static GrpcExecutor g_global_executor("grpc-executor");
 
 void enqueue_long(grpc_closure* closure, grpc_error* error) {
-  g_global_executor.Enqueue(closure, error, false);
+  g_global_executor.Enqueue(closure, error, false /* is_short */);
 }
 
 void enqueue_short(grpc_closure* closure, grpc_error* error) {
-  g_global_executor.Enqueue(closure, error, true);
+  g_global_executor.Enqueue(closure, error, true /* is_short */);
 }
 
 // Short-Job executor scheduler
@@ -328,7 +328,7 @@ void grpc_executor_set_threading(bool enable) {
   g_global_executor.SetThreading(enable);
 }
 
 grpc_closure_scheduler* grpc_executor_scheduler(
-    grpc_executor_job_length length) {
-  return length == GRPC_EXECUTOR_SHORT ? &global_scheduler_short
-                                       : &global_scheduler_long;
+    grpc_executor_job_type job_type) {
+  return job_type == GRPC_EXECUTOR_SHORT ? &global_scheduler_short
+                                         : &global_scheduler_long;
 }
diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h
index cafe47decb9..b6515605cb1 100644
--- a/src/core/lib/iomgr/executor.h
+++ b/src/core/lib/iomgr/executor.h
@@ -34,12 +34,9 @@ typedef struct {
   bool shutdown;
   bool queued_long_job;
   grpc_core::Thread thd;
-} thread_state;
+} ThreadState;
 
-typedef enum {
-  GRPC_EXECUTOR_SHORT,
-  GRPC_EXECUTOR_LONG
-} grpc_executor_job_length;
+typedef enum { GRPC_EXECUTOR_SHORT, GRPC_EXECUTOR_LONG } grpc_executor_job_type;
 
 class GrpcExecutor {
@@ -47,7 +44,7 @@ class GrpcExecutor {
   void Init();
 
   /** Is the executor multi-threaded? */
-  bool IsThreaded();
+  bool IsThreaded() const;
 
   /* Enable/disable threading - must be called after Init and Shutdown() */
   void SetThreading(bool threading);
@@ -63,11 +60,11 @@ class GrpcExecutor {
   static size_t RunClosures(grpc_closure_list list);
   static void ThreadMain(void* arg);
 
-  const char* name;
-  thread_state* thd_state;
-  size_t max_threads;
-  gpr_atm num_threads;
-  gpr_spinlock adding_thread_lock;
+  const char* name_;
+  ThreadState* thd_state_;
+  size_t max_threads_;
+  gpr_atm num_threads_;
+  gpr_spinlock adding_thread_lock_;
 };
 
 // == Global executor functions ==
@@ -75,7 +72,7 @@ class GrpcExecutor {
 void grpc_executor_init();
 
 grpc_closure_scheduler* grpc_executor_scheduler(
-    grpc_executor_job_length length);
+    grpc_executor_job_type job_type);
 
 void grpc_executor_shutdown();

From 8cc3a003a1094215fcccf8e21cd5b72611b51fe1 Mon Sep 17 00:00:00 2001
From: Sree Kuchibhotla
Date: Tue, 10 Jul 2018 13:32:35 -0700
Subject: [PATCH 03/11] Fix typo

---
 src/core/lib/iomgr/executor.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc
index f72e3944230..a7fe6c7fae7 100644
--- a/src/core/lib/iomgr/executor.cc
+++ b/src/core/lib/iomgr/executor.cc
@@ -88,7 +88,7 @@ void GrpcExecutor::SetThreading(bool threading) {
 
     GPR_ASSERT(num_threads_ == 0);
     gpr_atm_no_barrier_store(&num_threads_, 1);
-    gpr_tls_init(&g_this_thread_state_);
+    gpr_tls_init(&g_this_thread_state);
     thd_state_ = static_cast<ThreadState*>(
         gpr_zalloc(sizeof(ThreadState) * max_threads_));

From 02872df249e72e0b9ea783d6ffc566a6fc539f04 Mon Sep 17 00:00:00 2001
From: Sree Kuchibhotla
Date: Tue, 10 Jul 2018 14:21:51 -0700
Subject: [PATCH 04/11] more code review comments

---
 src/core/lib/iomgr/executor.cc | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc
index a7fe6c7fae7..b01fea0c2d7 100644
--- a/src/core/lib/iomgr/executor.cc
+++ b/src/core/lib/iomgr/executor.cc
@@ -103,7 +103,7 @@ void GrpcExecutor::SetThreading(bool threading) {
     thd_state_[0].thd =
         grpc_core::Thread(name_, &GrpcExecutor::ThreadMain, &thd_state_[0]);
     thd_state_[0].thd.Start();
-  } else {
+  } else {  // !threading
     if (curr_num_threads == 0) return;
 
     for (size_t i = 0; i < max_threads_; i++) {
@@ -120,6 +120,7 @@ void GrpcExecutor::SetThreading(bool threading) {
 
     for (gpr_atm i = 0; i < num_threads_; i++) {
       thd_state_[i].thd.Join();
+      EXECUTOR_TRACE(" Thread %" PRIdPTR " joined", i);
     }
 
     gpr_atm_no_barrier_store(&num_threads_, 0);
@@ -138,7 +139,7 @@ void GrpcExecutor::Shutdown() { SetThreading(false); }
 
 void GrpcExecutor::ThreadMain(void* arg) {
   ThreadState* ts = static_cast<ThreadState*>(arg);
-  gpr_tls_set(&g_this_thread_state, (intptr_t)ts);
+  gpr_tls_set(&g_this_thread_state, reinterpret_cast<intptr_t>(ts));
 
   grpc_core::ExecCtx exec_ctx(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);

From 1e69b7c0a52f1b7dbd5b3a1d802f66e83005db22 Mon Sep 17 00:00:00 2001
From: Sree Kuchibhotla
Date: Tue, 10 Jul 2018 19:30:09 -0700
Subject: [PATCH 05/11] Rename grpc_executor_job_type to C++ style name

---
 src/core/lib/iomgr/executor.cc | 3 +--
 src/core/lib/iomgr/executor.h  | 5 ++---
 2 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc
index b01fea0c2d7..c81882069e2 100644
--- a/src/core/lib/iomgr/executor.cc
+++ b/src/core/lib/iomgr/executor.cc
@@ -328,8 +328,7 @@ void grpc_executor_set_threading(bool enable) {
   g_global_executor.SetThreading(enable);
 }
 
-grpc_closure_scheduler* grpc_executor_scheduler(
-    grpc_executor_job_type job_type) {
+grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type) {
   return job_type == GRPC_EXECUTOR_SHORT ? &global_scheduler_short
                                          : &global_scheduler_long;
 }
diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h
index b6515605cb1..ceeca2fc14f 100644
--- a/src/core/lib/iomgr/executor.h
+++ b/src/core/lib/iomgr/executor.h
@@ -36,7 +36,7 @@ typedef struct {
   grpc_core::Thread thd;
 } ThreadState;
 
-typedef enum { GRPC_EXECUTOR_SHORT, GRPC_EXECUTOR_LONG } grpc_executor_job_type;
+typedef enum { GRPC_EXECUTOR_SHORT, GRPC_EXECUTOR_LONG } GrpcExecutorJobType;
 
 class GrpcExecutor {
@@ -71,8 +71,7 @@ class GrpcExecutor {
 
 void grpc_executor_init();
 
-grpc_closure_scheduler* grpc_executor_scheduler(
-    grpc_executor_job_type job_type);
+grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type);
 
 void grpc_executor_shutdown();

From 7b8a6b68510d38c64c4e13b40a668608a627ff7b Mon Sep 17 00:00:00 2001
From: Sree Kuchibhotla
Date: Wed, 11 Jul 2018 11:51:46 -0700
Subject: [PATCH 06/11] More PR comments; useful tracing

---
 src/core/lib/iomgr/executor.cc | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc
index c81882069e2..fc1e044c2e9 100644
--- a/src/core/lib/iomgr/executor.cc
+++ b/src/core/lib/iomgr/executor.cc
@@ -81,7 +81,7 @@ bool GrpcExecutor::IsThreaded() const {
 }
 
 void GrpcExecutor::SetThreading(bool threading) {
-  const gpr_atm curr_num_threads = gpr_atm_no_barrier_load(&num_threads_);
+  gpr_atm curr_num_threads = gpr_atm_no_barrier_load(&num_threads_);
 
   if (threading) {
     if (curr_num_threads > 0) return;
@@ -118,9 +118,11 @@ void GrpcExecutor::SetThreading(bool threading) {
     gpr_spinlock_lock(&adding_thread_lock_);
     gpr_spinlock_unlock(&adding_thread_lock_);
 
-    for (gpr_atm i = 0; i < num_threads_; i++) {
+    curr_num_threads = gpr_atm_no_barrier_load(&num_threads_);
+    for (gpr_atm i = 0; i < curr_num_threads; i++) {
       thd_state_[i].thd.Join();
-      EXECUTOR_TRACE(" Thread %" PRIdPTR " joined", i);
+      EXECUTOR_TRACE(" Thread %" PRIdPTR " of %" PRIdPTR " joined", i,
+                     curr_num_threads);
     }
 
     gpr_atm_no_barrier_store(&num_threads_, 0);

From a0a81a13869446520261d095f77d942f58602e07 Mon Sep 17 00:00:00 2001
From: Sree Kuchibhotla
Date: Wed, 11 Jul 2018 13:08:17 -0700
Subject: [PATCH 07/11] remove unnecessary header

---
 src/core/lib/iomgr/executor.cc | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc
index fc1e044c2e9..f85b3ad40d4 100644
--- a/src/core/lib/iomgr/executor.cc
+++ b/src/core/lib/iomgr/executor.cc
@@ -21,7 +21,6 @@
 #include "src/core/lib/iomgr/executor.h"
 
 #include
-#include
 
 #include
 #include

From f0ed1a287854dddd2aa029e05129b58079a48228 Mon Sep 17 00:00:00 2001
From: Sree Kuchibhotla
Date: Wed, 11 Jul 2018 17:16:03 -0700
Subject: [PATCH 08/11] Create on first use pattern for initializing global executor

---
 src/core/lib/iomgr/executor.cc | 27 ++++++++++++++++++++-------
 src/core/lib/iomgr/executor.h  |  1 +
 2 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc
index f85b3ad40d4..9d9ee01b58d 100644
--- a/src/core/lib/iomgr/executor.cc
+++ b/src/core/lib/iomgr/executor.cc
@@ -297,14 +297,27 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
   } while (retry_push);
 }
 
-static GrpcExecutor g_global_executor("grpc-executor");
+/* Using Create-on-first-use pattern here because:
+ * global_executor has to be initialized by the time grpc_executor_init() is
+ * called.
+ *
+ * Since grpc_executor_init() may sometimes be called as a part of other
+ * static variables being initialized (for example, see the microbenchmarks
+ * helper code that calls grpc::internal::GrpcLibrary::Init(), which
+ * eventually ends up calling grpc_executor_init()), we need to use a
+ * create-on-first-use pattern here.
+ */
+static GrpcExecutor* get_global_executor() {
+  static GrpcExecutor* global_executor = new GrpcExecutor("global-executor");
+  return global_executor;
+}
 
 void enqueue_long(grpc_closure* closure, grpc_error* error) {
-  g_global_executor.Enqueue(closure, error, false /* is_short */);
+  get_global_executor()->Enqueue(closure, error, false /* is_short */);
 }
 
 void enqueue_short(grpc_closure* closure, grpc_error* error) {
-  g_global_executor.Enqueue(closure, error, true /* is_short */);
+  get_global_executor()->Enqueue(closure, error, true /* is_short */);
 }
 
 // Short-Job executor scheduler
@@ -319,14 +332,14 @@ static const grpc_closure_scheduler_vtable global_executor_vtable_long = {
 static grpc_closure_scheduler global_scheduler_long = {
     &global_executor_vtable_long};
 
-void grpc_executor_init() { g_global_executor.Init(); }
+void grpc_executor_init() { get_global_executor()->Init(); }
 
-void grpc_executor_shutdown() { g_global_executor.Shutdown(); }
+void grpc_executor_shutdown() { get_global_executor()->Shutdown(); }
 
-bool grpc_executor_is_threaded() { return g_global_executor.IsThreaded(); }
+bool grpc_executor_is_threaded() { return get_global_executor()->IsThreaded(); }
 
 void grpc_executor_set_threading(bool enable) {
-  g_global_executor.SetThreading(enable);
+  get_global_executor()->SetThreading(enable);
 }
 
 grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type) {
diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h
index ceeca2fc14f..395fc528637 100644
--- a/src/core/lib/iomgr/executor.h
+++ b/src/core/lib/iomgr/executor.h
@@ -41,6 +41,7 @@ typedef enum { GRPC_EXECUTOR_SHORT, GRPC_EXECUTOR_LONG } GrpcExecutorJobType;
 class GrpcExecutor {
  public:
   GrpcExecutor(const char* executor_name);
+
   void Init();
 
   /** Is the executor multi-threaded? */
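Aside: the create-on-first-use idiom that patch 08's comment describes has a small general shape, sketched here with a hypothetical Widget class (illustrative only, not gRPC code). A function-local static is constructed on its first call, so a caller running during another translation unit's static initialization still sees a fully constructed object; the pointer is intentionally never deleted, which sidesteps destruction-order problems at process exit:

    // Illustrative sketch of create-on-first-use; Widget is a made-up type.
    class Widget {
     public:
      Widget() { /* expensive one-time setup */ }
      void DoWork() {}
    };

    static Widget* get_widget() {
      // Constructed exactly once, on first use (thread-safe since C++11),
      // and intentionally leaked so no destructor runs at exit.
      static Widget* widget = new Widget();
      return widget;
    }

    // Safe even when invoked from another file's static initializer:
    //   get_widget()->DoWork();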
From 8aefdd3653818e7dd9e12249dd9af9c048b3de2a Mon Sep 17 00:00:00 2001
From: Sree Kuchibhotla
Date: Wed, 11 Jul 2018 17:44:08 -0700
Subject: [PATCH 09/11] use a global executor pointer that is initialized by grpc_executor_init()

---
 src/core/lib/iomgr/executor.cc | 35 +++++++++++++++-------------------
 1 file changed, 15 insertions(+), 20 deletions(-)

diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc
index 9d9ee01b58d..2aadb035bf4 100644
--- a/src/core/lib/iomgr/executor.cc
+++ b/src/core/lib/iomgr/executor.cc
@@ -297,27 +297,14 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
   } while (retry_push);
 }
 
-/* Using Create-on-first-use pattern here because:
- * global_executor has to be initialized by the time grpc_executor_init() is
- * called.
- *
- * Since grpc_executor_init() may sometimes be called as a part of other
- * static variables being initialized (for example, see the microbenchmarks
- * helper code that calls grpc::internal::GrpcLibrary::Init(), which
- * eventually ends up calling grpc_executor_init()), we need to use a
- * create-on-first-use pattern here.
- */
-static GrpcExecutor* get_global_executor() {
-  static GrpcExecutor* global_executor = new GrpcExecutor("global-executor");
-  return global_executor;
-}
+static GrpcExecutor* global_executor;
 
 void enqueue_long(grpc_closure* closure, grpc_error* error) {
-  get_global_executor()->Enqueue(closure, error, false /* is_short */);
+  global_executor->Enqueue(closure, error, false /* is_short */);
 }
 
 void enqueue_short(grpc_closure* closure, grpc_error* error) {
-  get_global_executor()->Enqueue(closure, error, true /* is_short */);
+  global_executor->Enqueue(closure, error, true /* is_short */);
 }
 
 // Short-Job executor scheduler
@@ -332,14 +319,22 @@ static const grpc_closure_scheduler_vtable global_executor_vtable_long = {
 static grpc_closure_scheduler global_scheduler_long = {
     &global_executor_vtable_long};
 
-void grpc_executor_init() { get_global_executor()->Init(); }
+void grpc_executor_init() {
+  GPR_ASSERT(global_executor == nullptr);
+  global_executor = new GrpcExecutor("global-executor");
+  global_executor->Init();
+}
 
-void grpc_executor_shutdown() { get_global_executor()->Shutdown(); }
+void grpc_executor_shutdown() {
+  global_executor->Shutdown();
+  delete global_executor;
+  global_executor = nullptr;
+}
 
-bool grpc_executor_is_threaded() { return get_global_executor()->IsThreaded(); }
+bool grpc_executor_is_threaded() { return global_executor->IsThreaded(); }
 
 void grpc_executor_set_threading(bool enable) {
-  get_global_executor()->SetThreading(enable);
+  global_executor->SetThreading(enable);
 }

From 37e4990be3fd97f7776d6bed75a54055cae71403 Mon Sep 17 00:00:00 2001
From: Sree Kuchibhotla
Date: Wed, 11 Jul 2018 18:46:29 -0700
Subject: [PATCH 10/11] replace new/delete with grpc_core::New and grpc_core::Delete

---
 src/core/lib/iomgr/executor.cc | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc
index 2aadb035bf4..aeba050a991 100644
--- a/src/core/lib/iomgr/executor.cc
+++ b/src/core/lib/iomgr/executor.cc
@@ -30,6 +30,7 @@
 #include "src/core/lib/debug/stats.h"
 #include "src/core/lib/gpr/tls.h"
 #include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/gprpp/memory.h"
 #include "src/core/lib/iomgr/exec_ctx.h"
 
 #define MAX_DEPTH 2
@@ -322,13 +322,13 @@ static grpc_closure_scheduler global_scheduler_long = {
 
 void grpc_executor_init() {
   GPR_ASSERT(global_executor == nullptr);
-  global_executor = new GrpcExecutor("global-executor");
+  global_executor = grpc_core::New<GrpcExecutor>("global-executor");
   global_executor->Init();
 }
 
 void grpc_executor_shutdown() {
   global_executor->Shutdown();
-  delete global_executor;
+  grpc_core::Delete(global_executor);
   global_executor = nullptr;
 }

From 69e5dfff8693d0eb40b49301a443a7d4c9022be4 Mon Sep 17 00:00:00 2001
From: Sree Kuchibhotla
Date: Thu, 12 Jul 2018 10:58:04 -0700
Subject: [PATCH 11/11] Handle multiple invocations of grpc_executor_shutdown()

---
 src/core/lib/iomgr/executor.cc | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc
index aeba050a991..1ad13b831dd 100644
--- a/src/core/lib/iomgr/executor.cc
+++ b/src/core/lib/iomgr/executor.cc
@@ -320,13 +320,26 @@ static const grpc_closure_scheduler_vtable global_executor_vtable_long = {
 static grpc_closure_scheduler global_scheduler_long = {
     &global_executor_vtable_long};
 
+// grpc_executor_init() and grpc_executor_shutdown() functions are called in
+// the grpc_init() and grpc_shutdown() code paths which are protected by a
+// global mutex. So it is okay to assume that these functions are thread-safe
 void grpc_executor_init() {
-  GPR_ASSERT(global_executor == nullptr);
+  if (global_executor != nullptr) {
+    // grpc_executor_init() already called once (and grpc_executor_shutdown()
+    // wasn't called)
+    return;
+  }
+
   global_executor = grpc_core::New<GrpcExecutor>("global-executor");
   global_executor->Init();
 }
 
 void grpc_executor_shutdown() {
+  // Shutdown already called
+  if (global_executor == nullptr) {
+    return;
+  }
+
   global_executor->Shutdown();
   grpc_core::Delete(global_executor);
   global_executor = nullptr;
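Aside: with patch 11 applied, the global entry points tolerate repeated calls. A hedged sketch of the resulting behavior (not part of the series; on_done() and demo() are invented names, and GRPC_CLOSURE_CREATE / GRPC_CLOSURE_SCHED are assumed to be the closure helpers gRPC core shipped at the time):

    static void on_done(void* arg, grpc_error* error) {
      // Runs on one of the global executor's worker threads.
    }

    static void demo() {
      grpc_core::ExecCtx exec_ctx;
      grpc_executor_init();
      grpc_executor_init();  // harmless no-op: global_executor already set

      grpc_closure* c = GRPC_CLOSURE_CREATE(
          on_done, nullptr, grpc_executor_scheduler(GRPC_EXECUTOR_SHORT));
      GRPC_CLOSURE_SCHED(c, GRPC_ERROR_NONE);

      grpc_executor_shutdown();
      grpc_executor_shutdown();  // also a no-op: global_executor is nullptr
    }

The second grpc_executor_init() returns early because global_executor is already non-null, and the second grpc_executor_shutdown() returns early because the first call reset the pointer to nullptr.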