diff --git a/BUILD b/BUILD index d07df3ee332..27619bd7e7e 100644 --- a/BUILD +++ b/BUILD @@ -2557,6 +2557,7 @@ grpc_cc_library( "src/core/lib/debug/stats.h", "src/core/lib/debug/stats_data.h", "src/core/lib/event_engine/channel_args_endpoint_config.h", + "src/core/lib/event_engine/promise.h", "src/core/lib/iomgr/block_annotate.h", "src/core/lib/iomgr/buffer_list.h", "src/core/lib/iomgr/call_combiner.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index dedbe0981e5..5f939e1944e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1093,6 +1093,9 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx mock_test) add_dependencies(buildtests_cxx nonblocking_test) add_dependencies(buildtests_cxx observable_test) + if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) + add_dependencies(buildtests_cxx oracle_event_engine_posix_test) + endif() add_dependencies(buildtests_cxx orca_service_end2end_test) add_dependencies(buildtests_cxx orphanable_test) add_dependencies(buildtests_cxx out_of_bounds_bad_client_test) @@ -10852,6 +10855,7 @@ add_executable(fuzzing_event_engine_test ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc test/core/event_engine/test_suite/event_engine_test.cc + test/core/event_engine/test_suite/event_engine_test_utils.cc test/core/event_engine/test_suite/fuzzing_event_engine_test.cc test/core/event_engine/test_suite/timer_test.cc third_party/googletest/googletest/src/gtest-all.cc @@ -12653,6 +12657,7 @@ if(gRPC_BUILD_TESTS) add_executable(iomgr_event_engine_test test/core/event_engine/test_suite/event_engine_test.cc + test/core/event_engine/test_suite/event_engine_test_utils.cc test/core/event_engine/test_suite/iomgr_event_engine_test.cc test/core/event_engine/test_suite/timer_test.cc third_party/googletest/googletest/src/gtest-all.cc @@ -13583,6 +13588,47 @@ target_link_libraries(observable_test ) +endif() +if(gRPC_BUILD_TESTS) +if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) + + add_executable(oracle_event_engine_posix_test + test/core/event_engine/test_suite/client_test.cc + test/core/event_engine/test_suite/event_engine_test.cc + test/core/event_engine/test_suite/event_engine_test_utils.cc + test/core/event_engine/test_suite/oracle_event_engine_posix.cc + test/core/event_engine/test_suite/oracle_event_engine_posix_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc + ) + + target_include_directories(oracle_event_engine_posix_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} + ) + + target_link_libraries(oracle_event_engine_posix_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util + ) + + +endif() endif() if(gRPC_BUILD_TESTS) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 1c6fb7a829c..c381e217d24 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -745,6 +745,7 @@ libs: - src/core/lib/event_engine/event_engine_factory.h - src/core/lib/event_engine/handle_containers.h - src/core/lib/event_engine/iomgr_engine.h + - src/core/lib/event_engine/promise.h - src/core/lib/event_engine/trace.h - src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/bitset.h @@ -1955,6 +1956,7 @@ libs: - src/core/lib/event_engine/event_engine_factory.h - src/core/lib/event_engine/handle_containers.h - src/core/lib/event_engine/iomgr_engine.h + - src/core/lib/event_engine/promise.h - src/core/lib/event_engine/trace.h - src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/bitset.h @@ -5990,10 +5992,12 @@ targets: headers: - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h - test/core/event_engine/test_suite/event_engine_test.h + - test/core/event_engine/test_suite/event_engine_test_utils.h src: - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc - test/core/event_engine/test_suite/event_engine_test.cc + - test/core/event_engine/test_suite/event_engine_test_utils.cc - test/core/event_engine/test_suite/fuzzing_event_engine_test.cc - test/core/event_engine/test_suite/timer_test.cc deps: @@ -6550,8 +6554,10 @@ targets: language: c++ headers: - test/core/event_engine/test_suite/event_engine_test.h + - test/core/event_engine/test_suite/event_engine_test_utils.h src: - test/core/event_engine/test_suite/event_engine_test.cc + - test/core/event_engine/test_suite/event_engine_test_utils.cc - test/core/event_engine/test_suite/iomgr_event_engine_test.cc - test/core/event_engine/test_suite/timer_test.cc deps: @@ -7069,6 +7075,27 @@ targets: - absl/types:variant - absl/utility:utility uses_polling: false +- name: oracle_event_engine_posix_test + gtest: true + build: test + language: c++ + headers: + - test/core/event_engine/test_suite/event_engine_test.h + - test/core/event_engine/test_suite/event_engine_test_utils.h + - test/core/event_engine/test_suite/oracle_event_engine_posix.h + src: + - test/core/event_engine/test_suite/client_test.cc + - test/core/event_engine/test_suite/event_engine_test.cc + - test/core/event_engine/test_suite/event_engine_test_utils.cc + - test/core/event_engine/test_suite/oracle_event_engine_posix.cc + - test/core/event_engine/test_suite/oracle_event_engine_posix_test.cc + deps: + - grpc_test_util + platforms: + - linux + - posix + - mac + uses_polling: false - name: orca_service_end2end_test gtest: true build: test diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 73ca24739b2..4464db64ff1 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -676,6 +676,7 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/event_engine_factory.h', 'src/core/lib/event_engine/handle_containers.h', 'src/core/lib/event_engine/iomgr_engine.h', + 'src/core/lib/event_engine/promise.h', 'src/core/lib/event_engine/trace.h', 'src/core/lib/gpr/alloc.h', 'src/core/lib/gpr/env.h', @@ -1497,6 +1498,7 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/event_engine_factory.h', 'src/core/lib/event_engine/handle_containers.h', 'src/core/lib/event_engine/iomgr_engine.h', + 'src/core/lib/event_engine/promise.h', 'src/core/lib/event_engine/trace.h', 'src/core/lib/gpr/alloc.h', 'src/core/lib/gpr/env.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 8c3264b8d8f..c397402e92c 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -1043,6 +1043,7 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/iomgr_engine.cc', 'src/core/lib/event_engine/iomgr_engine.h', 'src/core/lib/event_engine/memory_allocator.cc', + 'src/core/lib/event_engine/promise.h', 'src/core/lib/event_engine/resolved_address.cc', 'src/core/lib/event_engine/slice.cc', 'src/core/lib/event_engine/slice_buffer.cc', @@ -2099,6 +2100,7 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/event_engine_factory.h', 'src/core/lib/event_engine/handle_containers.h', 'src/core/lib/event_engine/iomgr_engine.h', + 'src/core/lib/event_engine/promise.h', 'src/core/lib/event_engine/trace.h', 'src/core/lib/gpr/alloc.h', 'src/core/lib/gpr/env.h', diff --git a/grpc.gemspec b/grpc.gemspec index 07028ed1399..2245d7441a1 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -956,6 +956,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/event_engine/iomgr_engine.cc ) s.files += %w( src/core/lib/event_engine/iomgr_engine.h ) s.files += %w( src/core/lib/event_engine/memory_allocator.cc ) + s.files += %w( src/core/lib/event_engine/promise.h ) s.files += %w( src/core/lib/event_engine/resolved_address.cc ) s.files += %w( src/core/lib/event_engine/slice.cc ) s.files += %w( src/core/lib/event_engine/slice_buffer.cc ) diff --git a/package.xml b/package.xml index 1f1c46b56fa..9621b439e30 100644 --- a/package.xml +++ b/package.xml @@ -938,6 +938,7 @@ + diff --git a/src/core/lib/event_engine/promise.h b/src/core/lib/event_engine/promise.h new file mode 100644 index 00000000000..9891d3bba4d --- /dev/null +++ b/src/core/lib/event_engine/promise.h @@ -0,0 +1,69 @@ +// Copyright 2021 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef GRPC_CORE_LIB_EVENT_ENGINE_PROMISE_H +#define GRPC_CORE_LIB_EVENT_ENGINE_PROMISE_H +#include + +#include + +#include "src/core/lib/gprpp/sync.h" + +namespace grpc_event_engine { +namespace experimental { + +/// A minimal promise implementation. +/// +/// This is light-duty, syntactical sugar around cv wait & signal, which is +/// useful in some cases. A more robust implementation is being worked on +/// separately. +template +class Promise { + public: + // The getter will wait until the setter has been called, and will return the + // value passed during Set. + T& Get() { + grpc_core::MutexLock lock(&mu_); + if (!set_) { + cv_.Wait(&mu_); + } + return val_; + } + // This setter can only be called exactly once without a Reset. + // Will automatically unblock getters. + void Set(T&& val) { + grpc_core::MutexLock lock(&mu_); + GPR_ASSERT(!set_); + val_ = std::move(val); + set_ = true; + cv_.SignalAll(); + } + + // Can only be called after a set operation. + void Reset() { + grpc_core::MutexLock lock(&mu_); + GPR_ASSERT(set_); + set_ = false; + } + + private: + grpc_core::Mutex mu_; + grpc_core::CondVar cv_; + T val_; + bool set_ = false; +}; + +} // namespace experimental +} // namespace grpc_event_engine + +#endif // GRPC_CORE_LIB_EVENT_ENGINE_PROMISE_H diff --git a/test/core/event_engine/test_suite/BUILD b/test/core/event_engine/test_suite/BUILD index 6b5f47ac0de..60db8b7a321 100644 --- a/test/core/event_engine/test_suite/BUILD +++ b/test/core/event_engine/test_suite/BUILD @@ -18,7 +18,10 @@ licenses(["notice"]) grpc_package(name = "test/core/event_engine/test_suite") -COMMON_HEADERS = ["event_engine_test.h"] +COMMON_HEADERS = [ + "event_engine_test.h", + "event_engine_test_utils.h", +] grpc_cc_library( name = "timer", @@ -90,10 +93,46 @@ grpc_cc_test( # -- Internal targets -- +grpc_cc_library( + name = "oracle_event_engine_posix", + testonly = True, + srcs = ["oracle_event_engine_posix.cc"], + hdrs = ["oracle_event_engine_posix.h"], + tags = [ + "no_windows", + ], + deps = [ + ":conformance_test_base_lib", + "//:grpc", + "//test/core/util:grpc_test_util", + ], +) + +grpc_cc_test( + name = "oracle_event_engine_posix_test", + srcs = ["oracle_event_engine_posix_test.cc"], + external_deps = ["gtest"], + language = "C++", + tags = [ + "no_test_ios", + "no_windows", + ], + uses_polling = False, + deps = [ + ":client", + ":oracle_event_engine_posix", + "//:grpc", + "//test/core/util:grpc_test_util", + ], +) + grpc_cc_library( name = "conformance_test_base_lib", testonly = True, - srcs = ["event_engine_test.cc"], + srcs = [ + "event_engine_test.cc", + "event_engine_test_utils.cc", + ], hdrs = COMMON_HEADERS, external_deps = ["gtest"], deps = [ diff --git a/test/core/event_engine/test_suite/client_test.cc b/test/core/event_engine/test_suite/client_test.cc index f184c317e44..126a5d72c92 100644 --- a/test/core/event_engine/test_suite/client_test.cc +++ b/test/core/event_engine/test_suite/client_test.cc @@ -12,10 +12,272 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include +#include +#include + +#include "absl/status/status.h" +#include "absl/strings/str_cat.h" + +#include +#include +#include + +#include "src/core/lib/address_utils/parse_address.h" +#include "src/core/lib/event_engine/channel_args_endpoint_config.h" +#include "src/core/lib/event_engine/promise.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/uri/uri_parser.h" #include "test/core/event_engine/test_suite/event_engine_test.h" +#include "test/core/event_engine/test_suite/event_engine_test_utils.h" class EventEngineClientTest : public EventEngineTest {}; -// TODO(hork): establish meaningful tests -TEST_F(EventEngineClientTest, TODO) { grpc_core::ExecCtx exec_ctx; } +namespace { + +using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig; +using ::grpc_event_engine::experimental::EventEngine; +using ::grpc_event_engine::experimental::Promise; +using ::grpc_event_engine::experimental::URIToResolvedAddress; +using Endpoint = ::grpc_event_engine::experimental::EventEngine::Endpoint; +using Listener = ::grpc_event_engine::experimental::EventEngine::Listener; +using namespace std::chrono_literals; + +constexpr int kMinMessageSize = 1024; +constexpr int kMaxMessageSize = 4096; +constexpr int kNumExchangedMessages = 100; + +// Returns a random message with bounded length. +std::string GetNextSendMessage() { + static const char alphanum[] = + "0123456789" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz"; + static std::random_device rd; + static std::seed_seq seed{rd()}; + static std::mt19937 gen(seed); + static std::uniform_real_distribution<> dis(kMinMessageSize, kMaxMessageSize); + static grpc_core::Mutex g_mu; + std::string tmp_s; + int len; + { + grpc_core::MutexLock lock(&g_mu); + len = dis(gen); + } + tmp_s.reserve(len); + for (int i = 0; i < len; ++i) { + tmp_s += alphanum[rand() % (sizeof(alphanum) - 1)]; + } + return tmp_s; +} + +} // namespace + +// Create a connection using the test EventEngine to a non-existent listener +// and verify that the connection fails. +TEST_F(EventEngineClientTest, ConnectToNonExistentListenerTest) { + grpc_core::ExecCtx ctx; + auto test_ee = this->NewEventEngine(); + Promise> client_endpoint_promise; + auto memory_quota = std::make_unique("bar"); + // Create a test EventEngine client endpoint and connect to a non existent + // listener. + test_ee->Connect( + [&client_endpoint_promise]( + absl::StatusOr> status) { + // Connect should fail. + EXPECT_FALSE(status.ok()); + client_endpoint_promise.Set(nullptr); + }, + URIToResolvedAddress("ipv6:[::1]:7000"), + ChannelArgsEndpointConfig(nullptr), + memory_quota->CreateMemoryAllocator("conn-1"), 24h); + + auto client_endpoint = std::move(client_endpoint_promise.Get()); + EXPECT_EQ(client_endpoint, nullptr); +} + +// Create a connection using the test EventEngine to a listener created +// by the oracle EventEngine and exchange bi-di data over the connection. +// For each data transfer, verify that data written at one end of the stream +// equals data read at the other end of the stream. +TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) { + grpc_core::ExecCtx ctx; + auto oracle_ee = this->NewOracleEventEngine(); + auto test_ee = this->NewEventEngine(); + auto memory_quota = std::make_unique("bar"); + std::string target_addr = "ipv6:[::1]:7000"; + Promise> client_endpoint_promise; + Promise> server_endpoint_promise; + + Listener::AcceptCallback accept_cb = + [&server_endpoint_promise]( + std::unique_ptr ep, + grpc_core::MemoryAllocator /*memory_allocator*/) { + server_endpoint_promise.Set(std::move(ep)); + }; + + auto status = oracle_ee->CreateListener( + std::move(accept_cb), + [](absl::Status status) { GPR_ASSERT(status.ok()); }, + ChannelArgsEndpointConfig(nullptr), + std::make_unique("foo")); + EXPECT_TRUE(status.ok()); + + std::unique_ptr listener = std::move(*status); + EXPECT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok()); + EXPECT_TRUE(listener->Start().ok()); + + test_ee->Connect( + [&client_endpoint_promise]( + absl::StatusOr> status) { + if (!status.ok()) { + gpr_log(GPR_ERROR, "Connect failed: %s", + status.status().ToString().c_str()); + client_endpoint_promise.Set(nullptr); + } else { + client_endpoint_promise.Set(std::move(*status)); + } + }, + URIToResolvedAddress(target_addr), ChannelArgsEndpointConfig(nullptr), + memory_quota->CreateMemoryAllocator("conn-1"), 24h); + + auto client_endpoint = std::move(client_endpoint_promise.Get()); + auto server_endpoint = std::move(server_endpoint_promise.Get()); + EXPECT_TRUE(client_endpoint != nullptr); + EXPECT_TRUE(server_endpoint != nullptr); + + // Alternate message exchanges between client -- server and server -- client. + for (int i = 0; i < kNumExchangedMessages; i++) { + // Send from client to server and verify data read at the server. + EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), client_endpoint.get(), + server_endpoint.get()) + .ok()); + + // Send from server to client and verify data read at the client. + EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), server_endpoint.get(), + client_endpoint.get()) + .ok()); + } +} + +// Create 1 listener bound to N IPv6 addresses and M connections where M > N and +// exchange and verify random number of messages over each connection. +TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) { + grpc_core::ExecCtx ctx; + static constexpr int kStartPortNumber = 7000; + static constexpr int kNumListenerAddresses = 10; // N + static constexpr int kNumConnections = 100; // M + auto oracle_ee = this->NewOracleEventEngine(); + auto test_ee = this->NewEventEngine(); + auto memory_quota = std::make_unique("bar"); + Promise> client_endpoint_promise; + Promise> server_endpoint_promise; + std::vector target_addrs; + std::vector, std::unique_ptr>> + connections; + + Listener::AcceptCallback accept_cb = + [&server_endpoint_promise]( + std::unique_ptr ep, + grpc_core::MemoryAllocator /*memory_allocator*/) { + server_endpoint_promise.Set(std::move(ep)); + }; + auto status = oracle_ee->CreateListener( + std::move(accept_cb), + [](absl::Status status) { GPR_ASSERT(status.ok()); }, + ChannelArgsEndpointConfig(nullptr), + std::make_unique("foo")); + EXPECT_TRUE(status.ok()); + std::unique_ptr listener = std::move(*status); + + target_addrs.reserve(kNumListenerAddresses); + for (int i = 0; i < kNumListenerAddresses; i++) { + std::string target_addr = + absl::StrCat("ipv6:[::1]:", std::to_string(kStartPortNumber + i)); + EXPECT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok()); + target_addrs.push_back(target_addr); + } + EXPECT_TRUE(listener->Start().ok()); + absl::SleepFor(absl::Milliseconds(500)); + for (int i = 0; i < kNumConnections; i++) { + // Create a test EventEngine client endpoint and connect to a one of the + // addresses bound to the oracle listener. Verify that the connection + // succeeds. + test_ee->Connect( + [&client_endpoint_promise]( + absl::StatusOr> status) { + if (!status.ok()) { + gpr_log(GPR_ERROR, "Connect failed: %s", + status.status().ToString().c_str()); + client_endpoint_promise.Set(nullptr); + } else { + client_endpoint_promise.Set(std::move(*status)); + } + }, + URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]), + ChannelArgsEndpointConfig(nullptr), + memory_quota->CreateMemoryAllocator( + absl::StrCat("conn-", std::to_string(i))), + 24h); + + auto client_endpoint = std::move(client_endpoint_promise.Get()); + auto server_endpoint = std::move(server_endpoint_promise.Get()); + EXPECT_TRUE(client_endpoint != nullptr); + EXPECT_TRUE(server_endpoint != nullptr); + connections.push_back(std::make_tuple(std::move(client_endpoint), + std::move(server_endpoint))); + client_endpoint_promise.Reset(); + server_endpoint_promise.Reset(); + } + + std::vector threads; + // Create one thread for each connection. For each connection, create + // 2 more worker threads: to exchange and verify bi-directional data transfer. + threads.reserve(kNumConnections); + for (int i = 0; i < kNumConnections; i++) { + // For each connection, simulate a parallel bi-directional data transfer. + // All bi-directional transfers are run in parallel across all connections. + // Each bi-directional data transfer uses a random number of messages. + threads.emplace_back([client_endpoint = + std::move(std::get<0>(connections[i])), + server_endpoint = + std::move(std::get<1>(connections[i]))]() { + std::vector workers; + workers.reserve(2); + auto worker = [client_endpoint = client_endpoint.get(), + server_endpoint = + server_endpoint.get()](bool client_to_server) { + grpc_core::ExecCtx ctx; + for (int i = 0; i < kNumExchangedMessages; i++) { + // If client_to_server is true, send from client to server and + // verify data read at the server. Otherwise send data from server + // to client and verify data read at client. + if (client_to_server) { + EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), + client_endpoint, server_endpoint) + .ok()); + } else { + EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), + server_endpoint, client_endpoint) + .ok()); + } + } + }; + // worker[0] simulates a flow from client to server endpoint + workers.emplace_back([&worker]() { worker(true); }); + // worker[1] simulates a flow from server to client endpoint + workers.emplace_back([&worker]() { worker(false); }); + workers[0].join(); + workers[1].join(); + }); + } + for (auto& t : threads) { + t.join(); + } +} + +// TODO(vigneshbabu): Add more tests which create listeners bound to a mix +// Ipv6 and other type of addresses (UDS) in the same test. diff --git a/test/core/event_engine/test_suite/event_engine_test.cc b/test/core/event_engine/test_suite/event_engine_test.cc index 5ba038f2683..a3e115d08ef 100644 --- a/test/core/event_engine/test_suite/event_engine_test.cc +++ b/test/core/event_engine/test_suite/event_engine_test.cc @@ -20,9 +20,16 @@ std::function()>* g_ee_factory = nullptr; -void SetEventEngineFactory( +std::function()>* + g_oracle_ee_factory = nullptr; + +void SetEventEngineFactories( + std::function< + std::unique_ptr()> + factory, std::function< std::unique_ptr()> - factory) { - testing::AddGlobalTestEnvironment(new EventEngineTestEnvironment(factory)); + oracle_ee_factory) { + testing::AddGlobalTestEnvironment( + new EventEngineTestEnvironment(factory, oracle_ee_factory)); } diff --git a/test/core/event_engine/test_suite/event_engine_test.h b/test/core/event_engine/test_suite/event_engine_test.h index 743217bb827..27197486704 100644 --- a/test/core/event_engine/test_suite/event_engine_test.h +++ b/test/core/event_engine/test_suite/event_engine_test.h @@ -24,22 +24,37 @@ extern std::function< std::unique_ptr()>* g_ee_factory; +extern std::function< + std::unique_ptr()>* + g_oracle_ee_factory; + // Manages the lifetime of the global EventEngine factory. class EventEngineTestEnvironment : public testing::Environment { public: - explicit EventEngineTestEnvironment( + EventEngineTestEnvironment( std::function< std::unique_ptr()> - factory) - : factory_(factory) {} + factory, + std::function< + std::unique_ptr()> + oracle_factory) + : factory_(factory), oracle_factory_(oracle_factory) {} - void SetUp() override { g_ee_factory = &factory_; } + void SetUp() override { + g_ee_factory = &factory_; + g_oracle_ee_factory = &oracle_factory_; + } - void TearDown() override { g_ee_factory = nullptr; } + void TearDown() override { + g_ee_factory = nullptr; + g_oracle_ee_factory = nullptr; + } private: std::function()> factory_; + std::function()> + oracle_factory_; }; class EventEngineTest : public testing::Test { @@ -49,12 +64,22 @@ class EventEngineTest : public testing::Test { GPR_ASSERT(g_ee_factory != nullptr); return (*g_ee_factory)(); } + + std::unique_ptr + NewOracleEventEngine() { + GPR_ASSERT(g_oracle_ee_factory != nullptr); + return (*g_oracle_ee_factory)(); + } }; -// Set a custom factory for the EventEngine test suite. -void SetEventEngineFactory( +// Set a custom factory for the EventEngine test suite. An optional oracle +// EventEngine can additionally be specified here. +void SetEventEngineFactories( + std::function< + std::unique_ptr()> + ee_factory, std::function< std::unique_ptr()> - factory); + oracle_ee_factory); #endif // GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_EVENT_ENGINE_TEST_H diff --git a/test/core/event_engine/test_suite/event_engine_test_utils.cc b/test/core/event_engine/test_suite/event_engine_test_utils.cc new file mode 100644 index 00000000000..7831495f97e --- /dev/null +++ b/test/core/event_engine/test_suite/event_engine_test_utils.cc @@ -0,0 +1,206 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "test/core/event_engine/test_suite/event_engine_test_utils.h" + +#include +#include +#include +#include + +#include "absl/status/status.h" +#include "absl/status/statusor.h" +#include "absl/strings/str_cat.h" +#include "absl/synchronization/mutex.h" +#include "absl/time/time.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "src/core/lib/address_utils/parse_address.h" +#include "src/core/lib/event_engine/channel_args_endpoint_config.h" +#include "src/core/lib/resource_quota/memory_quota.h" +#include "src/core/lib/uri/uri_parser.h" + +using ::grpc_event_engine::experimental::EventEngine; +using Endpoint = ::grpc_event_engine::experimental::EventEngine::Endpoint; +using Listener = ::grpc_event_engine::experimental::EventEngine::Listener; + +namespace grpc_event_engine { +namespace experimental { + +EventEngine::ResolvedAddress URIToResolvedAddress(std::string address_str) { + grpc_resolved_address addr; + absl::StatusOr uri = grpc_core::URI::Parse(address_str); + if (!uri.ok()) { + gpr_log(GPR_ERROR, "Failed to parse. Error: %s", + uri.status().ToString().c_str()); + GPR_ASSERT(uri.ok()); + } + GPR_ASSERT(grpc_parse_uri(*uri, &addr)); + return EventEngine::ResolvedAddress( + reinterpret_cast(addr.addr), addr.len); +} + +void AppendStringToSliceBuffer(SliceBuffer* buf, std::string data) { + buf->Append(Slice::FromCopiedString(data)); +} + +std::string ExtractSliceBufferIntoString(SliceBuffer* buf) { + if (!buf->Length()) { + return std::string(); + } + std::string tmp(buf->Length(), '\0'); + char* bytes = const_cast(tmp.c_str()); + grpc_slice_buffer_move_first_into_buffer(buf->RawSliceBuffer(), buf->Length(), + bytes); + return tmp; +} + +absl::Status SendValidatePayload(std::string data, Endpoint* send_endpoint, + Endpoint* receive_endpoint) { + GPR_ASSERT(receive_endpoint != nullptr && send_endpoint != nullptr); + int num_bytes_written = data.size(); + Promise read_promise; + Promise write_promise; + SliceBuffer read_slice_buf; + SliceBuffer write_slice_buf; + + AppendStringToSliceBuffer(&write_slice_buf, data); + EventEngine::Endpoint::ReadArgs args = {num_bytes_written}; + std::function read_cb; + read_cb = [receive_endpoint, &read_slice_buf, &read_cb, &read_promise, + &args](absl::Status status) { + GPR_ASSERT(status.ok()); + if (read_slice_buf.Length() == static_cast(args.read_hint_bytes)) { + read_promise.Set(true); + return; + } + args.read_hint_bytes -= read_slice_buf.Length(); + receive_endpoint->Read(std::move(read_cb), &read_slice_buf, &args); + }; + // Start asynchronous reading at the receive_endpoint. + receive_endpoint->Read(std::move(read_cb), &read_slice_buf, &args); + // Start asynchronous writing at the send_endpoint. + send_endpoint->Write( + [&write_promise](absl::Status status) { + GPR_ASSERT(status.ok()); + write_promise.Set(true); + }, + &write_slice_buf, nullptr); + // Wait for async write to complete. + GPR_ASSERT(write_promise.Get() == true); + // Wait for async read to complete. + GPR_ASSERT(read_promise.Get() == true); + // Check if data written == data read + if (data != ExtractSliceBufferIntoString(&read_slice_buf)) { + return absl::CancelledError("Data read != Data written"); + } + return absl::OkStatus(); +} + +absl::Status ConnectionManager::BindAndStartListener( + std::vector addrs, bool listener_type_oracle) { + grpc_core::MutexLock lock(&mu_); + if (addrs.empty()) { + return absl::InvalidArgumentError( + "Atleast one bind address must be specified"); + } + for (auto& addr : addrs) { + if (listeners_.find(addr) != listeners_.end()) { + // There is already a listener at this address. Return error. + return absl::AlreadyExistsError( + absl::StrCat("Listener already existis for address: ", addr)); + } + } + Listener::AcceptCallback accept_cb = + [this](std::unique_ptr ep, + MemoryAllocator /*memory_allocator*/) { + last_in_progress_connection_.SetServerEndpoint(std::move(ep)); + }; + + EventEngine* event_engine = listener_type_oracle ? oracle_event_engine_.get() + : test_event_engine_.get(); + + auto status = event_engine->CreateListener( + std::move(accept_cb), + [](absl::Status status) { GPR_ASSERT(status.ok()); }, + ChannelArgsEndpointConfig(nullptr), + std::make_unique("foo")); + if (!status.ok()) { + return status.status(); + } + + std::shared_ptr listener((*status).release()); + for (auto& addr : addrs) { + auto bind_status = listener->Bind(URIToResolvedAddress(addr)); + if (!bind_status.ok()) { + gpr_log(GPR_ERROR, "Binding listener failed: %s", + bind_status.status().ToString().c_str()); + return bind_status.status(); + } + } + GPR_ASSERT(listener->Start().ok()); + // Insert same listener pointer for all bind addresses after the listener + // has started successfully. + for (auto& addr : addrs) { + listeners_.insert(std::make_pair(addr, listener)); + } + return absl::OkStatus(); +} + +absl::StatusOr, std::unique_ptr>> +ConnectionManager::CreateConnection(std::string target_addr, + EventEngine::Duration timeout, + bool client_type_oracle) { + // Only allow one CreateConnection call to proceed at a time. + grpc_core::MutexLock lock(&mu_); + std::string conn_name = + absl::StrCat("connection-", std::to_string(num_processed_connections_++)); + EventEngine* event_engine = client_type_oracle ? oracle_event_engine_.get() + : test_event_engine_.get(); + event_engine->Connect( + [this](absl::StatusOr> status) { + if (!status.ok()) { + gpr_log(GPR_ERROR, "Connect failed: %s", + status.status().ToString().c_str()); + last_in_progress_connection_.SetClientEndpoint(nullptr); + } else { + last_in_progress_connection_.SetClientEndpoint(std::move(*status)); + } + }, + URIToResolvedAddress(target_addr), ChannelArgsEndpointConfig(nullptr), + memory_quota_->CreateMemoryAllocator(conn_name), timeout); + + auto client_endpoint = last_in_progress_connection_.GetClientEndpoint(); + if (client_endpoint != nullptr && + listeners_.find(target_addr) != listeners_.end()) { + // There is a listener for the specified address. Wait until it + // creates a ServerEndpoint after accepting the connection. + auto server_endpoint = last_in_progress_connection_.GetServerEndpoint(); + GPR_ASSERT(server_endpoint != nullptr); + // Set last_in_progress_connection_ to nullptr + return std::make_tuple(std::move(client_endpoint), + std::move(server_endpoint)); + } + return absl::CancelledError("Failed to create connection."); +} + +} // namespace experimental +} // namespace grpc_event_engine diff --git a/test/core/event_engine/test_suite/event_engine_test_utils.h b/test/core/event_engine/test_suite/event_engine_test_utils.h new file mode 100644 index 00000000000..00868e7c1d3 --- /dev/null +++ b/test/core/event_engine/test_suite/event_engine_test_utils.h @@ -0,0 +1,126 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_EVENT_ENGINE_TEST_UTILS_H_ +#define GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_EVENT_ENGINE_TEST_UTILS_H_ + +#include +#include +#include +#include +#include + +#include "absl/status/status.h" +#include "absl/status/statusor.h" + +#include +#include + +#include "src/core/lib/event_engine/promise.h" +#include "src/core/lib/resource_quota/memory_quota.h" + +using EventEngineFactory = std::function< + std::unique_ptr()>; + +namespace grpc_event_engine { +namespace experimental { + +void AppendStringToSliceBuffer(SliceBuffer* buf, std::string data); + +std::string ExtractSliceBufferIntoString(SliceBuffer* buf); + +EventEngine::ResolvedAddress URIToResolvedAddress(std::string address_str); + +// A helper method to exchange data between two endpoints. It is assumed that +// both endpoints are connected. The data (specified as a string) is written by +// the sender_endpoint and read by the receiver_endpoint. It returns OK +// status only if data written == data read. It also blocks the calling thread +// until said Write and Read operations are complete. +absl::Status SendValidatePayload(std::string data, + EventEngine::Endpoint* send_endpoint, + EventEngine::Endpoint* receive_endpoint); + +// A helper class to create clients/listeners and connections between them. +// The clients and listeners can be created by the oracle event engine +// or the event engine under test. The class provides handles into the +// connections that are created. Inidividual tests can test expected behavior by +// exchanging arbitrary data over these connections. +class ConnectionManager { + public: + ConnectionManager(std::unique_ptr test_event_engine, + std::unique_ptr oracle_event_engine) + : memory_quota_(std::make_unique("foo")), + test_event_engine_(std::move(test_event_engine)), + oracle_event_engine_(std::move(oracle_event_engine)) {} + ~ConnectionManager() = default; + + // It creates and starts a listener bound to all the specified list of + // addresses. If successful, return OK status. The type of the listener is + // determined by the 2nd argument. + absl::Status BindAndStartListener(std::vector addrs, + bool listener_type_oracle = true); + + // If connection is successful, returns a tuple containing: + // 1. a pointer to the client side endpoint of the connection. + // 2. a pointer to the server side endpoint of the connection. + // If un-successfull it returns a non-OK status containing the error + // encountered. + absl::StatusOr, + std::unique_ptr>> + CreateConnection(std::string target_addr, EventEngine::Duration timeout, + bool client_type_oracle); + + private: + class Connection { + public: + Connection() = default; + ~Connection() = default; + + void SetClientEndpoint( + std::unique_ptr&& client_endpoint) { + client_endpoint_promise_.Set(std::move(client_endpoint)); + } + void SetServerEndpoint( + std::unique_ptr&& server_endpoint) { + server_endpoint_promise_.Set(std::move(server_endpoint)); + } + std::unique_ptr GetClientEndpoint() { + auto client_endpoint = std::move(client_endpoint_promise_.Get()); + client_endpoint_promise_.Reset(); + return client_endpoint; + } + std::unique_ptr GetServerEndpoint() { + auto server_endpoint = std::move(server_endpoint_promise_.Get()); + server_endpoint_promise_.Reset(); + return server_endpoint; + } + + private: + Promise> client_endpoint_promise_; + Promise> server_endpoint_promise_; + }; + + grpc_core::Mutex mu_; + std::unique_ptr memory_quota_; + int num_processed_connections_ = 0; + Connection last_in_progress_connection_; + std::map> listeners_; + std::unique_ptr test_event_engine_; + std::unique_ptr oracle_event_engine_; +}; + +} // namespace experimental +} // namespace grpc_event_engine + +#endif // GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_EVENT_ENGINE_TEST_UTILS_H_ diff --git a/test/core/event_engine/test_suite/fuzzing_event_engine_test.cc b/test/core/event_engine/test_suite/fuzzing_event_engine_test.cc index 7f42579aaec..1db451fff2e 100644 --- a/test/core/event_engine/test_suite/fuzzing_event_engine_test.cc +++ b/test/core/event_engine/test_suite/fuzzing_event_engine_test.cc @@ -61,9 +61,11 @@ class ThreadedFuzzingEventEngine : public FuzzingEventEngine { int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); - SetEventEngineFactory([]() { - return absl::make_unique< - grpc_event_engine::experimental::ThreadedFuzzingEventEngine>(); - }); + SetEventEngineFactories( + []() { + return absl::make_unique< + grpc_event_engine::experimental::ThreadedFuzzingEventEngine>(); + }, + nullptr); return RUN_ALL_TESTS(); } diff --git a/test/core/event_engine/test_suite/iomgr_event_engine_test.cc b/test/core/event_engine/test_suite/iomgr_event_engine_test.cc index 387ac5b5036..ba0c30ecced 100644 --- a/test/core/event_engine/test_suite/iomgr_event_engine_test.cc +++ b/test/core/event_engine/test_suite/iomgr_event_engine_test.cc @@ -20,10 +20,12 @@ int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); grpc::testing::TestEnvironment env(&argc, argv); - SetEventEngineFactory([]() { - return absl::make_unique< - grpc_event_engine::experimental::IomgrEventEngine>(); - }); + SetEventEngineFactories( + []() { + return absl::make_unique< + grpc_event_engine::experimental::IomgrEventEngine>(); + }, + nullptr); grpc_init(); auto result = RUN_ALL_TESTS(); grpc_shutdown(); diff --git a/test/core/event_engine/test_suite/oracle_event_engine_posix.cc b/test/core/event_engine/test_suite/oracle_event_engine_posix.cc new file mode 100644 index 00000000000..9c5fd519eb9 --- /dev/null +++ b/test/core/event_engine/test_suite/oracle_event_engine_posix.cc @@ -0,0 +1,473 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "test/core/event_engine/test_suite/oracle_event_engine_posix.h" + +#include +#include +#include + +#include +#include +#include +#include + +#include "absl/status/status.h" +#include "absl/strings/str_cat.h" +#include "absl/synchronization/mutex.h" +#include "absl/time/clock.h" +#include "absl/time/time.h" + +#include +#include +#include + +#include "src/core/lib/address_utils/sockaddr_utils.h" +#include "src/core/lib/iomgr/resolved_address.h" + +namespace grpc_event_engine { +namespace experimental { + +namespace { + +const char* kStopMessage = "STOP"; + +grpc_resolved_address CreateGRPCResolvedAddress( + const EventEngine::ResolvedAddress& ra) { + grpc_resolved_address grpc_addr; + memcpy(grpc_addr.addr, ra.address(), ra.size()); + grpc_addr.len = ra.size(); + return grpc_addr; +} + +// Blocks until poll(2) indicates that one of the fds has pending I/O +// the deadline is reached whichever comes first. Returns an OK +// status a valid I/O event is available for atleast one of the fds, a Status +// with canonical code DEADLINE_EXCEEDED if the deadline expired and a non-OK +// Status if any other error occurred. +absl::Status PollFds(struct pollfd* pfds, int nfds, absl::Duration timeout) { + int rv; + while (true) { + if (timeout != absl::InfiniteDuration()) { + rv = poll(pfds, nfds, + static_cast(absl::ToInt64Milliseconds(timeout))); + } else { + rv = poll(pfds, nfds, /* timeout = */ -1); + } + const int saved_errno = errno; + errno = saved_errno; + if (rv >= 0 || errno != EINTR) { + break; + } + } + if (rv < 0) { + return absl::UnknownError(std::strerror(errno)); + } + if (rv == 0) { + return absl::CancelledError("Deadline exceeded"); + } + return absl::OkStatus(); +} + +absl::Status BlockUntilReadable(int fd) { + struct pollfd pfd; + pfd.fd = fd; + pfd.events = POLLIN; + pfd.revents = 0; + return PollFds(&pfd, 1, absl::InfiniteDuration()); +} + +absl::Status BlockUntilWritableWithTimeout(int fd, absl::Duration timeout) { + struct pollfd pfd; + pfd.fd = fd; + pfd.events = POLLOUT; + pfd.revents = 0; + return PollFds(&pfd, 1, timeout); +} + +absl::Status BlockUntilWritable(int fd) { + return BlockUntilWritableWithTimeout(fd, absl::InfiniteDuration()); +} + +// Tries to read upto num_expected_bytes from the socket. It returns early if +// specified data is not yet available. +std::string TryReadBytes(int sockfd, int& saved_errno, int num_expected_bytes) { + int ret = 0; + static constexpr int kDefaultNumExpectedBytes = 1024; + if (num_expected_bytes <= 0) { + num_expected_bytes = kDefaultNumExpectedBytes; + } + std::string read_data = std::string(num_expected_bytes, '\0'); + char* buffer = const_cast(read_data.c_str()); + int pending_bytes = num_expected_bytes; + do { + errno = 0; + ret = read(sockfd, buffer + num_expected_bytes - pending_bytes, + pending_bytes); + if (ret > 0) { + pending_bytes -= ret; + } + } while (pending_bytes > 0 && ((ret > 0) || (ret < 0 && errno == EINTR))); + saved_errno = errno; + return read_data.substr(0, num_expected_bytes - pending_bytes); +} + +// Blocks calling thread until the specified number of bytes have been +// read from the provided socket or it encounters an unrecoverable error. It +// puts the read bytes into a string and returns the string. If it encounters an +// error, it returns an empty string and updates saved_errno with the +// appropriate errno. +std::string ReadBytes(int sockfd, int& saved_errno, int num_expected_bytes) { + std::string read_data; + do { + saved_errno = 0; + read_data += TryReadBytes(sockfd, saved_errno, + num_expected_bytes - read_data.length()); + if (saved_errno == EAGAIN && + read_data.length() < static_cast(num_expected_bytes)) { + GPR_ASSERT(BlockUntilReadable(sockfd).ok()); + } else if (saved_errno != 0 && num_expected_bytes > 0) { + read_data.clear(); + break; + } + } while (read_data.length() < static_cast(num_expected_bytes)); + return read_data; +} + +// Tries to write the specified bytes over the socket. It returns the number of +// bytes actually written. +int TryWriteBytes(int sockfd, int& saved_errno, std::string write_bytes) { + int ret = 0; + int pending_bytes = write_bytes.length(); + do { + errno = 0; + ret = write(sockfd, + write_bytes.c_str() + write_bytes.length() - pending_bytes, + pending_bytes); + if (ret > 0) { + pending_bytes -= ret; + } + } while (pending_bytes > 0 && ((ret > 0) || (ret < 0 && errno == EINTR))); + saved_errno = errno; + return write_bytes.length() - pending_bytes; +} + +// Blocks calling thread until the specified number of bytes have been +// written over the provided socket or it encounters an unrecoverable error. The +// bytes to write are specified as a string. If it encounters an error, it +// returns an empty string and updates saved_errno with the appropriate errno +// and returns a value less than zero. +int WriteBytes(int sockfd, int& saved_errno, std::string write_bytes) { + int ret = 0; + int original_write_length = write_bytes.length(); + do { + saved_errno = 0; + ret = TryWriteBytes(sockfd, saved_errno, write_bytes); + if (saved_errno == EAGAIN && ret < static_cast(write_bytes.length())) { + GPR_ASSERT(ret >= 0); + GPR_ASSERT(BlockUntilWritable(sockfd).ok()); + } else if (saved_errno != 0) { + GPR_ASSERT(ret < 0); + return ret; + } + write_bytes = write_bytes.substr(ret, std::string::npos); + } while (write_bytes.length() > 0); + return original_write_length; +} +} // namespace + +PosixOracleEndpoint::PosixOracleEndpoint(int socket_fd) + : socket_fd_(socket_fd) { + read_ops_ = grpc_core::Thread( + "read_ops_thread", + [](void* arg) { + static_cast(arg)->ProcessReadOperations(); + }, + this); + write_ops_ = grpc_core::Thread( + "write_ops_thread", + [](void* arg) { + static_cast(arg)->ProcessWriteOperations(); + }, + this); + read_ops_.Start(); + write_ops_.Start(); +} + +void PosixOracleEndpoint::Shutdown() { + absl::MutexLock lock(&mu_); + if (absl::exchange(is_shutdown_, true)) { + return; + } + read_ops_channel_.Set(ReadOperation()); + write_ops_channel_.Set(WriteOperation()); + read_ops_.Join(); + write_ops_.Join(); +} + +std::unique_ptr PosixOracleEndpoint::Create( + int socket_fd) { + return std::make_unique(socket_fd); +} + +PosixOracleEndpoint::~PosixOracleEndpoint() { + Shutdown(); + close(socket_fd_); +} + +void PosixOracleEndpoint::Read(std::function on_read, + SliceBuffer* buffer, const ReadArgs* args) { + GPR_ASSERT(buffer != nullptr); + int read_hint_bytes = + args != nullptr ? std::max(1, static_cast(args->read_hint_bytes)) + : 0; + read_ops_channel_.Set( + ReadOperation(read_hint_bytes, buffer, std::move(on_read))); +} + +void PosixOracleEndpoint::Write(std::function on_writable, + SliceBuffer* data, const WriteArgs* /*args*/) { + GPR_ASSERT(data != nullptr); + write_ops_channel_.Set(WriteOperation(data, std::move(on_writable))); +} + +void PosixOracleEndpoint::ProcessReadOperations() { + gpr_log(GPR_INFO, "Starting thread to process read ops ..."); + while (true) { + ReadOperation read_op; + read_op = read_ops_channel_.Get(); + read_ops_channel_.Reset(); + if (!read_op.IsValid()) { + read_op(std::string(), absl::CancelledError("Closed")); + break; + } + int saved_errno; + std::string read_data = + ReadBytes(socket_fd_, saved_errno, read_op.GetNumBytesToRead()); + read_op(read_data, read_data.empty() ? absl::CancelledError(absl::StrCat( + "Read failed with error = ", + std::strerror(saved_errno))) + : absl::OkStatus()); + } + gpr_log(GPR_INFO, "Shutting down read ops thread ..."); +} + +void PosixOracleEndpoint::ProcessWriteOperations() { + gpr_log(GPR_INFO, "Starting thread to process write ops ..."); + while (true) { + WriteOperation write_op; + write_op = write_ops_channel_.Get(); + write_ops_channel_.Reset(); + if (!write_op.IsValid()) { + write_op(absl::CancelledError("Closed")); + break; + } + int saved_errno; + int ret = WriteBytes(socket_fd_, saved_errno, write_op.GetBytesToWrite()); + write_op( + ret < 0 ? absl::CancelledError(absl::StrCat( + "Write failed with error = ", std::strerror(saved_errno))) + : absl::OkStatus()); + } + gpr_log(GPR_INFO, "Shutting down write ops thread ..."); +} + +PosixOracleListener::PosixOracleListener( + EventEngine::Listener::AcceptCallback on_accept, + std::function on_shutdown, + std::unique_ptr memory_allocator_factory) + : on_accept_(std::move(on_accept)), + on_shutdown_(std::move(on_shutdown)), + memory_allocator_factory_(std::move(memory_allocator_factory)) { + if (pipe(pipefd_) == -1) { + gpr_log(GPR_ERROR, "Error creating pipe: %s", std::strerror(errno)); + abort(); + } +} + +absl::Status PosixOracleListener::Start() { + absl::MutexLock lock(&mu_); + GPR_ASSERT(!listener_fds_.empty()); + if (absl::exchange(is_started_, true)) { + return absl::InternalError("Cannot start listener more than once ..."); + } + serve_ = grpc_core::Thread( + "accept_thread", + [](void* arg) { + static_cast(arg)->HandleIncomingConnections(); + }, + this); + serve_.Start(); + return absl::OkStatus(); +} + +PosixOracleListener::~PosixOracleListener() { + absl::MutexLock lock(&mu_); + if (!is_started_) { + serve_.Join(); + return; + } + for (int i = 0; i < static_cast(listener_fds_.size()); i++) { + shutdown(listener_fds_[i], SHUT_RDWR); + } + // Send a STOP message over the pipe. + write(pipefd_[1], kStopMessage, strlen(kStopMessage)); + serve_.Join(); + on_shutdown_(absl::OkStatus()); +} + +void PosixOracleListener::HandleIncomingConnections() { + gpr_log(GPR_INFO, "Starting accept thread ..."); + GPR_ASSERT(!listener_fds_.empty()); + int nfds = listener_fds_.size(); + // Add one extra file descriptor to poll the pipe fd. + ++nfds; + struct pollfd* pfds = + static_cast(gpr_malloc(sizeof(struct pollfd) * nfds)); + memset(pfds, 0, sizeof(struct pollfd) * nfds); + while (true) { + for (int i = 0; i < nfds; i++) { + pfds[i].fd = i == nfds - 1 ? pipefd_[0] : listener_fds_[i]; + pfds[i].events = POLLIN; + pfds[i].revents = 0; + } + if (!PollFds(pfds, nfds, absl::InfiniteDuration()).ok()) { + break; + } + int saved_errno = 0; + if ((pfds[nfds - 1].revents & POLLIN) && + ReadBytes(pipefd_[0], saved_errno, strlen(kStopMessage)) == + std::string(kStopMessage)) { + break; + } + for (int i = 0; i < nfds - 1; i++) { + if (!(pfds[i].revents & POLLIN)) { + continue; + } + // pfds[i].fd has a readable event. + int client_sock_fd = accept(pfds[i].fd, nullptr, nullptr); + if (client_sock_fd < 0) { + gpr_log(GPR_ERROR, + "Error accepting new connection: %s. Ignoring connection " + "attempt ...", + std::strerror(errno)); + continue; + } + on_accept_(PosixOracleEndpoint::Create(client_sock_fd), + memory_allocator_factory_->CreateMemoryAllocator("test")); + } + } + gpr_log(GPR_INFO, "Shutting down accept thread ..."); + gpr_free(pfds); +} + +absl::StatusOr PosixOracleListener::Bind( + const EventEngine::ResolvedAddress& addr) { + absl::MutexLock lock(&mu_); + int new_socket; + int opt = -1; + grpc_resolved_address address = CreateGRPCResolvedAddress(addr); + const char* scheme = grpc_sockaddr_get_uri_scheme(&address); + if (scheme == nullptr || strcmp(scheme, "ipv6") != 0) { + return absl::UnimplementedError( + "Unsupported bind address type. Only IPV6 addresses are suported " + "currently by the PosixOracleListener ..."); + } + + // Creating a new socket file descriptor. + if ((new_socket = socket(AF_INET6, SOCK_STREAM, 0)) <= 0) { + return absl::UnknownError( + absl::StrCat("Error creating socket: ", std::strerror(errno))); + } + // MacOS biulds fail if SO_REUSEADDR and SO_REUSEPORT are set in the same + // setsockopt syscall. So they are set separately one after the other. + if (setsockopt(new_socket, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))) { + return absl::UnknownError( + absl::StrCat("Error setsockopt(SO_REUSEADDR): ", std::strerror(errno))); + } + if (setsockopt(new_socket, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt))) { + return absl::UnknownError( + absl::StrCat("Error setsockopt(SO_REUSEPORT): ", std::strerror(errno))); + } + + // Forcefully bind the new socket. + if (bind(new_socket, reinterpret_cast(addr.address()), + address.len) < 0) { + return absl::UnknownError( + absl::StrCat("Error bind: ", std::strerror(errno))); + } + // Set the new socket to listen for one active connection at a time. + if (listen(new_socket, 1) < 0) { + return absl::UnknownError( + absl::StrCat("Error listen: ", std::strerror(errno))); + } + listener_fds_.push_back(new_socket); + return 0; +} + +// PosixOracleEventEngine implements blocking connect. It blocks the calling +// thread until either connect succeeds or fails with timeout. +EventEngine::ConnectionHandle PosixOracleEventEngine::Connect( + OnConnectCallback on_connect, const ResolvedAddress& addr, + const EndpointConfig& /*args*/, MemoryAllocator /*memory_allocator*/, + EventEngine::Duration timeout) { + int client_sock_fd; + absl::Time deadline = absl::Now() + absl::FromChrono(timeout); + grpc_resolved_address address = CreateGRPCResolvedAddress(addr); + const char* scheme = grpc_sockaddr_get_uri_scheme(&address); + if (scheme == nullptr || strcmp(scheme, "ipv6") != 0) { + on_connect( + absl::CancelledError("Unsupported bind address type. Only ipv6 " + "addresses are currently supported.")); + return {}; + } + if ((client_sock_fd = socket(AF_INET6, SOCK_STREAM, 0)) < 0) { + on_connect(absl::CancelledError(absl::StrCat( + "Connect failed: socket creation error: ", std::strerror(errno)))); + return {}; + } + int err; + int num_retries = 0; + static constexpr int kMaxRetries = 5; + do { + err = connect(client_sock_fd, const_cast(addr.address()), + address.len); + if (err < 0 && (errno == EINPROGRESS || errno == EWOULDBLOCK)) { + auto status = BlockUntilWritableWithTimeout( + client_sock_fd, + std::max(deadline - absl::Now(), absl::ZeroDuration())); + if (!status.ok()) { + on_connect(status); + return {}; + } + } else if (err < 0) { + if (errno != ECONNREFUSED || ++num_retries > kMaxRetries) { + on_connect(absl::CancelledError("Connect failed.")); + return {}; + } + // If ECONNREFUSED && num_retries < kMaxRetries, wait a while and try + // again. + absl::SleepFor(absl::Milliseconds(100)); + } + } while (err < 0 && absl::Now() < deadline); + if (err < 0 && absl::Now() >= deadline) { + on_connect(absl::CancelledError("Deadline exceeded")); + } else { + on_connect(PosixOracleEndpoint::Create(client_sock_fd)); + } + return {}; +} + +} // namespace experimental +} // namespace grpc_event_engine diff --git a/test/core/event_engine/test_suite/oracle_event_engine_posix.h b/test/core/event_engine/test_suite/oracle_event_engine_posix.h new file mode 100644 index 00000000000..effaaec8f32 --- /dev/null +++ b/test/core/event_engine/test_suite/oracle_event_engine_posix.h @@ -0,0 +1,194 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_ORACLE_EVENT_ENGINE_POSIX_H_ +#define GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_ORACLE_EVENT_ENGINE_POSIX_H_ + +#include +#include +#include +#include +#include + +#include "absl/status/status.h" +#include "absl/status/statusor.h" +#include "absl/time/time.h" + +#include +#include +#include + +#include "src/core/lib/event_engine/promise.h" +#include "src/core/lib/gprpp/thd.h" +#include "src/core/lib/resource_quota/memory_quota.h" +#include "test/core/event_engine/test_suite/event_engine_test_utils.h" + +namespace grpc_event_engine { +namespace experimental { + +class PosixOracleEndpoint : public EventEngine::Endpoint { + public: + explicit PosixOracleEndpoint(int socket_fd); + static std::unique_ptr Create(int socket_fd); + ~PosixOracleEndpoint() override; + void Read(std::function on_read, SliceBuffer* buffer, + const ReadArgs* args) override; + void Write(std::function on_writable, SliceBuffer* data, + const WriteArgs* args) override; + void Shutdown(); + EventEngine::ResolvedAddress& GetPeerAddress() const override { + GPR_ASSERT(false && "unimplemented"); + } + EventEngine::ResolvedAddress& GetLocalAddress() const override { + GPR_ASSERT(false && "unimplemented"); + } + + private: + // An internal helper class definition of Read operations to be performed + // by the TCPServerEndpoint. + class ReadOperation { + public: + ReadOperation() + : num_bytes_to_read_(-1), buffer_(nullptr), on_complete_(nullptr) {} + ReadOperation(int num_bytes_to_read, SliceBuffer* buffer, + std::function&& on_complete) + : num_bytes_to_read_(num_bytes_to_read), + buffer_(buffer), + on_complete_(std::move(on_complete)) {} + bool IsValid() { return num_bytes_to_read_ >= 0 && buffer_ != nullptr; } + int GetNumBytesToRead() const { return num_bytes_to_read_; } + void operator()(std::string read_data, absl::Status status) { + if (on_complete_ != nullptr) { + AppendStringToSliceBuffer(absl::exchange(buffer_, nullptr), read_data); + absl::exchange(on_complete_, nullptr)(status); + } + } + + private: + int num_bytes_to_read_; + SliceBuffer* buffer_; + std::function on_complete_; + }; + + // An internal helper class definition of Write operations to be performed + // by the TCPServerEndpoint. + class WriteOperation { + public: + WriteOperation() : bytes_to_write_(std::string()), on_complete_(nullptr) {} + WriteOperation(SliceBuffer* buffer, + std::function&& on_complete) + : bytes_to_write_(ExtractSliceBufferIntoString(buffer)), + on_complete_(std::move(on_complete)) {} + bool IsValid() { return bytes_to_write_.length() > 0; } + std::string GetBytesToWrite() const { return bytes_to_write_; } + void operator()(absl::Status status) { + if (on_complete_ != nullptr) { + absl::exchange(on_complete_, nullptr)(status); + } + } + + private: + std::string bytes_to_write_; + std::function on_complete_; + }; + + void ProcessReadOperations(); + void ProcessWriteOperations(); + + mutable absl::Mutex mu_; + bool is_shutdown_ = false; + int socket_fd_; + Promise read_ops_channel_; + Promise write_ops_channel_; + grpc_core::Thread read_ops_ ABSL_GUARDED_BY(mu_); + grpc_core::Thread write_ops_ ABSL_GUARDED_BY(mu_); +}; + +class PosixOracleListener : public EventEngine::Listener { + public: + PosixOracleListener( + EventEngine::Listener::AcceptCallback on_accept, + std::function on_shutdown, + std::unique_ptr memory_allocator_factory); + ~PosixOracleListener() override; + absl::StatusOr Bind(const EventEngine::ResolvedAddress& addr) override; + absl::Status Start() override; + + private: + void HandleIncomingConnections(); + + mutable absl::Mutex mu_; + EventEngine::Listener::AcceptCallback on_accept_; + std::function on_shutdown_; + std::unique_ptr memory_allocator_factory_; + grpc_core::Thread serve_; + int pipefd_[2]; + bool is_started_ = false; + std::vector listener_fds_; +}; + +// A posix based oracle event engine. +class PosixOracleEventEngine final : public EventEngine { + public: + PosixOracleEventEngine() = default; + ~PosixOracleEventEngine() override = default; + + absl::StatusOr> CreateListener( + Listener::AcceptCallback on_accept, + std::function on_shutdown, + const EndpointConfig& /*config*/, + std::unique_ptr memory_allocator_factory) + override { + return std::make_unique( + std::move(on_accept), std::move(on_shutdown), + std::move(memory_allocator_factory)); + } + + ConnectionHandle Connect(OnConnectCallback on_connect, + const ResolvedAddress& addr, + const EndpointConfig& args, + MemoryAllocator memory_allocator, + EventEngine::Duration timeout) override; + + bool CancelConnect(ConnectionHandle /*handle*/) override { + GPR_ASSERT(false && "unimplemented"); + } + bool IsWorkerThread() override { return false; }; + std::unique_ptr GetDNSResolver( + const DNSResolver::ResolverOptions& /*options*/) override { + GPR_ASSERT(false && "unimplemented"); + } + void Run(Closure* /*closure*/) override { + GPR_ASSERT(false && "unimplemented"); + } + void Run(std::function /*closure*/) override { + GPR_ASSERT(false && "unimplemented"); + } + TaskHandle RunAfter(EventEngine::Duration /*duration*/, + Closure* /*closure*/) override { + GPR_ASSERT(false && "unimplemented"); + } + TaskHandle RunAfter(EventEngine::Duration /*duration*/, + std::function /*closure*/) override { + GPR_ASSERT(false && "unimplemented"); + } + bool Cancel(TaskHandle /*handle*/) override { + GPR_ASSERT(false && "unimplemented"); + } +}; + +} // namespace experimental +} // namespace grpc_event_engine + +#endif // GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_ORACLE_EVENT_ENGINE_POSIX_H_ diff --git a/test/core/event_engine/test_suite/oracle_event_engine_posix_test.cc b/test/core/event_engine/test_suite/oracle_event_engine_posix_test.cc new file mode 100644 index 00000000000..2a9f4bd3945 --- /dev/null +++ b/test/core/event_engine/test_suite/oracle_event_engine_posix_test.cc @@ -0,0 +1,29 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include "test/core/event_engine/test_suite/oracle_event_engine_posix.h" + +#include "test/core/event_engine/test_suite/event_engine_test.h" +#include "test/core/util/test_config.h" + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + grpc::testing::TestEnvironment env(&argc, argv); + auto ee_factory = []() { + return absl::make_unique< + grpc_event_engine::experimental::PosixOracleEventEngine>(); + }; + SetEventEngineFactories(/*ee_factory=*/ee_factory, + /*oracle_ee_factory=*/ee_factory); + return RUN_ALL_TESTS(); +} diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 305d875380b..2ed97e217e2 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1939,6 +1939,7 @@ src/core/lib/event_engine/handle_containers.h \ src/core/lib/event_engine/iomgr_engine.cc \ src/core/lib/event_engine/iomgr_engine.h \ src/core/lib/event_engine/memory_allocator.cc \ +src/core/lib/event_engine/promise.h \ src/core/lib/event_engine/resolved_address.cc \ src/core/lib/event_engine/slice.cc \ src/core/lib/event_engine/slice_buffer.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index cd53ea6db47..dd0fb48ae6d 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1730,6 +1730,7 @@ src/core/lib/event_engine/handle_containers.h \ src/core/lib/event_engine/iomgr_engine.cc \ src/core/lib/event_engine/iomgr_engine.h \ src/core/lib/event_engine/memory_allocator.cc \ +src/core/lib/event_engine/promise.h \ src/core/lib/event_engine/resolved_address.cc \ src/core/lib/event_engine/slice.cc \ src/core/lib/event_engine/slice_buffer.cc \ diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 5b5e1bce6ef..cb10ba8780d 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -5523,6 +5523,28 @@ ], "uses_polling": false }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "oracle_event_engine_posix_test", + "platforms": [ + "linux", + "mac", + "posix" + ], + "uses_polling": false + }, { "args": [], "benchmark": false,