From 50a8141983c543b8fbf1dfcfeb59afb0b75f10dc Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Mon, 21 Oct 2019 17:21:10 -0700 Subject: [PATCH] Move scheduling logic of executor closures to run time rather than initialization time --- src/core/lib/iomgr/combiner.cc | 6 +-- src/core/lib/iomgr/executor.cc | 41 +++++-------------- src/core/lib/iomgr/executor.h | 11 ++--- src/core/lib/iomgr/resolve_address_posix.cc | 8 ++-- src/core/lib/iomgr/resolve_address_windows.cc | 9 ++-- src/core/lib/iomgr/tcp_posix.cc | 12 +++--- src/core/lib/iomgr/udp_server.cc | 20 +++++---- src/core/lib/surface/completion_queue.cc | 13 ++---- src/core/lib/transport/transport.cc | 6 +-- src/cpp/client/secure_credentials.cc | 6 +-- 10 files changed, 51 insertions(+), 81 deletions(-) diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index cfb4bbdd96a..ff3292d00d8 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -57,9 +57,7 @@ grpc_core::Combiner* grpc_combiner_create(void) { gpr_ref_init(&lock->refs, 1); gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED); grpc_closure_list_init(&lock->final_list); - GRPC_CLOSURE_INIT( - &lock->offload, offload, lock, - grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)); + GRPC_CLOSURE_INIT(&lock->offload, offload, lock, nullptr); GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p create", lock)); return lock; } @@ -177,7 +175,7 @@ static void queue_offload(grpc_core::Combiner* lock) { GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED(); move_next(); GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p queue_offload", lock)); - GRPC_CLOSURE_SCHED(&lock->offload, GRPC_ERROR_NONE); + grpc_core::Executor::Run(&lock->offload, GRPC_ERROR_NONE); } bool grpc_combiner_continue_exec_ctx() { diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc index e8a2624c9ce..f271f17b101 100644 --- a/src/core/lib/iomgr/executor.cc +++ b/src/core/lib/iomgr/executor.cc @@ -77,28 +77,13 @@ void resolver_enqueue_long(grpc_closure* closure, grpc_error* error) { closure, error, false /* is_short */); } -const grpc_closure_scheduler_vtable - vtables_[static_cast(ExecutorType::NUM_EXECUTORS)] - [static_cast(ExecutorJobType::NUM_JOB_TYPES)] = { - {{&default_enqueue_short, &default_enqueue_short, - "def-ex-short"}, - {&default_enqueue_long, &default_enqueue_long, "def-ex-long"}}, - {{&resolver_enqueue_short, &resolver_enqueue_short, - "res-ex-short"}, - {&resolver_enqueue_long, &resolver_enqueue_long, - "res-ex-long"}}}; - -grpc_closure_scheduler - schedulers_[static_cast(ExecutorType::NUM_EXECUTORS)] - [static_cast(ExecutorJobType::NUM_JOB_TYPES)] = { - {{&vtables_[static_cast(ExecutorType::DEFAULT)] - [static_cast(ExecutorJobType::SHORT)]}, - {&vtables_[static_cast(ExecutorType::DEFAULT)] - [static_cast(ExecutorJobType::LONG)]}}, - {{&vtables_[static_cast(ExecutorType::RESOLVER)] - [static_cast(ExecutorJobType::SHORT)]}, - {&vtables_[static_cast(ExecutorType::RESOLVER)] - [static_cast(ExecutorJobType::LONG)]}}}; +using enqueue_func = void (*)(grpc_closure* closure, grpc_error* error); + +const enqueue_func + executor_enqueue_fns_[static_cast(ExecutorType::NUM_EXECUTORS)] + [static_cast(ExecutorJobType::NUM_JOB_TYPES)] = + {{default_enqueue_short, default_enqueue_long}, + {resolver_enqueue_short, resolver_enqueue_long}}; } // namespace @@ -418,14 +403,10 @@ void Executor::InitAll() { EXECUTOR_TRACE0("Executor::InitAll() done"); } -grpc_closure_scheduler* Executor::Scheduler(ExecutorType executor_type, - ExecutorJobType job_type) { - return &schedulers_[static_cast(executor_type)] - [static_cast(job_type)]; -} - -grpc_closure_scheduler* Executor::Scheduler(ExecutorJobType job_type) { - return Executor::Scheduler(ExecutorType::DEFAULT, job_type); +void Executor::Run(grpc_closure* closure, grpc_error* error, + ExecutorType executor_type, ExecutorJobType job_type) { + executor_enqueue_fns_[static_cast(executor_type)] + [static_cast(job_type)](closure, error); } void Executor::ShutdownAll() { diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h index 8177c7f4625..71337004841 100644 --- a/src/core/lib/iomgr/executor.h +++ b/src/core/lib/iomgr/executor.h @@ -83,6 +83,10 @@ class Executor { // Initialize ALL the executors static void InitAll(); + static void Run(grpc_closure* closure, grpc_error* error, + ExecutorType executor_type = ExecutorType::DEFAULT, + ExecutorJobType job_type = ExecutorJobType::SHORT); + // Shutdown ALL the executors static void ShutdownAll(); @@ -92,13 +96,6 @@ class Executor { // Set the threading mode for ALL the executors static void SetThreadingDefault(bool enable); - // Get the DEFAULT executor scheduler for the given job_type - static grpc_closure_scheduler* Scheduler(ExecutorJobType job_type); - - // Get the executor scheduler for a given executor_type and a job_type - static grpc_closure_scheduler* Scheduler(ExecutorType executor_type, - ExecutorJobType job_type); - // Return if a given executor is running in threaded mode (i.e if // SetThreading(true) was called previously on that executor) static bool IsThreaded(ExecutorType executor_type); diff --git a/src/core/lib/iomgr/resolve_address_posix.cc b/src/core/lib/iomgr/resolve_address_posix.cc index e02dc19bb27..84c7bd01a69 100644 --- a/src/core/lib/iomgr/resolve_address_posix.cc +++ b/src/core/lib/iomgr/resolve_address_posix.cc @@ -164,15 +164,13 @@ static void posix_resolve_address(const char* name, const char* default_port, grpc_closure* on_done, grpc_resolved_addresses** addrs) { request* r = static_cast(gpr_malloc(sizeof(request))); - GRPC_CLOSURE_INIT( - &r->request_closure, do_request_thread, r, - grpc_core::Executor::Scheduler(grpc_core::ExecutorType::RESOLVER, - grpc_core::ExecutorJobType::SHORT)); + GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r, nullptr); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->on_done = on_done; r->addrs_out = addrs; - GRPC_CLOSURE_SCHED(&r->request_closure, GRPC_ERROR_NONE); + grpc_core::Executor::Run(&r->request_closure, GRPC_ERROR_NONE, + grpc_core::ExecutorType::RESOLVER); } grpc_address_resolver_vtable grpc_posix_resolver_vtable = { diff --git a/src/core/lib/iomgr/resolve_address_windows.cc b/src/core/lib/iomgr/resolve_address_windows.cc index a06d5cefbcb..81091a07887 100644 --- a/src/core/lib/iomgr/resolve_address_windows.cc +++ b/src/core/lib/iomgr/resolve_address_windows.cc @@ -149,15 +149,14 @@ static void windows_resolve_address(const char* name, const char* default_port, grpc_closure* on_done, grpc_resolved_addresses** addresses) { request* r = (request*)gpr_malloc(sizeof(request)); - GRPC_CLOSURE_INIT( - &r->request_closure, do_request_thread, r, - grpc_core::Executor::Scheduler(grpc_core::ExecutorType::RESOLVER, - grpc_core::ExecutorJobType::SHORT)); + GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r, nullptr); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->on_done = on_done; r->addresses = addresses; - GRPC_CLOSURE_SCHED(&r->request_closure, GRPC_ERROR_NONE); + grpc_core::Executor::Run(&r->request_closure, GRPC_ERROR_NONE, + grpc_core::ExecutorType::RESOLVER, + grpc_core::ExecutorJobType::SHORT); } grpc_address_resolver_vtable grpc_windows_resolver_vtable = { diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 3c7c5496281..0337b114b2d 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -202,7 +202,9 @@ static void run_poller(void* bp, grpc_error* error_ignored) { if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { gpr_log(GPR_INFO, "BACKUP_POLLER:%p reschedule", p); } - GRPC_CLOSURE_SCHED(&p->run_poller, GRPC_ERROR_NONE); + grpc_core::Executor::Run(&p->run_poller, GRPC_ERROR_NONE, + grpc_core::ExecutorType::DEFAULT, + grpc_core::ExecutorJobType::LONG); } } @@ -241,10 +243,10 @@ static void cover_self(grpc_tcp* tcp) { } grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu); gpr_atm_rel_store(&g_backup_poller, (gpr_atm)p); - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p, - grpc_core::Executor::Scheduler( - grpc_core::ExecutorJobType::LONG)), - GRPC_ERROR_NONE); + grpc_core::Executor::Run( + GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p, nullptr), + GRPC_ERROR_NONE, grpc_core::ExecutorType::DEFAULT, + grpc_core::ExecutorJobType::LONG); } else { while ((p = (backup_poller*)gpr_atm_acq_load(&g_backup_poller)) == nullptr) { diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 3e853945555..1ca6677a78b 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -448,7 +448,9 @@ void GrpcUdpListener::do_read(void* arg, grpc_error* error) { if (!sp->already_shutdown_ && sp->udp_handler_->Read()) { /* There maybe more packets to read. Schedule read_more_cb_ closure to run * after finishing this event loop. */ - GRPC_CLOSURE_SCHED(&sp->do_read_closure_, GRPC_ERROR_NONE); + grpc_core::Executor::Run(&sp->do_read_closure_, GRPC_ERROR_NONE, + grpc_core::ExecutorType::DEFAULT, + grpc_core::ExecutorJobType::LONG); } else { /* Finish reading all the packets, re-arm the notification event so we can * get another chance to read. Or fd already shutdown, re-arm to get a @@ -481,10 +483,10 @@ void GrpcUdpListener::OnRead(grpc_error* error, void* do_read_arg) { if (udp_handler_->Read()) { /* There maybe more packets to read. Schedule read_more_cb_ closure to run * after finishing this event loop. */ - GRPC_CLOSURE_INIT( - &do_read_closure_, do_read, do_read_arg, - grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::LONG)); - GRPC_CLOSURE_SCHED(&do_read_closure_, GRPC_ERROR_NONE); + GRPC_CLOSURE_INIT(&do_read_closure_, do_read, do_read_arg, nullptr); + grpc_core::Executor::Run(&do_read_closure_, GRPC_ERROR_NONE, + grpc_core::ExecutorType::DEFAULT, + grpc_core::ExecutorJobType::LONG); } else { /* Finish reading all the packets, re-arm the notification event so we can * get another chance to read. Or fd already shutdown, re-arm to get a @@ -543,11 +545,11 @@ void GrpcUdpListener::OnCanWrite(grpc_error* error, void* do_write_arg) { } /* Schedule actual write in another thread. */ - GRPC_CLOSURE_INIT( - &do_write_closure_, do_write, do_write_arg, - grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::LONG)); + GRPC_CLOSURE_INIT(&do_write_closure_, do_write, do_write_arg, nullptr); - GRPC_CLOSURE_SCHED(&do_write_closure_, GRPC_ERROR_NONE); + grpc_core::Executor::Run(&do_write_closure_, GRPC_ERROR_NONE, + grpc_core::ExecutorType::DEFAULT, + grpc_core::ExecutorJobType::LONG); } static int add_socket_to_server(grpc_udp_server* s, int fd, diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index bb249331e12..646d2c5fa61 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -860,11 +860,8 @@ static void cq_end_op_for_callback( // Schedule the callback on a closure if not internal or triggered // from a background poller thread. - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_CREATE( - functor_callback, functor, - grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), - error); + grpc_core::Executor::Run( + GRPC_CLOSURE_CREATE(functor_callback, functor, nullptr), error); } void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, @@ -1357,10 +1354,8 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) { // Schedule the callback on a closure if not internal or triggered // from a background poller thread. - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_CREATE( - functor_callback, callback, - grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), + grpc_core::Executor::Run( + GRPC_CLOSURE_CREATE(functor_callback, callback, nullptr), GRPC_ERROR_NONE); } diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc index 80ffabce0e5..6235d69c7a5 100644 --- a/src/core/lib/transport/transport.cc +++ b/src/core/lib/transport/transport.cc @@ -50,10 +50,10 @@ void grpc_stream_destroy(grpc_stream_refcount* refcount) { cope with. Throw this over to the executor (on a core-owned thread) and process it there. */ - refcount->destroy.scheduler = - grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT); + grpc_core::Executor::Run(&refcount->destroy, GRPC_ERROR_NONE); + } else { + GRPC_CLOSURE_SCHED(&refcount->destroy, GRPC_ERROR_NONE); } - GRPC_CLOSURE_SCHED(&refcount->destroy, GRPC_ERROR_NONE); } void slice_stream_destroy(void* arg) { diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc index 15a6bec0d05..a5a0794be15 100644 --- a/src/cpp/client/secure_credentials.cc +++ b/src/cpp/client/secure_credentials.cc @@ -393,10 +393,8 @@ void MetadataCredentialsPluginWrapper::Destroy(void* wrapper) { if (wrapper == nullptr) return; grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; - GRPC_CLOSURE_RUN(GRPC_CLOSURE_CREATE(DeleteWrapper, wrapper, - grpc_core::Executor::Scheduler( - grpc_core::ExecutorJobType::SHORT)), - GRPC_ERROR_NONE); + grpc_core::Executor::Run(GRPC_CLOSURE_CREATE(DeleteWrapper, wrapper, nullptr), + GRPC_ERROR_NONE); } int MetadataCredentialsPluginWrapper::GetMetadata(