mirror of https://github.com/grpc/grpc.git
commit
251f2d885b
19 changed files with 775 additions and 36 deletions
@ -0,0 +1,4 @@ |
||||
test_id: 178 |
||||
event_engine_actions { |
||||
run_delay: 261993005056 |
||||
} |
@ -0,0 +1,131 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "test/core/test_util/passthrough_endpoint.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
class PassthroughEndpoint::CallbackHelper { |
||||
public: |
||||
CallbackHelper(EventEngine* event_engine, bool allow_inline_callbacks) |
||||
: event_engine_(allow_inline_callbacks ? nullptr : event_engine) {} |
||||
|
||||
template <typename F> |
||||
void AddCallback(F&& callback) { |
||||
if (event_engine_ != nullptr) { |
||||
event_engine_->Run(std::forward<F>(callback)); |
||||
} else { |
||||
callbacks_.emplace_back(std::forward<F>(callback)); |
||||
} |
||||
} |
||||
|
||||
~CallbackHelper() { |
||||
for (auto& callback : callbacks_) { |
||||
callback(); |
||||
} |
||||
} |
||||
|
||||
private: |
||||
EventEngine* event_engine_; |
||||
absl::InlinedVector<absl::AnyInvocable<void()>, 4> callbacks_; |
||||
}; |
||||
|
||||
PassthroughEndpoint::PassthroughEndpointPair |
||||
PassthroughEndpoint::MakePassthroughEndpoint(int client_port, int server_port, |
||||
bool allow_inline_callbacks) { |
||||
auto send_middle = |
||||
grpc_core::MakeRefCounted<PassthroughEndpoint::Middle>(client_port); |
||||
auto recv_middle = |
||||
grpc_core::MakeRefCounted<PassthroughEndpoint::Middle>(server_port); |
||||
auto client = std::unique_ptr<PassthroughEndpoint>(new PassthroughEndpoint( |
||||
send_middle, recv_middle, allow_inline_callbacks)); |
||||
auto server = std::unique_ptr<PassthroughEndpoint>(new PassthroughEndpoint( |
||||
recv_middle, send_middle, allow_inline_callbacks)); |
||||
return {std::move(client), std::move(server)}; |
||||
} |
||||
|
||||
PassthroughEndpoint::~PassthroughEndpoint() { |
||||
CallbackHelper callback_helper(event_engine_.get(), allow_inline_callbacks_); |
||||
send_middle_->Close(callback_helper); |
||||
recv_middle_->Close(callback_helper); |
||||
} |
||||
|
||||
bool PassthroughEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read, |
||||
SliceBuffer* buffer, const ReadArgs*) { |
||||
CallbackHelper callback_helper(event_engine_.get(), allow_inline_callbacks_); |
||||
grpc_core::MutexLock lock(&recv_middle_->mu); |
||||
if (recv_middle_->closed) { |
||||
callback_helper.AddCallback([on_read = std::move(on_read)]() mutable { |
||||
on_read(absl::CancelledError()); |
||||
}); |
||||
return false; |
||||
} |
||||
if (recv_middle_->on_write != nullptr) { |
||||
*buffer = std::move(*recv_middle_->write_buffer); |
||||
callback_helper.AddCallback( |
||||
[on_write = std::move(recv_middle_->on_write)]() mutable { |
||||
on_write(absl::OkStatus()); |
||||
}); |
||||
recv_middle_->on_write = nullptr; |
||||
return true; |
||||
} |
||||
recv_middle_->read_buffer = buffer; |
||||
recv_middle_->on_read = std::move(on_read); |
||||
return false; |
||||
} |
||||
|
||||
bool PassthroughEndpoint::Write(absl::AnyInvocable<void(absl::Status)> on_write, |
||||
SliceBuffer* buffer, const WriteArgs*) { |
||||
CallbackHelper callback_helper(event_engine_.get(), allow_inline_callbacks_); |
||||
grpc_core::MutexLock lock(&send_middle_->mu); |
||||
if (send_middle_->closed) { |
||||
callback_helper.AddCallback([on_write = std::move(on_write)]() mutable { |
||||
on_write(absl::CancelledError()); |
||||
}); |
||||
return false; |
||||
} |
||||
if (send_middle_->on_read != nullptr) { |
||||
*send_middle_->read_buffer = std::move(*buffer); |
||||
callback_helper.AddCallback( |
||||
[on_read = std::move(send_middle_->on_read)]() mutable { |
||||
on_read(absl::OkStatus()); |
||||
}); |
||||
send_middle_->on_read = nullptr; |
||||
return true; |
||||
} |
||||
send_middle_->write_buffer = buffer; |
||||
send_middle_->on_write = std::move(on_write); |
||||
return false; |
||||
} |
||||
|
||||
void PassthroughEndpoint::Middle::Close(CallbackHelper& callback_helper) { |
||||
grpc_core::MutexLock lock(&mu); |
||||
closed = true; |
||||
if (on_read != nullptr) { |
||||
callback_helper.AddCallback([on_read = std::move(on_read)]() mutable { |
||||
on_read(absl::CancelledError()); |
||||
}); |
||||
on_read = nullptr; |
||||
} |
||||
if (on_write != nullptr) { |
||||
callback_helper.AddCallback([on_write = std::move(on_write)]() mutable { |
||||
on_write(absl::CancelledError()); |
||||
}); |
||||
on_write = nullptr; |
||||
} |
||||
} |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
@ -0,0 +1,95 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_TEST_CORE_TEST_UTIL_PASSTHROUGH_ENDPOINT_H |
||||
#define GRPC_TEST_CORE_TEST_UTIL_PASSTHROUGH_ENDPOINT_H |
||||
|
||||
#include <memory> |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
|
||||
#include "src/core/lib/event_engine/default_event_engine.h" |
||||
#include "src/core/lib/event_engine/tcp_socket_utils.h" |
||||
#include "src/core/lib/gprpp/ref_counted.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
class PassthroughEndpoint final : public EventEngine::Endpoint { |
||||
public: |
||||
struct PassthroughEndpointPair { |
||||
std::unique_ptr<PassthroughEndpoint> client; |
||||
std::unique_ptr<PassthroughEndpoint> server; |
||||
}; |
||||
// client_port, server_port are markers that are baked into the peer/local
|
||||
// addresses for debug information.
|
||||
// allow_inline_callbacks is a flag that allows the endpoint to call the
|
||||
// on_read/on_write callbacks inline (but outside any PassthroughEndpoint
|
||||
// locks)
|
||||
static PassthroughEndpointPair MakePassthroughEndpoint( |
||||
int client_port, int server_port, bool allow_inline_callbacks); |
||||
|
||||
~PassthroughEndpoint() override; |
||||
|
||||
bool Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer, |
||||
const ReadArgs* args) override; |
||||
|
||||
bool Write(absl::AnyInvocable<void(absl::Status)> on_write, |
||||
SliceBuffer* buffer, const WriteArgs* args) override; |
||||
|
||||
const EventEngine::ResolvedAddress& GetPeerAddress() const override { |
||||
return recv_middle_->address; |
||||
} |
||||
const EventEngine::ResolvedAddress& GetLocalAddress() const override { |
||||
return send_middle_->address; |
||||
} |
||||
|
||||
private: |
||||
class CallbackHelper; |
||||
|
||||
struct Middle : public grpc_core::RefCounted<Middle> { |
||||
explicit Middle(int port) |
||||
: address(URIToResolvedAddress(absl::StrCat("ipv4:127.0.0.1:", port)) |
||||
.value()) {} |
||||
|
||||
void Close(CallbackHelper& callback_helper); |
||||
|
||||
grpc_core::Mutex mu; |
||||
bool closed ABSL_GUARDED_BY(mu) = false; |
||||
SliceBuffer* read_buffer ABSL_GUARDED_BY(mu) = nullptr; |
||||
absl::AnyInvocable<void(absl::Status)> on_read ABSL_GUARDED_BY(mu) = |
||||
nullptr; |
||||
SliceBuffer* write_buffer ABSL_GUARDED_BY(mu) = nullptr; |
||||
absl::AnyInvocable<void(absl::Status)> on_write ABSL_GUARDED_BY(mu) = |
||||
nullptr; |
||||
EventEngine::ResolvedAddress address; |
||||
}; |
||||
|
||||
PassthroughEndpoint(grpc_core::RefCountedPtr<Middle> send_middle, |
||||
grpc_core::RefCountedPtr<Middle> recv_middle, |
||||
bool allow_inline_callbacks) |
||||
: send_middle_(std::move(send_middle)), |
||||
recv_middle_(std::move(recv_middle)), |
||||
allow_inline_callbacks_(allow_inline_callbacks) {} |
||||
|
||||
grpc_core::RefCountedPtr<Middle> send_middle_; |
||||
grpc_core::RefCountedPtr<Middle> recv_middle_; |
||||
std::shared_ptr<EventEngine> event_engine_ = GetDefaultEventEngine(); |
||||
bool allow_inline_callbacks_; |
||||
}; |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif // GRPC_TEST_CORE_TEST_UTIL_PASSTHROUGH_ENDPOINT_H
|
@ -0,0 +1,48 @@ |
||||
# Copyright 2021 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
load("//bazel:grpc_build_system.bzl", "grpc_package") |
||||
load("//test/cpp/microbenchmarks:grpc_benchmark_config.bzl", "grpc_cc_benchmark") |
||||
|
||||
licenses(["notice"]) |
||||
|
||||
grpc_package( |
||||
name = "test/core/transport/benchmarks", |
||||
) |
||||
|
||||
grpc_cc_benchmark( |
||||
name = "bm_chaotic_good", |
||||
srcs = ["bm_chaotic_good.cc"], |
||||
deps = [ |
||||
"//:grpc", |
||||
"//src/core:chaotic_good_client_transport", |
||||
"//src/core:chaotic_good_server_transport", |
||||
"//src/core:default_event_engine", |
||||
"//test/core/test_util:passthrough_endpoint", |
||||
"//test/core/transport:call_spine_benchmarks", |
||||
], |
||||
) |
||||
|
||||
grpc_cc_benchmark( |
||||
name = "bm_inproc", |
||||
srcs = ["bm_inproc.cc"], |
||||
deps = [ |
||||
"//:grpc", |
||||
"//src/core:chaotic_good_client_transport", |
||||
"//src/core:chaotic_good_server_transport", |
||||
"//src/core:default_event_engine", |
||||
"//test/core/test_util:passthrough_endpoint", |
||||
"//test/core/transport:call_spine_benchmarks", |
||||
], |
||||
) |
@ -0,0 +1,93 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <benchmark/benchmark.h> |
||||
|
||||
#include "absl/memory/memory.h" |
||||
#include "absl/strings/string_view.h" |
||||
|
||||
#include <grpc/grpc.h> |
||||
|
||||
#include "src/core/ext/transport/chaotic_good/client_transport.h" |
||||
#include "src/core/ext/transport/chaotic_good/server_transport.h" |
||||
#include "src/core/lib/address_utils/parse_address.h" |
||||
#include "test/core/test_util/passthrough_endpoint.h" |
||||
#include "test/core/transport/call_spine_benchmarks.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace { |
||||
|
||||
const Slice kTestPath = Slice::FromExternalString("/foo/bar"); |
||||
|
||||
class ChaoticGoodTraits { |
||||
public: |
||||
BenchmarkTransport MakeTransport() { |
||||
auto channel_args = CoreConfiguration::Get() |
||||
.channel_args_preconditioning() |
||||
.PreconditionChannelArgs(nullptr); |
||||
auto control = grpc_event_engine::experimental::PassthroughEndpoint:: |
||||
MakePassthroughEndpoint(1, 2, true); |
||||
auto data = grpc_event_engine::experimental::PassthroughEndpoint:: |
||||
MakePassthroughEndpoint(3, 4, true); |
||||
auto client = MakeOrphanable<chaotic_good::ChaoticGoodClientTransport>( |
||||
PromiseEndpoint(std::move(control.client), SliceBuffer()), |
||||
PromiseEndpoint(std::move(data.client), SliceBuffer()), channel_args, |
||||
grpc_event_engine::experimental::GetDefaultEventEngine(), HPackParser(), |
||||
HPackCompressor()); |
||||
auto server = MakeOrphanable<chaotic_good::ChaoticGoodServerTransport>( |
||||
channel_args, PromiseEndpoint(std::move(control.server), SliceBuffer()), |
||||
PromiseEndpoint(std::move(data.server), SliceBuffer()), |
||||
grpc_event_engine::experimental::GetDefaultEventEngine(), HPackParser(), |
||||
HPackCompressor()); |
||||
return {std::move(client), std::move(server)}; |
||||
} |
||||
|
||||
ClientMetadataHandle MakeClientInitialMetadata() { |
||||
auto md = Arena::MakePooled<ClientMetadata>(); |
||||
md->Set(HttpPathMetadata(), kTestPath.Copy()); |
||||
return md; |
||||
} |
||||
|
||||
ServerMetadataHandle MakeServerInitialMetadata() { |
||||
return Arena::MakePooled<ServerMetadata>(); |
||||
} |
||||
|
||||
MessageHandle MakePayload() { return Arena::MakePooled<Message>(); } |
||||
|
||||
ServerMetadataHandle MakeServerTrailingMetadata() { |
||||
auto md = Arena::MakePooled<ServerMetadata>(); |
||||
return md; |
||||
} |
||||
}; |
||||
GRPC_CALL_SPINE_BENCHMARK(TransportFixture<ChaoticGoodTraits>); |
||||
|
||||
} // namespace
|
||||
} // namespace grpc_core
|
||||
|
||||
// Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
|
||||
// and others do not. This allows us to support both modes.
|
||||
namespace benchmark { |
||||
void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); } |
||||
} // namespace benchmark
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::benchmark::Initialize(&argc, argv); |
||||
grpc_init(); |
||||
{ |
||||
auto ee = grpc_event_engine::experimental::GetDefaultEventEngine(); |
||||
benchmark::RunTheBenchmarksNamespaced(); |
||||
} |
||||
grpc_shutdown(); |
||||
return 0; |
||||
} |
@ -0,0 +1,83 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <memory> |
||||
|
||||
#include <benchmark/benchmark.h> |
||||
|
||||
#include "absl/memory/memory.h" |
||||
#include "absl/strings/string_view.h" |
||||
|
||||
#include <grpc/grpc.h> |
||||
|
||||
#include "src/core/ext/transport/inproc/inproc_transport.h" |
||||
#include "src/core/lib/address_utils/parse_address.h" |
||||
#include "test/core/transport/call_spine_benchmarks.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace { |
||||
|
||||
const Slice kTestPath = Slice::FromExternalString("/foo/bar"); |
||||
|
||||
class InprocTraits { |
||||
public: |
||||
BenchmarkTransport MakeTransport() { |
||||
auto channel_args = CoreConfiguration::Get() |
||||
.channel_args_preconditioning() |
||||
.PreconditionChannelArgs(nullptr); |
||||
auto t = MakeInProcessTransportPair(channel_args); |
||||
return {OrphanablePtr<ClientTransport>( |
||||
DownCast<ClientTransport*>(t.first.release())), |
||||
OrphanablePtr<ServerTransport>( |
||||
DownCast<ServerTransport*>(t.second.release()))}; |
||||
} |
||||
|
||||
ClientMetadataHandle MakeClientInitialMetadata() { |
||||
auto md = Arena::MakePooled<ClientMetadata>(); |
||||
md->Set(HttpPathMetadata(), kTestPath.Copy()); |
||||
return md; |
||||
} |
||||
|
||||
ServerMetadataHandle MakeServerInitialMetadata() { |
||||
return Arena::MakePooled<ServerMetadata>(); |
||||
} |
||||
|
||||
MessageHandle MakePayload() { return Arena::MakePooled<Message>(); } |
||||
|
||||
ServerMetadataHandle MakeServerTrailingMetadata() { |
||||
auto md = Arena::MakePooled<ServerMetadata>(); |
||||
return md; |
||||
} |
||||
}; |
||||
GRPC_CALL_SPINE_BENCHMARK(TransportFixture<InprocTraits>); |
||||
|
||||
} // namespace
|
||||
} // namespace grpc_core
|
||||
|
||||
// Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
|
||||
// and others do not. This allows us to support both modes.
|
||||
namespace benchmark { |
||||
void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); } |
||||
} // namespace benchmark
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::benchmark::Initialize(&argc, argv); |
||||
grpc_init(); |
||||
{ |
||||
auto ee = grpc_event_engine::experimental::GetDefaultEventEngine(); |
||||
benchmark::RunTheBenchmarksNamespaced(); |
||||
} |
||||
grpc_shutdown(); |
||||
return 0; |
||||
} |
Loading…
Reference in new issue