diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 5347d2692dc..5852f5582e0 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -806,7 +806,7 @@ static void set_write_state(grpc_chttp2_transport* t, * to be closed after all writes finish (for example, if we received a go-away * from peer while we had some pending writes) */ if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) { - GRPC_CLOSURE_LIST_SCHED(&t->run_after_write); + grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write); if (t->close_transport_on_writes_finished != nullptr) { grpc_error* err = t->close_transport_on_writes_finished; t->close_transport_on_writes_finished = nullptr; @@ -1033,7 +1033,7 @@ static void write_action_end_locked(void* tp, grpc_error* error) { // write finishes, or the callbacks will be invoked when the stream is // closed. if (!closed) { - GRPC_CLOSURE_LIST_SCHED(&t->run_after_write); + grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write); } t->combiner->FinallyRun( GRPC_CLOSURE_INIT(&t->write_action_begin_locked, @@ -1673,7 +1673,7 @@ static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) { GPR_ASSERT(error != GRPC_ERROR_NONE); for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) { grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error)); - GRPC_CLOSURE_LIST_SCHED(&pq->lists[j]); + grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &pq->lists[j]); } GRPC_ERROR_UNREF(error); } @@ -1759,7 +1759,8 @@ void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) { gpr_free(from); return; } - GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); + grpc_core::ExecCtx::RunList(DEBUG_LOCATION, + &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) { grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS); } diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index 312ef54e80e..d4029f61c78 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -107,7 +107,8 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) { pq->inflight_id = t->ping_ctr; t->ping_ctr++; - GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]); + grpc_core::ExecCtx::RunList(DEBUG_LOCATION, + &pq->lists[GRPC_CHTTP2_PCL_INITIATE]); grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT], &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); grpc_slice_buffer_add(&t->outbuf, diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index 60400c8017a..aae932e305c 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -52,21 +52,6 @@ typedef struct grpc_closure_list { * the closure scheduler will do that after the cb returns */ typedef void (*grpc_iomgr_cb_func)(void* arg, grpc_error* error); -typedef struct grpc_closure_scheduler grpc_closure_scheduler; - -typedef struct grpc_closure_scheduler_vtable { - /* NOTE: for all these functions, closure->scheduler == the scheduler that was - used to find this vtable */ - void (*run)(grpc_closure* closure, grpc_error* error); - void (*sched)(grpc_closure* closure, grpc_error* error); - const char* name; -} grpc_closure_scheduler_vtable; - -/** Abstract type that can schedule closures for execution */ -struct grpc_closure_scheduler { - const grpc_closure_scheduler_vtable* vtable; -}; - /** A closure over a grpc_iomgr_cb_func. */ struct grpc_closure { /** Once queued, next indicates the next queued closure; before then, scratch @@ -85,10 +70,6 @@ struct grpc_closure { /** Arguments to be passed to "cb". */ void* cb_arg; - /** Scheduler to schedule against: nullptr to schedule against current - execution context */ - grpc_closure_scheduler* scheduler; - /** Once queued, the result of the closure. Before then: scratch space */ union { grpc_error* error; @@ -110,16 +91,13 @@ struct grpc_closure { #ifndef NDEBUG inline grpc_closure* grpc_closure_init(const char* file, int line, grpc_closure* closure, - grpc_iomgr_cb_func cb, void* cb_arg, - grpc_closure_scheduler* scheduler) { + grpc_iomgr_cb_func cb, void* cb_arg) { #else inline grpc_closure* grpc_closure_init(grpc_closure* closure, - grpc_iomgr_cb_func cb, void* cb_arg, - grpc_closure_scheduler* scheduler) { + grpc_iomgr_cb_func cb, void* cb_arg) { #endif closure->cb = cb; closure->cb_arg = cb_arg; - closure->scheduler = scheduler; closure->error_data.error = GRPC_ERROR_NONE; #ifndef NDEBUG closure->scheduled = false; @@ -135,10 +113,10 @@ inline grpc_closure* grpc_closure_init(grpc_closure* closure, /** Initializes \a closure with \a cb and \a cb_arg. Returns \a closure. */ #ifndef NDEBUG #define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler) \ - grpc_closure_init(__FILE__, __LINE__, closure, cb, cb_arg, scheduler) + grpc_closure_init(__FILE__, __LINE__, closure, cb, cb_arg) #else #define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler) \ - grpc_closure_init(closure, cb, cb_arg, scheduler) + grpc_closure_init(closure, cb, cb_arg) #endif namespace closure_impl { @@ -161,21 +139,19 @@ inline void closure_wrapper(void* arg, grpc_error* error) { #ifndef NDEBUG inline grpc_closure* grpc_closure_create(const char* file, int line, - grpc_iomgr_cb_func cb, void* cb_arg, - grpc_closure_scheduler* scheduler) { + grpc_iomgr_cb_func cb, void* cb_arg) { #else -inline grpc_closure* grpc_closure_create(grpc_iomgr_cb_func cb, void* cb_arg, - grpc_closure_scheduler* scheduler) { +inline grpc_closure* grpc_closure_create(grpc_iomgr_cb_func cb, void* cb_arg) { #endif closure_impl::wrapped_closure* wc = static_cast(gpr_malloc(sizeof(*wc))); wc->cb = cb; wc->cb_arg = cb_arg; #ifndef NDEBUG - grpc_closure_init(file, line, &wc->wrapper, closure_impl::closure_wrapper, wc, - scheduler); + grpc_closure_init(file, line, &wc->wrapper, closure_impl::closure_wrapper, + wc); #else - grpc_closure_init(&wc->wrapper, closure_impl::closure_wrapper, wc, scheduler); + grpc_closure_init(&wc->wrapper, closure_impl::closure_wrapper, wc); #endif return &wc->wrapper; } @@ -183,10 +159,10 @@ inline grpc_closure* grpc_closure_create(grpc_iomgr_cb_func cb, void* cb_arg, /* Create a heap allocated closure: try to avoid except for very rare events */ #ifndef NDEBUG #define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler) \ - grpc_closure_create(__FILE__, __LINE__, cb, cb_arg, scheduler) + grpc_closure_create(__FILE__, __LINE__, cb, cb_arg) #else #define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler) \ - grpc_closure_create(cb, cb_arg, scheduler) + grpc_closure_create(cb, cb_arg) #endif #define GRPC_CLOSURE_LIST_INIT \ @@ -253,6 +229,7 @@ class Closure { public: static void Run(const DebugLocation& location, grpc_closure* closure, grpc_error* error) { + (void)location; if (closure == nullptr) { GRPC_ERROR_UNREF(error); return; @@ -276,44 +253,4 @@ class Closure { }; } // namespace grpc_core -#ifndef NDEBUG -inline void grpc_closure_list_sched(const char* file, int line, - grpc_closure_list* list) { -#else -inline void grpc_closure_list_sched(grpc_closure_list* list) { -#endif - grpc_closure* c = list->head; - while (c != nullptr) { - grpc_closure* next = c->next_data.next; -#ifndef NDEBUG - if (c->scheduled) { - gpr_log(GPR_ERROR, - "Closure already scheduled. (closure: %p, created: [%s:%d], " - "previously scheduled at: [%s: %d] run?: %s", - c, c->file_created, c->line_created, c->file_initiated, - c->line_initiated, c->run ? "true" : "false"); - abort(); - } - c->scheduled = true; - c->file_initiated = file; - c->line_initiated = line; - c->run = false; - GPR_ASSERT(c->cb != nullptr); -#endif - c->scheduler->vtable->sched(c, c->error_data.error); - c = next; - } - list->head = list->tail = nullptr; -} - -/** Schedule all closures in a list to be run. Does not need to be run from a - * safe point. */ -#ifndef NDEBUG -#define GRPC_CLOSURE_LIST_SCHED(closure_list) \ - grpc_closure_list_sched(__FILE__, __LINE__, closure_list) -#else -#define GRPC_CLOSURE_LIST_SCHED(closure_list) \ - grpc_closure_list_sched(closure_list) -#endif - #endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */ diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index 884c7f211f3..4b85766aef6 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -308,9 +308,9 @@ static void combiner_finally_exec(grpc_core::Combiner* lock, grpc_core::ExecCtx::Get()->combiner_data()->active_combiner)); if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner != lock) { GPR_TIMER_MARK("slowpath", 0); - // Reusing scheduler to store the combiner so that it can be accessed in - // enqueue_finally - closure->scheduler = reinterpret_cast(lock); + // Using error_data.scratch to store the combiner so that it can be accessed + // in enqueue_finally. + closure->error_data.scratch = reinterpret_cast(lock); lock->Run(GRPC_CLOSURE_CREATE(enqueue_finally, closure, nullptr), error); return; } @@ -323,22 +323,17 @@ static void combiner_finally_exec(grpc_core::Combiner* lock, static void enqueue_finally(void* closure, grpc_error* error) { grpc_closure* cl = static_cast(closure); - combiner_finally_exec(reinterpret_cast(cl->scheduler), - cl, GRPC_ERROR_REF(error)); + combiner_finally_exec( + reinterpret_cast(cl->error_data.scratch), cl, + GRPC_ERROR_REF(error)); } namespace grpc_core { void Combiner::Run(grpc_closure* closure, grpc_error* error) { - GPR_ASSERT(closure->scheduler == nullptr || - closure->scheduler == - reinterpret_cast(this)); combiner_exec(this, closure, error); } void Combiner::FinallyRun(grpc_closure* closure, grpc_error* error) { - GPR_ASSERT(closure->scheduler == nullptr || - closure->scheduler == - reinterpret_cast(this)); combiner_finally_exec(this, closure, error); } } // namespace grpc_core diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h index 53e52c1bbba..32712066bef 100644 --- a/src/core/lib/iomgr/combiner.h +++ b/src/core/lib/iomgr/combiner.h @@ -36,8 +36,6 @@ class Combiner { // TODO(yashkt) : Remove this method void FinallyRun(grpc_closure* closure, grpc_error* error); Combiner* next_combiner_on_this_exec_ctx = nullptr; - grpc_closure_scheduler scheduler; - grpc_closure_scheduler finally_scheduler; MultiProducerSingleConsumerQueue queue; // either: // a pointer to the initiating exec ctx if that is the only exec_ctx that has diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc index 5c40c00d49c..71c7d3cbecb 100644 --- a/src/core/lib/iomgr/exec_ctx.cc +++ b/src/core/lib/iomgr/exec_ctx.cc @@ -118,11 +118,6 @@ grpc_millis grpc_cycle_counter_to_millis_round_up(gpr_cycle_counter cycles) { gpr_cycle_counter_sub(cycles, g_start_cycle)); } -static const grpc_closure_scheduler_vtable exec_ctx_scheduler_vtable = { - exec_ctx_run, exec_ctx_sched, "exec_ctx"}; -static grpc_closure_scheduler exec_ctx_scheduler = {&exec_ctx_scheduler_vtable}; -grpc_closure_scheduler* grpc_schedule_on_exec_ctx = &exec_ctx_scheduler; - namespace grpc_core { GPR_TLS_CLASS_DEF(ExecCtx::exec_ctx_); GPR_TLS_CLASS_DEF(ApplicationCallbackExecCtx::callback_exec_ctx_); @@ -176,6 +171,7 @@ grpc_millis ExecCtx::Now() { void ExecCtx::Run(const DebugLocation& location, grpc_closure* closure, grpc_error* error) { + (void)location; if (closure == nullptr) { GRPC_ERROR_UNREF(error); return; @@ -200,4 +196,30 @@ void ExecCtx::Run(const DebugLocation& location, grpc_closure* closure, exec_ctx_sched(closure, error); } +void ExecCtx::RunList(const DebugLocation& location, grpc_closure_list* list) { + (void)location; + grpc_closure* c = list->head; + while (c != nullptr) { + grpc_closure* next = c->next_data.next; +#ifndef NDEBUG + if (c->scheduled) { + gpr_log(GPR_ERROR, + "Closure already scheduled. (closure: %p, created: [%s:%d], " + "previously scheduled at: [%s: %d] run?: %s", + c, c->file_created, c->line_created, c->file_initiated, + c->line_initiated, c->run ? "true" : "false"); + abort(); + } + c->scheduled = true; + c->file_initiated = location.file(); + c->line_initiated = location.line(); + c->run = false; + GPR_ASSERT(c->cb != nullptr); +#endif + exec_ctx_sched(c, c->error_data.error); + c = next; + } + list->head = list->tail = nullptr; +} + } // namespace grpc_core diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 49251b99ecb..4e746bc8505 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -55,8 +55,6 @@ typedef struct grpc_combiner grpc_combiner; should not be counted by fork handlers */ #define GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD 1 -extern grpc_closure_scheduler* grpc_schedule_on_exec_ctx; - gpr_timespec grpc_millis_to_timespec(grpc_millis millis, gpr_clock_type clock); grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec timespec); grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec timespec); @@ -225,6 +223,8 @@ class ExecCtx { static void Run(const DebugLocation& location, grpc_closure* closure, grpc_error* error); + static void RunList(const DebugLocation& location, grpc_closure_list* list); + protected: /** Check if ready to finish. */ virtual bool CheckReadyToFinish() { return false; } diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc index 8b0f102f744..0fc02792234 100644 --- a/src/core/lib/iomgr/resource_quota.cc +++ b/src/core/lib/iomgr/resource_quota.cc @@ -333,7 +333,7 @@ static bool rq_alloc(grpc_resource_quota* resource_quota) { int64_t aborted_allocations = resource_user->outstanding_allocations; resource_user->outstanding_allocations = 0; resource_user->free_pool += aborted_allocations; - GRPC_CLOSURE_LIST_SCHED(&resource_user->on_allocated); + grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &resource_user->on_allocated); gpr_mu_unlock(&resource_user->mu); ru_unref_by(resource_user, static_cast(aborted_allocations)); continue; @@ -359,7 +359,7 @@ static bool rq_alloc(grpc_resource_quota* resource_quota) { if (resource_user->free_pool >= 0) { resource_user->allocating = false; resource_user->outstanding_allocations = 0; - GRPC_CLOSURE_LIST_SCHED(&resource_user->on_allocated); + grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &resource_user->on_allocated); gpr_mu_unlock(&resource_user->mu); } else { rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION); diff --git a/src/core/lib/iomgr/tcp_server_custom.cc b/src/core/lib/iomgr/tcp_server_custom.cc index d5fd1919ca2..2df94adb5fc 100644 --- a/src/core/lib/iomgr/tcp_server_custom.cc +++ b/src/core/lib/iomgr/tcp_server_custom.cc @@ -196,7 +196,7 @@ static void tcp_server_unref(grpc_tcp_server* s) { if (gpr_unref(&s->refs)) { /* Complete shutdown_starting work before destroying. */ grpc_core::ExecCtx exec_ctx; - GRPC_CLOSURE_LIST_SCHED(&s->shutdown_starting); + grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &s->shutdown_starting); grpc_core::ExecCtx::Get()->Flush(); tcp_server_destroy(s); } diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc index a9bf800e362..25f2e7746bb 100644 --- a/src/core/lib/iomgr/tcp_server_posix.cc +++ b/src/core/lib/iomgr/tcp_server_posix.cc @@ -547,7 +547,7 @@ static void tcp_server_unref(grpc_tcp_server* s) { if (gpr_unref(&s->refs)) { grpc_tcp_server_shutdown_listeners(s); gpr_mu_lock(&s->mu); - GRPC_CLOSURE_LIST_SCHED(&s->shutdown_starting); + grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &s->shutdown_starting); gpr_mu_unlock(&s->mu); tcp_server_destroy(s); } diff --git a/src/core/lib/iomgr/tcp_server_windows.cc b/src/core/lib/iomgr/tcp_server_windows.cc index fc09481098d..0052175ae46 100644 --- a/src/core/lib/iomgr/tcp_server_windows.cc +++ b/src/core/lib/iomgr/tcp_server_windows.cc @@ -179,7 +179,7 @@ static void tcp_server_unref(grpc_tcp_server* s) { if (gpr_unref(&s->refs)) { grpc_tcp_server_shutdown_listeners(s); gpr_mu_lock(&s->mu); - GRPC_CLOSURE_LIST_SCHED(&s->shutdown_starting); + grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &s->shutdown_starting); gpr_mu_unlock(&s->mu); tcp_server_destroy(s); } diff --git a/src/core/lib/iomgr/timer_generic.cc b/src/core/lib/iomgr/timer_generic.cc index dc3016ad8ec..0f00eea8a3f 100644 --- a/src/core/lib/iomgr/timer_generic.cc +++ b/src/core/lib/iomgr/timer_generic.cc @@ -548,9 +548,8 @@ static grpc_timer* pop_one(timer_shard* shard, grpc_millis now) { } if (timer->deadline > now) return nullptr; if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_trace)) { - gpr_log(GPR_INFO, "TIMER %p: FIRE %" PRId64 "ms late via %s scheduler", - timer, now - timer->deadline, - timer->closure->scheduler->vtable->name); + gpr_log(GPR_INFO, "TIMER %p: FIRE %" PRId64 "ms late", timer, + now - timer->deadline); } timer->pending = false; grpc_timer_heap_pop(&shard->heap); diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc index dfc6acbb68b..9d88eed66de 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -160,23 +160,19 @@ class Closure : public grpc_closure { }; template -std::unique_ptr MakeClosure( - F f, grpc_closure_scheduler* sched = grpc_schedule_on_exec_ctx) { +std::unique_ptr MakeClosure(F f) { struct C : public Closure { - C(const F& f, grpc_closure_scheduler* sched) : f_(f) { - GRPC_CLOSURE_INIT(this, Execute, this, sched); - } + C(const F& f) : f_(f) { GRPC_CLOSURE_INIT(this, Execute, this, nullptr); } F f_; static void Execute(void* arg, grpc_error* error) { static_cast(arg)->f_(error); } }; - return std::unique_ptr(new C(f, sched)); + return std::unique_ptr(new C(f)); } template -grpc_closure* MakeOnceClosure( - F f, grpc_closure_scheduler* sched = grpc_schedule_on_exec_ctx) { +grpc_closure* MakeOnceClosure(F f) { struct C : public grpc_closure { C(const F& f) : f_(f) {} F f_; @@ -186,7 +182,7 @@ grpc_closure* MakeOnceClosure( } }; auto* c = new C{f}; - return GRPC_CLOSURE_INIT(c, C::Execute, c, sched); + return GRPC_CLOSURE_INIT(c, C::Execute, c, nullptr); } class Stream { diff --git a/test/cpp/microbenchmarks/bm_closure.cc b/test/cpp/microbenchmarks/bm_closure.cc index 7736012128a..979e8f6ca04 100644 --- a/test/cpp/microbenchmarks/bm_closure.cc +++ b/test/cpp/microbenchmarks/bm_closure.cc @@ -350,19 +350,17 @@ BENCHMARK(BM_ClosureSched4OnTwoCombiners); // the benchmark is complete class Rescheduler { public: - Rescheduler(benchmark::State& state, grpc_closure_scheduler* scheduler) - : state_(state) { - GRPC_CLOSURE_INIT(&closure_, Step, this, scheduler); + Rescheduler(benchmark::State& state) : state_(state) { + GRPC_CLOSURE_INIT(&closure_, Step, this, nullptr); } void ScheduleFirst() { grpc_core::ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); } - void ScheduleFirstAgainstDifferentScheduler( - grpc_closure_scheduler* scheduler) { + void ScheduleFirstAgainstDifferentScheduler() { grpc_core::ExecCtx::Run(DEBUG_LOCATION, - GRPC_CLOSURE_CREATE(Step, this, scheduler), + GRPC_CLOSURE_CREATE(Step, this, nullptr), GRPC_ERROR_NONE); } @@ -381,7 +379,7 @@ class Rescheduler { static void BM_ClosureReschedOnExecCtx(benchmark::State& state) { TrackCounters track_counters; grpc_core::ExecCtx exec_ctx; - Rescheduler r(state, grpc_schedule_on_exec_ctx); + Rescheduler r(state); r.ScheduleFirst(); grpc_core::ExecCtx::Get()->Flush(); track_counters.Finish(state); diff --git a/test/cpp/microbenchmarks/bm_pollset.cc b/test/cpp/microbenchmarks/bm_pollset.cc index 477ae190bda..968e1b331b6 100644 --- a/test/cpp/microbenchmarks/bm_pollset.cc +++ b/test/cpp/microbenchmarks/bm_pollset.cc @@ -163,18 +163,16 @@ class Closure : public grpc_closure { }; template -Closure* MakeClosure(F f, grpc_closure_scheduler* scheduler) { +Closure* MakeClosure(F f) { struct C : public Closure { - C(F f, grpc_closure_scheduler* scheduler) : f_(f) { - GRPC_CLOSURE_INIT(this, C::cbfn, this, scheduler); - } + C(F f) : f_(f) { GRPC_CLOSURE_INIT(this, C::cbfn, this, nullptr); } static void cbfn(void* arg, grpc_error* /*error*/) { C* p = static_cast(arg); p->f_(); } F f_; }; - return new C(f, scheduler); + return new C(f); } #ifdef GRPC_LINUX_MULTIPOLL_WITH_EPOLL @@ -223,17 +221,15 @@ static void BM_SingleThreadPollOneFd(benchmark::State& state) { grpc_fd* wakeup = grpc_fd_create(wakeup_fd.read_fd, "wakeup_read", false); grpc_pollset_add_fd(ps, wakeup); bool done = false; - Closure* continue_closure = MakeClosure( - [&]() { - GRPC_ERROR_UNREF(grpc_wakeup_fd_consume_wakeup(&wakeup_fd)); - if (!state.KeepRunning()) { - done = true; - return; - } - GRPC_ERROR_UNREF(grpc_wakeup_fd_wakeup(&wakeup_fd)); - grpc_fd_notify_on_read(wakeup, continue_closure); - }, - grpc_schedule_on_exec_ctx); + Closure* continue_closure = MakeClosure([&]() { + GRPC_ERROR_UNREF(grpc_wakeup_fd_consume_wakeup(&wakeup_fd)); + if (!state.KeepRunning()) { + done = true; + return; + } + GRPC_ERROR_UNREF(grpc_wakeup_fd_wakeup(&wakeup_fd)); + grpc_fd_notify_on_read(wakeup, continue_closure); + }); GRPC_ERROR_UNREF(grpc_wakeup_fd_wakeup(&wakeup_fd)); grpc_fd_notify_on_read(wakeup, continue_closure); gpr_mu_lock(mu);