From a874b8f6cab0a5f77f8106c54865c9e036b3e72e Mon Sep 17 00:00:00 2001 From: AJ Heller Date: Tue, 27 Sep 2022 14:57:36 -0700 Subject: [PATCH] EventEngine::Run microbenchmarks (#30769) * [WIP] EventEngine::Run microbenchmarks * Add fanout impl and fix tracking of time spent doing work in threads * tune down benchmarks; fix fanout counting logic. * tune down closure fanout tests * format * odr * reviewer feedback * unify some fanout logic; add a large-AnyInvocable test lambdas that take an allocation are about 10x slower * reviewer feedback * fix invalid vector access * rm DNS * format * copy params for each lambda callback This fixes segfaults when we cannot ensure all callbacks are complete before exiting the test. * s/promise/Notification/g bm_exec_ctx * ODR and leak * fix division by zero * fix --- test/cpp/microbenchmarks/BUILD | 27 ++ .../microbenchmarks/bm_event_engine_run.cc | 272 ++++++++++++++++++ test/cpp/microbenchmarks/bm_exec_ctx.cc | 104 +++++++ 3 files changed, 403 insertions(+) create mode 100644 test/cpp/microbenchmarks/bm_event_engine_run.cc create mode 100644 test/cpp/microbenchmarks/bm_exec_ctx.cc diff --git a/test/cpp/microbenchmarks/BUILD b/test/cpp/microbenchmarks/BUILD index 13e42712a88..3c7d4244a9b 100644 --- a/test/cpp/microbenchmarks/BUILD +++ b/test/cpp/microbenchmarks/BUILD @@ -42,6 +42,33 @@ grpc_cc_test( ], ) +grpc_cc_test( + name = "bm_exec_ctx", + srcs = ["bm_exec_ctx.cc"], + args = ["--benchmark_min_time=0.3"], + external_deps = [ + "benchmark", + ], + uses_event_engine = False, + uses_polling = False, + deps = [":helpers"], +) + +grpc_cc_test( + name = "bm_event_engine_run", + size = "small", + srcs = ["bm_event_engine_run.cc"], + args = ["--benchmark_min_time=0.3"], + external_deps = [ + "benchmark", + ], + uses_polling = False, + deps = [ + ":helpers", + "//:common_event_engine_closures", + ], +) + grpc_cc_library( name = "helpers", testonly = 1, diff --git a/test/cpp/microbenchmarks/bm_event_engine_run.cc b/test/cpp/microbenchmarks/bm_event_engine_run.cc new file mode 100644 index 00000000000..93bb4689718 --- /dev/null +++ b/test/cpp/microbenchmarks/bm_event_engine_run.cc @@ -0,0 +1,272 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include + +#include +#include + +#include "src/core/lib/event_engine/common_closures.h" +#include "src/core/lib/event_engine/default_event_engine.h" +#include "src/core/lib/gprpp/notification.h" +#include "test/core/util/test_config.h" +#include "test/cpp/microbenchmarks/helpers.h" +#include "test/cpp/util/test_config.h" + +namespace { + +using ::grpc_event_engine::experimental::AnyInvocableClosure; +using ::grpc_event_engine::experimental::EventEngine; +using ::grpc_event_engine::experimental::GetDefaultEventEngine; +using ::grpc_event_engine::experimental::ResetDefaultEventEngine; + +struct FanoutParameters { + int depth; + int fanout; + int limit; +}; + +void BM_EventEngine_RunSmallLambda(benchmark::State& state) { + auto engine = GetDefaultEventEngine(); + const int cb_count = state.range(0); + std::atomic_int count{0}; + for (auto _ : state) { + state.PauseTiming(); + grpc_core::Notification signal; + auto cb = [&signal, &count, cb_count]() { + if (++count == cb_count) signal.Notify(); + }; + state.ResumeTiming(); + for (int i = 0; i < cb_count; i++) { + engine->Run(cb); + } + signal.WaitForNotification(); + count.store(0); + } + ResetDefaultEventEngine(); + state.SetItemsProcessed(cb_count * state.iterations()); +} +BENCHMARK(BM_EventEngine_RunSmallLambda) + ->Range(100, 4096) + ->MeasureProcessCPUTime() + ->UseRealTime(); + +void BM_EventEngine_RunLargeLambda(benchmark::State& state) { + int cb_count = state.range(0); + // large lambdas require an extra allocation + std::string extra = "12345678"; + auto engine = GetDefaultEventEngine(); + std::atomic_int count{0}; + for (auto _ : state) { + state.PauseTiming(); + grpc_core::Notification signal; + auto cb = [&signal, &count, cb_count, extra]() { + (void)extra; + if (++count == cb_count) signal.Notify(); + }; + state.ResumeTiming(); + for (int i = 0; i < cb_count; i++) { + engine->Run(cb); + } + signal.WaitForNotification(); + count.store(0); + } + ResetDefaultEventEngine(); + state.SetItemsProcessed(cb_count * state.iterations()); +} +BENCHMARK(BM_EventEngine_RunLargeLambda) + ->Range(100, 4096) + ->MeasureProcessCPUTime() + ->UseRealTime(); + +void BM_EventEngine_RunClosure(benchmark::State& state) { + int cb_count = state.range(0); + grpc_core::Notification* signal = new grpc_core::Notification(); + std::atomic_int count{0}; + AnyInvocableClosure closure([signal_holder = &signal, cb_count, &count]() { + if (++count == cb_count) { + (*signal_holder)->Notify(); + } + }); + auto engine = GetDefaultEventEngine(); + for (auto _ : state) { + for (int i = 0; i < cb_count; i++) { + engine->Run(&closure); + } + signal->WaitForNotification(); + state.PauseTiming(); + delete signal; + signal = new grpc_core::Notification(); + count.store(0); + state.ResumeTiming(); + } + delete signal; + ResetDefaultEventEngine(); + state.SetItemsProcessed(cb_count * state.iterations()); +} +BENCHMARK(BM_EventEngine_RunClosure) + ->Range(100, 4096) + ->MeasureProcessCPUTime() + ->UseRealTime(); + +void FanoutTestArguments(benchmark::internal::Benchmark* b) { + // TODO(hork): enable when the engines are fast enough to run these: + // ->Args({10000, 1}) // chain of callbacks scheduling callbacks + // ->Args({1, 10000}) // flat scheduling of callbacks + // ->Args({5, 6}) // depth 5, fans out to 9,330 callbacks + // ->Args({2, 100}) // depth 2, fans out 10,101 callbacks + // ->Args({4, 10}) // depth 4, fans out to 11,110 callbacks + b->Args({1000, 1}) // chain of callbacks scheduling callbacks + ->Args({100, 1}) // chain of callbacks scheduling callbacks + ->Args({1, 1000}) // flat scheduling of callbacks + ->Args({1, 100}) // flat scheduling of callbacks + ->Args({2, 70}) // depth 2, fans out 4971 + ->Args({4, 8}) // depth 4, fans out 4681 + ->UseRealTime() + ->MeasureProcessCPUTime(); +} + +FanoutParameters GetFanoutParameters(benchmark::State& state) { + FanoutParameters params; + params.depth = state.range(0); + params.fanout = state.range(1); + if (params.depth == 1 || params.fanout == 1) { + params.limit = std::max(params.depth, params.fanout) + 1; + } else { + // sum of geometric series + params.limit = + (1 - std::pow(params.fanout, params.depth + 1)) / (1 - params.fanout); + } + // sanity checking + GPR_ASSERT(params.limit >= params.fanout * params.depth); + return params; +} + +// EventEngine callback for Lambda FanOut tests +// +// Note that params are copied each time for 2 reasons: 1) callbacks will +// inevitably continue to shut down after the end of the test, so a reference +// parameter will become invalid and crash some callbacks, and 2) in my RBE +// tests, copies are slightly faster than a shared_ptr +// alternative. +void FanOutCallback(EventEngine* engine, const FanoutParameters params, + grpc_core::Notification& signal, std::atomic_int& count, + int processing_layer) { + int local_cnt = count.fetch_add(1, std::memory_order_acq_rel) + 1; + if (local_cnt == params.limit) { + signal.Notify(); + return; + } + GPR_DEBUG_ASSERT(local_cnt < params.limit); + if (params.depth == processing_layer) return; + for (int i = 0; i < params.fanout; i++) { + engine->Run([engine, params, processing_layer, &count, &signal]() { + FanOutCallback(engine, params, signal, count, processing_layer + 1); + }); + } +} + +void BM_EventEngine_Lambda_FanOut(benchmark::State& state) { + auto params = GetFanoutParameters(state); + auto engine = GetDefaultEventEngine(); + for (auto _ : state) { + std::atomic_int count{0}; + grpc_core::Notification signal; + FanOutCallback(engine, params, signal, count, /*processing_layer=*/0); + do { + signal.WaitForNotification(); + } while (count.load() != params.limit); + } + state.SetItemsProcessed(params.limit * state.iterations()); +} +BENCHMARK(BM_EventEngine_Lambda_FanOut)->Apply(FanoutTestArguments); + +void ClosureFanOutCallback(EventEngine::Closure* child_closure, + EventEngine* engine, + grpc_core::Notification** signal_holder, + std::atomic_int& count, + const FanoutParameters params) { + int local_cnt = count.fetch_add(1, std::memory_order_acq_rel) + 1; + if (local_cnt == params.limit) { + (*signal_holder)->Notify(); + return; + } + if (local_cnt > params.limit) { + gpr_log(GPR_ERROR, "Ran too many closures: %d/%d", local_cnt, params.limit); + abort(); + } + if (child_closure == nullptr) return; + for (int i = 0; i < params.fanout; i++) { + engine->Run(child_closure); + } +} + +void BM_EventEngine_Closure_FanOut(benchmark::State& state) { + auto params = GetFanoutParameters(state); + auto engine = GetDefaultEventEngine(); + std::vector closures; + closures.reserve(params.depth + 2); + closures.push_back(nullptr); + grpc_core::Notification* signal = new grpc_core::Notification(); + std::atomic_int count{0}; + // prepare a unique closure for each depth + for (int i = 0; i <= params.depth; i++) { + // call the previous closure (e.g., closures[2] calls closures[1] during + // fanout) + closures.push_back(new AnyInvocableClosure( + [i, engine, &closures, params, signal_holder = &signal, &count]() { + ClosureFanOutCallback(closures[i], engine, signal_holder, count, + params); + })); + } + for (auto _ : state) { + GPR_DEBUG_ASSERT(count.load(std::memory_order_relaxed) == 0); + engine->Run(closures[params.depth + 1]); + do { + signal->WaitForNotification(); + } while (count.load() != params.limit); + // cleanup + state.PauseTiming(); + delete signal; + signal = new grpc_core::Notification(); + count.store(0); + state.ResumeTiming(); + } + delete signal; + state.SetItemsProcessed(params.limit * state.iterations()); + for (auto i : closures) delete i; +} +BENCHMARK(BM_EventEngine_Closure_FanOut)->Apply(FanoutTestArguments); + +} // namespace + +// Some distros have RunSpecifiedBenchmarks under the benchmark namespace, +// and others do not. This allows us to support both modes. +namespace benchmark { +void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); } +} // namespace benchmark + +int main(int argc, char** argv) { + grpc::testing::TestEnvironment env(&argc, argv); + LibraryInitializer libInit; + benchmark::Initialize(&argc, argv); + grpc::testing::InitTest(&argc, &argv, false); + + benchmark::RunTheBenchmarksNamespaced(); + return 0; +} diff --git a/test/cpp/microbenchmarks/bm_exec_ctx.cc b/test/cpp/microbenchmarks/bm_exec_ctx.cc new file mode 100644 index 00000000000..da707c0dabe --- /dev/null +++ b/test/cpp/microbenchmarks/bm_exec_ctx.cc @@ -0,0 +1,104 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include + +#include + +#include "src/core/lib/gprpp/notification.h" +#include "test/core/util/test_config.h" +#include "test/cpp/microbenchmarks/helpers.h" +#include "test/cpp/util/test_config.h" + +namespace { +void NoOpCb(void* /* arg */, grpc_error_handle /* error */) {} + +void BM_ExecCtx_Run(benchmark::State& state) { + int cb_count = state.range(0); + grpc_closure cb; + GRPC_CLOSURE_INIT(&cb, NoOpCb, nullptr, nullptr); + grpc_core::ExecCtx exec_ctx; + for (auto _ : state) { + for (int i = 0; i < cb_count; i++) { + exec_ctx.Run(DEBUG_LOCATION, &cb, GRPC_ERROR_NONE); + exec_ctx.Flush(); + } + } + state.SetItemsProcessed(cb_count * state.iterations()); +} +BENCHMARK(BM_ExecCtx_Run) + ->Range(100, 10000) + ->MeasureProcessCPUTime() + ->UseRealTime(); + +struct CountingCbData { + std::atomic_int cnt{0}; + grpc_core::Notification* signal; + int limit; +}; + +void CountingCb(void* arg, grpc_error_handle) { + auto* data = static_cast(arg); + if (++(data->cnt) == data->limit) data->signal->Notify(); +} + +void BM_ExecCtx_RunCounted(benchmark::State& state) { + // A more fair comparison with EventEngine::Run, which must wait for all + // executions to finish + int cb_count = state.range(0); + CountingCbData data; + data.limit = cb_count; + data.signal = new grpc_core::Notification(); + grpc_closure cb; + GRPC_CLOSURE_INIT(&cb, CountingCb, &data, nullptr); + grpc_core::ExecCtx exec_ctx; + for (auto _ : state) { + for (int i = 0; i < cb_count; i++) { + exec_ctx.Run(DEBUG_LOCATION, &cb, GRPC_ERROR_NONE); + exec_ctx.Flush(); + } + data.signal->WaitForNotification(); + state.PauseTiming(); + delete data.signal; + data.signal = new grpc_core::Notification(); + data.cnt = 0; + state.ResumeTiming(); + } + delete data.signal; + state.SetItemsProcessed(cb_count * state.iterations()); +} +BENCHMARK(BM_ExecCtx_RunCounted) + ->Range(100, 10000) + ->MeasureProcessCPUTime() + ->UseRealTime(); +} // namespace + +// Some distros have RunSpecifiedBenchmarks under the benchmark namespace, +// and others do not. This allows us to support both modes. +namespace benchmark { +void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); } +} // namespace benchmark + +int main(int argc, char** argv) { + grpc::testing::TestEnvironment env(&argc, argv); + LibraryInitializer libInit; + benchmark::Initialize(&argc, argv); + grpc::testing::InitTest(&argc, &argv, false); + + benchmark::RunTheBenchmarksNamespaced(); + return 0; +}