From 44402ad0a1b36770f3546393d346f8c0419e46cf Mon Sep 17 00:00:00 2001
From: Vijay Pai
Date: Thu, 17 Jan 2019 23:21:08 -0800
Subject: [PATCH] Make executor look more like the rest of the codebase
 (namespace, etc)

---
 .../chttp2/transport/chttp2_transport.cc      |   6 +-
 src/core/lib/iomgr/combiner.cc                |   7 +-
 src/core/lib/iomgr/executor.cc                | 201 ++++++++++--------
 src/core/lib/iomgr/executor.h                 | 101 ++++-----
 src/core/lib/iomgr/fork_posix.cc              |   6 +-
 src/core/lib/iomgr/iomgr.cc                   |   4 +-
 src/core/lib/iomgr/iomgr_custom.cc            |   2 +-
 src/core/lib/iomgr/resolve_address_posix.cc   |   5 +-
 src/core/lib/iomgr/resolve_address_windows.cc |   3 +-
 src/core/lib/iomgr/tcp_posix.cc               |   8 +-
 src/core/lib/iomgr/udp_server.cc              |  10 +-
 src/core/lib/surface/init.cc                  |   2 +-
 src/core/lib/surface/server.cc                |   5 +-
 src/core/lib/transport/transport.cc           |   2 +-
 test/core/end2end/fuzzers/api_fuzzer.cc       |   2 +-
 test/core/end2end/fuzzers/client_fuzzer.cc    |   2 +-
 test/core/end2end/fuzzers/server_fuzzer.cc    |   2 +-
 test/core/iomgr/resolve_address_test.cc       |   2 +-
 18 files changed, 204 insertions(+), 166 deletions(-)

diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index 7f4627fa773..fe88d4818e4 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -968,19 +968,19 @@ static grpc_closure_scheduler* write_scheduler(grpc_chttp2_transport* t,
      get better latency overall if we switch writing work elsewhere and
      continue with application work above */
   if (!t->is_first_write_in_batch) {
-    return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
+    return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
   }
   /* equivalently, if it's a partial write, we *know* we're going to be taking
      a thread jump to write it because of the above, may as well do so
      immediately */
   if (partial_write) {
-    return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
+    return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
   }
   switch (t->opt_target) {
     case GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT:
       /* executor gives us the largest probability of being able to batch a
        * write with others on this transport */
-      return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
+      return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
     case GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY:
       return grpc_schedule_on_exec_ctx;
   }
diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc
index 402f8904eae..4fc4a9dccf4 100644
--- a/src/core/lib/iomgr/combiner.cc
+++ b/src/core/lib/iomgr/combiner.cc
@@ -83,8 +83,9 @@ grpc_combiner* grpc_combiner_create(void) {
   gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
   gpr_mpscq_init(&lock->queue);
   grpc_closure_list_init(&lock->final_list);
-  GRPC_CLOSURE_INIT(&lock->offload, offload, lock,
-                    grpc_executor_scheduler(GRPC_EXECUTOR_SHORT));
+  GRPC_CLOSURE_INIT(
+      &lock->offload, offload, lock,
+      grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT));
   GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p create", lock));
   return lock;
 }
@@ -235,7 +236,7 @@ bool grpc_combiner_continue_exec_ctx() {
   // 3. the DEFAULT executor is threaded
   // 4. the current thread is not a worker for any background poller
   if (contended && grpc_core::ExecCtx::Get()->IsReadyToFinish() &&
-      grpc_executor_is_threaded() &&
+      grpc_core::Executor::IsThreadedDefault() &&
       !grpc_iomgr_is_any_background_poller_thread()) {
     GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0);
     // this execution context wants to move on: schedule remaining work to be
diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc
index 45d96b80eb4..2703e1a0b77 100644
--- a/src/core/lib/iomgr/executor.cc
+++ b/src/core/lib/iomgr/executor.cc
@@ -45,20 +45,70 @@
     gpr_log(GPR_INFO, "EXECUTOR " str); \
   }

-grpc_core::TraceFlag executor_trace(false, "executor");
+namespace grpc_core {
+namespace {

 GPR_TLS_DECL(g_this_thread_state);

-GrpcExecutor::GrpcExecutor(const char* name) : name_(name) {
+Executor* executors[static_cast<size_t>(ExecutorType::NUM_EXECUTORS)];
+
+void default_enqueue_short(grpc_closure* closure, grpc_error* error) {
+  executors[static_cast<size_t>(ExecutorType::DEFAULT)]->Enqueue(
+      closure, error, true /* is_short */);
+}
+
+void default_enqueue_long(grpc_closure* closure, grpc_error* error) {
+  executors[static_cast<size_t>(ExecutorType::DEFAULT)]->Enqueue(
+      closure, error, false /* is_short */);
+}
+
+void resolver_enqueue_short(grpc_closure* closure, grpc_error* error) {
+  executors[static_cast<size_t>(ExecutorType::RESOLVER)]->Enqueue(
+      closure, error, true /* is_short */);
+}
+
+void resolver_enqueue_long(grpc_closure* closure, grpc_error* error) {
+  executors[static_cast<size_t>(ExecutorType::RESOLVER)]->Enqueue(
+      closure, error, false /* is_short */);
+}
+
+const grpc_closure_scheduler_vtable
+    vtables_[static_cast<size_t>(ExecutorType::NUM_EXECUTORS)]
+            [static_cast<size_t>(ExecutorJobType::NUM_JOB_TYPES)] = {
+                {{&default_enqueue_short, &default_enqueue_short,
+                  "def-ex-short"},
+                 {&default_enqueue_long, &default_enqueue_long, "def-ex-long"}},
+                {{&resolver_enqueue_short, &resolver_enqueue_short,
+                  "res-ex-short"},
+                 {&resolver_enqueue_long, &resolver_enqueue_long,
+                  "res-ex-long"}}};
+
+grpc_closure_scheduler
+    schedulers_[static_cast<size_t>(ExecutorType::NUM_EXECUTORS)]
+               [static_cast<size_t>(ExecutorJobType::NUM_JOB_TYPES)] = {
+                   {{&vtables_[static_cast<size_t>(ExecutorType::DEFAULT)]
+                              [static_cast<size_t>(ExecutorJobType::SHORT)]},
+                    {&vtables_[static_cast<size_t>(ExecutorType::DEFAULT)]
+                              [static_cast<size_t>(ExecutorJobType::LONG)]}},
+                   {{&vtables_[static_cast<size_t>(ExecutorType::RESOLVER)]
+                              [static_cast<size_t>(ExecutorJobType::SHORT)]},
+                    {&vtables_[static_cast<size_t>(ExecutorType::RESOLVER)]
+                              [static_cast<size_t>(ExecutorJobType::LONG)]}}};
+
+}  // namespace
+
+TraceFlag executor_trace(false, "executor");
+
+Executor::Executor(const char* name) : name_(name) {
   adding_thread_lock_ = GPR_SPINLOCK_STATIC_INITIALIZER;
   gpr_atm_rel_store(&num_threads_, 0);
   max_threads_ = GPR_MAX(1, 2 * gpr_cpu_num_cores());
 }

-void GrpcExecutor::Init() { SetThreading(true); }
+void Executor::Init() { SetThreading(true); }

-size_t GrpcExecutor::RunClosures(const char* executor_name,
-                                 grpc_closure_list list) {
+size_t Executor::RunClosures(const char* executor_name,
+                             grpc_closure_list list) {
   size_t n = 0;

   grpc_closure* c = list.head;
@@ -82,11 +132,11 @@ size_t GrpcExecutor::RunClosures(const char* executor_name,
   return n;
 }

-bool GrpcExecutor::IsThreaded() const {
+bool Executor::IsThreaded() const {
   return gpr_atm_acq_load(&num_threads_) > 0;
 }

-void GrpcExecutor::SetThreading(bool threading) {
+void Executor::SetThreading(bool threading) {
   gpr_atm curr_num_threads = gpr_atm_acq_load(&num_threads_);
   EXECUTOR_TRACE("(%s) SetThreading(%d) begin", name_, threading);

@@ -112,7 +162,7 @@ void GrpcExecutor::SetThreading(bool threading) {
     }

     thd_state_[0].thd =
-        grpc_core::Thread(name_, &GrpcExecutor::ThreadMain, &thd_state_[0]);
+        grpc_core::Thread(name_, &Executor::ThreadMain, &thd_state_[0]);
     thd_state_[0].thd.Start();
   } else {  // !threading
     if (curr_num_threads == 0) {
@@ -153,9 +203,9 @@ void GrpcExecutor::SetThreading(bool threading) {
   EXECUTOR_TRACE("(%s) SetThreading(%d) done", name_, threading);
 }

-void GrpcExecutor::Shutdown() { SetThreading(false); }
+void Executor::Shutdown() { SetThreading(false); }

-void GrpcExecutor::ThreadMain(void* arg) {
+void Executor::ThreadMain(void* arg) {
   ThreadState* ts = static_cast<ThreadState*>(arg);
   gpr_tls_set(&g_this_thread_state, reinterpret_cast<intptr_t>(ts));

@@ -192,8 +242,8 @@ void GrpcExecutor::ThreadMain(void* arg) {
   }
 }

-void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
-                           bool is_short) {
+void Executor::Enqueue(grpc_closure* closure, grpc_error* error,
+                       bool is_short) {
   bool retry_push;
   if (is_short) {
     GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS();
@@ -304,7 +354,7 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
       gpr_atm_rel_store(&num_threads_, cur_thread_count + 1);

       thd_state_[cur_thread_count].thd = grpc_core::Thread(
-          name_, &GrpcExecutor::ThreadMain, &thd_state_[cur_thread_count]);
+          name_, &Executor::ThreadMain, &thd_state_[cur_thread_count]);
       thd_state_[cur_thread_count].thd.Start();
     }
     gpr_spinlock_unlock(&adding_thread_lock_);
@@ -316,85 +366,52 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
   } while (retry_push);
 }

-static GrpcExecutor* executors[GRPC_NUM_EXECUTORS];
-
-void default_enqueue_short(grpc_closure* closure, grpc_error* error) {
-  executors[GRPC_DEFAULT_EXECUTOR]->Enqueue(closure, error,
-                                            true /* is_short */);
-}
-
-void default_enqueue_long(grpc_closure* closure, grpc_error* error) {
-  executors[GRPC_DEFAULT_EXECUTOR]->Enqueue(closure, error,
-                                            false /* is_short */);
-}
-
-void resolver_enqueue_short(grpc_closure* closure, grpc_error* error) {
-  executors[GRPC_RESOLVER_EXECUTOR]->Enqueue(closure, error,
-                                             true /* is_short */);
-}
-
-void resolver_enqueue_long(grpc_closure* closure, grpc_error* error) {
-  executors[GRPC_RESOLVER_EXECUTOR]->Enqueue(closure, error,
-                                             false /* is_short */);
-}
-
-static const grpc_closure_scheduler_vtable
-    vtables_[GRPC_NUM_EXECUTORS][GRPC_NUM_EXECUTOR_JOB_TYPES] = {
-        {{&default_enqueue_short, &default_enqueue_short, "def-ex-short"},
-         {&default_enqueue_long, &default_enqueue_long, "def-ex-long"}},
-        {{&resolver_enqueue_short, &resolver_enqueue_short, "res-ex-short"},
-         {&resolver_enqueue_long, &resolver_enqueue_long, "res-ex-long"}}};
-
-static grpc_closure_scheduler
-    schedulers_[GRPC_NUM_EXECUTORS][GRPC_NUM_EXECUTOR_JOB_TYPES] = {
-        {{&vtables_[GRPC_DEFAULT_EXECUTOR][GRPC_EXECUTOR_SHORT]},
-         {&vtables_[GRPC_DEFAULT_EXECUTOR][GRPC_EXECUTOR_LONG]}},
-        {{&vtables_[GRPC_RESOLVER_EXECUTOR][GRPC_EXECUTOR_SHORT]},
-         {&vtables_[GRPC_RESOLVER_EXECUTOR][GRPC_EXECUTOR_LONG]}}};
-
-// grpc_executor_init() and grpc_executor_shutdown() functions are called in the
+// Executor::InitAll() and Executor::ShutdownAll() functions are called in
 // the grpc_init() and grpc_shutdown() code paths which are protected by a
 // global mutex. So it is okay to assume that these functions are thread-safe
-void grpc_executor_init() {
-  EXECUTOR_TRACE0("grpc_executor_init() enter");
+void Executor::InitAll() {
+  EXECUTOR_TRACE0("Executor::InitAll() enter");

-  // Return if grpc_executor_init() is already called earlier
-  if (executors[GRPC_DEFAULT_EXECUTOR] != nullptr) {
-    GPR_ASSERT(executors[GRPC_RESOLVER_EXECUTOR] != nullptr);
+  // Return if Executor::InitAll() has already been called
+  if (executors[static_cast<size_t>(ExecutorType::DEFAULT)] != nullptr) {
+    GPR_ASSERT(executors[static_cast<size_t>(ExecutorType::RESOLVER)] !=
+               nullptr);
     return;
   }

-  executors[GRPC_DEFAULT_EXECUTOR] =
-      grpc_core::New<GrpcExecutor>("default-executor");
-  executors[GRPC_RESOLVER_EXECUTOR] =
-      grpc_core::New<GrpcExecutor>("resolver-executor");
+  executors[static_cast<size_t>(ExecutorType::DEFAULT)] =
+      grpc_core::New<Executor>("default-executor");
+  executors[static_cast<size_t>(ExecutorType::RESOLVER)] =
+      grpc_core::New<Executor>("resolver-executor");

-  executors[GRPC_DEFAULT_EXECUTOR]->Init();
-  executors[GRPC_RESOLVER_EXECUTOR]->Init();
+  executors[static_cast<size_t>(ExecutorType::DEFAULT)]->Init();
+  executors[static_cast<size_t>(ExecutorType::RESOLVER)]->Init();

-  EXECUTOR_TRACE0("grpc_executor_init() done");
+  EXECUTOR_TRACE0("Executor::InitAll() done");
 }

-grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorType executor_type,
-                                                GrpcExecutorJobType job_type) {
-  return &schedulers_[executor_type][job_type];
+grpc_closure_scheduler* Executor::Scheduler(ExecutorType executor_type,
+                                            ExecutorJobType job_type) {
+  return &schedulers_[static_cast<size_t>(executor_type)]
+                     [static_cast<size_t>(job_type)];
 }

-grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type) {
-  return grpc_executor_scheduler(GRPC_DEFAULT_EXECUTOR, job_type);
+grpc_closure_scheduler* Executor::Scheduler(ExecutorJobType job_type) {
+  return Executor::Scheduler(ExecutorType::DEFAULT, job_type);
 }

-void grpc_executor_shutdown() {
-  EXECUTOR_TRACE0("grpc_executor_shutdown() enter");
+void Executor::ShutdownAll() {
+  EXECUTOR_TRACE0("Executor::ShutdownAll() enter");

-  // Return if grpc_executor_shutdown() is already called earlier
-  if (executors[GRPC_DEFAULT_EXECUTOR] == nullptr) {
-    GPR_ASSERT(executors[GRPC_RESOLVER_EXECUTOR] == nullptr);
+  // Return if Executor::ShutdownAll() has already been called
+  if (executors[static_cast<size_t>(ExecutorType::DEFAULT)] == nullptr) {
+    GPR_ASSERT(executors[static_cast<size_t>(ExecutorType::RESOLVER)] ==
+               nullptr);
     return;
   }

-  executors[GRPC_DEFAULT_EXECUTOR]->Shutdown();
-  executors[GRPC_RESOLVER_EXECUTOR]->Shutdown();
+  executors[static_cast<size_t>(ExecutorType::DEFAULT)]->Shutdown();
+  executors[static_cast<size_t>(ExecutorType::RESOLVER)]->Shutdown();

   // Delete the executor objects.
   //
@@ -408,26 +425,36 @@ void grpc_executor_shutdown() {
   // By ensuring that all executors are shutdown first, we are also ensuring
   // that no thread is active across all executors.
-  grpc_core::Delete(executors[GRPC_DEFAULT_EXECUTOR]);
-  grpc_core::Delete(executors[GRPC_RESOLVER_EXECUTOR]);
-  executors[GRPC_DEFAULT_EXECUTOR] = nullptr;
-  executors[GRPC_RESOLVER_EXECUTOR] = nullptr;
+  grpc_core::Delete<Executor>(
+      executors[static_cast<size_t>(ExecutorType::DEFAULT)]);
+  grpc_core::Delete<Executor>(
+      executors[static_cast<size_t>(ExecutorType::RESOLVER)]);
+  executors[static_cast<size_t>(ExecutorType::DEFAULT)] = nullptr;
+  executors[static_cast<size_t>(ExecutorType::RESOLVER)] = nullptr;

-  EXECUTOR_TRACE0("grpc_executor_shutdown() done");
+  EXECUTOR_TRACE0("Executor::ShutdownAll() done");
 }

-bool grpc_executor_is_threaded(GrpcExecutorType executor_type) {
-  GPR_ASSERT(executor_type < GRPC_NUM_EXECUTORS);
-  return executors[executor_type]->IsThreaded();
+bool Executor::IsThreaded(ExecutorType executor_type) {
+  GPR_ASSERT(executor_type < ExecutorType::NUM_EXECUTORS);
+  return executors[static_cast<size_t>(executor_type)]->IsThreaded();
 }

-bool grpc_executor_is_threaded() {
-  return grpc_executor_is_threaded(GRPC_DEFAULT_EXECUTOR);
+bool Executor::IsThreadedDefault() {
+  return Executor::IsThreaded(ExecutorType::DEFAULT);
 }

-void grpc_executor_set_threading(bool enable) {
-  EXECUTOR_TRACE("grpc_executor_set_threading(%d) called", enable);
-  for (int i = 0; i < GRPC_NUM_EXECUTORS; i++) {
+void Executor::SetThreadingAll(bool enable) {
+  EXECUTOR_TRACE("Executor::SetThreadingAll(%d) called", enable);
+  for (size_t i = 0; i < static_cast<size_t>(ExecutorType::NUM_EXECUTORS);
+       i++) {
     executors[i]->SetThreading(enable);
   }
 }
+
+void Executor::SetThreadingDefault(bool enable) {
+  EXECUTOR_TRACE("Executor::SetThreadingDefault(%d) called", enable);
+  executors[static_cast<size_t>(ExecutorType::DEFAULT)]->SetThreading(enable);
+}
+
+}  // namespace grpc_core
diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h
index 8829138c5fa..9e472279b7b 100644
--- a/src/core/lib/iomgr/executor.h
+++ b/src/core/lib/iomgr/executor.h
@@ -25,7 +25,9 @@
 #include "src/core/lib/gprpp/thd.h"
 #include "src/core/lib/iomgr/closure.h"

-typedef struct {
+namespace grpc_core {
+
+struct ThreadState {
   gpr_mu mu;
   size_t id;         // For debugging purposes
   const char* name;  // Thread state name
@@ -35,17 +37,24 @@ typedef struct {
   bool shutdown;
   bool queued_long_job;
   grpc_core::Thread thd;
-} ThreadState;
+};

-typedef enum {
-  GRPC_EXECUTOR_SHORT = 0,
-  GRPC_EXECUTOR_LONG,
-  GRPC_NUM_EXECUTOR_JOB_TYPES  // Add new values above this
-} GrpcExecutorJobType;
+enum class ExecutorType {
+  DEFAULT = 0,
+  RESOLVER,
+
+  NUM_EXECUTORS  // Add new values above this
+};

-class GrpcExecutor {
+enum class ExecutorJobType {
+  SHORT = 0,
+  LONG,
+  NUM_JOB_TYPES  // Add new values above this
+};
+
+class Executor {
  public:
-  GrpcExecutor(const char* executor_name);
+  Executor(const char* executor_name);

   void Init();

@@ -62,55 +71,51 @@ class GrpcExecutor {
    * a short job (i.e expected to not block and complete quickly) */
   void Enqueue(grpc_closure* closure, grpc_error* error, bool is_short);

- private:
-  static size_t RunClosures(const char* executor_name, grpc_closure_list list);
-  static void ThreadMain(void* arg);
+  // TODO(sreek): Currently we have two executors (available globally): The
+  // default executor and the resolver executor.
+  //
+  // Some of the functions below operate on the DEFAULT executor only while some
+  // operate on ALL the executors. This is a bit confusing and should be cleaned
+  // up in the future (where we make all the following functions take
+  // ExecutorType and/or JobType)

-  const char* name_;
-  ThreadState* thd_state_;
-  size_t max_threads_;
-  gpr_atm num_threads_;
-  gpr_spinlock adding_thread_lock_;
-};
-
-// == Global executor functions ==
+  // Initialize ALL the executors
+  static void InitAll();

-typedef enum {
-  GRPC_DEFAULT_EXECUTOR = 0,
-  GRPC_RESOLVER_EXECUTOR,
+  // Shutdown ALL the executors
+  static void ShutdownAll();

-  GRPC_NUM_EXECUTORS  // Add new values above this
-} GrpcExecutorType;
+  // Set the threading mode for ALL the executors
+  static void SetThreadingAll(bool enable);

-// TODO(sreek): Currently we have two executors (available globally): The
-// default executor and the resolver executor.
-//
-// Some of the functions below operate on the DEFAULT executor only while some
-// operate of ALL the executors. This is a bit confusing and should be cleaned
-// up in future (where we make all the following functions take executor_type
-// and/or job_type)
+  // Set the threading mode for the DEFAULT executor only
+  static void SetThreadingDefault(bool enable);

-// Initialize ALL the executors
-void grpc_executor_init();
+  // Get the DEFAULT executor scheduler for the given job_type
+  static grpc_closure_scheduler* Scheduler(ExecutorJobType job_type);

-// Shutdown ALL the executors
-void grpc_executor_shutdown();
+  // Get the executor scheduler for a given executor_type and a job_type
+  static grpc_closure_scheduler* Scheduler(ExecutorType executor_type,
+                                           ExecutorJobType job_type);

-// Set the threading mode for ALL the executors
-void grpc_executor_set_threading(bool enable);
+  // Return if a given executor is running in threaded mode (i.e. if
+  // SetThreading(true) was called previously on that executor)
+  static bool IsThreaded(ExecutorType executor_type);

-// Get the DEFAULT executor scheduler for the given job_type
-grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type);
+  // Return if the DEFAULT executor is threaded
+  static bool IsThreadedDefault();

-// Get the executor scheduler for a given executor_type and a job_type
-grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorType executor_type,
-                                                GrpcExecutorJobType job_type);
+ private:
+  static size_t RunClosures(const char* executor_name, grpc_closure_list list);
+  static void ThreadMain(void* arg);

-// Return if a given executor is running in threaded mode (i.e if
-// grpc_executor_set_threading(true) was called previously on that executor)
-bool grpc_executor_is_threaded(GrpcExecutorType executor_type);
+  const char* name_;
+  ThreadState* thd_state_;
+  size_t max_threads_;
+  gpr_atm num_threads_;
+  gpr_spinlock adding_thread_lock_;
+};

-// Return if the DEFAULT executor is threaded
-bool grpc_executor_is_threaded();
+}  // namespace grpc_core

 #endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */
diff --git a/src/core/lib/iomgr/fork_posix.cc b/src/core/lib/iomgr/fork_posix.cc
index 05ecd2a49b7..2eebe3f26f6 100644
--- a/src/core/lib/iomgr/fork_posix.cc
+++ b/src/core/lib/iomgr/fork_posix.cc
@@ -71,7 +71,7 @@ void grpc_prefork() {
     return;
   }
   grpc_timer_manager_set_threading(false);
-  grpc_executor_set_threading(false);
+  grpc_core::Executor::SetThreadingAll(false);
   grpc_core::ExecCtx::Get()->Flush();
   grpc_core::Fork::AwaitThreads();
   skipped_handler = false;
@@ -82,7 +82,7 @@ void grpc_postfork_parent() {
     grpc_core::Fork::AllowExecCtx();
     grpc_core::ExecCtx exec_ctx;
     grpc_timer_manager_set_threading(true);
-    grpc_executor_set_threading(true);
+    grpc_core::Executor::SetThreadingAll(true);
   }
 }
@@ -96,7 +96,7 @@ void grpc_postfork_child() {
       reset_polling_engine();
     }
     grpc_timer_manager_set_threading(true);
-    grpc_executor_set_threading(true);
+    grpc_core::Executor::SetThreadingAll(true);
   }
 }
diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc
index dcc69332e0b..33153d9cc3b 100644
--- a/src/core/lib/iomgr/iomgr.cc
+++ b/src/core/lib/iomgr/iomgr.cc
@@ -52,7 +52,7 @@ void grpc_iomgr_init() {
   g_shutdown = 0;
   gpr_mu_init(&g_mu);
   gpr_cv_init(&g_rcv);
-  grpc_executor_init();
+  grpc_core::Executor::InitAll();
   grpc_timer_list_init();
   g_root_object.next = g_root_object.prev = &g_root_object;
   g_root_object.name = (char*)"root";
@@ -88,7 +88,7 @@ void grpc_iomgr_shutdown() {
   {
     grpc_timer_manager_shutdown();
     grpc_iomgr_platform_flush();
-    grpc_executor_shutdown();
+    grpc_core::Executor::ShutdownAll();

     gpr_mu_lock(&g_mu);
     g_shutdown = 1;
diff --git a/src/core/lib/iomgr/iomgr_custom.cc b/src/core/lib/iomgr/iomgr_custom.cc
index e1cd8f73104..3d07f1abe9a 100644
--- a/src/core/lib/iomgr/iomgr_custom.cc
+++ b/src/core/lib/iomgr/iomgr_custom.cc
@@ -34,7 +34,7 @@ gpr_thd_id g_init_thread;

 static void iomgr_platform_init(void) {
   grpc_core::ExecCtx exec_ctx;
-  grpc_executor_set_threading(false);
+  grpc_core::Executor::SetThreadingAll(false);
   g_init_thread = gpr_thd_currentid();
   grpc_pollset_global_init();
 }
diff --git a/src/core/lib/iomgr/resolve_address_posix.cc b/src/core/lib/iomgr/resolve_address_posix.cc
index 2a03244ff7d..e6dd8f1ceab 100644
--- a/src/core/lib/iomgr/resolve_address_posix.cc
+++ b/src/core/lib/iomgr/resolve_address_posix.cc
@@ -150,7 +150,7 @@ typedef struct {
   void* arg;
 } request;

-/* Callback to be passed to grpc_executor to asynch-ify
+/* Callback to be passed to grpc Executor to asynch-ify
  * grpc_blocking_resolve_address */
 static void do_request_thread(void* rp, grpc_error* error) {
   request* r = static_cast<request*>(rp);
@@ -168,7 +168,8 @@ static void posix_resolve_address(const char* name, const char* default_port,
   request* r = static_cast<request*>(gpr_malloc(sizeof(request)));
   GRPC_CLOSURE_INIT(
       &r->request_closure, do_request_thread, r,
-      grpc_executor_scheduler(GRPC_RESOLVER_EXECUTOR, GRPC_EXECUTOR_SHORT));
+      grpc_core::Executor::Scheduler(grpc_core::ExecutorType::RESOLVER,
+                                     grpc_core::ExecutorJobType::SHORT));
   r->name = gpr_strdup(name);
   r->default_port = gpr_strdup(default_port);
   r->on_done = on_done;
diff --git a/src/core/lib/iomgr/resolve_address_windows.cc b/src/core/lib/iomgr/resolve_address_windows.cc
index 3e977dca2da..64351c38a8f 100644
--- a/src/core/lib/iomgr/resolve_address_windows.cc
+++ b/src/core/lib/iomgr/resolve_address_windows.cc
@@ -153,7 +153,8 @@ static void windows_resolve_address(const char* name, const char* default_port,
   request* r = (request*)gpr_malloc(sizeof(request));
   GRPC_CLOSURE_INIT(
       &r->request_closure, do_request_thread, r,
-      grpc_executor_scheduler(GRPC_RESOLVER_EXECUTOR, GRPC_EXECUTOR_SHORT));
+      grpc_core::Executor::Scheduler(grpc_core::ExecutorType::RESOLVER,
+                                     grpc_core::ExecutorJobType::SHORT));
   r->name = gpr_strdup(name);
   r->default_port = gpr_strdup(default_port);
   r->on_done = on_done;
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index d0642c015ff..92f163b58e9 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -227,10 +227,10 @@ static void cover_self(grpc_tcp* tcp) {
     }
     grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu);
     gpr_atm_rel_store(&g_backup_poller, (gpr_atm)p);
-    GRPC_CLOSURE_SCHED(
-        GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p,
-                          grpc_executor_scheduler(GRPC_EXECUTOR_LONG)),
-        GRPC_ERROR_NONE);
+    GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p,
+                                         grpc_core::Executor::Scheduler(
+                                             grpc_core::ExecutorJobType::LONG)),
+                       GRPC_ERROR_NONE);
   } else {
     while ((p = (backup_poller*)gpr_atm_acq_load(&g_backup_poller)) ==
            nullptr) {
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc
index 3dd7cab855c..5f8865ca57f 100644
--- a/src/core/lib/iomgr/udp_server.cc
+++ b/src/core/lib/iomgr/udp_server.cc
@@ -481,8 +481,9 @@ void GrpcUdpListener::OnRead(grpc_error* error, void* do_read_arg) {
   if (udp_handler_->Read()) {
     /* There maybe more packets to read. Schedule read_more_cb_ closure to run
      * after finishing this event loop. */
-    GRPC_CLOSURE_INIT(&do_read_closure_, do_read, do_read_arg,
-                      grpc_executor_scheduler(GRPC_EXECUTOR_LONG));
+    GRPC_CLOSURE_INIT(
+        &do_read_closure_, do_read, do_read_arg,
+        grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::LONG));
     GRPC_CLOSURE_SCHED(&do_read_closure_, GRPC_ERROR_NONE);
   } else {
     /* Finish reading all the packets, re-arm the notification event so we can
@@ -542,8 +543,9 @@ void GrpcUdpListener::OnCanWrite(grpc_error* error, void* do_write_arg) {
   }

   /* Schedule actual write in another thread. */
-  GRPC_CLOSURE_INIT(&do_write_closure_, do_write, do_write_arg,
-                    grpc_executor_scheduler(GRPC_EXECUTOR_LONG));
+  GRPC_CLOSURE_INIT(
+      &do_write_closure_, do_write, do_write_arg,
+      grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::LONG));
   GRPC_CLOSURE_SCHED(&do_write_closure_, GRPC_ERROR_NONE);
 }
diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc
index 67cf5d89bff..60f506ef5e2 100644
--- a/src/core/lib/surface/init.cc
+++ b/src/core/lib/surface/init.cc
@@ -165,7 +165,7 @@ void grpc_shutdown(void) {
   {
     grpc_timer_manager_set_threading(
         false);  // shutdown timer_manager thread
-    grpc_executor_shutdown();
+    grpc_core::Executor::ShutdownAll();
     for (i = g_number_of_plugins; i >= 0; i--) {
       if (g_all_of_the_plugins[i].destroy != nullptr) {
         g_all_of_the_plugins[i].destroy();
diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc
index 7ae6e51a5fb..cdfd3336437 100644
--- a/src/core/lib/surface/server.cc
+++ b/src/core/lib/surface/server.cc
@@ -1134,8 +1134,9 @@ void grpc_server_start(grpc_server* server) {
   server_ref(server);
   server->starting = true;
   GRPC_CLOSURE_SCHED(
-      GRPC_CLOSURE_CREATE(start_listeners, server,
-                          grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)),
+      GRPC_CLOSURE_CREATE(
+          start_listeners, server,
+          grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)),
       GRPC_ERROR_NONE);
 }
diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc
index b32f9c6ec1a..43add28ce03 100644
--- a/src/core/lib/transport/transport.cc
+++ b/src/core/lib/transport/transport.cc
@@ -73,7 +73,7 @@ void grpc_stream_unref(grpc_stream_refcount* refcount) {
        Throw this over to the executor (on a core-owned thread) and process it
        there. */
     refcount->destroy.scheduler =
-        grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
+        grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
   }
   GRPC_CLOSURE_SCHED(&refcount->destroy, GRPC_ERROR_NONE);
 }
diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc
index a0b82904753..57bc8ad768c 100644
--- a/test/core/end2end/fuzzers/api_fuzzer.cc
+++ b/test/core/end2end/fuzzers/api_fuzzer.cc
@@ -706,7 +706,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
   grpc_timer_manager_set_threading(false);
   {
     grpc_core::ExecCtx exec_ctx;
-    grpc_executor_set_threading(false);
+    grpc_core::Executor::SetThreadingAll(false);
   }
   grpc_set_resolver_impl(&fuzzer_resolver);
   grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked;
diff --git a/test/core/end2end/fuzzers/client_fuzzer.cc b/test/core/end2end/fuzzers/client_fuzzer.cc
index e21006bb673..8520fb53755 100644
--- a/test/core/end2end/fuzzers/client_fuzzer.cc
+++ b/test/core/end2end/fuzzers/client_fuzzer.cc
@@ -46,7 +46,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
   grpc_init();
   {
     grpc_core::ExecCtx exec_ctx;
-    grpc_executor_set_threading(false);
+    grpc_core::Executor::SetThreadingAll(false);

     grpc_resource_quota* resource_quota =
         grpc_resource_quota_create("client_fuzzer");
diff --git a/test/core/end2end/fuzzers/server_fuzzer.cc b/test/core/end2end/fuzzers/server_fuzzer.cc
index d370dc7de85..644f98e37ac 100644
--- a/test/core/end2end/fuzzers/server_fuzzer.cc
+++ b/test/core/end2end/fuzzers/server_fuzzer.cc
@@ -43,7 +43,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
   grpc_init();
   {
     grpc_core::ExecCtx exec_ctx;
-    grpc_executor_set_threading(false);
+    grpc_core::Executor::SetThreadingAll(false);

     grpc_resource_quota* resource_quota =
         grpc_resource_quota_create("server_fuzzer");
diff --git a/test/core/iomgr/resolve_address_test.cc b/test/core/iomgr/resolve_address_test.cc
index 1d9e1ee27e2..0ae0ec888b6 100644
--- a/test/core/iomgr/resolve_address_test.cc
+++ b/test/core/iomgr/resolve_address_test.cc
@@ -290,7 +290,7 @@ int main(int argc, char** argv) {
     test_invalid_ip_addresses();
     test_unparseable_hostports();
   }
-  grpc_executor_shutdown();
+  grpc_core::Executor::ShutdownAll();
 }

 gpr_cmdline_destroy(cl);
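
Migration note: the old free functions map one-to-one onto static members of
grpc_core::Executor, and the GRPC_* enums onto the ExecutorType and
ExecutorJobType enum classes. The following is a minimal sketch of a converted
call site, not part of the patch; `done_closure`, `on_done`, and `arg` are
hypothetical names used only for illustration, while every Executor call shown
is one that appears in the diffs above.

    #include "src/core/lib/iomgr/closure.h"
    #include "src/core/lib/iomgr/executor.h"

    // grpc_executor_init() -> Executor::InitAll()
    grpc_core::Executor::InitAll();

    // grpc_executor_scheduler(GRPC_EXECUTOR_SHORT) becomes the one-argument
    // Scheduler() overload, which targets the DEFAULT executor:
    GRPC_CLOSURE_INIT(&done_closure, on_done, arg,
                      grpc_core::Executor::Scheduler(
                          grpc_core::ExecutorJobType::SHORT));

    // grpc_executor_scheduler(GRPC_RESOLVER_EXECUTOR, GRPC_EXECUTOR_SHORT)
    // becomes the two-argument overload naming the executor explicitly:
    GRPC_CLOSURE_INIT(&done_closure, on_done, arg,
                      grpc_core::Executor::Scheduler(
                          grpc_core::ExecutorType::RESOLVER,
                          grpc_core::ExecutorJobType::SHORT));

    // grpc_executor_set_threading(b) -> Executor::SetThreadingAll(b)
    // grpc_executor_is_threaded()    -> Executor::IsThreadedDefault()
    // grpc_executor_shutdown()       -> Executor::ShutdownAll()
    grpc_core::Executor::ShutdownAll();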