diff --git a/BUILD b/BUILD index 23a751a8863..e45c8673f15 100644 --- a/BUILD +++ b/BUILD @@ -1326,6 +1326,40 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "closure", + hdrs = [ + "src/core/lib/iomgr/closure.h", + ], + deps = [ + "error", + "gpr", + ], +) + +grpc_cc_library( + name = "exec_ctx", + srcs = [ + "src/core/lib/iomgr/combiner.cc", + "src/core/lib/iomgr/exec_ctx.cc", + "src/core/lib/iomgr/executor.cc", + "src/core/lib/iomgr/iomgr_internal.cc", + ], + hdrs = [ + "src/core/lib/iomgr/combiner.h", + "src/core/lib/iomgr/exec_ctx.h", + "src/core/lib/iomgr/executor.h", + "src/core/lib/iomgr/iomgr_internal.h", + ], + deps = [ + "closure", + "error", + "gpr_base", + "gpr_tls", + "useful", + ], +) + grpc_cc_library( name = "grpc_base_c", srcs = [ @@ -1360,7 +1394,6 @@ grpc_cc_library( "src/core/lib/iomgr/buffer_list.cc", "src/core/lib/iomgr/call_combiner.cc", "src/core/lib/iomgr/cfstream_handle.cc", - "src/core/lib/iomgr/combiner.cc", "src/core/lib/iomgr/dualstack_socket_posix.cc", "src/core/lib/iomgr/endpoint.cc", "src/core/lib/iomgr/endpoint_cfstream.cc", @@ -1382,8 +1415,6 @@ grpc_cc_library( "src/core/lib/iomgr/event_engine/resolver.cc", "src/core/lib/iomgr/event_engine/tcp.cc", "src/core/lib/iomgr/event_engine/timer.cc", - "src/core/lib/iomgr/exec_ctx.cc", - "src/core/lib/iomgr/executor.cc", "src/core/lib/iomgr/fork_posix.cc", "src/core/lib/iomgr/fork_windows.cc", "src/core/lib/iomgr/gethostname_fallback.cc", @@ -1395,7 +1426,6 @@ grpc_cc_library( "src/core/lib/iomgr/iocp_windows.cc", "src/core/lib/iomgr/iomgr.cc", "src/core/lib/iomgr/iomgr_custom.cc", - "src/core/lib/iomgr/iomgr_internal.cc", "src/core/lib/iomgr/iomgr_posix.cc", "src/core/lib/iomgr/iomgr_posix_cfstream.cc", "src/core/lib/iomgr/iomgr_windows.cc", @@ -1527,8 +1557,6 @@ grpc_cc_library( "src/core/lib/iomgr/buffer_list.h", "src/core/lib/iomgr/call_combiner.h", "src/core/lib/iomgr/cfstream_handle.h", - "src/core/lib/iomgr/closure.h", - "src/core/lib/iomgr/combiner.h", "src/core/lib/iomgr/dynamic_annotations.h", "src/core/lib/iomgr/endpoint.h", "src/core/lib/iomgr/endpoint_cfstream.h", @@ -1545,15 +1573,12 @@ grpc_cc_library( "src/core/lib/iomgr/event_engine/pollset.h", "src/core/lib/iomgr/event_engine/promise.h", "src/core/lib/iomgr/event_engine/resolved_address_internal.h", - "src/core/lib/iomgr/exec_ctx.h", - "src/core/lib/iomgr/executor.h", "src/core/lib/iomgr/gethostname.h", "src/core/lib/iomgr/grpc_if_nametoindex.h", "src/core/lib/iomgr/internal_errqueue.h", "src/core/lib/iomgr/iocp_windows.h", "src/core/lib/iomgr/iomgr.h", "src/core/lib/iomgr/iomgr_custom.h", - "src/core/lib/iomgr/iomgr_internal.h", "src/core/lib/iomgr/is_epollexclusive_available.h", "src/core/lib/iomgr/load_file.h", "src/core/lib/iomgr/lockfree_event.h", @@ -1635,10 +1660,15 @@ grpc_cc_library( # re-export these headers from here for now, and when LSC's have completed # to clean this up, we'll remove these. [ + "src/core/lib/iomgr/closure.h", "src/core/lib/iomgr/error.h", "src/core/lib/iomgr/error_internal.h", "src/core/lib/slice/slice_internal.h", "src/core/lib/slice/slice_string_helpers.h", + "src/core/lib/iomgr/exec_ctx.h", + "src/core/lib/iomgr/executor.h", + "src/core/lib/iomgr/combiner.h", + "src/core/lib/iomgr/iomgr_internal.h", ], external_deps = [ "absl/container:flat_hash_map", @@ -1657,8 +1687,10 @@ grpc_cc_library( visibility = ["@grpc:alt_grpc_base_legacy"], deps = [ "bitset", + "closure", "dual_ref_counted", "error", + "exec_ctx", "gpr_base", "gpr_codegen", "gpr_tls", diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index 683531325ee..5d833383ed3 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -27,11 +27,9 @@ #include #include -#include "src/core/lib/debug/stats.h" #include "src/core/lib/gprpp/mpscq.h" #include "src/core/lib/iomgr/executor.h" -#include "src/core/lib/iomgr/iomgr.h" -#include "src/core/lib/profiling/timers.h" +#include "src/core/lib/iomgr/iomgr_internal.h" grpc_core::DebugOnlyTraceFlag grpc_combiner_trace(false, "combiner"); @@ -128,15 +126,11 @@ static void push_first_on_exec_ctx(grpc_core::Combiner* lock) { static void combiner_exec(grpc_core::Combiner* lock, grpc_closure* cl, grpc_error_handle error) { - GPR_TIMER_SCOPE("combiner.execute", 0); - GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_ITEMS(); gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT); GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p grpc_combiner_execute c=%p last=%" PRIdPTR, lock, cl, last)); if (last == 1) { - GRPC_STATS_INC_COMBINER_LOCKS_INITIATED(); - GPR_TIMER_MARK("combiner.initiated", 0); gpr_atm_no_barrier_store( &lock->initiating_exec_ctx_or_null, reinterpret_cast(grpc_core::ExecCtx::Get())); @@ -175,14 +169,12 @@ static void offload(void* arg, grpc_error_handle /*error*/) { } static void queue_offload(grpc_core::Combiner* lock) { - GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED(); move_next(); GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p queue_offload", lock)); grpc_core::Executor::Run(&lock->offload, GRPC_ERROR_NONE); } bool grpc_combiner_continue_exec_ctx() { - GPR_TIMER_SCOPE("combiner.continue_exec_ctx", 0); grpc_core::Combiner* lock = grpc_core::ExecCtx::Get()->combiner_data()->active_combiner; if (lock == nullptr) { @@ -207,9 +199,8 @@ bool grpc_combiner_continue_exec_ctx() { // 3. the current thread is not a worker for any background poller // 4. the DEFAULT executor is threaded if (contended && grpc_core::ExecCtx::Get()->IsReadyToFinish() && - !grpc_iomgr_is_any_background_poller_thread() && + !grpc_iomgr_platform_is_any_background_poller_thread() && grpc_core::Executor::IsThreadedDefault()) { - GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0); // this execution context wants to move on: schedule remaining work to be // picked up on the executor queue_offload(lock); @@ -226,11 +217,9 @@ bool grpc_combiner_continue_exec_ctx() { if (n == nullptr) { // queue is in an inconsistent state: use this as a cue that we should // go off and do something else for a while (and come back later) - GPR_TIMER_MARK("delay_busy", 0); queue_offload(lock); return true; } - GPR_TIMER_SCOPE("combiner.exec1", 0); grpc_closure* cl = reinterpret_cast(n); grpc_error_handle cl_err = cl->error_data.error; #ifndef NDEBUG @@ -244,7 +233,6 @@ bool grpc_combiner_continue_exec_ctx() { grpc_closure_list_init(&lock->final_list); int loops = 0; while (c != nullptr) { - GPR_TIMER_SCOPE("combiner.exec_1final", 0); GRPC_COMBINER_TRACE( gpr_log(GPR_INFO, "C:%p execute_final[%d] c=%p", lock, loops, c)); grpc_closure* next = c->next_data.next; @@ -258,7 +246,6 @@ bool grpc_combiner_continue_exec_ctx() { } } - GPR_TIMER_MARK("unref", 0); move_next(); lock->time_to_execute_final_list = false; gpr_atm old_state = @@ -305,13 +292,10 @@ static void combiner_finally_exec(grpc_core::Combiner* lock, grpc_closure* closure, grpc_error_handle error) { GPR_ASSERT(lock != nullptr); - GPR_TIMER_SCOPE("combiner.execute_finally", 0); - GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS(); GRPC_COMBINER_TRACE(gpr_log( GPR_INFO, "C:%p grpc_combiner_execute_finally c=%p; ac=%p", lock, closure, grpc_core::ExecCtx::Get()->combiner_data()->active_combiner)); if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner != lock) { - GPR_TIMER_MARK("slowpath", 0); // Using error_data.scratch to store the combiner so that it can be accessed // in enqueue_finally. closure->error_data.scratch = reinterpret_cast(lock); diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc index f6dad6dcac3..ed480d8f742 100644 --- a/src/core/lib/iomgr/exec_ctx.cc +++ b/src/core/lib/iomgr/exec_ctx.cc @@ -20,15 +20,11 @@ #include "src/core/lib/iomgr/exec_ctx.h" -#include #include #include -#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/error.h" -#include "src/core/lib/iomgr/event_engine/closure.h" -#include "src/core/lib/iomgr/event_engine/iomgr.h" #include "src/core/lib/profiling/timers.h" static void exec_ctx_run(grpc_closure* closure, grpc_error_handle error) { @@ -51,14 +47,8 @@ static void exec_ctx_run(grpc_closure* closure, grpc_error_handle error) { } static void exec_ctx_sched(grpc_closure* closure, grpc_error_handle error) { -#if defined(GRPC_USE_EVENT_ENGINE) && \ - defined(GRPC_EVENT_ENGINE_REPLACE_EXEC_CTX) - grpc_iomgr_event_engine()->Run( - grpc_event_engine::experimental::GrpcClosureToCallback(closure, error)); -#else grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(), closure, error); -#endif } static gpr_timespec g_start_time; diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc index 44c294d0a4b..4ef0c34a2ce 100644 --- a/src/core/lib/iomgr/executor.cc +++ b/src/core/lib/iomgr/executor.cc @@ -27,12 +27,11 @@ #include #include -#include "src/core/lib/debug/stats.h" #include "src/core/lib/gpr/tls.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/iomgr/iomgr.h" +#include "src/core/lib/iomgr/iomgr_internal.h" #define MAX_DEPTH 2 @@ -204,7 +203,7 @@ void Executor::SetThreading(bool threading) { // an application. // TODO(guantaol): create another method to finish all the pending closures // registered in the background poller by grpc_core::Executor. - grpc_iomgr_shutdown_background_closure(); + grpc_iomgr_platform_shutdown_background_closure(); } EXECUTOR_TRACE("(%s) SetThreading(%d) done", name_, threading); @@ -237,7 +236,6 @@ void Executor::ThreadMain(void* arg) { break; } - GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED(); grpc_closure_list closures = ts->elems; ts->elems = GRPC_CLOSURE_LIST_INIT; gpr_mu_unlock(&ts->mu); @@ -254,11 +252,6 @@ void Executor::ThreadMain(void* arg) { void Executor::Enqueue(grpc_closure* closure, grpc_error_handle error, bool is_short) { bool retry_push; - if (is_short) { - GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS(); - } else { - GRPC_STATS_INC_EXECUTOR_SCHEDULED_LONG_ITEMS(); - } do { retry_push = false; @@ -279,7 +272,7 @@ void Executor::Enqueue(grpc_closure* closure, grpc_error_handle error, return; } - if (grpc_iomgr_add_closure_to_background_poller(closure, error)) { + if (grpc_iomgr_platform_add_closure_to_background_poller(closure, error)) { return; } @@ -287,8 +280,6 @@ void Executor::Enqueue(grpc_closure* closure, grpc_error_handle error, if (ts == nullptr) { ts = &thd_state_[grpc_core::HashPointer(grpc_core::ExecCtx::Get(), cur_thread_count)]; - } else { - GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(); } ThreadState* orig_ts = ts; @@ -341,7 +332,6 @@ void Executor::Enqueue(grpc_closure* closure, grpc_error_handle error, // - Note that gpr_cv_signal() won't immediately wakeup the thread. That // happens after we release the mutex &ts->mu a few lines below if (grpc_closure_list_empty(ts->elems) && !ts->shutdown) { - GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED(); gpr_cv_signal(&ts->cv); } @@ -372,10 +362,6 @@ void Executor::Enqueue(grpc_closure* closure, grpc_error_handle error, } gpr_spinlock_unlock(&adding_thread_lock_); } - - if (retry_push) { - GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES(); - } } while (retry_push); } diff --git a/src/core/lib/iomgr/iomgr_internal.cc b/src/core/lib/iomgr/iomgr_internal.cc index eb527f7a128..94b77d9b50a 100644 --- a/src/core/lib/iomgr/iomgr_internal.cc +++ b/src/core/lib/iomgr/iomgr_internal.cc @@ -22,9 +22,6 @@ #include -#include "src/core/lib/iomgr/timer.h" -#include "src/core/lib/iomgr/timer_manager.h" - static grpc_iomgr_platform_vtable* iomgr_platform_vtable = nullptr; void grpc_set_iomgr_platform_vtable(grpc_iomgr_platform_vtable* vtable) { diff --git a/src/core/lib/iomgr/iomgr_internal.h b/src/core/lib/iomgr/iomgr_internal.h index 01cd2483ea2..05867914e41 100644 --- a/src/core/lib/iomgr/iomgr_internal.h +++ b/src/core/lib/iomgr/iomgr_internal.h @@ -23,7 +23,8 @@ #include -#include "src/core/lib/iomgr/iomgr.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/error.h" typedef struct grpc_iomgr_object { char* name;