Move executor implementation into GrpcExecutor class

pull/15962/head
Sree Kuchibhotla 6 years ago
parent 46f399a282
commit 7e9d52530d
  1. src/core/lib/iomgr/executor.cc (297 changes)
  2. src/core/lib/iomgr/executor.h (56 changes)

@@ -21,6 +21,7 @@
 #include "src/core/lib/iomgr/executor.h"
 
 #include <string.h>
+#include <functional>
 
 #include <grpc/support/alloc.h>
 #include <grpc/support/cpu.h>
@@ -28,52 +29,41 @@
 #include <grpc/support/sync.h>
 
 #include "src/core/lib/debug/stats.h"
-#include "src/core/lib/gpr/spinlock.h"
 #include "src/core/lib/gpr/tls.h"
 #include "src/core/lib/gpr/useful.h"
-#include "src/core/lib/gprpp/thd.h"
 #include "src/core/lib/iomgr/exec_ctx.h"
 
 #define MAX_DEPTH 2
 
-typedef struct {
-  gpr_mu mu;
-  gpr_cv cv;
-  grpc_closure_list elems;
-  size_t depth;
-  bool shutdown;
-  bool queued_long_job;
-  grpc_core::Thread thd;
-} thread_state;
-
-static thread_state* g_thread_state;
-static size_t g_max_threads;
-static gpr_atm g_cur_threads;
-static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER;
+#define EXECUTOR_TRACE(format, ...)         \
+  if (executor_trace.enabled()) {           \
+    gpr_log(GPR_INFO, format, __VA_ARGS__); \
+  }
+
+grpc_core::TraceFlag executor_trace(false, "executor");
 
 GPR_TLS_DECL(g_this_thread_state);
 
-grpc_core::TraceFlag executor_trace(false, "executor");
-
-static void executor_thread(void* arg);
-
-static size_t run_closures(grpc_closure_list list) {
+GrpcExecutor::GrpcExecutor(const char* executor_name) : name(executor_name) {
+  adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER;
+  gpr_atm_no_barrier_store(&num_threads, 0);
+}
+
+void GrpcExecutor::Init() { SetThreading(true); }
+
+size_t GrpcExecutor::RunClosures(grpc_closure_list list) {
   size_t n = 0;
   grpc_closure* c = list.head;
   while (c != nullptr) {
     grpc_closure* next = c->next_data.next;
     grpc_error* error = c->error_data.error;
-    if (executor_trace.enabled()) {
-#ifndef NDEBUG
-      gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c,
-              c->file_created, c->line_created);
-#else
-      gpr_log(GPR_INFO, "EXECUTOR: run %p", c);
-#endif
-    }
 #ifndef NDEBUG
+    EXECUTOR_TRACE("EXECUTOR: run %p [created by %s:%d]", c, c->file_created,
+                   c->line_created);
     c->scheduled = false;
+#else
+    EXECUTOR_TRACE("EXECUTOR: run %p", c);
#endif
     c->cb(c->cb_arg, error);
     GRPC_ERROR_UNREF(error);
@@ -85,63 +75,69 @@ static size_t run_closures(grpc_closure_list list) {
   return n;
 }
 
-bool grpc_executor_is_threaded() {
-  return gpr_atm_no_barrier_load(&g_cur_threads) > 0;
+bool GrpcExecutor::IsThreaded() {
+  return gpr_atm_no_barrier_load(&num_threads) > 0;
 }
 
-void grpc_executor_set_threading(bool threading) {
-  gpr_atm cur_threads = gpr_atm_no_barrier_load(&g_cur_threads);
+void GrpcExecutor::SetThreading(bool threading) {
+  gpr_atm curr_num_threads = gpr_atm_no_barrier_load(&num_threads);
   if (threading) {
-    if (cur_threads > 0) return;
-    g_max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores());
-    gpr_atm_no_barrier_store(&g_cur_threads, 1);
+    if (curr_num_threads > 0) return;
+
+    // TODO (sreek): max_threads initialization can be moved into the
+    // constructor
+    max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores());
+    gpr_atm_no_barrier_store(&num_threads, 1);
     gpr_tls_init(&g_this_thread_state);
-    g_thread_state = static_cast<thread_state*>(
-        gpr_zalloc(sizeof(thread_state) * g_max_threads));
-    for (size_t i = 0; i < g_max_threads; i++) {
-      gpr_mu_init(&g_thread_state[i].mu);
-      gpr_cv_init(&g_thread_state[i].cv);
-      g_thread_state[i].thd = grpc_core::Thread();
-      g_thread_state[i].elems = GRPC_CLOSURE_LIST_INIT;
+    thd_state = static_cast<thread_state*>(
+        gpr_zalloc(sizeof(thread_state) * max_threads));
+
+    for (size_t i = 0; i < max_threads; i++) {
+      gpr_mu_init(&thd_state[i].mu);
+      gpr_cv_init(&thd_state[i].cv);
+      thd_state[i].id = i;
+      thd_state[i].thd = grpc_core::Thread();
+      thd_state[i].elems = GRPC_CLOSURE_LIST_INIT;
     }
-    g_thread_state[0].thd =
-        grpc_core::Thread("grpc_executor", executor_thread, &g_thread_state[0]);
-    g_thread_state[0].thd.Start();
+
+    thd_state[0].thd =
+        grpc_core::Thread(name, &GrpcExecutor::ThreadMain, &thd_state[0]);
+    thd_state[0].thd.Start();
   } else {
-    if (cur_threads == 0) return;
-    for (size_t i = 0; i < g_max_threads; i++) {
-      gpr_mu_lock(&g_thread_state[i].mu);
-      g_thread_state[i].shutdown = true;
-      gpr_cv_signal(&g_thread_state[i].cv);
-      gpr_mu_unlock(&g_thread_state[i].mu);
+    if (curr_num_threads == 0) return;
+
+    for (size_t i = 0; i < max_threads; i++) {
+      gpr_mu_lock(&thd_state[i].mu);
+      thd_state[i].shutdown = true;
+      gpr_cv_signal(&thd_state[i].cv);
+      gpr_mu_unlock(&thd_state[i].mu);
     }
-    /* ensure no thread is adding a new thread... once this is past, then
-       no thread will try to add a new one either (since shutdown is true) */
-    gpr_spinlock_lock(&g_adding_thread_lock);
-    gpr_spinlock_unlock(&g_adding_thread_lock);
-    for (gpr_atm i = 0; i < g_cur_threads; i++) {
-      g_thread_state[i].thd.Join();
+
+    /* Ensure no thread is adding a new thread. Once this is past, then no
+     * thread will try to add a new one either (since shutdown is true) */
+    gpr_spinlock_lock(&adding_thread_lock);
+    gpr_spinlock_unlock(&adding_thread_lock);
+
+    for (gpr_atm i = 0; i < num_threads; i++) {
+      thd_state[i].thd.Join();
     }
-    gpr_atm_no_barrier_store(&g_cur_threads, 0);
-    for (size_t i = 0; i < g_max_threads; i++) {
-      gpr_mu_destroy(&g_thread_state[i].mu);
-      gpr_cv_destroy(&g_thread_state[i].cv);
-      run_closures(g_thread_state[i].elems);
+
+    gpr_atm_no_barrier_store(&num_threads, 0);
+    for (size_t i = 0; i < max_threads; i++) {
+      gpr_mu_destroy(&thd_state[i].mu);
+      gpr_cv_destroy(&thd_state[i].cv);
+      RunClosures(thd_state[i].elems);
     }
-    gpr_free(g_thread_state);
+
+    gpr_free(thd_state);
     gpr_tls_destroy(&g_this_thread_state);
   }
 }
 
-void grpc_executor_init() {
-  gpr_atm_no_barrier_store(&g_cur_threads, 0);
-  grpc_executor_set_threading(true);
-}
-
-void grpc_executor_shutdown() { grpc_executor_set_threading(false); }
+void GrpcExecutor::Shutdown() { SetThreading(false); }
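
The shutdown half of SetThreading is a two-phase stop sequence: mark every thread and signal its condition variable under the per-thread mutex, then lock and immediately unlock adding_thread_lock to fence out a racing Enqueue that may still be spawning a thread, and only then join. A minimal sketch of the same sequence with standard C++ primitives (Worker and StopAll are hypothetical stand-ins):

#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical worker pool illustrating the same stop sequence.
struct Worker {
  std::mutex mu;
  std::condition_variable cv;
  bool shutdown = false;
  std::thread thd;
};

void StopAll(std::vector<Worker>& workers) {
  // Phase 1: mark every worker and wake it (mirrors the gpr_cv_signal loop).
  for (Worker& w : workers) {
    std::lock_guard<std::mutex> lock(w.mu);
    w.shutdown = true;
    w.cv.notify_one();
  }
  // Phase 2: join. The real code additionally lock/unlocks
  // adding_thread_lock between the phases so that no racing Enqueue() can
  // still be starting a new thread.
  for (Worker& w : workers) {
    if (w.thd.joinable()) w.thd.join();
  }
}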
 
-static void executor_thread(void* arg) {
+void GrpcExecutor::ThreadMain(void* arg) {
   thread_state* ts = static_cast<thread_state*>(arg);
   gpr_tls_set(&g_this_thread_state, (intptr_t)ts);
@@ -149,153 +145,190 @@ static void executor_thread(void* arg) {
   size_t subtract_depth = 0;
   for (;;) {
-    if (executor_trace.enabled()) {
-      gpr_log(GPR_INFO, "EXECUTOR[%d]: step (sub_depth=%" PRIdPTR ")",
-              static_cast<int>(ts - g_thread_state), subtract_depth);
-    }
+    EXECUTOR_TRACE("EXECUTOR[%ld]: step (sub_depth=%" PRIdPTR ")", ts->id,
+                   subtract_depth);
+
     gpr_mu_lock(&ts->mu);
     ts->depth -= subtract_depth;
+    // Wait for closures to be enqueued or for the executor to be shut down
     while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
       ts->queued_long_job = false;
       gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
     }
+
     if (ts->shutdown) {
-      if (executor_trace.enabled()) {
-        gpr_log(GPR_INFO, "EXECUTOR[%d]: shutdown",
-                static_cast<int>(ts - g_thread_state));
-      }
+      EXECUTOR_TRACE("EXECUTOR[%ld]: shutdown", ts->id);
       gpr_mu_unlock(&ts->mu);
       break;
     }
+
     GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED();
-    grpc_closure_list exec = ts->elems;
+    grpc_closure_list closures = ts->elems;
     ts->elems = GRPC_CLOSURE_LIST_INIT;
     gpr_mu_unlock(&ts->mu);
-    if (executor_trace.enabled()) {
-      gpr_log(GPR_INFO, "EXECUTOR[%d]: execute",
-              static_cast<int>(ts - g_thread_state));
-    }
+
+    EXECUTOR_TRACE("EXECUTOR[%ld]: execute", ts->id);
+
     grpc_core::ExecCtx::Get()->InvalidateNow();
-    subtract_depth = run_closures(exec);
+    subtract_depth = RunClosures(closures);
   }
 }
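
ThreadMain's core loop is the classic condition-variable consumer: sleep until work or shutdown arrives, detach the whole batch while holding the lock, and run it with the lock released so enqueuers never wait on running callbacks. A standalone sketch under those assumptions (std::function stands in for grpc_closure; in the real code, items left at shutdown are drained by SetThreading):

#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>

// Hypothetical analog of GrpcExecutor::ThreadMain.
struct Queue {
  std::mutex mu;
  std::condition_variable cv;
  std::deque<std::function<void()>> elems;
  bool shutdown = false;
};

void WorkerLoop(Queue& q) {
  for (;;) {
    std::unique_lock<std::mutex> lock(q.mu);
    // Wait for closures to be enqueued or for shutdown to be requested.
    q.cv.wait(lock, [&] { return !q.elems.empty() || q.shutdown; });
    if (q.shutdown) return;  // mirrors the `if (ts->shutdown) break;` path
    std::deque<std::function<void()>> batch;
    batch.swap(q.elems);  // equivalent of ts->elems = GRPC_CLOSURE_LIST_INIT
    lock.unlock();
    for (auto& fn : batch) fn();  // run closures outside the lock
  }
}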
 
-static void executor_push(grpc_closure* closure, grpc_error* error,
-                          bool is_short) {
+void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
+                           bool is_short) {
   bool retry_push;
   if (is_short) {
     GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS();
   } else {
     GRPC_STATS_INC_EXECUTOR_SCHEDULED_LONG_ITEMS();
   }
 
   do {
     retry_push = false;
     size_t cur_thread_count =
-        static_cast<size_t>(gpr_atm_no_barrier_load(&g_cur_threads));
+        static_cast<size_t>(gpr_atm_no_barrier_load(&num_threads));
+
+    // If the number of threads is zero (i.e. the executor is either not
+    // threaded or already shut down), queue the closure on the exec context
+    // itself
     if (cur_thread_count == 0) {
-      if (executor_trace.enabled()) {
 #ifndef NDEBUG
-        gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p (created %s:%d) inline",
-                closure, closure->file_created, closure->line_created);
+      EXECUTOR_TRACE("EXECUTOR: schedule %p (created %s:%d) inline", closure,
+                     closure->file_created, closure->line_created);
 #else
-        gpr_log(GPR_INFO, "EXECUTOR: schedule %p inline", closure);
+      EXECUTOR_TRACE("EXECUTOR: schedule %p inline", closure);
 #endif
-      }
       grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(),
                                closure, error);
       return;
     }
 
     thread_state* ts = (thread_state*)gpr_tls_get(&g_this_thread_state);
     if (ts == nullptr) {
-      ts = &g_thread_state[GPR_HASH_POINTER(grpc_core::ExecCtx::Get(),
-                                            cur_thread_count)];
+      ts = &thd_state[GPR_HASH_POINTER(grpc_core::ExecCtx::Get(),
+                                       cur_thread_count)];
     } else {
       GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF();
     }
 
     thread_state* orig_ts = ts;
     bool try_new_thread;
+
     for (;;) {
-      if (executor_trace.enabled()) {
 #ifndef NDEBUG
-        gpr_log(
-            GPR_DEBUG,
-            "EXECUTOR: try to schedule %p (%s) (created %s:%d) to thread %d",
-            closure, is_short ? "short" : "long", closure->file_created,
-            closure->line_created, static_cast<int>(ts - g_thread_state));
+      EXECUTOR_TRACE(
+          "EXECUTOR: try to schedule %p (%s) (created %s:%d) to thread %ld",
+          closure, is_short ? "short" : "long", closure->file_created,
+          closure->line_created, ts->id);
 #else
-        gpr_log(GPR_INFO, "EXECUTOR: try to schedule %p (%s) to thread %d",
-                closure, is_short ? "short" : "long",
-                (int)(ts - g_thread_state));
+      EXECUTOR_TRACE("EXECUTOR: try to schedule %p (%s) to thread %ld", closure,
+                     is_short ? "short" : "long", ts->id);
 #endif
-      }
+
       gpr_mu_lock(&ts->mu);
       if (ts->queued_long_job) {
         // if there's a long job queued, we never queue anything else to this
         // queue (since long jobs can take 'infinite' time and we need to
-        // guarantee no starvation)
-        // ... spin through queues and try again
+        // guarantee no starvation). Spin through queues and try again
         gpr_mu_unlock(&ts->mu);
-        size_t idx = static_cast<size_t>(ts - g_thread_state);
-        ts = &g_thread_state[(idx + 1) % cur_thread_count];
+        size_t idx = ts->id;
+        ts = &thd_state[(idx + 1) % cur_thread_count];
         if (ts == orig_ts) {
+          // We cycled through all the threads. Retry the enqueue (by
+          // creating a new thread)
           retry_push = true;
+          // TODO (sreek): What if the executor is shut down, OR
+          // cur_thread_count is already equal to max_threads? (As of July
+          // 2018 we do not run into this issue because there is only one
+          // instance of a long job in gRPC. This has to be fixed soon)
           try_new_thread = true;
           break;
         }
         continue;
       }
+
+      // == Found the thread state (i.e. thread) to enqueue this closure! ==
+      // Also, if this thread has been waiting for closures, wake it up.
+      // - If grpc_closure_list_empty() is true and the executor is not shut
+      //   down, the thread must be waiting in ThreadMain()
+      // - Note that gpr_cv_signal() won't immediately wake up the thread.
+      //   That happens after we release the mutex &ts->mu a few lines below
       if (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
        GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED();
         gpr_cv_signal(&ts->cv);
       }
+
       grpc_closure_list_append(&ts->elems, closure, error);
+
+      // If we already queued more than MAX_DEPTH closures on this thread,
+      // use that as a hint to create more threads
       ts->depth++;
       try_new_thread = ts->depth > MAX_DEPTH &&
-                       cur_thread_count < g_max_threads && !ts->shutdown;
-      if (!is_short) ts->queued_long_job = true;
+                       cur_thread_count < max_threads && !ts->shutdown;
+
+      ts->queued_long_job = !is_short;
+
       gpr_mu_unlock(&ts->mu);
       break;
     }
-    if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) {
+
+    if (try_new_thread && gpr_spinlock_trylock(&adding_thread_lock)) {
       cur_thread_count =
-          static_cast<size_t>(gpr_atm_no_barrier_load(&g_cur_threads));
-      if (cur_thread_count < g_max_threads) {
-        gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1);
-        g_thread_state[cur_thread_count].thd =
-            grpc_core::Thread("grpc_executor", executor_thread,
-                              &g_thread_state[cur_thread_count]);
-        g_thread_state[cur_thread_count].thd.Start();
+          static_cast<size_t>(gpr_atm_no_barrier_load(&num_threads));
+      if (cur_thread_count < max_threads) {
+        // Increment num_threads (safe to do a no_barrier_store instead of a
+        // cas because we always increment num_threads under the
+        // 'adding_thread_lock')
+        gpr_atm_no_barrier_store(&num_threads, cur_thread_count + 1);
+
+        thd_state[cur_thread_count].thd = grpc_core::Thread(
+            name, &GrpcExecutor::ThreadMain, &thd_state[cur_thread_count]);
+        thd_state[cur_thread_count].thd.Start();
       }
-      gpr_spinlock_unlock(&g_adding_thread_lock);
+      gpr_spinlock_unlock(&adding_thread_lock);
     }
+
     if (retry_push) {
       GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES();
     }
   } while (retry_push);
 }
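
The probing logic in Enqueue is easier to see without the locking and stats: start at a queue (the current thread's own, or one picked by hashing the ExecCtx), skip any queue with a long job parked on it, and treat either a full cycle or a deep queue as the hint to grow the pool. A simplified model with hypothetical names (Slot, kMaxDepth) and locking elided:

#include <cstddef>
#include <vector>

constexpr size_t kMaxDepth = 2;  // mirrors MAX_DEPTH in executor.cc

// Hypothetical per-thread queue state; the actual closure list is elided.
struct Slot {
  bool queued_long_job = false;
  size_t depth = 0;  // closures currently queued on this slot
};

// Returns true when the caller should try to spawn one more thread.
bool Push(std::vector<Slot>& slots, size_t start, size_t max_threads,
          bool is_short, bool* retry_push) {
  size_t idx = start;
  while (slots[idx].queued_long_job) {
    idx = (idx + 1) % slots.size();  // long job parked here: probe the next
    if (idx == start) {              // cycled through every queue
      *retry_push = true;            // caller retries after growing the pool
      return true;
    }
  }
  slots[idx].depth++;  // stands in for grpc_closure_list_append
  slots[idx].queued_long_job = !is_short;
  *retry_push = false;
  // A queue deeper than kMaxDepth plus spare capacity is the growth hint.
  return slots[idx].depth > kMaxDepth && slots.size() < max_threads;
}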
 
-static void executor_push_short(grpc_closure* closure, grpc_error* error) {
-  executor_push(closure, error, true);
+static GrpcExecutor g_global_executor("grpc-executor");
+
+void enqueue_long(grpc_closure* closure, grpc_error* error) {
+  g_global_executor.Enqueue(closure, error, false);
 }
 
-static void executor_push_long(grpc_closure* closure, grpc_error* error) {
-  executor_push(closure, error, false);
+void enqueue_short(grpc_closure* closure, grpc_error* error) {
+  g_global_executor.Enqueue(closure, error, true);
 }
 
-static const grpc_closure_scheduler_vtable executor_vtable_short = {
-    executor_push_short, executor_push_short, "executor"};
-static grpc_closure_scheduler executor_scheduler_short = {
-    &executor_vtable_short};
+// Short-job executor scheduler
+static const grpc_closure_scheduler_vtable global_executor_vtable_short = {
+    enqueue_short, enqueue_short, "executor-short"};
+static grpc_closure_scheduler global_scheduler_short = {
+    &global_executor_vtable_short};
 
-static const grpc_closure_scheduler_vtable executor_vtable_long = {
-    executor_push_long, executor_push_long, "executor"};
-static grpc_closure_scheduler executor_scheduler_long = {&executor_vtable_long};
+// Long-job executor scheduler
+static const grpc_closure_scheduler_vtable global_executor_vtable_long = {
+    enqueue_long, enqueue_long, "executor-long"};
+static grpc_closure_scheduler global_scheduler_long = {
+    &global_executor_vtable_long};
+
+void grpc_executor_init() { g_global_executor.Init(); }
+
+void grpc_executor_shutdown() { g_global_executor.Shutdown(); }
+
+bool grpc_executor_is_threaded() { return g_global_executor.IsThreaded(); }
+
+void grpc_executor_set_threading(bool enable) {
+  g_global_executor.SetThreading(enable);
+}
 
 grpc_closure_scheduler* grpc_executor_scheduler(
     grpc_executor_job_length length) {
-  return length == GRPC_EXECUTOR_SHORT ? &executor_scheduler_short
-                                       : &executor_scheduler_long;
+  return length == GRPC_EXECUTOR_SHORT ? &global_scheduler_short
+                                       : &global_scheduler_long;
 }
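
Because the global wrappers preserve the existing C API, call sites compile unchanged. A minimal usage sketch, assuming the closure macros of this era (GRPC_CLOSURE_INIT and GRPC_CLOSURE_SCHED), with ExecCtx setup and error handling elided:

// Usage sketch only; macro names are assumptions about the 2018-era API.
static void on_done(void* arg, grpc_error* error) {
  // ... work that must not block the calling thread ...
}

void schedule_background_work(grpc_closure* closure, void* arg) {
  // Short and long jobs get distinct schedulers so a long job never parks
  // in front of queued short ones.
  GRPC_CLOSURE_INIT(closure, on_done, arg,
                    grpc_executor_scheduler(GRPC_EXECUTOR_SHORT));
  GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
}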

@@ -21,30 +21,66 @@
 #include <grpc/support/port_platform.h>
 
+#include "src/core/lib/gpr/spinlock.h"
+#include "src/core/lib/gprpp/thd.h"
 #include "src/core/lib/iomgr/closure.h"
 
+typedef struct {
+  gpr_mu mu;
+  size_t id;  // For debugging purposes
+  gpr_cv cv;
+  grpc_closure_list elems;
+  size_t depth;  // Number of closures in the closure list
+  bool shutdown;
+  bool queued_long_job;
+  grpc_core::Thread thd;
+} thread_state;
+
 typedef enum {
   GRPC_EXECUTOR_SHORT,
   GRPC_EXECUTOR_LONG
 } grpc_executor_job_length;
 
-/** Initialize the global executor.
- *
- * This mechanism is meant to outsource work (grpc_closure instances) to a
- * thread, for those cases where blocking isn't an option but there isn't a
- * non-blocking solution available. */
+class GrpcExecutor {
+ public:
+  GrpcExecutor(const char* executor_name);
+
+  void Init();
+
+  /** Is the executor multi-threaded? */
+  bool IsThreaded();
+
+  /* Enable/disable threading - must be called after Init() and before
+   * Shutdown() */
+  void SetThreading(bool threading);
+
+  /** Shutdown the executor, running all pending work as part of the call */
+  void Shutdown();
+
+  /** Enqueue the closure onto the executor. is_short is true if the closure
+   * is a short job (i.e. expected not to block and to complete quickly) */
+  void Enqueue(grpc_closure* closure, grpc_error* error, bool is_short);
+
+ private:
+  static size_t RunClosures(grpc_closure_list list);
+  static void ThreadMain(void* arg);
+
+  const char* name;
+  thread_state* thd_state;
+  size_t max_threads;
+  gpr_atm num_threads;
+  gpr_spinlock adding_thread_lock;
+};
+
+// == Global executor functions ==
+
 void grpc_executor_init();
 
-grpc_closure_scheduler* grpc_executor_scheduler(grpc_executor_job_length);
+grpc_closure_scheduler* grpc_executor_scheduler(
+    grpc_executor_job_length length);
 
 /** Shutdown the executor, running all pending work as part of the call */
 void grpc_executor_shutdown();
 
 /** Is the executor multi-threaded? */
 bool grpc_executor_is_threaded();
 
-/* enable/disable threading - must be called after grpc_executor_init and before
-   grpc_executor_shutdown */
+/* Enable/disable threading - must be called after grpc_executor_init and
+   before grpc_executor_shutdown */
 void grpc_executor_set_threading(bool enable);
 
 #endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */
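
A side note on the scheduler initializers in executor.cc: grpc_closure_scheduler_vtable carries two entry points (run and sched) plus a name for tracing, which is why enqueue_short and enqueue_long each appear twice in their initializers. The same dispatch pattern in isolation, with hypothetical names throughout:

#include <cstdio>

// Hypothetical re-creation of the vtable pattern: two entry points (run
// now vs. schedule for later) plus a name for tracing.
struct SchedulerVtable {
  void (*run)(void* closure);
  void (*sched)(void* closure);
  const char* name;
};

struct Scheduler {
  const SchedulerVtable* vtable;
};

static void enqueue(void* closure) { std::printf("queued %p\n", closure); }

// Both slots point at the same function, just like the executor's vtables,
// because this executor treats "run" and "schedule" identically.
static const SchedulerVtable kVtable = {enqueue, enqueue, "demo-executor"};
static Scheduler g_scheduler = {&kVtable};

int main() {
  int work = 0;
  g_scheduler.vtable->sched(&work);  // dispatch through the vtable
}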
