From 427c8a89e97e2df0f014c8b97ef43151b82826e3 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 4 Mar 2024 14:04:20 -0800 Subject: [PATCH] [chaotic-good] Add a microbenchmark for ping pong round trips (#36050) also: - remove tail recursion from promise endpoint read completion (actually overflowed stack!) - remove retry filter from benchmark - we probably don't want this long term, but for now nobody else is using this benchmark and our use case doesn't use grpc retries so.... good enough Closes #36050 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36050 from ctiller:cgbm 65b1c267675035aad5f1721b57cbc4c65c0de6e1 PiperOrigin-RevId: 612577071 --- src/core/lib/transport/promise_endpoint.cc | 79 ++++++------ test/cpp/microbenchmarks/BUILD | 17 +++ ..._fullstack_unary_ping_pong_chaotic_good.cc | 122 ++++++++++++++++++ test/cpp/microbenchmarks/fullstack_fixtures.h | 2 + 4 files changed, 182 insertions(+), 38 deletions(-) create mode 100644 test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong_chaotic_good.cc diff --git a/src/core/lib/transport/promise_endpoint.cc b/src/core/lib/transport/promise_endpoint.cc index 1fd08b8917e..5159b2dc6d4 100644 --- a/src/core/lib/transport/promise_endpoint.cc +++ b/src/core/lib/transport/promise_endpoint.cc @@ -58,50 +58,53 @@ PromiseEndpoint::GetLocalAddress() const { } void PromiseEndpoint::ReadState::Complete(absl::Status status, - size_t num_bytes_requested) { - if (!status.ok()) { - // Invalidates all previous reads. - pending_buffer.Clear(); - buffer.Clear(); + const size_t num_bytes_requested) { + while (true) { + if (!status.ok()) { + // Invalidates all previous reads. + pending_buffer.Clear(); + buffer.Clear(); + result = status; + auto w = std::move(waker); + complete.store(true, std::memory_order_release); + w.Wakeup(); + return; + } + // Appends `pending_buffer` to `buffer`. + pending_buffer.MoveFirstNBytesIntoSliceBuffer(pending_buffer.Length(), + buffer); + GPR_DEBUG_ASSERT(pending_buffer.Count() == 0u); + if (buffer.Length() < num_bytes_requested) { + // A further read is needed. + // Set read args with number of bytes needed as hint. + grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs + read_args = { + static_cast(num_bytes_requested - buffer.Length())}; + // If `Read()` returns true immediately, the callback will not be + // called. We still need to call our callback to pick up the result and + // maybe do further reads. + auto ep = endpoint.lock(); + if (ep == nullptr) { + status = absl::UnavailableError("Endpoint closed during read."); + continue; + } + if (ep->Read( + [self = Ref(), num_bytes_requested](absl::Status status) { + ApplicationCallbackExecCtx callback_exec_ctx; + ExecCtx exec_ctx; + self->Complete(std::move(status), num_bytes_requested); + }, + &pending_buffer, &read_args)) { + continue; + } + return; + } result = status; auto w = std::move(waker); complete.store(true, std::memory_order_release); w.Wakeup(); return; } - // Appends `pending_buffer` to `buffer`. - pending_buffer.MoveFirstNBytesIntoSliceBuffer(pending_buffer.Length(), - buffer); - GPR_DEBUG_ASSERT(pending_buffer.Count() == 0u); - if (buffer.Length() < num_bytes_requested) { - // A further read is needed. - // Set read args with number of bytes needed as hint. - grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs read_args = - {static_cast(num_bytes_requested - buffer.Length())}; - // If `Read()` returns true immediately, the callback will not be - // called. We still need to call our callback to pick up the result and - // maybe do further reads. - auto ep = endpoint.lock(); - if (ep == nullptr) { - Complete(absl::UnavailableError("Endpoint closed during read."), - num_bytes_requested); - return; - } - if (ep->Read( - [self = Ref(), num_bytes_requested](absl::Status status) { - ApplicationCallbackExecCtx callback_exec_ctx; - ExecCtx exec_ctx; - self->Complete(std::move(status), num_bytes_requested); - }, - &pending_buffer, &read_args)) { - Complete(std::move(status), num_bytes_requested); - } - return; - } - result = status; - auto w = std::move(waker); - complete.store(true, std::memory_order_release); - w.Wakeup(); } } // namespace grpc_core diff --git a/test/cpp/microbenchmarks/BUILD b/test/cpp/microbenchmarks/BUILD index b54c5258847..b5def4b13f8 100644 --- a/test/cpp/microbenchmarks/BUILD +++ b/test/cpp/microbenchmarks/BUILD @@ -323,6 +323,23 @@ grpc_cc_test( deps = [":fullstack_unary_ping_pong_h"], ) +grpc_cc_test( + name = "bm_fullstack_unary_ping_pong_chaotic_good", + size = "large", + srcs = [ + "bm_fullstack_unary_ping_pong_chaotic_good.cc", + ], + args = grpc_benchmark_args(), + tags = [ + "no_mac", # to emulate "excluded_poll_engines: poll" + "no_windows", + ], + deps = [ + ":fullstack_unary_ping_pong_h", + "//:grpcpp_chaotic_good", + ], +) + grpc_cc_test( name = "bm_chttp2_hpack", srcs = ["bm_chttp2_hpack.cc"], diff --git a/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong_chaotic_good.cc b/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong_chaotic_good.cc new file mode 100644 index 00000000000..01a0b719b4b --- /dev/null +++ b/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong_chaotic_good.cc @@ -0,0 +1,122 @@ +// +// +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +// Benchmark gRPC end2end in various configurations for chaotic good +// TODO(ctiller): fold back into bm_fullstack_unary_ping_pong.cc once chaotic +// good can run without custom experiment configuration. + +#include "src/cpp/ext/chaotic_good.h" +#include "test/core/util/test_config.h" +#include "test/cpp/microbenchmarks/fullstack_unary_ping_pong.h" +#include "test/cpp/util/test_config.h" + +namespace grpc { +namespace testing { + +class ChaoticGoodFixture : public BaseFixture { + public: + explicit ChaoticGoodFixture( + Service* service, + const FixtureConfiguration& config = FixtureConfiguration()) { + auto address = MakeAddress(&port_); + ServerBuilder b; + if (address.length() > 0) { + b.AddListeningPort(address, ChaoticGoodInsecureServerCredentials()); + } + cq_ = b.AddCompletionQueue(true); + b.RegisterService(service); + config.ApplyCommonServerBuilderConfig(&b); + server_ = b.BuildAndStart(); + ChannelArguments args; + config.ApplyCommonChannelArguments(&args); + if (address.length() > 0) { + channel_ = grpc::CreateCustomChannel( + address, ChaoticGoodInsecureChannelCredentials(), args); + } else { + channel_ = server_->InProcessChannel(args); + } + } + + ~ChaoticGoodFixture() override { + server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0)); + cq_->Shutdown(); + void* tag; + bool ok; + while (cq_->Next(&tag, &ok)) { + } + grpc_recycle_unused_port(port_); + } + + ServerCompletionQueue* cq() { return cq_.get(); } + std::shared_ptr channel() { return channel_; } + + private: + static std::string MakeAddress(int* port) { + *port = grpc_pick_unused_port_or_die(); + std::stringstream addr; + addr << "localhost:" << *port; + return addr.str(); + } + + std::unique_ptr server_; + std::unique_ptr cq_; + std::shared_ptr channel_; + int port_; +}; + +//****************************************************************************** +// CONFIGURATIONS +// + +// Replace "benchmark::internal::Benchmark" with "::testing::Benchmark" to use +// internal microbenchmarking tooling +static void SweepSizesArgs(benchmark::internal::Benchmark* b) { + b->Args({0, 0}); + for (int i = 1; i <= 128 * 1024 * 1024; i *= 8) { + b->Args({i, 0}); + b->Args({0, i}); + b->Args({i, i}); + } +} + +BENCHMARK_TEMPLATE(BM_UnaryPingPong, ChaoticGoodFixture, NoOpMutator, + NoOpMutator) + ->Apply(SweepSizesArgs); + +} // namespace testing +} // namespace grpc + +// Some distros have RunSpecifiedBenchmarks under the benchmark namespace, +// and others do not. This allows us to support both modes. +namespace benchmark { +void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); } +} // namespace benchmark + +int main(int argc, char** argv) { + grpc_core::ForceEnableExperiment("event_engine_client", true); + grpc_core::ForceEnableExperiment("event_engine_listener", true); + grpc_core::ForceEnableExperiment("promise_based_client_call", true); + grpc_core::ForceEnableExperiment("promise_based_server_call", true); + grpc_core::ForceEnableExperiment("chaotic_good", true); + grpc::testing::TestEnvironment env(&argc, argv); + LibraryInitializer libInit; + ::benchmark::Initialize(&argc, argv); + grpc::testing::InitTest(&argc, &argv, false); + benchmark::RunTheBenchmarksNamespaced(); + return 0; +} diff --git a/test/cpp/microbenchmarks/fullstack_fixtures.h b/test/cpp/microbenchmarks/fullstack_fixtures.h index e2f9cfefc6d..40f1a032906 100644 --- a/test/cpp/microbenchmarks/fullstack_fixtures.h +++ b/test/cpp/microbenchmarks/fullstack_fixtures.h @@ -19,6 +19,7 @@ #ifndef GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_FIXTURES_H #define GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_FIXTURES_H +#include #include #include #include @@ -54,6 +55,7 @@ class FixtureConfiguration { virtual void ApplyCommonChannelArguments(ChannelArguments* c) const { c->SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, INT_MAX); c->SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, INT_MAX); + c->SetInt(GRPC_ARG_ENABLE_RETRIES, 0); c->SetResourceQuota(ResourceQuota()); }