Split ExecCtx into its own target (#27446)

* make error independent

* Separate grpc_error into its own library

This is forward work to move Closure, ExecCtx into their own libraries
in order to make use of them in the activity code for resource quota
wakeups.

* Automated change: Fix sanity tests

* fixes

* split out closure lib

* exec_ctx

* Make things compile

* Automated change: Fix sanity tests

* fix merge

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
reviewable/pr27347/r11
Authored by Craig Tiller 3 years ago; committed via GitHub
parent 693f36f91c
commit 7ecd9d6b40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 50
      BUILD
  2. 20
      src/core/lib/iomgr/combiner.cc
  3. 10
      src/core/lib/iomgr/exec_ctx.cc
  4. 20
      src/core/lib/iomgr/executor.cc
  5. 3
      src/core/lib/iomgr/iomgr_internal.cc
  6. 3
      src/core/lib/iomgr/iomgr_internal.h

50
BUILD

@ -1326,6 +1326,40 @@ grpc_cc_library(
],
)
# Header-only target for grpc_closure: lets code depend on closures
# without pulling in all of iomgr (groundwork for resource-quota wakeups).
grpc_cc_library(
    name = "closure",
    hdrs = [
        "src/core/lib/iomgr/closure.h",
    ],
    deps = [
        "error",  # closure.h uses grpc_error_handle in its scheduling API
        "gpr",
    ],
)
# Standalone ExecCtx target split out of grpc_base_c: bundles the execution
# context with the combiner/executor machinery it is mutually coupled to.
grpc_cc_library(
    name = "exec_ctx",
    srcs = [
        "src/core/lib/iomgr/combiner.cc",
        "src/core/lib/iomgr/exec_ctx.cc",
        "src/core/lib/iomgr/executor.cc",
        "src/core/lib/iomgr/iomgr_internal.cc",
    ],
    hdrs = [
        "src/core/lib/iomgr/combiner.h",
        "src/core/lib/iomgr/exec_ctx.h",
        "src/core/lib/iomgr/executor.h",
        "src/core/lib/iomgr/iomgr_internal.h",
    ],
    deps = [
        "closure",
        "error",
        "gpr_base",
        "gpr_tls",  # ExecCtx is stored in a thread-local
        "useful",
    ],
)
grpc_cc_library(
name = "grpc_base_c",
srcs = [
@ -1360,7 +1394,6 @@ grpc_cc_library(
"src/core/lib/iomgr/buffer_list.cc",
"src/core/lib/iomgr/call_combiner.cc",
"src/core/lib/iomgr/cfstream_handle.cc",
"src/core/lib/iomgr/combiner.cc",
"src/core/lib/iomgr/dualstack_socket_posix.cc",
"src/core/lib/iomgr/endpoint.cc",
"src/core/lib/iomgr/endpoint_cfstream.cc",
@ -1382,8 +1415,6 @@ grpc_cc_library(
"src/core/lib/iomgr/event_engine/resolver.cc",
"src/core/lib/iomgr/event_engine/tcp.cc",
"src/core/lib/iomgr/event_engine/timer.cc",
"src/core/lib/iomgr/exec_ctx.cc",
"src/core/lib/iomgr/executor.cc",
"src/core/lib/iomgr/fork_posix.cc",
"src/core/lib/iomgr/fork_windows.cc",
"src/core/lib/iomgr/gethostname_fallback.cc",
@ -1395,7 +1426,6 @@ grpc_cc_library(
"src/core/lib/iomgr/iocp_windows.cc",
"src/core/lib/iomgr/iomgr.cc",
"src/core/lib/iomgr/iomgr_custom.cc",
"src/core/lib/iomgr/iomgr_internal.cc",
"src/core/lib/iomgr/iomgr_posix.cc",
"src/core/lib/iomgr/iomgr_posix_cfstream.cc",
"src/core/lib/iomgr/iomgr_windows.cc",
@ -1527,8 +1557,6 @@ grpc_cc_library(
"src/core/lib/iomgr/buffer_list.h",
"src/core/lib/iomgr/call_combiner.h",
"src/core/lib/iomgr/cfstream_handle.h",
"src/core/lib/iomgr/closure.h",
"src/core/lib/iomgr/combiner.h",
"src/core/lib/iomgr/dynamic_annotations.h",
"src/core/lib/iomgr/endpoint.h",
"src/core/lib/iomgr/endpoint_cfstream.h",
@ -1545,15 +1573,12 @@ grpc_cc_library(
"src/core/lib/iomgr/event_engine/pollset.h",
"src/core/lib/iomgr/event_engine/promise.h",
"src/core/lib/iomgr/event_engine/resolved_address_internal.h",
"src/core/lib/iomgr/exec_ctx.h",
"src/core/lib/iomgr/executor.h",
"src/core/lib/iomgr/gethostname.h",
"src/core/lib/iomgr/grpc_if_nametoindex.h",
"src/core/lib/iomgr/internal_errqueue.h",
"src/core/lib/iomgr/iocp_windows.h",
"src/core/lib/iomgr/iomgr.h",
"src/core/lib/iomgr/iomgr_custom.h",
"src/core/lib/iomgr/iomgr_internal.h",
"src/core/lib/iomgr/is_epollexclusive_available.h",
"src/core/lib/iomgr/load_file.h",
"src/core/lib/iomgr/lockfree_event.h",
@ -1635,10 +1660,15 @@ grpc_cc_library(
# re-export these headers from here for now, and when LSC's have completed
# to clean this up, we'll remove these.
[
"src/core/lib/iomgr/closure.h",
"src/core/lib/iomgr/error.h",
"src/core/lib/iomgr/error_internal.h",
"src/core/lib/slice/slice_internal.h",
"src/core/lib/slice/slice_string_helpers.h",
"src/core/lib/iomgr/exec_ctx.h",
"src/core/lib/iomgr/executor.h",
"src/core/lib/iomgr/combiner.h",
"src/core/lib/iomgr/iomgr_internal.h",
],
external_deps = [
"absl/container:flat_hash_map",
@ -1657,8 +1687,10 @@ grpc_cc_library(
visibility = ["@grpc:alt_grpc_base_legacy"],
deps = [
"bitset",
"closure",
"dual_ref_counted",
"error",
"exec_ctx",
"gpr_base",
"gpr_codegen",
"gpr_tls",

@ -27,11 +27,9 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gprpp/mpscq.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
grpc_core::DebugOnlyTraceFlag grpc_combiner_trace(false, "combiner");
@ -128,15 +126,11 @@ static void push_first_on_exec_ctx(grpc_core::Combiner* lock) {
static void combiner_exec(grpc_core::Combiner* lock, grpc_closure* cl,
grpc_error_handle error) {
GPR_TIMER_SCOPE("combiner.execute", 0);
GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_ITEMS();
gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT);
GRPC_COMBINER_TRACE(gpr_log(GPR_INFO,
"C:%p grpc_combiner_execute c=%p last=%" PRIdPTR,
lock, cl, last));
if (last == 1) {
GRPC_STATS_INC_COMBINER_LOCKS_INITIATED();
GPR_TIMER_MARK("combiner.initiated", 0);
gpr_atm_no_barrier_store(
&lock->initiating_exec_ctx_or_null,
reinterpret_cast<gpr_atm>(grpc_core::ExecCtx::Get()));
@ -175,14 +169,12 @@ static void offload(void* arg, grpc_error_handle /*error*/) {
}
static void queue_offload(grpc_core::Combiner* lock) {
GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED();
move_next();
GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p queue_offload", lock));
grpc_core::Executor::Run(&lock->offload, GRPC_ERROR_NONE);
}
bool grpc_combiner_continue_exec_ctx() {
GPR_TIMER_SCOPE("combiner.continue_exec_ctx", 0);
grpc_core::Combiner* lock =
grpc_core::ExecCtx::Get()->combiner_data()->active_combiner;
if (lock == nullptr) {
@ -207,9 +199,8 @@ bool grpc_combiner_continue_exec_ctx() {
// 3. the current thread is not a worker for any background poller
// 4. the DEFAULT executor is threaded
if (contended && grpc_core::ExecCtx::Get()->IsReadyToFinish() &&
!grpc_iomgr_is_any_background_poller_thread() &&
!grpc_iomgr_platform_is_any_background_poller_thread() &&
grpc_core::Executor::IsThreadedDefault()) {
GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0);
// this execution context wants to move on: schedule remaining work to be
// picked up on the executor
queue_offload(lock);
@ -226,11 +217,9 @@ bool grpc_combiner_continue_exec_ctx() {
if (n == nullptr) {
// queue is in an inconsistent state: use this as a cue that we should
// go off and do something else for a while (and come back later)
GPR_TIMER_MARK("delay_busy", 0);
queue_offload(lock);
return true;
}
GPR_TIMER_SCOPE("combiner.exec1", 0);
grpc_closure* cl = reinterpret_cast<grpc_closure*>(n);
grpc_error_handle cl_err = cl->error_data.error;
#ifndef NDEBUG
@ -244,7 +233,6 @@ bool grpc_combiner_continue_exec_ctx() {
grpc_closure_list_init(&lock->final_list);
int loops = 0;
while (c != nullptr) {
GPR_TIMER_SCOPE("combiner.exec_1final", 0);
GRPC_COMBINER_TRACE(
gpr_log(GPR_INFO, "C:%p execute_final[%d] c=%p", lock, loops, c));
grpc_closure* next = c->next_data.next;
@ -258,7 +246,6 @@ bool grpc_combiner_continue_exec_ctx() {
}
}
GPR_TIMER_MARK("unref", 0);
move_next();
lock->time_to_execute_final_list = false;
gpr_atm old_state =
@ -305,13 +292,10 @@ static void combiner_finally_exec(grpc_core::Combiner* lock,
grpc_closure* closure,
grpc_error_handle error) {
GPR_ASSERT(lock != nullptr);
GPR_TIMER_SCOPE("combiner.execute_finally", 0);
GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS();
GRPC_COMBINER_TRACE(gpr_log(
GPR_INFO, "C:%p grpc_combiner_execute_finally c=%p; ac=%p", lock, closure,
grpc_core::ExecCtx::Get()->combiner_data()->active_combiner));
if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner != lock) {
GPR_TIMER_MARK("slowpath", 0);
// Using error_data.scratch to store the combiner so that it can be accessed
// in enqueue_finally.
closure->error_data.scratch = reinterpret_cast<uintptr_t>(lock);

@ -20,15 +20,11 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/event_engine/closure.h"
#include "src/core/lib/iomgr/event_engine/iomgr.h"
#include "src/core/lib/profiling/timers.h"
static void exec_ctx_run(grpc_closure* closure, grpc_error_handle error) {
@ -51,14 +47,8 @@ static void exec_ctx_run(grpc_closure* closure, grpc_error_handle error) {
}
static void exec_ctx_sched(grpc_closure* closure, grpc_error_handle error) {
#if defined(GRPC_USE_EVENT_ENGINE) && \
defined(GRPC_EVENT_ENGINE_REPLACE_EXEC_CTX)
grpc_iomgr_event_engine()->Run(
grpc_event_engine::experimental::GrpcClosureToCallback(closure, error));
#else
grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(), closure,
error);
#endif
}
static gpr_timespec g_start_time;

@ -27,12 +27,11 @@
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#define MAX_DEPTH 2
@ -204,7 +203,7 @@ void Executor::SetThreading(bool threading) {
// an application.
// TODO(guantaol): create another method to finish all the pending closures
// registered in the background poller by grpc_core::Executor.
grpc_iomgr_shutdown_background_closure();
grpc_iomgr_platform_shutdown_background_closure();
}
EXECUTOR_TRACE("(%s) SetThreading(%d) done", name_, threading);
@ -237,7 +236,6 @@ void Executor::ThreadMain(void* arg) {
break;
}
GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED();
grpc_closure_list closures = ts->elems;
ts->elems = GRPC_CLOSURE_LIST_INIT;
gpr_mu_unlock(&ts->mu);
@ -254,11 +252,6 @@ void Executor::ThreadMain(void* arg) {
void Executor::Enqueue(grpc_closure* closure, grpc_error_handle error,
bool is_short) {
bool retry_push;
if (is_short) {
GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS();
} else {
GRPC_STATS_INC_EXECUTOR_SCHEDULED_LONG_ITEMS();
}
do {
retry_push = false;
@ -279,7 +272,7 @@ void Executor::Enqueue(grpc_closure* closure, grpc_error_handle error,
return;
}
if (grpc_iomgr_add_closure_to_background_poller(closure, error)) {
if (grpc_iomgr_platform_add_closure_to_background_poller(closure, error)) {
return;
}
@ -287,8 +280,6 @@ void Executor::Enqueue(grpc_closure* closure, grpc_error_handle error,
if (ts == nullptr) {
ts = &thd_state_[grpc_core::HashPointer(grpc_core::ExecCtx::Get(),
cur_thread_count)];
} else {
GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF();
}
ThreadState* orig_ts = ts;
@ -341,7 +332,6 @@ void Executor::Enqueue(grpc_closure* closure, grpc_error_handle error,
// - Note that gpr_cv_signal() won't immediately wakeup the thread. That
// happens after we release the mutex &ts->mu a few lines below
if (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED();
gpr_cv_signal(&ts->cv);
}
@ -372,10 +362,6 @@ void Executor::Enqueue(grpc_closure* closure, grpc_error_handle error,
}
gpr_spinlock_unlock(&adding_thread_lock_);
}
if (retry_push) {
GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES();
}
} while (retry_push);
}

@ -22,9 +22,6 @@
#include <stddef.h>
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/timer_manager.h"
static grpc_iomgr_platform_vtable* iomgr_platform_vtable = nullptr;
void grpc_set_iomgr_platform_vtable(grpc_iomgr_platform_vtable* vtable) {

@ -23,7 +23,8 @@
#include <stdbool.h>
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
typedef struct grpc_iomgr_object {
char* name;

Loading…
Cancel
Save