From 17e3611c0d5c55da1c7ae69f956e6024502edc25 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 9 Aug 2018 09:18:17 -0700 Subject: [PATCH 1/5] Infrastructure for adding custom polling engines --- build.yaml | 1 + src/core/lib/iomgr/ev_posix.cc | 41 +++++++++++++++---- src/core/lib/iomgr/ev_posix.h | 10 ++--- .../microbenchmarks/bm_cq_multiple_threads.cc | 28 ++++++------- tools/run_tests/generated/tests.json | 2 +- 5 files changed, 53 insertions(+), 29 deletions(-) diff --git a/build.yaml b/build.yaml index 70af96046cb..53c93c2b161 100644 --- a/build.yaml +++ b/build.yaml @@ -4106,6 +4106,7 @@ targets: - mac - linux - posix + uses_polling: false - name: bm_error build: test language: c++ diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index 0e45fc42cad..c30614e7e53 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -101,10 +101,15 @@ const grpc_event_engine_vtable* init_non_polling(bool explicit_request) { } } // namespace -static const event_engine_factory g_factories[] = { +#define ENGINE_HEAD_CUSTOM "head_custom" +#define ENGINE_TAIL_CUSTOM "tail_custom" + +static event_engine_factory g_factories[] = { + {ENGINE_HEAD_CUSTOM, nullptr}, {ENGINE_HEAD_CUSTOM, nullptr}, {"epollex", grpc_init_epollex_linux}, {"epoll1", grpc_init_epoll1_linux}, {"epollsig", grpc_init_epollsig_linux}, {"poll", grpc_init_poll_posix}, {"poll-cv", grpc_init_poll_cv_posix}, {"none", init_non_polling}, + {ENGINE_TAIL_CUSTOM, nullptr}, {ENGINE_TAIL_CUSTOM, nullptr}, }; static void add(const char* beg, const char* end, char*** ss, size_t* ns) { @@ -138,7 +143,7 @@ static bool is(const char* want, const char* have) { static void try_engine(const char* engine) { for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) { - if (is(engine, g_factories[i].name)) { + if (g_factories[i].factory != nullptr && is(engine, g_factories[i].name)) { if ((g_event_engine = g_factories[i].factory( 0 == strcmp(engine, g_factories[i].name)))) { g_poll_strategy_name = g_factories[i].name; @@ -149,14 +154,32 @@ static void try_engine(const char* engine) { } } -/* This should be used for testing purposes ONLY */ -void grpc_set_event_engine_test_only( - const grpc_event_engine_vtable* ev_engine) { - g_event_engine = ev_engine; -} +/* Call this before calling grpc_event_engine_init() */ +void grpc_register_event_engine_factory(const char* name, + event_engine_factory_fn factory, + bool add_at_head) { + const char* custom_match = + add_at_head ? ENGINE_HEAD_CUSTOM : ENGINE_TAIL_CUSTOM; + + // Overwrite an existing registration if already registered + for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) { + if (0 == strcmp(name, g_factories[i].name)) { + g_factories[i].factory = factory; + return; + } + } + + // Otherwise fill in an available custom slot + for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) { + if (0 == strcmp(g_factories[i].name, custom_match)) { + g_factories[i].name = name; + g_factories[i].factory = factory; + return; + } + } -const grpc_event_engine_vtable* grpc_get_event_engine_test_only() { - return g_event_engine; + // Otherwise fail + GPR_ASSERT(false); } /* Call this only after calling grpc_event_engine_init() */ diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 8d0bcc07101..b8fb8f534b4 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -82,6 +82,11 @@ typedef struct grpc_event_engine_vtable { void (*shutdown_engine)(void); } grpc_event_engine_vtable; +/* register a new event engine factory */ +void grpc_register_event_engine_factory( + const char* name, const grpc_event_engine_vtable* (*factory)(bool), + bool add_at_head); + void grpc_event_engine_init(void); void grpc_event_engine_shutdown(void); @@ -173,9 +178,4 @@ void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd); typedef int (*grpc_poll_function_type)(struct pollfd*, nfds_t, int); extern grpc_poll_function_type grpc_poll_function; -/* WARNING: The following two functions should be used for testing purposes - * ONLY */ -void grpc_set_event_engine_test_only(const grpc_event_engine_vtable*); -const grpc_event_engine_vtable* grpc_get_event_engine_test_only(); - #endif /* GRPC_CORE_LIB_IOMGR_EV_POSIX_H */ diff --git a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc index da095c3e68b..2f66a6d53ec 100644 --- a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc +++ b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc @@ -37,12 +37,9 @@ struct grpc_pollset { namespace grpc { namespace testing { -auto& force_library_initialization = Library::get(); - static void* g_tag = (void*)static_cast(10); // Some random number static grpc_completion_queue* g_cq; static grpc_event_engine_vtable g_vtable; -static const grpc_event_engine_vtable* g_old_vtable; static void pollset_shutdown(grpc_pollset* ps, grpc_closure* closure) { GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE); @@ -83,7 +80,7 @@ static grpc_error* pollset_work(grpc_pollset* ps, grpc_pollset_worker** worker, return GRPC_ERROR_NONE; } -static void init_engine_vtable() { +static const grpc_event_engine_vtable* init_engine_vtable(bool) { memset(&g_vtable, 0, sizeof(g_vtable)); g_vtable.pollset_size = sizeof(grpc_pollset); @@ -92,17 +89,23 @@ static void init_engine_vtable() { g_vtable.pollset_destroy = pollset_destroy; g_vtable.pollset_work = pollset_work; g_vtable.pollset_kick = pollset_kick; + g_vtable.shutdown_engine = [] {}; + + return &g_vtable; } static void setup() { - grpc_init(); + // This test should only ever be run with a non or any polling engine + // Override the polling engine for the non-polling engine + // and add a custom polling engine + grpc_register_event_engine_factory("none", init_engine_vtable, false); + grpc_register_event_engine_factory("bm_cq_multiple_threads", + init_engine_vtable, true); - /* Override the event engine with our test event engine (g_vtable); but before - * that, save the current event engine in g_old_vtable. We will have to set - * g_old_vtable back before calling grpc_shutdown() */ - init_engine_vtable(); - g_old_vtable = grpc_get_event_engine_test_only(); - grpc_set_event_engine_test_only(&g_vtable); + grpc_init(); + GPR_ASSERT(strcmp(grpc_get_poll_strategy_name(), "none") == 0 || + strcmp(grpc_get_poll_strategy_name(), "bm_cq_multiple_threads") == + 0); g_cq = grpc_completion_queue_create_for_next(nullptr); } @@ -118,9 +121,6 @@ static void teardown() { } grpc_completion_queue_destroy(g_cq); - - /* Restore the old event engine before calling grpc_shutdown */ - grpc_set_event_engine_test_only(g_old_vtable); grpc_shutdown(); } diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index cf3b54e0440..03e809c298a 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -3483,7 +3483,7 @@ "mac", "posix" ], - "uses_polling": true + "uses_polling": false }, { "args": [], From 9043a4f56d899a377d8763fa868b277d23e7b213 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Fri, 10 Aug 2018 06:11:40 +0000 Subject: [PATCH 2/5] Some cleanup --- test/cpp/microbenchmarks/bm_cq_multiple_threads.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc index 2f66a6d53ec..4a5487f1c43 100644 --- a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc +++ b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc @@ -141,8 +141,9 @@ static void teardown() { static void BM_Cq_Throughput(benchmark::State& state) { TrackCounters track_counters; gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + auto thd_idx = state.thread_index; - if (state.thread_index == 0) { + if (thd_idx == 0) { setup(); } @@ -152,12 +153,11 @@ static void BM_Cq_Throughput(benchmark::State& state) { } state.SetItemsProcessed(state.iterations()); + track_counters.Finish(state); - if (state.thread_index == 0) { + if (thd_idx == 0) { teardown(); } - - track_counters.Finish(state); } BENCHMARK(BM_Cq_Throughput)->ThreadRange(1, 16)->UseRealTime(); From a0e92e7727ded204e3ada8f5cfa455805098852f Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Fri, 10 Aug 2018 14:57:52 -0700 Subject: [PATCH 3/5] Add proper synchronization so that stats are setup and destroyed cleanly --- .../microbenchmarks/bm_cq_multiple_threads.cc | 45 ++++++++++++++++--- 1 file changed, 40 insertions(+), 5 deletions(-) diff --git a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc index 4a5487f1c43..06922afda3c 100644 --- a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc +++ b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc @@ -34,10 +34,13 @@ struct grpc_pollset { gpr_mu mu; }; +static gpr_mu g_mu; +static gpr_cv g_cv; +static int g_threads_active; +static bool g_active; + namespace grpc { namespace testing { - -static void* g_tag = (void*)static_cast(10); // Some random number static grpc_completion_queue* g_cq; static grpc_event_engine_vtable g_vtable; @@ -71,9 +74,11 @@ static grpc_error* pollset_work(grpc_pollset* ps, grpc_pollset_worker** worker, } gpr_mu_unlock(&ps->mu); - GPR_ASSERT(grpc_cq_begin_op(g_cq, g_tag)); + + void* tag = (void*)static_cast(10); // Some random number + GPR_ASSERT(grpc_cq_begin_op(g_cq, tag)); grpc_cq_end_op( - g_cq, g_tag, GRPC_ERROR_NONE, cq_done_cb, nullptr, + g_cq, tag, GRPC_ERROR_NONE, cq_done_cb, nullptr, static_cast(gpr_malloc(sizeof(grpc_cq_completion)))); grpc_core::ExecCtx::Get()->Flush(); gpr_mu_lock(&ps->mu); @@ -137,15 +142,31 @@ static void teardown() { code (i.e the code between two successive calls of state.KeepRunning()) if state.KeepRunning() returns false. So it is safe to do the teardown in one of the threads after state.keepRunning() returns false. + + However, our use requires synchronization because we do additional work at + each thread that requires specific ordering (TrackCounters must be constructed + after grpc_init because it needs the number of cores, initialized by grpc, + and its Finish call must take place before grpc_shutdown so that it can use + grpc_stats). */ static void BM_Cq_Throughput(benchmark::State& state) { - TrackCounters track_counters; gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); auto thd_idx = state.thread_index; + gpr_mu_lock(&g_mu); + g_threads_active++; if (thd_idx == 0) { setup(); + g_active = true; + gpr_cv_broadcast(&g_cv); + } else { + while (!g_active) { + gpr_cv_wait(&g_cv, &g_mu, deadline); + } } + gpr_mu_unlock(&g_mu); + + TrackCounters track_counters; while (state.KeepRunning()) { GPR_ASSERT(grpc_completion_queue_next(g_cq, deadline, nullptr).type == @@ -155,8 +176,20 @@ static void BM_Cq_Throughput(benchmark::State& state) { state.SetItemsProcessed(state.iterations()); track_counters.Finish(state); + gpr_mu_lock(&g_mu); + g_threads_active--; + if (g_threads_active == 0) { + gpr_cv_broadcast(&g_cv); + } else { + while (g_threads_active > 0) { + gpr_cv_wait(&g_cv, &g_mu, deadline); + } + } + gpr_mu_unlock(&g_mu); + if (thd_idx == 0) { teardown(); + g_active = false; } } @@ -172,6 +205,8 @@ void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); } } // namespace benchmark int main(int argc, char** argv) { + gpr_mu_init(&g_mu); + gpr_cv_init(&g_cv); ::benchmark::Initialize(&argc, argv); ::grpc::testing::InitTest(&argc, &argv, false); benchmark::RunTheBenchmarksNamespaced(); From a4326eb7b829cb479d14d1c4029c8522ab7f572b Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 13 Aug 2018 23:01:23 -0700 Subject: [PATCH 4/5] Add comment to address reviewer comment --- test/cpp/microbenchmarks/bm_cq_multiple_threads.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc index 06922afda3c..85767c8758f 100644 --- a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc +++ b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc @@ -166,6 +166,8 @@ static void BM_Cq_Throughput(benchmark::State& state) { } gpr_mu_unlock(&g_mu); + // Use a TrackCounters object to monitor the gRPC performance statistics + // (optionally including low-level counters) before and after the test TrackCounters track_counters; while (state.KeepRunning()) { From 4eff37345fbe03f850ec0ddb039e73035eaa931b Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 13 Aug 2018 23:13:41 -0700 Subject: [PATCH 5/5] Add detailed comment for g_factories --- src/core/lib/iomgr/ev_posix.cc | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index c30614e7e53..b8fe017ce7d 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -104,12 +104,25 @@ const grpc_event_engine_vtable* init_non_polling(bool explicit_request) { #define ENGINE_HEAD_CUSTOM "head_custom" #define ENGINE_TAIL_CUSTOM "tail_custom" +// The global array of event-engine factories. Each entry is a pair with a name +// and an event-engine generator function (nullptr if there is no generator +// registered for this name). The middle entries are the engines predefined by +// open-source gRPC. The head entries represent an opportunity for specific +// high-priority custom pollers to be added by the initializer plugins of +// custom-built gRPC libraries. The tail entries represent the same, but for +// low-priority custom pollers. The actual poller selected is either the first +// available one in the list if no specific poller is requested, or the first +// specific poller that is requested by name in the GRPC_POLL_STRATEGY +// environment variable if that variable is set (which should be a +// comma-separated list of one or more event engine names) static event_engine_factory g_factories[] = { + {ENGINE_HEAD_CUSTOM, nullptr}, {ENGINE_HEAD_CUSTOM, nullptr}, {ENGINE_HEAD_CUSTOM, nullptr}, {ENGINE_HEAD_CUSTOM, nullptr}, {"epollex", grpc_init_epollex_linux}, {"epoll1", grpc_init_epoll1_linux}, {"epollsig", grpc_init_epollsig_linux}, {"poll", grpc_init_poll_posix}, {"poll-cv", grpc_init_poll_cv_posix}, {"none", init_non_polling}, {ENGINE_TAIL_CUSTOM, nullptr}, {ENGINE_TAIL_CUSTOM, nullptr}, + {ENGINE_TAIL_CUSTOM, nullptr}, {ENGINE_TAIL_CUSTOM, nullptr}, }; static void add(const char* beg, const char* end, char*** ss, size_t* ns) {