mirror of https://github.com/grpc/grpc.git
EventEngine::Run microbenchmarks (#30769)
* [WIP] EventEngine::Run microbenchmarks * Add fanout impl and fix tracking of time spent doing work in threads * tune down benchmarks; fix fanout counting logic. * tune down closure fanout tests * format * odr * reviewer feedback * unify some fanout logic; add a large-AnyInvocable test lambdas that take an allocation are about 10x slower * reviewer feedback * fix invalid vector access * rm DNS * format * copy params for each lambda callback This fixes segfaults when we cannot ensure all callbacks are complete before exiting the test. * s/promise/Notification/g bm_exec_ctx * ODR and leak * fix division by zero * fixpull/31143/head
parent
96d27634a4
commit
a874b8f6ca
3 changed files with 403 additions and 0 deletions
@ -0,0 +1,272 @@ |
||||
// Copyright 2022 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <atomic> |
||||
#include <cmath> |
||||
#include <memory> |
||||
|
||||
#include <benchmark/benchmark.h> |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
#include <grpcpp/impl/grpc_library.h> |
||||
|
||||
#include "src/core/lib/event_engine/common_closures.h" |
||||
#include "src/core/lib/event_engine/default_event_engine.h" |
||||
#include "src/core/lib/gprpp/notification.h" |
||||
#include "test/core/util/test_config.h" |
||||
#include "test/cpp/microbenchmarks/helpers.h" |
||||
#include "test/cpp/util/test_config.h" |
||||
|
||||
namespace { |
||||
|
||||
using ::grpc_event_engine::experimental::AnyInvocableClosure; |
||||
using ::grpc_event_engine::experimental::EventEngine; |
||||
using ::grpc_event_engine::experimental::GetDefaultEventEngine; |
||||
using ::grpc_event_engine::experimental::ResetDefaultEventEngine; |
||||
|
||||
struct FanoutParameters { |
||||
int depth; |
||||
int fanout; |
||||
int limit; |
||||
}; |
||||
|
||||
void BM_EventEngine_RunSmallLambda(benchmark::State& state) { |
||||
auto engine = GetDefaultEventEngine(); |
||||
const int cb_count = state.range(0); |
||||
std::atomic_int count{0}; |
||||
for (auto _ : state) { |
||||
state.PauseTiming(); |
||||
grpc_core::Notification signal; |
||||
auto cb = [&signal, &count, cb_count]() { |
||||
if (++count == cb_count) signal.Notify(); |
||||
}; |
||||
state.ResumeTiming(); |
||||
for (int i = 0; i < cb_count; i++) { |
||||
engine->Run(cb); |
||||
} |
||||
signal.WaitForNotification(); |
||||
count.store(0); |
||||
} |
||||
ResetDefaultEventEngine(); |
||||
state.SetItemsProcessed(cb_count * state.iterations()); |
||||
} |
||||
BENCHMARK(BM_EventEngine_RunSmallLambda) |
||||
->Range(100, 4096) |
||||
->MeasureProcessCPUTime() |
||||
->UseRealTime(); |
||||
|
||||
void BM_EventEngine_RunLargeLambda(benchmark::State& state) { |
||||
int cb_count = state.range(0); |
||||
// large lambdas require an extra allocation
|
||||
std::string extra = "12345678"; |
||||
auto engine = GetDefaultEventEngine(); |
||||
std::atomic_int count{0}; |
||||
for (auto _ : state) { |
||||
state.PauseTiming(); |
||||
grpc_core::Notification signal; |
||||
auto cb = [&signal, &count, cb_count, extra]() { |
||||
(void)extra; |
||||
if (++count == cb_count) signal.Notify(); |
||||
}; |
||||
state.ResumeTiming(); |
||||
for (int i = 0; i < cb_count; i++) { |
||||
engine->Run(cb); |
||||
} |
||||
signal.WaitForNotification(); |
||||
count.store(0); |
||||
} |
||||
ResetDefaultEventEngine(); |
||||
state.SetItemsProcessed(cb_count * state.iterations()); |
||||
} |
||||
BENCHMARK(BM_EventEngine_RunLargeLambda) |
||||
->Range(100, 4096) |
||||
->MeasureProcessCPUTime() |
||||
->UseRealTime(); |
||||
|
||||
void BM_EventEngine_RunClosure(benchmark::State& state) { |
||||
int cb_count = state.range(0); |
||||
grpc_core::Notification* signal = new grpc_core::Notification(); |
||||
std::atomic_int count{0}; |
||||
AnyInvocableClosure closure([signal_holder = &signal, cb_count, &count]() { |
||||
if (++count == cb_count) { |
||||
(*signal_holder)->Notify(); |
||||
} |
||||
}); |
||||
auto engine = GetDefaultEventEngine(); |
||||
for (auto _ : state) { |
||||
for (int i = 0; i < cb_count; i++) { |
||||
engine->Run(&closure); |
||||
} |
||||
signal->WaitForNotification(); |
||||
state.PauseTiming(); |
||||
delete signal; |
||||
signal = new grpc_core::Notification(); |
||||
count.store(0); |
||||
state.ResumeTiming(); |
||||
} |
||||
delete signal; |
||||
ResetDefaultEventEngine(); |
||||
state.SetItemsProcessed(cb_count * state.iterations()); |
||||
} |
||||
BENCHMARK(BM_EventEngine_RunClosure) |
||||
->Range(100, 4096) |
||||
->MeasureProcessCPUTime() |
||||
->UseRealTime(); |
||||
|
||||
void FanoutTestArguments(benchmark::internal::Benchmark* b) { |
||||
// TODO(hork): enable when the engines are fast enough to run these:
|
||||
// ->Args({10000, 1}) // chain of callbacks scheduling callbacks
|
||||
// ->Args({1, 10000}) // flat scheduling of callbacks
|
||||
// ->Args({5, 6}) // depth 5, fans out to 9,330 callbacks
|
||||
// ->Args({2, 100}) // depth 2, fans out 10,101 callbacks
|
||||
// ->Args({4, 10}) // depth 4, fans out to 11,110 callbacks
|
||||
b->Args({1000, 1}) // chain of callbacks scheduling callbacks
|
||||
->Args({100, 1}) // chain of callbacks scheduling callbacks
|
||||
->Args({1, 1000}) // flat scheduling of callbacks
|
||||
->Args({1, 100}) // flat scheduling of callbacks
|
||||
->Args({2, 70}) // depth 2, fans out 4971
|
||||
->Args({4, 8}) // depth 4, fans out 4681
|
||||
->UseRealTime() |
||||
->MeasureProcessCPUTime(); |
||||
} |
||||
|
||||
FanoutParameters GetFanoutParameters(benchmark::State& state) { |
||||
FanoutParameters params; |
||||
params.depth = state.range(0); |
||||
params.fanout = state.range(1); |
||||
if (params.depth == 1 || params.fanout == 1) { |
||||
params.limit = std::max(params.depth, params.fanout) + 1; |
||||
} else { |
||||
// sum of geometric series
|
||||
params.limit = |
||||
(1 - std::pow(params.fanout, params.depth + 1)) / (1 - params.fanout); |
||||
} |
||||
// sanity checking
|
||||
GPR_ASSERT(params.limit >= params.fanout * params.depth); |
||||
return params; |
||||
} |
||||
|
||||
// EventEngine callback for Lambda FanOut tests
|
||||
//
|
||||
// Note that params are copied each time for 2 reasons: 1) callbacks will
|
||||
// inevitably continue to shut down after the end of the test, so a reference
|
||||
// parameter will become invalid and crash some callbacks, and 2) in my RBE
|
||||
// tests, copies are slightly faster than a shared_ptr<FanoutParams>
|
||||
// alternative.
|
||||
void FanOutCallback(EventEngine* engine, const FanoutParameters params, |
||||
grpc_core::Notification& signal, std::atomic_int& count, |
||||
int processing_layer) { |
||||
int local_cnt = count.fetch_add(1, std::memory_order_acq_rel) + 1; |
||||
if (local_cnt == params.limit) { |
||||
signal.Notify(); |
||||
return; |
||||
} |
||||
GPR_DEBUG_ASSERT(local_cnt < params.limit); |
||||
if (params.depth == processing_layer) return; |
||||
for (int i = 0; i < params.fanout; i++) { |
||||
engine->Run([engine, params, processing_layer, &count, &signal]() { |
||||
FanOutCallback(engine, params, signal, count, processing_layer + 1); |
||||
}); |
||||
} |
||||
} |
||||
|
||||
void BM_EventEngine_Lambda_FanOut(benchmark::State& state) { |
||||
auto params = GetFanoutParameters(state); |
||||
auto engine = GetDefaultEventEngine(); |
||||
for (auto _ : state) { |
||||
std::atomic_int count{0}; |
||||
grpc_core::Notification signal; |
||||
FanOutCallback(engine, params, signal, count, /*processing_layer=*/0); |
||||
do { |
||||
signal.WaitForNotification(); |
||||
} while (count.load() != params.limit); |
||||
} |
||||
state.SetItemsProcessed(params.limit * state.iterations()); |
||||
} |
||||
BENCHMARK(BM_EventEngine_Lambda_FanOut)->Apply(FanoutTestArguments); |
||||
|
||||
void ClosureFanOutCallback(EventEngine::Closure* child_closure, |
||||
EventEngine* engine, |
||||
grpc_core::Notification** signal_holder, |
||||
std::atomic_int& count, |
||||
const FanoutParameters params) { |
||||
int local_cnt = count.fetch_add(1, std::memory_order_acq_rel) + 1; |
||||
if (local_cnt == params.limit) { |
||||
(*signal_holder)->Notify(); |
||||
return; |
||||
} |
||||
if (local_cnt > params.limit) { |
||||
gpr_log(GPR_ERROR, "Ran too many closures: %d/%d", local_cnt, params.limit); |
||||
abort(); |
||||
} |
||||
if (child_closure == nullptr) return; |
||||
for (int i = 0; i < params.fanout; i++) { |
||||
engine->Run(child_closure); |
||||
} |
||||
} |
||||
|
||||
void BM_EventEngine_Closure_FanOut(benchmark::State& state) { |
||||
auto params = GetFanoutParameters(state); |
||||
auto engine = GetDefaultEventEngine(); |
||||
std::vector<EventEngine::Closure*> closures; |
||||
closures.reserve(params.depth + 2); |
||||
closures.push_back(nullptr); |
||||
grpc_core::Notification* signal = new grpc_core::Notification(); |
||||
std::atomic_int count{0}; |
||||
// prepare a unique closure for each depth
|
||||
for (int i = 0; i <= params.depth; i++) { |
||||
// call the previous closure (e.g., closures[2] calls closures[1] during
|
||||
// fanout)
|
||||
closures.push_back(new AnyInvocableClosure( |
||||
[i, engine, &closures, params, signal_holder = &signal, &count]() { |
||||
ClosureFanOutCallback(closures[i], engine, signal_holder, count, |
||||
params); |
||||
})); |
||||
} |
||||
for (auto _ : state) { |
||||
GPR_DEBUG_ASSERT(count.load(std::memory_order_relaxed) == 0); |
||||
engine->Run(closures[params.depth + 1]); |
||||
do { |
||||
signal->WaitForNotification(); |
||||
} while (count.load() != params.limit); |
||||
// cleanup
|
||||
state.PauseTiming(); |
||||
delete signal; |
||||
signal = new grpc_core::Notification(); |
||||
count.store(0); |
||||
state.ResumeTiming(); |
||||
} |
||||
delete signal; |
||||
state.SetItemsProcessed(params.limit * state.iterations()); |
||||
for (auto i : closures) delete i; |
||||
} |
||||
BENCHMARK(BM_EventEngine_Closure_FanOut)->Apply(FanoutTestArguments); |
||||
|
||||
} // namespace
|
||||
|
||||
// Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
|
||||
// and others do not. This allows us to support both modes.
|
||||
namespace benchmark { |
||||
void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); } |
||||
} // namespace benchmark
|
||||
|
||||
int main(int argc, char** argv) { |
||||
grpc::testing::TestEnvironment env(&argc, argv); |
||||
LibraryInitializer libInit; |
||||
benchmark::Initialize(&argc, argv); |
||||
grpc::testing::InitTest(&argc, &argv, false); |
||||
|
||||
benchmark::RunTheBenchmarksNamespaced(); |
||||
return 0; |
||||
} |
@ -0,0 +1,104 @@ |
||||
// Copyright 2022 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <atomic> |
||||
#include <memory> |
||||
|
||||
#include <benchmark/benchmark.h> |
||||
|
||||
#include <grpcpp/impl/grpc_library.h> |
||||
|
||||
#include "src/core/lib/gprpp/notification.h" |
||||
#include "test/core/util/test_config.h" |
||||
#include "test/cpp/microbenchmarks/helpers.h" |
||||
#include "test/cpp/util/test_config.h" |
||||
|
||||
namespace { |
||||
void NoOpCb(void* /* arg */, grpc_error_handle /* error */) {} |
||||
|
||||
void BM_ExecCtx_Run(benchmark::State& state) { |
||||
int cb_count = state.range(0); |
||||
grpc_closure cb; |
||||
GRPC_CLOSURE_INIT(&cb, NoOpCb, nullptr, nullptr); |
||||
grpc_core::ExecCtx exec_ctx; |
||||
for (auto _ : state) { |
||||
for (int i = 0; i < cb_count; i++) { |
||||
exec_ctx.Run(DEBUG_LOCATION, &cb, GRPC_ERROR_NONE); |
||||
exec_ctx.Flush(); |
||||
} |
||||
} |
||||
state.SetItemsProcessed(cb_count * state.iterations()); |
||||
} |
||||
BENCHMARK(BM_ExecCtx_Run) |
||||
->Range(100, 10000) |
||||
->MeasureProcessCPUTime() |
||||
->UseRealTime(); |
||||
|
||||
struct CountingCbData { |
||||
std::atomic_int cnt{0}; |
||||
grpc_core::Notification* signal; |
||||
int limit; |
||||
}; |
||||
|
||||
void CountingCb(void* arg, grpc_error_handle) { |
||||
auto* data = static_cast<CountingCbData*>(arg); |
||||
if (++(data->cnt) == data->limit) data->signal->Notify(); |
||||
} |
||||
|
||||
void BM_ExecCtx_RunCounted(benchmark::State& state) { |
||||
// A more fair comparison with EventEngine::Run, which must wait for all
|
||||
// executions to finish
|
||||
int cb_count = state.range(0); |
||||
CountingCbData data; |
||||
data.limit = cb_count; |
||||
data.signal = new grpc_core::Notification(); |
||||
grpc_closure cb; |
||||
GRPC_CLOSURE_INIT(&cb, CountingCb, &data, nullptr); |
||||
grpc_core::ExecCtx exec_ctx; |
||||
for (auto _ : state) { |
||||
for (int i = 0; i < cb_count; i++) { |
||||
exec_ctx.Run(DEBUG_LOCATION, &cb, GRPC_ERROR_NONE); |
||||
exec_ctx.Flush(); |
||||
} |
||||
data.signal->WaitForNotification(); |
||||
state.PauseTiming(); |
||||
delete data.signal; |
||||
data.signal = new grpc_core::Notification(); |
||||
data.cnt = 0; |
||||
state.ResumeTiming(); |
||||
} |
||||
delete data.signal; |
||||
state.SetItemsProcessed(cb_count * state.iterations()); |
||||
} |
||||
BENCHMARK(BM_ExecCtx_RunCounted) |
||||
->Range(100, 10000) |
||||
->MeasureProcessCPUTime() |
||||
->UseRealTime(); |
||||
} // namespace
|
||||
|
||||
// Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
|
||||
// and others do not. This allows us to support both modes.
|
||||
namespace benchmark { |
||||
void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); } |
||||
} // namespace benchmark
|
||||
|
||||
int main(int argc, char** argv) { |
||||
grpc::testing::TestEnvironment env(&argc, argv); |
||||
LibraryInitializer libInit; |
||||
benchmark::Initialize(&argc, argv); |
||||
grpc::testing::InitTest(&argc, &argv, false); |
||||
|
||||
benchmark::RunTheBenchmarksNamespaced(); |
||||
return 0; |
||||
} |
Loading…
Reference in new issue