[EventEngine] WindowsEndpoint (#31735)

* [EventEngine] WindowsEndpoint

Initial sketch, all tests passing

* Port fix from #28432

* GPR_WINDOWS guard

* use MemoryAllocator::MakeReservation for allocated buffers

* better logging (respect slice length)

* Automated change: Fix sanity tests

* improvements

* Automated change: Fix sanity tests

* InlinedVector<WSABUF, kMaxWSABUFCount>

* initial attempt at socket util reunification

* posix fixes + local run of sanitize.sh

* posix socket includes

* fix

* Automated change: Fix sanity tests

* remove unused include (breaks windows)

* remove stale comment

Co-authored-by: drfloob <drfloob@users.noreply.github.com>
pull/31768/head
AJ Heller 2 years ago committed by GitHub
parent 808347ffe8
commit 557e558825
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 86
      CMakeLists.txt
  2. 2
      Makefile
  3. 36
      build_autogenerated.yaml
  4. 1
      config.m4
  5. 1
      config.w32
  6. 2
      gRPC-C++.podspec
  7. 3
      gRPC-Core.podspec
  8. 2
      grpc.gemspec
  9. 3
      grpc.gyp
  10. 2
      package.xml
  11. 58
      src/core/BUILD
  12. 5
      src/core/lib/event_engine/posix_engine/posix_engine.cc
  13. 43
      src/core/lib/event_engine/posix_engine/posix_engine_listener.cc
  14. 5
      src/core/lib/event_engine/posix_engine/posix_engine_listener.h
  15. 28
      src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc
  16. 257
      src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
  17. 43
      src/core/lib/event_engine/posix_engine/tcp_socket_utils.h
  18. 320
      src/core/lib/event_engine/tcp_socket_utils.cc
  19. 85
      src/core/lib/event_engine/tcp_socket_utils.h
  20. 273
      src/core/lib/event_engine/windows/windows_endpoint.cc
  21. 94
      src/core/lib/event_engine/windows/windows_endpoint.h
  22. 1
      src/core/lib/event_engine/windows/windows_engine.cc
  23. 3
      src/core/lib/iomgr/error.cc
  24. 2
      src/core/lib/iomgr/error.h
  25. 1
      src/python/grpcio/grpc_core_dependencies.py
  26. 19
      test/core/event_engine/BUILD
  27. 1
      test/core/event_engine/posix/BUILD
  28. 9
      test/core/event_engine/posix/posix_engine_listener_utils_test.cc
  29. 7
      test/core/event_engine/posix/posix_event_engine_connect_test.cc
  30. 224
      test/core/event_engine/posix/tcp_posix_socket_utils_test.cc
  31. 283
      test/core/event_engine/tcp_socket_utils_test.cc
  32. 22
      test/core/event_engine/windows/BUILD
  33. 25
      test/core/event_engine/windows/create_sockpair.cc
  34. 5
      test/core/event_engine/windows/create_sockpair.h
  35. 1
      test/core/event_engine/windows/iocp_test.cc
  36. 170
      test/core/event_engine/windows/windows_endpoint_test.cc
  37. 2
      tools/doxygen/Doxyfile.c++.internal
  38. 2
      tools/doxygen/Doxyfile.core.internal
  39. 46
      tools/run_tests/generated/tests.json

86
CMakeLists.txt generated

@ -1184,6 +1184,7 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx tcp_server_posix_test)
endif()
add_dependencies(buildtests_cxx tcp_socket_utils_test)
add_dependencies(buildtests_cxx test_core_event_engine_posix_timer_heap_test)
add_dependencies(buildtests_cxx test_core_event_engine_posix_timer_list_test)
add_dependencies(buildtests_cxx test_core_event_engine_slice_buffer_test)
@ -1240,6 +1241,9 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx win_socket_test)
endif()
add_dependencies(buildtests_cxx window_overflow_bad_client_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX OR _gRPC_PLATFORM_WINDOWS)
add_dependencies(buildtests_cxx windows_endpoint_test)
endif()
add_dependencies(buildtests_cxx wire_reader_test)
add_dependencies(buildtests_cxx wire_writer_test)
add_dependencies(buildtests_cxx work_queue_test)
@ -2111,6 +2115,7 @@ add_library(grpc
src/core/lib/event_engine/resolved_address.cc
src/core/lib/event_engine/slice.cc
src/core/lib/event_engine/slice_buffer.cc
src/core/lib/event_engine/tcp_socket_utils.cc
src/core/lib/event_engine/thread_pool.cc
src/core/lib/event_engine/time_util.cc
src/core/lib/event_engine/trace.cc
@ -2774,6 +2779,7 @@ add_library(grpc_unsecure
src/core/lib/event_engine/resolved_address.cc
src/core/lib/event_engine/slice.cc
src/core/lib/event_engine/slice_buffer.cc
src/core/lib/event_engine/tcp_socket_utils.cc
src/core/lib/event_engine/thread_pool.cc
src/core/lib/event_engine/time_util.cc
src/core/lib/event_engine/trace.cc
@ -4269,6 +4275,7 @@ add_library(grpc_authorization_provider
src/core/lib/event_engine/resolved_address.cc
src/core/lib/event_engine/slice.cc
src/core/lib/event_engine/slice_buffer.cc
src/core/lib/event_engine/tcp_socket_utils.cc
src/core/lib/event_engine/thread_pool.cc
src/core/lib/event_engine/time_util.cc
src/core/lib/event_engine/trace.cc
@ -11161,6 +11168,7 @@ add_executable(frame_test
src/core/lib/event_engine/resolved_address.cc
src/core/lib/event_engine/slice.cc
src/core/lib/event_engine/slice_buffer.cc
src/core/lib/event_engine/tcp_socket_utils.cc
src/core/lib/event_engine/thread_pool.cc
src/core/lib/event_engine/time_util.cc
src/core/lib/event_engine/trace.cc
@ -19569,6 +19577,43 @@ endif()
endif()
if(gRPC_BUILD_TESTS)
add_executable(tcp_socket_utils_test
test/core/event_engine/tcp_socket_utils_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(tcp_socket_utils_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(tcp_socket_utils_test
${_gRPC_BASELIB_LIBRARIES}
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(test_core_event_engine_posix_timer_heap_test
src/core/lib/event_engine/posix_engine/timer.cc
src/core/lib/event_engine/posix_engine/timer_heap.cc
@ -21531,6 +21576,47 @@ target_link_libraries(window_overflow_bad_client_test
)
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX OR _gRPC_PLATFORM_WINDOWS)
add_executable(windows_endpoint_test
src/core/lib/event_engine/windows/windows_endpoint.cc
test/core/event_engine/windows/create_sockpair.cc
test/core/event_engine/windows/windows_endpoint_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(windows_endpoint_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(windows_endpoint_test
${_gRPC_BASELIB_LIBRARIES}
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif()
endif()
if(gRPC_BUILD_TESTS)

2
Makefile generated

@ -1413,6 +1413,7 @@ LIBGRPC_SRC = \
src/core/lib/event_engine/resolved_address.cc \
src/core/lib/event_engine/slice.cc \
src/core/lib/event_engine/slice_buffer.cc \
src/core/lib/event_engine/tcp_socket_utils.cc \
src/core/lib/event_engine/thread_pool.cc \
src/core/lib/event_engine/time_util.cc \
src/core/lib/event_engine/trace.cc \
@ -1935,6 +1936,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/event_engine/resolved_address.cc \
src/core/lib/event_engine/slice.cc \
src/core/lib/event_engine/slice_buffer.cc \
src/core/lib/event_engine/tcp_socket_utils.cc \
src/core/lib/event_engine/thread_pool.cc \
src/core/lib/event_engine/time_util.cc \
src/core/lib/event_engine/trace.cc \

@ -767,6 +767,7 @@ libs:
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h
- src/core/lib/event_engine/socket_notifier.h
- src/core/lib/event_engine/tcp_socket_utils.h
- src/core/lib/event_engine/thread_pool.h
- src/core/lib/event_engine/time_util.h
- src/core/lib/event_engine/trace.h
@ -1499,6 +1500,7 @@ libs:
- src/core/lib/event_engine/resolved_address.cc
- src/core/lib/event_engine/slice.cc
- src/core/lib/event_engine/slice_buffer.cc
- src/core/lib/event_engine/tcp_socket_utils.cc
- src/core/lib/event_engine/thread_pool.cc
- src/core/lib/event_engine/time_util.cc
- src/core/lib/event_engine/trace.cc
@ -2040,6 +2042,7 @@ libs:
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h
- src/core/lib/event_engine/socket_notifier.h
- src/core/lib/event_engine/tcp_socket_utils.h
- src/core/lib/event_engine/thread_pool.h
- src/core/lib/event_engine/time_util.h
- src/core/lib/event_engine/trace.h
@ -2418,6 +2421,7 @@ libs:
- src/core/lib/event_engine/resolved_address.cc
- src/core/lib/event_engine/slice.cc
- src/core/lib/event_engine/slice_buffer.cc
- src/core/lib/event_engine/tcp_socket_utils.cc
- src/core/lib/event_engine/thread_pool.cc
- src/core/lib/event_engine/time_util.cc
- src/core/lib/event_engine/trace.cc
@ -3483,6 +3487,7 @@ libs:
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h
- src/core/lib/event_engine/socket_notifier.h
- src/core/lib/event_engine/tcp_socket_utils.h
- src/core/lib/event_engine/thread_pool.h
- src/core/lib/event_engine/time_util.h
- src/core/lib/event_engine/trace.h
@ -3741,6 +3746,7 @@ libs:
- src/core/lib/event_engine/resolved_address.cc
- src/core/lib/event_engine/slice.cc
- src/core/lib/event_engine/slice_buffer.cc
- src/core/lib/event_engine/tcp_socket_utils.cc
- src/core/lib/event_engine/thread_pool.cc
- src/core/lib/event_engine/time_util.cc
- src/core/lib/event_engine/trace.cc
@ -7170,6 +7176,7 @@ targets:
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h
- src/core/lib/event_engine/socket_notifier.h
- src/core/lib/event_engine/tcp_socket_utils.h
- src/core/lib/event_engine/thread_pool.h
- src/core/lib/event_engine/time_util.h
- src/core/lib/event_engine/trace.h
@ -7410,6 +7417,7 @@ targets:
- src/core/lib/event_engine/resolved_address.cc
- src/core/lib/event_engine/slice.cc
- src/core/lib/event_engine/slice_buffer.cc
- src/core/lib/event_engine/tcp_socket_utils.cc
- src/core/lib/event_engine/thread_pool.cc
- src/core/lib/event_engine/time_util.cc
- src/core/lib/event_engine/trace.cc
@ -11191,6 +11199,16 @@ targets:
- linux
- posix
- mac
- name: tcp_socket_utils_test
gtest: true
build: test
language: c++
headers: []
src:
- test/core/event_engine/tcp_socket_utils_test.cc
deps:
- grpc
uses_polling: false
- name: test_core_event_engine_posix_timer_heap_test
gtest: true
build: test
@ -12078,6 +12096,24 @@ targets:
- test/core/end2end/cq_verifier.cc
deps:
- grpc_test_util
- name: windows_endpoint_test
gtest: true
build: test
language: c++
headers:
- src/core/lib/event_engine/windows/windows_endpoint.h
- test/core/event_engine/windows/create_sockpair.h
src:
- src/core/lib/event_engine/windows/windows_endpoint.cc
- test/core/event_engine/windows/create_sockpair.cc
- test/core/event_engine/windows/windows_endpoint_test.cc
deps:
- grpc_test_util
platforms:
- linux
- posix
- windows
uses_polling: false
- name: wire_reader_test
gtest: true
build: test

1
config.m4 generated

@ -495,6 +495,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/event_engine/resolved_address.cc \
src/core/lib/event_engine/slice.cc \
src/core/lib/event_engine/slice_buffer.cc \
src/core/lib/event_engine/tcp_socket_utils.cc \
src/core/lib/event_engine/thread_pool.cc \
src/core/lib/event_engine/time_util.cc \
src/core/lib/event_engine/trace.cc \

1
config.w32 generated

@ -461,6 +461,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\event_engine\\resolved_address.cc " +
"src\\core\\lib\\event_engine\\slice.cc " +
"src\\core\\lib\\event_engine\\slice_buffer.cc " +
"src\\core\\lib\\event_engine\\tcp_socket_utils.cc " +
"src\\core\\lib\\event_engine\\thread_pool.cc " +
"src\\core\\lib\\event_engine\\time_util.cc " +
"src\\core\\lib\\event_engine\\trace.cc " +

2
gRPC-C++.podspec generated

@ -724,6 +724,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h',
'src/core/lib/event_engine/socket_notifier.h',
'src/core/lib/event_engine/tcp_socket_utils.h',
'src/core/lib/event_engine/thread_pool.h',
'src/core/lib/event_engine/time_util.h',
'src/core/lib/event_engine/trace.h',
@ -1610,6 +1611,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h',
'src/core/lib/event_engine/socket_notifier.h',
'src/core/lib/event_engine/tcp_socket_utils.h',
'src/core/lib/event_engine/thread_pool.h',
'src/core/lib/event_engine/time_util.h',
'src/core/lib/event_engine/trace.h',

3
gRPC-Core.podspec generated

@ -1104,6 +1104,8 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/slice.cc',
'src/core/lib/event_engine/slice_buffer.cc',
'src/core/lib/event_engine/socket_notifier.h',
'src/core/lib/event_engine/tcp_socket_utils.cc',
'src/core/lib/event_engine/tcp_socket_utils.h',
'src/core/lib/event_engine/thread_pool.cc',
'src/core/lib/event_engine/thread_pool.h',
'src/core/lib/event_engine/time_util.cc',
@ -2241,6 +2243,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h',
'src/core/lib/event_engine/socket_notifier.h',
'src/core/lib/event_engine/tcp_socket_utils.h',
'src/core/lib/event_engine/thread_pool.h',
'src/core/lib/event_engine/time_util.h',
'src/core/lib/event_engine/trace.h',

2
grpc.gemspec generated

@ -1015,6 +1015,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/event_engine/slice.cc )
s.files += %w( src/core/lib/event_engine/slice_buffer.cc )
s.files += %w( src/core/lib/event_engine/socket_notifier.h )
s.files += %w( src/core/lib/event_engine/tcp_socket_utils.cc )
s.files += %w( src/core/lib/event_engine/tcp_socket_utils.h )
s.files += %w( src/core/lib/event_engine/thread_pool.cc )
s.files += %w( src/core/lib/event_engine/thread_pool.h )
s.files += %w( src/core/lib/event_engine/time_util.cc )

3
grpc.gyp generated

@ -825,6 +825,7 @@
'src/core/lib/event_engine/resolved_address.cc',
'src/core/lib/event_engine/slice.cc',
'src/core/lib/event_engine/slice_buffer.cc',
'src/core/lib/event_engine/tcp_socket_utils.cc',
'src/core/lib/event_engine/thread_pool.cc',
'src/core/lib/event_engine/time_util.cc',
'src/core/lib/event_engine/trace.cc',
@ -1293,6 +1294,7 @@
'src/core/lib/event_engine/resolved_address.cc',
'src/core/lib/event_engine/slice.cc',
'src/core/lib/event_engine/slice_buffer.cc',
'src/core/lib/event_engine/tcp_socket_utils.cc',
'src/core/lib/event_engine/thread_pool.cc',
'src/core/lib/event_engine/time_util.cc',
'src/core/lib/event_engine/trace.cc',
@ -1797,6 +1799,7 @@
'src/core/lib/event_engine/resolved_address.cc',
'src/core/lib/event_engine/slice.cc',
'src/core/lib/event_engine/slice_buffer.cc',
'src/core/lib/event_engine/tcp_socket_utils.cc',
'src/core/lib/event_engine/thread_pool.cc',
'src/core/lib/event_engine/time_util.cc',
'src/core/lib/event_engine/trace.cc',

2
package.xml generated

@ -997,6 +997,8 @@
<file baseinstalldir="/" name="src/core/lib/event_engine/slice.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/slice_buffer.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/socket_notifier.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/tcp_socket_utils.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/tcp_socket_utils.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/thread_pool.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/thread_pool.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/time_util.cc" role="src" />

@ -1696,10 +1696,10 @@ grpc_cc_library(
"absl/status",
"absl/status:statusor",
"absl/strings",
"absl/strings:str_format",
"absl/types:optional",
],
deps = [
"event_engine_tcp_socket_utils",
"iomgr_port",
"resource_quota",
"socket_mutator",
@ -1727,6 +1727,7 @@ grpc_cc_library(
"absl/strings",
],
deps = [
"event_engine_tcp_socket_utils",
"iomgr_port",
"posix_event_engine_tcp_socket_utils",
"socket_mutator",
@ -1754,6 +1755,7 @@ grpc_cc_library(
"absl/types:optional",
],
deps = [
"event_engine_tcp_socket_utils",
"iomgr_port",
"posix_event_engine_closure",
"posix_event_engine_endpoint",
@ -1785,6 +1787,7 @@ grpc_cc_library(
deps = [
"event_engine_common",
"event_engine_poller",
"event_engine_tcp_socket_utils",
"event_engine_thread_pool",
"event_engine_trace",
"event_engine_utils",
@ -1857,6 +1860,59 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "windows_endpoint",
srcs = [
"lib/event_engine/windows/windows_endpoint.cc",
],
hdrs = [
"lib/event_engine/windows/windows_endpoint.h",
],
external_deps = [
"absl/cleanup",
"absl/functional:any_invocable",
"absl/status",
"absl/strings:str_format",
],
deps = [
"error",
"event_engine_tcp_socket_utils",
"event_engine_trace",
"status_helper",
"windows_iocp",
"//:debug_location",
"//:event_engine_base_hdrs",
"//:gpr",
"//:gpr_platform",
],
)
grpc_cc_library(
name = "event_engine_tcp_socket_utils",
srcs = [
"lib/event_engine/tcp_socket_utils.cc",
],
hdrs = [
"lib/event_engine/tcp_socket_utils.h",
],
external_deps = [
"absl/status",
"absl/status:statusor",
"absl/strings",
"absl/strings:str_format",
"absl/types:optional",
],
deps = [
"grpc_sockaddr",
"iomgr_port",
"status_helper",
"//:event_engine_base_hdrs",
"//:gpr",
"//:gpr_platform",
"//:uri_parser",
],
)
grpc_cc_library(
name = "event_engine_trace",
srcs = [

@ -37,6 +37,7 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix_engine/timer.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/trace.h"
#include "src/core/lib/event_engine/utils.h"
#include "src/core/lib/experiments/experiments.h"
@ -61,13 +62,13 @@ namespace grpc_event_engine {
namespace experimental {
#ifdef GRPC_POSIX_SOCKET_TCP
using ::grpc_event_engine::experimental::ResolvedAddressToNormalizedString;
using ::grpc_event_engine::posix_engine::EventHandle;
using ::grpc_event_engine::posix_engine::PosixEngineClosure;
using ::grpc_event_engine::posix_engine::PosixEngineListener;
using ::grpc_event_engine::posix_engine::PosixEventPoller;
using ::grpc_event_engine::posix_engine::PosixSocketWrapper;
using ::grpc_event_engine::posix_engine::PosixTcpOptions;
using ::grpc_event_engine::posix_engine::SockaddrToString;
using ::grpc_event_engine::posix_engine::TcpOptionsFromEndpointConfig;
void AsyncConnect::Start(EventEngine::Duration timeout) {
@ -226,7 +227,7 @@ EventEngine::ConnectionHandle PosixEventEngine::ConnectInternal(
} while (err < 0 && errno == EINTR);
saved_errno = errno;
auto addr_uri = SockaddrToString(&addr, true);
auto addr_uri = ResolvedAddressToNormalizedString(addr);
if (!addr_uri.ok()) {
Run([on_connect = std::move(on_connect),
ep = absl::FailedPreconditionError(absl::StrCat(

@ -14,7 +14,13 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/posix_engine/posix_engine_listener.h"
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_POSIX_SOCKET_TCP
#include <errno.h> // IWYU pragma: keep
#include <sys/socket.h> // IWYU pragma: keep
#include <unistd.h> // IWYU pragma: keep
#include <string>
#include <utility>
@ -28,22 +34,25 @@
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/status_helper.h"
#ifdef GRPC_POSIX_SOCKET_TCP
#include <errno.h> // IWYU pragma: keep
#include <sys/socket.h> // IWYU pragma: keep
#include <unistd.h> // IWYU pragma: keep
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/posix_endpoint.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_listener.h"
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/iomgr/socket_mutator.h"
#endif
namespace grpc_event_engine {
namespace posix_engine {
#ifdef GRPC_POSIX_SOCKET_TCP
namespace {
using ::grpc_event_engine::experimental::ResolvedAddressGetPort;
using ::grpc_event_engine::experimental::ResolvedAddressIsWildcard;
using ::grpc_event_engine::experimental::ResolvedAddressSetPort;
using ::grpc_event_engine::experimental::ResolvedAddressToNormalizedString;
using ::grpc_event_engine::experimental::ResolvedAddressToV4Mapped;
} // namespace
PosixEngineListenerImpl::PosixEngineListenerImpl(
EventEngine::Listener::AcceptCallback on_accept,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
@ -63,7 +72,7 @@ absl::StatusOr<int> PosixEngineListenerImpl::Bind(
const EventEngine::ResolvedAddress& addr) {
EventEngine::ResolvedAddress res_addr = addr;
EventEngine::ResolvedAddress addr6_v4mapped;
int requested_port = SockaddrGetPort(res_addr);
int requested_port = ResolvedAddressGetPort(res_addr);
absl::MutexLock lock(&this->mu_);
GPR_ASSERT(!this->started_);
GPR_ASSERT(addr.size() <= EventEngine::ResolvedAddress::MAX_SIZE_BYTES);
@ -78,22 +87,22 @@ absl::StatusOr<int> PosixEngineListenerImpl::Bind(
if (0 == getsockname((*it)->Socket().sock.Fd(),
const_cast<sockaddr*>(sockname_temp.address()),
&len)) {
int used_port = SockaddrGetPort(sockname_temp);
int used_port = ResolvedAddressGetPort(sockname_temp);
if (used_port > 0) {
requested_port = used_port;
SockaddrSetPort(res_addr, requested_port);
ResolvedAddressSetPort(res_addr, requested_port);
break;
}
}
}
auto used_port = SockaddrIsWildcard(res_addr);
auto used_port = ResolvedAddressIsWildcard(res_addr);
if (used_port.has_value()) {
requested_port = *used_port;
return ListenerContainerAddWildcardAddresses(acceptors_, options_,
requested_port);
}
if (SockaddrToV4Mapped(&res_addr, &addr6_v4mapped)) {
if (ResolvedAddressToV4Mapped(res_addr, &addr6_v4mapped)) {
res_addr = addr6_v4mapped;
}
@ -170,7 +179,7 @@ void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept(
}
// Create an Endpoint here.
std::string peer_name = *SockaddrToString(&addr, true);
std::string peer_name = *ResolvedAddressToNormalizedString(addr);
auto endpoint = CreatePosixEndpoint(
/*handle=*/listener_->poller_->CreateHandle(
fd, peer_name, listener_->poller_->CanTrackErrors()),
@ -229,7 +238,7 @@ PosixEngineListenerImpl::~PosixEngineListenerImpl() {
}
}
#endif // GRPC_POSIX_SOCKET_TCP
} // namespace posix_engine
} // namespace grpc_event_engine
#endif // GRPC_POSIX_SOCKET_TCP

@ -41,6 +41,7 @@
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h"
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#endif
namespace grpc_event_engine {
@ -81,7 +82,9 @@ class PosixEngineListenerImpl
listener_(std::move(listener)),
socket_(socket),
handle_(listener_->poller_->CreateHandle(
socket_.sock.Fd(), *SockaddrToString(&socket_.addr, true),
socket_.sock.Fd(),
*grpc_event_engine::experimental::
ResolvedAddressToNormalizedString(socket_.addr),
listener_->poller_->CanTrackErrors())),
notify_on_accept_(PosixEngineClosure::ToPermanentClosure(
[this](absl::Status status) { NotifyOnAccept(status); })){};

@ -29,6 +29,7 @@
#include <grpc/support/log.h>
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/iomgr/socket_mutator.h"
@ -55,18 +56,24 @@ namespace {
using ResolvedAddress =
grpc_event_engine::experimental::EventEngine::ResolvedAddress;
using ListenerSocket = ListenerSocketsContainer::ListenerSocket;
using ::grpc_event_engine::experimental::ResolvedAddressGetPort;
using ::grpc_event_engine::experimental::ResolvedAddressIsV4Mapped;
using ::grpc_event_engine::experimental::ResolvedAddressMakeWild4;
using ::grpc_event_engine::experimental::ResolvedAddressMakeWild6;
using ::grpc_event_engine::experimental::ResolvedAddressSetPort;
using ::grpc_event_engine::experimental::ResolvedAddressToString;
#ifdef GRPC_HAVE_IFADDRS
// Bind to "::" to get a port number not used by any address.
absl::StatusOr<int> GetUnusedPort() {
ResolvedAddress wild = SockaddrMakeWild6(0);
ResolvedAddress wild = ResolvedAddressMakeWild6(0);
PosixSocketWrapper::DSMode dsmode;
auto sock = PosixSocketWrapper::CreateDualStackSocket(nullptr, wild,
SOCK_STREAM, 0, dsmode);
GRPC_RETURN_IF_ERROR(sock.status());
if (dsmode == PosixSocketWrapper::DSMode::DSMODE_IPV4) {
wild = SockaddrMakeWild4(0);
wild = ResolvedAddressMakeWild4(0);
}
if (bind(sock->Fd(), wild.address(), wild.size()) != 0) {
close(sock->Fd());
@ -81,7 +88,7 @@ absl::StatusOr<int> GetUnusedPort() {
absl::StrCat("getsockname(GetUnusedPort): ", std::strerror(errno)));
}
close(sock->Fd());
int port = SockaddrGetPort(wild);
int port = ResolvedAddressGetPort(wild);
if (port <= 0) {
return absl::FailedPreconditionError("Bad port");
}
@ -188,7 +195,8 @@ absl::Status PrepareSocket(const PosixTcpOptions& options,
absl::StrCat("Error in getsockname: ", std::strerror(errno)));
}
socket.port = SockaddrGetPort(ResolvedAddress(sockname_temp.address(), len));
socket.port =
ResolvedAddressGetPort(ResolvedAddress(sockname_temp.address(), len));
// No errors. Set close_fd to false to ensure the socket is not closed.
close_fd = false;
return absl::OkStatus();
@ -207,7 +215,7 @@ absl::StatusOr<ListenerSocket> CreateAndPrepareListenerSocket(
}
socket.sock = *result;
if (socket.dsmode == PosixSocketWrapper::DSMODE_IPV4 &&
SockaddrIsV4Mapped(&addr, &addr4_copy)) {
ResolvedAddressIsV4Mapped(addr, &addr4_copy)) {
socket.addr = addr4_copy;
} else {
socket.addr = addr;
@ -251,8 +259,8 @@ absl::StatusOr<int> ListenerContainerAddAllLocalAddresses(
continue;
}
memcpy(const_cast<sockaddr*>(addr.address()), ifa_it->ifa_addr, len);
SockaddrSetPort(addr, requested_port);
std::string addr_str = *SockaddrToString(&addr, false);
ResolvedAddressSetPort(addr, requested_port);
std::string addr_str = *ResolvedAddressToString(addr);
gpr_log(GPR_DEBUG,
"Adding local addr from interface %s flags 0x%x to server: %s",
ifa_name, ifa_it->ifa_flags, addr_str.c_str());
@ -293,8 +301,8 @@ absl::StatusOr<int> ListenerContainerAddAllLocalAddresses(
absl::StatusOr<int> ListenerContainerAddWildcardAddresses(
ListenerSocketsContainer& listener_sockets, const PosixTcpOptions& options,
int requested_port) {
ResolvedAddress wild4 = SockaddrMakeWild4(requested_port);
ResolvedAddress wild6 = SockaddrMakeWild6(requested_port);
ResolvedAddress wild4 = ResolvedAddressMakeWild4(requested_port);
ResolvedAddress wild6 = ResolvedAddressMakeWild6(requested_port);
absl::StatusOr<ListenerSocket> v6_sock;
absl::StatusOr<ListenerSocket> v4_sock;
int assigned_port = 0;
@ -316,7 +324,7 @@ absl::StatusOr<int> ListenerContainerAddWildcardAddresses(
}
}
// If we got a v6-only socket or nothing, try adding 0.0.0.0.
SockaddrSetPort(wild4, requested_port);
ResolvedAddressSetPort(wild4, requested_port);
v4_sock = CreateAndPrepareListenerSocket(options, wild4);
if (v4_sock.ok()) {
assigned_port = v4_sock->port;

@ -19,7 +19,6 @@
#include <errno.h>
#include <inttypes.h>
#include <limits.h>
#include <stdlib.h>
#include "absl/cleanup/cleanup.h"
#include "absl/status/statusor.h"
@ -43,18 +42,17 @@
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>
#endif
#endif // GRPC_POSIX_SOCKET_UTILS_COMMON
#include <atomic>
#include <cstring>
#include "absl/status/status.h"
#include "absl/strings/str_format.h"
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/strerror.h"
@ -66,10 +64,13 @@
namespace grpc_event_engine {
namespace posix_engine {
namespace {
using ::grpc_event_engine::experimental::EndpointConfig;
using ::grpc_event_engine::experimental::EventEngine;
namespace {
using ::grpc_event_engine::experimental::ResolvedAddressIsV4Mapped;
using ::grpc_event_engine::experimental::ResolvedAddressToNormalizedString;
using ::grpc_event_engine::experimental::ResolvedAddressToV4Mapped;
int AdjustValue(int default_value, int min_value, int max_value,
absl::optional<int> actual_value) {
@ -105,8 +106,6 @@ int CreateSocket(std::function<int(int, int, int)> socket_factory, int family,
: socket(family, type, protocol);
}
const uint8_t kV4MappedPrefix[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff};
absl::Status PrepareTcpClientSocket(PosixSocketWrapper sock,
const EventEngine::ResolvedAddress& addr,
const PosixTcpOptions& options) {
@ -234,192 +233,6 @@ int Accept4(int sockfd,
#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
bool SockaddrIsV4Mapped(const EventEngine::ResolvedAddress* resolved_addr,
EventEngine::ResolvedAddress* resolved_addr4_out) {
const sockaddr* addr = resolved_addr->address();
if (addr->sa_family == AF_INET6) {
const sockaddr_in6* addr6 = reinterpret_cast<const sockaddr_in6*>(addr);
sockaddr_in* addr4_out =
resolved_addr4_out == nullptr
? nullptr
: reinterpret_cast<sockaddr_in*>(
const_cast<sockaddr*>(resolved_addr4_out->address()));
if (memcmp(addr6->sin6_addr.s6_addr, kV4MappedPrefix,
sizeof(kV4MappedPrefix)) == 0) {
if (resolved_addr4_out != nullptr) {
// Normalize ::ffff:0.0.0.0/96 to IPv4.
memset(addr4_out, 0, sizeof(sockaddr_in));
addr4_out->sin_family = AF_INET;
// s6_addr32 would be nice, but it's non-standard.
memcpy(&addr4_out->sin_addr, &addr6->sin6_addr.s6_addr[12], 4);
addr4_out->sin_port = addr6->sin6_port;
*resolved_addr4_out = EventEngine::ResolvedAddress(
reinterpret_cast<sockaddr*>(addr4_out),
static_cast<socklen_t>(sizeof(sockaddr_in)));
}
return true;
}
}
return false;
}
bool SockaddrToV4Mapped(const EventEngine::ResolvedAddress* resolved_addr,
EventEngine::ResolvedAddress* resolved_addr6_out) {
GPR_ASSERT(resolved_addr != resolved_addr6_out);
const sockaddr* addr = resolved_addr->address();
sockaddr_in6* addr6_out = const_cast<sockaddr_in6*>(
reinterpret_cast<const sockaddr_in6*>(resolved_addr6_out->address()));
if (addr->sa_family == AF_INET) {
const sockaddr_in* addr4 = reinterpret_cast<const sockaddr_in*>(addr);
memset(resolved_addr6_out, 0, sizeof(*resolved_addr6_out));
addr6_out->sin6_family = AF_INET6;
memcpy(&addr6_out->sin6_addr.s6_addr[0], kV4MappedPrefix, 12);
memcpy(&addr6_out->sin6_addr.s6_addr[12], &addr4->sin_addr, 4);
addr6_out->sin6_port = addr4->sin_port;
*resolved_addr6_out = EventEngine::ResolvedAddress(
reinterpret_cast<sockaddr*>(addr6_out),
static_cast<socklen_t>(sizeof(sockaddr_in6)));
return true;
}
return false;
}
absl::StatusOr<std::string> SockaddrToString(
const EventEngine::ResolvedAddress* resolved_addr, bool normalize) {
const int save_errno = errno;
EventEngine::ResolvedAddress addr_normalized;
if (normalize && SockaddrIsV4Mapped(resolved_addr, &addr_normalized)) {
resolved_addr = &addr_normalized;
}
const sockaddr* addr =
reinterpret_cast<const sockaddr*>(resolved_addr->address());
std::string out;
#ifdef GRPC_HAVE_UNIX_SOCKET
if (addr->sa_family == AF_UNIX) {
const sockaddr_un* addr_un = reinterpret_cast<const sockaddr_un*>(addr);
bool abstract = addr_un->sun_path[0] == '\0';
if (abstract) {
#ifdef GPR_APPLE
int len = resolved_addr->size() - sizeof(addr_un->sun_family) -
sizeof(addr_un->sun_len);
#else
int len = resolved_addr->size() - sizeof(addr_un->sun_family);
#endif
if (len <= 0) {
return absl::InvalidArgumentError("Empty UDS abstract path");
}
out = std::string(addr_un->sun_path, len);
} else {
size_t maxlen = sizeof(addr_un->sun_path);
if (strnlen(addr_un->sun_path, maxlen) == maxlen) {
return absl::InvalidArgumentError("UDS path is not null-terminated");
}
out = std::string(addr_un->sun_path);
}
return out;
}
#endif
const void* ip = nullptr;
int port = 0;
uint32_t sin6_scope_id = 0;
if (addr->sa_family == AF_INET) {
const sockaddr_in* addr4 = reinterpret_cast<const sockaddr_in*>(addr);
ip = &addr4->sin_addr;
port = ntohs(addr4->sin_port);
} else if (addr->sa_family == AF_INET6) {
const sockaddr_in6* addr6 = reinterpret_cast<const sockaddr_in6*>(addr);
ip = &addr6->sin6_addr;
port = ntohs(addr6->sin6_port);
sin6_scope_id = addr6->sin6_scope_id;
}
char ntop_buf[INET6_ADDRSTRLEN];
if (ip != nullptr &&
inet_ntop(addr->sa_family, ip, ntop_buf, sizeof(ntop_buf)) != nullptr) {
if (sin6_scope_id != 0) {
// Enclose sin6_scope_id with the format defined in RFC 6874
// section 2.
std::string host_with_scope =
absl::StrFormat("%s%%%" PRIu32, ntop_buf, sin6_scope_id);
out = grpc_core::JoinHostPort(host_with_scope, port);
} else {
out = grpc_core::JoinHostPort(ntop_buf, port);
}
} else {
return absl::InvalidArgumentError(
absl::StrCat("Unknown sockaddr family: ", addr->sa_family));
}
// This is probably redundant, but we wouldn't want to log the wrong
// error.
errno = save_errno;
return out;
}
EventEngine::ResolvedAddress SockaddrMakeWild6(int port) {
EventEngine::ResolvedAddress resolved_wild_out;
sockaddr_in6* wild_out = reinterpret_cast<sockaddr_in6*>(
const_cast<sockaddr*>(resolved_wild_out.address()));
GPR_ASSERT(port >= 0 && port < 65536);
memset(wild_out, 0, sizeof(sockaddr_in6));
wild_out->sin6_family = AF_INET6;
wild_out->sin6_port = htons(static_cast<uint16_t>(port));
return EventEngine::ResolvedAddress(
reinterpret_cast<sockaddr*>(wild_out),
static_cast<socklen_t>(sizeof(sockaddr_in6)));
}
EventEngine::ResolvedAddress SockaddrMakeWild4(int port) {
EventEngine::ResolvedAddress resolved_wild_out;
sockaddr_in* wild_out = reinterpret_cast<sockaddr_in*>(
const_cast<sockaddr*>(resolved_wild_out.address()));
GPR_ASSERT(port >= 0 && port < 65536);
memset(wild_out, 0, sizeof(sockaddr_in));
wild_out->sin_family = AF_INET;
wild_out->sin_port = htons(static_cast<uint16_t>(port));
return EventEngine::ResolvedAddress(
reinterpret_cast<sockaddr*>(wild_out),
static_cast<socklen_t>(sizeof(sockaddr_in)));
}
int SockaddrGetPort(const EventEngine::ResolvedAddress& resolved_addr) {
const sockaddr* addr = resolved_addr.address();
switch (addr->sa_family) {
case AF_INET:
return ntohs((reinterpret_cast<const sockaddr_in*>(addr))->sin_port);
case AF_INET6:
return ntohs((reinterpret_cast<const sockaddr_in6*>(addr))->sin6_port);
#ifdef GRPC_HAVE_UNIX_SOCKET
case AF_UNIX:
return 1;
#endif
default:
gpr_log(GPR_ERROR, "Unknown socket family %d in SockaddrGetPort",
addr->sa_family);
abort();
}
}
void SockaddrSetPort(EventEngine::ResolvedAddress& resolved_addr, int port) {
sockaddr* addr = const_cast<sockaddr*>(resolved_addr.address());
switch (addr->sa_family) {
case AF_INET:
GPR_ASSERT(port >= 0 && port < 65536);
(reinterpret_cast<sockaddr_in*>(addr))->sin_port =
htons(static_cast<uint16_t>(port));
return;
case AF_INET6:
GPR_ASSERT(port >= 0 && port < 65536);
(reinterpret_cast<sockaddr_in6*>(addr))->sin6_port =
htons(static_cast<uint16_t>(port));
return;
default:
gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_set_port",
addr->sa_family);
abort();
}
}
void UnlinkIfUnixDomainSocket(
const EventEngine::ResolvedAddress& resolved_addr) {
#ifdef GRPC_HAVE_UNIX_SOCKET
@ -443,37 +256,6 @@ void UnlinkIfUnixDomainSocket(
#endif
}
absl::optional<int> SockaddrIsWildcard(
const EventEngine::ResolvedAddress& addr) {
const EventEngine::ResolvedAddress* resolved_addr = &addr;
EventEngine::ResolvedAddress addr4_normalized;
if (SockaddrIsV4Mapped(resolved_addr, &addr4_normalized)) {
resolved_addr = &addr4_normalized;
}
if (resolved_addr->address()->sa_family == AF_INET) {
// Check for 0.0.0.0
const sockaddr_in* addr4 =
reinterpret_cast<const sockaddr_in*>(resolved_addr->address());
if (addr4->sin_addr.s_addr != 0) {
return absl::nullopt;
}
return static_cast<int>(ntohs(addr4->sin_port));
} else if (resolved_addr->address()->sa_family == AF_INET6) {
// Check for ::
const sockaddr_in6* addr6 =
reinterpret_cast<const sockaddr_in6*>(resolved_addr->address());
int i;
for (i = 0; i < 16; i++) {
if (addr6->sin6_addr.s6_addr[i] != 0) {
return absl::nullopt;
}
}
return static_cast<int>(ntohs(addr6->sin6_port));
} else {
return absl::nullopt;
}
}
// Instruct the kernel to wait for specified number of bytes to be received on
// the socket before generating an interrupt for packet receive. If the call
// succeeds, it returns the number of bytes (wait threshold) that was actually
@ -874,7 +656,7 @@ absl::StatusOr<std::string> PosixSocketWrapper::LocalAddressString() {
if (!status.ok()) {
return status.status();
}
return SockaddrToString(&(*status), true);
return ResolvedAddressToNormalizedString((*status));
}
absl::StatusOr<std::string> PosixSocketWrapper::PeerAddressString() {
@ -882,7 +664,7 @@ absl::StatusOr<std::string> PosixSocketWrapper::PeerAddressString() {
if (!status.ok()) {
return status.status();
}
return SockaddrToString(&(*status), true);
return ResolvedAddressToNormalizedString((*status));
}
absl::StatusOr<PosixSocketWrapper> PosixSocketWrapper::CreateDualStackSocket(
@ -909,7 +691,7 @@ absl::StatusOr<PosixSocketWrapper> PosixSocketWrapper::CreateDualStackSocket(
return sock;
}
// If this isn't an IPv4 address, then return whatever we've got.
if (!SockaddrIsV4Mapped(&addr, nullptr)) {
if (!ResolvedAddressIsV4Mapped(addr, nullptr)) {
dsmode = PosixSocketWrapper::DSMode::DSMODE_IPV6;
return sock;
}
@ -937,7 +719,7 @@ PosixSocketWrapper::CreateAndPrepareTcpClientSocket(
// Use dualstack sockets where available. Set mapped to v6 or v4 mapped to
// v6.
if (!SockaddrToV4Mapped(&target_addr, &mapped_target_addr)) {
if (!ResolvedAddressToV4Mapped(target_addr, &mapped_target_addr)) {
// addr is v4 mapped to v6 or just v6.
mapped_target_addr = target_addr;
}
@ -950,7 +732,7 @@ PosixSocketWrapper::CreateAndPrepareTcpClientSocket(
if (dsmode == PosixSocketWrapper::DSMode::DSMODE_IPV4) {
// Original addr is either v4 or v4 mapped to v6. Set mapped_addr to v4.
if (!SockaddrIsV4Mapped(&target_addr, &mapped_target_addr)) {
if (!ResolvedAddressIsV4Mapped(target_addr, &mapped_target_addr)) {
mapped_target_addr = target_addr;
}
}
@ -966,21 +748,6 @@ PosixSocketWrapper::CreateAndPrepareTcpClientSocket(
#else /* GRPC_POSIX_SOCKET_UTILS_COMMON */
bool SockaddrIsV4Mapped(const EventEngine::ResolvedAddress* /*resolved_addr*/,
EventEngine::ResolvedAddress* /*resolved_addr4_out*/) {
GPR_ASSERT(false && "unimplemented");
}
bool SockaddrToV4Mapped(const EventEngine::ResolvedAddress* /*resolved_addr*/,
EventEngine::ResolvedAddress* /*resolved_addr6_out*/) {
GPR_ASSERT(false && "unimplemented");
}
absl::StatusOr<std::string> SockaddrToString(
const EventEngine::ResolvedAddress* /*resolved_addr*/, bool /*normalize*/) {
GPR_ASSERT(false && "unimplemented");
}
absl::StatusOr<int> PosixSocketWrapper::SetSocketRcvLowat(int /*bytes*/) {
GPR_ASSERT(false && "unimplemented");
}

@ -23,7 +23,6 @@
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/event_engine.h>
@ -146,52 +145,10 @@ int Accept4(int sockfd,
grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr,
int nonblock, int cloexec);
// Returns true if resolved_addr is an IPv4-mapped IPv6 address within the
// ::ffff:0.0.0.0/96 range, or false otherwise.
// If resolved_addr4_out is non-NULL, the inner IPv4 address will be copied
// here when returning true.
bool SockaddrIsV4Mapped(const EventEngine::ResolvedAddress* resolved_addr,
EventEngine::ResolvedAddress* resolved_addr4_out);
// If resolved_addr is an AF_INET address, writes the corresponding
// ::ffff:0.0.0.0/96 address to resolved_addr6_out and returns true. Otherwise
// returns false.
bool SockaddrToV4Mapped(const EventEngine::ResolvedAddress* resolved_addr,
EventEngine::ResolvedAddress* resolved_addr6_out);
// Make wild card IPv6 address with specified port.
EventEngine::ResolvedAddress SockaddrMakeWild6(int port);
// Make wild card IPv4 address with specified port.
EventEngine::ResolvedAddress SockaddrMakeWild4(int port);
// Given a resolved address, return the port number in the address.
int SockaddrGetPort(const EventEngine::ResolvedAddress& resolved_addr);
// Modifes the passed address to use the specified port number. The
// operation would only succeed if the passed address is an IPv4 or Ipv6
// address. Otherwise the function call would abort fail.
void SockaddrSetPort(EventEngine::ResolvedAddress& resolved_addr, int port);
// Unlink the path pointed to by the given address if it refers to UDS path.
void UnlinkIfUnixDomainSocket(
const EventEngine::ResolvedAddress& resolved_addr);
// Returns the port number associated with the address if the given address is
// not a wildcard ipv6 or ipv6 address. Otherwise returns absl::nullopt
absl::optional<int> SockaddrIsWildcard(
const EventEngine::ResolvedAddress& addr);
// Converts a EventEngine::ResolvedAddress into a newly-allocated
// human-readable string.
//
// Currently, only the AF_INET, AF_INET6, and AF_UNIX families are
// recognized. If the normalize flag is enabled, ::ffff:0.0.0.0/96 IPv6
// addresses are displayed as plain IPv4.
absl::StatusOr<std::string> SockaddrToString(
const EventEngine::ResolvedAddress* resolved_addr, bool normalize);
class PosixSocketWrapper {
public:
explicit PosixSocketWrapper(int fd) : fd_(fd) { GPR_ASSERT(fd_ > 0); }

@ -0,0 +1,320 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
#include <arpa/inet.h> // IWYU pragma: keep
#ifdef GRPC_LINUX_TCP_H
#include <linux/tcp.h>
#else
#include <netinet/in.h> // IWYU pragma: keep
#include <netinet/tcp.h>
#endif
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>
#endif // GRPC_POSIX_SOCKET_UTILS_COMMON
#ifdef GRPC_HAVE_UNIX_SOCKET
#include <sys/stat.h> // IWYU pragma: keep
#include <sys/un.h>
#endif
#include <errno.h>
#include <inttypes.h>
#include <stdlib.h>
#include <string.h>
#include <utility>
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/uri/uri_parser.h"
namespace grpc_event_engine {
namespace experimental {
namespace {
constexpr uint8_t kV4MappedPrefix[] = {0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0xff, 0xff};
absl::StatusOr<std::string> GetScheme(
const EventEngine::ResolvedAddress& resolved_address) {
switch (resolved_address.address()->sa_family) {
case AF_INET:
return "ipv4";
case AF_INET6:
return "ipv6";
case AF_UNIX:
return "unix";
default:
return absl::InvalidArgumentError(absl::StrFormat(
"Unknown scheme: %d", resolved_address.address()->sa_family));
}
}
} // namespace
bool ResolvedAddressIsV4Mapped(
const EventEngine::ResolvedAddress& resolved_addr,
EventEngine::ResolvedAddress* resolved_addr4_out) {
const sockaddr* addr = resolved_addr.address();
if (addr->sa_family == AF_INET6) {
const sockaddr_in6* addr6 = reinterpret_cast<const sockaddr_in6*>(addr);
sockaddr_in* addr4_out =
resolved_addr4_out == nullptr
? nullptr
: reinterpret_cast<sockaddr_in*>(
const_cast<sockaddr*>(resolved_addr4_out->address()));
if (memcmp(addr6->sin6_addr.s6_addr, kV4MappedPrefix,
sizeof(kV4MappedPrefix)) == 0) {
if (resolved_addr4_out != nullptr) {
// Normalize ::ffff:0.0.0.0/96 to IPv4.
memset(addr4_out, 0, sizeof(sockaddr_in));
addr4_out->sin_family = AF_INET;
// s6_addr32 would be nice, but it's non-standard.
memcpy(&addr4_out->sin_addr, &addr6->sin6_addr.s6_addr[12], 4);
addr4_out->sin_port = addr6->sin6_port;
*resolved_addr4_out = EventEngine::ResolvedAddress(
reinterpret_cast<sockaddr*>(addr4_out),
static_cast<socklen_t>(sizeof(sockaddr_in)));
}
return true;
}
}
return false;
}
bool ResolvedAddressToV4Mapped(
const EventEngine::ResolvedAddress& resolved_addr,
EventEngine::ResolvedAddress* resolved_addr6_out) {
GPR_ASSERT(&resolved_addr != resolved_addr6_out);
const sockaddr* addr = resolved_addr.address();
sockaddr_in6* addr6_out = const_cast<sockaddr_in6*>(
reinterpret_cast<const sockaddr_in6*>(resolved_addr6_out->address()));
if (addr->sa_family == AF_INET) {
const sockaddr_in* addr4 = reinterpret_cast<const sockaddr_in*>(addr);
memset(resolved_addr6_out, 0, sizeof(*resolved_addr6_out));
addr6_out->sin6_family = AF_INET6;
memcpy(&addr6_out->sin6_addr.s6_addr[0], kV4MappedPrefix, 12);
memcpy(&addr6_out->sin6_addr.s6_addr[12], &addr4->sin_addr, 4);
addr6_out->sin6_port = addr4->sin_port;
*resolved_addr6_out = EventEngine::ResolvedAddress(
reinterpret_cast<sockaddr*>(addr6_out),
static_cast<socklen_t>(sizeof(sockaddr_in6)));
return true;
}
return false;
}
EventEngine::ResolvedAddress ResolvedAddressMakeWild6(int port) {
EventEngine::ResolvedAddress resolved_wild_out;
sockaddr_in6* wild_out = reinterpret_cast<sockaddr_in6*>(
const_cast<sockaddr*>(resolved_wild_out.address()));
GPR_ASSERT(port >= 0 && port < 65536);
memset(wild_out, 0, sizeof(sockaddr_in6));
wild_out->sin6_family = AF_INET6;
wild_out->sin6_port = htons(static_cast<uint16_t>(port));
return EventEngine::ResolvedAddress(
reinterpret_cast<sockaddr*>(wild_out),
static_cast<socklen_t>(sizeof(sockaddr_in6)));
}
EventEngine::ResolvedAddress ResolvedAddressMakeWild4(int port) {
EventEngine::ResolvedAddress resolved_wild_out;
sockaddr_in* wild_out = reinterpret_cast<sockaddr_in*>(
const_cast<sockaddr*>(resolved_wild_out.address()));
GPR_ASSERT(port >= 0 && port < 65536);
memset(wild_out, 0, sizeof(sockaddr_in));
wild_out->sin_family = AF_INET;
wild_out->sin_port = htons(static_cast<uint16_t>(port));
return EventEngine::ResolvedAddress(
reinterpret_cast<sockaddr*>(wild_out),
static_cast<socklen_t>(sizeof(sockaddr_in)));
}
int ResolvedAddressGetPort(const EventEngine::ResolvedAddress& resolved_addr) {
const sockaddr* addr = resolved_addr.address();
switch (addr->sa_family) {
case AF_INET:
return ntohs((reinterpret_cast<const sockaddr_in*>(addr))->sin_port);
case AF_INET6:
return ntohs((reinterpret_cast<const sockaddr_in6*>(addr))->sin6_port);
#ifdef GRPC_HAVE_UNIX_SOCKET
case AF_UNIX:
return 1;
#endif
default:
gpr_log(GPR_ERROR, "Unknown socket family %d in ResolvedAddressGetPort",
addr->sa_family);
abort();
}
}
void ResolvedAddressSetPort(EventEngine::ResolvedAddress& resolved_addr,
int port) {
sockaddr* addr = const_cast<sockaddr*>(resolved_addr.address());
switch (addr->sa_family) {
case AF_INET:
GPR_ASSERT(port >= 0 && port < 65536);
(reinterpret_cast<sockaddr_in*>(addr))->sin_port =
htons(static_cast<uint16_t>(port));
return;
case AF_INET6:
GPR_ASSERT(port >= 0 && port < 65536);
(reinterpret_cast<sockaddr_in6*>(addr))->sin6_port =
htons(static_cast<uint16_t>(port));
return;
default:
gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_set_port",
addr->sa_family);
abort();
}
}
absl::optional<int> ResolvedAddressIsWildcard(
const EventEngine::ResolvedAddress& addr) {
const EventEngine::ResolvedAddress* resolved_addr = &addr;
EventEngine::ResolvedAddress addr4_normalized;
if (ResolvedAddressIsV4Mapped(addr, &addr4_normalized)) {
resolved_addr = &addr4_normalized;
}
if (resolved_addr->address()->sa_family == AF_INET) {
// Check for 0.0.0.0
const sockaddr_in* addr4 =
reinterpret_cast<const sockaddr_in*>(resolved_addr->address());
if (addr4->sin_addr.s_addr != 0) {
return absl::nullopt;
}
return static_cast<int>(ntohs(addr4->sin_port));
} else if (resolved_addr->address()->sa_family == AF_INET6) {
// Check for ::
const sockaddr_in6* addr6 =
reinterpret_cast<const sockaddr_in6*>(resolved_addr->address());
int i;
for (i = 0; i < 16; i++) {
if (addr6->sin6_addr.s6_addr[i] != 0) {
return absl::nullopt;
}
}
return static_cast<int>(ntohs(addr6->sin6_port));
} else {
return absl::nullopt;
}
}
absl::StatusOr<std::string> ResolvedAddressToNormalizedString(
const EventEngine::ResolvedAddress& resolved_addr) {
EventEngine::ResolvedAddress addr_normalized;
if (ResolvedAddressIsV4Mapped(resolved_addr, &addr_normalized)) {
return ResolvedAddressToString(addr_normalized);
}
return ResolvedAddressToString(resolved_addr);
}
absl::StatusOr<std::string> ResolvedAddressToString(
const EventEngine::ResolvedAddress& resolved_addr) {
const int save_errno = errno;
const sockaddr* addr = resolved_addr.address();
std::string out;
#ifdef GRPC_HAVE_UNIX_SOCKET
if (addr->sa_family == AF_UNIX) {
const sockaddr_un* addr_un = reinterpret_cast<const sockaddr_un*>(addr);
bool abstract = addr_un->sun_path[0] == '\0';
if (abstract) {
#ifdef GPR_APPLE
int len = resolved_addr.size() - sizeof(addr_un->sun_family) -
sizeof(addr_un->sun_len);
#else
int len = resolved_addr.size() - sizeof(addr_un->sun_family);
#endif
if (len <= 0) {
return absl::InvalidArgumentError("Empty UDS abstract path");
}
out = std::string(addr_un->sun_path, len);
} else {
size_t maxlen = sizeof(addr_un->sun_path);
if (strnlen(addr_un->sun_path, maxlen) == maxlen) {
return absl::InvalidArgumentError("UDS path is not null-terminated");
}
out = std::string(addr_un->sun_path);
}
return out;
}
#endif // GRPC_HAVE_UNIX_SOCKET
const void* ip = nullptr;
int port = 0;
uint32_t sin6_scope_id = 0;
if (addr->sa_family == AF_INET) {
const sockaddr_in* addr4 = reinterpret_cast<const sockaddr_in*>(addr);
ip = &addr4->sin_addr;
port = ntohs(addr4->sin_port);
} else if (addr->sa_family == AF_INET6) {
const sockaddr_in6* addr6 = reinterpret_cast<const sockaddr_in6*>(addr);
ip = &addr6->sin6_addr;
port = ntohs(addr6->sin6_port);
sin6_scope_id = addr6->sin6_scope_id;
}
char ntop_buf[INET6_ADDRSTRLEN];
if (ip != nullptr &&
inet_ntop(addr->sa_family, ip, ntop_buf, sizeof(ntop_buf)) != nullptr) {
if (sin6_scope_id != 0) {
// Enclose sin6_scope_id with the format defined in RFC 6874
// section 2.
std::string host_with_scope =
absl::StrFormat("%s%%%" PRIu32, ntop_buf, sin6_scope_id);
out = grpc_core::JoinHostPort(host_with_scope, port);
} else {
out = grpc_core::JoinHostPort(ntop_buf, port);
}
} else {
return absl::InvalidArgumentError(
absl::StrCat("Unknown sockaddr family: ", addr->sa_family));
}
// This is probably redundant, but we wouldn't want to log the wrong
// error.
errno = save_errno;
return out;
}
absl::StatusOr<std::string> ResolvedAddressToURI(
const EventEngine::ResolvedAddress& resolved_address) {
if (resolved_address.size() == 0) {
return absl::InvalidArgumentError("Empty address");
}
auto scheme = GetScheme(resolved_address);
GRPC_RETURN_IF_ERROR(scheme.status());
auto path = ResolvedAddressToString(resolved_address);
GRPC_RETURN_IF_ERROR(path.status());
absl::StatusOr<grpc_core::URI> uri =
grpc_core::URI::Create(*scheme, /*authority=*/"", std::move(path.value()),
/*query_parameter_pairs=*/{}, /*fragment=*/"");
if (!uri.ok()) return uri.status();
return uri->ToString();
}
} // namespace experimental
} // namespace grpc_event_engine

@ -0,0 +1,85 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_TCP_SOCKET_UTILS_H
#define GRPC_CORE_LIB_EVENT_ENGINE_TCP_SOCKET_UTILS_H
#include <grpc/support/port_platform.h>
#include <string>
#include "absl/status/statusor.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
namespace grpc_event_engine {
namespace experimental {
// Returns true if resolved_addr is an IPv4-mapped IPv6 address within the
// ::ffff:0.0.0.0/96 range, or false otherwise.
// If resolved_addr4_out is non-NULL, the inner IPv4 address will be copied
// here when returning true.
bool ResolvedAddressIsV4Mapped(
const EventEngine::ResolvedAddress& resolved_addr,
EventEngine::ResolvedAddress* resolved_addr4_out);
// If resolved_addr is an AF_INET address, writes the corresponding
// ::ffff:0.0.0.0/96 address to resolved_addr6_out and returns true. Otherwise
// returns false.
bool ResolvedAddressToV4Mapped(
const EventEngine::ResolvedAddress& resolved_addr,
EventEngine::ResolvedAddress* resolved_addr6_out);
// Make wild card IPv6 address with specified port.
EventEngine::ResolvedAddress ResolvedAddressMakeWild6(int port);
// Make wild card IPv4 address with specified port.
EventEngine::ResolvedAddress ResolvedAddressMakeWild4(int port);
// Given a resolved address, return the port number in the address.
int ResolvedAddressGetPort(const EventEngine::ResolvedAddress& resolved_addr);
// Modifies the address, setting the specified port number.
// The operation would only succeed if the passed address is an IPv4 or Ipv6
// address. Otherwise the function call would abort fail.
void ResolvedAddressSetPort(EventEngine::ResolvedAddress& resolved_addr,
int port);
// Returns the port number associated with the address if the given address is
// not a wildcard ipv6 or ipv6 address. Otherwise returns absl::nullopt
absl::optional<int> ResolvedAddressIsWildcard(
const EventEngine::ResolvedAddress& addr);
// Converts a EventEngine::ResolvedAddress into a newly-allocated
// human-readable string.
// Currently, only the AF_INET, AF_INET6, and AF_UNIX families are
// recognized.
absl::StatusOr<std::string> ResolvedAddressToString(
const EventEngine::ResolvedAddress& resolved_addr);
// Converts a EventEngine::ResolvedAddress into a newly-allocated
// human-readable string. See ResolvedAddressToString.
// This functional normalizes, so for example: ::ffff:0.0.0.0/96 IPv6
// addresses are displayed as plain IPv4.
absl::StatusOr<std::string> ResolvedAddressToNormalizedString(
const EventEngine::ResolvedAddress& resolved_addr);
// Returns the URI string corresponding to the resolved_address
absl::StatusOr<std::string> ResolvedAddressToURI(
const EventEngine::ResolvedAddress& resolved_address);
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_CORE_LIB_EVENT_ENGINE_TCP_SOCKET_UTILS_H

@ -0,0 +1,273 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#ifdef GPR_WINDOWS
#include "absl/cleanup/cleanup.h"
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/strings/str_format.h"
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/support/log_windows.h>
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/trace.h"
#include "src/core/lib/event_engine/windows/windows_endpoint.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/iomgr/error.h"
namespace grpc_event_engine {
namespace experimental {
// TODO(hork): The previous implementation required internal ref counting. Add
// this when it becomes necessary.
// TODO(hork): The previous implementation required a 2-phase shutdown. Add this
// when it becomes necessary.
namespace {
constexpr int64_t kDefaultTargetReadSize = 8192;
constexpr int kMaxWSABUFCount = 16;
void AbortOnEvent(absl::Status) {
GPR_ASSERT(false &&
"INTERNAL ERROR: Asked to handle read/write event with an invalid "
"callback");
}
} // namespace
WindowsEndpoint::WindowsEndpoint(
const EventEngine::ResolvedAddress& peer_address,
std::unique_ptr<WinSocket> socket, MemoryAllocator&& allocator,
const EndpointConfig& /* config */, Executor* executor)
: peer_address_(peer_address),
socket_(std::move(socket)),
allocator_(std::move(allocator)),
handle_read_event_(this),
handle_write_event_(this),
executor_(executor) {
sockaddr addr;
int addr_len = sizeof(addr);
if (getsockname(socket_->socket(), &addr, &addr_len) < 0) {
gpr_log(GPR_ERROR, "Unrecoverable error: Failed to get local socket name.");
abort();
}
local_address_ = EventEngine::ResolvedAddress(&addr, addr_len);
local_address_string_ = *ResolvedAddressToURI(local_address_);
peer_address_string_ = *ResolvedAddressToURI(peer_address_);
}
WindowsEndpoint::~WindowsEndpoint() {
socket_->MaybeShutdown(absl::OkStatus());
}
void WindowsEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
SliceBuffer* buffer, const ReadArgs* args) {
// TODO(hork): last_read_buffer from iomgr: Is it only garbage, or optimized?
GRPC_EVENT_ENGINE_TRACE("WindowsEndpoint::%p reading", this);
// Prepare the WSABUF struct
WSABUF wsa_buffers[kMaxWSABUFCount];
int min_read_size = kDefaultTargetReadSize;
if (args != nullptr && args->read_hint_bytes > 0) {
min_read_size = args->read_hint_bytes;
}
if (buffer->Length() < min_read_size && buffer->Count() < kMaxWSABUFCount) {
buffer->AppendIndexed(Slice(allocator_.MakeSlice(min_read_size)));
}
GPR_ASSERT(buffer->Count() <= kMaxWSABUFCount);
for (int i = 0; i < buffer->Count(); i++) {
Slice tmp = buffer->RefSlice(i);
wsa_buffers[i].buf = (char*)tmp.begin();
wsa_buffers[i].len = tmp.size();
}
DWORD bytes_read = 0;
DWORD flags = 0;
// First let's try a synchronous, non-blocking read.
int status = WSARecv(socket_->socket(), wsa_buffers, (DWORD)buffer->Count(),
&bytes_read, &flags, nullptr, nullptr);
int wsa_error = status == 0 ? 0 : WSAGetLastError();
// Did we get data immediately ? Yay.
if (wsa_error != WSAEWOULDBLOCK) {
// prune slicebuffer
if (bytes_read != buffer->Length()) {
buffer->RemoveLastNBytes(buffer->Length() - bytes_read);
}
executor_->Run([on_read = std::move(on_read)]() mutable {
on_read(absl::OkStatus());
});
return;
}
// Otherwise, let's retry, by queuing a read.
memset(socket_->read_info()->overlapped(), 0, sizeof(OVERLAPPED));
status =
WSARecv(socket_->socket(), wsa_buffers, (DWORD)buffer->Count(),
&bytes_read, &flags, socket_->read_info()->overlapped(), nullptr);
wsa_error = status == 0 ? 0 : WSAGetLastError();
if (wsa_error != 0 && wsa_error != WSA_IO_PENDING) {
// Async read returned immediately with an error
executor_->Run([this, on_read = std::move(on_read), wsa_error]() mutable {
on_read(GRPC_WSA_ERROR(
wsa_error,
absl::StrFormat("WindowsEndpont::%p Read failed", this).c_str()));
});
return;
}
handle_read_event_.Prime(buffer, std::move(on_read));
socket_->NotifyOnRead(&handle_read_event_);
}
void WindowsEndpoint::Write(absl::AnyInvocable<void(absl::Status)> on_writable,
SliceBuffer* data, const WriteArgs* /* args */) {
if (grpc_event_engine_trace.enabled()) {
for (int i = 0; i < data->Count(); i++) {
auto str = data->RefSlice(i).as_string_view();
gpr_log(GPR_INFO, "WindowsEndpoint::%p WRITE (peer=%s): %.*s", this,
peer_address_string_.c_str(), str.length(), str.data());
}
}
GPR_ASSERT(data->Count() <= UINT_MAX);
absl::InlinedVector<WSABUF, kMaxWSABUFCount> buffers(data->Count());
for (int i = 0; i < data->Count(); i++) {
auto slice = data->RefSlice(i);
GPR_ASSERT(slice.size() <= ULONG_MAX);
buffers[i].len = slice.size();
buffers[i].buf = (char*)slice.begin();
}
// First, let's try a synchronous, non-blocking write.
DWORD bytes_sent;
int status = WSASend(socket_->socket(), buffers.data(), (DWORD)buffers.size(),
&bytes_sent, 0, nullptr, nullptr);
size_t async_buffers_offset;
if (status == 0) {
if (bytes_sent == data->Length()) {
// Write completed, exiting early
executor_->Run(
[cb = std::move(on_writable)]() mutable { cb(absl::OkStatus()); });
return;
}
// The data was not completely delivered, we should send the rest of it by
// doing an async write operation.
for (int i = 0; i < data->Count(); i++) {
if (buffers[i].len > bytes_sent) {
buffers[i].buf += bytes_sent;
buffers[i].len -= bytes_sent;
break;
}
bytes_sent -= buffers[i].len;
async_buffers_offset++;
}
} else {
// We would kind of expect to get a WSAEWOULDBLOCK here, especially on a
// busy connection that has its send queue filled up. But if we don't,
// then we can avoid doing an async write operation at all.
int wsa_error = WSAGetLastError();
if (wsa_error != WSAEWOULDBLOCK) {
executor_->Run([cb = std::move(on_writable), wsa_error]() mutable {
cb(GRPC_WSA_ERROR(wsa_error, "WSASend"));
});
return;
}
}
auto write_info = socket_->write_info();
memset(write_info->overlapped(), 0, sizeof(OVERLAPPED));
status = WSASend(socket_->socket(), &buffers[async_buffers_offset],
(DWORD)(data->Count() - async_buffers_offset), nullptr, 0,
write_info->overlapped(), nullptr);
if (status != 0) {
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
executor_->Run([cb = std::move(on_writable), wsa_error]() mutable {
cb(GRPC_WSA_ERROR(wsa_error, "WSASend"));
});
return;
}
}
// As all is now setup, we can now ask for the IOCP notification. It may
// trigger the callback immediately however, but no matter.
handle_write_event_.Prime(data, std::move(on_writable));
socket_->NotifyOnWrite(&handle_write_event_);
}
const EventEngine::ResolvedAddress& WindowsEndpoint::GetPeerAddress() const {
return peer_address_;
}
const EventEngine::ResolvedAddress& WindowsEndpoint::GetLocalAddress() const {
return local_address_;
}
// ---- Handle{Read|Write}Closure
WindowsEndpoint::BaseEventClosure::BaseEventClosure(WindowsEndpoint* endpoint)
: endpoint_(endpoint), cb_(&AbortOnEvent) {}
void WindowsEndpoint::HandleReadClosure::Run() {
GRPC_EVENT_ENGINE_TRACE("WindowsEndpoint::%p Handling Read Event", endpoint_);
absl::Status status;
auto* read_info = endpoint_->socket_->read_info();
auto cb_cleanup = absl::MakeCleanup([this, &status]() {
auto cb = std::move(cb_);
cb_ = &AbortOnEvent;
cb(status);
});
if (read_info->wsa_error() != 0) {
status = GRPC_WSA_ERROR(read_info->wsa_error(), "Async Read Error");
buffer_->Clear();
return;
}
if (read_info->bytes_transferred() > 0) {
GPR_ASSERT(read_info->bytes_transferred() <= buffer_->Length());
if (read_info->bytes_transferred() != buffer_->Length()) {
buffer_->RemoveLastNBytes(buffer_->Length() -
read_info->bytes_transferred());
}
GPR_ASSERT(read_info->bytes_transferred() == buffer_->Length());
if (grpc_event_engine_trace.enabled()) {
for (int i = 0; i < buffer_->Count(); i++) {
auto str = buffer_->RefSlice(i).as_string_view();
gpr_log(GPR_INFO, "WindowsEndpoint::%p READ (peer=%s): %.*s", this,
endpoint_->peer_address_string_.c_str(), str.length(),
str.data());
}
}
return;
}
// Either the endpoint is shut down or we've seen the end of the stream
buffer_->Clear();
// TODO(hork): different error message if shut down
status = absl::UnavailableError("End of TCP stream");
}
void WindowsEndpoint::HandleWriteClosure::Run() {
GRPC_EVENT_ENGINE_TRACE("WindowsEndpoint::%p Handling Write Event",
endpoint_);
auto* write_info = endpoint_->socket_->write_info();
auto cb = std::move(cb_);
cb_ = &AbortOnEvent;
absl::Status status;
if (write_info->wsa_error() != 0) {
status = GRPC_WSA_ERROR(write_info->wsa_error(), "WSASend");
} else {
GPR_ASSERT(write_info->bytes_transferred() == buffer_->Length());
}
cb(status);
}
} // namespace experimental
} // namespace grpc_event_engine
#endif // GPR_WINDOWS

@ -0,0 +1,94 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENDPOINT_H
#define GRPC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENDPOINT_H
#include <grpc/support/port_platform.h>
#ifdef GPR_WINDOWS
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/event_engine/windows/win_socket.h"
namespace grpc_event_engine {
namespace experimental {
class WindowsEndpoint : public EventEngine::Endpoint {
public:
WindowsEndpoint(const EventEngine::ResolvedAddress& peer_address,
std::unique_ptr<WinSocket> socket,
MemoryAllocator&& allocator, const EndpointConfig& config,
Executor* Executor);
~WindowsEndpoint() override;
void Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
const ReadArgs* args) override;
void Write(absl::AnyInvocable<void(absl::Status)> on_writable,
SliceBuffer* data, const WriteArgs* args) override;
const EventEngine::ResolvedAddress& GetPeerAddress() const override;
const EventEngine::ResolvedAddress& GetLocalAddress() const override;
private:
// Base class for the Read- and Write-specific event handler callbacks
class BaseEventClosure : public EventEngine::Closure {
public:
explicit BaseEventClosure(WindowsEndpoint* endpoint);
// Calls the bound application callback, inline.
// If called through IOCP, this will be run from within an Executor.
virtual void Run() = 0;
// Prepare the closure by setting the application callback and SliceBuffer
void Prime(SliceBuffer* buffer, absl::AnyInvocable<void(absl::Status)> cb) {
cb_ = std::move(cb);
buffer_ = buffer;
}
protected:
absl::AnyInvocable<void(absl::Status)> cb_;
SliceBuffer* buffer_;
WindowsEndpoint* endpoint_;
};
// Permanent closure type for Read callbacks
class HandleReadClosure : public BaseEventClosure {
public:
explicit HandleReadClosure(WindowsEndpoint* endpoint)
: BaseEventClosure(endpoint) {}
void Run() override;
};
// Permanent closure type for Write callbacks
class HandleWriteClosure : public BaseEventClosure {
public:
explicit HandleWriteClosure(WindowsEndpoint* endpoint)
: BaseEventClosure(endpoint) {}
void Run() override;
};
EventEngine::ResolvedAddress peer_address_;
std::string peer_address_string_;
EventEngine::ResolvedAddress local_address_;
std::string local_address_string_;
std::unique_ptr<WinSocket> socket_;
MemoryAllocator allocator_;
HandleReadClosure handle_read_event_;
HandleWriteClosure handle_write_event_;
Executor* executor_;
};
} // namespace experimental
} // namespace grpc_event_engine
#endif
#endif // GRPC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENDPOINT_H

@ -162,4 +162,5 @@ WindowsEventEngine::CreateListener(
} // namespace experimental
} // namespace grpc_event_engine
#endif // GPR_WINDOWS

@ -68,8 +68,9 @@ absl::Status grpc_os_error(const grpc_core::DebugLocation& location, int err,
}
#ifdef GPR_WINDOWS
// TODO(veblush): lift out of iomgr for use in the WindowsEventEngine
absl::Status grpc_wsa_error(const grpc_core::DebugLocation& location, int err,
const char* call_name) {
absl::string_view call_name) {
char* utf8_message = gpr_format_message(err);
absl::Status s =
StatusCreate(absl::StatusCode::kUnavailable, "WSA Error", location, {});

@ -84,7 +84,7 @@ inline absl::Status grpc_assert_never_ok(absl::Status error) {
grpc_assert_never_ok(grpc_os_error(DEBUG_LOCATION, err, call_name))
absl::Status grpc_wsa_error(const grpc_core::DebugLocation& location, int err,
const char* call_name) GRPC_MUST_USE_RESULT;
absl::string_view call_name) GRPC_MUST_USE_RESULT;
/// windows only: create an error associated with WSAGetLastError()!=0
#define GRPC_WSA_ERROR(err, call_name) \

@ -470,6 +470,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/event_engine/resolved_address.cc',
'src/core/lib/event_engine/slice.cc',
'src/core/lib/event_engine/slice_buffer.cc',
'src/core/lib/event_engine/tcp_socket_utils.cc',
'src/core/lib/event_engine/thread_pool.cc',
'src/core/lib/event_engine/time_util.cc',
'src/core/lib/event_engine/trace.cc',

@ -169,3 +169,22 @@ grpc_cc_library(
],
deps = ["//:event_engine_base_hdrs"],
)
grpc_cc_test(
name = "tcp_socket_utils_test",
srcs = ["tcp_socket_utils_test.cc"],
external_deps = [
"absl/status",
"absl/status:statusor",
"gtest",
],
language = "C++",
uses_event_engine = False,
uses_polling = False,
deps = [
"//:gpr",
"//:grpc",
"//src/core:event_engine_tcp_socket_utils",
"//src/core:iomgr_port",
],
)

@ -206,6 +206,7 @@ grpc_cc_test(
uses_event_engine = False,
deps = [
"//src/core:event_engine_common",
"//src/core:event_engine_tcp_socket_utils",
"//src/core:posix_event_engine_listener_utils",
"//src/core:posix_event_engine_tcp_socket_utils",
"//src/core:socket_mutator",

@ -38,6 +38,7 @@
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h"
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "test/core/util/port.h"
namespace grpc_event_engine {
@ -46,6 +47,8 @@ namespace posix_engine {
namespace {
using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig;
using ::grpc_event_engine::experimental::ResolvedAddressGetPort;
using ::grpc_event_engine::experimental::ResolvedAddressToNormalizedString;
class TestListenerSocketsContainer : public ListenerSocketsContainer {
public:
@ -90,10 +93,10 @@ TEST(PosixEngineListenerUtils, ListenerContainerAddWildcardAddressesTest) {
ASSERT_TRUE((*socket).addr.address()->sa_family == AF_INET6 ||
(*socket).addr.address()->sa_family == AF_INET);
if ((*socket).addr.address()->sa_family == AF_INET6) {
EXPECT_EQ(SockaddrToString(&(*socket).addr, true).value(),
EXPECT_EQ(ResolvedAddressToNormalizedString((*socket).addr).value(),
absl::StrCat("[::]:", std::to_string(port)));
} else if ((*socket).addr.address()->sa_family == AF_INET) {
EXPECT_EQ(SockaddrToString(&(*socket).addr, true).value(),
EXPECT_EQ(ResolvedAddressToNormalizedString((*socket).addr).value(),
absl::StrCat("0.0.0.0:", std::to_string(port)));
}
close(socket->sock.Fd());
@ -139,7 +142,7 @@ TEST(PosixEngineListenerUtils, ListenerContainerAddAllLocalAddressesTest) {
++socket) {
ASSERT_TRUE((*socket).addr.address()->sa_family == AF_INET6 ||
(*socket).addr.address()->sa_family == AF_INET);
EXPECT_EQ(SockaddrGetPort((*socket).addr), port);
EXPECT_EQ(ResolvedAddressGetPort((*socket).addr), port);
close(socket->sock.Fd());
}
}

@ -41,7 +41,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine.h"
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/resource_quota/memory_quota.h"
@ -56,6 +56,7 @@ namespace posix_engine {
using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig;
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::PosixEventEngine;
using ::grpc_event_engine::experimental::ResolvedAddressToNormalizedString;
using ::grpc_event_engine::experimental::URIToResolvedAddress;
using ::grpc_event_engine::experimental::WaitForSingleOwner;
using namespace std::chrono_literals;
@ -156,7 +157,7 @@ TEST(PosixEventEngineTest, IndefiniteConnectTimeoutOrRstTest) {
auto resolved_addr = URIToResolvedAddress(target_addr);
std::shared_ptr<EventEngine> posix_ee = std::make_shared<PosixEventEngine>();
std::string resolved_addr_str =
SockaddrToString(&resolved_addr, true).value();
ResolvedAddressToNormalizedString(resolved_addr).value();
auto sockets = CreateConnectedSockets(resolved_addr);
grpc_core::Notification signal;
grpc_core::ChannelArgs args;
@ -184,7 +185,7 @@ TEST(PosixEventEngineTest, IndefiniteConnectCancellationTest) {
auto resolved_addr = URIToResolvedAddress(target_addr);
std::shared_ptr<EventEngine> posix_ee = std::make_shared<PosixEventEngine>();
std::string resolved_addr_str =
SockaddrToString(&resolved_addr, true).value();
ResolvedAddressToNormalizedString(resolved_addr).value();
auto sockets = CreateConnectedSockets(resolved_addr);
grpc_core::ChannelArgs args;
auto quota = grpc_core::ResourceQuota::Default();

@ -12,12 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include <stdint.h>
#include <sys/socket.h>
#include <unistd.h>
#include <string>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
@ -34,7 +31,6 @@
// This test won't work except with posix sockets enabled
#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
#include <errno.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <string.h>
@ -43,7 +39,6 @@
#endif
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/gpr/useful.h"
@ -118,97 +113,6 @@ const grpc_socket_mutator_vtable mutator_vtable = {MutateFd, CompareTestMutator,
const grpc_socket_mutator_vtable mutator_vtable2 = {
nullptr, CompareTestMutator, DestroyTestMutator, MutateFd2};
const uint8_t kMapped[] = {0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0xff, 0xff, 192, 0, 2, 1};
const uint8_t kNotQuiteMapped[] = {0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0xff, 0xfe, 192, 0, 2, 99};
const uint8_t kIPv4[] = {192, 0, 2, 1};
const uint8_t kIPv6[] = {0x20, 0x01, 0x0d, 0xb8, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 1};
EventEngine::ResolvedAddress MakeAddr4(const uint8_t* data, size_t data_len) {
EventEngine::ResolvedAddress resolved_addr4;
sockaddr_in* addr4 = reinterpret_cast<sockaddr_in*>(
const_cast<sockaddr*>(resolved_addr4.address()));
memset(&resolved_addr4, 0, sizeof(resolved_addr4));
addr4->sin_family = AF_INET;
GPR_ASSERT(data_len == sizeof(addr4->sin_addr.s_addr));
memcpy(&addr4->sin_addr.s_addr, data, data_len);
addr4->sin_port = htons(12345);
return EventEngine::ResolvedAddress(
reinterpret_cast<sockaddr*>(addr4),
static_cast<socklen_t>(sizeof(sockaddr_in)));
}
EventEngine::ResolvedAddress MakeAddr6(const uint8_t* data, size_t data_len) {
EventEngine::ResolvedAddress resolved_addr6;
sockaddr_in6* addr6 = reinterpret_cast<sockaddr_in6*>(
const_cast<sockaddr*>(resolved_addr6.address()));
memset(&resolved_addr6, 0, sizeof(resolved_addr6));
addr6->sin6_family = AF_INET6;
GPR_ASSERT(data_len == sizeof(addr6->sin6_addr.s6_addr));
memcpy(&addr6->sin6_addr.s6_addr, data, data_len);
addr6->sin6_port = htons(12345);
return EventEngine::ResolvedAddress(
reinterpret_cast<sockaddr*>(addr6),
static_cast<socklen_t>(sizeof(sockaddr_in6)));
}
void SetIPv6ScopeId(EventEngine::ResolvedAddress* addr, uint32_t scope_id) {
sockaddr_in6* addr6 =
reinterpret_cast<sockaddr_in6*>(const_cast<sockaddr*>(addr->address()));
ASSERT_EQ(addr6->sin6_family, AF_INET6);
addr6->sin6_scope_id = scope_id;
}
#ifdef GRPC_HAVE_UNIX_SOCKET
absl::StatusOr<EventEngine::ResolvedAddress> UnixSockaddrPopulate(
absl::string_view path) {
EventEngine::ResolvedAddress resolved_addr;
memset(const_cast<sockaddr*>(resolved_addr.address()), 0,
resolved_addr.size());
struct sockaddr_un* un = reinterpret_cast<struct sockaddr_un*>(
const_cast<sockaddr*>(resolved_addr.address()));
const size_t maxlen = sizeof(un->sun_path) - 1;
if (path.size() > maxlen) {
return absl::InternalError(absl::StrCat(
"Path name should not have more than ", maxlen, " characters"));
}
un->sun_family = AF_UNIX;
path.copy(un->sun_path, path.size());
un->sun_path[path.size()] = '\0';
return EventEngine::ResolvedAddress(reinterpret_cast<sockaddr*>(un),
static_cast<socklen_t>(sizeof(*un)));
}
absl::StatusOr<EventEngine::ResolvedAddress> UnixAbstractSockaddrPopulate(
absl::string_view path) {
EventEngine::ResolvedAddress resolved_addr;
memset(const_cast<sockaddr*>(resolved_addr.address()), 0,
resolved_addr.size());
struct sockaddr* addr = const_cast<sockaddr*>(resolved_addr.address());
struct sockaddr_un* un = reinterpret_cast<struct sockaddr_un*>(addr);
const size_t maxlen = sizeof(un->sun_path) - 1;
if (path.size() > maxlen) {
return absl::InternalError(absl::StrCat(
"Path name should not have more than ", maxlen, " characters"));
}
un->sun_family = AF_UNIX;
un->sun_path[0] = '\0';
path.copy(un->sun_path + 1, path.size());
#ifdef GPR_APPLE
return EventEngine::ResolvedAddress(
addr, static_cast<socklen_t>(sizeof(un->sun_len) +
sizeof(un->sun_family) + path.size() + 1));
#else
return EventEngine::ResolvedAddress(
addr, static_cast<socklen_t>(sizeof(un->sun_family) + path.size() + 1));
#endif
}
#endif
} // namespace
TEST(TcpPosixSocketUtilsTest, SocketMutatorTest) {
@ -274,134 +178,6 @@ TEST(TcpPosixSocketUtilsTest, SocketOptionsTest) {
close(sock);
}
TEST(SockAddrUtilsTest, SockAddrIsV4MappedTest) {
// v4mapped input should succeed.
EventEngine::ResolvedAddress input6 = MakeAddr6(kMapped, sizeof(kMapped));
ASSERT_TRUE(SockaddrIsV4Mapped(&input6, nullptr));
EventEngine::ResolvedAddress output4;
ASSERT_TRUE(SockaddrIsV4Mapped(&input6, &output4));
EventEngine::ResolvedAddress expect4 = MakeAddr4(kIPv4, sizeof(kIPv4));
ASSERT_EQ(memcmp(expect4.address(), output4.address(), expect4.size()), 0);
// Non-v4mapped input should fail.
input6 = MakeAddr6(kNotQuiteMapped, sizeof(kNotQuiteMapped));
ASSERT_FALSE(SockaddrIsV4Mapped(&input6, nullptr));
ASSERT_FALSE(SockaddrIsV4Mapped(&input6, &output4));
// Output is unchanged.
ASSERT_EQ(memcmp(expect4.address(), output4.address(), expect4.size()), 0);
// Plain IPv4 input should also fail.
EventEngine::ResolvedAddress input4 = MakeAddr4(kIPv4, sizeof(kIPv4));
ASSERT_FALSE(SockaddrIsV4Mapped(&input4, nullptr));
}
TEST(TcpPosixSocketUtilsTest, SockAddrToV4MappedTest) {
// IPv4 input should succeed.
EventEngine::ResolvedAddress input4 = MakeAddr4(kIPv4, sizeof(kIPv4));
EventEngine::ResolvedAddress output6;
ASSERT_TRUE(SockaddrToV4Mapped(&input4, &output6));
EventEngine::ResolvedAddress expect6 = MakeAddr6(kMapped, sizeof(kMapped));
ASSERT_EQ(memcmp(expect6.address(), output6.address(), output6.size()), 0);
// IPv6 input should fail.
EventEngine::ResolvedAddress input6 = MakeAddr6(kIPv6, sizeof(kIPv6));
ASSERT_TRUE(!SockaddrToV4Mapped(&input6, &output6));
// Output is unchanged.
ASSERT_EQ(memcmp(expect6.address(), output6.address(), output6.size()), 0);
// Already-v4mapped input should also fail.
input6 = MakeAddr6(kMapped, sizeof(kMapped));
ASSERT_TRUE(!SockaddrToV4Mapped(&input6, &output6));
}
TEST(TcpPosixSocketUtilsTest, SockAddrToStringTest) {
errno = 0x7EADBEEF;
EventEngine::ResolvedAddress input4 = MakeAddr4(kIPv4, sizeof(kIPv4));
EXPECT_EQ(SockaddrToString(&input4, false).value(), "192.0.2.1:12345");
EXPECT_EQ(SockaddrToString(&input4, true).value(), "192.0.2.1:12345");
EventEngine::ResolvedAddress input6 = MakeAddr6(kIPv6, sizeof(kIPv6));
EXPECT_EQ(SockaddrToString(&input6, false).value(), "[2001:db8::1]:12345");
EXPECT_EQ(SockaddrToString(&input6, true).value(), "[2001:db8::1]:12345");
SetIPv6ScopeId(&input6, 2);
EXPECT_EQ(SockaddrToString(&input6, false).value(), "[2001:db8::1%2]:12345");
EXPECT_EQ(SockaddrToString(&input6, true).value(), "[2001:db8::1%2]:12345");
SetIPv6ScopeId(&input6, 101);
EXPECT_EQ(SockaddrToString(&input6, false).value(),
"[2001:db8::1%101]:12345");
EXPECT_EQ(SockaddrToString(&input6, true).value(), "[2001:db8::1%101]:12345");
EventEngine::ResolvedAddress input6x = MakeAddr6(kMapped, sizeof(kMapped));
EXPECT_EQ(SockaddrToString(&input6x, false).value(),
"[::ffff:192.0.2.1]:12345");
EXPECT_EQ(SockaddrToString(&input6x, true).value(), "192.0.2.1:12345");
EventEngine::ResolvedAddress input6y =
MakeAddr6(kNotQuiteMapped, sizeof(kNotQuiteMapped));
EXPECT_EQ(SockaddrToString(&input6y, false).value(),
"[::fffe:c000:263]:12345");
EXPECT_EQ(SockaddrToString(&input6y, true).value(),
"[::fffe:c000:263]:12345");
EventEngine::ResolvedAddress phony;
memset(const_cast<sockaddr*>(phony.address()), 0, phony.size());
sockaddr* phony_addr = const_cast<sockaddr*>(phony.address());
phony_addr->sa_family = 123;
EXPECT_EQ(SockaddrToString(&phony, false).status(),
absl::InvalidArgumentError("Unknown sockaddr family: 123"));
EXPECT_EQ(SockaddrToString(&phony, true).status(),
absl::InvalidArgumentError("Unknown sockaddr family: 123"));
#ifdef GRPC_HAVE_UNIX_SOCKET
EventEngine::ResolvedAddress inputun =
*UnixSockaddrPopulate("/some/unix/path");
struct sockaddr_un* sock_un = reinterpret_cast<struct sockaddr_un*>(
const_cast<sockaddr*>(inputun.address()));
EXPECT_EQ(SockaddrToString(&inputun, true).value(), "/some/unix/path");
std::string max_filepath(sizeof(sock_un->sun_path) - 1, 'x');
inputun = *UnixSockaddrPopulate(max_filepath);
EXPECT_EQ(SockaddrToString(&inputun, true).value(), max_filepath);
inputun = *UnixSockaddrPopulate(max_filepath);
sock_un->sun_path[sizeof(sockaddr_un::sun_path) - 1] = 'x';
EXPECT_EQ(SockaddrToString(&inputun, true).status(),
absl::InvalidArgumentError("UDS path is not null-terminated"));
EventEngine::ResolvedAddress inputun2 =
*UnixAbstractSockaddrPopulate("some_unix_path");
EXPECT_EQ(SockaddrToString(&inputun2, true).value(),
absl::StrCat(std::string(1, '\0'), "some_unix_path"));
std::string max_abspath(sizeof(sock_un->sun_path) - 1, '\0');
EventEngine::ResolvedAddress inputun3 =
*UnixAbstractSockaddrPopulate(max_abspath);
EXPECT_EQ(SockaddrToString(&inputun3, true).value(),
absl::StrCat(std::string(1, '\0'), max_abspath));
#endif
}
TEST(TcpPosixSocketUtilsTest, SockAddrPortTest) {
EventEngine::ResolvedAddress wild6 = SockaddrMakeWild6(20);
EventEngine::ResolvedAddress wild4 = SockaddrMakeWild4(20);
// Verify the string description matches the expected wildcard address with
// correct port number.
EXPECT_EQ(SockaddrToString(&wild6, true).value(), "[::]:20");
EXPECT_EQ(SockaddrToString(&wild4, true).value(), "0.0.0.0:20");
// Update the port values.
SockaddrSetPort(wild4, 21);
SockaddrSetPort(wild6, 22);
// Read back the port values.
EXPECT_EQ(SockaddrGetPort(wild4), 21);
EXPECT_EQ(SockaddrGetPort(wild6), 22);
// Ensure the string description reflects the updated port values.
EXPECT_EQ(SockaddrToString(&wild4, true).value(), "0.0.0.0:21");
EXPECT_EQ(SockaddrToString(&wild6, true).value(), "[::]:22");
}
} // namespace posix_engine
} // namespace grpc_event_engine

@ -0,0 +1,283 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include <errno.h>
#include <stdint.h>
#include <string.h>
// IWYU pragma: no_include <arpa/inet.h>
#include <string>
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_HAVE_UNIX_SOCKET
#include <sys/un.h>
#endif
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "gtest/gtest.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/log.h>
#include "src/core/lib/iomgr/sockaddr.h"
namespace grpc_event_engine {
namespace experimental {
namespace {
using ::grpc_event_engine::experimental::EventEngine;
const uint8_t kMapped[] = {0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0xff, 0xff, 192, 0, 2, 1};
const uint8_t kNotQuiteMapped[] = {0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0xff, 0xfe, 192, 0, 2, 99};
const uint8_t kIPv4[] = {192, 0, 2, 1};
const uint8_t kIPv6[] = {0x20, 0x01, 0x0d, 0xb8, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 1};
EventEngine::ResolvedAddress MakeAddr4(const uint8_t* data, size_t data_len) {
EventEngine::ResolvedAddress resolved_addr4;
sockaddr_in* addr4 = reinterpret_cast<sockaddr_in*>(
const_cast<sockaddr*>(resolved_addr4.address()));
memset(&resolved_addr4, 0, sizeof(resolved_addr4));
addr4->sin_family = AF_INET;
GPR_ASSERT(data_len == sizeof(addr4->sin_addr.s_addr));
memcpy(&addr4->sin_addr.s_addr, data, data_len);
addr4->sin_port = htons(12345);
return EventEngine::ResolvedAddress(
reinterpret_cast<sockaddr*>(addr4),
static_cast<socklen_t>(sizeof(sockaddr_in)));
}
EventEngine::ResolvedAddress MakeAddr6(const uint8_t* data, size_t data_len) {
EventEngine::ResolvedAddress resolved_addr6;
sockaddr_in6* addr6 = reinterpret_cast<sockaddr_in6*>(
const_cast<sockaddr*>(resolved_addr6.address()));
memset(&resolved_addr6, 0, sizeof(resolved_addr6));
addr6->sin6_family = AF_INET6;
GPR_ASSERT(data_len == sizeof(addr6->sin6_addr.s6_addr));
memcpy(&addr6->sin6_addr.s6_addr, data, data_len);
addr6->sin6_port = htons(12345);
return EventEngine::ResolvedAddress(
reinterpret_cast<sockaddr*>(addr6),
static_cast<socklen_t>(sizeof(sockaddr_in6)));
}
void SetIPv6ScopeId(EventEngine::ResolvedAddress& addr, uint32_t scope_id) {
sockaddr_in6* addr6 =
reinterpret_cast<sockaddr_in6*>(const_cast<sockaddr*>(addr.address()));
ASSERT_EQ(addr6->sin6_family, AF_INET6);
addr6->sin6_scope_id = scope_id;
}
#ifdef GRPC_HAVE_UNIX_SOCKET
absl::StatusOr<EventEngine::ResolvedAddress> UnixSockaddrPopulate(
absl::string_view path) {
EventEngine::ResolvedAddress resolved_addr;
memset(const_cast<sockaddr*>(resolved_addr.address()), 0,
resolved_addr.size());
struct sockaddr_un* un = reinterpret_cast<struct sockaddr_un*>(
const_cast<sockaddr*>(resolved_addr.address()));
const size_t maxlen = sizeof(un->sun_path) - 1;
if (path.size() > maxlen) {
return absl::InternalError(absl::StrCat(
"Path name should not have more than ", maxlen, " characters"));
}
un->sun_family = AF_UNIX;
path.copy(un->sun_path, path.size());
un->sun_path[path.size()] = '\0';
return EventEngine::ResolvedAddress(reinterpret_cast<sockaddr*>(un),
static_cast<socklen_t>(sizeof(*un)));
}
absl::StatusOr<EventEngine::ResolvedAddress> UnixAbstractSockaddrPopulate(
absl::string_view path) {
EventEngine::ResolvedAddress resolved_addr;
memset(const_cast<sockaddr*>(resolved_addr.address()), 0,
resolved_addr.size());
struct sockaddr* addr = const_cast<sockaddr*>(resolved_addr.address());
struct sockaddr_un* un = reinterpret_cast<struct sockaddr_un*>(addr);
const size_t maxlen = sizeof(un->sun_path) - 1;
if (path.size() > maxlen) {
return absl::InternalError(absl::StrCat(
"Path name should not have more than ", maxlen, " characters"));
}
un->sun_family = AF_UNIX;
un->sun_path[0] = '\0';
path.copy(un->sun_path + 1, path.size());
#ifdef GPR_APPLE
return EventEngine::ResolvedAddress(
addr, static_cast<socklen_t>(sizeof(un->sun_len) +
sizeof(un->sun_family) + path.size() + 1));
#else
return EventEngine::ResolvedAddress(
addr, static_cast<socklen_t>(sizeof(un->sun_family) + path.size() + 1));
#endif
}
#endif // GRPC_HAVE_UNIX_SOCKET
} // namespace
TEST(TcpSocketUtilsTest, ResolvedAddressIsV4MappedTest) {
// v4mapped input should succeed.
EventEngine::ResolvedAddress input6 = MakeAddr6(kMapped, sizeof(kMapped));
ASSERT_TRUE(ResolvedAddressIsV4Mapped(input6, nullptr));
EventEngine::ResolvedAddress output4;
ASSERT_TRUE(ResolvedAddressIsV4Mapped(input6, &output4));
EventEngine::ResolvedAddress expect4 = MakeAddr4(kIPv4, sizeof(kIPv4));
ASSERT_EQ(memcmp(expect4.address(), output4.address(), expect4.size()), 0);
// Non-v4mapped input should fail.
input6 = MakeAddr6(kNotQuiteMapped, sizeof(kNotQuiteMapped));
ASSERT_FALSE(ResolvedAddressIsV4Mapped(input6, nullptr));
ASSERT_FALSE(ResolvedAddressIsV4Mapped(input6, &output4));
// Output is unchanged.
ASSERT_EQ(memcmp(expect4.address(), output4.address(), expect4.size()), 0);
// Plain IPv4 input should also fail.
EventEngine::ResolvedAddress input4 = MakeAddr4(kIPv4, sizeof(kIPv4));
ASSERT_FALSE(ResolvedAddressIsV4Mapped(input4, nullptr));
}
TEST(TcpSocketUtilsTest, ResolvedAddressToV4MappedTest) {
// IPv4 input should succeed.
EventEngine::ResolvedAddress input4 = MakeAddr4(kIPv4, sizeof(kIPv4));
EventEngine::ResolvedAddress output6;
ASSERT_TRUE(ResolvedAddressToV4Mapped(input4, &output6));
EventEngine::ResolvedAddress expect6 = MakeAddr6(kMapped, sizeof(kMapped));
ASSERT_EQ(memcmp(expect6.address(), output6.address(), output6.size()), 0);
// IPv6 input should fail.
EventEngine::ResolvedAddress input6 = MakeAddr6(kIPv6, sizeof(kIPv6));
ASSERT_TRUE(!ResolvedAddressToV4Mapped(input6, &output6));
// Output is unchanged.
ASSERT_EQ(memcmp(expect6.address(), output6.address(), output6.size()), 0);
// Already-v4mapped input should also fail.
input6 = MakeAddr6(kMapped, sizeof(kMapped));
ASSERT_TRUE(!ResolvedAddressToV4Mapped(input6, &output6));
}
TEST(TcpSocketUtilsTest, ResolvedAddressToStringTest) {
errno = 0x7EADBEEF;
EventEngine::ResolvedAddress input4 = MakeAddr4(kIPv4, sizeof(kIPv4));
EXPECT_EQ(ResolvedAddressToString(input4).value(), "192.0.2.1:12345");
EventEngine::ResolvedAddress input6 = MakeAddr6(kIPv6, sizeof(kIPv6));
EXPECT_EQ(ResolvedAddressToString(input6).value(), "[2001:db8::1]:12345");
SetIPv6ScopeId(input6, 2);
EXPECT_EQ(ResolvedAddressToString(input6).value(), "[2001:db8::1%2]:12345");
SetIPv6ScopeId(input6, 101);
EXPECT_EQ(ResolvedAddressToString(input6).value(), "[2001:db8::1%101]:12345");
EventEngine::ResolvedAddress input6x = MakeAddr6(kMapped, sizeof(kMapped));
EXPECT_EQ(ResolvedAddressToString(input6x).value(),
"[::ffff:192.0.2.1]:12345");
EventEngine::ResolvedAddress input6y =
MakeAddr6(kNotQuiteMapped, sizeof(kNotQuiteMapped));
EXPECT_EQ(ResolvedAddressToString(input6y).value(),
"[::fffe:c000:263]:12345");
EventEngine::ResolvedAddress phony;
memset(const_cast<sockaddr*>(phony.address()), 0, phony.size());
sockaddr* phony_addr = const_cast<sockaddr*>(phony.address());
phony_addr->sa_family = 123;
EXPECT_EQ(ResolvedAddressToString(phony).status(),
absl::InvalidArgumentError("Unknown sockaddr family: 123"));
}
TEST(TcpSocketUtilsTest, ResolvedAddressToNormalizedStringTest) {
errno = 0x7EADBEEF;
EventEngine::ResolvedAddress input4 = MakeAddr4(kIPv4, sizeof(kIPv4));
EXPECT_EQ(ResolvedAddressToNormalizedString(input4).value(),
"192.0.2.1:12345");
EventEngine::ResolvedAddress input6 = MakeAddr6(kIPv6, sizeof(kIPv6));
EXPECT_EQ(ResolvedAddressToNormalizedString(input6).value(),
"[2001:db8::1]:12345");
SetIPv6ScopeId(input6, 2);
EXPECT_EQ(ResolvedAddressToNormalizedString(input6).value(),
"[2001:db8::1%2]:12345");
SetIPv6ScopeId(input6, 101);
EXPECT_EQ(ResolvedAddressToNormalizedString(input6).value(),
"[2001:db8::1%101]:12345");
EventEngine::ResolvedAddress input6x = MakeAddr6(kMapped, sizeof(kMapped));
EXPECT_EQ(ResolvedAddressToNormalizedString(input6x).value(),
"192.0.2.1:12345");
EventEngine::ResolvedAddress input6y =
MakeAddr6(kNotQuiteMapped, sizeof(kNotQuiteMapped));
EXPECT_EQ(ResolvedAddressToNormalizedString(input6y).value(),
"[::fffe:c000:263]:12345");
EventEngine::ResolvedAddress phony;
memset(const_cast<sockaddr*>(phony.address()), 0, phony.size());
sockaddr* phony_addr = const_cast<sockaddr*>(phony.address());
phony_addr->sa_family = 123;
EXPECT_EQ(ResolvedAddressToNormalizedString(phony).status(),
absl::InvalidArgumentError("Unknown sockaddr family: 123"));
#ifdef GRPC_HAVE_UNIX_SOCKET
EventEngine::ResolvedAddress inputun =
*UnixSockaddrPopulate("/some/unix/path");
struct sockaddr_un* sock_un = reinterpret_cast<struct sockaddr_un*>(
const_cast<sockaddr*>(inputun.address()));
EXPECT_EQ(ResolvedAddressToNormalizedString(inputun).value(),
"/some/unix/path");
std::string max_filepath(sizeof(sock_un->sun_path) - 1, 'x');
inputun = *UnixSockaddrPopulate(max_filepath);
EXPECT_EQ(ResolvedAddressToNormalizedString(inputun).value(), max_filepath);
inputun = *UnixSockaddrPopulate(max_filepath);
sock_un->sun_path[sizeof(sockaddr_un::sun_path) - 1] = 'x';
EXPECT_EQ(ResolvedAddressToNormalizedString(inputun).status(),
absl::InvalidArgumentError("UDS path is not null-terminated"));
EventEngine::ResolvedAddress inputun2 =
*UnixAbstractSockaddrPopulate("some_unix_path");
EXPECT_EQ(ResolvedAddressToNormalizedString(inputun2).value(),
absl::StrCat(std::string(1, '\0'), "some_unix_path"));
std::string max_abspath(sizeof(sock_un->sun_path) - 1, '\0');
EventEngine::ResolvedAddress inputun3 =
*UnixAbstractSockaddrPopulate(max_abspath);
EXPECT_EQ(ResolvedAddressToNormalizedString(inputun3).value(),
absl::StrCat(std::string(1, '\0'), max_abspath));
#endif
}
TEST(TcpSocketUtilsTest, SockAddrPortTest) {
EventEngine::ResolvedAddress wild6 = ResolvedAddressMakeWild6(20);
EventEngine::ResolvedAddress wild4 = ResolvedAddressMakeWild4(20);
// Verify the string description matches the expected wildcard address with
// correct port number.
EXPECT_EQ(ResolvedAddressToNormalizedString(wild6).value(), "[::]:20");
EXPECT_EQ(ResolvedAddressToNormalizedString(wild4).value(), "0.0.0.0:20");
// Update the port values.
ResolvedAddressSetPort(wild4, 21);
ResolvedAddressSetPort(wild6, 22);
// Read back the port values.
EXPECT_EQ(ResolvedAddressGetPort(wild4), 21);
EXPECT_EQ(ResolvedAddressGetPort(wild6), 22);
// Ensure the string description reflects the updated port values.
EXPECT_EQ(ResolvedAddressToNormalizedString(wild4).value(), "0.0.0.0:21");
EXPECT_EQ(ResolvedAddressToNormalizedString(wild6).value(), "[::]:22");
}
} // namespace experimental
} // namespace grpc_event_engine
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -64,6 +64,28 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "windows_endpoint_test",
timeout = "short",
srcs = ["windows_endpoint_test.cc"],
external_deps = ["gtest"],
language = "C++",
tags = [
"no_linux",
"no_mac",
"no_test_ios",
],
uses_event_engine = False,
uses_polling = False,
deps = [
"create_sockpair",
"//:gpr_platform",
"//src/core:common_event_engine_closures",
"//src/core:windows_endpoint",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_library(
name = "create_sockpair",
srcs = ["create_sockpair.cc"],

@ -26,19 +26,24 @@
namespace grpc_event_engine {
namespace experimental {
sockaddr_in GetSomeIpv4LoopbackAddress() {
sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
addr.sin_family = AF_INET;
return addr;
}
void CreateSockpair(SOCKET sockpair[2], DWORD flags) {
SOCKET svr_sock = INVALID_SOCKET;
SOCKET lst_sock = INVALID_SOCKET;
SOCKET cli_sock = INVALID_SOCKET;
SOCKADDR_IN addr;
auto addr = GetSomeIpv4LoopbackAddress();
int addr_len = sizeof(addr);
lst_sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, flags);
GPR_ASSERT(lst_sock != INVALID_SOCKET);
memset(&addr, 0, sizeof(addr));
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
addr.sin_family = AF_INET;
GPR_ASSERT(bind(lst_sock, (sockaddr*)&addr, sizeof(addr)) != SOCKET_ERROR);
GPR_ASSERT(listen(lst_sock, SOMAXCONN) != SOCKET_ERROR);
GPR_ASSERT(getsockname(lst_sock, (sockaddr*)&addr, &addr_len) !=
@ -47,11 +52,17 @@ void CreateSockpair(SOCKET sockpair[2], DWORD flags) {
cli_sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, flags);
GPR_ASSERT(cli_sock != INVALID_SOCKET);
GPR_ASSERT(WSAConnect(cli_sock, (sockaddr*)&addr, addr_len, NULL, NULL, NULL,
NULL) == 0);
auto result =
WSAConnect(cli_sock, (sockaddr*)&addr, addr_len, NULL, NULL, NULL, NULL);
if (result != 0) {
gpr_log(GPR_DEBUG, "%s",
GRPC_WSA_ERROR(WSAGetLastError(), "Failed in WSAConnect")
.ToString()
.c_str());
abort();
}
svr_sock = accept(lst_sock, (sockaddr*)&addr, &addr_len);
GPR_ASSERT(svr_sock != INVALID_SOCKET);
closesocket(lst_sock);
// TODO(hork): see if we can migrate this to IPv6, or break up the socket prep
// stages.

@ -22,6 +22,11 @@
namespace grpc_event_engine {
namespace experimental {
sockaddr_in GetSomeIpv4LoopbackAddress();
// Creates a connected pair of sockets on the loopback address
// sockpair[0] is a connected client
// sockpair[1] is a listener running `accept` on the socket
void CreateSockpair(SOCKET sockpair[2], DWORD flags);
} // namespace experimental

@ -206,7 +206,6 @@ TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) {
wrapped_client_socket->NotifyOnRead(on_read);
// wait for the callbacks to run
read_called.WaitForNotification();
delete on_read;
wrapped_client_socket->MaybeShutdown(absl::OkStatus());
executor.Quiesce();

@ -0,0 +1,170 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#ifdef GPR_WINDOWS
#include <gtest/gtest.h>
#include "absl/status/status.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/event_engine/thread_pool.h"
#include "src/core/lib/event_engine/windows/iocp.h"
#include "src/core/lib/event_engine/windows/windows_endpoint.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "test/core/event_engine/windows/create_sockpair.h"
namespace grpc_event_engine {
namespace experimental {
using namespace std::chrono_literals;
class WindowsEndpointTest : public testing::Test {};
TEST_F(WindowsEndpointTest, BasicCommunication) {
// TODO(hork): deduplicate against winsocket and iocp tests
// Setup
ThreadPool executor;
IOCP iocp(&executor);
grpc_core::MemoryQuota quota("endpoint_test");
SOCKET sockpair[2];
CreateSockpair(sockpair, IOCP::GetDefaultSocketFlags());
auto wrapped_client_socket = iocp.Watch(sockpair[0]);
auto wrapped_server_socket = iocp.Watch(sockpair[1]);
sockaddr_in loopback_addr = GetSomeIpv4LoopbackAddress();
EventEngine::ResolvedAddress addr((sockaddr*)&loopback_addr,
sizeof(loopback_addr));
WindowsEndpoint client(addr, std::move(wrapped_client_socket),
quota.CreateMemoryAllocator("client"),
ChannelArgsEndpointConfig(), &executor);
WindowsEndpoint server(addr, std::move(wrapped_server_socket),
quota.CreateMemoryAllocator("server"),
ChannelArgsEndpointConfig(), &executor);
// Test
std::string message = "0xDEADBEEF";
grpc_core::Notification read_done;
SliceBuffer read_buffer;
server.Read(
[&read_done, &message, &read_buffer](absl::Status status) {
ASSERT_EQ(read_buffer.Count(), 1);
auto slice = read_buffer.TakeFirst();
EXPECT_EQ(slice.as_string_view(), message);
read_done.Notify();
},
&read_buffer, nullptr);
grpc_core::Notification write_done;
SliceBuffer write_buffer;
write_buffer.Append(Slice::FromCopiedString(message));
client.Write([&write_done](absl::Status status) { write_done.Notify(); },
&write_buffer, nullptr);
iocp.Work(5s, []() {});
// Cleanup
write_done.WaitForNotification();
read_done.WaitForNotification();
executor.Quiesce();
}
TEST_F(WindowsEndpointTest, Conversation) {
// Setup
ThreadPool executor;
IOCP iocp(&executor);
grpc_core::MemoryQuota quota("endpoint_test");
SOCKET sockpair[2];
CreateSockpair(sockpair, IOCP::GetDefaultSocketFlags());
sockaddr_in loopback_addr = GetSomeIpv4LoopbackAddress();
EventEngine::ResolvedAddress addr((sockaddr*)&loopback_addr,
sizeof(loopback_addr));
// Test
struct AppState {
AppState(const EventEngine::ResolvedAddress& addr,
std::unique_ptr<WinSocket> client,
std::unique_ptr<WinSocket> server, grpc_core::MemoryQuota& quota,
Executor& executor)
: client(addr, std::move(client), quota.CreateMemoryAllocator("client"),
ChannelArgsEndpointConfig(), &executor),
server(addr, std::move(server), quota.CreateMemoryAllocator("server"),
ChannelArgsEndpointConfig(), &executor) {}
grpc_core::Notification done;
WindowsEndpoint client;
WindowsEndpoint server;
SliceBuffer read_buffer;
SliceBuffer write_buffer;
const std::vector<std::string> messages{
"Java is to Javascript what car is to carpet. -Heilmann",
"Make it work, make it right, make it fast. -Beck",
"First, solve the problem. Then write the code. -Johnson",
"It works on my machine."};
// incremented after a corresponding read of a previous write
// if exchange%2 == 0, client -> server
// if exchange%2 == 1, server -> client
// if exchange == messages.length, done
std::atomic<int> exchange{0};
// Initiates a Write and corresponding Read on two endpoints.
void WriteAndQueueReader(WindowsEndpoint* writer, WindowsEndpoint* reader) {
write_buffer.Clear();
write_buffer.Append(Slice::FromCopiedString(messages[exchange]));
writer->Write([](absl::Status) {}, &write_buffer, /*args=*/nullptr);
auto cb = [this](absl::Status status) { ReadCB(status); };
read_buffer.Clear();
reader->Read(cb, &read_buffer, /*args=*/nullptr);
}
// Asserts that the received string matches, then queues the next Write/Read
// pair
void ReadCB(absl::Status status) {
ASSERT_EQ(read_buffer.Count(), 1);
ASSERT_EQ(read_buffer.TakeFirst().as_string_view(), messages[exchange]);
if (++exchange == messages.size()) {
done.Notify();
return;
}
if (exchange % 2 == 0) {
WriteAndQueueReader(/*writer=*/&client, /*reader=*/&server);
} else {
WriteAndQueueReader(/*writer=*/&server, /*reader=*/&client);
}
}
};
AppState state(addr, /*client=*/iocp.Watch(sockpair[0]),
/*server=*/iocp.Watch(sockpair[1]), quota, executor);
state.WriteAndQueueReader(/*writer=*/&state.client, /*reader=*/&state.server);
while (iocp.Work(100ms, []() {}) == Poller::WorkResult::kOk ||
!state.done.HasBeenNotified()) {
}
// Cleanup
state.done.WaitForNotification();
executor.Quiesce();
}
} // namespace experimental
} // namespace grpc_event_engine
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc_init();
int status = RUN_ALL_TESTS();
grpc_shutdown();
return status;
}
#else // not GPR_WINDOWS
int main(int /* argc */, char** /* argv */) { return 0; }
#endif

@ -2012,6 +2012,8 @@ src/core/lib/event_engine/resolved_address.cc \
src/core/lib/event_engine/slice.cc \
src/core/lib/event_engine/slice_buffer.cc \
src/core/lib/event_engine/socket_notifier.h \
src/core/lib/event_engine/tcp_socket_utils.cc \
src/core/lib/event_engine/tcp_socket_utils.h \
src/core/lib/event_engine/thread_pool.cc \
src/core/lib/event_engine/thread_pool.h \
src/core/lib/event_engine/time_util.cc \

@ -1788,6 +1788,8 @@ src/core/lib/event_engine/resolved_address.cc \
src/core/lib/event_engine/slice.cc \
src/core/lib/event_engine/slice_buffer.cc \
src/core/lib/event_engine/socket_notifier.h \
src/core/lib/event_engine/tcp_socket_utils.cc \
src/core/lib/event_engine/tcp_socket_utils.h \
src/core/lib/event_engine/thread_pool.cc \
src/core/lib/event_engine/thread_pool.h \
src/core/lib/event_engine/time_util.cc \

@ -7103,6 +7103,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "tcp_socket_utils_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
@ -8127,6 +8151,28 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "windows_endpoint_test",
"platforms": [
"linux",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save