mirror of https://github.com/grpc/grpc.git
[call-v3] Begin adding benchmarks (#36946)
Closes #36946
COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36946 from ctiller:transport-refs-10 16f50573e7
PiperOrigin-RevId: 645243962
pull/37002/head
parent
ff54357fe6
commit
23adb994cf
28 changed files with 1753 additions and 27 deletions
@ -0,0 +1,192 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <benchmark/benchmark.h> |
||||
|
||||
#include <grpc/grpc.h> |
||||
|
||||
#include "src/core/lib/event_engine/default_event_engine.h" |
||||
#include "src/core/lib/promise/all_ok.h" |
||||
#include "src/core/lib/promise/map.h" |
||||
#include "src/core/lib/resource_quota/arena.h" |
||||
#include "src/core/lib/resource_quota/resource_quota.h" |
||||
#include "src/core/lib/slice/slice.h" |
||||
#include "src/core/lib/surface/client_call.h" |
||||
#include "src/core/lib/transport/call_arena_allocator.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace { |
||||
|
||||
class TestCallDestination : public UnstartedCallDestination { |
||||
public: |
||||
void StartCall(UnstartedCallHandler handler) override { |
||||
handler_ = std::move(handler); |
||||
} |
||||
|
||||
UnstartedCallHandler TakeHandler() { |
||||
CHECK(handler_.has_value()); |
||||
auto handler = std::move(*handler_); |
||||
handler_.reset(); |
||||
return handler; |
||||
} |
||||
|
||||
void Orphaned() override { handler_.reset(); } |
||||
|
||||
private: |
||||
absl::optional<UnstartedCallHandler> handler_; |
||||
}; |
||||
|
||||
class Helper { |
||||
public: |
||||
~Helper() { |
||||
grpc_completion_queue_shutdown(cq_); |
||||
auto ev = grpc_completion_queue_next( |
||||
cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); |
||||
CHECK_EQ(ev.type, GRPC_QUEUE_SHUTDOWN); |
||||
grpc_completion_queue_destroy(cq_); |
||||
} |
||||
|
||||
auto MakeCall() { |
||||
return std::unique_ptr<grpc_call, void (*)(grpc_call*)>( |
||||
MakeClientCall(nullptr, 0, cq_, path_.Copy(), absl::nullopt, true, |
||||
Timestamp::InfFuture(), compression_options_, |
||||
event_engine_.get(), arena_allocator_->MakeArena(), |
||||
destination_), |
||||
grpc_call_unref); |
||||
} |
||||
|
||||
UnstartedCallHandler TakeHandler() { return destination_->TakeHandler(); } |
||||
|
||||
grpc_completion_queue* cq() { return cq_; } |
||||
|
||||
private: |
||||
grpc_completion_queue* cq_ = grpc_completion_queue_create_for_next(nullptr); |
||||
Slice path_ = Slice::FromStaticString("/foo/bar"); |
||||
const grpc_compression_options compression_options_ = { |
||||
1, |
||||
{0, GRPC_COMPRESS_LEVEL_NONE}, |
||||
{0, GRPC_COMPRESS_NONE}, |
||||
}; |
||||
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_ = |
||||
grpc_event_engine::experimental::GetDefaultEventEngine(); |
||||
RefCountedPtr<CallArenaAllocator> arena_allocator_ = |
||||
MakeRefCounted<CallArenaAllocator>( |
||||
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator( |
||||
"test-allocator"), |
||||
1024); |
||||
RefCountedPtr<TestCallDestination> destination_ = |
||||
MakeRefCounted<TestCallDestination>(); |
||||
}; |
||||
|
||||
void BM_CreateDestroy(benchmark::State& state) { |
||||
Helper helper; |
||||
for (auto _ : state) { |
||||
helper.MakeCall(); |
||||
} |
||||
} |
||||
BENCHMARK(BM_CreateDestroy); |
||||
|
||||
void BM_Unary(benchmark::State& state) { |
||||
Helper helper; |
||||
grpc_slice request_payload_slice = grpc_slice_from_static_string("hello"); |
||||
grpc_byte_buffer* request_payload = |
||||
grpc_raw_byte_buffer_create(&request_payload_slice, 1); |
||||
grpc_status_code status; |
||||
grpc_slice status_details; |
||||
grpc_metadata_array initial_metadata_recv; |
||||
grpc_metadata_array trailing_metadata_recv; |
||||
Slice response_payload = Slice::FromStaticString("world"); |
||||
grpc_byte_buffer* recv_response_payload = nullptr; |
||||
for (auto _ : state) { |
||||
auto call = helper.MakeCall(); |
||||
// Create ops the old school way to avoid any overheads
|
||||
grpc_op ops[6]; |
||||
memset(ops, 0, sizeof(ops)); |
||||
grpc_metadata_array_init(&initial_metadata_recv); |
||||
grpc_metadata_array_init(&trailing_metadata_recv); |
||||
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; |
||||
ops[0].data.send_initial_metadata.count = 0; |
||||
ops[1].op = GRPC_OP_SEND_MESSAGE; |
||||
ops[1].data.send_message.send_message = request_payload; |
||||
ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; |
||||
ops[3].op = GRPC_OP_RECV_INITIAL_METADATA; |
||||
ops[3].data.recv_initial_metadata.recv_initial_metadata = |
||||
&initial_metadata_recv; |
||||
ops[4].op = GRPC_OP_RECV_MESSAGE; |
||||
ops[4].data.recv_message.recv_message = &recv_response_payload; |
||||
ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT; |
||||
ops[5].data.recv_status_on_client.status = &status; |
||||
ops[5].data.recv_status_on_client.status_details = &status_details; |
||||
ops[5].data.recv_status_on_client.trailing_metadata = |
||||
&trailing_metadata_recv; |
||||
grpc_call_start_batch(call.get(), ops, 6, reinterpret_cast<void*>(1), |
||||
nullptr); |
||||
// Now fetch the handler at the other side, retrieve the request, and poke
|
||||
// back a response.
|
||||
auto unstarted_handler = helper.TakeHandler(); |
||||
unstarted_handler.SpawnInfallible("run_handler", [&]() mutable { |
||||
auto handler = unstarted_handler.StartWithEmptyFilterStack(); |
||||
handler.PushServerInitialMetadata(Arena::MakePooled<ServerMetadata>()); |
||||
auto response = |
||||
Arena::MakePooled<Message>(SliceBuffer(response_payload.Copy()), 0); |
||||
return Map( |
||||
AllOk<StatusFlag>( |
||||
Map(handler.PullClientInitialMetadata(), |
||||
[](ValueOrFailure<ClientMetadataHandle> status) { |
||||
return status.status(); |
||||
}), |
||||
Map(handler.PullMessage(), |
||||
[](ValueOrFailure<absl::optional<MessageHandle>> message) { |
||||
return message.status(); |
||||
}), |
||||
handler.PushMessage(std::move(response))), |
||||
[handler](StatusFlag status) mutable { |
||||
CHECK(status.ok()); |
||||
auto trailing_metadata = Arena::MakePooled<ServerMetadata>(); |
||||
trailing_metadata->Set(GrpcStatusMetadata(), GRPC_STATUS_OK); |
||||
handler.PushServerTrailingMetadata(std::move(trailing_metadata)); |
||||
return Empty{}; |
||||
}); |
||||
}); |
||||
auto ev = grpc_completion_queue_next( |
||||
helper.cq(), gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); |
||||
CHECK_EQ(ev.type, GRPC_OP_COMPLETE); |
||||
call.reset(); |
||||
grpc_byte_buffer_destroy(recv_response_payload); |
||||
grpc_metadata_array_destroy(&initial_metadata_recv); |
||||
grpc_metadata_array_destroy(&trailing_metadata_recv); |
||||
} |
||||
grpc_byte_buffer_destroy(request_payload); |
||||
} |
||||
BENCHMARK(BM_Unary); |
||||
|
||||
} // namespace
|
||||
} // namespace grpc_core
|
||||
|
||||
// Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
|
||||
// and others do not. This allows us to support both modes.
|
||||
namespace benchmark { |
||||
void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); } |
||||
} // namespace benchmark
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::benchmark::Initialize(&argc, argv); |
||||
grpc_init(); |
||||
{ |
||||
auto ee = grpc_event_engine::experimental::GetDefaultEventEngine(); |
||||
benchmark::RunTheBenchmarksNamespaced(); |
||||
} |
||||
grpc_shutdown(); |
||||
return 0; |
||||
} |
@ -0,0 +1,200 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <benchmark/benchmark.h> |
||||
|
||||
#include "absl/memory/memory.h" |
||||
#include "absl/strings/string_view.h" |
||||
|
||||
#include <grpc/grpc.h> |
||||
|
||||
#include "src/core/client_channel/client_channel.h" |
||||
#include "src/core/lib/address_utils/parse_address.h" |
||||
#include "test/core/transport/call_spine_benchmarks.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
namespace { |
||||
const Slice kTestPath = Slice::FromExternalString("/foo/bar"); |
||||
} |
||||
|
||||
class ClientChannelTraits { |
||||
public: |
||||
RefCountedPtr<UnstartedCallDestination> CreateCallDestination( |
||||
RefCountedPtr<UnstartedCallDestination> final_destination) { |
||||
call_destination_factory_ = std::make_unique<TestCallDestinationFactory>( |
||||
std::move(final_destination)); |
||||
auto channel = ClientChannel::Create( |
||||
"test:///target", |
||||
ChannelArgs() |
||||
.SetObject(&client_channel_factory_) |
||||
.SetObject(call_destination_factory_.get()) |
||||
.SetObject(ResourceQuota::Default()) |
||||
.SetObject(grpc_event_engine::experimental::GetDefaultEventEngine()) |
||||
.Set(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, true) |
||||
// TODO(ctiller): remove once v3 supports retries
|
||||
.Set(GRPC_ARG_ENABLE_RETRIES, 0)); |
||||
CHECK_OK(channel); |
||||
return std::move(*channel); |
||||
} |
||||
|
||||
ClientMetadataHandle MakeClientInitialMetadata() { |
||||
auto md = Arena::MakePooled<ClientMetadata>(); |
||||
md->Set(HttpPathMetadata(), kTestPath.Copy()); |
||||
return md; |
||||
} |
||||
|
||||
ServerMetadataHandle MakeServerInitialMetadata() { |
||||
return Arena::MakePooled<ServerMetadata>(); |
||||
} |
||||
|
||||
MessageHandle MakePayload() { return Arena::MakePooled<Message>(); } |
||||
|
||||
ServerMetadataHandle MakeServerTrailingMetadata() { |
||||
auto md = Arena::MakePooled<ServerMetadata>(); |
||||
return md; |
||||
} |
||||
|
||||
private: |
||||
class TestConnector final : public SubchannelConnector { |
||||
public: |
||||
void Connect(const Args&, Result*, grpc_closure* notify) override { |
||||
CHECK_EQ(notify_, nullptr); |
||||
notify_ = notify; |
||||
} |
||||
|
||||
void Shutdown(grpc_error_handle error) override { |
||||
if (notify_ != nullptr) ExecCtx::Run(DEBUG_LOCATION, notify_, error); |
||||
} |
||||
|
||||
private: |
||||
grpc_closure* notify_ = nullptr; |
||||
}; |
||||
|
||||
class TestClientChannelFactory final : public ClientChannelFactory { |
||||
public: |
||||
RefCountedPtr<Subchannel> CreateSubchannel( |
||||
const grpc_resolved_address& address, |
||||
const ChannelArgs& args) override { |
||||
return Subchannel::Create(MakeOrphanable<TestConnector>(), address, args); |
||||
} |
||||
}; |
||||
|
||||
class TestCallDestinationFactory final |
||||
: public ClientChannel::CallDestinationFactory { |
||||
public: |
||||
explicit TestCallDestinationFactory( |
||||
RefCountedPtr<UnstartedCallDestination> call_destination) |
||||
: call_destination_(std::move(call_destination)) {} |
||||
|
||||
RefCountedPtr<UnstartedCallDestination> CreateCallDestination( |
||||
ClientChannel::PickerObservable picker) override { |
||||
return call_destination_; |
||||
} |
||||
|
||||
private: |
||||
RefCountedPtr<UnstartedCallDestination> call_destination_; |
||||
}; |
||||
|
||||
std::unique_ptr<TestCallDestinationFactory> call_destination_factory_; |
||||
TestClientChannelFactory client_channel_factory_; |
||||
}; |
||||
GRPC_CALL_SPINE_BENCHMARK(UnstartedCallDestinationFixture<ClientChannelTraits>); |
||||
|
||||
namespace { |
||||
class TestResolver final : public Resolver { |
||||
public: |
||||
explicit TestResolver(ChannelArgs args, |
||||
std::unique_ptr<Resolver::ResultHandler> result_handler, |
||||
std::shared_ptr<WorkSerializer> work_serializer) |
||||
: args_(std::move(args)), |
||||
result_handler_(std::move(result_handler)), |
||||
work_serializer_(std::move(work_serializer)) {} |
||||
|
||||
void StartLocked() override { |
||||
work_serializer_->Run( |
||||
[self = RefAsSubclass<TestResolver>()] { |
||||
self->result_handler_->ReportResult( |
||||
self->MakeSuccessfulResolutionResult("ipv4:127.0.0.1:1234")); |
||||
}, |
||||
DEBUG_LOCATION); |
||||
} |
||||
void ShutdownLocked() override {} |
||||
|
||||
private: |
||||
Resolver::Result MakeSuccessfulResolutionResult( |
||||
absl::string_view endpoint_address) { |
||||
Resolver::Result result; |
||||
result.args = args_; |
||||
grpc_resolved_address address; |
||||
CHECK(grpc_parse_uri(URI::Parse(endpoint_address).value(), &address)); |
||||
result.addresses = EndpointAddressesList({EndpointAddresses{address, {}}}); |
||||
return result; |
||||
} |
||||
|
||||
const ChannelArgs args_; |
||||
const std::unique_ptr<Resolver::ResultHandler> result_handler_; |
||||
const std::shared_ptr<WorkSerializer> work_serializer_; |
||||
}; |
||||
|
||||
class TestResolverFactory final : public ResolverFactory { |
||||
public: |
||||
OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override { |
||||
return MakeOrphanable<TestResolver>(std::move(args.args), |
||||
std::move(args.result_handler), |
||||
std::move(args.work_serializer)); |
||||
} |
||||
|
||||
absl::string_view scheme() const override { return "test"; } |
||||
bool IsValidUri(const URI&) const override { return true; } |
||||
}; |
||||
|
||||
void BM_CreateClientChannel(benchmark::State& state) { |
||||
class FinalDestination : public UnstartedCallDestination { |
||||
public: |
||||
void StartCall(UnstartedCallHandler) override {} |
||||
void Orphaned() override {} |
||||
}; |
||||
ClientChannelTraits traits; |
||||
auto final_destination = MakeRefCounted<FinalDestination>(); |
||||
for (auto _ : state) { |
||||
traits.CreateCallDestination(final_destination); |
||||
} |
||||
} |
||||
BENCHMARK(BM_CreateClientChannel); |
||||
|
||||
} // namespace
|
||||
} // namespace grpc_core
|
||||
|
||||
// Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
|
||||
// and others do not. This allows us to support both modes.
|
||||
namespace benchmark { |
||||
void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); } |
||||
} // namespace benchmark
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::benchmark::Initialize(&argc, argv); |
||||
grpc_core::CoreConfiguration::RegisterBuilder( |
||||
[](grpc_core::CoreConfiguration::Builder* builder) { |
||||
builder->resolver_registry()->RegisterResolverFactory( |
||||
std::make_unique<grpc_core::TestResolverFactory>()); |
||||
}); |
||||
grpc_init(); |
||||
{ |
||||
auto ee = grpc_event_engine::experimental::GetDefaultEventEngine(); |
||||
benchmark::RunTheBenchmarksNamespaced(); |
||||
} |
||||
grpc_shutdown(); |
||||
return 0; |
||||
} |
@ -0,0 +1,86 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <benchmark/benchmark.h> |
||||
|
||||
#include "absl/strings/string_view.h" |
||||
|
||||
#include <grpc/grpc.h> |
||||
|
||||
#include "src/core/ext/filters/http/client/http_client_filter.h" |
||||
#include "src/core/lib/event_engine/default_event_engine.h" |
||||
#include "src/core/lib/transport/connectivity_state.h" |
||||
#include "src/core/lib/transport/metadata.h" |
||||
#include "src/core/lib/transport/transport.h" |
||||
#include "test/core/transport/call_spine_benchmarks.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
class HttpClientFilterTraits { |
||||
public: |
||||
using Filter = HttpClientFilter; |
||||
|
||||
ChannelArgs MakeChannelArgs() { return ChannelArgs().SetObject(&transport_); } |
||||
|
||||
ClientMetadataHandle MakeClientInitialMetadata() { |
||||
return Arena::MakePooled<ClientMetadata>(); |
||||
} |
||||
|
||||
ServerMetadataHandle MakeServerInitialMetadata() { |
||||
return Arena::MakePooled<ServerMetadata>(); |
||||
} |
||||
|
||||
MessageHandle MakePayload() { return Arena::MakePooled<Message>(); } |
||||
|
||||
ServerMetadataHandle MakeServerTrailingMetadata() { |
||||
auto md = Arena::MakePooled<ServerMetadata>(); |
||||
md->Set(HttpStatusMetadata(), 200); |
||||
return md; |
||||
} |
||||
|
||||
private: |
||||
class FakeTransport final : public Transport { |
||||
public: |
||||
FilterStackTransport* filter_stack_transport() override { return nullptr; } |
||||
ClientTransport* client_transport() override { return nullptr; } |
||||
ServerTransport* server_transport() override { return nullptr; } |
||||
absl::string_view GetTransportName() const override { return "fake-http"; } |
||||
void SetPollset(grpc_stream*, grpc_pollset*) override {} |
||||
void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {} |
||||
void PerformOp(grpc_transport_op*) override {} |
||||
void Orphan() override {} |
||||
}; |
||||
|
||||
FakeTransport transport_; |
||||
}; |
||||
GRPC_CALL_SPINE_BENCHMARK(FilterFixture<HttpClientFilterTraits>); |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
// Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
|
||||
// and others do not. This allows us to support both modes.
|
||||
namespace benchmark { |
||||
void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); } |
||||
} // namespace benchmark
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::benchmark::Initialize(&argc, argv); |
||||
grpc_init(); |
||||
{ |
||||
auto ee = grpc_event_engine::experimental::GetDefaultEventEngine(); |
||||
benchmark::RunTheBenchmarksNamespaced(); |
||||
} |
||||
grpc_shutdown(); |
||||
return 0; |
||||
} |
@ -0,0 +1,110 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <benchmark/benchmark.h> |
||||
|
||||
#include <grpc/grpc.h> |
||||
|
||||
#include "src/core/lib/event_engine/default_event_engine.h" |
||||
#include "src/core/lib/promise/party.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace { |
||||
|
||||
class TestParty final : public Party { |
||||
public: |
||||
TestParty() : Party(1) {} |
||||
~TestParty() override {} |
||||
std::string DebugTag() const override { return "TestParty"; } |
||||
|
||||
using Party::IncrementRefCount; |
||||
using Party::Unref; |
||||
|
||||
bool RunParty() override { |
||||
promise_detail::Context<grpc_event_engine::experimental::EventEngine> |
||||
ee_ctx(ee_.get()); |
||||
return Party::RunParty(); |
||||
} |
||||
|
||||
void PartyOver() override { |
||||
{ |
||||
promise_detail::Context<grpc_event_engine::experimental::EventEngine> |
||||
ee_ctx(ee_.get()); |
||||
CancelRemainingParticipants(); |
||||
} |
||||
delete this; |
||||
} |
||||
|
||||
private: |
||||
grpc_event_engine::experimental::EventEngine* event_engine() const final { |
||||
return ee_.get(); |
||||
} |
||||
|
||||
std::shared_ptr<grpc_event_engine::experimental::EventEngine> ee_ = |
||||
grpc_event_engine::experimental::GetDefaultEventEngine(); |
||||
}; |
||||
|
||||
void BM_PartyCreate(benchmark::State& state) { |
||||
for (auto _ : state) { |
||||
auto* party = new TestParty(); |
||||
party->Unref(); |
||||
} |
||||
} |
||||
BENCHMARK(BM_PartyCreate); |
||||
|
||||
void BM_AddParticipant(benchmark::State& state) { |
||||
auto* party = new TestParty(); |
||||
for (auto _ : state) { |
||||
party->Spawn( |
||||
"participant", []() { return Success{}; }, [](StatusFlag) {}); |
||||
} |
||||
party->Unref(); |
||||
} |
||||
BENCHMARK(BM_AddParticipant); |
||||
|
||||
void BM_WakeupParticipant(benchmark::State& state) { |
||||
auto* party = new TestParty(); |
||||
party->Spawn( |
||||
"driver", |
||||
[&state, w = IntraActivityWaiter()]() mutable -> Poll<StatusFlag> { |
||||
w.pending(); |
||||
if (state.KeepRunning()) { |
||||
w.Wake(); |
||||
return Pending{}; |
||||
} |
||||
return Success{}; |
||||
}, |
||||
[party](StatusFlag) { party->Unref(); }); |
||||
} |
||||
BENCHMARK(BM_WakeupParticipant); |
||||
|
||||
} // namespace
|
||||
} // namespace grpc_core
|
||||
|
||||
// Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
|
||||
// and others do not. This allows us to support both modes.
|
||||
namespace benchmark { |
||||
void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); } |
||||
} // namespace benchmark
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::benchmark::Initialize(&argc, argv); |
||||
grpc_init(); |
||||
{ |
||||
auto ee = grpc_event_engine::experimental::GetDefaultEventEngine(); |
||||
benchmark::RunTheBenchmarksNamespaced(); |
||||
} |
||||
grpc_shutdown(); |
||||
return 0; |
||||
} |
@ -0,0 +1,121 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <benchmark/benchmark.h> |
||||
|
||||
#include <grpc/grpc.h> |
||||
|
||||
#include "src/core/lib/event_engine/default_event_engine.h" |
||||
#include "src/core/lib/resource_quota/resource_quota.h" |
||||
#include "test/core/transport/call_spine_benchmarks.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
class CallSpineFixture { |
||||
public: |
||||
BenchmarkCall MakeCall() { |
||||
auto p = MakeCallPair(Arena::MakePooled<ClientMetadata>(), |
||||
event_engine_.get(), arena_allocator_->MakeArena()); |
||||
return {std::move(p.initiator), p.handler.StartCall(stack_)}; |
||||
} |
||||
|
||||
ServerMetadataHandle MakeServerInitialMetadata() { |
||||
return Arena::MakePooled<ServerMetadata>(); |
||||
} |
||||
|
||||
MessageHandle MakePayload() { return Arena::MakePooled<Message>(); } |
||||
|
||||
ServerMetadataHandle MakeServerTrailingMetadata() { |
||||
return Arena::MakePooled<ServerMetadata>(); |
||||
} |
||||
|
||||
private: |
||||
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_ = |
||||
grpc_event_engine::experimental::GetDefaultEventEngine(); |
||||
RefCountedPtr<CallArenaAllocator> arena_allocator_ = |
||||
MakeRefCounted<CallArenaAllocator>( |
||||
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator( |
||||
"test-allocator"), |
||||
1024); |
||||
RefCountedPtr<CallFilters::Stack> stack_ = |
||||
CallFilters::StackBuilder().Build(); |
||||
}; |
||||
GRPC_CALL_SPINE_BENCHMARK(CallSpineFixture); |
||||
|
||||
class ForwardCallFixture { |
||||
public: |
||||
BenchmarkCall MakeCall() { |
||||
auto p1 = MakeCallPair(Arena::MakePooled<ClientMetadata>(), |
||||
event_engine_.get(), arena_allocator_->MakeArena()); |
||||
auto p2 = MakeCallPair(Arena::MakePooled<ClientMetadata>(), |
||||
event_engine_.get(), arena_allocator_->MakeArena()); |
||||
p1.handler.SpawnInfallible("initial_metadata", [&]() { |
||||
auto p1_handler = p1.handler.StartCall(stack_); |
||||
return Map( |
||||
p1_handler.PullClientInitialMetadata(), |
||||
[p1_handler, &p2](ValueOrFailure<ClientMetadataHandle> md) mutable { |
||||
CHECK(md.ok()); |
||||
ForwardCall(std::move(p1_handler), std::move(p2.initiator)); |
||||
return Empty{}; |
||||
}); |
||||
}); |
||||
absl::optional<CallHandler> p2_handler; |
||||
p2.handler.SpawnInfallible("start", [&]() { |
||||
p2_handler = p2.handler.StartCall(stack_); |
||||
return Empty{}; |
||||
}); |
||||
return {std::move(p1.initiator), std::move(*p2_handler)}; |
||||
} |
||||
|
||||
ServerMetadataHandle MakeServerInitialMetadata() { |
||||
return Arena::MakePooled<ServerMetadata>(); |
||||
} |
||||
|
||||
MessageHandle MakePayload() { return Arena::MakePooled<Message>(); } |
||||
|
||||
ServerMetadataHandle MakeServerTrailingMetadata() { |
||||
return Arena::MakePooled<ServerMetadata>(); |
||||
} |
||||
|
||||
private: |
||||
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_ = |
||||
grpc_event_engine::experimental::GetDefaultEventEngine(); |
||||
RefCountedPtr<CallArenaAllocator> arena_allocator_ = |
||||
MakeRefCounted<CallArenaAllocator>( |
||||
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator( |
||||
"test-allocator"), |
||||
1024); |
||||
RefCountedPtr<CallFilters::Stack> stack_ = |
||||
CallFilters::StackBuilder().Build(); |
||||
}; |
||||
GRPC_CALL_SPINE_BENCHMARK(ForwardCallFixture); |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
// Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
|
||||
// and others do not. This allows us to support both modes.
|
||||
namespace benchmark { |
||||
void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); } |
||||
} // namespace benchmark
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::benchmark::Initialize(&argc, argv); |
||||
grpc_init(); |
||||
{ |
||||
auto ee = grpc_event_engine::experimental::GetDefaultEventEngine(); |
||||
benchmark::RunTheBenchmarksNamespaced(); |
||||
} |
||||
grpc_shutdown(); |
||||
return 0; |
||||
} |
@ -0,0 +1,367 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_TEST_CORE_TRANSPORT_CALL_SPINE_BENCHMARKS_H |
||||
#define GRPC_TEST_CORE_TRANSPORT_CALL_SPINE_BENCHMARKS_H |
||||
|
||||
#include <memory> |
||||
|
||||
#include "benchmark/benchmark.h" |
||||
|
||||
#include "src/core/lib/event_engine/default_event_engine.h" |
||||
#include "src/core/lib/gprpp/notification.h" |
||||
#include "src/core/lib/iomgr/exec_ctx.h" |
||||
#include "src/core/lib/promise/all_ok.h" |
||||
#include "src/core/lib/promise/map.h" |
||||
#include "src/core/lib/resource_quota/resource_quota.h" |
||||
#include "src/core/lib/transport/call_spine.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
struct BenchmarkCall { |
||||
CallInitiator initiator; |
||||
CallHandler handler; |
||||
}; |
||||
|
||||
// Unary call with one spawn on each end of the spine.
|
||||
template <typename Fixture> |
||||
void BM_UnaryWithSpawnPerEnd(benchmark::State& state) { |
||||
Fixture fixture; |
||||
for (auto _ : state) { |
||||
Notification handler_done; |
||||
Notification initiator_done; |
||||
{ |
||||
ExecCtx exec_ctx; |
||||
BenchmarkCall call = fixture.MakeCall(); |
||||
call.handler.SpawnInfallible("handler", [handler = call.handler, &fixture, |
||||
&handler_done]() mutable { |
||||
handler.PushServerInitialMetadata(fixture.MakeServerInitialMetadata()); |
||||
return Map( |
||||
AllOk<StatusFlag>( |
||||
Map(handler.PullClientInitialMetadata(), |
||||
[](ValueOrFailure<ClientMetadataHandle> md) { |
||||
return md.status(); |
||||
}), |
||||
Map(handler.PullMessage(), |
||||
[](ValueOrFailure<absl::optional<MessageHandle>> msg) { |
||||
return msg.status(); |
||||
}), |
||||
handler.PushMessage(fixture.MakePayload())), |
||||
[&handler_done, &fixture, handler](StatusFlag status) mutable { |
||||
CHECK(status.ok()); |
||||
handler.PushServerTrailingMetadata( |
||||
fixture.MakeServerTrailingMetadata()); |
||||
handler_done.Notify(); |
||||
return Empty{}; |
||||
}); |
||||
}); |
||||
call.initiator.SpawnInfallible( |
||||
"initiator", |
||||
[initiator = call.initiator, &fixture, &initiator_done]() mutable { |
||||
return Map( |
||||
AllOk<StatusFlag>( |
||||
Map(initiator.PushMessage(fixture.MakePayload()), |
||||
[](StatusFlag) { return Success{}; }), |
||||
Map(initiator.PullServerInitialMetadata(), |
||||
[](absl::optional<ServerMetadataHandle> md) { |
||||
return Success{}; |
||||
}), |
||||
Map(initiator.PullMessage(), |
||||
[](ValueOrFailure<absl::optional<MessageHandle>> msg) { |
||||
return msg.status(); |
||||
}), |
||||
Map(initiator.PullServerTrailingMetadata(), |
||||
[](ServerMetadataHandle) { return Success(); })), |
||||
[&initiator_done](StatusFlag result) { |
||||
CHECK(result.ok()); |
||||
initiator_done.Notify(); |
||||
return Empty{}; |
||||
}); |
||||
}); |
||||
} |
||||
handler_done.WaitForNotification(); |
||||
initiator_done.WaitForNotification(); |
||||
} |
||||
} |
||||
|
||||
// Unary call with one promise spawned per operation on the spine.
|
||||
// It's a little unclear what the optimum should be between the above variant
|
||||
// and this: whilst a spawn per end minimizes the number of spawns we need to
|
||||
// do, a spawn per operation can conceivably (but not at the time of writing)
|
||||
// minimize the number of internal wakeups in the parties.
|
||||
// For now we track both.
|
||||
template <typename Fixture> |
||||
void BM_UnaryWithSpawnPerOp(benchmark::State& state) { |
||||
Fixture fixture; |
||||
for (auto _ : state) { |
||||
BenchmarkCall call = fixture.MakeCall(); |
||||
Notification handler_done; |
||||
Notification initiator_done; |
||||
{ |
||||
ExecCtx exec_ctx; |
||||
Party::BulkSpawner handler_spawner(call.handler.party()); |
||||
Party::BulkSpawner initiator_spawner(call.initiator.party()); |
||||
handler_spawner.Spawn( |
||||
"HANDLER:PushServerInitialMetadata", |
||||
[&]() { |
||||
call.handler.PushServerInitialMetadata( |
||||
fixture.MakeServerInitialMetadata()); |
||||
return Empty{}; |
||||
}, |
||||
[](Empty) {}); |
||||
handler_spawner.Spawn( |
||||
"HANDLER:PullClientInitialMetadata", |
||||
[&]() { return call.handler.PullClientInitialMetadata(); }, |
||||
[](ValueOrFailure<ClientMetadataHandle> md) { CHECK(md.ok()); }); |
||||
handler_spawner.Spawn( |
||||
"HANDLER:PullMessage", [&]() { return call.handler.PullMessage(); }, |
||||
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) { |
||||
CHECK(msg.ok()); |
||||
call.handler.SpawnInfallible( |
||||
"HANDLER:PushServerTrailingMetadata", [&]() { |
||||
call.handler.PushServerTrailingMetadata( |
||||
fixture.MakeServerTrailingMetadata()); |
||||
handler_done.Notify(); |
||||
return Empty{}; |
||||
}); |
||||
}); |
||||
handler_spawner.Spawn( |
||||
"HANDLER:PushMessage", |
||||
[&]() { return call.handler.PushMessage(fixture.MakePayload()); }, |
||||
[](StatusFlag) {}); |
||||
|
||||
initiator_spawner.Spawn( |
||||
"INITIATOR:PushMessage", |
||||
[&]() { return call.initiator.PushMessage(fixture.MakePayload()); }, |
||||
[](StatusFlag) {}); |
||||
initiator_spawner.Spawn( |
||||
"INITIATOR:PullServerInitialMetadata", |
||||
[&]() { return call.initiator.PullServerInitialMetadata(); }, |
||||
[](absl::optional<ServerMetadataHandle> md) { |
||||
CHECK(md.has_value()); |
||||
}); |
||||
initiator_spawner.Spawn( |
||||
"INITIATOR:PullMessage", |
||||
[&]() { return call.initiator.PullMessage(); }, |
||||
[](ValueOrFailure<absl::optional<MessageHandle>> msg) { |
||||
CHECK(msg.ok()); |
||||
}); |
||||
initiator_spawner.Spawn( |
||||
"INITIATOR:PullServerTrailingMetadata", |
||||
[&]() { return call.initiator.PullServerTrailingMetadata(); }, |
||||
[&](ServerMetadataHandle md) { |
||||
initiator_done.Notify(); |
||||
return Empty{}; |
||||
}); |
||||
} |
||||
handler_done.WaitForNotification(); |
||||
initiator_done.WaitForNotification(); |
||||
} |
||||
} |
||||
|
||||
template <typename Fixture> |
||||
void BM_ClientToServerStreaming(benchmark::State& state) { |
||||
Fixture fixture; |
||||
BenchmarkCall call = fixture.MakeCall(); |
||||
Notification handler_metadata_done; |
||||
Notification initiator_metadata_done; |
||||
call.handler.SpawnInfallible("handler-initial-metadata", [&]() { |
||||
return Map(call.handler.PullClientInitialMetadata(), |
||||
[&](ValueOrFailure<ClientMetadataHandle> md) { |
||||
CHECK(md.ok()); |
||||
call.handler.PushServerInitialMetadata( |
||||
fixture.MakeServerInitialMetadata()); |
||||
handler_metadata_done.Notify(); |
||||
return Empty{}; |
||||
}); |
||||
}); |
||||
call.initiator.SpawnInfallible("initiator-initial-metadata", [&]() { |
||||
return Map(call.initiator.PullServerInitialMetadata(), |
||||
[&](absl::optional<ServerMetadataHandle> md) { |
||||
CHECK(md.has_value()); |
||||
initiator_metadata_done.Notify(); |
||||
return Empty{}; |
||||
}); |
||||
}); |
||||
handler_metadata_done.WaitForNotification(); |
||||
initiator_metadata_done.WaitForNotification(); |
||||
for (auto _ : state) { |
||||
Notification handler_done; |
||||
Notification initiator_done; |
||||
call.handler.SpawnInfallible("handler", [&]() { |
||||
return Map(call.handler.PullMessage(), |
||||
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) { |
||||
CHECK(msg.ok()); |
||||
handler_done.Notify(); |
||||
return Empty{}; |
||||
}); |
||||
}); |
||||
call.initiator.SpawnInfallible("initiator", [&]() { |
||||
return Map(call.initiator.PushMessage(fixture.MakePayload()), |
||||
[&](StatusFlag result) { |
||||
CHECK(result.ok()); |
||||
initiator_done.Notify(); |
||||
return Empty{}; |
||||
}); |
||||
}); |
||||
handler_done.WaitForNotification(); |
||||
initiator_done.WaitForNotification(); |
||||
} |
||||
call.initiator.SpawnInfallible("done", [&]() { |
||||
call.initiator.Cancel(); |
||||
return Empty{}; |
||||
}); |
||||
call.handler.SpawnInfallible("done", [&]() { |
||||
call.handler.PushServerTrailingMetadata( |
||||
CancelledServerMetadataFromStatus(absl::CancelledError())); |
||||
return Empty{}; |
||||
}); |
||||
} |
||||
|
||||
// Base class for fixtures that wrap a single filter.
|
||||
// Traits should have MakeClientInitialMetadata, MakeServerInitialMetadata,
|
||||
// MakePayload, MakeServerTrailingMetadata, MakeChannelArgs and a type named
|
||||
// Filter.
|
||||
template <class Traits> |
||||
class FilterFixture { |
||||
public: |
||||
BenchmarkCall MakeCall() { |
||||
auto p = MakeCallPair(traits_.MakeClientInitialMetadata(), |
||||
event_engine_.get(), arena_allocator_->MakeArena()); |
||||
return {std::move(p.initiator), p.handler.StartCall(stack_)}; |
||||
} |
||||
|
||||
ServerMetadataHandle MakeServerInitialMetadata() { |
||||
return traits_.MakeServerInitialMetadata(); |
||||
} |
||||
|
||||
MessageHandle MakePayload() { return traits_.MakePayload(); } |
||||
|
||||
ServerMetadataHandle MakeServerTrailingMetadata() { |
||||
return traits_.MakeServerTrailingMetadata(); |
||||
} |
||||
|
||||
private: |
||||
Traits traits_; |
||||
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_ = |
||||
grpc_event_engine::experimental::GetDefaultEventEngine(); |
||||
RefCountedPtr<CallArenaAllocator> arena_allocator_ = |
||||
MakeRefCounted<CallArenaAllocator>( |
||||
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator( |
||||
"test-allocator"), |
||||
1024); |
||||
const RefCountedPtr<CallFilters::Stack> stack_ = [this]() { |
||||
auto filter = Traits::Filter::Create(traits_.MakeChannelArgs(), |
||||
typename Traits::Filter::Args{}); |
||||
CHECK(filter.ok()); |
||||
CallFilters::StackBuilder builder; |
||||
builder.Add(filter->get()); |
||||
builder.AddOwnedObject(std::move(*filter)); |
||||
return builder.Build(); |
||||
}(); |
||||
}; |
||||
|
||||
// Base class for fixtures that wrap an UnstartedCallDestination.
|
||||
template <class Traits> |
||||
class UnstartedCallDestinationFixture { |
||||
public: |
||||
BenchmarkCall MakeCall() { |
||||
auto p = MakeCallPair(traits_->MakeClientInitialMetadata(), |
||||
event_engine_.get(), arena_allocator_->MakeArena()); |
||||
top_destination_->StartCall(std::move(p.handler)); |
||||
auto handler = bottom_destination_->TakeHandler(); |
||||
absl::optional<CallHandler> started_handler; |
||||
Notification started; |
||||
handler.SpawnInfallible("handler_setup", [&]() { |
||||
started_handler = handler.StartCall(stack_); |
||||
started.Notify(); |
||||
return Empty{}; |
||||
}); |
||||
started.WaitForNotification(); |
||||
CHECK(started_handler.has_value()); |
||||
return {std::move(p.initiator), std::move(*started_handler)}; |
||||
} |
||||
|
||||
~UnstartedCallDestinationFixture() { |
||||
// TODO(ctiller): entire destructor can be deleted once ExecCtx is gone.
|
||||
ExecCtx exec_ctx; |
||||
stack_.reset(); |
||||
top_destination_.reset(); |
||||
bottom_destination_.reset(); |
||||
arena_allocator_.reset(); |
||||
event_engine_.reset(); |
||||
traits_.reset(); |
||||
} |
||||
|
||||
ServerMetadataHandle MakeServerInitialMetadata() { |
||||
return traits_->MakeServerInitialMetadata(); |
||||
} |
||||
|
||||
MessageHandle MakePayload() { return traits_->MakePayload(); } |
||||
|
||||
ServerMetadataHandle MakeServerTrailingMetadata() { |
||||
return traits_->MakeServerTrailingMetadata(); |
||||
} |
||||
|
||||
private: |
||||
class SinkDestination : public UnstartedCallDestination { |
||||
public: |
||||
void StartCall(UnstartedCallHandler handler) override { |
||||
MutexLock lock(&mu_); |
||||
handler_ = std::move(handler); |
||||
} |
||||
void Orphaned() override {} |
||||
|
||||
UnstartedCallHandler TakeHandler() { |
||||
mu_.LockWhen(absl::Condition( |
||||
+[](SinkDestination* dest) ABSL_EXCLUSIVE_LOCKS_REQUIRED(dest->mu_) { |
||||
return dest->handler_.has_value(); |
||||
}, |
||||
this)); |
||||
auto h = std::move(*handler_); |
||||
handler_.reset(); |
||||
mu_.Unlock(); |
||||
return h; |
||||
} |
||||
|
||||
private: |
||||
absl::Mutex mu_; |
||||
absl::optional<UnstartedCallHandler> handler_ ABSL_GUARDED_BY(mu_); |
||||
}; |
||||
|
||||
// TODO(ctiller): no need for unique_ptr once ExecCtx is gone
|
||||
std::unique_ptr<Traits> traits_ = std::make_unique<Traits>(); |
||||
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_ = |
||||
grpc_event_engine::experimental::GetDefaultEventEngine(); |
||||
RefCountedPtr<CallArenaAllocator> arena_allocator_ = |
||||
MakeRefCounted<CallArenaAllocator>( |
||||
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator( |
||||
"test-allocator"), |
||||
1024); |
||||
RefCountedPtr<SinkDestination> bottom_destination_ = |
||||
MakeRefCounted<SinkDestination>(); |
||||
RefCountedPtr<UnstartedCallDestination> top_destination_ = |
||||
traits_->CreateCallDestination(bottom_destination_); |
||||
RefCountedPtr<CallFilters::Stack> stack_ = |
||||
CallFilters::StackBuilder().Build(); |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#define GRPC_CALL_SPINE_BENCHMARK(Fixture) \ |
||||
BENCHMARK(grpc_core::BM_UnaryWithSpawnPerEnd<Fixture>); \
|
||||
BENCHMARK(grpc_core::BM_UnaryWithSpawnPerOp<Fixture>); \
|
||||
BENCHMARK(grpc_core::BM_ClientToServerStreaming<Fixture>) |
||||
|
||||
#endif // GRPC_TEST_CORE_TRANSPORT_CALL_SPINE_BENCHMARKS_H
|
Loading…
Reference in new issue