diff --git a/CMakeLists.txt b/CMakeLists.txt index dc9c2601ad3..ae6bb82180e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1184,6 +1184,7 @@ if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx tcp_server_posix_test) endif() + add_dependencies(buildtests_cxx tcp_socket_utils_test) add_dependencies(buildtests_cxx test_core_event_engine_posix_timer_heap_test) add_dependencies(buildtests_cxx test_core_event_engine_posix_timer_list_test) add_dependencies(buildtests_cxx test_core_event_engine_slice_buffer_test) @@ -1240,6 +1241,9 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx win_socket_test) endif() add_dependencies(buildtests_cxx window_overflow_bad_client_test) + if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX OR _gRPC_PLATFORM_WINDOWS) + add_dependencies(buildtests_cxx windows_endpoint_test) + endif() add_dependencies(buildtests_cxx wire_reader_test) add_dependencies(buildtests_cxx wire_writer_test) add_dependencies(buildtests_cxx work_queue_test) @@ -2111,6 +2115,7 @@ add_library(grpc src/core/lib/event_engine/resolved_address.cc src/core/lib/event_engine/slice.cc src/core/lib/event_engine/slice_buffer.cc + src/core/lib/event_engine/tcp_socket_utils.cc src/core/lib/event_engine/thread_pool.cc src/core/lib/event_engine/time_util.cc src/core/lib/event_engine/trace.cc @@ -2774,6 +2779,7 @@ add_library(grpc_unsecure src/core/lib/event_engine/resolved_address.cc src/core/lib/event_engine/slice.cc src/core/lib/event_engine/slice_buffer.cc + src/core/lib/event_engine/tcp_socket_utils.cc src/core/lib/event_engine/thread_pool.cc src/core/lib/event_engine/time_util.cc src/core/lib/event_engine/trace.cc @@ -4269,6 +4275,7 @@ add_library(grpc_authorization_provider src/core/lib/event_engine/resolved_address.cc src/core/lib/event_engine/slice.cc src/core/lib/event_engine/slice_buffer.cc + src/core/lib/event_engine/tcp_socket_utils.cc src/core/lib/event_engine/thread_pool.cc src/core/lib/event_engine/time_util.cc src/core/lib/event_engine/trace.cc @@ -11161,6 +11168,7 @@ add_executable(frame_test src/core/lib/event_engine/resolved_address.cc src/core/lib/event_engine/slice.cc src/core/lib/event_engine/slice_buffer.cc + src/core/lib/event_engine/tcp_socket_utils.cc src/core/lib/event_engine/thread_pool.cc src/core/lib/event_engine/time_util.cc src/core/lib/event_engine/trace.cc @@ -19569,6 +19577,43 @@ endif() endif() if(gRPC_BUILD_TESTS) +add_executable(tcp_socket_utils_test + test/core/event_engine/tcp_socket_utils_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(tcp_socket_utils_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(tcp_socket_utils_test + ${_gRPC_BASELIB_LIBRARIES} + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ZLIB_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc +) + + +endif() +if(gRPC_BUILD_TESTS) + add_executable(test_core_event_engine_posix_timer_heap_test src/core/lib/event_engine/posix_engine/timer.cc src/core/lib/event_engine/posix_engine/timer_heap.cc @@ -21531,6 +21576,47 @@ target_link_libraries(window_overflow_bad_client_test ) +endif() +if(gRPC_BUILD_TESTS) +if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX OR _gRPC_PLATFORM_WINDOWS) + + add_executable(windows_endpoint_test + src/core/lib/event_engine/windows/windows_endpoint.cc + test/core/event_engine/windows/create_sockpair.cc + test/core/event_engine/windows/windows_endpoint_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc + ) + + target_include_directories(windows_endpoint_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} + ) + + target_link_libraries(windows_endpoint_test + ${_gRPC_BASELIB_LIBRARIES} + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ZLIB_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util + ) + + +endif() endif() if(gRPC_BUILD_TESTS) diff --git a/Makefile b/Makefile index 9581367dd6c..26b74f4e647 100644 --- a/Makefile +++ b/Makefile @@ -1413,6 +1413,7 @@ LIBGRPC_SRC = \ src/core/lib/event_engine/resolved_address.cc \ src/core/lib/event_engine/slice.cc \ src/core/lib/event_engine/slice_buffer.cc \ + src/core/lib/event_engine/tcp_socket_utils.cc \ src/core/lib/event_engine/thread_pool.cc \ src/core/lib/event_engine/time_util.cc \ src/core/lib/event_engine/trace.cc \ @@ -1935,6 +1936,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/event_engine/resolved_address.cc \ src/core/lib/event_engine/slice.cc \ src/core/lib/event_engine/slice_buffer.cc \ + src/core/lib/event_engine/tcp_socket_utils.cc \ src/core/lib/event_engine/thread_pool.cc \ src/core/lib/event_engine/time_util.cc \ src/core/lib/event_engine/trace.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index ac84623aa2a..09aad0f37e5 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -767,6 +767,7 @@ libs: - src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h - src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h - src/core/lib/event_engine/socket_notifier.h + - src/core/lib/event_engine/tcp_socket_utils.h - src/core/lib/event_engine/thread_pool.h - src/core/lib/event_engine/time_util.h - src/core/lib/event_engine/trace.h @@ -1499,6 +1500,7 @@ libs: - src/core/lib/event_engine/resolved_address.cc - src/core/lib/event_engine/slice.cc - src/core/lib/event_engine/slice_buffer.cc + - src/core/lib/event_engine/tcp_socket_utils.cc - src/core/lib/event_engine/thread_pool.cc - src/core/lib/event_engine/time_util.cc - src/core/lib/event_engine/trace.cc @@ -2040,6 +2042,7 @@ libs: - src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h - src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h - src/core/lib/event_engine/socket_notifier.h + - src/core/lib/event_engine/tcp_socket_utils.h - src/core/lib/event_engine/thread_pool.h - src/core/lib/event_engine/time_util.h - src/core/lib/event_engine/trace.h @@ -2418,6 +2421,7 @@ libs: - src/core/lib/event_engine/resolved_address.cc - src/core/lib/event_engine/slice.cc - src/core/lib/event_engine/slice_buffer.cc + - src/core/lib/event_engine/tcp_socket_utils.cc - src/core/lib/event_engine/thread_pool.cc - src/core/lib/event_engine/time_util.cc - src/core/lib/event_engine/trace.cc @@ -3483,6 +3487,7 @@ libs: - src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h - src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h - src/core/lib/event_engine/socket_notifier.h + - src/core/lib/event_engine/tcp_socket_utils.h - src/core/lib/event_engine/thread_pool.h - src/core/lib/event_engine/time_util.h - src/core/lib/event_engine/trace.h @@ -3741,6 +3746,7 @@ libs: - src/core/lib/event_engine/resolved_address.cc - src/core/lib/event_engine/slice.cc - src/core/lib/event_engine/slice_buffer.cc + - src/core/lib/event_engine/tcp_socket_utils.cc - src/core/lib/event_engine/thread_pool.cc - src/core/lib/event_engine/time_util.cc - src/core/lib/event_engine/trace.cc @@ -7170,6 +7176,7 @@ targets: - src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h - src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h - src/core/lib/event_engine/socket_notifier.h + - src/core/lib/event_engine/tcp_socket_utils.h - src/core/lib/event_engine/thread_pool.h - src/core/lib/event_engine/time_util.h - src/core/lib/event_engine/trace.h @@ -7410,6 +7417,7 @@ targets: - src/core/lib/event_engine/resolved_address.cc - src/core/lib/event_engine/slice.cc - src/core/lib/event_engine/slice_buffer.cc + - src/core/lib/event_engine/tcp_socket_utils.cc - src/core/lib/event_engine/thread_pool.cc - src/core/lib/event_engine/time_util.cc - src/core/lib/event_engine/trace.cc @@ -11191,6 +11199,16 @@ targets: - linux - posix - mac +- name: tcp_socket_utils_test + gtest: true + build: test + language: c++ + headers: [] + src: + - test/core/event_engine/tcp_socket_utils_test.cc + deps: + - grpc + uses_polling: false - name: test_core_event_engine_posix_timer_heap_test gtest: true build: test @@ -12078,6 +12096,24 @@ targets: - test/core/end2end/cq_verifier.cc deps: - grpc_test_util +- name: windows_endpoint_test + gtest: true + build: test + language: c++ + headers: + - src/core/lib/event_engine/windows/windows_endpoint.h + - test/core/event_engine/windows/create_sockpair.h + src: + - src/core/lib/event_engine/windows/windows_endpoint.cc + - test/core/event_engine/windows/create_sockpair.cc + - test/core/event_engine/windows/windows_endpoint_test.cc + deps: + - grpc_test_util + platforms: + - linux + - posix + - windows + uses_polling: false - name: wire_reader_test gtest: true build: test diff --git a/config.m4 b/config.m4 index 6cd04edb730..8bfc67c1735 100644 --- a/config.m4 +++ b/config.m4 @@ -495,6 +495,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/event_engine/resolved_address.cc \ src/core/lib/event_engine/slice.cc \ src/core/lib/event_engine/slice_buffer.cc \ + src/core/lib/event_engine/tcp_socket_utils.cc \ src/core/lib/event_engine/thread_pool.cc \ src/core/lib/event_engine/time_util.cc \ src/core/lib/event_engine/trace.cc \ diff --git a/config.w32 b/config.w32 index d23443c7ce6..712e29de2ab 100644 --- a/config.w32 +++ b/config.w32 @@ -461,6 +461,7 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\event_engine\\resolved_address.cc " + "src\\core\\lib\\event_engine\\slice.cc " + "src\\core\\lib\\event_engine\\slice_buffer.cc " + + "src\\core\\lib\\event_engine\\tcp_socket_utils.cc " + "src\\core\\lib\\event_engine\\thread_pool.cc " + "src\\core\\lib\\event_engine\\time_util.cc " + "src\\core\\lib\\event_engine\\trace.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index bb4ddd1b6cd..da1491bdfbc 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -724,6 +724,7 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h', 'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h', 'src/core/lib/event_engine/socket_notifier.h', + 'src/core/lib/event_engine/tcp_socket_utils.h', 'src/core/lib/event_engine/thread_pool.h', 'src/core/lib/event_engine/time_util.h', 'src/core/lib/event_engine/trace.h', @@ -1610,6 +1611,7 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h', 'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h', 'src/core/lib/event_engine/socket_notifier.h', + 'src/core/lib/event_engine/tcp_socket_utils.h', 'src/core/lib/event_engine/thread_pool.h', 'src/core/lib/event_engine/time_util.h', 'src/core/lib/event_engine/trace.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index b261e98fdd2..fe267a73d83 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -1104,6 +1104,8 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/slice.cc', 'src/core/lib/event_engine/slice_buffer.cc', 'src/core/lib/event_engine/socket_notifier.h', + 'src/core/lib/event_engine/tcp_socket_utils.cc', + 'src/core/lib/event_engine/tcp_socket_utils.h', 'src/core/lib/event_engine/thread_pool.cc', 'src/core/lib/event_engine/thread_pool.h', 'src/core/lib/event_engine/time_util.cc', @@ -2241,6 +2243,7 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h', 'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h', 'src/core/lib/event_engine/socket_notifier.h', + 'src/core/lib/event_engine/tcp_socket_utils.h', 'src/core/lib/event_engine/thread_pool.h', 'src/core/lib/event_engine/time_util.h', 'src/core/lib/event_engine/trace.h', diff --git a/grpc.gemspec b/grpc.gemspec index a95bc561652..f45663d0677 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -1015,6 +1015,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/event_engine/slice.cc ) s.files += %w( src/core/lib/event_engine/slice_buffer.cc ) s.files += %w( src/core/lib/event_engine/socket_notifier.h ) + s.files += %w( src/core/lib/event_engine/tcp_socket_utils.cc ) + s.files += %w( src/core/lib/event_engine/tcp_socket_utils.h ) s.files += %w( src/core/lib/event_engine/thread_pool.cc ) s.files += %w( src/core/lib/event_engine/thread_pool.h ) s.files += %w( src/core/lib/event_engine/time_util.cc ) diff --git a/grpc.gyp b/grpc.gyp index dc94ac864aa..c5da8cbfc7e 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -825,6 +825,7 @@ 'src/core/lib/event_engine/resolved_address.cc', 'src/core/lib/event_engine/slice.cc', 'src/core/lib/event_engine/slice_buffer.cc', + 'src/core/lib/event_engine/tcp_socket_utils.cc', 'src/core/lib/event_engine/thread_pool.cc', 'src/core/lib/event_engine/time_util.cc', 'src/core/lib/event_engine/trace.cc', @@ -1293,6 +1294,7 @@ 'src/core/lib/event_engine/resolved_address.cc', 'src/core/lib/event_engine/slice.cc', 'src/core/lib/event_engine/slice_buffer.cc', + 'src/core/lib/event_engine/tcp_socket_utils.cc', 'src/core/lib/event_engine/thread_pool.cc', 'src/core/lib/event_engine/time_util.cc', 'src/core/lib/event_engine/trace.cc', @@ -1797,6 +1799,7 @@ 'src/core/lib/event_engine/resolved_address.cc', 'src/core/lib/event_engine/slice.cc', 'src/core/lib/event_engine/slice_buffer.cc', + 'src/core/lib/event_engine/tcp_socket_utils.cc', 'src/core/lib/event_engine/thread_pool.cc', 'src/core/lib/event_engine/time_util.cc', 'src/core/lib/event_engine/trace.cc', diff --git a/package.xml b/package.xml index d2eea2fb4a0..fd574363123 100644 --- a/package.xml +++ b/package.xml @@ -997,6 +997,8 @@ + + diff --git a/src/core/BUILD b/src/core/BUILD index 22094928779..c758fc8f51c 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -1696,10 +1696,10 @@ grpc_cc_library( "absl/status", "absl/status:statusor", "absl/strings", - "absl/strings:str_format", "absl/types:optional", ], deps = [ + "event_engine_tcp_socket_utils", "iomgr_port", "resource_quota", "socket_mutator", @@ -1727,6 +1727,7 @@ grpc_cc_library( "absl/strings", ], deps = [ + "event_engine_tcp_socket_utils", "iomgr_port", "posix_event_engine_tcp_socket_utils", "socket_mutator", @@ -1754,6 +1755,7 @@ grpc_cc_library( "absl/types:optional", ], deps = [ + "event_engine_tcp_socket_utils", "iomgr_port", "posix_event_engine_closure", "posix_event_engine_endpoint", @@ -1785,6 +1787,7 @@ grpc_cc_library( deps = [ "event_engine_common", "event_engine_poller", + "event_engine_tcp_socket_utils", "event_engine_thread_pool", "event_engine_trace", "event_engine_utils", @@ -1857,6 +1860,59 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "windows_endpoint", + srcs = [ + "lib/event_engine/windows/windows_endpoint.cc", + ], + hdrs = [ + "lib/event_engine/windows/windows_endpoint.h", + ], + external_deps = [ + "absl/cleanup", + "absl/functional:any_invocable", + "absl/status", + "absl/strings:str_format", + ], + deps = [ + "error", + "event_engine_tcp_socket_utils", + "event_engine_trace", + "status_helper", + "windows_iocp", + "//:debug_location", + "//:event_engine_base_hdrs", + "//:gpr", + "//:gpr_platform", + ], +) + +grpc_cc_library( + name = "event_engine_tcp_socket_utils", + srcs = [ + "lib/event_engine/tcp_socket_utils.cc", + ], + hdrs = [ + "lib/event_engine/tcp_socket_utils.h", + ], + external_deps = [ + "absl/status", + "absl/status:statusor", + "absl/strings", + "absl/strings:str_format", + "absl/types:optional", + ], + deps = [ + "grpc_sockaddr", + "iomgr_port", + "status_helper", + "//:event_engine_base_hdrs", + "//:gpr", + "//:gpr_platform", + "//:uri_parser", + ], +) + grpc_cc_library( name = "event_engine_trace", srcs = [ diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.cc b/src/core/lib/event_engine/posix_engine/posix_engine.cc index aade89b967e..7007545ee38 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine.cc +++ b/src/core/lib/event_engine/posix_engine/posix_engine.cc @@ -37,6 +37,7 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/event_engine/poller.h" #include "src/core/lib/event_engine/posix_engine/timer.h" +#include "src/core/lib/event_engine/tcp_socket_utils.h" #include "src/core/lib/event_engine/trace.h" #include "src/core/lib/event_engine/utils.h" #include "src/core/lib/experiments/experiments.h" @@ -61,13 +62,13 @@ namespace grpc_event_engine { namespace experimental { #ifdef GRPC_POSIX_SOCKET_TCP +using ::grpc_event_engine::experimental::ResolvedAddressToNormalizedString; using ::grpc_event_engine::posix_engine::EventHandle; using ::grpc_event_engine::posix_engine::PosixEngineClosure; using ::grpc_event_engine::posix_engine::PosixEngineListener; using ::grpc_event_engine::posix_engine::PosixEventPoller; using ::grpc_event_engine::posix_engine::PosixSocketWrapper; using ::grpc_event_engine::posix_engine::PosixTcpOptions; -using ::grpc_event_engine::posix_engine::SockaddrToString; using ::grpc_event_engine::posix_engine::TcpOptionsFromEndpointConfig; void AsyncConnect::Start(EventEngine::Duration timeout) { @@ -226,7 +227,7 @@ EventEngine::ConnectionHandle PosixEventEngine::ConnectInternal( } while (err < 0 && errno == EINTR); saved_errno = errno; - auto addr_uri = SockaddrToString(&addr, true); + auto addr_uri = ResolvedAddressToNormalizedString(addr); if (!addr_uri.ok()) { Run([on_connect = std::move(on_connect), ep = absl::FailedPreconditionError(absl::StrCat( diff --git a/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc b/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc index 42fed6baa28..ef16412a8ea 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc +++ b/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc @@ -14,7 +14,13 @@ #include -#include "src/core/lib/event_engine/posix_engine/posix_engine_listener.h" +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_POSIX_SOCKET_TCP + +#include // IWYU pragma: keep +#include // IWYU pragma: keep +#include // IWYU pragma: keep #include #include @@ -28,22 +34,25 @@ #include #include -#include "src/core/lib/gprpp/status_helper.h" -#ifdef GRPC_POSIX_SOCKET_TCP -#include // IWYU pragma: keep -#include // IWYU pragma: keep -#include // IWYU pragma: keep - #include "src/core/lib/event_engine/posix_engine/event_poller.h" #include "src/core/lib/event_engine/posix_engine/posix_endpoint.h" +#include "src/core/lib/event_engine/posix_engine/posix_engine_listener.h" #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" +#include "src/core/lib/event_engine/tcp_socket_utils.h" +#include "src/core/lib/gprpp/status_helper.h" #include "src/core/lib/iomgr/socket_mutator.h" -#endif namespace grpc_event_engine { namespace posix_engine { -#ifdef GRPC_POSIX_SOCKET_TCP +namespace { +using ::grpc_event_engine::experimental::ResolvedAddressGetPort; +using ::grpc_event_engine::experimental::ResolvedAddressIsWildcard; +using ::grpc_event_engine::experimental::ResolvedAddressSetPort; +using ::grpc_event_engine::experimental::ResolvedAddressToNormalizedString; +using ::grpc_event_engine::experimental::ResolvedAddressToV4Mapped; +} // namespace + PosixEngineListenerImpl::PosixEngineListenerImpl( EventEngine::Listener::AcceptCallback on_accept, absl::AnyInvocable on_shutdown, @@ -63,7 +72,7 @@ absl::StatusOr PosixEngineListenerImpl::Bind( const EventEngine::ResolvedAddress& addr) { EventEngine::ResolvedAddress res_addr = addr; EventEngine::ResolvedAddress addr6_v4mapped; - int requested_port = SockaddrGetPort(res_addr); + int requested_port = ResolvedAddressGetPort(res_addr); absl::MutexLock lock(&this->mu_); GPR_ASSERT(!this->started_); GPR_ASSERT(addr.size() <= EventEngine::ResolvedAddress::MAX_SIZE_BYTES); @@ -78,22 +87,22 @@ absl::StatusOr PosixEngineListenerImpl::Bind( if (0 == getsockname((*it)->Socket().sock.Fd(), const_cast(sockname_temp.address()), &len)) { - int used_port = SockaddrGetPort(sockname_temp); + int used_port = ResolvedAddressGetPort(sockname_temp); if (used_port > 0) { requested_port = used_port; - SockaddrSetPort(res_addr, requested_port); + ResolvedAddressSetPort(res_addr, requested_port); break; } } } - auto used_port = SockaddrIsWildcard(res_addr); + auto used_port = ResolvedAddressIsWildcard(res_addr); if (used_port.has_value()) { requested_port = *used_port; return ListenerContainerAddWildcardAddresses(acceptors_, options_, requested_port); } - if (SockaddrToV4Mapped(&res_addr, &addr6_v4mapped)) { + if (ResolvedAddressToV4Mapped(res_addr, &addr6_v4mapped)) { res_addr = addr6_v4mapped; } @@ -170,7 +179,7 @@ void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept( } // Create an Endpoint here. - std::string peer_name = *SockaddrToString(&addr, true); + std::string peer_name = *ResolvedAddressToNormalizedString(addr); auto endpoint = CreatePosixEndpoint( /*handle=*/listener_->poller_->CreateHandle( fd, peer_name, listener_->poller_->CanTrackErrors()), @@ -229,7 +238,7 @@ PosixEngineListenerImpl::~PosixEngineListenerImpl() { } } -#endif // GRPC_POSIX_SOCKET_TCP - } // namespace posix_engine } // namespace grpc_event_engine + +#endif // GRPC_POSIX_SOCKET_TCP diff --git a/src/core/lib/event_engine/posix_engine/posix_engine_listener.h b/src/core/lib/event_engine/posix_engine/posix_engine_listener.h index dfb45642934..e47f1e537c3 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine_listener.h +++ b/src/core/lib/event_engine/posix_engine/posix_engine_listener.h @@ -41,6 +41,7 @@ #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h" #include "src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h" #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" +#include "src/core/lib/event_engine/tcp_socket_utils.h" #endif namespace grpc_event_engine { @@ -81,7 +82,9 @@ class PosixEngineListenerImpl listener_(std::move(listener)), socket_(socket), handle_(listener_->poller_->CreateHandle( - socket_.sock.Fd(), *SockaddrToString(&socket_.addr, true), + socket_.sock.Fd(), + *grpc_event_engine::experimental:: + ResolvedAddressToNormalizedString(socket_.addr), listener_->poller_->CanTrackErrors())), notify_on_accept_(PosixEngineClosure::ToPermanentClosure( [this](absl::Status status) { NotifyOnAccept(status); })){}; diff --git a/src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc b/src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc index a87d6730bd5..350a363c69f 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc +++ b/src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc @@ -29,6 +29,7 @@ #include #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" +#include "src/core/lib/event_engine/tcp_socket_utils.h" #include "src/core/lib/gprpp/status_helper.h" #include "src/core/lib/iomgr/port.h" #include "src/core/lib/iomgr/socket_mutator.h" @@ -55,18 +56,24 @@ namespace { using ResolvedAddress = grpc_event_engine::experimental::EventEngine::ResolvedAddress; using ListenerSocket = ListenerSocketsContainer::ListenerSocket; +using ::grpc_event_engine::experimental::ResolvedAddressGetPort; +using ::grpc_event_engine::experimental::ResolvedAddressIsV4Mapped; +using ::grpc_event_engine::experimental::ResolvedAddressMakeWild4; +using ::grpc_event_engine::experimental::ResolvedAddressMakeWild6; +using ::grpc_event_engine::experimental::ResolvedAddressSetPort; +using ::grpc_event_engine::experimental::ResolvedAddressToString; #ifdef GRPC_HAVE_IFADDRS // Bind to "::" to get a port number not used by any address. absl::StatusOr GetUnusedPort() { - ResolvedAddress wild = SockaddrMakeWild6(0); + ResolvedAddress wild = ResolvedAddressMakeWild6(0); PosixSocketWrapper::DSMode dsmode; auto sock = PosixSocketWrapper::CreateDualStackSocket(nullptr, wild, SOCK_STREAM, 0, dsmode); GRPC_RETURN_IF_ERROR(sock.status()); if (dsmode == PosixSocketWrapper::DSMode::DSMODE_IPV4) { - wild = SockaddrMakeWild4(0); + wild = ResolvedAddressMakeWild4(0); } if (bind(sock->Fd(), wild.address(), wild.size()) != 0) { close(sock->Fd()); @@ -81,7 +88,7 @@ absl::StatusOr GetUnusedPort() { absl::StrCat("getsockname(GetUnusedPort): ", std::strerror(errno))); } close(sock->Fd()); - int port = SockaddrGetPort(wild); + int port = ResolvedAddressGetPort(wild); if (port <= 0) { return absl::FailedPreconditionError("Bad port"); } @@ -188,7 +195,8 @@ absl::Status PrepareSocket(const PosixTcpOptions& options, absl::StrCat("Error in getsockname: ", std::strerror(errno))); } - socket.port = SockaddrGetPort(ResolvedAddress(sockname_temp.address(), len)); + socket.port = + ResolvedAddressGetPort(ResolvedAddress(sockname_temp.address(), len)); // No errors. Set close_fd to false to ensure the socket is not closed. close_fd = false; return absl::OkStatus(); @@ -207,7 +215,7 @@ absl::StatusOr CreateAndPrepareListenerSocket( } socket.sock = *result; if (socket.dsmode == PosixSocketWrapper::DSMODE_IPV4 && - SockaddrIsV4Mapped(&addr, &addr4_copy)) { + ResolvedAddressIsV4Mapped(addr, &addr4_copy)) { socket.addr = addr4_copy; } else { socket.addr = addr; @@ -251,8 +259,8 @@ absl::StatusOr ListenerContainerAddAllLocalAddresses( continue; } memcpy(const_cast(addr.address()), ifa_it->ifa_addr, len); - SockaddrSetPort(addr, requested_port); - std::string addr_str = *SockaddrToString(&addr, false); + ResolvedAddressSetPort(addr, requested_port); + std::string addr_str = *ResolvedAddressToString(addr); gpr_log(GPR_DEBUG, "Adding local addr from interface %s flags 0x%x to server: %s", ifa_name, ifa_it->ifa_flags, addr_str.c_str()); @@ -293,8 +301,8 @@ absl::StatusOr ListenerContainerAddAllLocalAddresses( absl::StatusOr ListenerContainerAddWildcardAddresses( ListenerSocketsContainer& listener_sockets, const PosixTcpOptions& options, int requested_port) { - ResolvedAddress wild4 = SockaddrMakeWild4(requested_port); - ResolvedAddress wild6 = SockaddrMakeWild6(requested_port); + ResolvedAddress wild4 = ResolvedAddressMakeWild4(requested_port); + ResolvedAddress wild6 = ResolvedAddressMakeWild6(requested_port); absl::StatusOr v6_sock; absl::StatusOr v4_sock; int assigned_port = 0; @@ -316,7 +324,7 @@ absl::StatusOr ListenerContainerAddWildcardAddresses( } } // If we got a v6-only socket or nothing, try adding 0.0.0.0. - SockaddrSetPort(wild4, requested_port); + ResolvedAddressSetPort(wild4, requested_port); v4_sock = CreateAndPrepareListenerSocket(options, wild4); if (v4_sock.ok()) { assigned_port = v4_sock->port; diff --git a/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc b/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc index 089c73139fa..511043d22fa 100644 --- a/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc +++ b/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc @@ -19,7 +19,6 @@ #include #include #include -#include #include "absl/cleanup/cleanup.h" #include "absl/status/statusor.h" @@ -43,18 +42,17 @@ #include #include #include -#endif +#endif // GRPC_POSIX_SOCKET_UTILS_COMMON #include #include #include "absl/status/status.h" -#include "absl/strings/str_format.h" #include #include -#include "src/core/lib/gprpp/host_port.h" +#include "src/core/lib/event_engine/tcp_socket_utils.h" #include "src/core/lib/gprpp/status_helper.h" #include "src/core/lib/gprpp/strerror.h" @@ -66,10 +64,13 @@ namespace grpc_event_engine { namespace posix_engine { +namespace { + using ::grpc_event_engine::experimental::EndpointConfig; using ::grpc_event_engine::experimental::EventEngine; - -namespace { +using ::grpc_event_engine::experimental::ResolvedAddressIsV4Mapped; +using ::grpc_event_engine::experimental::ResolvedAddressToNormalizedString; +using ::grpc_event_engine::experimental::ResolvedAddressToV4Mapped; int AdjustValue(int default_value, int min_value, int max_value, absl::optional actual_value) { @@ -105,8 +106,6 @@ int CreateSocket(std::function socket_factory, int family, : socket(family, type, protocol); } -const uint8_t kV4MappedPrefix[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}; - absl::Status PrepareTcpClientSocket(PosixSocketWrapper sock, const EventEngine::ResolvedAddress& addr, const PosixTcpOptions& options) { @@ -234,192 +233,6 @@ int Accept4(int sockfd, #ifdef GRPC_POSIX_SOCKET_UTILS_COMMON -bool SockaddrIsV4Mapped(const EventEngine::ResolvedAddress* resolved_addr, - EventEngine::ResolvedAddress* resolved_addr4_out) { - const sockaddr* addr = resolved_addr->address(); - if (addr->sa_family == AF_INET6) { - const sockaddr_in6* addr6 = reinterpret_cast(addr); - sockaddr_in* addr4_out = - resolved_addr4_out == nullptr - ? nullptr - : reinterpret_cast( - const_cast(resolved_addr4_out->address())); - - if (memcmp(addr6->sin6_addr.s6_addr, kV4MappedPrefix, - sizeof(kV4MappedPrefix)) == 0) { - if (resolved_addr4_out != nullptr) { - // Normalize ::ffff:0.0.0.0/96 to IPv4. - memset(addr4_out, 0, sizeof(sockaddr_in)); - addr4_out->sin_family = AF_INET; - // s6_addr32 would be nice, but it's non-standard. - memcpy(&addr4_out->sin_addr, &addr6->sin6_addr.s6_addr[12], 4); - addr4_out->sin_port = addr6->sin6_port; - *resolved_addr4_out = EventEngine::ResolvedAddress( - reinterpret_cast(addr4_out), - static_cast(sizeof(sockaddr_in))); - } - return true; - } - } - return false; -} - -bool SockaddrToV4Mapped(const EventEngine::ResolvedAddress* resolved_addr, - EventEngine::ResolvedAddress* resolved_addr6_out) { - GPR_ASSERT(resolved_addr != resolved_addr6_out); - const sockaddr* addr = resolved_addr->address(); - sockaddr_in6* addr6_out = const_cast( - reinterpret_cast(resolved_addr6_out->address())); - if (addr->sa_family == AF_INET) { - const sockaddr_in* addr4 = reinterpret_cast(addr); - memset(resolved_addr6_out, 0, sizeof(*resolved_addr6_out)); - addr6_out->sin6_family = AF_INET6; - memcpy(&addr6_out->sin6_addr.s6_addr[0], kV4MappedPrefix, 12); - memcpy(&addr6_out->sin6_addr.s6_addr[12], &addr4->sin_addr, 4); - addr6_out->sin6_port = addr4->sin_port; - *resolved_addr6_out = EventEngine::ResolvedAddress( - reinterpret_cast(addr6_out), - static_cast(sizeof(sockaddr_in6))); - return true; - } - return false; -} - -absl::StatusOr SockaddrToString( - const EventEngine::ResolvedAddress* resolved_addr, bool normalize) { - const int save_errno = errno; - EventEngine::ResolvedAddress addr_normalized; - if (normalize && SockaddrIsV4Mapped(resolved_addr, &addr_normalized)) { - resolved_addr = &addr_normalized; - } - const sockaddr* addr = - reinterpret_cast(resolved_addr->address()); - std::string out; -#ifdef GRPC_HAVE_UNIX_SOCKET - if (addr->sa_family == AF_UNIX) { - const sockaddr_un* addr_un = reinterpret_cast(addr); - bool abstract = addr_un->sun_path[0] == '\0'; - if (abstract) { -#ifdef GPR_APPLE - int len = resolved_addr->size() - sizeof(addr_un->sun_family) - - sizeof(addr_un->sun_len); -#else - int len = resolved_addr->size() - sizeof(addr_un->sun_family); -#endif - if (len <= 0) { - return absl::InvalidArgumentError("Empty UDS abstract path"); - } - out = std::string(addr_un->sun_path, len); - } else { - size_t maxlen = sizeof(addr_un->sun_path); - if (strnlen(addr_un->sun_path, maxlen) == maxlen) { - return absl::InvalidArgumentError("UDS path is not null-terminated"); - } - out = std::string(addr_un->sun_path); - } - return out; - } -#endif - - const void* ip = nullptr; - int port = 0; - uint32_t sin6_scope_id = 0; - if (addr->sa_family == AF_INET) { - const sockaddr_in* addr4 = reinterpret_cast(addr); - ip = &addr4->sin_addr; - port = ntohs(addr4->sin_port); - } else if (addr->sa_family == AF_INET6) { - const sockaddr_in6* addr6 = reinterpret_cast(addr); - ip = &addr6->sin6_addr; - port = ntohs(addr6->sin6_port); - sin6_scope_id = addr6->sin6_scope_id; - } - char ntop_buf[INET6_ADDRSTRLEN]; - if (ip != nullptr && - inet_ntop(addr->sa_family, ip, ntop_buf, sizeof(ntop_buf)) != nullptr) { - if (sin6_scope_id != 0) { - // Enclose sin6_scope_id with the format defined in RFC 6874 - // section 2. - std::string host_with_scope = - absl::StrFormat("%s%%%" PRIu32, ntop_buf, sin6_scope_id); - out = grpc_core::JoinHostPort(host_with_scope, port); - } else { - out = grpc_core::JoinHostPort(ntop_buf, port); - } - } else { - return absl::InvalidArgumentError( - absl::StrCat("Unknown sockaddr family: ", addr->sa_family)); - } - // This is probably redundant, but we wouldn't want to log the wrong - // error. - errno = save_errno; - return out; -} - -EventEngine::ResolvedAddress SockaddrMakeWild6(int port) { - EventEngine::ResolvedAddress resolved_wild_out; - sockaddr_in6* wild_out = reinterpret_cast( - const_cast(resolved_wild_out.address())); - GPR_ASSERT(port >= 0 && port < 65536); - memset(wild_out, 0, sizeof(sockaddr_in6)); - wild_out->sin6_family = AF_INET6; - wild_out->sin6_port = htons(static_cast(port)); - return EventEngine::ResolvedAddress( - reinterpret_cast(wild_out), - static_cast(sizeof(sockaddr_in6))); -} - -EventEngine::ResolvedAddress SockaddrMakeWild4(int port) { - EventEngine::ResolvedAddress resolved_wild_out; - sockaddr_in* wild_out = reinterpret_cast( - const_cast(resolved_wild_out.address())); - GPR_ASSERT(port >= 0 && port < 65536); - memset(wild_out, 0, sizeof(sockaddr_in)); - wild_out->sin_family = AF_INET; - wild_out->sin_port = htons(static_cast(port)); - return EventEngine::ResolvedAddress( - reinterpret_cast(wild_out), - static_cast(sizeof(sockaddr_in))); -} - -int SockaddrGetPort(const EventEngine::ResolvedAddress& resolved_addr) { - const sockaddr* addr = resolved_addr.address(); - switch (addr->sa_family) { - case AF_INET: - return ntohs((reinterpret_cast(addr))->sin_port); - case AF_INET6: - return ntohs((reinterpret_cast(addr))->sin6_port); -#ifdef GRPC_HAVE_UNIX_SOCKET - case AF_UNIX: - return 1; -#endif - default: - gpr_log(GPR_ERROR, "Unknown socket family %d in SockaddrGetPort", - addr->sa_family); - abort(); - } -} - -void SockaddrSetPort(EventEngine::ResolvedAddress& resolved_addr, int port) { - sockaddr* addr = const_cast(resolved_addr.address()); - switch (addr->sa_family) { - case AF_INET: - GPR_ASSERT(port >= 0 && port < 65536); - (reinterpret_cast(addr))->sin_port = - htons(static_cast(port)); - return; - case AF_INET6: - GPR_ASSERT(port >= 0 && port < 65536); - (reinterpret_cast(addr))->sin6_port = - htons(static_cast(port)); - return; - default: - gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_set_port", - addr->sa_family); - abort(); - } -} - void UnlinkIfUnixDomainSocket( const EventEngine::ResolvedAddress& resolved_addr) { #ifdef GRPC_HAVE_UNIX_SOCKET @@ -443,37 +256,6 @@ void UnlinkIfUnixDomainSocket( #endif } -absl::optional SockaddrIsWildcard( - const EventEngine::ResolvedAddress& addr) { - const EventEngine::ResolvedAddress* resolved_addr = &addr; - EventEngine::ResolvedAddress addr4_normalized; - if (SockaddrIsV4Mapped(resolved_addr, &addr4_normalized)) { - resolved_addr = &addr4_normalized; - } - if (resolved_addr->address()->sa_family == AF_INET) { - // Check for 0.0.0.0 - const sockaddr_in* addr4 = - reinterpret_cast(resolved_addr->address()); - if (addr4->sin_addr.s_addr != 0) { - return absl::nullopt; - } - return static_cast(ntohs(addr4->sin_port)); - } else if (resolved_addr->address()->sa_family == AF_INET6) { - // Check for :: - const sockaddr_in6* addr6 = - reinterpret_cast(resolved_addr->address()); - int i; - for (i = 0; i < 16; i++) { - if (addr6->sin6_addr.s6_addr[i] != 0) { - return absl::nullopt; - } - } - return static_cast(ntohs(addr6->sin6_port)); - } else { - return absl::nullopt; - } -} - // Instruct the kernel to wait for specified number of bytes to be received on // the socket before generating an interrupt for packet receive. If the call // succeeds, it returns the number of bytes (wait threshold) that was actually @@ -874,7 +656,7 @@ absl::StatusOr PosixSocketWrapper::LocalAddressString() { if (!status.ok()) { return status.status(); } - return SockaddrToString(&(*status), true); + return ResolvedAddressToNormalizedString((*status)); } absl::StatusOr PosixSocketWrapper::PeerAddressString() { @@ -882,7 +664,7 @@ absl::StatusOr PosixSocketWrapper::PeerAddressString() { if (!status.ok()) { return status.status(); } - return SockaddrToString(&(*status), true); + return ResolvedAddressToNormalizedString((*status)); } absl::StatusOr PosixSocketWrapper::CreateDualStackSocket( @@ -909,7 +691,7 @@ absl::StatusOr PosixSocketWrapper::CreateDualStackSocket( return sock; } // If this isn't an IPv4 address, then return whatever we've got. - if (!SockaddrIsV4Mapped(&addr, nullptr)) { + if (!ResolvedAddressIsV4Mapped(addr, nullptr)) { dsmode = PosixSocketWrapper::DSMode::DSMODE_IPV6; return sock; } @@ -937,7 +719,7 @@ PosixSocketWrapper::CreateAndPrepareTcpClientSocket( // Use dualstack sockets where available. Set mapped to v6 or v4 mapped to // v6. - if (!SockaddrToV4Mapped(&target_addr, &mapped_target_addr)) { + if (!ResolvedAddressToV4Mapped(target_addr, &mapped_target_addr)) { // addr is v4 mapped to v6 or just v6. mapped_target_addr = target_addr; } @@ -950,7 +732,7 @@ PosixSocketWrapper::CreateAndPrepareTcpClientSocket( if (dsmode == PosixSocketWrapper::DSMode::DSMODE_IPV4) { // Original addr is either v4 or v4 mapped to v6. Set mapped_addr to v4. - if (!SockaddrIsV4Mapped(&target_addr, &mapped_target_addr)) { + if (!ResolvedAddressIsV4Mapped(target_addr, &mapped_target_addr)) { mapped_target_addr = target_addr; } } @@ -966,21 +748,6 @@ PosixSocketWrapper::CreateAndPrepareTcpClientSocket( #else /* GRPC_POSIX_SOCKET_UTILS_COMMON */ -bool SockaddrIsV4Mapped(const EventEngine::ResolvedAddress* /*resolved_addr*/, - EventEngine::ResolvedAddress* /*resolved_addr4_out*/) { - GPR_ASSERT(false && "unimplemented"); -} - -bool SockaddrToV4Mapped(const EventEngine::ResolvedAddress* /*resolved_addr*/, - EventEngine::ResolvedAddress* /*resolved_addr6_out*/) { - GPR_ASSERT(false && "unimplemented"); -} - -absl::StatusOr SockaddrToString( - const EventEngine::ResolvedAddress* /*resolved_addr*/, bool /*normalize*/) { - GPR_ASSERT(false && "unimplemented"); -} - absl::StatusOr PosixSocketWrapper::SetSocketRcvLowat(int /*bytes*/) { GPR_ASSERT(false && "unimplemented"); } diff --git a/src/core/lib/event_engine/posix_engine/tcp_socket_utils.h b/src/core/lib/event_engine/posix_engine/tcp_socket_utils.h index 7a1db0e416d..e126856a9dd 100644 --- a/src/core/lib/event_engine/posix_engine/tcp_socket_utils.h +++ b/src/core/lib/event_engine/posix_engine/tcp_socket_utils.h @@ -23,7 +23,6 @@ #include "absl/status/status.h" #include "absl/status/statusor.h" -#include "absl/types/optional.h" #include #include @@ -146,52 +145,10 @@ int Accept4(int sockfd, grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr, int nonblock, int cloexec); -// Returns true if resolved_addr is an IPv4-mapped IPv6 address within the -// ::ffff:0.0.0.0/96 range, or false otherwise. - -// If resolved_addr4_out is non-NULL, the inner IPv4 address will be copied -// here when returning true. -bool SockaddrIsV4Mapped(const EventEngine::ResolvedAddress* resolved_addr, - EventEngine::ResolvedAddress* resolved_addr4_out); - -// If resolved_addr is an AF_INET address, writes the corresponding -// ::ffff:0.0.0.0/96 address to resolved_addr6_out and returns true. Otherwise -// returns false. -bool SockaddrToV4Mapped(const EventEngine::ResolvedAddress* resolved_addr, - EventEngine::ResolvedAddress* resolved_addr6_out); - -// Make wild card IPv6 address with specified port. -EventEngine::ResolvedAddress SockaddrMakeWild6(int port); - -// Make wild card IPv4 address with specified port. -EventEngine::ResolvedAddress SockaddrMakeWild4(int port); - -// Given a resolved address, return the port number in the address. -int SockaddrGetPort(const EventEngine::ResolvedAddress& resolved_addr); - -// Modifes the passed address to use the specified port number. The -// operation would only succeed if the passed address is an IPv4 or Ipv6 -// address. Otherwise the function call would abort fail. -void SockaddrSetPort(EventEngine::ResolvedAddress& resolved_addr, int port); - // Unlink the path pointed to by the given address if it refers to UDS path. void UnlinkIfUnixDomainSocket( const EventEngine::ResolvedAddress& resolved_addr); -// Returns the port number associated with the address if the given address is -// not a wildcard ipv6 or ipv6 address. Otherwise returns absl::nullopt -absl::optional SockaddrIsWildcard( - const EventEngine::ResolvedAddress& addr); - -// Converts a EventEngine::ResolvedAddress into a newly-allocated -// human-readable string. -// -// Currently, only the AF_INET, AF_INET6, and AF_UNIX families are -// recognized. If the normalize flag is enabled, ::ffff:0.0.0.0/96 IPv6 -// addresses are displayed as plain IPv4. -absl::StatusOr SockaddrToString( - const EventEngine::ResolvedAddress* resolved_addr, bool normalize); - class PosixSocketWrapper { public: explicit PosixSocketWrapper(int fd) : fd_(fd) { GPR_ASSERT(fd_ > 0); } diff --git a/src/core/lib/event_engine/tcp_socket_utils.cc b/src/core/lib/event_engine/tcp_socket_utils.cc new file mode 100644 index 00000000000..e517ae57f59 --- /dev/null +++ b/src/core/lib/event_engine/tcp_socket_utils.cc @@ -0,0 +1,320 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include + +#include "src/core/lib/event_engine/tcp_socket_utils.h" + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON +#include // IWYU pragma: keep + +#ifdef GRPC_LINUX_TCP_H +#include +#else +#include // IWYU pragma: keep +#include +#endif +#include +#include +#include +#endif // GRPC_POSIX_SOCKET_UTILS_COMMON + +#ifdef GRPC_HAVE_UNIX_SOCKET +#include // IWYU pragma: keep +#include +#endif + +#include +#include +#include +#include + +#include + +#include "absl/status/status.h" +#include "absl/strings/str_cat.h" +#include "absl/strings/str_format.h" + +#include + +#include "src/core/lib/gprpp/host_port.h" +#include "src/core/lib/gprpp/status_helper.h" +#include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/uri/uri_parser.h" + +namespace grpc_event_engine { +namespace experimental { + +namespace { +constexpr uint8_t kV4MappedPrefix[] = {0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0xff, 0xff}; +absl::StatusOr GetScheme( + const EventEngine::ResolvedAddress& resolved_address) { + switch (resolved_address.address()->sa_family) { + case AF_INET: + return "ipv4"; + case AF_INET6: + return "ipv6"; + case AF_UNIX: + return "unix"; + default: + return absl::InvalidArgumentError(absl::StrFormat( + "Unknown scheme: %d", resolved_address.address()->sa_family)); + } +} +} // namespace + +bool ResolvedAddressIsV4Mapped( + const EventEngine::ResolvedAddress& resolved_addr, + EventEngine::ResolvedAddress* resolved_addr4_out) { + const sockaddr* addr = resolved_addr.address(); + if (addr->sa_family == AF_INET6) { + const sockaddr_in6* addr6 = reinterpret_cast(addr); + sockaddr_in* addr4_out = + resolved_addr4_out == nullptr + ? nullptr + : reinterpret_cast( + const_cast(resolved_addr4_out->address())); + + if (memcmp(addr6->sin6_addr.s6_addr, kV4MappedPrefix, + sizeof(kV4MappedPrefix)) == 0) { + if (resolved_addr4_out != nullptr) { + // Normalize ::ffff:0.0.0.0/96 to IPv4. + memset(addr4_out, 0, sizeof(sockaddr_in)); + addr4_out->sin_family = AF_INET; + // s6_addr32 would be nice, but it's non-standard. + memcpy(&addr4_out->sin_addr, &addr6->sin6_addr.s6_addr[12], 4); + addr4_out->sin_port = addr6->sin6_port; + *resolved_addr4_out = EventEngine::ResolvedAddress( + reinterpret_cast(addr4_out), + static_cast(sizeof(sockaddr_in))); + } + return true; + } + } + return false; +} + +bool ResolvedAddressToV4Mapped( + const EventEngine::ResolvedAddress& resolved_addr, + EventEngine::ResolvedAddress* resolved_addr6_out) { + GPR_ASSERT(&resolved_addr != resolved_addr6_out); + const sockaddr* addr = resolved_addr.address(); + sockaddr_in6* addr6_out = const_cast( + reinterpret_cast(resolved_addr6_out->address())); + if (addr->sa_family == AF_INET) { + const sockaddr_in* addr4 = reinterpret_cast(addr); + memset(resolved_addr6_out, 0, sizeof(*resolved_addr6_out)); + addr6_out->sin6_family = AF_INET6; + memcpy(&addr6_out->sin6_addr.s6_addr[0], kV4MappedPrefix, 12); + memcpy(&addr6_out->sin6_addr.s6_addr[12], &addr4->sin_addr, 4); + addr6_out->sin6_port = addr4->sin_port; + *resolved_addr6_out = EventEngine::ResolvedAddress( + reinterpret_cast(addr6_out), + static_cast(sizeof(sockaddr_in6))); + return true; + } + return false; +} + +EventEngine::ResolvedAddress ResolvedAddressMakeWild6(int port) { + EventEngine::ResolvedAddress resolved_wild_out; + sockaddr_in6* wild_out = reinterpret_cast( + const_cast(resolved_wild_out.address())); + GPR_ASSERT(port >= 0 && port < 65536); + memset(wild_out, 0, sizeof(sockaddr_in6)); + wild_out->sin6_family = AF_INET6; + wild_out->sin6_port = htons(static_cast(port)); + return EventEngine::ResolvedAddress( + reinterpret_cast(wild_out), + static_cast(sizeof(sockaddr_in6))); +} + +EventEngine::ResolvedAddress ResolvedAddressMakeWild4(int port) { + EventEngine::ResolvedAddress resolved_wild_out; + sockaddr_in* wild_out = reinterpret_cast( + const_cast(resolved_wild_out.address())); + GPR_ASSERT(port >= 0 && port < 65536); + memset(wild_out, 0, sizeof(sockaddr_in)); + wild_out->sin_family = AF_INET; + wild_out->sin_port = htons(static_cast(port)); + return EventEngine::ResolvedAddress( + reinterpret_cast(wild_out), + static_cast(sizeof(sockaddr_in))); +} + +int ResolvedAddressGetPort(const EventEngine::ResolvedAddress& resolved_addr) { + const sockaddr* addr = resolved_addr.address(); + switch (addr->sa_family) { + case AF_INET: + return ntohs((reinterpret_cast(addr))->sin_port); + case AF_INET6: + return ntohs((reinterpret_cast(addr))->sin6_port); +#ifdef GRPC_HAVE_UNIX_SOCKET + case AF_UNIX: + return 1; +#endif + default: + gpr_log(GPR_ERROR, "Unknown socket family %d in ResolvedAddressGetPort", + addr->sa_family); + abort(); + } +} + +void ResolvedAddressSetPort(EventEngine::ResolvedAddress& resolved_addr, + int port) { + sockaddr* addr = const_cast(resolved_addr.address()); + switch (addr->sa_family) { + case AF_INET: + GPR_ASSERT(port >= 0 && port < 65536); + (reinterpret_cast(addr))->sin_port = + htons(static_cast(port)); + return; + case AF_INET6: + GPR_ASSERT(port >= 0 && port < 65536); + (reinterpret_cast(addr))->sin6_port = + htons(static_cast(port)); + return; + default: + gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_set_port", + addr->sa_family); + abort(); + } +} + +absl::optional ResolvedAddressIsWildcard( + const EventEngine::ResolvedAddress& addr) { + const EventEngine::ResolvedAddress* resolved_addr = &addr; + EventEngine::ResolvedAddress addr4_normalized; + if (ResolvedAddressIsV4Mapped(addr, &addr4_normalized)) { + resolved_addr = &addr4_normalized; + } + if (resolved_addr->address()->sa_family == AF_INET) { + // Check for 0.0.0.0 + const sockaddr_in* addr4 = + reinterpret_cast(resolved_addr->address()); + if (addr4->sin_addr.s_addr != 0) { + return absl::nullopt; + } + return static_cast(ntohs(addr4->sin_port)); + } else if (resolved_addr->address()->sa_family == AF_INET6) { + // Check for :: + const sockaddr_in6* addr6 = + reinterpret_cast(resolved_addr->address()); + int i; + for (i = 0; i < 16; i++) { + if (addr6->sin6_addr.s6_addr[i] != 0) { + return absl::nullopt; + } + } + return static_cast(ntohs(addr6->sin6_port)); + } else { + return absl::nullopt; + } +} + +absl::StatusOr ResolvedAddressToNormalizedString( + const EventEngine::ResolvedAddress& resolved_addr) { + EventEngine::ResolvedAddress addr_normalized; + if (ResolvedAddressIsV4Mapped(resolved_addr, &addr_normalized)) { + return ResolvedAddressToString(addr_normalized); + } + return ResolvedAddressToString(resolved_addr); +} + +absl::StatusOr ResolvedAddressToString( + const EventEngine::ResolvedAddress& resolved_addr) { + const int save_errno = errno; + const sockaddr* addr = resolved_addr.address(); + std::string out; +#ifdef GRPC_HAVE_UNIX_SOCKET + if (addr->sa_family == AF_UNIX) { + const sockaddr_un* addr_un = reinterpret_cast(addr); + bool abstract = addr_un->sun_path[0] == '\0'; + if (abstract) { +#ifdef GPR_APPLE + int len = resolved_addr.size() - sizeof(addr_un->sun_family) - + sizeof(addr_un->sun_len); +#else + int len = resolved_addr.size() - sizeof(addr_un->sun_family); +#endif + if (len <= 0) { + return absl::InvalidArgumentError("Empty UDS abstract path"); + } + out = std::string(addr_un->sun_path, len); + } else { + size_t maxlen = sizeof(addr_un->sun_path); + if (strnlen(addr_un->sun_path, maxlen) == maxlen) { + return absl::InvalidArgumentError("UDS path is not null-terminated"); + } + out = std::string(addr_un->sun_path); + } + return out; + } +#endif // GRPC_HAVE_UNIX_SOCKET + + const void* ip = nullptr; + int port = 0; + uint32_t sin6_scope_id = 0; + if (addr->sa_family == AF_INET) { + const sockaddr_in* addr4 = reinterpret_cast(addr); + ip = &addr4->sin_addr; + port = ntohs(addr4->sin_port); + } else if (addr->sa_family == AF_INET6) { + const sockaddr_in6* addr6 = reinterpret_cast(addr); + ip = &addr6->sin6_addr; + port = ntohs(addr6->sin6_port); + sin6_scope_id = addr6->sin6_scope_id; + } + char ntop_buf[INET6_ADDRSTRLEN]; + if (ip != nullptr && + inet_ntop(addr->sa_family, ip, ntop_buf, sizeof(ntop_buf)) != nullptr) { + if (sin6_scope_id != 0) { + // Enclose sin6_scope_id with the format defined in RFC 6874 + // section 2. + std::string host_with_scope = + absl::StrFormat("%s%%%" PRIu32, ntop_buf, sin6_scope_id); + out = grpc_core::JoinHostPort(host_with_scope, port); + } else { + out = grpc_core::JoinHostPort(ntop_buf, port); + } + } else { + return absl::InvalidArgumentError( + absl::StrCat("Unknown sockaddr family: ", addr->sa_family)); + } + // This is probably redundant, but we wouldn't want to log the wrong + // error. + errno = save_errno; + return out; +} + +absl::StatusOr ResolvedAddressToURI( + const EventEngine::ResolvedAddress& resolved_address) { + if (resolved_address.size() == 0) { + return absl::InvalidArgumentError("Empty address"); + } + auto scheme = GetScheme(resolved_address); + GRPC_RETURN_IF_ERROR(scheme.status()); + auto path = ResolvedAddressToString(resolved_address); + GRPC_RETURN_IF_ERROR(path.status()); + absl::StatusOr uri = + grpc_core::URI::Create(*scheme, /*authority=*/"", std::move(path.value()), + /*query_parameter_pairs=*/{}, /*fragment=*/""); + if (!uri.ok()) return uri.status(); + return uri->ToString(); +} + +} // namespace experimental +} // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/tcp_socket_utils.h b/src/core/lib/event_engine/tcp_socket_utils.h new file mode 100644 index 00000000000..e76900568ea --- /dev/null +++ b/src/core/lib/event_engine/tcp_socket_utils.h @@ -0,0 +1,85 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef GRPC_CORE_LIB_EVENT_ENGINE_TCP_SOCKET_UTILS_H +#define GRPC_CORE_LIB_EVENT_ENGINE_TCP_SOCKET_UTILS_H + +#include + +#include + +#include "absl/status/statusor.h" +#include "absl/types/optional.h" + +#include + +namespace grpc_event_engine { +namespace experimental { + +// Returns true if resolved_addr is an IPv4-mapped IPv6 address within the +// ::ffff:0.0.0.0/96 range, or false otherwise. +// If resolved_addr4_out is non-NULL, the inner IPv4 address will be copied +// here when returning true. +bool ResolvedAddressIsV4Mapped( + const EventEngine::ResolvedAddress& resolved_addr, + EventEngine::ResolvedAddress* resolved_addr4_out); + +// If resolved_addr is an AF_INET address, writes the corresponding +// ::ffff:0.0.0.0/96 address to resolved_addr6_out and returns true. Otherwise +// returns false. +bool ResolvedAddressToV4Mapped( + const EventEngine::ResolvedAddress& resolved_addr, + EventEngine::ResolvedAddress* resolved_addr6_out); + +// Make wild card IPv6 address with specified port. +EventEngine::ResolvedAddress ResolvedAddressMakeWild6(int port); + +// Make wild card IPv4 address with specified port. +EventEngine::ResolvedAddress ResolvedAddressMakeWild4(int port); + +// Given a resolved address, return the port number in the address. +int ResolvedAddressGetPort(const EventEngine::ResolvedAddress& resolved_addr); + +// Modifies the address, setting the specified port number. +// The operation would only succeed if the passed address is an IPv4 or Ipv6 +// address. Otherwise the function call would abort fail. +void ResolvedAddressSetPort(EventEngine::ResolvedAddress& resolved_addr, + int port); + +// Returns the port number associated with the address if the given address is +// not a wildcard ipv6 or ipv6 address. Otherwise returns absl::nullopt +absl::optional ResolvedAddressIsWildcard( + const EventEngine::ResolvedAddress& addr); + +// Converts a EventEngine::ResolvedAddress into a newly-allocated +// human-readable string. +// Currently, only the AF_INET, AF_INET6, and AF_UNIX families are +// recognized. +absl::StatusOr ResolvedAddressToString( + const EventEngine::ResolvedAddress& resolved_addr); + +// Converts a EventEngine::ResolvedAddress into a newly-allocated +// human-readable string. See ResolvedAddressToString. +// This functional normalizes, so for example: ::ffff:0.0.0.0/96 IPv6 +// addresses are displayed as plain IPv4. +absl::StatusOr ResolvedAddressToNormalizedString( + const EventEngine::ResolvedAddress& resolved_addr); + +// Returns the URI string corresponding to the resolved_address +absl::StatusOr ResolvedAddressToURI( + const EventEngine::ResolvedAddress& resolved_address); + +} // namespace experimental +} // namespace grpc_event_engine + +#endif // GRPC_CORE_LIB_EVENT_ENGINE_TCP_SOCKET_UTILS_H diff --git a/src/core/lib/event_engine/windows/windows_endpoint.cc b/src/core/lib/event_engine/windows/windows_endpoint.cc new file mode 100644 index 00000000000..c77b925c96f --- /dev/null +++ b/src/core/lib/event_engine/windows/windows_endpoint.cc @@ -0,0 +1,273 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include + +#ifdef GPR_WINDOWS + +#include "absl/cleanup/cleanup.h" +#include "absl/functional/any_invocable.h" +#include "absl/status/status.h" +#include "absl/strings/str_format.h" + +#include +#include + +#include "src/core/lib/event_engine/tcp_socket_utils.h" +#include "src/core/lib/event_engine/trace.h" +#include "src/core/lib/event_engine/windows/windows_endpoint.h" +#include "src/core/lib/gprpp/debug_location.h" +#include "src/core/lib/gprpp/status_helper.h" +#include "src/core/lib/iomgr/error.h" + +namespace grpc_event_engine { +namespace experimental { + +// TODO(hork): The previous implementation required internal ref counting. Add +// this when it becomes necessary. +// TODO(hork): The previous implementation required a 2-phase shutdown. Add this +// when it becomes necessary. + +namespace { +constexpr int64_t kDefaultTargetReadSize = 8192; +constexpr int kMaxWSABUFCount = 16; + +void AbortOnEvent(absl::Status) { + GPR_ASSERT(false && + "INTERNAL ERROR: Asked to handle read/write event with an invalid " + "callback"); +} + +} // namespace + +WindowsEndpoint::WindowsEndpoint( + const EventEngine::ResolvedAddress& peer_address, + std::unique_ptr socket, MemoryAllocator&& allocator, + const EndpointConfig& /* config */, Executor* executor) + : peer_address_(peer_address), + socket_(std::move(socket)), + allocator_(std::move(allocator)), + handle_read_event_(this), + handle_write_event_(this), + executor_(executor) { + sockaddr addr; + int addr_len = sizeof(addr); + if (getsockname(socket_->socket(), &addr, &addr_len) < 0) { + gpr_log(GPR_ERROR, "Unrecoverable error: Failed to get local socket name."); + abort(); + } + local_address_ = EventEngine::ResolvedAddress(&addr, addr_len); + local_address_string_ = *ResolvedAddressToURI(local_address_); + peer_address_string_ = *ResolvedAddressToURI(peer_address_); +} + +WindowsEndpoint::~WindowsEndpoint() { + socket_->MaybeShutdown(absl::OkStatus()); +} + +void WindowsEndpoint::Read(absl::AnyInvocable on_read, + SliceBuffer* buffer, const ReadArgs* args) { + // TODO(hork): last_read_buffer from iomgr: Is it only garbage, or optimized? + GRPC_EVENT_ENGINE_TRACE("WindowsEndpoint::%p reading", this); + // Prepare the WSABUF struct + WSABUF wsa_buffers[kMaxWSABUFCount]; + int min_read_size = kDefaultTargetReadSize; + if (args != nullptr && args->read_hint_bytes > 0) { + min_read_size = args->read_hint_bytes; + } + if (buffer->Length() < min_read_size && buffer->Count() < kMaxWSABUFCount) { + buffer->AppendIndexed(Slice(allocator_.MakeSlice(min_read_size))); + } + GPR_ASSERT(buffer->Count() <= kMaxWSABUFCount); + for (int i = 0; i < buffer->Count(); i++) { + Slice tmp = buffer->RefSlice(i); + wsa_buffers[i].buf = (char*)tmp.begin(); + wsa_buffers[i].len = tmp.size(); + } + DWORD bytes_read = 0; + DWORD flags = 0; + // First let's try a synchronous, non-blocking read. + int status = WSARecv(socket_->socket(), wsa_buffers, (DWORD)buffer->Count(), + &bytes_read, &flags, nullptr, nullptr); + int wsa_error = status == 0 ? 0 : WSAGetLastError(); + // Did we get data immediately ? Yay. + if (wsa_error != WSAEWOULDBLOCK) { + // prune slicebuffer + if (bytes_read != buffer->Length()) { + buffer->RemoveLastNBytes(buffer->Length() - bytes_read); + } + executor_->Run([on_read = std::move(on_read)]() mutable { + on_read(absl::OkStatus()); + }); + return; + } + // Otherwise, let's retry, by queuing a read. + memset(socket_->read_info()->overlapped(), 0, sizeof(OVERLAPPED)); + status = + WSARecv(socket_->socket(), wsa_buffers, (DWORD)buffer->Count(), + &bytes_read, &flags, socket_->read_info()->overlapped(), nullptr); + wsa_error = status == 0 ? 0 : WSAGetLastError(); + if (wsa_error != 0 && wsa_error != WSA_IO_PENDING) { + // Async read returned immediately with an error + executor_->Run([this, on_read = std::move(on_read), wsa_error]() mutable { + on_read(GRPC_WSA_ERROR( + wsa_error, + absl::StrFormat("WindowsEndpont::%p Read failed", this).c_str())); + }); + return; + } + + handle_read_event_.Prime(buffer, std::move(on_read)); + socket_->NotifyOnRead(&handle_read_event_); +} + +void WindowsEndpoint::Write(absl::AnyInvocable on_writable, + SliceBuffer* data, const WriteArgs* /* args */) { + if (grpc_event_engine_trace.enabled()) { + for (int i = 0; i < data->Count(); i++) { + auto str = data->RefSlice(i).as_string_view(); + gpr_log(GPR_INFO, "WindowsEndpoint::%p WRITE (peer=%s): %.*s", this, + peer_address_string_.c_str(), str.length(), str.data()); + } + } + GPR_ASSERT(data->Count() <= UINT_MAX); + absl::InlinedVector buffers(data->Count()); + for (int i = 0; i < data->Count(); i++) { + auto slice = data->RefSlice(i); + GPR_ASSERT(slice.size() <= ULONG_MAX); + buffers[i].len = slice.size(); + buffers[i].buf = (char*)slice.begin(); + } + // First, let's try a synchronous, non-blocking write. + DWORD bytes_sent; + int status = WSASend(socket_->socket(), buffers.data(), (DWORD)buffers.size(), + &bytes_sent, 0, nullptr, nullptr); + size_t async_buffers_offset; + if (status == 0) { + if (bytes_sent == data->Length()) { + // Write completed, exiting early + executor_->Run( + [cb = std::move(on_writable)]() mutable { cb(absl::OkStatus()); }); + return; + } + // The data was not completely delivered, we should send the rest of it by + // doing an async write operation. + for (int i = 0; i < data->Count(); i++) { + if (buffers[i].len > bytes_sent) { + buffers[i].buf += bytes_sent; + buffers[i].len -= bytes_sent; + break; + } + bytes_sent -= buffers[i].len; + async_buffers_offset++; + } + } else { + // We would kind of expect to get a WSAEWOULDBLOCK here, especially on a + // busy connection that has its send queue filled up. But if we don't, + // then we can avoid doing an async write operation at all. + int wsa_error = WSAGetLastError(); + if (wsa_error != WSAEWOULDBLOCK) { + executor_->Run([cb = std::move(on_writable), wsa_error]() mutable { + cb(GRPC_WSA_ERROR(wsa_error, "WSASend")); + }); + return; + } + } + auto write_info = socket_->write_info(); + memset(write_info->overlapped(), 0, sizeof(OVERLAPPED)); + status = WSASend(socket_->socket(), &buffers[async_buffers_offset], + (DWORD)(data->Count() - async_buffers_offset), nullptr, 0, + write_info->overlapped(), nullptr); + + if (status != 0) { + int wsa_error = WSAGetLastError(); + if (wsa_error != WSA_IO_PENDING) { + executor_->Run([cb = std::move(on_writable), wsa_error]() mutable { + cb(GRPC_WSA_ERROR(wsa_error, "WSASend")); + }); + return; + } + } + // As all is now setup, we can now ask for the IOCP notification. It may + // trigger the callback immediately however, but no matter. + handle_write_event_.Prime(data, std::move(on_writable)); + socket_->NotifyOnWrite(&handle_write_event_); +} +const EventEngine::ResolvedAddress& WindowsEndpoint::GetPeerAddress() const { + return peer_address_; +} +const EventEngine::ResolvedAddress& WindowsEndpoint::GetLocalAddress() const { + return local_address_; +} + +// ---- Handle{Read|Write}Closure + +WindowsEndpoint::BaseEventClosure::BaseEventClosure(WindowsEndpoint* endpoint) + : endpoint_(endpoint), cb_(&AbortOnEvent) {} + +void WindowsEndpoint::HandleReadClosure::Run() { + GRPC_EVENT_ENGINE_TRACE("WindowsEndpoint::%p Handling Read Event", endpoint_); + absl::Status status; + auto* read_info = endpoint_->socket_->read_info(); + auto cb_cleanup = absl::MakeCleanup([this, &status]() { + auto cb = std::move(cb_); + cb_ = &AbortOnEvent; + cb(status); + }); + if (read_info->wsa_error() != 0) { + status = GRPC_WSA_ERROR(read_info->wsa_error(), "Async Read Error"); + buffer_->Clear(); + return; + } + if (read_info->bytes_transferred() > 0) { + GPR_ASSERT(read_info->bytes_transferred() <= buffer_->Length()); + if (read_info->bytes_transferred() != buffer_->Length()) { + buffer_->RemoveLastNBytes(buffer_->Length() - + read_info->bytes_transferred()); + } + GPR_ASSERT(read_info->bytes_transferred() == buffer_->Length()); + if (grpc_event_engine_trace.enabled()) { + for (int i = 0; i < buffer_->Count(); i++) { + auto str = buffer_->RefSlice(i).as_string_view(); + gpr_log(GPR_INFO, "WindowsEndpoint::%p READ (peer=%s): %.*s", this, + endpoint_->peer_address_string_.c_str(), str.length(), + str.data()); + } + } + return; + } + // Either the endpoint is shut down or we've seen the end of the stream + buffer_->Clear(); + // TODO(hork): different error message if shut down + status = absl::UnavailableError("End of TCP stream"); +} + +void WindowsEndpoint::HandleWriteClosure::Run() { + GRPC_EVENT_ENGINE_TRACE("WindowsEndpoint::%p Handling Write Event", + endpoint_); + auto* write_info = endpoint_->socket_->write_info(); + auto cb = std::move(cb_); + cb_ = &AbortOnEvent; + absl::Status status; + if (write_info->wsa_error() != 0) { + status = GRPC_WSA_ERROR(write_info->wsa_error(), "WSASend"); + } else { + GPR_ASSERT(write_info->bytes_transferred() == buffer_->Length()); + } + cb(status); +} + +} // namespace experimental +} // namespace grpc_event_engine + +#endif // GPR_WINDOWS diff --git a/src/core/lib/event_engine/windows/windows_endpoint.h b/src/core/lib/event_engine/windows/windows_endpoint.h new file mode 100644 index 00000000000..6bd698df329 --- /dev/null +++ b/src/core/lib/event_engine/windows/windows_endpoint.h @@ -0,0 +1,94 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef GRPC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENDPOINT_H +#define GRPC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENDPOINT_H +#include + +#ifdef GPR_WINDOWS + +#include + +#include "src/core/lib/event_engine/windows/win_socket.h" + +namespace grpc_event_engine { +namespace experimental { + +class WindowsEndpoint : public EventEngine::Endpoint { + public: + WindowsEndpoint(const EventEngine::ResolvedAddress& peer_address, + std::unique_ptr socket, + MemoryAllocator&& allocator, const EndpointConfig& config, + Executor* Executor); + ~WindowsEndpoint() override; + void Read(absl::AnyInvocable on_read, SliceBuffer* buffer, + const ReadArgs* args) override; + void Write(absl::AnyInvocable on_writable, + SliceBuffer* data, const WriteArgs* args) override; + const EventEngine::ResolvedAddress& GetPeerAddress() const override; + const EventEngine::ResolvedAddress& GetLocalAddress() const override; + + private: + // Base class for the Read- and Write-specific event handler callbacks + class BaseEventClosure : public EventEngine::Closure { + public: + explicit BaseEventClosure(WindowsEndpoint* endpoint); + // Calls the bound application callback, inline. + // If called through IOCP, this will be run from within an Executor. + virtual void Run() = 0; + + // Prepare the closure by setting the application callback and SliceBuffer + void Prime(SliceBuffer* buffer, absl::AnyInvocable cb) { + cb_ = std::move(cb); + buffer_ = buffer; + } + + protected: + absl::AnyInvocable cb_; + SliceBuffer* buffer_; + WindowsEndpoint* endpoint_; + }; + + // Permanent closure type for Read callbacks + class HandleReadClosure : public BaseEventClosure { + public: + explicit HandleReadClosure(WindowsEndpoint* endpoint) + : BaseEventClosure(endpoint) {} + void Run() override; + }; + + // Permanent closure type for Write callbacks + class HandleWriteClosure : public BaseEventClosure { + public: + explicit HandleWriteClosure(WindowsEndpoint* endpoint) + : BaseEventClosure(endpoint) {} + void Run() override; + }; + + EventEngine::ResolvedAddress peer_address_; + std::string peer_address_string_; + EventEngine::ResolvedAddress local_address_; + std::string local_address_string_; + std::unique_ptr socket_; + MemoryAllocator allocator_; + HandleReadClosure handle_read_event_; + HandleWriteClosure handle_write_event_; + Executor* executor_; +}; + +} // namespace experimental +} // namespace grpc_event_engine + +#endif + +#endif // GRPC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENDPOINT_H diff --git a/src/core/lib/event_engine/windows/windows_engine.cc b/src/core/lib/event_engine/windows/windows_engine.cc index c91e1c1a8bc..254cf941e66 100644 --- a/src/core/lib/event_engine/windows/windows_engine.cc +++ b/src/core/lib/event_engine/windows/windows_engine.cc @@ -162,4 +162,5 @@ WindowsEventEngine::CreateListener( } // namespace experimental } // namespace grpc_event_engine + #endif // GPR_WINDOWS diff --git a/src/core/lib/iomgr/error.cc b/src/core/lib/iomgr/error.cc index 0a5ee704eea..c5f2ea9d084 100644 --- a/src/core/lib/iomgr/error.cc +++ b/src/core/lib/iomgr/error.cc @@ -68,8 +68,9 @@ absl::Status grpc_os_error(const grpc_core::DebugLocation& location, int err, } #ifdef GPR_WINDOWS +// TODO(veblush): lift out of iomgr for use in the WindowsEventEngine absl::Status grpc_wsa_error(const grpc_core::DebugLocation& location, int err, - const char* call_name) { + absl::string_view call_name) { char* utf8_message = gpr_format_message(err); absl::Status s = StatusCreate(absl::StatusCode::kUnavailable, "WSA Error", location, {}); diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index 467b732d0a4..f9cc88fb210 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -84,7 +84,7 @@ inline absl::Status grpc_assert_never_ok(absl::Status error) { grpc_assert_never_ok(grpc_os_error(DEBUG_LOCATION, err, call_name)) absl::Status grpc_wsa_error(const grpc_core::DebugLocation& location, int err, - const char* call_name) GRPC_MUST_USE_RESULT; + absl::string_view call_name) GRPC_MUST_USE_RESULT; /// windows only: create an error associated with WSAGetLastError()!=0 #define GRPC_WSA_ERROR(err, call_name) \ diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index bb788181bdf..681beb2cfd0 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -470,6 +470,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/event_engine/resolved_address.cc', 'src/core/lib/event_engine/slice.cc', 'src/core/lib/event_engine/slice_buffer.cc', + 'src/core/lib/event_engine/tcp_socket_utils.cc', 'src/core/lib/event_engine/thread_pool.cc', 'src/core/lib/event_engine/time_util.cc', 'src/core/lib/event_engine/trace.cc', diff --git a/test/core/event_engine/BUILD b/test/core/event_engine/BUILD index 7ab9e8a7940..932e4f48f93 100644 --- a/test/core/event_engine/BUILD +++ b/test/core/event_engine/BUILD @@ -169,3 +169,22 @@ grpc_cc_library( ], deps = ["//:event_engine_base_hdrs"], ) + +grpc_cc_test( + name = "tcp_socket_utils_test", + srcs = ["tcp_socket_utils_test.cc"], + external_deps = [ + "absl/status", + "absl/status:statusor", + "gtest", + ], + language = "C++", + uses_event_engine = False, + uses_polling = False, + deps = [ + "//:gpr", + "//:grpc", + "//src/core:event_engine_tcp_socket_utils", + "//src/core:iomgr_port", + ], +) diff --git a/test/core/event_engine/posix/BUILD b/test/core/event_engine/posix/BUILD index 0ff078f870f..a42f8361978 100644 --- a/test/core/event_engine/posix/BUILD +++ b/test/core/event_engine/posix/BUILD @@ -206,6 +206,7 @@ grpc_cc_test( uses_event_engine = False, deps = [ "//src/core:event_engine_common", + "//src/core:event_engine_tcp_socket_utils", "//src/core:posix_event_engine_listener_utils", "//src/core:posix_event_engine_tcp_socket_utils", "//src/core:socket_mutator", diff --git a/test/core/event_engine/posix/posix_engine_listener_utils_test.cc b/test/core/event_engine/posix/posix_engine_listener_utils_test.cc index af05658124f..86ff00bfeb6 100644 --- a/test/core/event_engine/posix/posix_engine_listener_utils_test.cc +++ b/test/core/event_engine/posix/posix_engine_listener_utils_test.cc @@ -38,6 +38,7 @@ #include "src/core/lib/event_engine/channel_args_endpoint_config.h" #include "src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h" #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" +#include "src/core/lib/event_engine/tcp_socket_utils.h" #include "test/core/util/port.h" namespace grpc_event_engine { @@ -46,6 +47,8 @@ namespace posix_engine { namespace { using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig; +using ::grpc_event_engine::experimental::ResolvedAddressGetPort; +using ::grpc_event_engine::experimental::ResolvedAddressToNormalizedString; class TestListenerSocketsContainer : public ListenerSocketsContainer { public: @@ -90,10 +93,10 @@ TEST(PosixEngineListenerUtils, ListenerContainerAddWildcardAddressesTest) { ASSERT_TRUE((*socket).addr.address()->sa_family == AF_INET6 || (*socket).addr.address()->sa_family == AF_INET); if ((*socket).addr.address()->sa_family == AF_INET6) { - EXPECT_EQ(SockaddrToString(&(*socket).addr, true).value(), + EXPECT_EQ(ResolvedAddressToNormalizedString((*socket).addr).value(), absl::StrCat("[::]:", std::to_string(port))); } else if ((*socket).addr.address()->sa_family == AF_INET) { - EXPECT_EQ(SockaddrToString(&(*socket).addr, true).value(), + EXPECT_EQ(ResolvedAddressToNormalizedString((*socket).addr).value(), absl::StrCat("0.0.0.0:", std::to_string(port))); } close(socket->sock.Fd()); @@ -139,7 +142,7 @@ TEST(PosixEngineListenerUtils, ListenerContainerAddAllLocalAddressesTest) { ++socket) { ASSERT_TRUE((*socket).addr.address()->sa_family == AF_INET6 || (*socket).addr.address()->sa_family == AF_INET); - EXPECT_EQ(SockaddrGetPort((*socket).addr), port); + EXPECT_EQ(ResolvedAddressGetPort((*socket).addr), port); close(socket->sock.Fd()); } } diff --git a/test/core/event_engine/posix/posix_event_engine_connect_test.cc b/test/core/event_engine/posix/posix_event_engine_connect_test.cc index 563737de612..5c1c9483047 100644 --- a/test/core/event_engine/posix/posix_event_engine_connect_test.cc +++ b/test/core/event_engine/posix/posix_event_engine_connect_test.cc @@ -41,7 +41,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/event_engine/channel_args_endpoint_config.h" #include "src/core/lib/event_engine/posix_engine/posix_engine.h" -#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" +#include "src/core/lib/event_engine/tcp_socket_utils.h" #include "src/core/lib/experiments/experiments.h" #include "src/core/lib/gprpp/notification.h" #include "src/core/lib/resource_quota/memory_quota.h" @@ -56,6 +56,7 @@ namespace posix_engine { using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig; using ::grpc_event_engine::experimental::EventEngine; using ::grpc_event_engine::experimental::PosixEventEngine; +using ::grpc_event_engine::experimental::ResolvedAddressToNormalizedString; using ::grpc_event_engine::experimental::URIToResolvedAddress; using ::grpc_event_engine::experimental::WaitForSingleOwner; using namespace std::chrono_literals; @@ -156,7 +157,7 @@ TEST(PosixEventEngineTest, IndefiniteConnectTimeoutOrRstTest) { auto resolved_addr = URIToResolvedAddress(target_addr); std::shared_ptr posix_ee = std::make_shared(); std::string resolved_addr_str = - SockaddrToString(&resolved_addr, true).value(); + ResolvedAddressToNormalizedString(resolved_addr).value(); auto sockets = CreateConnectedSockets(resolved_addr); grpc_core::Notification signal; grpc_core::ChannelArgs args; @@ -184,7 +185,7 @@ TEST(PosixEventEngineTest, IndefiniteConnectCancellationTest) { auto resolved_addr = URIToResolvedAddress(target_addr); std::shared_ptr posix_ee = std::make_shared(); std::string resolved_addr_str = - SockaddrToString(&resolved_addr, true).value(); + ResolvedAddressToNormalizedString(resolved_addr).value(); auto sockets = CreateConnectedSockets(resolved_addr); grpc_core::ChannelArgs args; auto quota = grpc_core::ResourceQuota::Default(); diff --git a/test/core/event_engine/posix/tcp_posix_socket_utils_test.cc b/test/core/event_engine/posix/tcp_posix_socket_utils_test.cc index c7236f22d45..4ea8c1e41bb 100644 --- a/test/core/event_engine/posix/tcp_posix_socket_utils_test.cc +++ b/test/core/event_engine/posix/tcp_posix_socket_utils_test.cc @@ -12,12 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include #include #include -#include - #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/str_cat.h" @@ -34,7 +31,6 @@ // This test won't work except with posix sockets enabled #ifdef GRPC_POSIX_SOCKET_UTILS_COMMON -#include #include #include #include @@ -43,7 +39,6 @@ #endif #include -#include #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" #include "src/core/lib/gpr/useful.h" @@ -118,97 +113,6 @@ const grpc_socket_mutator_vtable mutator_vtable = {MutateFd, CompareTestMutator, const grpc_socket_mutator_vtable mutator_vtable2 = { nullptr, CompareTestMutator, DestroyTestMutator, MutateFd2}; -const uint8_t kMapped[] = {0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0xff, 0xff, 192, 0, 2, 1}; - -const uint8_t kNotQuiteMapped[] = {0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0xff, 0xfe, 192, 0, 2, 99}; -const uint8_t kIPv4[] = {192, 0, 2, 1}; - -const uint8_t kIPv6[] = {0x20, 0x01, 0x0d, 0xb8, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 1}; - -EventEngine::ResolvedAddress MakeAddr4(const uint8_t* data, size_t data_len) { - EventEngine::ResolvedAddress resolved_addr4; - sockaddr_in* addr4 = reinterpret_cast( - const_cast(resolved_addr4.address())); - memset(&resolved_addr4, 0, sizeof(resolved_addr4)); - addr4->sin_family = AF_INET; - GPR_ASSERT(data_len == sizeof(addr4->sin_addr.s_addr)); - memcpy(&addr4->sin_addr.s_addr, data, data_len); - addr4->sin_port = htons(12345); - return EventEngine::ResolvedAddress( - reinterpret_cast(addr4), - static_cast(sizeof(sockaddr_in))); -} - -EventEngine::ResolvedAddress MakeAddr6(const uint8_t* data, size_t data_len) { - EventEngine::ResolvedAddress resolved_addr6; - sockaddr_in6* addr6 = reinterpret_cast( - const_cast(resolved_addr6.address())); - memset(&resolved_addr6, 0, sizeof(resolved_addr6)); - addr6->sin6_family = AF_INET6; - GPR_ASSERT(data_len == sizeof(addr6->sin6_addr.s6_addr)); - memcpy(&addr6->sin6_addr.s6_addr, data, data_len); - addr6->sin6_port = htons(12345); - return EventEngine::ResolvedAddress( - reinterpret_cast(addr6), - static_cast(sizeof(sockaddr_in6))); -} - -void SetIPv6ScopeId(EventEngine::ResolvedAddress* addr, uint32_t scope_id) { - sockaddr_in6* addr6 = - reinterpret_cast(const_cast(addr->address())); - ASSERT_EQ(addr6->sin6_family, AF_INET6); - addr6->sin6_scope_id = scope_id; -} - -#ifdef GRPC_HAVE_UNIX_SOCKET -absl::StatusOr UnixSockaddrPopulate( - absl::string_view path) { - EventEngine::ResolvedAddress resolved_addr; - memset(const_cast(resolved_addr.address()), 0, - resolved_addr.size()); - struct sockaddr_un* un = reinterpret_cast( - const_cast(resolved_addr.address())); - const size_t maxlen = sizeof(un->sun_path) - 1; - if (path.size() > maxlen) { - return absl::InternalError(absl::StrCat( - "Path name should not have more than ", maxlen, " characters")); - } - un->sun_family = AF_UNIX; - path.copy(un->sun_path, path.size()); - un->sun_path[path.size()] = '\0'; - return EventEngine::ResolvedAddress(reinterpret_cast(un), - static_cast(sizeof(*un))); -} - -absl::StatusOr UnixAbstractSockaddrPopulate( - absl::string_view path) { - EventEngine::ResolvedAddress resolved_addr; - memset(const_cast(resolved_addr.address()), 0, - resolved_addr.size()); - struct sockaddr* addr = const_cast(resolved_addr.address()); - struct sockaddr_un* un = reinterpret_cast(addr); - const size_t maxlen = sizeof(un->sun_path) - 1; - if (path.size() > maxlen) { - return absl::InternalError(absl::StrCat( - "Path name should not have more than ", maxlen, " characters")); - } - un->sun_family = AF_UNIX; - un->sun_path[0] = '\0'; - path.copy(un->sun_path + 1, path.size()); -#ifdef GPR_APPLE - return EventEngine::ResolvedAddress( - addr, static_cast(sizeof(un->sun_len) + - sizeof(un->sun_family) + path.size() + 1)); -#else - return EventEngine::ResolvedAddress( - addr, static_cast(sizeof(un->sun_family) + path.size() + 1)); -#endif -} -#endif - } // namespace TEST(TcpPosixSocketUtilsTest, SocketMutatorTest) { @@ -274,134 +178,6 @@ TEST(TcpPosixSocketUtilsTest, SocketOptionsTest) { close(sock); } -TEST(SockAddrUtilsTest, SockAddrIsV4MappedTest) { - // v4mapped input should succeed. - EventEngine::ResolvedAddress input6 = MakeAddr6(kMapped, sizeof(kMapped)); - ASSERT_TRUE(SockaddrIsV4Mapped(&input6, nullptr)); - EventEngine::ResolvedAddress output4; - ASSERT_TRUE(SockaddrIsV4Mapped(&input6, &output4)); - EventEngine::ResolvedAddress expect4 = MakeAddr4(kIPv4, sizeof(kIPv4)); - ASSERT_EQ(memcmp(expect4.address(), output4.address(), expect4.size()), 0); - - // Non-v4mapped input should fail. - input6 = MakeAddr6(kNotQuiteMapped, sizeof(kNotQuiteMapped)); - ASSERT_FALSE(SockaddrIsV4Mapped(&input6, nullptr)); - ASSERT_FALSE(SockaddrIsV4Mapped(&input6, &output4)); - // Output is unchanged. - ASSERT_EQ(memcmp(expect4.address(), output4.address(), expect4.size()), 0); - - // Plain IPv4 input should also fail. - EventEngine::ResolvedAddress input4 = MakeAddr4(kIPv4, sizeof(kIPv4)); - ASSERT_FALSE(SockaddrIsV4Mapped(&input4, nullptr)); -} - -TEST(TcpPosixSocketUtilsTest, SockAddrToV4MappedTest) { - // IPv4 input should succeed. - EventEngine::ResolvedAddress input4 = MakeAddr4(kIPv4, sizeof(kIPv4)); - EventEngine::ResolvedAddress output6; - ASSERT_TRUE(SockaddrToV4Mapped(&input4, &output6)); - EventEngine::ResolvedAddress expect6 = MakeAddr6(kMapped, sizeof(kMapped)); - ASSERT_EQ(memcmp(expect6.address(), output6.address(), output6.size()), 0); - - // IPv6 input should fail. - EventEngine::ResolvedAddress input6 = MakeAddr6(kIPv6, sizeof(kIPv6)); - ASSERT_TRUE(!SockaddrToV4Mapped(&input6, &output6)); - // Output is unchanged. - ASSERT_EQ(memcmp(expect6.address(), output6.address(), output6.size()), 0); - - // Already-v4mapped input should also fail. - input6 = MakeAddr6(kMapped, sizeof(kMapped)); - ASSERT_TRUE(!SockaddrToV4Mapped(&input6, &output6)); -} - -TEST(TcpPosixSocketUtilsTest, SockAddrToStringTest) { - errno = 0x7EADBEEF; - - EventEngine::ResolvedAddress input4 = MakeAddr4(kIPv4, sizeof(kIPv4)); - EXPECT_EQ(SockaddrToString(&input4, false).value(), "192.0.2.1:12345"); - EXPECT_EQ(SockaddrToString(&input4, true).value(), "192.0.2.1:12345"); - - EventEngine::ResolvedAddress input6 = MakeAddr6(kIPv6, sizeof(kIPv6)); - EXPECT_EQ(SockaddrToString(&input6, false).value(), "[2001:db8::1]:12345"); - EXPECT_EQ(SockaddrToString(&input6, true).value(), "[2001:db8::1]:12345"); - - SetIPv6ScopeId(&input6, 2); - EXPECT_EQ(SockaddrToString(&input6, false).value(), "[2001:db8::1%2]:12345"); - EXPECT_EQ(SockaddrToString(&input6, true).value(), "[2001:db8::1%2]:12345"); - - SetIPv6ScopeId(&input6, 101); - EXPECT_EQ(SockaddrToString(&input6, false).value(), - "[2001:db8::1%101]:12345"); - EXPECT_EQ(SockaddrToString(&input6, true).value(), "[2001:db8::1%101]:12345"); - - EventEngine::ResolvedAddress input6x = MakeAddr6(kMapped, sizeof(kMapped)); - EXPECT_EQ(SockaddrToString(&input6x, false).value(), - "[::ffff:192.0.2.1]:12345"); - EXPECT_EQ(SockaddrToString(&input6x, true).value(), "192.0.2.1:12345"); - - EventEngine::ResolvedAddress input6y = - MakeAddr6(kNotQuiteMapped, sizeof(kNotQuiteMapped)); - EXPECT_EQ(SockaddrToString(&input6y, false).value(), - "[::fffe:c000:263]:12345"); - EXPECT_EQ(SockaddrToString(&input6y, true).value(), - "[::fffe:c000:263]:12345"); - - EventEngine::ResolvedAddress phony; - memset(const_cast(phony.address()), 0, phony.size()); - sockaddr* phony_addr = const_cast(phony.address()); - phony_addr->sa_family = 123; - EXPECT_EQ(SockaddrToString(&phony, false).status(), - absl::InvalidArgumentError("Unknown sockaddr family: 123")); - EXPECT_EQ(SockaddrToString(&phony, true).status(), - absl::InvalidArgumentError("Unknown sockaddr family: 123")); - -#ifdef GRPC_HAVE_UNIX_SOCKET - EventEngine::ResolvedAddress inputun = - *UnixSockaddrPopulate("/some/unix/path"); - struct sockaddr_un* sock_un = reinterpret_cast( - const_cast(inputun.address())); - EXPECT_EQ(SockaddrToString(&inputun, true).value(), "/some/unix/path"); - - std::string max_filepath(sizeof(sock_un->sun_path) - 1, 'x'); - inputun = *UnixSockaddrPopulate(max_filepath); - EXPECT_EQ(SockaddrToString(&inputun, true).value(), max_filepath); - - inputun = *UnixSockaddrPopulate(max_filepath); - sock_un->sun_path[sizeof(sockaddr_un::sun_path) - 1] = 'x'; - EXPECT_EQ(SockaddrToString(&inputun, true).status(), - absl::InvalidArgumentError("UDS path is not null-terminated")); - - EventEngine::ResolvedAddress inputun2 = - *UnixAbstractSockaddrPopulate("some_unix_path"); - EXPECT_EQ(SockaddrToString(&inputun2, true).value(), - absl::StrCat(std::string(1, '\0'), "some_unix_path")); - - std::string max_abspath(sizeof(sock_un->sun_path) - 1, '\0'); - EventEngine::ResolvedAddress inputun3 = - *UnixAbstractSockaddrPopulate(max_abspath); - EXPECT_EQ(SockaddrToString(&inputun3, true).value(), - absl::StrCat(std::string(1, '\0'), max_abspath)); -#endif -} - -TEST(TcpPosixSocketUtilsTest, SockAddrPortTest) { - EventEngine::ResolvedAddress wild6 = SockaddrMakeWild6(20); - EventEngine::ResolvedAddress wild4 = SockaddrMakeWild4(20); - // Verify the string description matches the expected wildcard address with - // correct port number. - EXPECT_EQ(SockaddrToString(&wild6, true).value(), "[::]:20"); - EXPECT_EQ(SockaddrToString(&wild4, true).value(), "0.0.0.0:20"); - // Update the port values. - SockaddrSetPort(wild4, 21); - SockaddrSetPort(wild6, 22); - // Read back the port values. - EXPECT_EQ(SockaddrGetPort(wild4), 21); - EXPECT_EQ(SockaddrGetPort(wild6), 22); - // Ensure the string description reflects the updated port values. - EXPECT_EQ(SockaddrToString(&wild4, true).value(), "0.0.0.0:21"); - EXPECT_EQ(SockaddrToString(&wild6, true).value(), "[::]:22"); -} - } // namespace posix_engine } // namespace grpc_event_engine diff --git a/test/core/event_engine/tcp_socket_utils_test.cc b/test/core/event_engine/tcp_socket_utils_test.cc new file mode 100644 index 00000000000..4a2a9bf2cbe --- /dev/null +++ b/test/core/event_engine/tcp_socket_utils_test.cc @@ -0,0 +1,283 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include + +#include "src/core/lib/event_engine/tcp_socket_utils.h" + +#include +#include +#include + +// IWYU pragma: no_include + +#include + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_HAVE_UNIX_SOCKET +#include +#endif + +#include "absl/status/status.h" +#include "absl/status/statusor.h" +#include "gtest/gtest.h" + +#include +#include + +#include "src/core/lib/iomgr/sockaddr.h" + +namespace grpc_event_engine { +namespace experimental { + +namespace { +using ::grpc_event_engine::experimental::EventEngine; + +const uint8_t kMapped[] = {0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0xff, 0xff, 192, 0, 2, 1}; +const uint8_t kNotQuiteMapped[] = {0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0xff, 0xfe, 192, 0, 2, 99}; +const uint8_t kIPv4[] = {192, 0, 2, 1}; +const uint8_t kIPv6[] = {0x20, 0x01, 0x0d, 0xb8, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 1}; + +EventEngine::ResolvedAddress MakeAddr4(const uint8_t* data, size_t data_len) { + EventEngine::ResolvedAddress resolved_addr4; + sockaddr_in* addr4 = reinterpret_cast( + const_cast(resolved_addr4.address())); + memset(&resolved_addr4, 0, sizeof(resolved_addr4)); + addr4->sin_family = AF_INET; + GPR_ASSERT(data_len == sizeof(addr4->sin_addr.s_addr)); + memcpy(&addr4->sin_addr.s_addr, data, data_len); + addr4->sin_port = htons(12345); + return EventEngine::ResolvedAddress( + reinterpret_cast(addr4), + static_cast(sizeof(sockaddr_in))); +} + +EventEngine::ResolvedAddress MakeAddr6(const uint8_t* data, size_t data_len) { + EventEngine::ResolvedAddress resolved_addr6; + sockaddr_in6* addr6 = reinterpret_cast( + const_cast(resolved_addr6.address())); + memset(&resolved_addr6, 0, sizeof(resolved_addr6)); + addr6->sin6_family = AF_INET6; + GPR_ASSERT(data_len == sizeof(addr6->sin6_addr.s6_addr)); + memcpy(&addr6->sin6_addr.s6_addr, data, data_len); + addr6->sin6_port = htons(12345); + return EventEngine::ResolvedAddress( + reinterpret_cast(addr6), + static_cast(sizeof(sockaddr_in6))); +} + +void SetIPv6ScopeId(EventEngine::ResolvedAddress& addr, uint32_t scope_id) { + sockaddr_in6* addr6 = + reinterpret_cast(const_cast(addr.address())); + ASSERT_EQ(addr6->sin6_family, AF_INET6); + addr6->sin6_scope_id = scope_id; +} + +#ifdef GRPC_HAVE_UNIX_SOCKET +absl::StatusOr UnixSockaddrPopulate( + absl::string_view path) { + EventEngine::ResolvedAddress resolved_addr; + memset(const_cast(resolved_addr.address()), 0, + resolved_addr.size()); + struct sockaddr_un* un = reinterpret_cast( + const_cast(resolved_addr.address())); + const size_t maxlen = sizeof(un->sun_path) - 1; + if (path.size() > maxlen) { + return absl::InternalError(absl::StrCat( + "Path name should not have more than ", maxlen, " characters")); + } + un->sun_family = AF_UNIX; + path.copy(un->sun_path, path.size()); + un->sun_path[path.size()] = '\0'; + return EventEngine::ResolvedAddress(reinterpret_cast(un), + static_cast(sizeof(*un))); +} + +absl::StatusOr UnixAbstractSockaddrPopulate( + absl::string_view path) { + EventEngine::ResolvedAddress resolved_addr; + memset(const_cast(resolved_addr.address()), 0, + resolved_addr.size()); + struct sockaddr* addr = const_cast(resolved_addr.address()); + struct sockaddr_un* un = reinterpret_cast(addr); + const size_t maxlen = sizeof(un->sun_path) - 1; + if (path.size() > maxlen) { + return absl::InternalError(absl::StrCat( + "Path name should not have more than ", maxlen, " characters")); + } + un->sun_family = AF_UNIX; + un->sun_path[0] = '\0'; + path.copy(un->sun_path + 1, path.size()); +#ifdef GPR_APPLE + return EventEngine::ResolvedAddress( + addr, static_cast(sizeof(un->sun_len) + + sizeof(un->sun_family) + path.size() + 1)); +#else + return EventEngine::ResolvedAddress( + addr, static_cast(sizeof(un->sun_family) + path.size() + 1)); +#endif +} +#endif // GRPC_HAVE_UNIX_SOCKET + +} // namespace + +TEST(TcpSocketUtilsTest, ResolvedAddressIsV4MappedTest) { + // v4mapped input should succeed. + EventEngine::ResolvedAddress input6 = MakeAddr6(kMapped, sizeof(kMapped)); + ASSERT_TRUE(ResolvedAddressIsV4Mapped(input6, nullptr)); + EventEngine::ResolvedAddress output4; + ASSERT_TRUE(ResolvedAddressIsV4Mapped(input6, &output4)); + EventEngine::ResolvedAddress expect4 = MakeAddr4(kIPv4, sizeof(kIPv4)); + ASSERT_EQ(memcmp(expect4.address(), output4.address(), expect4.size()), 0); + + // Non-v4mapped input should fail. + input6 = MakeAddr6(kNotQuiteMapped, sizeof(kNotQuiteMapped)); + ASSERT_FALSE(ResolvedAddressIsV4Mapped(input6, nullptr)); + ASSERT_FALSE(ResolvedAddressIsV4Mapped(input6, &output4)); + // Output is unchanged. + ASSERT_EQ(memcmp(expect4.address(), output4.address(), expect4.size()), 0); + + // Plain IPv4 input should also fail. + EventEngine::ResolvedAddress input4 = MakeAddr4(kIPv4, sizeof(kIPv4)); + ASSERT_FALSE(ResolvedAddressIsV4Mapped(input4, nullptr)); +} + +TEST(TcpSocketUtilsTest, ResolvedAddressToV4MappedTest) { + // IPv4 input should succeed. + EventEngine::ResolvedAddress input4 = MakeAddr4(kIPv4, sizeof(kIPv4)); + EventEngine::ResolvedAddress output6; + ASSERT_TRUE(ResolvedAddressToV4Mapped(input4, &output6)); + EventEngine::ResolvedAddress expect6 = MakeAddr6(kMapped, sizeof(kMapped)); + ASSERT_EQ(memcmp(expect6.address(), output6.address(), output6.size()), 0); + + // IPv6 input should fail. + EventEngine::ResolvedAddress input6 = MakeAddr6(kIPv6, sizeof(kIPv6)); + ASSERT_TRUE(!ResolvedAddressToV4Mapped(input6, &output6)); + // Output is unchanged. + ASSERT_EQ(memcmp(expect6.address(), output6.address(), output6.size()), 0); + + // Already-v4mapped input should also fail. + input6 = MakeAddr6(kMapped, sizeof(kMapped)); + ASSERT_TRUE(!ResolvedAddressToV4Mapped(input6, &output6)); +} + +TEST(TcpSocketUtilsTest, ResolvedAddressToStringTest) { + errno = 0x7EADBEEF; + + EventEngine::ResolvedAddress input4 = MakeAddr4(kIPv4, sizeof(kIPv4)); + EXPECT_EQ(ResolvedAddressToString(input4).value(), "192.0.2.1:12345"); + EventEngine::ResolvedAddress input6 = MakeAddr6(kIPv6, sizeof(kIPv6)); + EXPECT_EQ(ResolvedAddressToString(input6).value(), "[2001:db8::1]:12345"); + SetIPv6ScopeId(input6, 2); + EXPECT_EQ(ResolvedAddressToString(input6).value(), "[2001:db8::1%2]:12345"); + SetIPv6ScopeId(input6, 101); + EXPECT_EQ(ResolvedAddressToString(input6).value(), "[2001:db8::1%101]:12345"); + EventEngine::ResolvedAddress input6x = MakeAddr6(kMapped, sizeof(kMapped)); + EXPECT_EQ(ResolvedAddressToString(input6x).value(), + "[::ffff:192.0.2.1]:12345"); + EventEngine::ResolvedAddress input6y = + MakeAddr6(kNotQuiteMapped, sizeof(kNotQuiteMapped)); + EXPECT_EQ(ResolvedAddressToString(input6y).value(), + "[::fffe:c000:263]:12345"); + EventEngine::ResolvedAddress phony; + memset(const_cast(phony.address()), 0, phony.size()); + sockaddr* phony_addr = const_cast(phony.address()); + phony_addr->sa_family = 123; + EXPECT_EQ(ResolvedAddressToString(phony).status(), + absl::InvalidArgumentError("Unknown sockaddr family: 123")); +} + +TEST(TcpSocketUtilsTest, ResolvedAddressToNormalizedStringTest) { + errno = 0x7EADBEEF; + + EventEngine::ResolvedAddress input4 = MakeAddr4(kIPv4, sizeof(kIPv4)); + EXPECT_EQ(ResolvedAddressToNormalizedString(input4).value(), + "192.0.2.1:12345"); + EventEngine::ResolvedAddress input6 = MakeAddr6(kIPv6, sizeof(kIPv6)); + EXPECT_EQ(ResolvedAddressToNormalizedString(input6).value(), + "[2001:db8::1]:12345"); + SetIPv6ScopeId(input6, 2); + EXPECT_EQ(ResolvedAddressToNormalizedString(input6).value(), + "[2001:db8::1%2]:12345"); + SetIPv6ScopeId(input6, 101); + EXPECT_EQ(ResolvedAddressToNormalizedString(input6).value(), + "[2001:db8::1%101]:12345"); + EventEngine::ResolvedAddress input6x = MakeAddr6(kMapped, sizeof(kMapped)); + EXPECT_EQ(ResolvedAddressToNormalizedString(input6x).value(), + "192.0.2.1:12345"); + EventEngine::ResolvedAddress input6y = + MakeAddr6(kNotQuiteMapped, sizeof(kNotQuiteMapped)); + EXPECT_EQ(ResolvedAddressToNormalizedString(input6y).value(), + "[::fffe:c000:263]:12345"); + EventEngine::ResolvedAddress phony; + memset(const_cast(phony.address()), 0, phony.size()); + sockaddr* phony_addr = const_cast(phony.address()); + phony_addr->sa_family = 123; + EXPECT_EQ(ResolvedAddressToNormalizedString(phony).status(), + absl::InvalidArgumentError("Unknown sockaddr family: 123")); + +#ifdef GRPC_HAVE_UNIX_SOCKET + EventEngine::ResolvedAddress inputun = + *UnixSockaddrPopulate("/some/unix/path"); + struct sockaddr_un* sock_un = reinterpret_cast( + const_cast(inputun.address())); + EXPECT_EQ(ResolvedAddressToNormalizedString(inputun).value(), + "/some/unix/path"); + std::string max_filepath(sizeof(sock_un->sun_path) - 1, 'x'); + inputun = *UnixSockaddrPopulate(max_filepath); + EXPECT_EQ(ResolvedAddressToNormalizedString(inputun).value(), max_filepath); + inputun = *UnixSockaddrPopulate(max_filepath); + sock_un->sun_path[sizeof(sockaddr_un::sun_path) - 1] = 'x'; + EXPECT_EQ(ResolvedAddressToNormalizedString(inputun).status(), + absl::InvalidArgumentError("UDS path is not null-terminated")); + EventEngine::ResolvedAddress inputun2 = + *UnixAbstractSockaddrPopulate("some_unix_path"); + EXPECT_EQ(ResolvedAddressToNormalizedString(inputun2).value(), + absl::StrCat(std::string(1, '\0'), "some_unix_path")); + std::string max_abspath(sizeof(sock_un->sun_path) - 1, '\0'); + EventEngine::ResolvedAddress inputun3 = + *UnixAbstractSockaddrPopulate(max_abspath); + EXPECT_EQ(ResolvedAddressToNormalizedString(inputun3).value(), + absl::StrCat(std::string(1, '\0'), max_abspath)); +#endif +} + +TEST(TcpSocketUtilsTest, SockAddrPortTest) { + EventEngine::ResolvedAddress wild6 = ResolvedAddressMakeWild6(20); + EventEngine::ResolvedAddress wild4 = ResolvedAddressMakeWild4(20); + // Verify the string description matches the expected wildcard address with + // correct port number. + EXPECT_EQ(ResolvedAddressToNormalizedString(wild6).value(), "[::]:20"); + EXPECT_EQ(ResolvedAddressToNormalizedString(wild4).value(), "0.0.0.0:20"); + // Update the port values. + ResolvedAddressSetPort(wild4, 21); + ResolvedAddressSetPort(wild6, 22); + // Read back the port values. + EXPECT_EQ(ResolvedAddressGetPort(wild4), 21); + EXPECT_EQ(ResolvedAddressGetPort(wild6), 22); + // Ensure the string description reflects the updated port values. + EXPECT_EQ(ResolvedAddressToNormalizedString(wild4).value(), "0.0.0.0:21"); + EXPECT_EQ(ResolvedAddressToNormalizedString(wild6).value(), "[::]:22"); +} + +} // namespace experimental +} // namespace grpc_event_engine + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/core/event_engine/windows/BUILD b/test/core/event_engine/windows/BUILD index 50a3b3ceac2..04b305addf8 100644 --- a/test/core/event_engine/windows/BUILD +++ b/test/core/event_engine/windows/BUILD @@ -64,6 +64,28 @@ grpc_cc_test( ], ) +grpc_cc_test( + name = "windows_endpoint_test", + timeout = "short", + srcs = ["windows_endpoint_test.cc"], + external_deps = ["gtest"], + language = "C++", + tags = [ + "no_linux", + "no_mac", + "no_test_ios", + ], + uses_event_engine = False, + uses_polling = False, + deps = [ + "create_sockpair", + "//:gpr_platform", + "//src/core:common_event_engine_closures", + "//src/core:windows_endpoint", + "//test/core/util:grpc_test_util", + ], +) + grpc_cc_library( name = "create_sockpair", srcs = ["create_sockpair.cc"], diff --git a/test/core/event_engine/windows/create_sockpair.cc b/test/core/event_engine/windows/create_sockpair.cc index 0371be48ef0..314b574e4bb 100644 --- a/test/core/event_engine/windows/create_sockpair.cc +++ b/test/core/event_engine/windows/create_sockpair.cc @@ -26,19 +26,24 @@ namespace grpc_event_engine { namespace experimental { +sockaddr_in GetSomeIpv4LoopbackAddress() { + sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + addr.sin_family = AF_INET; + return addr; +} + void CreateSockpair(SOCKET sockpair[2], DWORD flags) { SOCKET svr_sock = INVALID_SOCKET; SOCKET lst_sock = INVALID_SOCKET; SOCKET cli_sock = INVALID_SOCKET; - SOCKADDR_IN addr; + auto addr = GetSomeIpv4LoopbackAddress(); int addr_len = sizeof(addr); lst_sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, flags); GPR_ASSERT(lst_sock != INVALID_SOCKET); - memset(&addr, 0, sizeof(addr)); - addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - addr.sin_family = AF_INET; GPR_ASSERT(bind(lst_sock, (sockaddr*)&addr, sizeof(addr)) != SOCKET_ERROR); GPR_ASSERT(listen(lst_sock, SOMAXCONN) != SOCKET_ERROR); GPR_ASSERT(getsockname(lst_sock, (sockaddr*)&addr, &addr_len) != @@ -47,11 +52,17 @@ void CreateSockpair(SOCKET sockpair[2], DWORD flags) { cli_sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, flags); GPR_ASSERT(cli_sock != INVALID_SOCKET); - GPR_ASSERT(WSAConnect(cli_sock, (sockaddr*)&addr, addr_len, NULL, NULL, NULL, - NULL) == 0); + auto result = + WSAConnect(cli_sock, (sockaddr*)&addr, addr_len, NULL, NULL, NULL, NULL); + if (result != 0) { + gpr_log(GPR_DEBUG, "%s", + GRPC_WSA_ERROR(WSAGetLastError(), "Failed in WSAConnect") + .ToString() + .c_str()); + abort(); + } svr_sock = accept(lst_sock, (sockaddr*)&addr, &addr_len); GPR_ASSERT(svr_sock != INVALID_SOCKET); - closesocket(lst_sock); // TODO(hork): see if we can migrate this to IPv6, or break up the socket prep // stages. diff --git a/test/core/event_engine/windows/create_sockpair.h b/test/core/event_engine/windows/create_sockpair.h index c2c68dede9e..98f606f8f0e 100644 --- a/test/core/event_engine/windows/create_sockpair.h +++ b/test/core/event_engine/windows/create_sockpair.h @@ -22,6 +22,11 @@ namespace grpc_event_engine { namespace experimental { +sockaddr_in GetSomeIpv4LoopbackAddress(); + +// Creates a connected pair of sockets on the loopback address +// sockpair[0] is a connected client +// sockpair[1] is a listener running `accept` on the socket void CreateSockpair(SOCKET sockpair[2], DWORD flags); } // namespace experimental diff --git a/test/core/event_engine/windows/iocp_test.cc b/test/core/event_engine/windows/iocp_test.cc index 3b5f0bcec2a..fa0750ffef0 100644 --- a/test/core/event_engine/windows/iocp_test.cc +++ b/test/core/event_engine/windows/iocp_test.cc @@ -206,7 +206,6 @@ TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) { wrapped_client_socket->NotifyOnRead(on_read); // wait for the callbacks to run read_called.WaitForNotification(); - delete on_read; wrapped_client_socket->MaybeShutdown(absl::OkStatus()); executor.Quiesce(); diff --git a/test/core/event_engine/windows/windows_endpoint_test.cc b/test/core/event_engine/windows/windows_endpoint_test.cc new file mode 100644 index 00000000000..a581a7eced0 --- /dev/null +++ b/test/core/event_engine/windows/windows_endpoint_test.cc @@ -0,0 +1,170 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#ifdef GPR_WINDOWS + +#include + +#include "absl/status/status.h" + +#include +#include + +#include "src/core/lib/event_engine/channel_args_endpoint_config.h" +#include "src/core/lib/event_engine/thread_pool.h" +#include "src/core/lib/event_engine/windows/iocp.h" +#include "src/core/lib/event_engine/windows/windows_endpoint.h" +#include "src/core/lib/gprpp/notification.h" +#include "src/core/lib/resource_quota/memory_quota.h" +#include "test/core/event_engine/windows/create_sockpair.h" + +namespace grpc_event_engine { +namespace experimental { + +using namespace std::chrono_literals; + +class WindowsEndpointTest : public testing::Test {}; + +TEST_F(WindowsEndpointTest, BasicCommunication) { + // TODO(hork): deduplicate against winsocket and iocp tests + // Setup + ThreadPool executor; + IOCP iocp(&executor); + grpc_core::MemoryQuota quota("endpoint_test"); + SOCKET sockpair[2]; + CreateSockpair(sockpair, IOCP::GetDefaultSocketFlags()); + auto wrapped_client_socket = iocp.Watch(sockpair[0]); + auto wrapped_server_socket = iocp.Watch(sockpair[1]); + sockaddr_in loopback_addr = GetSomeIpv4LoopbackAddress(); + EventEngine::ResolvedAddress addr((sockaddr*)&loopback_addr, + sizeof(loopback_addr)); + WindowsEndpoint client(addr, std::move(wrapped_client_socket), + quota.CreateMemoryAllocator("client"), + ChannelArgsEndpointConfig(), &executor); + WindowsEndpoint server(addr, std::move(wrapped_server_socket), + quota.CreateMemoryAllocator("server"), + ChannelArgsEndpointConfig(), &executor); + // Test + std::string message = "0xDEADBEEF"; + grpc_core::Notification read_done; + SliceBuffer read_buffer; + server.Read( + [&read_done, &message, &read_buffer](absl::Status status) { + ASSERT_EQ(read_buffer.Count(), 1); + auto slice = read_buffer.TakeFirst(); + EXPECT_EQ(slice.as_string_view(), message); + read_done.Notify(); + }, + &read_buffer, nullptr); + grpc_core::Notification write_done; + SliceBuffer write_buffer; + write_buffer.Append(Slice::FromCopiedString(message)); + client.Write([&write_done](absl::Status status) { write_done.Notify(); }, + &write_buffer, nullptr); + iocp.Work(5s, []() {}); + // Cleanup + write_done.WaitForNotification(); + read_done.WaitForNotification(); + executor.Quiesce(); +} + +TEST_F(WindowsEndpointTest, Conversation) { + // Setup + ThreadPool executor; + IOCP iocp(&executor); + grpc_core::MemoryQuota quota("endpoint_test"); + SOCKET sockpair[2]; + CreateSockpair(sockpair, IOCP::GetDefaultSocketFlags()); + sockaddr_in loopback_addr = GetSomeIpv4LoopbackAddress(); + EventEngine::ResolvedAddress addr((sockaddr*)&loopback_addr, + sizeof(loopback_addr)); + // Test + struct AppState { + AppState(const EventEngine::ResolvedAddress& addr, + std::unique_ptr client, + std::unique_ptr server, grpc_core::MemoryQuota& quota, + Executor& executor) + : client(addr, std::move(client), quota.CreateMemoryAllocator("client"), + ChannelArgsEndpointConfig(), &executor), + server(addr, std::move(server), quota.CreateMemoryAllocator("server"), + ChannelArgsEndpointConfig(), &executor) {} + grpc_core::Notification done; + WindowsEndpoint client; + WindowsEndpoint server; + SliceBuffer read_buffer; + SliceBuffer write_buffer; + const std::vector messages{ + "Java is to Javascript what car is to carpet. -Heilmann", + "Make it work, make it right, make it fast. -Beck", + "First, solve the problem. Then write the code. -Johnson", + "It works on my machine."}; + // incremented after a corresponding read of a previous write + // if exchange%2 == 0, client -> server + // if exchange%2 == 1, server -> client + // if exchange == messages.length, done + std::atomic exchange{0}; + + // Initiates a Write and corresponding Read on two endpoints. + void WriteAndQueueReader(WindowsEndpoint* writer, WindowsEndpoint* reader) { + write_buffer.Clear(); + write_buffer.Append(Slice::FromCopiedString(messages[exchange])); + writer->Write([](absl::Status) {}, &write_buffer, /*args=*/nullptr); + auto cb = [this](absl::Status status) { ReadCB(status); }; + read_buffer.Clear(); + reader->Read(cb, &read_buffer, /*args=*/nullptr); + } + + // Asserts that the received string matches, then queues the next Write/Read + // pair + void ReadCB(absl::Status status) { + ASSERT_EQ(read_buffer.Count(), 1); + ASSERT_EQ(read_buffer.TakeFirst().as_string_view(), messages[exchange]); + if (++exchange == messages.size()) { + done.Notify(); + return; + } + if (exchange % 2 == 0) { + WriteAndQueueReader(/*writer=*/&client, /*reader=*/&server); + } else { + WriteAndQueueReader(/*writer=*/&server, /*reader=*/&client); + } + } + }; + AppState state(addr, /*client=*/iocp.Watch(sockpair[0]), + /*server=*/iocp.Watch(sockpair[1]), quota, executor); + state.WriteAndQueueReader(/*writer=*/&state.client, /*reader=*/&state.server); + while (iocp.Work(100ms, []() {}) == Poller::WorkResult::kOk || + !state.done.HasBeenNotified()) { + } + // Cleanup + state.done.WaitForNotification(); + executor.Quiesce(); +} + +} // namespace experimental +} // namespace grpc_event_engine + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc_init(); + int status = RUN_ALL_TESTS(); + grpc_shutdown(); + return status; +} + +#else // not GPR_WINDOWS +int main(int /* argc */, char** /* argv */) { return 0; } +#endif diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 6bb1123f9cc..bfd18171dfc 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -2012,6 +2012,8 @@ src/core/lib/event_engine/resolved_address.cc \ src/core/lib/event_engine/slice.cc \ src/core/lib/event_engine/slice_buffer.cc \ src/core/lib/event_engine/socket_notifier.h \ +src/core/lib/event_engine/tcp_socket_utils.cc \ +src/core/lib/event_engine/tcp_socket_utils.h \ src/core/lib/event_engine/thread_pool.cc \ src/core/lib/event_engine/thread_pool.h \ src/core/lib/event_engine/time_util.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 2e9ea2d7dcf..0bc5ea0732e 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1788,6 +1788,8 @@ src/core/lib/event_engine/resolved_address.cc \ src/core/lib/event_engine/slice.cc \ src/core/lib/event_engine/slice_buffer.cc \ src/core/lib/event_engine/socket_notifier.h \ +src/core/lib/event_engine/tcp_socket_utils.cc \ +src/core/lib/event_engine/tcp_socket_utils.h \ src/core/lib/event_engine/thread_pool.cc \ src/core/lib/event_engine/thread_pool.h \ src/core/lib/event_engine/time_util.cc \ diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 789ec3fdc5d..dcebed73079 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -7103,6 +7103,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "tcp_socket_utils_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false, @@ -8127,6 +8151,28 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "windows_endpoint_test", + "platforms": [ + "linux", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false,