From 7ecd9d6b4022be7a0474c50502a3ccecae5842e0 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 22 Sep 2021 21:43:30 -0700 Subject: [PATCH] Split ExecCtx into its own target (#27446) * make error independent * Separate grpc_error into its own library This is forward work to move Closure, ExecCtx into their own libraries in order to make use of them in the activity code for resource quota wakeups. * Automated change: Fix sanity tests * fixes * split out closure lib * exec_ctx * Make things compile * Automated change: Fix sanity tests * fix merge Co-authored-by: ctiller --- BUILD | 50 +++++++++++++++++++++++----- src/core/lib/iomgr/combiner.cc | 20 ++--------- src/core/lib/iomgr/exec_ctx.cc | 10 ------ src/core/lib/iomgr/executor.cc | 20 ++--------- src/core/lib/iomgr/iomgr_internal.cc | 3 -- src/core/lib/iomgr/iomgr_internal.h | 3 +- 6 files changed, 48 insertions(+), 58 deletions(-) diff --git a/BUILD b/BUILD index 23a751a8863..e45c8673f15 100644 --- a/BUILD +++ b/BUILD @@ -1326,6 +1326,40 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "closure", + hdrs = [ + "src/core/lib/iomgr/closure.h", + ], + deps = [ + "error", + "gpr", + ], +) + +grpc_cc_library( + name = "exec_ctx", + srcs = [ + "src/core/lib/iomgr/combiner.cc", + "src/core/lib/iomgr/exec_ctx.cc", + "src/core/lib/iomgr/executor.cc", + "src/core/lib/iomgr/iomgr_internal.cc", + ], + hdrs = [ + "src/core/lib/iomgr/combiner.h", + "src/core/lib/iomgr/exec_ctx.h", + "src/core/lib/iomgr/executor.h", + "src/core/lib/iomgr/iomgr_internal.h", + ], + deps = [ + "closure", + "error", + "gpr_base", + "gpr_tls", + "useful", + ], +) + grpc_cc_library( name = "grpc_base_c", srcs = [ @@ -1360,7 +1394,6 @@ grpc_cc_library( "src/core/lib/iomgr/buffer_list.cc", "src/core/lib/iomgr/call_combiner.cc", "src/core/lib/iomgr/cfstream_handle.cc", - "src/core/lib/iomgr/combiner.cc", "src/core/lib/iomgr/dualstack_socket_posix.cc", "src/core/lib/iomgr/endpoint.cc", "src/core/lib/iomgr/endpoint_cfstream.cc", @@ -1382,8 +1415,6 @@ grpc_cc_library( "src/core/lib/iomgr/event_engine/resolver.cc", "src/core/lib/iomgr/event_engine/tcp.cc", "src/core/lib/iomgr/event_engine/timer.cc", - "src/core/lib/iomgr/exec_ctx.cc", - "src/core/lib/iomgr/executor.cc", "src/core/lib/iomgr/fork_posix.cc", "src/core/lib/iomgr/fork_windows.cc", "src/core/lib/iomgr/gethostname_fallback.cc", @@ -1395,7 +1426,6 @@ grpc_cc_library( "src/core/lib/iomgr/iocp_windows.cc", "src/core/lib/iomgr/iomgr.cc", "src/core/lib/iomgr/iomgr_custom.cc", - "src/core/lib/iomgr/iomgr_internal.cc", "src/core/lib/iomgr/iomgr_posix.cc", "src/core/lib/iomgr/iomgr_posix_cfstream.cc", "src/core/lib/iomgr/iomgr_windows.cc", @@ -1527,8 +1557,6 @@ grpc_cc_library( "src/core/lib/iomgr/buffer_list.h", "src/core/lib/iomgr/call_combiner.h", "src/core/lib/iomgr/cfstream_handle.h", - "src/core/lib/iomgr/closure.h", - "src/core/lib/iomgr/combiner.h", "src/core/lib/iomgr/dynamic_annotations.h", "src/core/lib/iomgr/endpoint.h", "src/core/lib/iomgr/endpoint_cfstream.h", @@ -1545,15 +1573,12 @@ grpc_cc_library( "src/core/lib/iomgr/event_engine/pollset.h", "src/core/lib/iomgr/event_engine/promise.h", "src/core/lib/iomgr/event_engine/resolved_address_internal.h", - "src/core/lib/iomgr/exec_ctx.h", - "src/core/lib/iomgr/executor.h", "src/core/lib/iomgr/gethostname.h", "src/core/lib/iomgr/grpc_if_nametoindex.h", "src/core/lib/iomgr/internal_errqueue.h", "src/core/lib/iomgr/iocp_windows.h", "src/core/lib/iomgr/iomgr.h", "src/core/lib/iomgr/iomgr_custom.h", - "src/core/lib/iomgr/iomgr_internal.h", "src/core/lib/iomgr/is_epollexclusive_available.h", "src/core/lib/iomgr/load_file.h", "src/core/lib/iomgr/lockfree_event.h", @@ -1635,10 +1660,15 @@ grpc_cc_library( # re-export these headers from here for now, and when LSC's have completed # to clean this up, we'll remove these. [ + "src/core/lib/iomgr/closure.h", "src/core/lib/iomgr/error.h", "src/core/lib/iomgr/error_internal.h", "src/core/lib/slice/slice_internal.h", "src/core/lib/slice/slice_string_helpers.h", + "src/core/lib/iomgr/exec_ctx.h", + "src/core/lib/iomgr/executor.h", + "src/core/lib/iomgr/combiner.h", + "src/core/lib/iomgr/iomgr_internal.h", ], external_deps = [ "absl/container:flat_hash_map", @@ -1657,8 +1687,10 @@ grpc_cc_library( visibility = ["@grpc:alt_grpc_base_legacy"], deps = [ "bitset", + "closure", "dual_ref_counted", "error", + "exec_ctx", "gpr_base", "gpr_codegen", "gpr_tls", diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index 683531325ee..5d833383ed3 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -27,11 +27,9 @@ #include #include -#include "src/core/lib/debug/stats.h" #include "src/core/lib/gprpp/mpscq.h" #include "src/core/lib/iomgr/executor.h" -#include "src/core/lib/iomgr/iomgr.h" -#include "src/core/lib/profiling/timers.h" +#include "src/core/lib/iomgr/iomgr_internal.h" grpc_core::DebugOnlyTraceFlag grpc_combiner_trace(false, "combiner"); @@ -128,15 +126,11 @@ static void push_first_on_exec_ctx(grpc_core::Combiner* lock) { static void combiner_exec(grpc_core::Combiner* lock, grpc_closure* cl, grpc_error_handle error) { - GPR_TIMER_SCOPE("combiner.execute", 0); - GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_ITEMS(); gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT); GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p grpc_combiner_execute c=%p last=%" PRIdPTR, lock, cl, last)); if (last == 1) { - GRPC_STATS_INC_COMBINER_LOCKS_INITIATED(); - GPR_TIMER_MARK("combiner.initiated", 0); gpr_atm_no_barrier_store( &lock->initiating_exec_ctx_or_null, reinterpret_cast(grpc_core::ExecCtx::Get())); @@ -175,14 +169,12 @@ static void offload(void* arg, grpc_error_handle /*error*/) { } static void queue_offload(grpc_core::Combiner* lock) { - GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED(); move_next(); GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p queue_offload", lock)); grpc_core::Executor::Run(&lock->offload, GRPC_ERROR_NONE); } bool grpc_combiner_continue_exec_ctx() { - GPR_TIMER_SCOPE("combiner.continue_exec_ctx", 0); grpc_core::Combiner* lock = grpc_core::ExecCtx::Get()->combiner_data()->active_combiner; if (lock == nullptr) { @@ -207,9 +199,8 @@ bool grpc_combiner_continue_exec_ctx() { // 3. the current thread is not a worker for any background poller // 4. the DEFAULT executor is threaded if (contended && grpc_core::ExecCtx::Get()->IsReadyToFinish() && - !grpc_iomgr_is_any_background_poller_thread() && + !grpc_iomgr_platform_is_any_background_poller_thread() && grpc_core::Executor::IsThreadedDefault()) { - GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0); // this execution context wants to move on: schedule remaining work to be // picked up on the executor queue_offload(lock); @@ -226,11 +217,9 @@ bool grpc_combiner_continue_exec_ctx() { if (n == nullptr) { // queue is in an inconsistent state: use this as a cue that we should // go off and do something else for a while (and come back later) - GPR_TIMER_MARK("delay_busy", 0); queue_offload(lock); return true; } - GPR_TIMER_SCOPE("combiner.exec1", 0); grpc_closure* cl = reinterpret_cast(n); grpc_error_handle cl_err = cl->error_data.error; #ifndef NDEBUG @@ -244,7 +233,6 @@ bool grpc_combiner_continue_exec_ctx() { grpc_closure_list_init(&lock->final_list); int loops = 0; while (c != nullptr) { - GPR_TIMER_SCOPE("combiner.exec_1final", 0); GRPC_COMBINER_TRACE( gpr_log(GPR_INFO, "C:%p execute_final[%d] c=%p", lock, loops, c)); grpc_closure* next = c->next_data.next; @@ -258,7 +246,6 @@ bool grpc_combiner_continue_exec_ctx() { } } - GPR_TIMER_MARK("unref", 0); move_next(); lock->time_to_execute_final_list = false; gpr_atm old_state = @@ -305,13 +292,10 @@ static void combiner_finally_exec(grpc_core::Combiner* lock, grpc_closure* closure, grpc_error_handle error) { GPR_ASSERT(lock != nullptr); - GPR_TIMER_SCOPE("combiner.execute_finally", 0); - GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS(); GRPC_COMBINER_TRACE(gpr_log( GPR_INFO, "C:%p grpc_combiner_execute_finally c=%p; ac=%p", lock, closure, grpc_core::ExecCtx::Get()->combiner_data()->active_combiner)); if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner != lock) { - GPR_TIMER_MARK("slowpath", 0); // Using error_data.scratch to store the combiner so that it can be accessed // in enqueue_finally. closure->error_data.scratch = reinterpret_cast(lock); diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc index f6dad6dcac3..ed480d8f742 100644 --- a/src/core/lib/iomgr/exec_ctx.cc +++ b/src/core/lib/iomgr/exec_ctx.cc @@ -20,15 +20,11 @@ #include "src/core/lib/iomgr/exec_ctx.h" -#include #include #include -#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/error.h" -#include "src/core/lib/iomgr/event_engine/closure.h" -#include "src/core/lib/iomgr/event_engine/iomgr.h" #include "src/core/lib/profiling/timers.h" static void exec_ctx_run(grpc_closure* closure, grpc_error_handle error) { @@ -51,14 +47,8 @@ static void exec_ctx_run(grpc_closure* closure, grpc_error_handle error) { } static void exec_ctx_sched(grpc_closure* closure, grpc_error_handle error) { -#if defined(GRPC_USE_EVENT_ENGINE) && \ - defined(GRPC_EVENT_ENGINE_REPLACE_EXEC_CTX) - grpc_iomgr_event_engine()->Run( - grpc_event_engine::experimental::GrpcClosureToCallback(closure, error)); -#else grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(), closure, error); -#endif } static gpr_timespec g_start_time; diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc index 44c294d0a4b..4ef0c34a2ce 100644 --- a/src/core/lib/iomgr/executor.cc +++ b/src/core/lib/iomgr/executor.cc @@ -27,12 +27,11 @@ #include #include -#include "src/core/lib/debug/stats.h" #include "src/core/lib/gpr/tls.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/iomgr/iomgr.h" +#include "src/core/lib/iomgr/iomgr_internal.h" #define MAX_DEPTH 2 @@ -204,7 +203,7 @@ void Executor::SetThreading(bool threading) { // an application. // TODO(guantaol): create another method to finish all the pending closures // registered in the background poller by grpc_core::Executor. - grpc_iomgr_shutdown_background_closure(); + grpc_iomgr_platform_shutdown_background_closure(); } EXECUTOR_TRACE("(%s) SetThreading(%d) done", name_, threading); @@ -237,7 +236,6 @@ void Executor::ThreadMain(void* arg) { break; } - GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED(); grpc_closure_list closures = ts->elems; ts->elems = GRPC_CLOSURE_LIST_INIT; gpr_mu_unlock(&ts->mu); @@ -254,11 +252,6 @@ void Executor::ThreadMain(void* arg) { void Executor::Enqueue(grpc_closure* closure, grpc_error_handle error, bool is_short) { bool retry_push; - if (is_short) { - GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS(); - } else { - GRPC_STATS_INC_EXECUTOR_SCHEDULED_LONG_ITEMS(); - } do { retry_push = false; @@ -279,7 +272,7 @@ void Executor::Enqueue(grpc_closure* closure, grpc_error_handle error, return; } - if (grpc_iomgr_add_closure_to_background_poller(closure, error)) { + if (grpc_iomgr_platform_add_closure_to_background_poller(closure, error)) { return; } @@ -287,8 +280,6 @@ void Executor::Enqueue(grpc_closure* closure, grpc_error_handle error, if (ts == nullptr) { ts = &thd_state_[grpc_core::HashPointer(grpc_core::ExecCtx::Get(), cur_thread_count)]; - } else { - GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(); } ThreadState* orig_ts = ts; @@ -341,7 +332,6 @@ void Executor::Enqueue(grpc_closure* closure, grpc_error_handle error, // - Note that gpr_cv_signal() won't immediately wakeup the thread. That // happens after we release the mutex &ts->mu a few lines below if (grpc_closure_list_empty(ts->elems) && !ts->shutdown) { - GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED(); gpr_cv_signal(&ts->cv); } @@ -372,10 +362,6 @@ void Executor::Enqueue(grpc_closure* closure, grpc_error_handle error, } gpr_spinlock_unlock(&adding_thread_lock_); } - - if (retry_push) { - GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES(); - } } while (retry_push); } diff --git a/src/core/lib/iomgr/iomgr_internal.cc b/src/core/lib/iomgr/iomgr_internal.cc index eb527f7a128..94b77d9b50a 100644 --- a/src/core/lib/iomgr/iomgr_internal.cc +++ b/src/core/lib/iomgr/iomgr_internal.cc @@ -22,9 +22,6 @@ #include -#include "src/core/lib/iomgr/timer.h" -#include "src/core/lib/iomgr/timer_manager.h" - static grpc_iomgr_platform_vtable* iomgr_platform_vtable = nullptr; void grpc_set_iomgr_platform_vtable(grpc_iomgr_platform_vtable* vtable) { diff --git a/src/core/lib/iomgr/iomgr_internal.h b/src/core/lib/iomgr/iomgr_internal.h index 01cd2483ea2..05867914e41 100644 --- a/src/core/lib/iomgr/iomgr_internal.h +++ b/src/core/lib/iomgr/iomgr_internal.h @@ -23,7 +23,8 @@ #include -#include "src/core/lib/iomgr/iomgr.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/error.h" typedef struct grpc_iomgr_object { char* name;