From 872d2787a0628f0c88c5726433c8bbaca0e5813c Mon Sep 17 00:00:00 2001 From: Guantao Liu Date: Mon, 25 Mar 2019 16:19:08 -0700 Subject: [PATCH 1/2] Avoid using grpc_core::Executor when the background poller is available. Instead, run closures in the background poller. This will generally avoid the thread hop in the gRPC runtime. --- src/core/lib/iomgr/ev_epoll1_linux.cc | 6 ++++++ src/core/lib/iomgr/ev_epollex_linux.cc | 6 ++++++ src/core/lib/iomgr/ev_poll_posix.cc | 6 ++++++ src/core/lib/iomgr/ev_posix.cc | 5 +++++ src/core/lib/iomgr/ev_posix.h | 8 ++++++++ src/core/lib/iomgr/executor.cc | 13 +++++++++++++ src/core/lib/iomgr/executor.h | 3 ++- src/core/lib/iomgr/iomgr.cc | 5 +++++ src/core/lib/iomgr/iomgr.h | 7 +++++++ src/core/lib/iomgr/iomgr_custom.cc | 7 ++++++- src/core/lib/iomgr/iomgr_internal.cc | 6 ++++++ src/core/lib/iomgr/iomgr_internal.h | 10 +++++++++- src/core/lib/iomgr/iomgr_posix.cc | 8 +++++++- src/core/lib/iomgr/iomgr_posix_cfstream.cc | 8 +++++++- src/core/lib/iomgr/iomgr_windows.cc | 8 +++++++- test/cpp/microbenchmarks/bm_cq_multiple_threads.cc | 2 ++ 16 files changed, 102 insertions(+), 6 deletions(-) diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index 9eb4c089d86..b6f804cdfca 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -1246,6 +1246,11 @@ static bool is_any_background_poller_thread(void) { return false; } static void shutdown_background_closure(void) {} +static bool add_closure_to_background_poller(grpc_closure* closure, + grpc_error* error) { + return false; +} + static void shutdown_engine(void) { fd_global_shutdown(); pollset_global_shutdown(); @@ -1292,6 +1297,7 @@ static const grpc_event_engine_vtable vtable = { is_any_background_poller_thread, shutdown_background_closure, shutdown_engine, + add_closure_to_background_poller, }; /* Called by the child process's post-fork handler to close open fds, including diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 27656063ba5..01be46c9f68 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -1578,6 +1578,11 @@ static bool is_any_background_poller_thread(void) { return false; } static void shutdown_background_closure(void) {} +static bool add_closure_to_background_poller(grpc_closure* closure, + grpc_error* error) { + return false; +} + static void shutdown_engine(void) { fd_global_shutdown(); pollset_global_shutdown(); @@ -1619,6 +1624,7 @@ static const grpc_event_engine_vtable vtable = { is_any_background_poller_thread, shutdown_background_closure, shutdown_engine, + add_closure_to_background_poller, }; const grpc_event_engine_vtable* grpc_init_epollex_linux( diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index 29111dd44ed..0c95cb75c6d 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -1320,6 +1320,11 @@ static bool is_any_background_poller_thread(void) { return false; } static void shutdown_background_closure(void) {} +static bool add_closure_to_background_poller(grpc_closure* closure, + grpc_error* error) { + return false; +} + static void shutdown_engine(void) { pollset_global_shutdown(); if (track_fds_for_fork) { @@ -1364,6 +1369,7 @@ static const grpc_event_engine_vtable vtable = { is_any_background_poller_thread, shutdown_background_closure, shutdown_engine, + add_closure_to_background_poller, }; /* Called by the child process's post-fork handler to close open fds, including diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index d7aeb81c69e..898686b06c3 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -402,6 +402,11 @@ bool grpc_is_any_background_poller_thread(void) { return g_event_engine->is_any_background_poller_thread(); } +bool grpc_add_closure_to_background_poller(grpc_closure* closure, + grpc_error* error) { + return g_event_engine->add_closure_to_background_poller(closure, error); +} + void grpc_shutdown_background_closure(void) { g_event_engine->shutdown_background_closure(); } diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 94ac9fdba6f..699173fe255 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -83,6 +83,8 @@ typedef struct grpc_event_engine_vtable { bool (*is_any_background_poller_thread)(void); void (*shutdown_background_closure)(void); void (*shutdown_engine)(void); + bool (*add_closure_to_background_poller)(grpc_closure* closure, + grpc_error* error); } grpc_event_engine_vtable; /* register a new event engine factory */ @@ -185,6 +187,12 @@ void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd); /* Returns true if the caller is a worker thread for any background poller. */ bool grpc_is_any_background_poller_thread(); +/* Returns true if the closure is registered into the background poller. Note + * that the closure may or may not run yet when this function returns, and the + * closure should not be blocking or long-running. */ +bool grpc_add_closure_to_background_poller(grpc_closure* closure, + grpc_error* error); + /* Shut down all the closures registered in the background poller. */ void grpc_shutdown_background_closure(); diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc index 2ad8972fc79..47836acacc0 100644 --- a/src/core/lib/iomgr/executor.cc +++ b/src/core/lib/iomgr/executor.cc @@ -32,6 +32,7 @@ #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/iomgr.h" #define MAX_DEPTH 2 @@ -206,6 +207,14 @@ void Executor::SetThreading(bool threading) { gpr_free(thd_state_); gpr_tls_destroy(&g_this_thread_state); + + // grpc_iomgr_shutdown_background_closure() will close all the registered + // fds in the background poller, and wait for all pending closures to + // finish. Thus, never call Executor::SetThreading(false) in the middle of + // an application. + // TODO(guantaol): create another method to finish all the pending closures + // registered in the background poller by grpc_core::Executor. + grpc_iomgr_shutdown_background_closure(); } EXECUTOR_TRACE("(%s) SetThreading(%d) done", name_, threading); @@ -278,6 +287,10 @@ void Executor::Enqueue(grpc_closure* closure, grpc_error* error, return; } + if (grpc_iomgr_add_closure_to_background_poller(closure, error)) { + return; + } + ThreadState* ts = (ThreadState*)gpr_tls_get(&g_this_thread_state); if (ts == nullptr) { ts = &thd_state_[GPR_HASH_POINTER(grpc_core::ExecCtx::Get(), diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h index 9e472279b7b..a9c609bd7d5 100644 --- a/src/core/lib/iomgr/executor.h +++ b/src/core/lib/iomgr/executor.h @@ -61,7 +61,8 @@ class Executor { /** Is the executor multi-threaded? */ bool IsThreaded() const; - /* Enable/disable threading - must be called after Init and Shutdown() */ + /* Enable/disable threading - must be called after Init and Shutdown(). Never + * call SetThreading(false) in the middle of an application */ void SetThreading(bool threading); /** Shutdown the executor, running all pending work as part of the call */ diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc index 33153d9cc3b..0fbfcfce04f 100644 --- a/src/core/lib/iomgr/iomgr.cc +++ b/src/core/lib/iomgr/iomgr.cc @@ -162,6 +162,11 @@ bool grpc_iomgr_is_any_background_poller_thread() { return grpc_iomgr_platform_is_any_background_poller_thread(); } +bool grpc_iomgr_add_closure_to_background_poller(grpc_closure* closure, + grpc_error* error) { + return grpc_iomgr_platform_add_closure_to_background_poller(closure, error); +} + void grpc_iomgr_register_object(grpc_iomgr_object* obj, const char* name) { obj->name = gpr_strdup(name); gpr_mu_lock(&g_mu); diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h index 74775de8146..e02f15e551c 100644 --- a/src/core/lib/iomgr/iomgr.h +++ b/src/core/lib/iomgr/iomgr.h @@ -21,6 +21,7 @@ #include +#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/port.h" #include @@ -47,6 +48,12 @@ bool grpc_iomgr_run_in_background(); /** Returns true if the caller is a worker thread for any background poller. */ bool grpc_iomgr_is_any_background_poller_thread(); +/** Returns true if the closure is registered into the background poller. Note + * that the closure may or may not run yet when this function returns, and the + * closure should not be blocking or long-running. */ +bool grpc_iomgr_add_closure_to_background_poller(grpc_closure* closure, + grpc_error* error); + /* Exposed only for testing */ size_t grpc_iomgr_count_objects_for_testing(); diff --git a/src/core/lib/iomgr/iomgr_custom.cc b/src/core/lib/iomgr/iomgr_custom.cc index 3d07f1abe9a..381e00e07a7 100644 --- a/src/core/lib/iomgr/iomgr_custom.cc +++ b/src/core/lib/iomgr/iomgr_custom.cc @@ -44,11 +44,16 @@ static void iomgr_platform_shutdown_background_closure(void) {} static bool iomgr_platform_is_any_background_poller_thread(void) { return false; } +static bool iomgr_platform_add_closure_to_background_poller( + grpc_closure* closure, grpc_error* error) { + return false; +} static grpc_iomgr_platform_vtable vtable = { iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, iomgr_platform_shutdown_background_closure, - iomgr_platform_is_any_background_poller_thread}; + iomgr_platform_is_any_background_poller_thread, + iomgr_platform_add_closure_to_background_poller}; void grpc_custom_iomgr_init(grpc_socket_vtable* socket, grpc_custom_resolver_vtable* resolver, diff --git a/src/core/lib/iomgr/iomgr_internal.cc b/src/core/lib/iomgr/iomgr_internal.cc index e68b1cf5812..896d9fce67c 100644 --- a/src/core/lib/iomgr/iomgr_internal.cc +++ b/src/core/lib/iomgr/iomgr_internal.cc @@ -49,3 +49,9 @@ void grpc_iomgr_platform_shutdown_background_closure() { bool grpc_iomgr_platform_is_any_background_poller_thread() { return iomgr_platform_vtable->is_any_background_poller_thread(); } + +bool grpc_iomgr_platform_add_closure_to_background_poller(grpc_closure* closure, + grpc_error* error) { + return iomgr_platform_vtable->add_closure_to_background_poller(closure, + error); +} diff --git a/src/core/lib/iomgr/iomgr_internal.h b/src/core/lib/iomgr/iomgr_internal.h index 2250ad9a18c..17607f98f11 100644 --- a/src/core/lib/iomgr/iomgr_internal.h +++ b/src/core/lib/iomgr/iomgr_internal.h @@ -37,6 +37,8 @@ typedef struct grpc_iomgr_platform_vtable { void (*shutdown)(void); void (*shutdown_background_closure)(void); bool (*is_any_background_poller_thread)(void); + bool (*add_closure_to_background_poller)(grpc_closure* closure, + grpc_error* error); } grpc_iomgr_platform_vtable; void grpc_iomgr_register_object(grpc_iomgr_object* obj, const char* name); @@ -57,9 +59,15 @@ void grpc_iomgr_platform_shutdown(void); /** shut down all the closures registered in the background poller */ void grpc_iomgr_platform_shutdown_background_closure(void); -/** return true is the caller is a worker thread for any background poller */ +/** return true if the caller is a worker thread for any background poller */ bool grpc_iomgr_platform_is_any_background_poller_thread(void); +/** Return true if the closure is registered into the background poller. Note + * that the closure may or may not run yet when this function returns, and the + * closure should not be blocking or long-running. */ +bool grpc_iomgr_platform_add_closure_to_background_poller(grpc_closure* closure, + grpc_error* error); + bool grpc_iomgr_abort_on_leaks(void); #endif /* GRPC_CORE_LIB_IOMGR_IOMGR_INTERNAL_H */ diff --git a/src/core/lib/iomgr/iomgr_posix.cc b/src/core/lib/iomgr/iomgr_posix.cc index 690e81f3b1d..a4010b8cf9d 100644 --- a/src/core/lib/iomgr/iomgr_posix.cc +++ b/src/core/lib/iomgr/iomgr_posix.cc @@ -59,10 +59,16 @@ static bool iomgr_platform_is_any_background_poller_thread(void) { return grpc_is_any_background_poller_thread(); } +static bool iomgr_platform_add_closure_to_background_poller( + grpc_closure* closure, grpc_error* error) { + return grpc_add_closure_to_background_poller(closure, error); +} + static grpc_iomgr_platform_vtable vtable = { iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, iomgr_platform_shutdown_background_closure, - iomgr_platform_is_any_background_poller_thread}; + iomgr_platform_is_any_background_poller_thread, + iomgr_platform_add_closure_to_background_poller}; void grpc_set_default_iomgr_platform() { grpc_set_tcp_client_impl(&grpc_posix_tcp_client_vtable); diff --git a/src/core/lib/iomgr/iomgr_posix_cfstream.cc b/src/core/lib/iomgr/iomgr_posix_cfstream.cc index 462ac41fcde..61b8bd100eb 100644 --- a/src/core/lib/iomgr/iomgr_posix_cfstream.cc +++ b/src/core/lib/iomgr/iomgr_posix_cfstream.cc @@ -62,10 +62,16 @@ static bool iomgr_platform_is_any_background_poller_thread(void) { return grpc_is_any_background_poller_thread(); } +static bool iomgr_platform_add_closure_to_background_poller( + grpc_closure* closure, grpc_error* error) { + return grpc_add_closure_to_background_poller(closure, error); +} + static grpc_iomgr_platform_vtable vtable = { iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, iomgr_platform_shutdown_background_closure, - iomgr_platform_is_any_background_poller_thread}; + iomgr_platform_is_any_background_poller_thread, + iomgr_platform_add_closure_to_background_poller}; void grpc_set_default_iomgr_platform() { char* enable_cfstream = getenv(grpc_cfstream_env_var); diff --git a/src/core/lib/iomgr/iomgr_windows.cc b/src/core/lib/iomgr/iomgr_windows.cc index e517a6caee4..0e1a9ba5b7a 100644 --- a/src/core/lib/iomgr/iomgr_windows.cc +++ b/src/core/lib/iomgr/iomgr_windows.cc @@ -77,10 +77,16 @@ static bool iomgr_platform_is_any_background_poller_thread(void) { return false; } +static bool iomgr_platform_add_closure_to_background_poller( + grpc_closure* closure, grpc_error* error) { + return false; +} + static grpc_iomgr_platform_vtable vtable = { iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, iomgr_platform_shutdown_background_closure, - iomgr_platform_is_any_background_poller_thread}; + iomgr_platform_is_any_background_poller_thread, + iomgr_platform_add_closure_to_background_poller}; void grpc_set_default_iomgr_platform() { grpc_set_tcp_client_impl(&grpc_windows_tcp_client_vtable); diff --git a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc index 7aa197b5979..54455350c24 100644 --- a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc +++ b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc @@ -95,6 +95,8 @@ static const grpc_event_engine_vtable* init_engine_vtable(bool) { g_vtable.pollset_work = pollset_work; g_vtable.pollset_kick = pollset_kick; g_vtable.is_any_background_poller_thread = [] { return false; }; + g_vtable.add_closure_to_background_poller = + [](grpc_closure* closure, grpc_error* error) { return false; }; g_vtable.shutdown_background_closure = [] {}; g_vtable.shutdown_engine = [] {}; From 53065db36686887d6655cd4c83b5a88c3b226a48 Mon Sep 17 00:00:00 2001 From: Guantao Liu Date: Mon, 25 Mar 2019 16:23:19 -0700 Subject: [PATCH 2/2] Clang formatting --- src/core/lib/iomgr/iomgr_custom.cc | 4 +++- src/core/lib/iomgr/iomgr_posix.cc | 4 +++- src/core/lib/iomgr/iomgr_posix_cfstream.cc | 4 +++- src/core/lib/iomgr/iomgr_windows.cc | 4 +++- 4 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/core/lib/iomgr/iomgr_custom.cc b/src/core/lib/iomgr/iomgr_custom.cc index 381e00e07a7..56363c35fd6 100644 --- a/src/core/lib/iomgr/iomgr_custom.cc +++ b/src/core/lib/iomgr/iomgr_custom.cc @@ -50,7 +50,9 @@ static bool iomgr_platform_add_closure_to_background_poller( } static grpc_iomgr_platform_vtable vtable = { - iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, + iomgr_platform_init, + iomgr_platform_flush, + iomgr_platform_shutdown, iomgr_platform_shutdown_background_closure, iomgr_platform_is_any_background_poller_thread, iomgr_platform_add_closure_to_background_poller}; diff --git a/src/core/lib/iomgr/iomgr_posix.cc b/src/core/lib/iomgr/iomgr_posix.cc index a4010b8cf9d..de22d20a639 100644 --- a/src/core/lib/iomgr/iomgr_posix.cc +++ b/src/core/lib/iomgr/iomgr_posix.cc @@ -65,7 +65,9 @@ static bool iomgr_platform_add_closure_to_background_poller( } static grpc_iomgr_platform_vtable vtable = { - iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, + iomgr_platform_init, + iomgr_platform_flush, + iomgr_platform_shutdown, iomgr_platform_shutdown_background_closure, iomgr_platform_is_any_background_poller_thread, iomgr_platform_add_closure_to_background_poller}; diff --git a/src/core/lib/iomgr/iomgr_posix_cfstream.cc b/src/core/lib/iomgr/iomgr_posix_cfstream.cc index 61b8bd100eb..cf4d05318ea 100644 --- a/src/core/lib/iomgr/iomgr_posix_cfstream.cc +++ b/src/core/lib/iomgr/iomgr_posix_cfstream.cc @@ -68,7 +68,9 @@ static bool iomgr_platform_add_closure_to_background_poller( } static grpc_iomgr_platform_vtable vtable = { - iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, + iomgr_platform_init, + iomgr_platform_flush, + iomgr_platform_shutdown, iomgr_platform_shutdown_background_closure, iomgr_platform_is_any_background_poller_thread, iomgr_platform_add_closure_to_background_poller}; diff --git a/src/core/lib/iomgr/iomgr_windows.cc b/src/core/lib/iomgr/iomgr_windows.cc index 0e1a9ba5b7a..13b5f87bd18 100644 --- a/src/core/lib/iomgr/iomgr_windows.cc +++ b/src/core/lib/iomgr/iomgr_windows.cc @@ -83,7 +83,9 @@ static bool iomgr_platform_add_closure_to_background_poller( } static grpc_iomgr_platform_vtable vtable = { - iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, + iomgr_platform_init, + iomgr_platform_flush, + iomgr_platform_shutdown, iomgr_platform_shutdown_background_closure, iomgr_platform_is_any_background_poller_thread, iomgr_platform_add_closure_to_background_poller};