diff --git a/test/core/event_engine/test_suite/client_test.cc b/test/core/event_engine/test_suite/client_test.cc index 4553a6f5158..34ac4f05570 100644 --- a/test/core/event_engine/test_suite/client_test.cc +++ b/test/core/event_engine/test_suite/client_test.cc @@ -30,12 +30,15 @@ #include #include +#include #include +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/event_engine/channel_args_endpoint_config.h" #include "src/core/lib/gprpp/notification.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/resource_quota/memory_quota.h" +#include "src/core/lib/resource_quota/resource_quota.h" #include "test/core/event_engine/test_suite/event_engine_test.h" #include "test/core/event_engine/test_suite/event_engine_test_utils.h" #include "test/core/util/port.h" @@ -52,6 +55,7 @@ using ::grpc_event_engine::experimental::URIToResolvedAddress; using Endpoint = ::grpc_event_engine::experimental::EventEngine::Endpoint; using Listener = ::grpc_event_engine::experimental::EventEngine::Listener; using ::grpc_event_engine::experimental::GetNextSendMessage; +using ::grpc_event_engine::experimental::WaitForSingleOwner; constexpr int kNumExchangedMessages = 100; @@ -61,7 +65,7 @@ constexpr int kNumExchangedMessages = 100; // and verify that the connection fails. TEST_F(EventEngineClientTest, ConnectToNonExistentListenerTest) { grpc_core::ExecCtx ctx; - auto test_ee = this->NewEventEngine(); + std::shared_ptr test_ee(this->NewEventEngine()); grpc_core::Notification signal; auto memory_quota = std::make_unique("bar"); std::string target_addr = absl::StrCat( @@ -78,16 +82,18 @@ TEST_F(EventEngineClientTest, ConnectToNonExistentListenerTest) { URIToResolvedAddress(target_addr), config, memory_quota->CreateMemoryAllocator("conn-1"), 24h); signal.WaitForNotification(); + WaitForSingleOwner(std::move(test_ee)); } // Create a connection using the test EventEngine to a listener created // by the oracle EventEngine and exchange bi-di data over the connection. // For each data transfer, verify that data written at one end of the stream // equals data read at the other end of the stream. + TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) { grpc_core::ExecCtx ctx; auto oracle_ee = this->NewOracleEventEngine(); - auto test_ee = this->NewEventEngine(); + std::shared_ptr test_ee(this->NewEventEngine()); auto memory_quota = std::make_unique("bar"); std::string target_addr = absl::StrCat( "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())); @@ -104,7 +110,10 @@ TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) { server_signal.Notify(); }; - ChannelArgsEndpointConfig config; + grpc_core::ChannelArgs args; + auto quota = grpc_core::ResourceQuota::Default(); + args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota); + ChannelArgsEndpointConfig config(args); auto status = oracle_ee->CreateListener( std::move(accept_cb), [](absl::Status status) { GPR_ASSERT(status.ok()); }, config, @@ -132,10 +141,11 @@ TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) { client_signal.WaitForNotification(); server_signal.WaitForNotification(); - EXPECT_TRUE(client_endpoint != nullptr); - EXPECT_TRUE(server_endpoint != nullptr); + EXPECT_NE(client_endpoint.get(), nullptr); + EXPECT_NE(server_endpoint.get(), nullptr); - // Alternate message exchanges between client -- server and server -- client. + // Alternate message exchanges between client -- server and server -- + // client. for (int i = 0; i < kNumExchangedMessages; i++) { // Send from client to server and verify data read at the server. EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), client_endpoint.get(), @@ -147,6 +157,9 @@ TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) { client_endpoint.get()) .ok()); } + client_endpoint.reset(); + server_endpoint.reset(); + WaitForSingleOwner(std::move(test_ee)); } // Create 1 listener bound to N IPv6 addresses and M connections where M > N and @@ -156,7 +169,7 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) { static constexpr int kNumListenerAddresses = 10; // N static constexpr int kNumConnections = 10; // M auto oracle_ee = this->NewOracleEventEngine(); - auto test_ee = this->NewEventEngine(); + std::shared_ptr test_ee(this->NewEventEngine()); auto memory_quota = std::make_unique("bar"); std::unique_ptr server_endpoint; // Notifications can only be fired once, so they are newed every loop @@ -172,7 +185,10 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) { server_endpoint = std::move(ep); server_signal->Notify(); }; - ChannelArgsEndpointConfig config; + grpc_core::ChannelArgs args; + auto quota = grpc_core::ResourceQuota::Default(); + args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota); + ChannelArgsEndpointConfig config(args); auto status = oracle_ee->CreateListener( std::move(accept_cb), [](absl::Status status) { GPR_ASSERT(status.ok()); }, config, @@ -195,7 +211,10 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) { // Create a test EventEngine client endpoint and connect to a one of the // addresses bound to the oracle listener. Verify that the connection // succeeds. - ChannelArgsEndpointConfig config; + grpc_core::ChannelArgs client_args; + auto client_quota = grpc_core::ResourceQuota::Default(); + client_args = client_args.Set(GRPC_ARG_RESOURCE_QUOTA, client_quota); + ChannelArgsEndpointConfig client_config(client_args); test_ee->Connect( [&client_endpoint, &client_signal](absl::StatusOr> status) { @@ -208,15 +227,16 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) { } client_signal.Notify(); }, - URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]), config, + URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]), + client_config, memory_quota->CreateMemoryAllocator( absl::StrCat("conn-", std::to_string(i))), 24h); client_signal.WaitForNotification(); server_signal->WaitForNotification(); - EXPECT_TRUE(client_endpoint != nullptr); - EXPECT_TRUE(server_endpoint != nullptr); + EXPECT_NE(client_endpoint.get(), nullptr); + EXPECT_NE(server_endpoint.get(), nullptr); connections.push_back(std::make_tuple(std::move(client_endpoint), std::move(server_endpoint))); delete server_signal; @@ -226,12 +246,14 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) { std::vector threads; // Create one thread for each connection. For each connection, create - // 2 more worker threads: to exchange and verify bi-directional data transfer. + // 2 more worker threads: to exchange and verify bi-directional data + // transfer. threads.reserve(kNumConnections); for (int i = 0; i < kNumConnections; i++) { // For each connection, simulate a parallel bi-directional data transfer. - // All bi-directional transfers are run in parallel across all connections. - // Each bi-directional data transfer uses a random number of messages. + // All bi-directional transfers are run in parallel across all + // connections. Each bi-directional data transfer uses a random number of + // messages. threads.emplace_back([client_endpoint = std::move(std::get<0>(connections[i])), server_endpoint = @@ -268,6 +290,8 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) { for (auto& t : threads) { t.join(); } + server_endpoint.reset(); + WaitForSingleOwner(std::move(test_ee)); } // TODO(vigneshbabu): Add more tests which create listeners bound to a mix