[EventEngine] Refactor WritesPerRpcTest to EventEngine (#35763)

Closes #35763

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35763 from drfloob:eeify-writes-per-rpc ebb79f7dd6
PiperOrigin-RevId: 608719485
pull/35347/head^2
AJ Heller 1 year ago committed by Copybara-Service
parent 4356005b01
commit b6a961fa8e
  1. 6
      CMakeLists.txt
  2. 5
      build_autogenerated.yaml
  3. 7
      test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
  4. 28
      test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
  5. 47
      test/core/event_engine/test_suite/fuzzing_event_engine_test.cc
  6. 6
      test/core/event_engine/test_suite/tests/timer_test.cc
  7. 2
      test/cpp/performance/BUILD
  8. 177
      test/cpp/performance/writes_per_rpc_test.cc

6
CMakeLists.txt generated

@ -32440,6 +32440,12 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h
test/core/event_engine/event_engine_test_utils.cc
test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
test/core/util/cmdline.cc
test/core/util/fuzzer_util.cc
test/core/util/grpc_profiler.cc

@ -20139,6 +20139,8 @@ targets:
build: test
language: c++
headers:
- test/core/event_engine/event_engine_test_utils.h
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
- test/core/util/cmdline.h
- test/core/util/evaluate_args_test_util.h
- test/core/util/fuzzer_util.h
@ -20156,6 +20158,9 @@ targets:
- src/proto/grpc/testing/echo_messages.proto
- src/proto/grpc/testing/simple_messages.proto
- src/proto/grpc/testing/xds/v3/orca_load_report.proto
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
- test/core/event_engine/event_engine_test_utils.cc
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
- test/core/util/cmdline.cc
- test/core/util/fuzzer_util.cc
- test/core/util/grpc_profiler.cc

@ -20,8 +20,6 @@
#include <algorithm>
#include <chrono>
#include <limits>
#include <ratio>
#include <type_traits>
#include <vector>
#include "absl/memory/memory.h"
@ -31,10 +29,10 @@
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/port.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
@ -42,6 +40,8 @@
#if defined(GRPC_POSIX_SOCKET_TCP)
#include "src/core/lib/event_engine/posix_engine/native_posix_dns_resolver.h"
#else
#include "src/core/lib/gprpp/crash.h"
#endif
// IWYU pragma: no_include <sys/socket.h>
@ -343,6 +343,7 @@ bool FuzzingEventEngine::EndpointMiddle::Write(SliceBuffer* data, int index) {
bool FuzzingEventEngine::FuzzingEndpoint::Write(
absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
const WriteArgs*) {
grpc_core::global_stats().IncrementSyscallWrite();
grpc_core::MutexLock lock(&*mu_);
GPR_ASSERT(!middle_->closed[my_index()]);
GPR_ASSERT(!middle_->writing[my_index()]);

@ -23,6 +23,7 @@
#include <memory>
#include <queue>
#include <set>
#include <thread>
#include <utility>
#include <vector>
@ -38,6 +39,7 @@
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/support/time.h>
#include "src/core/lib/event_engine/time_util.h"
#include "src/core/lib/gprpp/no_destruct.h"
#include "src/core/lib/gprpp/sync.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
@ -302,6 +304,32 @@ class FuzzingEventEngine : public EventEngine {
ABSL_GUARDED_BY(run_after_duration_callback_mu_);
};
class ThreadedFuzzingEventEngine : public FuzzingEventEngine {
public:
ThreadedFuzzingEventEngine()
: ThreadedFuzzingEventEngine(std::chrono::milliseconds(10)) {}
explicit ThreadedFuzzingEventEngine(Duration max_time)
: FuzzingEventEngine(FuzzingEventEngine::Options(),
fuzzing_event_engine::Actions()),
main_([this, max_time]() {
while (!done_.load()) {
absl::SleepFor(absl::Milliseconds(
grpc_event_engine::experimental::Milliseconds(max_time)));
Tick();
}
}) {}
~ThreadedFuzzingEventEngine() override {
done_.store(true);
main_.join();
}
private:
std::atomic<bool> done_{false};
std::thread main_;
};
} // namespace experimental
} // namespace grpc_event_engine

@ -14,60 +14,19 @@
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include <atomic>
#include <thread>
#include <chrono>
#include <gtest/gtest.h>
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
#include "test/core/event_engine/test_suite/event_engine_test_framework.h"
#include "test/core/event_engine/test_suite/tests/timer_test.h"
namespace grpc_event_engine {
namespace experimental {
namespace {
class ThreadedFuzzingEventEngine : public FuzzingEventEngine {
public:
ThreadedFuzzingEventEngine()
: FuzzingEventEngine(
[]() {
Options options;
return options;
}(),
fuzzing_event_engine::Actions()),
main_([this]() {
while (!done_.load()) {
auto tick_start = absl::Now();
while (absl::Now() - tick_start < absl::Milliseconds(10)) {
absl::SleepFor(absl::Milliseconds(1));
}
Tick();
}
}) {}
~ThreadedFuzzingEventEngine() override {
done_.store(true);
main_.join();
}
private:
std::atomic<bool> done_{false};
std::thread main_;
};
} // namespace
} // namespace experimental
} // namespace grpc_event_engine
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine> engine =
std::make_shared<
grpc_event_engine::experimental::ThreadedFuzzingEventEngine>();
grpc_event_engine::experimental::ThreadedFuzzingEventEngine>(
std::chrono::milliseconds(2));
SetEventEngineFactories([engine]() { return engine; },
[engine]() { return engine; });
grpc_event_engine::experimental::InitTimerTests();

@ -34,6 +34,7 @@
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/log.h>
#include "src/core/lib/event_engine/time_util.h"
#include "src/core/lib/gprpp/sync.h"
#include "test/core/event_engine/test_suite/event_engine_test_framework.h"
@ -147,7 +148,10 @@ void EventEngineTimerTest::ScheduleCheckCB(
std::chrono::steady_clock::time_point when, std::atomic<int>* call_count,
std::atomic<int>* fail_count, int total_expected) {
auto now = std::chrono::steady_clock::now();
EXPECT_LE(when, now);
EXPECT_LE(when, now) << "Callback was run "
<< grpc_event_engine::experimental::Milliseconds(when -
now)
<< " ms too early: ";
if (when > now) ++(*fail_count);
if (++(*call_count) == total_expected) {
grpc_core::MutexLock lock(&mu_);

@ -31,6 +31,8 @@ grpc_cc_test(
"//:grpc++",
"//src/core:channel_args",
"//src/proto/grpc/testing:echo_proto",
"//test/core/event_engine:event_engine_test_utils",
"//test/core/event_engine/fuzzing_event_engine",
"//test/core/util:grpc_test_util",
"//test/core/util:grpc_test_util_base",
],

@ -16,6 +16,8 @@
//
//
#include <chrono>
#include <gtest/gtest.h>
#include <grpc/support/log.h>
@ -30,63 +32,103 @@
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/endpoint_pair.h"
#include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/tcp_posix.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/surface/server.h"
#include "src/cpp/client/create_channel_internal.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/passthru_endpoint.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
namespace grpc {
namespace testing {
static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
namespace {
using grpc_event_engine::experimental::EventEngine;
using grpc_event_engine::experimental::ThreadedFuzzingEventEngine;
using grpc_event_engine::experimental::URIToResolvedAddress;
static void ApplyCommonServerBuilderConfig(ServerBuilder* b) {
b->SetMaxReceiveMessageSize(INT_MAX);
b->SetMaxSendMessageSize(INT_MAX);
}
void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
static void ApplyCommonChannelArguments(grpc_core::ChannelArgs* c) {
*c = c->Set(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, INT_MAX)
.Set(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, INT_MAX);
}
constexpr int kIterations = 10000;
constexpr int kSnapshotEvery = kIterations / 10;
} // namespace
class EndpointPairFixture {
class InProcessCHTTP2 {
public:
EndpointPairFixture(Service* service, grpc_endpoint_pair endpoints) {
InProcessCHTTP2(Service* service, std::string& addr,
ThreadedFuzzingEventEngine* fuzzing_engine) {
// TODO(hork): move to a endpoint pair helper
// Creating the listener
grpc_core::Notification listener_started;
std::unique_ptr<EventEngine::Endpoint> listener_endpoint;
grpc_core::ChannelArgs args;
grpc_event_engine::experimental::ChannelArgsEndpointConfig config(args);
auto listener = fuzzing_engine->CreateListener(
[&](std::unique_ptr<EventEngine::Endpoint> ep,
grpc_core::MemoryAllocator) {
listener_endpoint = std::move(ep);
listener_started.Notify();
},
[](absl::Status status) { GPR_ASSERT(status.ok()); }, config,
std::make_unique<grpc_core::MemoryQuota>("foo"));
if (!listener.ok()) {
grpc_core::Crash(absl::StrCat("failed to start listener: ",
listener.status().ToString()));
}
auto target_addr = URIToResolvedAddress(addr);
GPR_ASSERT(target_addr.ok());
GPR_ASSERT((*listener)->Bind(*target_addr).ok());
GPR_ASSERT((*listener)->Start().ok());
// Creating the client
std::unique_ptr<EventEngine::Endpoint> client_endpoint;
grpc_core::Notification client_connected;
auto client_memory_quota =
std::make_unique<grpc_core::MemoryQuota>("client");
std::ignore = fuzzing_engine->Connect(
[&](absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> endpoint) {
GPR_ASSERT(endpoint.ok());
client_endpoint = std::move(*endpoint);
client_connected.Notify();
},
*target_addr, config,
client_memory_quota->CreateMemoryAllocator("conn-1"),
grpc_core::Duration::Infinity());
client_connected.WaitForNotification();
listener_started.WaitForNotification();
ServerBuilder b;
cq_ = b.AddCompletionQueue(true);
b.RegisterService(service);
ApplyCommonServerBuilderConfig(&b);
b.SetMaxReceiveMessageSize(INT_MAX);
b.SetMaxSendMessageSize(INT_MAX);
server_ = b.BuildAndStart();
grpc_core::ExecCtx exec_ctx;
// add server endpoint to server_
{
grpc_core::Server* core_server =
grpc_core::Server::FromC(server_->c_server());
grpc_endpoint* iomgr_server_endpoint =
grpc_event_engine_endpoint_create(std::move(listener_endpoint));
grpc_core::Transport* transport = grpc_create_chttp2_transport(
core_server->channel_args(), endpoints.server, false /* is_client */);
core_server->channel_args(), iomgr_server_endpoint,
/*is_client=*/false);
for (grpc_pollset* pollset : core_server->pollsets()) {
grpc_endpoint_add_to_pollset(endpoints.server, pollset);
grpc_endpoint_add_to_pollset(iomgr_server_endpoint, pollset);
}
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"SetupTransport",
core_server->SetupTransport(transport, nullptr,
core_server->channel_args(), nullptr)));
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
}
// create channel
{
grpc_core::ChannelArgs args =
@ -94,10 +136,12 @@ class EndpointPairFixture {
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr)
.Set(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority");
ApplyCommonChannelArguments(&args);
grpc_core::Transport* transport =
grpc_create_chttp2_transport(args, endpoints.client, true);
args = args.Set(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, INT_MAX)
.Set(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, INT_MAX)
.Set(GRPC_ARG_HTTP2_BDP_PROBE, 0);
grpc_core::Transport* transport = grpc_create_chttp2_transport(
args, grpc_event_engine_endpoint_create(std::move(client_endpoint)),
/*is_client=*/true);
GPR_ASSERT(transport);
grpc_channel* channel =
grpc_core::Channel::Create("target", args, GRPC_CLIENT_DIRECT_CHANNEL,
@ -105,7 +149,6 @@ class EndpointPairFixture {
->release()
->c_ptr();
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
channel_ = grpc::CreateChannelInternal(
"", channel,
std::vector<std::unique_ptr<
@ -113,7 +156,7 @@ class EndpointPairFixture {
}
}
virtual ~EndpointPairFixture() {
virtual ~InProcessCHTTP2() {
server_->Shutdown();
cq_->Shutdown();
void* tag;
@ -131,35 +174,13 @@ class EndpointPairFixture {
std::shared_ptr<Channel> channel_;
};
class InProcessCHTTP2 : public EndpointPairFixture {
public:
InProcessCHTTP2(Service* service, grpc_passthru_endpoint_stats* stats)
: EndpointPairFixture(service, MakeEndpoints(stats)), stats_(stats) {}
~InProcessCHTTP2() override {
if (stats_ != nullptr) {
grpc_passthru_endpoint_stats_destroy(stats_);
}
}
int writes_performed() const { return gpr_atm_acq_load(&stats_->num_writes); }
private:
grpc_passthru_endpoint_stats* stats_;
static grpc_endpoint_pair MakeEndpoints(grpc_passthru_endpoint_stats* stats) {
grpc_endpoint_pair p;
grpc_passthru_endpoint_create(&p.client, &p.server, stats);
return p;
}
};
static double UnaryPingPong(int request_size, int response_size) {
const int kIterations = 10000;
static double UnaryPingPong(ThreadedFuzzingEventEngine* fuzzing_engine,
int request_size, int response_size) {
EchoTestService::AsyncService service;
std::string target_addr = absl::StrCat(
"ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
std::unique_ptr<InProcessCHTTP2> fixture(
new InProcessCHTTP2(&service, grpc_passthru_endpoint_stats_create()));
new InProcessCHTTP2(&service, target_addr, fuzzing_engine));
EchoRequest send_request;
EchoResponse send_response;
EchoResponse recv_response;
@ -190,7 +211,21 @@ static double UnaryPingPong(int request_size, int response_size) {
fixture->cq(), tag(1));
std::unique_ptr<EchoTestService::Stub> stub(
EchoTestService::NewStub(fixture->channel()));
auto baseline = grpc_core::global_stats().Collect();
auto snapshot = grpc_core::global_stats().Collect();
for (int iteration = 0; iteration < kIterations; iteration++) {
if (iteration % kSnapshotEvery == 0) {
auto new_snapshot = grpc_core::global_stats().Collect();
auto diff = new_snapshot->Diff(*snapshot);
gpr_log(GPR_DEBUG,
" SNAPSHOT: UnaryPingPong(%d, %d): writes_per_iteration=%0.3f "
"(total=%lu, i=%d) pings=%lu",
request_size, response_size,
static_cast<double>(diff->syscall_write) /
static_cast<double>(kSnapshotEvery),
diff->syscall_write, iteration, diff->http2_pings_sent);
snapshot = std::move(new_snapshot);
}
recv_response.Clear();
ClientContext cli_ctx;
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
@ -218,10 +253,13 @@ static double UnaryPingPong(int request_size, int response_size) {
service.RequestEcho(&senv->ctx, &senv->recv_request, &senv->response_writer,
fixture->cq(), fixture->cq(), tag(slot));
}
auto end_stats = grpc_core::global_stats().Collect()->Diff(*baseline);
double writes_per_iteration =
static_cast<double>(fixture->writes_performed()) /
static_cast<double>(kIterations);
end_stats->syscall_write / static_cast<double>(kIterations);
gpr_log(GPR_DEBUG,
"UnaryPingPong(%d, %d): writes_per_iteration=%0.3f (total=%lu)",
request_size, response_size, writes_per_iteration,
end_stats->syscall_write);
fixture.reset();
server_env[0]->~ServerEnv();
@ -231,17 +269,28 @@ static double UnaryPingPong(int request_size, int response_size) {
}
TEST(WritesPerRpcTest, UnaryPingPong) {
EXPECT_LT(UnaryPingPong(0, 0), 2.05);
EXPECT_LT(UnaryPingPong(1, 0), 2.05);
EXPECT_LT(UnaryPingPong(0, 1), 2.05);
EXPECT_LT(UnaryPingPong(4096, 0), 2.5);
EXPECT_LT(UnaryPingPong(0, 4096), 2.5);
auto fuzzing_engine = std::dynamic_pointer_cast<
grpc_event_engine::experimental::ThreadedFuzzingEventEngine>(
grpc_event_engine::experimental::GetDefaultEventEngine());
EXPECT_LT(UnaryPingPong(fuzzing_engine.get(), 0, 0), 2.2);
EXPECT_LT(UnaryPingPong(fuzzing_engine.get(), 1, 0), 2.2);
EXPECT_LT(UnaryPingPong(fuzzing_engine.get(), 0, 1), 2.2);
EXPECT_LT(UnaryPingPong(fuzzing_engine.get(), 4096, 0), 2.5);
EXPECT_LT(UnaryPingPong(fuzzing_engine.get(), 0, 4096), 2.5);
}
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
grpc_event_engine::experimental::SetEventEngineFactory(
[]() -> std::unique_ptr<grpc_event_engine::experimental::EventEngine> {
return std::make_unique<
grpc_event_engine::experimental::ThreadedFuzzingEventEngine>(
std::chrono::milliseconds(1));
});
// avoids a race around gpr_now_impl
auto engine = grpc_event_engine::experimental::GetDefaultEventEngine();
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
grpc_init();

Loading…
Cancel
Save