Merge pull request #20724 from yashykt/executornew

Move scheduling logic of executor closures to run time rather than initialization time
pull/20674/head
Yash Tibrewal 6 years ago committed by GitHub
commit 8a6d71c284
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      src/core/lib/iomgr/combiner.cc
  2. 41
      src/core/lib/iomgr/executor.cc
  3. 11
      src/core/lib/iomgr/executor.h
  4. 8
      src/core/lib/iomgr/resolve_address_posix.cc
  5. 8
      src/core/lib/iomgr/resolve_address_windows.cc
  6. 12
      src/core/lib/iomgr/tcp_posix.cc
  7. 20
      src/core/lib/iomgr/udp_server.cc
  8. 13
      src/core/lib/surface/completion_queue.cc
  9. 6
      src/core/lib/transport/transport.cc
  10. 6
      src/cpp/client/secure_credentials.cc

@ -57,9 +57,7 @@ grpc_core::Combiner* grpc_combiner_create(void) {
gpr_ref_init(&lock->refs, 1);
gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
grpc_closure_list_init(&lock->final_list);
GRPC_CLOSURE_INIT(
&lock->offload, offload, lock,
grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT));
GRPC_CLOSURE_INIT(&lock->offload, offload, lock, nullptr);
GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p create", lock));
return lock;
}
@ -177,7 +175,7 @@ static void queue_offload(grpc_core::Combiner* lock) {
GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED();
move_next();
GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p queue_offload", lock));
GRPC_CLOSURE_SCHED(&lock->offload, GRPC_ERROR_NONE);
grpc_core::Executor::Run(&lock->offload, GRPC_ERROR_NONE);
}
bool grpc_combiner_continue_exec_ctx() {

@ -77,28 +77,13 @@ void resolver_enqueue_long(grpc_closure* closure, grpc_error* error) {
closure, error, false /* is_short */);
}
const grpc_closure_scheduler_vtable
vtables_[static_cast<size_t>(ExecutorType::NUM_EXECUTORS)]
[static_cast<size_t>(ExecutorJobType::NUM_JOB_TYPES)] = {
{{&default_enqueue_short, &default_enqueue_short,
"def-ex-short"},
{&default_enqueue_long, &default_enqueue_long, "def-ex-long"}},
{{&resolver_enqueue_short, &resolver_enqueue_short,
"res-ex-short"},
{&resolver_enqueue_long, &resolver_enqueue_long,
"res-ex-long"}}};
grpc_closure_scheduler
schedulers_[static_cast<size_t>(ExecutorType::NUM_EXECUTORS)]
[static_cast<size_t>(ExecutorJobType::NUM_JOB_TYPES)] = {
{{&vtables_[static_cast<size_t>(ExecutorType::DEFAULT)]
[static_cast<size_t>(ExecutorJobType::SHORT)]},
{&vtables_[static_cast<size_t>(ExecutorType::DEFAULT)]
[static_cast<size_t>(ExecutorJobType::LONG)]}},
{{&vtables_[static_cast<size_t>(ExecutorType::RESOLVER)]
[static_cast<size_t>(ExecutorJobType::SHORT)]},
{&vtables_[static_cast<size_t>(ExecutorType::RESOLVER)]
[static_cast<size_t>(ExecutorJobType::LONG)]}}};
using EnqueueFunc = void (*)(grpc_closure* closure, grpc_error* error);
const EnqueueFunc
executor_enqueue_fns_[static_cast<size_t>(ExecutorType::NUM_EXECUTORS)]
[static_cast<size_t>(ExecutorJobType::NUM_JOB_TYPES)] =
{{default_enqueue_short, default_enqueue_long},
{resolver_enqueue_short, resolver_enqueue_long}};
} // namespace
@ -418,14 +403,10 @@ void Executor::InitAll() {
EXECUTOR_TRACE0("Executor::InitAll() done");
}
grpc_closure_scheduler* Executor::Scheduler(ExecutorType executor_type,
ExecutorJobType job_type) {
return &schedulers_[static_cast<size_t>(executor_type)]
[static_cast<size_t>(job_type)];
}
grpc_closure_scheduler* Executor::Scheduler(ExecutorJobType job_type) {
return Executor::Scheduler(ExecutorType::DEFAULT, job_type);
void Executor::Run(grpc_closure* closure, grpc_error* error,
ExecutorType executor_type, ExecutorJobType job_type) {
executor_enqueue_fns_[static_cast<size_t>(executor_type)]
[static_cast<size_t>(job_type)](closure, error);
}
void Executor::ShutdownAll() {

@ -83,6 +83,10 @@ class Executor {
// Initialize ALL the executors
static void InitAll();
static void Run(grpc_closure* closure, grpc_error* error,
ExecutorType executor_type = ExecutorType::DEFAULT,
ExecutorJobType job_type = ExecutorJobType::SHORT);
// Shutdown ALL the executors
static void ShutdownAll();
@ -92,13 +96,6 @@ class Executor {
// Set the threading mode for ALL the executors
static void SetThreadingDefault(bool enable);
// Get the DEFAULT executor scheduler for the given job_type
static grpc_closure_scheduler* Scheduler(ExecutorJobType job_type);
// Get the executor scheduler for a given executor_type and a job_type
static grpc_closure_scheduler* Scheduler(ExecutorType executor_type,
ExecutorJobType job_type);
// Return if a given executor is running in threaded mode (i.e if
// SetThreading(true) was called previously on that executor)
static bool IsThreaded(ExecutorType executor_type);

@ -164,15 +164,13 @@ static void posix_resolve_address(const char* name, const char* default_port,
grpc_closure* on_done,
grpc_resolved_addresses** addrs) {
request* r = static_cast<request*>(gpr_malloc(sizeof(request)));
GRPC_CLOSURE_INIT(
&r->request_closure, do_request_thread, r,
grpc_core::Executor::Scheduler(grpc_core::ExecutorType::RESOLVER,
grpc_core::ExecutorJobType::SHORT));
GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r, nullptr);
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->on_done = on_done;
r->addrs_out = addrs;
GRPC_CLOSURE_SCHED(&r->request_closure, GRPC_ERROR_NONE);
grpc_core::Executor::Run(&r->request_closure, GRPC_ERROR_NONE,
grpc_core::ExecutorType::RESOLVER);
}
grpc_address_resolver_vtable grpc_posix_resolver_vtable = {

@ -149,15 +149,13 @@ static void windows_resolve_address(const char* name, const char* default_port,
grpc_closure* on_done,
grpc_resolved_addresses** addresses) {
request* r = (request*)gpr_malloc(sizeof(request));
GRPC_CLOSURE_INIT(
&r->request_closure, do_request_thread, r,
grpc_core::Executor::Scheduler(grpc_core::ExecutorType::RESOLVER,
grpc_core::ExecutorJobType::SHORT));
GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r, nullptr);
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->on_done = on_done;
r->addresses = addresses;
GRPC_CLOSURE_SCHED(&r->request_closure, GRPC_ERROR_NONE);
grpc_core::Executor::Run(&r->request_closure, GRPC_ERROR_NONE,
grpc_core::ExecutorType::RESOLVER);
}
grpc_address_resolver_vtable grpc_windows_resolver_vtable = {

@ -202,7 +202,9 @@ static void run_poller(void* bp, grpc_error* /*error_ignored*/) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO, "BACKUP_POLLER:%p reschedule", p);
}
GRPC_CLOSURE_SCHED(&p->run_poller, GRPC_ERROR_NONE);
grpc_core::Executor::Run(&p->run_poller, GRPC_ERROR_NONE,
grpc_core::ExecutorType::DEFAULT,
grpc_core::ExecutorJobType::LONG);
}
}
@ -241,10 +243,10 @@ static void cover_self(grpc_tcp* tcp) {
}
grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu);
gpr_atm_rel_store(&g_backup_poller, (gpr_atm)p);
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p,
grpc_core::Executor::Scheduler(
grpc_core::ExecutorJobType::LONG)),
GRPC_ERROR_NONE);
grpc_core::Executor::Run(
GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p, nullptr),
GRPC_ERROR_NONE, grpc_core::ExecutorType::DEFAULT,
grpc_core::ExecutorJobType::LONG);
} else {
while ((p = (backup_poller*)gpr_atm_acq_load(&g_backup_poller)) ==
nullptr) {

@ -448,7 +448,9 @@ void GrpcUdpListener::do_read(void* arg, grpc_error* error) {
if (!sp->already_shutdown_ && sp->udp_handler_->Read()) {
/* There maybe more packets to read. Schedule read_more_cb_ closure to run
* after finishing this event loop. */
GRPC_CLOSURE_SCHED(&sp->do_read_closure_, GRPC_ERROR_NONE);
grpc_core::Executor::Run(&sp->do_read_closure_, GRPC_ERROR_NONE,
grpc_core::ExecutorType::DEFAULT,
grpc_core::ExecutorJobType::LONG);
} else {
/* Finish reading all the packets, re-arm the notification event so we can
* get another chance to read. Or fd already shutdown, re-arm to get a
@ -481,10 +483,10 @@ void GrpcUdpListener::OnRead(grpc_error* error, void* do_read_arg) {
if (udp_handler_->Read()) {
/* There maybe more packets to read. Schedule read_more_cb_ closure to run
* after finishing this event loop. */
GRPC_CLOSURE_INIT(
&do_read_closure_, do_read, do_read_arg,
grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::LONG));
GRPC_CLOSURE_SCHED(&do_read_closure_, GRPC_ERROR_NONE);
GRPC_CLOSURE_INIT(&do_read_closure_, do_read, do_read_arg, nullptr);
grpc_core::Executor::Run(&do_read_closure_, GRPC_ERROR_NONE,
grpc_core::ExecutorType::DEFAULT,
grpc_core::ExecutorJobType::LONG);
} else {
/* Finish reading all the packets, re-arm the notification event so we can
* get another chance to read. Or fd already shutdown, re-arm to get a
@ -544,11 +546,11 @@ void GrpcUdpListener::OnCanWrite(grpc_error* error, void* do_write_arg) {
}
/* Schedule actual write in another thread. */
GRPC_CLOSURE_INIT(
&do_write_closure_, do_write, do_write_arg,
grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::LONG));
GRPC_CLOSURE_INIT(&do_write_closure_, do_write, do_write_arg, nullptr);
GRPC_CLOSURE_SCHED(&do_write_closure_, GRPC_ERROR_NONE);
grpc_core::Executor::Run(&do_write_closure_, GRPC_ERROR_NONE,
grpc_core::ExecutorType::DEFAULT,
grpc_core::ExecutorJobType::LONG);
}
static int add_socket_to_server(grpc_udp_server* s, int fd,

@ -860,11 +860,8 @@ static void cq_end_op_for_callback(
// Schedule the callback on a closure if not internal or triggered
// from a background poller thread.
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_CREATE(
functor_callback, functor,
grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)),
error);
grpc_core::Executor::Run(
GRPC_CLOSURE_CREATE(functor_callback, functor, nullptr), error);
}
void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
@ -1357,10 +1354,8 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
// Schedule the callback on a closure if not internal or triggered
// from a background poller thread.
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_CREATE(
functor_callback, callback,
grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)),
grpc_core::Executor::Run(
GRPC_CLOSURE_CREATE(functor_callback, callback, nullptr),
GRPC_ERROR_NONE);
}

@ -50,10 +50,10 @@ void grpc_stream_destroy(grpc_stream_refcount* refcount) {
cope with.
Throw this over to the executor (on a core-owned thread) and process it
there. */
refcount->destroy.scheduler =
grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
grpc_core::Executor::Run(&refcount->destroy, GRPC_ERROR_NONE);
} else {
GRPC_CLOSURE_SCHED(&refcount->destroy, GRPC_ERROR_NONE);
}
GRPC_CLOSURE_SCHED(&refcount->destroy, GRPC_ERROR_NONE);
}
void slice_stream_destroy(void* arg) {

@ -393,10 +393,8 @@ void MetadataCredentialsPluginWrapper::Destroy(void* wrapper) {
if (wrapper == nullptr) return;
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_CLOSURE_RUN(GRPC_CLOSURE_CREATE(DeleteWrapper, wrapper,
grpc_core::Executor::Scheduler(
grpc_core::ExecutorJobType::SHORT)),
GRPC_ERROR_NONE);
grpc_core::Executor::Run(GRPC_CLOSURE_CREATE(DeleteWrapper, wrapper, nullptr),
GRPC_ERROR_NONE);
}
int MetadataCredentialsPluginWrapper::GetMetadata(

Loading…
Cancel
Save