Remove GRPC_CLOSURE_LIST_SCHED and remove scheduler field from grpc_closure

reviewable/pr21119/r1
Yash Tibrewal committed 5 years ago
parent 970f703337, commit 61020b55f9
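This change removes the per-closure scheduler indirection: a closure no longer
records where it will run when it is initialized; instead, the call site picks
the target (grpc_core::ExecCtx::Run/RunList or a Combiner) at scheduling time.
A minimal before/after sketch of the call-site migration (the closure, callback,
and list names here are illustrative, not taken from this diff):

    // Before: the scheduler was baked into the closure at init time, and
    // closure lists were drained through it.
    GRPC_CLOSURE_INIT(&done, on_done, arg, grpc_schedule_on_exec_ctx);
    GRPC_CLOSURE_LIST_SCHED(&pending);

    // After: the scheduler argument is dead (call sites pass nullptr), and
    // lists are run explicitly on the current ExecCtx with a debug location.
    GRPC_CLOSURE_INIT(&done, on_done, arg, nullptr);
    grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &pending);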
Files changed (15), with changed-line counts:

1. src/core/ext/transport/chttp2/transport/chttp2_transport.cc (9)
2. src/core/ext/transport/chttp2/transport/writing.cc (3)
3. src/core/lib/iomgr/closure.h (87)
4. src/core/lib/iomgr/combiner.cc (17)
5. src/core/lib/iomgr/combiner.h (2)
6. src/core/lib/iomgr/exec_ctx.cc (32)
7. src/core/lib/iomgr/exec_ctx.h (4)
8. src/core/lib/iomgr/resource_quota.cc (4)
9. src/core/lib/iomgr/tcp_server_custom.cc (2)
10. src/core/lib/iomgr/tcp_server_posix.cc (2)
11. src/core/lib/iomgr/tcp_server_windows.cc (2)
12. src/core/lib/iomgr/timer_generic.cc (5)
13. test/cpp/microbenchmarks/bm_chttp2_transport.cc (14)
14. test/cpp/microbenchmarks/bm_closure.cc (12)
15. test/cpp/microbenchmarks/bm_pollset.cc (28)

src/core/ext/transport/chttp2/transport/chttp2_transport.cc:
@ -806,7 +806,7 @@ static void set_write_state(grpc_chttp2_transport* t,
    * to be closed after all writes finish (for example, if we received a go-away
    * from peer while we had some pending writes) */
   if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
-    GRPC_CLOSURE_LIST_SCHED(&t->run_after_write);
+    grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
     if (t->close_transport_on_writes_finished != nullptr) {
       grpc_error* err = t->close_transport_on_writes_finished;
       t->close_transport_on_writes_finished = nullptr;
@ -1033,7 +1033,7 @@ static void write_action_end_locked(void* tp, grpc_error* error) {
   // write finishes, or the callbacks will be invoked when the stream is
   // closed.
   if (!closed) {
-    GRPC_CLOSURE_LIST_SCHED(&t->run_after_write);
+    grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
   }
   t->combiner->FinallyRun(
       GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
@ -1673,7 +1673,7 @@ static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) {
   GPR_ASSERT(error != GRPC_ERROR_NONE);
   for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
     grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error));
-    GRPC_CLOSURE_LIST_SCHED(&pq->lists[j]);
+    grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &pq->lists[j]);
   }
   GRPC_ERROR_UNREF(error);
 }
@ -1759,7 +1759,8 @@ void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
     gpr_free(from);
     return;
   }
-  GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
+  grpc_core::ExecCtx::RunList(DEBUG_LOCATION,
+                              &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
   if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS);
   }

src/core/ext/transport/chttp2/transport/writing.cc:
@ -107,7 +107,8 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
   pq->inflight_id = t->ping_ctr;
   t->ping_ctr++;
-  GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
+  grpc_core::ExecCtx::RunList(DEBUG_LOCATION,
+                              &pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
   grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
                          &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
   grpc_slice_buffer_add(&t->outbuf,

src/core/lib/iomgr/closure.h:
@ -52,21 +52,6 @@ typedef struct grpc_closure_list {
  * the closure scheduler will do that after the cb returns */
 typedef void (*grpc_iomgr_cb_func)(void* arg, grpc_error* error);
-typedef struct grpc_closure_scheduler grpc_closure_scheduler;
-typedef struct grpc_closure_scheduler_vtable {
-  /* NOTE: for all these functions, closure->scheduler == the scheduler that was
-     used to find this vtable */
-  void (*run)(grpc_closure* closure, grpc_error* error);
-  void (*sched)(grpc_closure* closure, grpc_error* error);
-  const char* name;
-} grpc_closure_scheduler_vtable;
-/** Abstract type that can schedule closures for execution */
-struct grpc_closure_scheduler {
-  const grpc_closure_scheduler_vtable* vtable;
-};
 /** A closure over a grpc_iomgr_cb_func. */
 struct grpc_closure {
   /** Once queued, next indicates the next queued closure; before then, scratch
@ -85,10 +70,6 @@ struct grpc_closure {
   /** Arguments to be passed to "cb". */
   void* cb_arg;
-  /** Scheduler to schedule against: nullptr to schedule against current
-      execution context */
-  grpc_closure_scheduler* scheduler;
   /** Once queued, the result of the closure. Before then: scratch space */
   union {
     grpc_error* error;
@ -110,16 +91,13 @@ struct grpc_closure {
 #ifndef NDEBUG
 inline grpc_closure* grpc_closure_init(const char* file, int line,
                                        grpc_closure* closure,
-                                       grpc_iomgr_cb_func cb, void* cb_arg,
-                                       grpc_closure_scheduler* scheduler) {
+                                       grpc_iomgr_cb_func cb, void* cb_arg) {
 #else
 inline grpc_closure* grpc_closure_init(grpc_closure* closure,
-                                       grpc_iomgr_cb_func cb, void* cb_arg,
-                                       grpc_closure_scheduler* scheduler) {
+                                       grpc_iomgr_cb_func cb, void* cb_arg) {
 #endif
   closure->cb = cb;
   closure->cb_arg = cb_arg;
-  closure->scheduler = scheduler;
   closure->error_data.error = GRPC_ERROR_NONE;
 #ifndef NDEBUG
   closure->scheduled = false;
@ -135,10 +113,10 @@ inline grpc_closure* grpc_closure_init(grpc_closure* closure,
 /** Initializes \a closure with \a cb and \a cb_arg. Returns \a closure. */
 #ifndef NDEBUG
 #define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler) \
-  grpc_closure_init(__FILE__, __LINE__, closure, cb, cb_arg, scheduler)
+  grpc_closure_init(__FILE__, __LINE__, closure, cb, cb_arg)
 #else
 #define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler) \
-  grpc_closure_init(closure, cb, cb_arg, scheduler)
+  grpc_closure_init(closure, cb, cb_arg)
 #endif

 namespace closure_impl {
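GRPC_CLOSURE_INIT deliberately keeps its four-parameter shape so the tree-wide
migration stays mechanical: the scheduler argument is accepted and silently
discarded. A hypothetical call site after this commit (on_done is illustrative):

    grpc_closure done;
    // The fourth argument is ignored by the macro; call sites in this
    // commit pass nullptr.
    GRPC_CLOSURE_INIT(&done, on_done, /*cb_arg=*/nullptr, nullptr);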
@ -161,21 +139,19 @@ inline void closure_wrapper(void* arg, grpc_error* error) {
 #ifndef NDEBUG
 inline grpc_closure* grpc_closure_create(const char* file, int line,
-                                         grpc_iomgr_cb_func cb, void* cb_arg,
-                                         grpc_closure_scheduler* scheduler) {
+                                         grpc_iomgr_cb_func cb, void* cb_arg) {
 #else
-inline grpc_closure* grpc_closure_create(grpc_iomgr_cb_func cb, void* cb_arg,
-                                         grpc_closure_scheduler* scheduler) {
+inline grpc_closure* grpc_closure_create(grpc_iomgr_cb_func cb, void* cb_arg) {
 #endif
   closure_impl::wrapped_closure* wc =
       static_cast<closure_impl::wrapped_closure*>(gpr_malloc(sizeof(*wc)));
   wc->cb = cb;
   wc->cb_arg = cb_arg;
 #ifndef NDEBUG
-  grpc_closure_init(file, line, &wc->wrapper, closure_impl::closure_wrapper, wc,
-                    scheduler);
+  grpc_closure_init(file, line, &wc->wrapper, closure_impl::closure_wrapper,
+                    wc);
 #else
-  grpc_closure_init(&wc->wrapper, closure_impl::closure_wrapper, wc, scheduler);
+  grpc_closure_init(&wc->wrapper, closure_impl::closure_wrapper, wc);
 #endif
   return &wc->wrapper;
 }
@ -183,10 +159,10 @@ inline grpc_closure* grpc_closure_create(grpc_iomgr_cb_func cb, void* cb_arg,
 /* Create a heap allocated closure: try to avoid except for very rare events */
 #ifndef NDEBUG
 #define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler) \
-  grpc_closure_create(__FILE__, __LINE__, cb, cb_arg, scheduler)
+  grpc_closure_create(__FILE__, __LINE__, cb, cb_arg)
 #else
 #define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler) \
-  grpc_closure_create(cb, cb_arg, scheduler)
+  grpc_closure_create(cb, cb_arg)
 #endif

 #define GRPC_CLOSURE_LIST_INIT \
@ -253,6 +229,7 @@ class Closure {
  public:
   static void Run(const DebugLocation& location, grpc_closure* closure,
                   grpc_error* error) {
+    (void)location;
     if (closure == nullptr) {
       GRPC_ERROR_UNREF(error);
       return;
@ -276,44 +253,4 @@ class Closure {
 };
 } // namespace grpc_core

-#ifndef NDEBUG
-inline void grpc_closure_list_sched(const char* file, int line,
-                                    grpc_closure_list* list) {
-#else
-inline void grpc_closure_list_sched(grpc_closure_list* list) {
-#endif
-  grpc_closure* c = list->head;
-  while (c != nullptr) {
-    grpc_closure* next = c->next_data.next;
-#ifndef NDEBUG
-    if (c->scheduled) {
-      gpr_log(GPR_ERROR,
-              "Closure already scheduled. (closure: %p, created: [%s:%d], "
-              "previously scheduled at: [%s: %d] run?: %s",
-              c, c->file_created, c->line_created, c->file_initiated,
-              c->line_initiated, c->run ? "true" : "false");
-      abort();
-    }
-    c->scheduled = true;
-    c->file_initiated = file;
-    c->line_initiated = line;
-    c->run = false;
-    GPR_ASSERT(c->cb != nullptr);
-#endif
-    c->scheduler->vtable->sched(c, c->error_data.error);
-    c = next;
-  }
-  list->head = list->tail = nullptr;
-}
-
-/** Schedule all closures in a list to be run. Does not need to be run from a
- * safe point. */
-#ifndef NDEBUG
-#define GRPC_CLOSURE_LIST_SCHED(closure_list) \
-  grpc_closure_list_sched(__FILE__, __LINE__, closure_list)
-#else
-#define GRPC_CLOSURE_LIST_SCHED(closure_list) \
-  grpc_closure_list_sched(closure_list)
-#endif

 #endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */

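The grpc_closure_list_sched helper deleted above does not really disappear: it
resurfaces, nearly verbatim, as ExecCtx::RunList in exec_ctx.cc later in this
diff, with the debug bookkeeping driven by a DebugLocation argument instead of
__FILE__/__LINE__ macros. A sketch of the call-site rewrite, assuming the
three-argument grpc_closure_list_append from this era of the tree:

    grpc_closure_list pending = GRPC_CLOSURE_LIST_INIT;
    grpc_closure_list_append(&pending, &done, GRPC_ERROR_NONE);

    // Old: GRPC_CLOSURE_LIST_SCHED(&pending);
    grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &pending);  // empties the list

src/core/lib/iomgr/combiner.cc: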
@ -308,9 +308,9 @@ static void combiner_finally_exec(grpc_core::Combiner* lock,
       grpc_core::ExecCtx::Get()->combiner_data()->active_combiner));
   if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner != lock) {
     GPR_TIMER_MARK("slowpath", 0);
-    // Reusing scheduler to store the combiner so that it can be accessed in
-    // enqueue_finally
-    closure->scheduler = reinterpret_cast<grpc_closure_scheduler*>(lock);
+    // Using error_data.scratch to store the combiner so that it can be accessed
+    // in enqueue_finally.
+    closure->error_data.scratch = reinterpret_cast<uintptr_t>(lock);
     lock->Run(GRPC_CLOSURE_CREATE(enqueue_finally, closure, nullptr), error);
     return;
   }
@ -323,22 +323,17 @@ static void combiner_finally_exec(grpc_core::Combiner* lock,
 static void enqueue_finally(void* closure, grpc_error* error) {
   grpc_closure* cl = static_cast<grpc_closure*>(closure);
-  combiner_finally_exec(reinterpret_cast<grpc_core::Combiner*>(cl->scheduler),
-                        cl, GRPC_ERROR_REF(error));
+  combiner_finally_exec(
+      reinterpret_cast<grpc_core::Combiner*>(cl->error_data.scratch), cl,
+      GRPC_ERROR_REF(error));
 }

 namespace grpc_core {
 void Combiner::Run(grpc_closure* closure, grpc_error* error) {
-  GPR_ASSERT(closure->scheduler == nullptr ||
-             closure->scheduler ==
-                 reinterpret_cast<grpc_closure_scheduler*>(this));
   combiner_exec(this, closure, error);
 }

 void Combiner::FinallyRun(grpc_closure* closure, grpc_error* error) {
-  GPR_ASSERT(closure->scheduler == nullptr ||
-             closure->scheduler ==
-                 reinterpret_cast<grpc_closure_scheduler*>(this));
   combiner_finally_exec(this, closure, error);
 }
 } // namespace grpc_core

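With grpc_closure::scheduler gone, combiner_finally_exec needs somewhere else
to stash the target combiner for the enqueue_finally trampoline. It reuses
error_data.scratch, which is safe because that union member is only meaningful
once the closure has been queued with a result, and enqueue_finally consumes
the stashed pointer before then. The round-trip, reduced to its two ends (a
sketch of the pattern above, not a new API):

    // Producer side: smuggle the combiner pointer through the queued closure.
    closure->error_data.scratch = reinterpret_cast<uintptr_t>(lock);

    // Consumer side, inside enqueue_finally: recover it before re-dispatching.
    auto* combiner =
        reinterpret_cast<grpc_core::Combiner*>(closure->error_data.scratch);

src/core/lib/iomgr/combiner.h: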
@ -36,8 +36,6 @@ class Combiner {
   // TODO(yashkt) : Remove this method
   void FinallyRun(grpc_closure* closure, grpc_error* error);
   Combiner* next_combiner_on_this_exec_ctx = nullptr;
-  grpc_closure_scheduler scheduler;
-  grpc_closure_scheduler finally_scheduler;
   MultiProducerSingleConsumerQueue queue;
   // either:
   // a pointer to the initiating exec ctx if that is the only exec_ctx that has

src/core/lib/iomgr/exec_ctx.cc:
@ -118,11 +118,6 @@ grpc_millis grpc_cycle_counter_to_millis_round_up(gpr_cycle_counter cycles) {
       gpr_cycle_counter_sub(cycles, g_start_cycle));
 }

-static const grpc_closure_scheduler_vtable exec_ctx_scheduler_vtable = {
-    exec_ctx_run, exec_ctx_sched, "exec_ctx"};
-static grpc_closure_scheduler exec_ctx_scheduler = {&exec_ctx_scheduler_vtable};
-grpc_closure_scheduler* grpc_schedule_on_exec_ctx = &exec_ctx_scheduler;

 namespace grpc_core {
 GPR_TLS_CLASS_DEF(ExecCtx::exec_ctx_);
 GPR_TLS_CLASS_DEF(ApplicationCallbackExecCtx::callback_exec_ctx_);
@ -176,6 +171,7 @@ grpc_millis ExecCtx::Now() {
 void ExecCtx::Run(const DebugLocation& location, grpc_closure* closure,
                   grpc_error* error) {
+  (void)location;
   if (closure == nullptr) {
     GRPC_ERROR_UNREF(error);
     return;
@ -200,4 +196,30 @@ void ExecCtx::Run(const DebugLocation& location, grpc_closure* closure,
   exec_ctx_sched(closure, error);
 }

+void ExecCtx::RunList(const DebugLocation& location, grpc_closure_list* list) {
+  (void)location;
+  grpc_closure* c = list->head;
+  while (c != nullptr) {
+    grpc_closure* next = c->next_data.next;
+#ifndef NDEBUG
+    if (c->scheduled) {
+      gpr_log(GPR_ERROR,
+              "Closure already scheduled. (closure: %p, created: [%s:%d], "
+              "previously scheduled at: [%s: %d] run?: %s",
+              c, c->file_created, c->line_created, c->file_initiated,
+              c->line_initiated, c->run ? "true" : "false");
+      abort();
+    }
+    c->scheduled = true;
+    c->file_initiated = location.file();
+    c->line_initiated = location.line();
+    c->run = false;
+    GPR_ASSERT(c->cb != nullptr);
+#endif
+    exec_ctx_sched(c, c->error_data.error);
+    c = next;
+  }
+  list->head = list->tail = nullptr;
+}
 } // namespace grpc_core

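Because the scheduler vtable no longer exists, exec_ctx_sched is the only
scheduling path left, and RunList can call it directly for each element. Note
the semantics: the list is emptied immediately, but the callbacks only execute
when the current ExecCtx flushes. A usage sketch (closure_a and closure_b are
assumed to be grpc_closures already initialized with GRPC_CLOSURE_INIT):

    grpc_core::ExecCtx exec_ctx;
    grpc_closure_list list = GRPC_CLOSURE_LIST_INIT;
    grpc_closure_list_append(&list, closure_a, GRPC_ERROR_NONE);
    grpc_closure_list_append(&list, closure_b, GRPC_ERROR_NONE);

    grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &list);  // list is now empty
    grpc_core::ExecCtx::Get()->Flush();                  // callbacks run here

The tcp_server_* call sites later in this diff follow this shape: RunList on
shutdown_starting, then an explicit Flush or an ExecCtx going out of scope.

src/core/lib/iomgr/exec_ctx.h: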
@ -55,8 +55,6 @@ typedef struct grpc_combiner grpc_combiner;
    should not be counted by fork handlers */
 #define GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD 1

-extern grpc_closure_scheduler* grpc_schedule_on_exec_ctx;

 gpr_timespec grpc_millis_to_timespec(grpc_millis millis, gpr_clock_type clock);
 grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec timespec);
 grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec timespec);
@ -225,6 +223,8 @@ class ExecCtx {
   static void Run(const DebugLocation& location, grpc_closure* closure,
                   grpc_error* error);

+  static void RunList(const DebugLocation& location, grpc_closure_list* list);
+
  protected:
   /** Check if ready to finish. */
   virtual bool CheckReadyToFinish() { return false; }

src/core/lib/iomgr/resource_quota.cc:
@ -333,7 +333,7 @@ static bool rq_alloc(grpc_resource_quota* resource_quota) {
     int64_t aborted_allocations = resource_user->outstanding_allocations;
     resource_user->outstanding_allocations = 0;
     resource_user->free_pool += aborted_allocations;
-    GRPC_CLOSURE_LIST_SCHED(&resource_user->on_allocated);
+    grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &resource_user->on_allocated);
     gpr_mu_unlock(&resource_user->mu);
     ru_unref_by(resource_user, static_cast<gpr_atm>(aborted_allocations));
     continue;
@ -359,7 +359,7 @@ static bool rq_alloc(grpc_resource_quota* resource_quota) {
   if (resource_user->free_pool >= 0) {
     resource_user->allocating = false;
     resource_user->outstanding_allocations = 0;
-    GRPC_CLOSURE_LIST_SCHED(&resource_user->on_allocated);
+    grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &resource_user->on_allocated);
     gpr_mu_unlock(&resource_user->mu);
   } else {
     rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);

src/core/lib/iomgr/tcp_server_custom.cc:
@ -196,7 +196,7 @@ static void tcp_server_unref(grpc_tcp_server* s) {
   if (gpr_unref(&s->refs)) {
     /* Complete shutdown_starting work before destroying. */
     grpc_core::ExecCtx exec_ctx;
-    GRPC_CLOSURE_LIST_SCHED(&s->shutdown_starting);
+    grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &s->shutdown_starting);
     grpc_core::ExecCtx::Get()->Flush();
     tcp_server_destroy(s);
   }

src/core/lib/iomgr/tcp_server_posix.cc:
@ -547,7 +547,7 @@ static void tcp_server_unref(grpc_tcp_server* s) {
   if (gpr_unref(&s->refs)) {
     grpc_tcp_server_shutdown_listeners(s);
     gpr_mu_lock(&s->mu);
-    GRPC_CLOSURE_LIST_SCHED(&s->shutdown_starting);
+    grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &s->shutdown_starting);
     gpr_mu_unlock(&s->mu);
     tcp_server_destroy(s);
   }

src/core/lib/iomgr/tcp_server_windows.cc:
@ -179,7 +179,7 @@ static void tcp_server_unref(grpc_tcp_server* s) {
   if (gpr_unref(&s->refs)) {
     grpc_tcp_server_shutdown_listeners(s);
     gpr_mu_lock(&s->mu);
-    GRPC_CLOSURE_LIST_SCHED(&s->shutdown_starting);
+    grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &s->shutdown_starting);
     gpr_mu_unlock(&s->mu);
     tcp_server_destroy(s);
   }

src/core/lib/iomgr/timer_generic.cc:
@ -548,9 +548,8 @@ static grpc_timer* pop_one(timer_shard* shard, grpc_millis now) {
   }
   if (timer->deadline > now) return nullptr;
   if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_trace)) {
-    gpr_log(GPR_INFO, "TIMER %p: FIRE %" PRId64 "ms late via %s scheduler",
-            timer, now - timer->deadline,
-            timer->closure->scheduler->vtable->name);
+    gpr_log(GPR_INFO, "TIMER %p: FIRE %" PRId64 "ms late", timer,
+            now - timer->deadline);
   }
   timer->pending = false;
   grpc_timer_heap_pop(&shard->heap);

test/cpp/microbenchmarks/bm_chttp2_transport.cc:
@ -160,23 +160,19 @@ class Closure : public grpc_closure {
 };

 template <class F>
-std::unique_ptr<Closure> MakeClosure(
-    F f, grpc_closure_scheduler* sched = grpc_schedule_on_exec_ctx) {
+std::unique_ptr<Closure> MakeClosure(F f) {
   struct C : public Closure {
-    C(const F& f, grpc_closure_scheduler* sched) : f_(f) {
-      GRPC_CLOSURE_INIT(this, Execute, this, sched);
-    }
+    C(const F& f) : f_(f) { GRPC_CLOSURE_INIT(this, Execute, this, nullptr); }
     F f_;
     static void Execute(void* arg, grpc_error* error) {
       static_cast<C*>(arg)->f_(error);
     }
   };
-  return std::unique_ptr<Closure>(new C(f, sched));
+  return std::unique_ptr<Closure>(new C(f));
 }

 template <class F>
-grpc_closure* MakeOnceClosure(
-    F f, grpc_closure_scheduler* sched = grpc_schedule_on_exec_ctx) {
+grpc_closure* MakeOnceClosure(F f) {
   struct C : public grpc_closure {
     C(const F& f) : f_(f) {}
     F f_;
@ -186,7 +182,7 @@ grpc_closure* MakeOnceClosure(
     }
   };
   auto* c = new C{f};
-  return GRPC_CLOSURE_INIT(c, C::Execute, c, sched);
+  return GRPC_CLOSURE_INIT(c, C::Execute, c, nullptr);
 }

 class Stream {

test/cpp/microbenchmarks/bm_closure.cc:
@ -350,19 +350,17 @@ BENCHMARK(BM_ClosureSched4OnTwoCombiners);
 // the benchmark is complete
 class Rescheduler {
  public:
-  Rescheduler(benchmark::State& state, grpc_closure_scheduler* scheduler)
-      : state_(state) {
-    GRPC_CLOSURE_INIT(&closure_, Step, this, scheduler);
+  Rescheduler(benchmark::State& state) : state_(state) {
+    GRPC_CLOSURE_INIT(&closure_, Step, this, nullptr);
   }

   void ScheduleFirst() {
     grpc_core::ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
   }

-  void ScheduleFirstAgainstDifferentScheduler(
-      grpc_closure_scheduler* scheduler) {
+  void ScheduleFirstAgainstDifferentScheduler() {
     grpc_core::ExecCtx::Run(DEBUG_LOCATION,
-                            GRPC_CLOSURE_CREATE(Step, this, scheduler),
+                            GRPC_CLOSURE_CREATE(Step, this, nullptr),
                             GRPC_ERROR_NONE);
   }
@ -381,7 +379,7 @@ class Rescheduler {
 static void BM_ClosureReschedOnExecCtx(benchmark::State& state) {
   TrackCounters track_counters;
   grpc_core::ExecCtx exec_ctx;
-  Rescheduler r(state, grpc_schedule_on_exec_ctx);
+  Rescheduler r(state);
   r.ScheduleFirst();
   grpc_core::ExecCtx::Get()->Flush();
   track_counters.Finish(state);

test/cpp/microbenchmarks/bm_pollset.cc:
@ -163,18 +163,16 @@ class Closure : public grpc_closure {
 };

 template <class F>
-Closure* MakeClosure(F f, grpc_closure_scheduler* scheduler) {
+Closure* MakeClosure(F f) {
   struct C : public Closure {
-    C(F f, grpc_closure_scheduler* scheduler) : f_(f) {
-      GRPC_CLOSURE_INIT(this, C::cbfn, this, scheduler);
-    }
+    C(F f) : f_(f) { GRPC_CLOSURE_INIT(this, C::cbfn, this, nullptr); }
     static void cbfn(void* arg, grpc_error* /*error*/) {
       C* p = static_cast<C*>(arg);
       p->f_();
     }
     F f_;
   };
-  return new C(f, scheduler);
+  return new C(f);
 }

 #ifdef GRPC_LINUX_MULTIPOLL_WITH_EPOLL
@ -223,17 +221,15 @@ static void BM_SingleThreadPollOneFd(benchmark::State& state) {
   grpc_fd* wakeup = grpc_fd_create(wakeup_fd.read_fd, "wakeup_read", false);
   grpc_pollset_add_fd(ps, wakeup);
   bool done = false;
-  Closure* continue_closure = MakeClosure(
-      [&]() {
-        GRPC_ERROR_UNREF(grpc_wakeup_fd_consume_wakeup(&wakeup_fd));
-        if (!state.KeepRunning()) {
-          done = true;
-          return;
-        }
-        GRPC_ERROR_UNREF(grpc_wakeup_fd_wakeup(&wakeup_fd));
-        grpc_fd_notify_on_read(wakeup, continue_closure);
-      },
-      grpc_schedule_on_exec_ctx);
+  Closure* continue_closure = MakeClosure([&]() {
+    GRPC_ERROR_UNREF(grpc_wakeup_fd_consume_wakeup(&wakeup_fd));
+    if (!state.KeepRunning()) {
+      done = true;
+      return;
+    }
+    GRPC_ERROR_UNREF(grpc_wakeup_fd_wakeup(&wakeup_fd));
+    grpc_fd_notify_on_read(wakeup, continue_closure);
+  });
   GRPC_ERROR_UNREF(grpc_wakeup_fd_wakeup(&wakeup_fd));
   grpc_fd_notify_on_read(wakeup, continue_closure);
   gpr_mu_lock(mu);
