Merge pull request #17764 from vjpai/executor_cleanup

Make executor look more like the rest of the codebase (namespace, etc)
pull/17751/head^2
Vijay Pai 6 years ago committed by GitHub
commit 86953f6694
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  2. 7
      src/core/lib/iomgr/combiner.cc
  3. 201
      src/core/lib/iomgr/executor.cc
  4. 101
      src/core/lib/iomgr/executor.h
  5. 6
      src/core/lib/iomgr/fork_posix.cc
  6. 4
      src/core/lib/iomgr/iomgr.cc
  7. 2
      src/core/lib/iomgr/iomgr_custom.cc
  8. 5
      src/core/lib/iomgr/resolve_address_posix.cc
  9. 3
      src/core/lib/iomgr/resolve_address_windows.cc
  10. 8
      src/core/lib/iomgr/tcp_posix.cc
  11. 10
      src/core/lib/iomgr/udp_server.cc
  12. 2
      src/core/lib/surface/init.cc
  13. 5
      src/core/lib/surface/server.cc
  14. 2
      src/core/lib/transport/transport.cc
  15. 2
      test/core/end2end/fuzzers/api_fuzzer.cc
  16. 2
      test/core/end2end/fuzzers/client_fuzzer.cc
  17. 2
      test/core/end2end/fuzzers/server_fuzzer.cc
  18. 2
      test/core/iomgr/resolve_address_test.cc

@ -968,19 +968,19 @@ static grpc_closure_scheduler* write_scheduler(grpc_chttp2_transport* t,
get better latency overall if we switch writing work elsewhere and continue get better latency overall if we switch writing work elsewhere and continue
with application work above */ with application work above */
if (!t->is_first_write_in_batch) { if (!t->is_first_write_in_batch) {
return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT); return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
} }
/* equivalently, if it's a partial write, we *know* we're going to be taking a /* equivalently, if it's a partial write, we *know* we're going to be taking a
thread jump to write it because of the above, may as well do so thread jump to write it because of the above, may as well do so
immediately */ immediately */
if (partial_write) { if (partial_write) {
return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT); return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
} }
switch (t->opt_target) { switch (t->opt_target) {
case GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT: case GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT:
/* executor gives us the largest probability of being able to batch a /* executor gives us the largest probability of being able to batch a
* write with others on this transport */ * write with others on this transport */
return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT); return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
case GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY: case GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY:
return grpc_schedule_on_exec_ctx; return grpc_schedule_on_exec_ctx;
} }

@ -83,8 +83,9 @@ grpc_combiner* grpc_combiner_create(void) {
gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED); gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
gpr_mpscq_init(&lock->queue); gpr_mpscq_init(&lock->queue);
grpc_closure_list_init(&lock->final_list); grpc_closure_list_init(&lock->final_list);
GRPC_CLOSURE_INIT(&lock->offload, offload, lock, GRPC_CLOSURE_INIT(
grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)); &lock->offload, offload, lock,
grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT));
GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p create", lock)); GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p create", lock));
return lock; return lock;
} }
@ -235,7 +236,7 @@ bool grpc_combiner_continue_exec_ctx() {
// 3. the DEFAULT executor is threaded // 3. the DEFAULT executor is threaded
// 4. the current thread is not a worker for any background poller // 4. the current thread is not a worker for any background poller
if (contended && grpc_core::ExecCtx::Get()->IsReadyToFinish() && if (contended && grpc_core::ExecCtx::Get()->IsReadyToFinish() &&
grpc_executor_is_threaded() && grpc_core::Executor::IsThreadedDefault() &&
!grpc_iomgr_is_any_background_poller_thread()) { !grpc_iomgr_is_any_background_poller_thread()) {
GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0); GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0);
// this execution context wants to move on: schedule remaining work to be // this execution context wants to move on: schedule remaining work to be

@ -45,20 +45,70 @@
gpr_log(GPR_INFO, "EXECUTOR " str); \ gpr_log(GPR_INFO, "EXECUTOR " str); \
} }
grpc_core::TraceFlag executor_trace(false, "executor"); namespace grpc_core {
namespace {
GPR_TLS_DECL(g_this_thread_state); GPR_TLS_DECL(g_this_thread_state);
GrpcExecutor::GrpcExecutor(const char* name) : name_(name) { Executor* executors[static_cast<size_t>(ExecutorType::NUM_EXECUTORS)];
void default_enqueue_short(grpc_closure* closure, grpc_error* error) {
executors[static_cast<size_t>(ExecutorType::DEFAULT)]->Enqueue(
closure, error, true /* is_short */);
}
void default_enqueue_long(grpc_closure* closure, grpc_error* error) {
executors[static_cast<size_t>(ExecutorType::DEFAULT)]->Enqueue(
closure, error, false /* is_short */);
}
void resolver_enqueue_short(grpc_closure* closure, grpc_error* error) {
executors[static_cast<size_t>(ExecutorType::RESOLVER)]->Enqueue(
closure, error, true /* is_short */);
}
void resolver_enqueue_long(grpc_closure* closure, grpc_error* error) {
executors[static_cast<size_t>(ExecutorType::RESOLVER)]->Enqueue(
closure, error, false /* is_short */);
}
const grpc_closure_scheduler_vtable
vtables_[static_cast<size_t>(ExecutorType::NUM_EXECUTORS)]
[static_cast<size_t>(ExecutorJobType::NUM_JOB_TYPES)] = {
{{&default_enqueue_short, &default_enqueue_short,
"def-ex-short"},
{&default_enqueue_long, &default_enqueue_long, "def-ex-long"}},
{{&resolver_enqueue_short, &resolver_enqueue_short,
"res-ex-short"},
{&resolver_enqueue_long, &resolver_enqueue_long,
"res-ex-long"}}};
grpc_closure_scheduler
schedulers_[static_cast<size_t>(ExecutorType::NUM_EXECUTORS)]
[static_cast<size_t>(ExecutorJobType::NUM_JOB_TYPES)] = {
{{&vtables_[static_cast<size_t>(ExecutorType::DEFAULT)]
[static_cast<size_t>(ExecutorJobType::SHORT)]},
{&vtables_[static_cast<size_t>(ExecutorType::DEFAULT)]
[static_cast<size_t>(ExecutorJobType::LONG)]}},
{{&vtables_[static_cast<size_t>(ExecutorType::RESOLVER)]
[static_cast<size_t>(ExecutorJobType::SHORT)]},
{&vtables_[static_cast<size_t>(ExecutorType::RESOLVER)]
[static_cast<size_t>(ExecutorJobType::LONG)]}}};
} // namespace
TraceFlag executor_trace(false, "executor");
Executor::Executor(const char* name) : name_(name) {
adding_thread_lock_ = GPR_SPINLOCK_STATIC_INITIALIZER; adding_thread_lock_ = GPR_SPINLOCK_STATIC_INITIALIZER;
gpr_atm_rel_store(&num_threads_, 0); gpr_atm_rel_store(&num_threads_, 0);
max_threads_ = GPR_MAX(1, 2 * gpr_cpu_num_cores()); max_threads_ = GPR_MAX(1, 2 * gpr_cpu_num_cores());
} }
void GrpcExecutor::Init() { SetThreading(true); } void Executor::Init() { SetThreading(true); }
size_t GrpcExecutor::RunClosures(const char* executor_name, size_t Executor::RunClosures(const char* executor_name,
grpc_closure_list list) { grpc_closure_list list) {
size_t n = 0; size_t n = 0;
grpc_closure* c = list.head; grpc_closure* c = list.head;
@ -82,11 +132,11 @@ size_t GrpcExecutor::RunClosures(const char* executor_name,
return n; return n;
} }
bool GrpcExecutor::IsThreaded() const { bool Executor::IsThreaded() const {
return gpr_atm_acq_load(&num_threads_) > 0; return gpr_atm_acq_load(&num_threads_) > 0;
} }
void GrpcExecutor::SetThreading(bool threading) { void Executor::SetThreading(bool threading) {
gpr_atm curr_num_threads = gpr_atm_acq_load(&num_threads_); gpr_atm curr_num_threads = gpr_atm_acq_load(&num_threads_);
EXECUTOR_TRACE("(%s) SetThreading(%d) begin", name_, threading); EXECUTOR_TRACE("(%s) SetThreading(%d) begin", name_, threading);
@ -112,7 +162,7 @@ void GrpcExecutor::SetThreading(bool threading) {
} }
thd_state_[0].thd = thd_state_[0].thd =
grpc_core::Thread(name_, &GrpcExecutor::ThreadMain, &thd_state_[0]); grpc_core::Thread(name_, &Executor::ThreadMain, &thd_state_[0]);
thd_state_[0].thd.Start(); thd_state_[0].thd.Start();
} else { // !threading } else { // !threading
if (curr_num_threads == 0) { if (curr_num_threads == 0) {
@ -153,9 +203,9 @@ void GrpcExecutor::SetThreading(bool threading) {
EXECUTOR_TRACE("(%s) SetThreading(%d) done", name_, threading); EXECUTOR_TRACE("(%s) SetThreading(%d) done", name_, threading);
} }
void GrpcExecutor::Shutdown() { SetThreading(false); } void Executor::Shutdown() { SetThreading(false); }
void GrpcExecutor::ThreadMain(void* arg) { void Executor::ThreadMain(void* arg) {
ThreadState* ts = static_cast<ThreadState*>(arg); ThreadState* ts = static_cast<ThreadState*>(arg);
gpr_tls_set(&g_this_thread_state, reinterpret_cast<intptr_t>(ts)); gpr_tls_set(&g_this_thread_state, reinterpret_cast<intptr_t>(ts));
@ -192,8 +242,8 @@ void GrpcExecutor::ThreadMain(void* arg) {
} }
} }
void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error, void Executor::Enqueue(grpc_closure* closure, grpc_error* error,
bool is_short) { bool is_short) {
bool retry_push; bool retry_push;
if (is_short) { if (is_short) {
GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS(); GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS();
@ -304,7 +354,7 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
gpr_atm_rel_store(&num_threads_, cur_thread_count + 1); gpr_atm_rel_store(&num_threads_, cur_thread_count + 1);
thd_state_[cur_thread_count].thd = grpc_core::Thread( thd_state_[cur_thread_count].thd = grpc_core::Thread(
name_, &GrpcExecutor::ThreadMain, &thd_state_[cur_thread_count]); name_, &Executor::ThreadMain, &thd_state_[cur_thread_count]);
thd_state_[cur_thread_count].thd.Start(); thd_state_[cur_thread_count].thd.Start();
} }
gpr_spinlock_unlock(&adding_thread_lock_); gpr_spinlock_unlock(&adding_thread_lock_);
@ -316,85 +366,52 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
} while (retry_push); } while (retry_push);
} }
static GrpcExecutor* executors[GRPC_NUM_EXECUTORS]; // Executor::InitAll() and Executor::ShutdownAll() functions are called in the
void default_enqueue_short(grpc_closure* closure, grpc_error* error) {
executors[GRPC_DEFAULT_EXECUTOR]->Enqueue(closure, error,
true /* is_short */);
}
void default_enqueue_long(grpc_closure* closure, grpc_error* error) {
executors[GRPC_DEFAULT_EXECUTOR]->Enqueue(closure, error,
false /* is_short */);
}
void resolver_enqueue_short(grpc_closure* closure, grpc_error* error) {
executors[GRPC_RESOLVER_EXECUTOR]->Enqueue(closure, error,
true /* is_short */);
}
void resolver_enqueue_long(grpc_closure* closure, grpc_error* error) {
executors[GRPC_RESOLVER_EXECUTOR]->Enqueue(closure, error,
false /* is_short */);
}
static const grpc_closure_scheduler_vtable
vtables_[GRPC_NUM_EXECUTORS][GRPC_NUM_EXECUTOR_JOB_TYPES] = {
{{&default_enqueue_short, &default_enqueue_short, "def-ex-short"},
{&default_enqueue_long, &default_enqueue_long, "def-ex-long"}},
{{&resolver_enqueue_short, &resolver_enqueue_short, "res-ex-short"},
{&resolver_enqueue_long, &resolver_enqueue_long, "res-ex-long"}}};
static grpc_closure_scheduler
schedulers_[GRPC_NUM_EXECUTORS][GRPC_NUM_EXECUTOR_JOB_TYPES] = {
{{&vtables_[GRPC_DEFAULT_EXECUTOR][GRPC_EXECUTOR_SHORT]},
{&vtables_[GRPC_DEFAULT_EXECUTOR][GRPC_EXECUTOR_LONG]}},
{{&vtables_[GRPC_RESOLVER_EXECUTOR][GRPC_EXECUTOR_SHORT]},
{&vtables_[GRPC_RESOLVER_EXECUTOR][GRPC_EXECUTOR_LONG]}}};
// grpc_executor_init() and grpc_executor_shutdown() functions are called in the
// the grpc_init() and grpc_shutdown() code paths which are protected by a // the grpc_init() and grpc_shutdown() code paths which are protected by a
// global mutex. So it is okay to assume that these functions are thread-safe // global mutex. So it is okay to assume that these functions are thread-safe
void grpc_executor_init() { void Executor::InitAll() {
EXECUTOR_TRACE0("grpc_executor_init() enter"); EXECUTOR_TRACE0("Executor::InitAll() enter");
// Return if grpc_executor_init() is already called earlier // Return if Executor::InitAll() is already called earlier
if (executors[GRPC_DEFAULT_EXECUTOR] != nullptr) { if (executors[static_cast<size_t>(ExecutorType::DEFAULT)] != nullptr) {
GPR_ASSERT(executors[GRPC_RESOLVER_EXECUTOR] != nullptr); GPR_ASSERT(executors[static_cast<size_t>(ExecutorType::RESOLVER)] !=
nullptr);
return; return;
} }
executors[GRPC_DEFAULT_EXECUTOR] = executors[static_cast<size_t>(ExecutorType::DEFAULT)] =
grpc_core::New<GrpcExecutor>("default-executor"); grpc_core::New<Executor>("default-executor");
executors[GRPC_RESOLVER_EXECUTOR] = executors[static_cast<size_t>(ExecutorType::RESOLVER)] =
grpc_core::New<GrpcExecutor>("resolver-executor"); grpc_core::New<Executor>("resolver-executor");
executors[GRPC_DEFAULT_EXECUTOR]->Init(); executors[static_cast<size_t>(ExecutorType::DEFAULT)]->Init();
executors[GRPC_RESOLVER_EXECUTOR]->Init(); executors[static_cast<size_t>(ExecutorType::RESOLVER)]->Init();
EXECUTOR_TRACE0("grpc_executor_init() done"); EXECUTOR_TRACE0("Executor::InitAll() done");
} }
grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorType executor_type, grpc_closure_scheduler* Executor::Scheduler(ExecutorType executor_type,
GrpcExecutorJobType job_type) { ExecutorJobType job_type) {
return &schedulers_[executor_type][job_type]; return &schedulers_[static_cast<size_t>(executor_type)]
[static_cast<size_t>(job_type)];
} }
grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type) { grpc_closure_scheduler* Executor::Scheduler(ExecutorJobType job_type) {
return grpc_executor_scheduler(GRPC_DEFAULT_EXECUTOR, job_type); return Executor::Scheduler(ExecutorType::DEFAULT, job_type);
} }
void grpc_executor_shutdown() { void Executor::ShutdownAll() {
EXECUTOR_TRACE0("grpc_executor_shutdown() enter"); EXECUTOR_TRACE0("Executor::ShutdownAll() enter");
// Return if grpc_executor_shutdown() is already called earlier // Return if Executor::ShutdownAll() is already called earlier
if (executors[GRPC_DEFAULT_EXECUTOR] == nullptr) { if (executors[static_cast<size_t>(ExecutorType::DEFAULT)] == nullptr) {
GPR_ASSERT(executors[GRPC_RESOLVER_EXECUTOR] == nullptr); GPR_ASSERT(executors[static_cast<size_t>(ExecutorType::RESOLVER)] ==
nullptr);
return; return;
} }
executors[GRPC_DEFAULT_EXECUTOR]->Shutdown(); executors[static_cast<size_t>(ExecutorType::DEFAULT)]->Shutdown();
executors[GRPC_RESOLVER_EXECUTOR]->Shutdown(); executors[static_cast<size_t>(ExecutorType::RESOLVER)]->Shutdown();
// Delete the executor objects. // Delete the executor objects.
// //
@ -408,26 +425,36 @@ void grpc_executor_shutdown() {
// By ensuring that all executors are shutdown first, we are also ensuring // By ensuring that all executors are shutdown first, we are also ensuring
// that no thread is active across all executors. // that no thread is active across all executors.
grpc_core::Delete<GrpcExecutor>(executors[GRPC_DEFAULT_EXECUTOR]); grpc_core::Delete<Executor>(
grpc_core::Delete<GrpcExecutor>(executors[GRPC_RESOLVER_EXECUTOR]); executors[static_cast<size_t>(ExecutorType::DEFAULT)]);
executors[GRPC_DEFAULT_EXECUTOR] = nullptr; grpc_core::Delete<Executor>(
executors[GRPC_RESOLVER_EXECUTOR] = nullptr; executors[static_cast<size_t>(ExecutorType::RESOLVER)]);
executors[static_cast<size_t>(ExecutorType::DEFAULT)] = nullptr;
executors[static_cast<size_t>(ExecutorType::RESOLVER)] = nullptr;
EXECUTOR_TRACE0("grpc_executor_shutdown() done"); EXECUTOR_TRACE0("Executor::ShutdownAll() done");
} }
bool grpc_executor_is_threaded(GrpcExecutorType executor_type) { bool Executor::IsThreaded(ExecutorType executor_type) {
GPR_ASSERT(executor_type < GRPC_NUM_EXECUTORS); GPR_ASSERT(executor_type < ExecutorType::NUM_EXECUTORS);
return executors[executor_type]->IsThreaded(); return executors[static_cast<size_t>(executor_type)]->IsThreaded();
} }
bool grpc_executor_is_threaded() { bool Executor::IsThreadedDefault() {
return grpc_executor_is_threaded(GRPC_DEFAULT_EXECUTOR); return Executor::IsThreaded(ExecutorType::DEFAULT);
} }
void grpc_executor_set_threading(bool enable) { void Executor::SetThreadingAll(bool enable) {
EXECUTOR_TRACE("grpc_executor_set_threading(%d) called", enable); EXECUTOR_TRACE("Executor::SetThreadingAll(%d) called", enable);
for (int i = 0; i < GRPC_NUM_EXECUTORS; i++) { for (size_t i = 0; i < static_cast<size_t>(ExecutorType::NUM_EXECUTORS);
i++) {
executors[i]->SetThreading(enable); executors[i]->SetThreading(enable);
} }
} }
void Executor::SetThreadingDefault(bool enable) {
EXECUTOR_TRACE("Executor::SetThreadingDefault(%d) called", enable);
executors[static_cast<size_t>(ExecutorType::DEFAULT)]->SetThreading(enable);
}
} // namespace grpc_core

@ -25,7 +25,9 @@
#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/closure.h"
typedef struct { namespace grpc_core {
struct ThreadState {
gpr_mu mu; gpr_mu mu;
size_t id; // For debugging purposes size_t id; // For debugging purposes
const char* name; // Thread state name const char* name; // Thread state name
@ -35,17 +37,24 @@ typedef struct {
bool shutdown; bool shutdown;
bool queued_long_job; bool queued_long_job;
grpc_core::Thread thd; grpc_core::Thread thd;
} ThreadState; };
typedef enum { enum class ExecutorType {
GRPC_EXECUTOR_SHORT = 0, DEFAULT = 0,
GRPC_EXECUTOR_LONG, RESOLVER,
GRPC_NUM_EXECUTOR_JOB_TYPES // Add new values above this
} GrpcExecutorJobType; NUM_EXECUTORS // Add new values above this
};
class GrpcExecutor { enum class ExecutorJobType {
SHORT = 0,
LONG,
NUM_JOB_TYPES // Add new values above this
};
class Executor {
public: public:
GrpcExecutor(const char* executor_name); Executor(const char* executor_name);
void Init(); void Init();
@ -62,55 +71,51 @@ class GrpcExecutor {
* a short job (i.e expected to not block and complete quickly) */ * a short job (i.e expected to not block and complete quickly) */
void Enqueue(grpc_closure* closure, grpc_error* error, bool is_short); void Enqueue(grpc_closure* closure, grpc_error* error, bool is_short);
private: // TODO(sreek): Currently we have two executors (available globally): The
static size_t RunClosures(const char* executor_name, grpc_closure_list list); // default executor and the resolver executor.
static void ThreadMain(void* arg); //
// Some of the functions below operate on the DEFAULT executor only while some
// operate of ALL the executors. This is a bit confusing and should be cleaned
// up in future (where we make all the following functions take ExecutorType
// and/or JobType)
const char* name_; // Initialize ALL the executors
ThreadState* thd_state_; static void InitAll();
size_t max_threads_;
gpr_atm num_threads_;
gpr_spinlock adding_thread_lock_;
};
// == Global executor functions ==
typedef enum { // Shutdown ALL the executors
GRPC_DEFAULT_EXECUTOR = 0, static void ShutdownAll();
GRPC_RESOLVER_EXECUTOR,
GRPC_NUM_EXECUTORS // Add new values above this // Set the threading mode for ALL the executors
} GrpcExecutorType; static void SetThreadingAll(bool enable);
// TODO(sreek): Currently we have two executors (available globally): The // Set the threading mode for the DEFAULT executor
// default executor and the resolver executor. static void SetThreadingDefault(bool enable);
//
// Some of the functions below operate on the DEFAULT executor only while some
// operate of ALL the executors. This is a bit confusing and should be cleaned
// up in future (where we make all the following functions take executor_type
// and/or job_type)
// Initialize ALL the executors // Get the DEFAULT executor scheduler for the given job_type
void grpc_executor_init(); static grpc_closure_scheduler* Scheduler(ExecutorJobType job_type);
// Shutdown ALL the executors // Get the executor scheduler for a given executor_type and a job_type
void grpc_executor_shutdown(); static grpc_closure_scheduler* Scheduler(ExecutorType executor_type,
ExecutorJobType job_type);
// Set the threading mode for ALL the executors // Return if a given executor is running in threaded mode (i.e if
void grpc_executor_set_threading(bool enable); // SetThreading(true) was called previously on that executor)
static bool IsThreaded(ExecutorType executor_type);
// Get the DEFAULT executor scheduler for the given job_type // Return if the DEFAULT executor is threaded
grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type); static bool IsThreadedDefault();
// Get the executor scheduler for a given executor_type and a job_type private:
grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorType executor_type, static size_t RunClosures(const char* executor_name, grpc_closure_list list);
GrpcExecutorJobType job_type); static void ThreadMain(void* arg);
// Return if a given executor is running in threaded mode (i.e if const char* name_;
// grpc_executor_set_threading(true) was called previously on that executor) ThreadState* thd_state_;
bool grpc_executor_is_threaded(GrpcExecutorType executor_type); size_t max_threads_;
gpr_atm num_threads_;
gpr_spinlock adding_thread_lock_;
};
// Return if the DEFAULT executor is threaded } // namespace grpc_core
bool grpc_executor_is_threaded();
#endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */ #endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */

@ -71,7 +71,7 @@ void grpc_prefork() {
return; return;
} }
grpc_timer_manager_set_threading(false); grpc_timer_manager_set_threading(false);
grpc_executor_set_threading(false); grpc_core::Executor::SetThreadingAll(false);
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();
grpc_core::Fork::AwaitThreads(); grpc_core::Fork::AwaitThreads();
skipped_handler = false; skipped_handler = false;
@ -82,7 +82,7 @@ void grpc_postfork_parent() {
grpc_core::Fork::AllowExecCtx(); grpc_core::Fork::AllowExecCtx();
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
grpc_timer_manager_set_threading(true); grpc_timer_manager_set_threading(true);
grpc_executor_set_threading(true); grpc_core::Executor::SetThreadingAll(true);
} }
} }
@ -96,7 +96,7 @@ void grpc_postfork_child() {
reset_polling_engine(); reset_polling_engine();
} }
grpc_timer_manager_set_threading(true); grpc_timer_manager_set_threading(true);
grpc_executor_set_threading(true); grpc_core::Executor::SetThreadingAll(true);
} }
} }

@ -52,7 +52,7 @@ void grpc_iomgr_init() {
g_shutdown = 0; g_shutdown = 0;
gpr_mu_init(&g_mu); gpr_mu_init(&g_mu);
gpr_cv_init(&g_rcv); gpr_cv_init(&g_rcv);
grpc_executor_init(); grpc_core::Executor::InitAll();
grpc_timer_list_init(); grpc_timer_list_init();
g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.next = g_root_object.prev = &g_root_object;
g_root_object.name = (char*)"root"; g_root_object.name = (char*)"root";
@ -88,7 +88,7 @@ void grpc_iomgr_shutdown() {
{ {
grpc_timer_manager_shutdown(); grpc_timer_manager_shutdown();
grpc_iomgr_platform_flush(); grpc_iomgr_platform_flush();
grpc_executor_shutdown(); grpc_core::Executor::ShutdownAll();
gpr_mu_lock(&g_mu); gpr_mu_lock(&g_mu);
g_shutdown = 1; g_shutdown = 1;

@ -34,7 +34,7 @@ gpr_thd_id g_init_thread;
static void iomgr_platform_init(void) { static void iomgr_platform_init(void) {
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
grpc_executor_set_threading(false); grpc_core::Executor::SetThreadingAll(false);
g_init_thread = gpr_thd_currentid(); g_init_thread = gpr_thd_currentid();
grpc_pollset_global_init(); grpc_pollset_global_init();
} }

@ -150,7 +150,7 @@ typedef struct {
void* arg; void* arg;
} request; } request;
/* Callback to be passed to grpc_executor to asynch-ify /* Callback to be passed to grpc Executor to asynch-ify
* grpc_blocking_resolve_address */ * grpc_blocking_resolve_address */
static void do_request_thread(void* rp, grpc_error* error) { static void do_request_thread(void* rp, grpc_error* error) {
request* r = static_cast<request*>(rp); request* r = static_cast<request*>(rp);
@ -168,7 +168,8 @@ static void posix_resolve_address(const char* name, const char* default_port,
request* r = static_cast<request*>(gpr_malloc(sizeof(request))); request* r = static_cast<request*>(gpr_malloc(sizeof(request)));
GRPC_CLOSURE_INIT( GRPC_CLOSURE_INIT(
&r->request_closure, do_request_thread, r, &r->request_closure, do_request_thread, r,
grpc_executor_scheduler(GRPC_RESOLVER_EXECUTOR, GRPC_EXECUTOR_SHORT)); grpc_core::Executor::Scheduler(grpc_core::ExecutorType::RESOLVER,
grpc_core::ExecutorJobType::SHORT));
r->name = gpr_strdup(name); r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port); r->default_port = gpr_strdup(default_port);
r->on_done = on_done; r->on_done = on_done;

@ -153,7 +153,8 @@ static void windows_resolve_address(const char* name, const char* default_port,
request* r = (request*)gpr_malloc(sizeof(request)); request* r = (request*)gpr_malloc(sizeof(request));
GRPC_CLOSURE_INIT( GRPC_CLOSURE_INIT(
&r->request_closure, do_request_thread, r, &r->request_closure, do_request_thread, r,
grpc_executor_scheduler(GRPC_RESOLVER_EXECUTOR, GRPC_EXECUTOR_SHORT)); grpc_core::Executor::Scheduler(grpc_core::ExecutorType::RESOLVER,
grpc_core::ExecutorJobType::SHORT));
r->name = gpr_strdup(name); r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port); r->default_port = gpr_strdup(default_port);
r->on_done = on_done; r->on_done = on_done;

@ -227,10 +227,10 @@ static void cover_self(grpc_tcp* tcp) {
} }
grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu); grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu);
gpr_atm_rel_store(&g_backup_poller, (gpr_atm)p); gpr_atm_rel_store(&g_backup_poller, (gpr_atm)p);
GRPC_CLOSURE_SCHED( GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p,
GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p, grpc_core::Executor::Scheduler(
grpc_executor_scheduler(GRPC_EXECUTOR_LONG)), grpc_core::ExecutorJobType::LONG)),
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} else { } else {
while ((p = (backup_poller*)gpr_atm_acq_load(&g_backup_poller)) == while ((p = (backup_poller*)gpr_atm_acq_load(&g_backup_poller)) ==
nullptr) { nullptr) {

@ -481,8 +481,9 @@ void GrpcUdpListener::OnRead(grpc_error* error, void* do_read_arg) {
if (udp_handler_->Read()) { if (udp_handler_->Read()) {
/* There maybe more packets to read. Schedule read_more_cb_ closure to run /* There maybe more packets to read. Schedule read_more_cb_ closure to run
* after finishing this event loop. */ * after finishing this event loop. */
GRPC_CLOSURE_INIT(&do_read_closure_, do_read, do_read_arg, GRPC_CLOSURE_INIT(
grpc_executor_scheduler(GRPC_EXECUTOR_LONG)); &do_read_closure_, do_read, do_read_arg,
grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::LONG));
GRPC_CLOSURE_SCHED(&do_read_closure_, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(&do_read_closure_, GRPC_ERROR_NONE);
} else { } else {
/* Finish reading all the packets, re-arm the notification event so we can /* Finish reading all the packets, re-arm the notification event so we can
@ -542,8 +543,9 @@ void GrpcUdpListener::OnCanWrite(grpc_error* error, void* do_write_arg) {
} }
/* Schedule actual write in another thread. */ /* Schedule actual write in another thread. */
GRPC_CLOSURE_INIT(&do_write_closure_, do_write, do_write_arg, GRPC_CLOSURE_INIT(
grpc_executor_scheduler(GRPC_EXECUTOR_LONG)); &do_write_closure_, do_write, do_write_arg,
grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::LONG));
GRPC_CLOSURE_SCHED(&do_write_closure_, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(&do_write_closure_, GRPC_ERROR_NONE);
} }

@ -165,7 +165,7 @@ void grpc_shutdown(void) {
{ {
grpc_timer_manager_set_threading( grpc_timer_manager_set_threading(
false); // shutdown timer_manager thread false); // shutdown timer_manager thread
grpc_executor_shutdown(); grpc_core::Executor::ShutdownAll();
for (i = g_number_of_plugins; i >= 0; i--) { for (i = g_number_of_plugins; i >= 0; i--) {
if (g_all_of_the_plugins[i].destroy != nullptr) { if (g_all_of_the_plugins[i].destroy != nullptr) {
g_all_of_the_plugins[i].destroy(); g_all_of_the_plugins[i].destroy();

@ -1134,8 +1134,9 @@ void grpc_server_start(grpc_server* server) {
server_ref(server); server_ref(server);
server->starting = true; server->starting = true;
GRPC_CLOSURE_SCHED( GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_CREATE(start_listeners, server, GRPC_CLOSURE_CREATE(
grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)), start_listeners, server,
grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)),
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} }

@ -73,7 +73,7 @@ void grpc_stream_unref(grpc_stream_refcount* refcount) {
Throw this over to the executor (on a core-owned thread) and process it Throw this over to the executor (on a core-owned thread) and process it
there. */ there. */
refcount->destroy.scheduler = refcount->destroy.scheduler =
grpc_executor_scheduler(GRPC_EXECUTOR_SHORT); grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
} }
GRPC_CLOSURE_SCHED(&refcount->destroy, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(&refcount->destroy, GRPC_ERROR_NONE);
} }

@ -706,7 +706,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_timer_manager_set_threading(false); grpc_timer_manager_set_threading(false);
{ {
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
grpc_executor_set_threading(false); grpc_core::Executor::SetThreadingAll(false);
} }
grpc_set_resolver_impl(&fuzzer_resolver); grpc_set_resolver_impl(&fuzzer_resolver);
grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked; grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked;

@ -46,7 +46,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_init(); grpc_init();
{ {
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
grpc_executor_set_threading(false); grpc_core::Executor::SetThreadingAll(false);
grpc_resource_quota* resource_quota = grpc_resource_quota* resource_quota =
grpc_resource_quota_create("client_fuzzer"); grpc_resource_quota_create("client_fuzzer");

@ -43,7 +43,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_init(); grpc_init();
{ {
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
grpc_executor_set_threading(false); grpc_core::Executor::SetThreadingAll(false);
grpc_resource_quota* resource_quota = grpc_resource_quota* resource_quota =
grpc_resource_quota_create("server_fuzzer"); grpc_resource_quota_create("server_fuzzer");

@ -290,7 +290,7 @@ int main(int argc, char** argv) {
test_invalid_ip_addresses(); test_invalid_ip_addresses();
test_unparseable_hostports(); test_unparseable_hostports();
} }
grpc_executor_shutdown(); grpc_core::Executor::ShutdownAll();
} }
gpr_cmdline_destroy(cl); gpr_cmdline_destroy(cl);

Loading…
Cancel
Save