Make event engine client test more robust (#31187)

* Minor event engine related cleanup

* make client_test more robust

* cleanup

* regenerate_projects

* review comments

* iwyu

* review comments

* typo
pull/31194/head
Vignesh Babu 2 years ago committed by GitHub
parent 59b3a03f0f
commit 6e15936d85
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 54
      test/core/event_engine/test_suite/client_test.cc

@ -30,12 +30,15 @@
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "test/core/event_engine/test_suite/event_engine_test.h"
#include "test/core/event_engine/test_suite/event_engine_test_utils.h"
#include "test/core/util/port.h"
@ -52,6 +55,7 @@ using ::grpc_event_engine::experimental::URIToResolvedAddress;
using Endpoint = ::grpc_event_engine::experimental::EventEngine::Endpoint;
using Listener = ::grpc_event_engine::experimental::EventEngine::Listener;
using ::grpc_event_engine::experimental::GetNextSendMessage;
using ::grpc_event_engine::experimental::WaitForSingleOwner;
constexpr int kNumExchangedMessages = 100;
@ -61,7 +65,7 @@ constexpr int kNumExchangedMessages = 100;
// and verify that the connection fails.
TEST_F(EventEngineClientTest, ConnectToNonExistentListenerTest) {
grpc_core::ExecCtx ctx;
auto test_ee = this->NewEventEngine();
std::shared_ptr<EventEngine> test_ee(this->NewEventEngine());
grpc_core::Notification signal;
auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
std::string target_addr = absl::StrCat(
@ -78,16 +82,18 @@ TEST_F(EventEngineClientTest, ConnectToNonExistentListenerTest) {
URIToResolvedAddress(target_addr), config,
memory_quota->CreateMemoryAllocator("conn-1"), 24h);
signal.WaitForNotification();
WaitForSingleOwner(std::move(test_ee));
}
// Create a connection using the test EventEngine to a listener created
// by the oracle EventEngine and exchange bi-di data over the connection.
// For each data transfer, verify that data written at one end of the stream
// equals data read at the other end of the stream.
TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) {
grpc_core::ExecCtx ctx;
auto oracle_ee = this->NewOracleEventEngine();
auto test_ee = this->NewEventEngine();
std::shared_ptr<EventEngine> test_ee(this->NewEventEngine());
auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
std::string target_addr = absl::StrCat(
"ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
@ -104,7 +110,10 @@ TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) {
server_signal.Notify();
};
ChannelArgsEndpointConfig config;
grpc_core::ChannelArgs args;
auto quota = grpc_core::ResourceQuota::Default();
args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
ChannelArgsEndpointConfig config(args);
auto status = oracle_ee->CreateListener(
std::move(accept_cb),
[](absl::Status status) { GPR_ASSERT(status.ok()); }, config,
@ -132,10 +141,11 @@ TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) {
client_signal.WaitForNotification();
server_signal.WaitForNotification();
EXPECT_TRUE(client_endpoint != nullptr);
EXPECT_TRUE(server_endpoint != nullptr);
EXPECT_NE(client_endpoint.get(), nullptr);
EXPECT_NE(server_endpoint.get(), nullptr);
// Alternate message exchanges between client -- server and server -- client.
// Alternate message exchanges between client -- server and server --
// client.
for (int i = 0; i < kNumExchangedMessages; i++) {
// Send from client to server and verify data read at the server.
EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), client_endpoint.get(),
@ -147,6 +157,9 @@ TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) {
client_endpoint.get())
.ok());
}
client_endpoint.reset();
server_endpoint.reset();
WaitForSingleOwner(std::move(test_ee));
}
// Create 1 listener bound to N IPv6 addresses and M connections where M > N and
@ -156,7 +169,7 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) {
static constexpr int kNumListenerAddresses = 10; // N
static constexpr int kNumConnections = 10; // M
auto oracle_ee = this->NewOracleEventEngine();
auto test_ee = this->NewEventEngine();
std::shared_ptr<EventEngine> test_ee(this->NewEventEngine());
auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
std::unique_ptr<EventEngine::Endpoint> server_endpoint;
// Notifications can only be fired once, so they are newed every loop
@ -172,7 +185,10 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) {
server_endpoint = std::move(ep);
server_signal->Notify();
};
ChannelArgsEndpointConfig config;
grpc_core::ChannelArgs args;
auto quota = grpc_core::ResourceQuota::Default();
args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
ChannelArgsEndpointConfig config(args);
auto status = oracle_ee->CreateListener(
std::move(accept_cb),
[](absl::Status status) { GPR_ASSERT(status.ok()); }, config,
@ -195,7 +211,10 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) {
// Create a test EventEngine client endpoint and connect to a one of the
// addresses bound to the oracle listener. Verify that the connection
// succeeds.
ChannelArgsEndpointConfig config;
grpc_core::ChannelArgs client_args;
auto client_quota = grpc_core::ResourceQuota::Default();
client_args = client_args.Set(GRPC_ARG_RESOURCE_QUOTA, client_quota);
ChannelArgsEndpointConfig client_config(client_args);
test_ee->Connect(
[&client_endpoint,
&client_signal](absl::StatusOr<std::unique_ptr<Endpoint>> status) {
@ -208,15 +227,16 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) {
}
client_signal.Notify();
},
URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]), config,
URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]),
client_config,
memory_quota->CreateMemoryAllocator(
absl::StrCat("conn-", std::to_string(i))),
24h);
client_signal.WaitForNotification();
server_signal->WaitForNotification();
EXPECT_TRUE(client_endpoint != nullptr);
EXPECT_TRUE(server_endpoint != nullptr);
EXPECT_NE(client_endpoint.get(), nullptr);
EXPECT_NE(server_endpoint.get(), nullptr);
connections.push_back(std::make_tuple(std::move(client_endpoint),
std::move(server_endpoint)));
delete server_signal;
@ -226,12 +246,14 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) {
std::vector<std::thread> threads;
// Create one thread for each connection. For each connection, create
// 2 more worker threads: to exchange and verify bi-directional data transfer.
// 2 more worker threads: to exchange and verify bi-directional data
// transfer.
threads.reserve(kNumConnections);
for (int i = 0; i < kNumConnections; i++) {
// For each connection, simulate a parallel bi-directional data transfer.
// All bi-directional transfers are run in parallel across all connections.
// Each bi-directional data transfer uses a random number of messages.
// All bi-directional transfers are run in parallel across all
// connections. Each bi-directional data transfer uses a random number of
// messages.
threads.emplace_back([client_endpoint =
std::move(std::get<0>(connections[i])),
server_endpoint =
@ -268,6 +290,8 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) {
for (auto& t : threads) {
t.join();
}
server_endpoint.reset();
WaitForSingleOwner(std::move(test_ee));
}
// TODO(vigneshbabu): Add more tests which create listeners bound to a mix

Loading…
Cancel
Save