[EventEngine] Relandx3: WindowsEventEngine client (#32031)

* Reland x3: "WindowsEventEngine Client implementation (#31848)"

This reverts commit 2c4d55b3a1.

* fixes

* fix

* fix
pull/32034/head^2
AJ Heller 2 years ago committed by GitHub
parent 5680a9b57b
commit 6778bb3501
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      BUILD
  2. 14
      CMakeLists.txt
  3. 2
      Makefile
  4. 33
      build_autogenerated.yaml
  5. 1
      config.m4
  6. 1
      config.w32
  7. 2
      gRPC-C++.podspec
  8. 3
      gRPC-Core.podspec
  9. 2
      grpc.gemspec
  10. 3
      grpc.gyp
  11. 2
      package.xml
  12. 29
      src/core/BUILD
  13. 7
      src/core/lib/event_engine/handle_containers.h
  14. 3
      src/core/lib/event_engine/tcp_socket_utils.cc
  15. 4
      src/core/lib/event_engine/utils.cc
  16. 10
      src/core/lib/event_engine/utils.h
  17. 18
      src/core/lib/event_engine/windows/iocp.cc
  18. 4
      src/core/lib/event_engine/windows/win_socket.cc
  19. 18
      src/core/lib/event_engine/windows/windows_endpoint.cc
  20. 256
      src/core/lib/event_engine/windows/windows_engine.cc
  21. 69
      src/core/lib/event_engine/windows/windows_engine.h
  22. 1
      src/python/grpcio/grpc_core_dependencies.py
  23. 1
      test/core/end2end/BUILD
  24. 2
      test/core/event_engine/BUILD
  25. 2
      test/core/util/BUILD
  26. 2
      tools/doxygen/Doxyfile.c++.internal
  27. 2
      tools/doxygen/Doxyfile.core.internal

@ -1179,7 +1179,6 @@ grpc_cc_library(
"//src/core:lib/compression/compression.cc",
"//src/core:lib/compression/compression_internal.cc",
"//src/core:lib/compression/message_compress.cc",
"//src/core:lib/event_engine/channel_args_endpoint_config.cc",
"//src/core:lib/iomgr/buffer_list.cc",
"//src/core:lib/iomgr/call_combiner.cc",
"//src/core:lib/iomgr/cfstream_handle.cc",
@ -1284,7 +1283,6 @@ grpc_cc_library(
"//src/core:lib/channel/status_util.h",
"//src/core:lib/compression/compression_internal.h",
"//src/core:lib/compression/message_compress.h",
"//src/core:lib/event_engine/channel_args_endpoint_config.h",
"//src/core:lib/iomgr/block_annotate.h",
"//src/core:lib/iomgr/buffer_list.h",
"//src/core:lib/iomgr/call_combiner.h",
@ -1407,6 +1405,7 @@ grpc_cc_library(
"//src/core:atomic_utils",
"//src/core:bitset",
"//src/core:channel_args",
"//src/core:channel_args_endpoint_config",
"//src/core:channel_args_preconditioning",
"//src/core:channel_fwd",
"//src/core:channel_init",

14
CMakeLists.txt generated

@ -2177,6 +2177,7 @@ add_library(grpc
src/core/lib/event_engine/utils.cc
src/core/lib/event_engine/windows/iocp.cc
src/core/lib/event_engine/windows/win_socket.cc
src/core/lib/event_engine/windows/windows_endpoint.cc
src/core/lib/event_engine/windows/windows_engine.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
@ -2847,6 +2848,7 @@ add_library(grpc_unsecure
src/core/lib/event_engine/utils.cc
src/core/lib/event_engine/windows/iocp.cc
src/core/lib/event_engine/windows/win_socket.cc
src/core/lib/event_engine/windows/windows_endpoint.cc
src/core/lib/event_engine/windows/windows_engine.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
@ -4334,6 +4336,7 @@ add_library(grpc_authorization_provider
src/core/lib/event_engine/utils.cc
src/core/lib/event_engine/windows/iocp.cc
src/core/lib/event_engine/windows/win_socket.cc
src/core/lib/event_engine/windows/windows_endpoint.cc
src/core/lib/event_engine/windows/windows_engine.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
@ -9962,6 +9965,10 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(endpoint_config_test
src/core/lib/channel/channel_args.cc
src/core/lib/event_engine/channel_args_endpoint_config.cc
src/core/lib/gprpp/time.cc
src/core/lib/surface/channel_stack_type.cc
test/core/event_engine/endpoint_config_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
@ -9991,7 +9998,10 @@ target_link_libraries(endpoint_config_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc
absl::any_invocable
absl::type_traits
absl::statusor
gpr
)
@ -11343,6 +11353,7 @@ add_executable(frame_test
src/core/lib/event_engine/utils.cc
src/core/lib/event_engine/windows/iocp.cc
src/core/lib/event_engine/windows/win_socket.cc
src/core/lib/event_engine/windows/windows_endpoint.cc
src/core/lib/event_engine/windows/windows_engine.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
@ -21909,7 +21920,6 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX OR _gRPC_PLATFORM_WINDOWS)
add_executable(windows_endpoint_test
src/core/lib/event_engine/windows/windows_endpoint.cc
test/core/event_engine/windows/create_sockpair.cc
test/core/event_engine/windows/windows_endpoint_test.cc
third_party/googletest/googletest/src/gtest-all.cc

2
Makefile generated

@ -1445,6 +1445,7 @@ LIBGRPC_SRC = \
src/core/lib/event_engine/utils.cc \
src/core/lib/event_engine/windows/iocp.cc \
src/core/lib/event_engine/windows/win_socket.cc \
src/core/lib/event_engine/windows/windows_endpoint.cc \
src/core/lib/event_engine/windows/windows_engine.cc \
src/core/lib/experiments/config.cc \
src/core/lib/experiments/experiments.cc \
@ -1974,6 +1975,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/event_engine/utils.cc \
src/core/lib/event_engine/windows/iocp.cc \
src/core/lib/event_engine/windows/win_socket.cc \
src/core/lib/event_engine/windows/windows_endpoint.cc \
src/core/lib/event_engine/windows/windows_engine.cc \
src/core/lib/experiments/config.cc \
src/core/lib/experiments/experiments.cc \

@ -807,6 +807,7 @@ libs:
- src/core/lib/event_engine/utils.h
- src/core/lib/event_engine/windows/iocp.h
- src/core/lib/event_engine/windows/win_socket.h
- src/core/lib/event_engine/windows/windows_endpoint.h
- src/core/lib/event_engine/windows/windows_engine.h
- src/core/lib/experiments/config.h
- src/core/lib/experiments/experiments.h
@ -1566,6 +1567,7 @@ libs:
- src/core/lib/event_engine/utils.cc
- src/core/lib/event_engine/windows/iocp.cc
- src/core/lib/event_engine/windows/win_socket.cc
- src/core/lib/event_engine/windows/windows_endpoint.cc
- src/core/lib/event_engine/windows/windows_engine.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
@ -2116,6 +2118,7 @@ libs:
- src/core/lib/event_engine/utils.h
- src/core/lib/event_engine/windows/iocp.h
- src/core/lib/event_engine/windows/win_socket.h
- src/core/lib/event_engine/windows/windows_endpoint.h
- src/core/lib/event_engine/windows/windows_engine.h
- src/core/lib/experiments/config.h
- src/core/lib/experiments/experiments.h
@ -2495,6 +2498,7 @@ libs:
- src/core/lib/event_engine/utils.cc
- src/core/lib/event_engine/windows/iocp.cc
- src/core/lib/event_engine/windows/win_socket.cc
- src/core/lib/event_engine/windows/windows_endpoint.cc
- src/core/lib/event_engine/windows/windows_engine.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
@ -3553,6 +3557,7 @@ libs:
- src/core/lib/event_engine/utils.h
- src/core/lib/event_engine/windows/iocp.h
- src/core/lib/event_engine/windows/win_socket.h
- src/core/lib/event_engine/windows/windows_endpoint.h
- src/core/lib/event_engine/windows/windows_engine.h
- src/core/lib/experiments/config.h
- src/core/lib/experiments/experiments.h
@ -3812,6 +3817,7 @@ libs:
- src/core/lib/event_engine/utils.cc
- src/core/lib/event_engine/windows/iocp.cc
- src/core/lib/event_engine/windows/win_socket.cc
- src/core/lib/event_engine/windows/windows_endpoint.cc
- src/core/lib/event_engine/windows/windows_engine.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
@ -6431,11 +6437,30 @@ targets:
gtest: true
build: test
language: c++
headers: []
headers:
- src/core/lib/avl/avl.h
- src/core/lib/channel/channel_args.h
- src/core/lib/event_engine/channel_args_endpoint_config.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/dual_ref_counted.h
- src/core/lib/gprpp/match.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/overload.h
- src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h
- src/core/lib/gprpp/time.h
- src/core/lib/surface/channel_stack_type.h
src:
- src/core/lib/channel/channel_args.cc
- src/core/lib/event_engine/channel_args_endpoint_config.cc
- src/core/lib/gprpp/time.cc
- src/core/lib/surface/channel_stack_type.cc
- test/core/event_engine/endpoint_config_test.cc
deps:
- grpc
- absl/functional:any_invocable
- absl/meta:type_traits
- absl/status:statusor
- gpr
uses_polling: false
- name: endpoint_pair_test
gtest: true
@ -7293,6 +7318,7 @@ targets:
- src/core/lib/event_engine/utils.h
- src/core/lib/event_engine/windows/iocp.h
- src/core/lib/event_engine/windows/win_socket.h
- src/core/lib/event_engine/windows/windows_endpoint.h
- src/core/lib/event_engine/windows/windows_engine.h
- src/core/lib/experiments/config.h
- src/core/lib/experiments/experiments.h
@ -7534,6 +7560,7 @@ targets:
- src/core/lib/event_engine/utils.cc
- src/core/lib/event_engine/windows/iocp.cc
- src/core/lib/event_engine/windows/win_socket.cc
- src/core/lib/event_engine/windows/windows_endpoint.cc
- src/core/lib/event_engine/windows/windows_engine.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
@ -12278,10 +12305,8 @@ targets:
build: test
language: c++
headers:
- src/core/lib/event_engine/windows/windows_endpoint.h
- test/core/event_engine/windows/create_sockpair.h
src:
- src/core/lib/event_engine/windows/windows_endpoint.cc
- test/core/event_engine/windows/create_sockpair.cc
- test/core/event_engine/windows/windows_endpoint_test.cc
deps:

1
config.m4 generated

@ -527,6 +527,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/event_engine/utils.cc \
src/core/lib/event_engine/windows/iocp.cc \
src/core/lib/event_engine/windows/win_socket.cc \
src/core/lib/event_engine/windows/windows_endpoint.cc \
src/core/lib/event_engine/windows/windows_engine.cc \
src/core/lib/experiments/config.cc \
src/core/lib/experiments/experiments.cc \

1
config.w32 generated

@ -493,6 +493,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\event_engine\\utils.cc " +
"src\\core\\lib\\event_engine\\windows\\iocp.cc " +
"src\\core\\lib\\event_engine\\windows\\win_socket.cc " +
"src\\core\\lib\\event_engine\\windows\\windows_endpoint.cc " +
"src\\core\\lib\\event_engine\\windows\\windows_engine.cc " +
"src\\core\\lib\\experiments\\config.cc " +
"src\\core\\lib\\experiments\\experiments.cc " +

2
gRPC-C++.podspec generated

@ -757,6 +757,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/utils.h',
'src/core/lib/event_engine/windows/iocp.h',
'src/core/lib/event_engine/windows/win_socket.h',
'src/core/lib/event_engine/windows/windows_endpoint.h',
'src/core/lib/event_engine/windows/windows_engine.h',
'src/core/lib/experiments/config.h',
'src/core/lib/experiments/experiments.h',
@ -1670,6 +1671,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/utils.h',
'src/core/lib/event_engine/windows/iocp.h',
'src/core/lib/event_engine/windows/win_socket.h',
'src/core/lib/event_engine/windows/windows_endpoint.h',
'src/core/lib/event_engine/windows/windows_engine.h',
'src/core/lib/experiments/config.h',
'src/core/lib/experiments/experiments.h',

3
gRPC-Core.podspec generated

@ -1174,6 +1174,8 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/windows/iocp.h',
'src/core/lib/event_engine/windows/win_socket.cc',
'src/core/lib/event_engine/windows/win_socket.h',
'src/core/lib/event_engine/windows/windows_endpoint.cc',
'src/core/lib/event_engine/windows/windows_endpoint.h',
'src/core/lib/event_engine/windows/windows_engine.cc',
'src/core/lib/event_engine/windows/windows_engine.h',
'src/core/lib/experiments/config.cc',
@ -2339,6 +2341,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/utils.h',
'src/core/lib/event_engine/windows/iocp.h',
'src/core/lib/event_engine/windows/win_socket.h',
'src/core/lib/event_engine/windows/windows_endpoint.h',
'src/core/lib/event_engine/windows/windows_engine.h',
'src/core/lib/experiments/config.h',
'src/core/lib/experiments/experiments.h',

2
grpc.gemspec generated

@ -1085,6 +1085,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/event_engine/windows/iocp.h )
s.files += %w( src/core/lib/event_engine/windows/win_socket.cc )
s.files += %w( src/core/lib/event_engine/windows/win_socket.h )
s.files += %w( src/core/lib/event_engine/windows/windows_endpoint.cc )
s.files += %w( src/core/lib/event_engine/windows/windows_endpoint.h )
s.files += %w( src/core/lib/event_engine/windows/windows_engine.cc )
s.files += %w( src/core/lib/event_engine/windows/windows_engine.h )
s.files += %w( src/core/lib/experiments/config.cc )

3
grpc.gyp generated

@ -858,6 +858,7 @@
'src/core/lib/event_engine/utils.cc',
'src/core/lib/event_engine/windows/iocp.cc',
'src/core/lib/event_engine/windows/win_socket.cc',
'src/core/lib/event_engine/windows/windows_endpoint.cc',
'src/core/lib/event_engine/windows/windows_engine.cc',
'src/core/lib/experiments/config.cc',
'src/core/lib/experiments/experiments.cc',
@ -1329,6 +1330,7 @@
'src/core/lib/event_engine/utils.cc',
'src/core/lib/event_engine/windows/iocp.cc',
'src/core/lib/event_engine/windows/win_socket.cc',
'src/core/lib/event_engine/windows/windows_endpoint.cc',
'src/core/lib/event_engine/windows/windows_engine.cc',
'src/core/lib/experiments/config.cc',
'src/core/lib/experiments/experiments.cc',
@ -1827,6 +1829,7 @@
'src/core/lib/event_engine/utils.cc',
'src/core/lib/event_engine/windows/iocp.cc',
'src/core/lib/event_engine/windows/win_socket.cc',
'src/core/lib/event_engine/windows/windows_endpoint.cc',
'src/core/lib/event_engine/windows/windows_engine.cc',
'src/core/lib/experiments/config.cc',
'src/core/lib/experiments/experiments.cc',

2
package.xml generated

@ -1067,6 +1067,8 @@
<file baseinstalldir="/" name="src/core/lib/event_engine/windows/iocp.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/windows/win_socket.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/windows/win_socket.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/windows/windows_endpoint.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/windows/windows_endpoint.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/windows/windows_engine.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/windows/windows_engine.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/experiments/config.cc" role="src" />

@ -938,6 +938,7 @@ grpc_cc_library(
],
deps = [
"channel_args",
"channel_args_endpoint_config",
"closure",
"error",
"handshaker_factory",
@ -1903,13 +1904,19 @@ grpc_cc_library(
"absl/strings",
],
deps = [
"channel_args_endpoint_config",
"common_event_engine_closures",
"error",
"event_engine_common",
"event_engine_executor",
"event_engine_tcp_socket_utils",
"event_engine_thread_pool",
"event_engine_trace",
"event_engine_utils",
"init_internally",
"posix_event_engine_timer_manager",
"time",
"windows_endpoint",
"windows_iocp",
"//:event_engine_base_hdrs",
"//:gpr",
@ -2032,6 +2039,26 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "channel_args_endpoint_config",
srcs = [
"//src/core:lib/event_engine/channel_args_endpoint_config.cc",
],
hdrs = [
"//src/core:lib/event_engine/channel_args_endpoint_config.h",
],
external_deps = [
"absl/strings",
"absl/types:optional",
],
visibility = ["@grpc:alt_grpc_base_legacy"],
deps = [
"channel_args",
"//:event_engine_base_hdrs",
"//:gpr_platform",
],
)
grpc_cc_library(
name = "default_event_engine",
srcs = [
@ -4904,6 +4931,7 @@ grpc_cc_library(
language = "c++",
deps = [
"channel_args",
"channel_args_endpoint_config",
"channel_args_preconditioning",
"channel_stack_type",
"closure",
@ -4953,6 +4981,7 @@ grpc_cc_library(
language = "c++",
deps = [
"channel_args",
"channel_args_endpoint_config",
"closure",
"error",
"grpc_insecure_credentials",

@ -54,6 +54,13 @@ using TaskHandleSet = absl::flat_hash_set<
TaskHandleComparator<
grpc_event_engine::experimental::EventEngine::TaskHandle>::Eq>;
using ConnectionHandleSet = absl::flat_hash_set<
grpc_event_engine::experimental::EventEngine::ConnectionHandle,
TaskHandleComparator<
grpc_event_engine::experimental::EventEngine::ConnectionHandle>::Hash,
TaskHandleComparator<
grpc_event_engine::experimental::EventEngine::ConnectionHandle>::Eq>;
using LookupTaskHandleSet = absl::flat_hash_set<
grpc_event_engine::experimental::EventEngine::DNSResolver::LookupTaskHandle,
TaskHandleComparator<grpc_event_engine::experimental::EventEngine::

@ -187,8 +187,7 @@ bool ResolvedAddressToV4Mapped(
memcpy(&addr6_out->sin6_addr.s6_addr[12], &addr4->sin_addr, 4);
addr6_out->sin6_port = addr4->sin_port;
*resolved_addr6_out = EventEngine::ResolvedAddress(
reinterpret_cast<sockaddr*>(addr6_out),
static_cast<socklen_t>(sizeof(sockaddr_in6)));
reinterpret_cast<sockaddr*>(addr6_out), sizeof(sockaddr_in6));
return true;
}
return false;

@ -28,8 +28,8 @@
namespace grpc_event_engine {
namespace experimental {
std::string HandleToString(EventEngine::TaskHandle handle) {
return absl::StrCat("{", handle.keys[0], ",", handle.keys[1], "}");
std::string HandleToStringInternal(uintptr_t a, uintptr_t b) {
return absl::StrCat("{", a, ",", b, "}");
}
grpc_core::Timestamp ToTimestamp(grpc_core::Timestamp now,

@ -16,6 +16,8 @@
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include <string>
#include <grpc/event_engine/event_engine.h>
@ -25,7 +27,13 @@
namespace grpc_event_engine {
namespace experimental {
std::string HandleToString(EventEngine::TaskHandle handle);
std::string HandleToStringInternal(uintptr_t a, uintptr_t b);
// Returns a string representation of the EventEngine::*Handle types
template <typename Handle>
std::string HandleToString(const Handle& handle) {
return HandleToStringInternal(handle.keys[0], handle.keys[1]);
}
grpc_core::Timestamp ToTimestamp(grpc_core::Timestamp now,
EventEngine::Duration delta);

@ -33,8 +33,8 @@ namespace experimental {
IOCP::IOCP(Executor* executor) noexcept
: executor_(executor),
iocp_handle_(CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL,
(ULONG_PTR)NULL, 0)) {
iocp_handle_(CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr,
(ULONG_PTR) nullptr, 0)) {
GPR_ASSERT(iocp_handle_);
WSASocketFlagsInit();
}
@ -59,6 +59,10 @@ std::unique_ptr<WinSocket> IOCP::Watch(SOCKET socket) {
}
void IOCP::Shutdown() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) {
gpr_log(GPR_DEBUG, "IOCP::%p shutting down. Outstanding kicks: %d", this,
outstanding_kicks_.load());
}
while (outstanding_kicks_.load() > 0) {
Work(std::chrono::hours(42), []() {});
}
@ -67,10 +71,6 @@ void IOCP::Shutdown() {
Poller::WorkResult IOCP::Work(EventEngine::Duration timeout,
absl::FunctionRef<void()> schedule_poll_again) {
static const absl::Status kDeadlineExceeded = absl::DeadlineExceededError(
absl::StrFormat("IOCP::%p: Received no completions", this));
static const absl::Status kKicked =
absl::AbortedError(absl::StrFormat("IOCP::%p: Awoken from a kick", this));
DWORD bytes = 0;
ULONG_PTR completion_key;
LPOVERLAPPED overlapped;
@ -80,7 +80,7 @@ Poller::WorkResult IOCP::Work(EventEngine::Duration timeout,
BOOL success = GetQueuedCompletionStatus(
iocp_handle_, &bytes, &completion_key, &overlapped,
static_cast<DWORD>(Milliseconds(timeout)));
if (success == 0 && overlapped == NULL) {
if (success == 0 && overlapped == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) {
gpr_log(GPR_DEBUG, "IOCP::%p deadline exceeded", this);
}
@ -96,7 +96,7 @@ Poller::WorkResult IOCP::Work(EventEngine::Duration timeout,
return Poller::WorkResult::kKicked;
}
grpc_core::Crash(
absl::StrFormat("Unknown custom completion key: %p", completion_key));
absl::StrFormat("Unknown custom completion key: %lu", completion_key));
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) {
gpr_log(GPR_DEBUG, "IOCP::%p got event on OVERLAPPED::%p", this,
@ -140,7 +140,7 @@ DWORD IOCP::WSASocketFlagsInit() {
// versions, see
// https://msdn.microsoft.com/en-us/library/windows/desktop/ms742212(v=vs.85).aspx
// for details.
SOCKET sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
SOCKET sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, nullptr, 0,
wsa_socket_flags | WSA_FLAG_NO_HANDLE_INHERIT);
if (sock != INVALID_SOCKET) {
// Windows 7, Windows 2008 R2 with SP1 or later

@ -98,7 +98,9 @@ void WinSocket::NotifyOnWrite(EventEngine::Closure* on_write) {
}
WinSocket::OpState::OpState(WinSocket* win_socket) noexcept
: win_socket_(win_socket), closure_(nullptr) {}
: win_socket_(win_socket), closure_(nullptr) {
memset(&overlapped_, 0, sizeof(OVERLAPPED));
}
void WinSocket::OpState::SetReady() {
GPR_ASSERT(!has_pending_iocp_);

@ -60,12 +60,16 @@ WindowsEndpoint::WindowsEndpoint(
handle_read_event_(this),
handle_write_event_(this),
executor_(executor) {
sockaddr addr;
char addr[EventEngine::ResolvedAddress::MAX_SIZE_BYTES];
int addr_len = sizeof(addr);
if (getsockname(socket_->socket(), &addr, &addr_len) < 0) {
grpc_core::Crash("Unrecoverable error: Failed to get local socket name.");
}
local_address_ = EventEngine::ResolvedAddress(&addr, addr_len);
if (getsockname(socket_->socket(), reinterpret_cast<sockaddr*>(addr),
&addr_len) < 0) {
grpc_core::Crash(absl::StrFormat(
"Unrecoverable error: Failed to get local socket name. %s",
GRPC_WSA_ERROR(WSAGetLastError(), "getsockname").ToString().c_str()));
}
local_address_ =
EventEngine::ResolvedAddress(reinterpret_cast<sockaddr*>(addr), addr_len);
local_address_string_ = *ResolvedAddressToURI(local_address_);
peer_address_string_ = *ResolvedAddressToURI(peer_address_);
}
@ -151,7 +155,7 @@ void WindowsEndpoint::Write(absl::AnyInvocable<void(absl::Status)> on_writable,
DWORD bytes_sent;
int status = WSASend(socket_->socket(), buffers.data(), (DWORD)buffers.size(),
&bytes_sent, 0, nullptr, nullptr);
size_t async_buffers_offset;
size_t async_buffers_offset = 0;
if (status == 0) {
if (bytes_sent == data->Length()) {
// Write completed, exiting early
@ -212,7 +216,7 @@ const EventEngine::ResolvedAddress& WindowsEndpoint::GetLocalAddress() const {
// ---- Handle{Read|Write}Closure
WindowsEndpoint::BaseEventClosure::BaseEventClosure(WindowsEndpoint* endpoint)
: endpoint_(endpoint), cb_(&AbortOnEvent) {}
: cb_(&AbortOnEvent), endpoint_(endpoint) {}
void WindowsEndpoint::HandleReadClosure::Run() {
GRPC_EVENT_ENGINE_TRACE("WindowsEndpoint::%p Handling Read Event", endpoint_);

@ -26,34 +26,68 @@
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/slice_buffer.h>
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/event_engine/common_closures.h"
#include "src/core/lib/event_engine/executor/executor.h"
#include "src/core/lib/event_engine/handle_containers.h"
#include "src/core/lib/event_engine/posix_engine/timer_manager.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/trace.h"
#include "src/core/lib/event_engine/utils.h"
#include "src/core/lib/event_engine/windows/iocp.h"
#include "src/core/lib/event_engine/windows/windows_endpoint.h"
#include "src/core/lib/event_engine/windows/windows_engine.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/error.h"
namespace grpc_event_engine {
namespace experimental {
// ---- IOCPWorkClosure ----
WindowsEventEngine::IOCPWorkClosure::IOCPWorkClosure(Executor* executor,
IOCP* iocp)
: executor_(executor), iocp_(iocp) {
executor_->Run(this);
}
void WindowsEventEngine::IOCPWorkClosure::Run() {
auto result = iocp_->Work(std::chrono::seconds(60), [this] {
workers_.fetch_add(1);
executor_->Run(this);
});
if (result == Poller::WorkResult::kDeadlineExceeded) {
// iocp received no messages. restart the worker
workers_.fetch_add(1);
executor_->Run(this);
}
if (workers_.fetch_sub(1) == 1) done_signal_.Notify();
}
void WindowsEventEngine::IOCPWorkClosure::WaitForShutdown() {
done_signal_.WaitForNotification();
}
// ---- WindowsEventEngine ----
// TODO(hork): The iomgr timer and execution engine can be reused. It should
// be separated out from the posix_engine and instantiated as components. It is
// effectively copied below.
struct WindowsEventEngine::Closure final : public EventEngine::Closure {
struct WindowsEventEngine::TimerClosure final : public EventEngine::Closure {
absl::AnyInvocable<void()> cb;
Timer timer;
WindowsEventEngine* engine;
EventEngine::TaskHandle handle;
void Run() override {
GRPC_EVENT_ENGINE_TRACE("WindowsEventEngine:%p executing callback:%s",
engine, HandleToString(handle).c_str());
GRPC_EVENT_ENGINE_TRACE(
"WindowsEventEngine:%p executing callback:%s", engine,
HandleToString<EventEngine::TaskHandle>(handle).c_str());
{
grpc_core::MutexLock lock(&engine->mu_);
grpc_core::MutexLock lock(&engine->task_mu_);
engine->known_handles_.erase(handle);
}
cb();
@ -64,7 +98,8 @@ struct WindowsEventEngine::Closure final : public EventEngine::Closure {
WindowsEventEngine::WindowsEventEngine()
: executor_(std::make_shared<ThreadPool>()),
iocp_(executor_.get()),
timer_manager_(executor_) {
timer_manager_(executor_),
iocp_worker_(executor_.get(), &iocp_) {
WSADATA wsaData;
int status = WSAStartup(MAKEWORD(2, 0), &wsaData);
GPR_ASSERT(status == 0);
@ -72,25 +107,28 @@ WindowsEventEngine::WindowsEventEngine()
WindowsEventEngine::~WindowsEventEngine() {
{
grpc_core::MutexLock lock(&mu_);
grpc_core::MutexLock lock(&task_mu_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) {
for (auto handle : known_handles_) {
gpr_log(GPR_ERROR,
"WindowsEventEngine:%p uncleared TaskHandle at shutdown:%s",
this, HandleToString(handle).c_str());
this, HandleToString<EventEngine::TaskHandle>(handle).c_str());
}
}
GPR_ASSERT(GPR_LIKELY(known_handles_.empty()));
}
iocp_.Kick();
iocp_worker_.WaitForShutdown();
iocp_.Shutdown();
GPR_ASSERT(WSACleanup() == 0);
timer_manager_.Shutdown();
}
executor_->Quiesce();
}
bool WindowsEventEngine::Cancel(EventEngine::TaskHandle handle) {
grpc_core::MutexLock lock(&mu_);
grpc_core::MutexLock lock(&task_mu_);
if (!known_handles_.contains(handle)) return false;
auto* cd = reinterpret_cast<Closure*>(handle.keys[0]);
auto* cd = reinterpret_cast<TimerClosure*>(handle.keys[0]);
bool r = timer_manager_.TimerCancel(&cd->timer);
known_handles_.erase(handle);
if (r) delete cd;
@ -118,16 +156,17 @@ void WindowsEventEngine::Run(EventEngine::Closure* closure) {
EventEngine::TaskHandle WindowsEventEngine::RunAfterInternal(
Duration when, absl::AnyInvocable<void()> cb) {
auto when_ts = ToTimestamp(timer_manager_.Now(), when);
auto* cd = new Closure;
auto* cd = new TimerClosure;
cd->cb = std::move(cb);
cd->engine = this;
EventEngine::TaskHandle handle{reinterpret_cast<intptr_t>(cd),
aba_token_.fetch_add(1)};
grpc_core::MutexLock lock(&mu_);
grpc_core::MutexLock lock(&task_mu_);
known_handles_.insert(handle);
cd->handle = handle;
GRPC_EVENT_ENGINE_TRACE("WindowsEventEngine:%p scheduling callback:%s", this,
HandleToString(handle).c_str());
GRPC_EVENT_ENGINE_TRACE(
"WindowsEventEngine:%p scheduling callback:%s", this,
HandleToString<EventEngine::TaskHandle>(handle).c_str());
timer_manager_.TimerInit(&cd->timer, when_ts, cd);
return handle;
}
@ -139,15 +178,194 @@ std::unique_ptr<EventEngine::DNSResolver> WindowsEventEngine::GetDNSResolver(
bool WindowsEventEngine::IsWorkerThread() { grpc_core::Crash("unimplemented"); }
bool WindowsEventEngine::CancelConnect(EventEngine::ConnectionHandle handle) {
grpc_core::Crash("unimplemented");
void WindowsEventEngine::OnConnectCompleted(
std::shared_ptr<ConnectionState> state) {
// Connection attempt complete!
grpc_core::MutexLock lock(&state->mu);
state->on_connected = nullptr;
{
grpc_core::MutexLock handle_lock(&connection_mu_);
known_connection_handles_.erase(state->connection_handle);
}
// return early if we cannot cancel the connection timeout timer.
if (!Cancel(state->timer_handle)) return;
auto write_info = state->socket->write_info();
if (write_info->wsa_error() != 0) {
auto error = GRPC_WSA_ERROR(write_info->wsa_error(), "ConnectEx");
state->socket->MaybeShutdown(error);
state->on_connected_user_callback(error);
return;
}
// This code should be running in an executor thread already, so the callback
// can be run directly.
ChannelArgsEndpointConfig cfg;
state->on_connected_user_callback(std::make_unique<WindowsEndpoint>(
state->address, std::move(state->socket), std::move(state->allocator),
cfg, executor_.get()));
}
EventEngine::ConnectionHandle WindowsEventEngine::Connect(
OnConnectCallback on_connect, const ResolvedAddress& addr,
const EndpointConfig& args, MemoryAllocator memory_allocator,
Duration deadline) {
grpc_core::Crash("unimplemented");
const EndpointConfig& /* args */, MemoryAllocator memory_allocator,
Duration timeout) {
// TODO(hork): utilize the endpoint config
absl::Status status;
int istatus;
auto uri = ResolvedAddressToURI(addr);
if (!uri.ok()) {
Run([on_connect = std::move(on_connect), status = uri.status()]() mutable {
on_connect(status);
});
return invalid_connection_handle;
}
GRPC_EVENT_ENGINE_TRACE("EventEngine::%p connecting to %s", this,
uri->c_str());
// Use dualstack sockets where available.
ResolvedAddress address = addr;
ResolvedAddress addr6_v4mapped;
if (ResolvedAddressToV4Mapped(addr, &addr6_v4mapped)) {
address = addr6_v4mapped;
}
SOCKET sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, nullptr, 0,
IOCP::GetDefaultSocketFlags());
if (sock == INVALID_SOCKET) {
Run([on_connect = std::move(on_connect),
status = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket")]() mutable {
on_connect(status);
});
return invalid_connection_handle;
}
status = PrepareSocket(sock);
if (!status.ok()) {
Run([on_connect = std::move(on_connect), status]() mutable {
on_connect(status);
});
return invalid_connection_handle;
}
// Grab the function pointer for ConnectEx for that specific socket It may
// change depending on the interface.
LPFN_CONNECTEX ConnectEx;
GUID guid = WSAID_CONNECTEX;
DWORD ioctl_num_bytes;
istatus = WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid,
sizeof(guid), &ConnectEx, sizeof(ConnectEx),
&ioctl_num_bytes, nullptr, nullptr);
if (istatus != 0) {
Run([on_connect = std::move(on_connect),
status = GRPC_WSA_ERROR(
WSAGetLastError(),
"WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER)")]() mutable {
on_connect(status);
});
return invalid_connection_handle;
}
// bind the local address
auto local_address = ResolvedAddressMakeWild6(0);
istatus = bind(sock, local_address.address(), local_address.size());
if (istatus != 0) {
Run([on_connect = std::move(on_connect),
status = GRPC_WSA_ERROR(WSAGetLastError(), "bind")]() mutable {
on_connect(status);
});
return invalid_connection_handle;
}
// Connect
auto watched_socket = iocp_.Watch(sock);
auto* info = watched_socket->write_info();
bool success =
ConnectEx(watched_socket->socket(), address.address(), address.size(),
nullptr, 0, nullptr, info->overlapped());
// It wouldn't be unusual to get a success immediately. But we'll still get an
// IOCP notification, so let's ignore it.
if (!success) {
int last_error = WSAGetLastError();
if (last_error != ERROR_IO_PENDING) {
auto status = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx");
Run([on_connect = std::move(on_connect), status]() mutable {
on_connect(status);
});
watched_socket->MaybeShutdown(status);
return invalid_connection_handle;
}
}
GPR_ASSERT(watched_socket != nullptr);
auto connection_state = std::make_shared<ConnectionState>();
grpc_core::MutexLock lock(&connection_state->mu);
connection_state->address = address;
connection_state->socket = std::move(watched_socket);
connection_state->on_connected_user_callback = std::move(on_connect);
connection_state->allocator = std::move(memory_allocator);
connection_state->on_connected =
SelfDeletingClosure::Create([this, connection_state]() mutable {
OnConnectCompleted(std::move(connection_state));
});
{
grpc_core::MutexLock conn_lock(&connection_mu_);
connection_state->connection_handle =
ConnectionHandle{reinterpret_cast<intptr_t>(connection_state.get()),
aba_token_.fetch_add(1)};
known_connection_handles_.insert(connection_state->connection_handle);
}
connection_state->timer_handle =
RunAfter(timeout, [this, connection_state]() {
grpc_core::MutexLock lock(&connection_state->mu);
if (CancelConnectFromDeadlineTimer(connection_state.get())) {
connection_state->on_connected_user_callback(
absl::DeadlineExceededError("Connection timed out"));
}
// else: The connection attempt could not be canceled. We can assume the
// connection callback will be called.
});
connection_state->socket->NotifyOnWrite(connection_state->on_connected);
return connection_state->connection_handle;
}
bool WindowsEventEngine::CancelConnect(EventEngine::ConnectionHandle handle) {
if (TaskHandleComparator<ConnectionHandle>::Eq()(handle,
invalid_connection_handle)) {
GRPC_EVENT_ENGINE_TRACE("%s",
"Attempted to cancel an invalid connection handle");
return false;
}
// Erase the connection handle, which may be unknown
{
grpc_core::MutexLock lock(&connection_mu_);
if (!known_connection_handles_.contains(handle)) {
GRPC_EVENT_ENGINE_TRACE(
"Unknown connection handle: %s",
HandleToString<EventEngine::ConnectionHandle>(handle).c_str());
return false;
}
known_connection_handles_.erase(handle);
}
auto* connection_state = reinterpret_cast<ConnectionState*>(handle.keys[0]);
grpc_core::MutexLock state_lock(&connection_state->mu);
if (!Cancel(connection_state->timer_handle)) return false;
return CancelConnectInternalStateLocked(connection_state);
}
bool WindowsEventEngine::CancelConnectFromDeadlineTimer(
ConnectionState* connection_state) {
// Erase the connection handle, which is guaranteed to exist.
{
grpc_core::MutexLock lock(&connection_mu_);
GPR_ASSERT(known_connection_handles_.erase(
connection_state->connection_handle) == 1);
}
return CancelConnectInternalStateLocked(connection_state);
}
bool WindowsEventEngine::CancelConnectInternalStateLocked(
ConnectionState* connection_state) {
connection_state->socket->MaybeShutdown(
absl::CancelledError("CancelConnect"));
// Release the connection_state shared_ptr. connection_state is now invalid.
delete connection_state->on_connected;
GRPC_EVENT_ENGINE_TRACE("Successfully cancelled connection %s",
HandleToString<EventEngine::ConnectionHandle>(
connection_state->connection_handle)
.c_str());
return true;
}
absl::StatusOr<std::unique_ptr<EventEngine::Listener>>

@ -32,6 +32,7 @@
#include "src/core/lib/event_engine/posix_engine/timer_manager.h"
#include "src/core/lib/event_engine/thread_pool.h"
#include "src/core/lib/event_engine/windows/iocp.h"
#include "src/core/lib/event_engine/windows/windows_endpoint.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/surface/init_internally.h"
@ -44,16 +45,10 @@ namespace experimental {
class WindowsEventEngine : public EventEngine,
public grpc_core::KeepsGrpcInitialized {
public:
class WindowsEndpoint : public EventEngine::Endpoint {
public:
~WindowsEndpoint() override;
void Read(absl::AnyInvocable<void(absl::Status)> on_read,
SliceBuffer* buffer, const ReadArgs* args) override;
void Write(absl::AnyInvocable<void(absl::Status)> on_writable,
SliceBuffer* data, const WriteArgs* args) override;
const ResolvedAddress& GetPeerAddress() const override;
const ResolvedAddress& GetLocalAddress() const override;
};
constexpr static TaskHandle invalid_handle{-1, -1};
constexpr static EventEngine::ConnectionHandle invalid_connection_handle{-1,
-1};
class WindowsListener : public EventEngine::Listener {
public:
~WindowsListener() override;
@ -104,16 +99,64 @@ class WindowsEventEngine : public EventEngine,
bool Cancel(TaskHandle handle) override;
private:
struct Closure;
// State of an active connection.
// Managed by a shared_ptr, owned exclusively by the timeout callback and the
// OnConnectCompleted callback herein.
struct ConnectionState {
// everything is guarded by mu;
grpc_core::Mutex mu
ABSL_ACQUIRED_BEFORE(WindowsEventEngine::connection_mu_);
EventEngine::ConnectionHandle connection_handle ABSL_GUARDED_BY(mu);
EventEngine::TaskHandle timer_handle ABSL_GUARDED_BY(mu);
EventEngine::OnConnectCallback on_connected_user_callback
ABSL_GUARDED_BY(mu);
EventEngine::Closure* on_connected ABSL_GUARDED_BY(mu);
std::unique_ptr<WinSocket> socket ABSL_GUARDED_BY(mu);
EventEngine::ResolvedAddress address ABSL_GUARDED_BY(mu);
MemoryAllocator allocator ABSL_GUARDED_BY(mu);
};
// A poll worker which schedules itself unless kicked
class IOCPWorkClosure : public EventEngine::Closure {
public:
explicit IOCPWorkClosure(Executor* executor, IOCP* iocp);
void Run() override;
void WaitForShutdown();
private:
std::atomic<int> workers_{1};
grpc_core::Notification done_signal_;
Executor* executor_;
IOCP* iocp_;
};
void OnConnectCompleted(std::shared_ptr<ConnectionState> state);
// CancelConnect called from within the deadline timer.
// In this case, the connection_state->mu is already locked, and timer
// cancellation is not possible.
bool CancelConnectFromDeadlineTimer(ConnectionState* connection_state)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(connection_state->mu);
// Completes the connection cancellation logic after checking handle validity
// and optionally cancelling deadline timers.
bool CancelConnectInternalStateLocked(ConnectionState* connection_state)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(connection_state->mu);
class TimerClosure;
EventEngine::TaskHandle RunAfterInternal(Duration when,
absl::AnyInvocable<void()> cb);
grpc_core::Mutex mu_;
TaskHandleSet known_handles_ ABSL_GUARDED_BY(mu_);
grpc_core::Mutex task_mu_;
TaskHandleSet known_handles_ ABSL_GUARDED_BY(task_mu_);
grpc_core::Mutex connection_mu_ ABSL_ACQUIRED_AFTER(ConnectionState::mu);
grpc_core::CondVar connection_cv_;
ConnectionHandleSet known_connection_handles_ ABSL_GUARDED_BY(connection_mu_);
std::atomic<intptr_t> aba_token_{0};
std::shared_ptr<ThreadPool> executor_;
IOCP iocp_;
TimerManager timer_manager_;
IOCPWorkClosure iocp_worker_;
};
} // namespace experimental

@ -502,6 +502,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/event_engine/utils.cc',
'src/core/lib/event_engine/windows/iocp.cc',
'src/core/lib/event_engine/windows/win_socket.cc',
'src/core/lib/event_engine/windows/windows_endpoint.cc',
'src/core/lib/event_engine/windows/windows_engine.cc',
'src/core/lib/experiments/config.cc',
'src/core/lib/experiments/experiments.cc',

@ -71,6 +71,7 @@ grpc_cc_library(
"//:httpcli",
"//:sockaddr_utils",
"//src/core:channel_args",
"//src/core:channel_args_endpoint_config",
"//src/core:channel_args_preconditioning",
"//src/core:closure",
"//src/core:error",

@ -72,8 +72,8 @@ grpc_cc_test(
uses_polling = False,
deps = [
"//:gpr_platform",
"//:grpc",
"//src/core:channel_args",
"//src/core:channel_args_endpoint_config",
],
)

@ -136,6 +136,7 @@ grpc_cc_library(
"//:ref_counted_ptr",
"//:tsi_ssl_credentials",
"//:uri_parser",
"//src/core:channel_args_endpoint_config",
"//src/core:channel_args_preconditioning",
"//src/core:closure",
"//src/core:error",
@ -176,6 +177,7 @@ grpc_cc_library(
"//:orphanable",
"//:ref_counted_ptr",
"//:uri_parser",
"//src/core:channel_args_endpoint_config",
"//src/core:channel_args_preconditioning",
"//src/core:closure",
"//src/core:error",

@ -2079,6 +2079,8 @@ src/core/lib/event_engine/windows/iocp.cc \
src/core/lib/event_engine/windows/iocp.h \
src/core/lib/event_engine/windows/win_socket.cc \
src/core/lib/event_engine/windows/win_socket.h \
src/core/lib/event_engine/windows/windows_endpoint.cc \
src/core/lib/event_engine/windows/windows_endpoint.h \
src/core/lib/event_engine/windows/windows_engine.cc \
src/core/lib/event_engine/windows/windows_engine.h \
src/core/lib/experiments/config.cc \

@ -1858,6 +1858,8 @@ src/core/lib/event_engine/windows/iocp.cc \
src/core/lib/event_engine/windows/iocp.h \
src/core/lib/event_engine/windows/win_socket.cc \
src/core/lib/event_engine/windows/win_socket.h \
src/core/lib/event_engine/windows/windows_endpoint.cc \
src/core/lib/event_engine/windows/windows_endpoint.h \
src/core/lib/event_engine/windows/windows_engine.cc \
src/core/lib/event_engine/windows/windows_engine.h \
src/core/lib/experiments/config.cc \

Loading…
Cancel
Save