[chaotic-good] Add a microbenchmark for ping pong round trips (#36050)

also:
- remove tail recursion from promise endpoint read completion (actually overflowed stack!)
- remove retry filter from benchmark - we probably don't want this long term, but for now nobody else is using this benchmark and our use case doesn't use grpc retries so.... good enough

Closes #36050

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36050 from ctiller:cgbm 65b1c26767
PiperOrigin-RevId: 612577071
pull/36052/head
Craig Tiller 9 months ago committed by Copybara-Service
parent c43f1a63b0
commit 427c8a89e9
  1. 79
      src/core/lib/transport/promise_endpoint.cc
  2. 17
      test/cpp/microbenchmarks/BUILD
  3. 122
      test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong_chaotic_good.cc
  4. 2
      test/cpp/microbenchmarks/fullstack_fixtures.h

@ -58,50 +58,53 @@ PromiseEndpoint::GetLocalAddress() const {
}
void PromiseEndpoint::ReadState::Complete(absl::Status status,
size_t num_bytes_requested) {
if (!status.ok()) {
// Invalidates all previous reads.
pending_buffer.Clear();
buffer.Clear();
const size_t num_bytes_requested) {
while (true) {
if (!status.ok()) {
// Invalidates all previous reads.
pending_buffer.Clear();
buffer.Clear();
result = status;
auto w = std::move(waker);
complete.store(true, std::memory_order_release);
w.Wakeup();
return;
}
// Appends `pending_buffer` to `buffer`.
pending_buffer.MoveFirstNBytesIntoSliceBuffer(pending_buffer.Length(),
buffer);
GPR_DEBUG_ASSERT(pending_buffer.Count() == 0u);
if (buffer.Length() < num_bytes_requested) {
// A further read is needed.
// Set read args with number of bytes needed as hint.
grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs
read_args = {
static_cast<int64_t>(num_bytes_requested - buffer.Length())};
// If `Read()` returns true immediately, the callback will not be
// called. We still need to call our callback to pick up the result and
// maybe do further reads.
auto ep = endpoint.lock();
if (ep == nullptr) {
status = absl::UnavailableError("Endpoint closed during read.");
continue;
}
if (ep->Read(
[self = Ref(), num_bytes_requested](absl::Status status) {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
self->Complete(std::move(status), num_bytes_requested);
},
&pending_buffer, &read_args)) {
continue;
}
return;
}
result = status;
auto w = std::move(waker);
complete.store(true, std::memory_order_release);
w.Wakeup();
return;
}
// Appends `pending_buffer` to `buffer`.
pending_buffer.MoveFirstNBytesIntoSliceBuffer(pending_buffer.Length(),
buffer);
GPR_DEBUG_ASSERT(pending_buffer.Count() == 0u);
if (buffer.Length() < num_bytes_requested) {
// A further read is needed.
// Set read args with number of bytes needed as hint.
grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs read_args =
{static_cast<int64_t>(num_bytes_requested - buffer.Length())};
// If `Read()` returns true immediately, the callback will not be
// called. We still need to call our callback to pick up the result and
// maybe do further reads.
auto ep = endpoint.lock();
if (ep == nullptr) {
Complete(absl::UnavailableError("Endpoint closed during read."),
num_bytes_requested);
return;
}
if (ep->Read(
[self = Ref(), num_bytes_requested](absl::Status status) {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
self->Complete(std::move(status), num_bytes_requested);
},
&pending_buffer, &read_args)) {
Complete(std::move(status), num_bytes_requested);
}
return;
}
result = status;
auto w = std::move(waker);
complete.store(true, std::memory_order_release);
w.Wakeup();
}
} // namespace grpc_core

@ -323,6 +323,23 @@ grpc_cc_test(
deps = [":fullstack_unary_ping_pong_h"],
)
grpc_cc_test(
name = "bm_fullstack_unary_ping_pong_chaotic_good",
size = "large",
srcs = [
"bm_fullstack_unary_ping_pong_chaotic_good.cc",
],
args = grpc_benchmark_args(),
tags = [
"no_mac", # to emulate "excluded_poll_engines: poll"
"no_windows",
],
deps = [
":fullstack_unary_ping_pong_h",
"//:grpcpp_chaotic_good",
],
)
grpc_cc_test(
name = "bm_chttp2_hpack",
srcs = ["bm_chttp2_hpack.cc"],

@ -0,0 +1,122 @@
//
//
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
// Benchmark gRPC end2end in various configurations for chaotic good
// TODO(ctiller): fold back into bm_fullstack_unary_ping_pong.cc once chaotic
// good can run without custom experiment configuration.
#include "src/cpp/ext/chaotic_good.h"
#include "test/core/util/test_config.h"
#include "test/cpp/microbenchmarks/fullstack_unary_ping_pong.h"
#include "test/cpp/util/test_config.h"
namespace grpc {
namespace testing {
class ChaoticGoodFixture : public BaseFixture {
public:
explicit ChaoticGoodFixture(
Service* service,
const FixtureConfiguration& config = FixtureConfiguration()) {
auto address = MakeAddress(&port_);
ServerBuilder b;
if (address.length() > 0) {
b.AddListeningPort(address, ChaoticGoodInsecureServerCredentials());
}
cq_ = b.AddCompletionQueue(true);
b.RegisterService(service);
config.ApplyCommonServerBuilderConfig(&b);
server_ = b.BuildAndStart();
ChannelArguments args;
config.ApplyCommonChannelArguments(&args);
if (address.length() > 0) {
channel_ = grpc::CreateCustomChannel(
address, ChaoticGoodInsecureChannelCredentials(), args);
} else {
channel_ = server_->InProcessChannel(args);
}
}
~ChaoticGoodFixture() override {
server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
cq_->Shutdown();
void* tag;
bool ok;
while (cq_->Next(&tag, &ok)) {
}
grpc_recycle_unused_port(port_);
}
ServerCompletionQueue* cq() { return cq_.get(); }
std::shared_ptr<Channel> channel() { return channel_; }
private:
static std::string MakeAddress(int* port) {
*port = grpc_pick_unused_port_or_die();
std::stringstream addr;
addr << "localhost:" << *port;
return addr.str();
}
std::unique_ptr<Server> server_;
std::unique_ptr<ServerCompletionQueue> cq_;
std::shared_ptr<Channel> channel_;
int port_;
};
//******************************************************************************
// CONFIGURATIONS
//
// Replace "benchmark::internal::Benchmark" with "::testing::Benchmark" to use
// internal microbenchmarking tooling
static void SweepSizesArgs(benchmark::internal::Benchmark* b) {
b->Args({0, 0});
for (int i = 1; i <= 128 * 1024 * 1024; i *= 8) {
b->Args({i, 0});
b->Args({0, i});
b->Args({i, i});
}
}
BENCHMARK_TEMPLATE(BM_UnaryPingPong, ChaoticGoodFixture, NoOpMutator,
NoOpMutator)
->Apply(SweepSizesArgs);
} // namespace testing
} // namespace grpc
// Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
// and others do not. This allows us to support both modes.
namespace benchmark {
void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
} // namespace benchmark
int main(int argc, char** argv) {
grpc_core::ForceEnableExperiment("event_engine_client", true);
grpc_core::ForceEnableExperiment("event_engine_listener", true);
grpc_core::ForceEnableExperiment("promise_based_client_call", true);
grpc_core::ForceEnableExperiment("promise_based_server_call", true);
grpc_core::ForceEnableExperiment("chaotic_good", true);
grpc::testing::TestEnvironment env(&argc, argv);
LibraryInitializer libInit;
::benchmark::Initialize(&argc, argv);
grpc::testing::InitTest(&argc, &argv, false);
benchmark::RunTheBenchmarksNamespaced();
return 0;
}

@ -19,6 +19,7 @@
#ifndef GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_FIXTURES_H
#define GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_FIXTURES_H
#include <grpc/grpc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpcpp/channel.h>
@ -54,6 +55,7 @@ class FixtureConfiguration {
virtual void ApplyCommonChannelArguments(ChannelArguments* c) const {
c->SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, INT_MAX);
c->SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, INT_MAX);
c->SetInt(GRPC_ARG_ENABLE_RETRIES, 0);
c->SetResourceQuota(ResourceQuota());
}

Loading…
Cancel
Save