diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9488b1f670b..40e02897cb7 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -2266,6 +2266,7 @@ add_library(grpc
src/core/lib/event_engine/time_util.cc
src/core/lib/event_engine/trace.cc
src/core/lib/event_engine/utils.cc
+ src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc
src/core/lib/event_engine/windows/iocp.cc
src/core/lib/event_engine/windows/win_socket.cc
src/core/lib/event_engine/windows/windows_endpoint.cc
@@ -2991,6 +2992,7 @@ add_library(grpc_unsecure
src/core/lib/event_engine/time_util.cc
src/core/lib/event_engine/trace.cc
src/core/lib/event_engine/utils.cc
+ src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc
src/core/lib/event_engine/windows/iocp.cc
src/core/lib/event_engine/windows/win_socket.cc
src/core/lib/event_engine/windows/windows_endpoint.cc
@@ -4949,6 +4951,7 @@ add_library(grpc_authorization_provider
src/core/lib/event_engine/time_util.cc
src/core/lib/event_engine/trace.cc
src/core/lib/event_engine/utils.cc
+ src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc
src/core/lib/event_engine/windows/iocp.cc
src/core/lib/event_engine/windows/win_socket.cc
src/core/lib/event_engine/windows/windows_endpoint.cc
@@ -18101,6 +18104,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
test/core/event_engine/test_suite/tests/timer_test.cc
test/core/util/fake_udp_and_tcp_server.cc
test/cpp/util/get_grpc_test_runfile_dir.cc
+ test/cpp/util/windows/manifest_file.cc
)
target_compile_features(posix_event_engine_test PUBLIC cxx_std_14)
target_include_directories(posix_event_engine_test
@@ -24368,6 +24372,7 @@ add_executable(test_core_transport_chaotic_good_frame_test
src/core/lib/event_engine/time_util.cc
src/core/lib/event_engine/trace.cc
src/core/lib/event_engine/utils.cc
+ src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc
src/core/lib/event_engine/windows/iocp.cc
src/core/lib/event_engine/windows/win_socket.cc
src/core/lib/event_engine/windows/windows_endpoint.cc
diff --git a/Makefile b/Makefile
index 384c374fad8..83d99097479 100644
--- a/Makefile
+++ b/Makefile
@@ -1474,6 +1474,7 @@ LIBGRPC_SRC = \
src/core/lib/event_engine/time_util.cc \
src/core/lib/event_engine/trace.cc \
src/core/lib/event_engine/utils.cc \
+ src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc \
src/core/lib/event_engine/windows/iocp.cc \
src/core/lib/event_engine/windows/win_socket.cc \
src/core/lib/event_engine/windows/windows_endpoint.cc \
@@ -2051,6 +2052,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/event_engine/time_util.cc \
src/core/lib/event_engine/trace.cc \
src/core/lib/event_engine/utils.cc \
+ src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc \
src/core/lib/event_engine/windows/iocp.cc \
src/core/lib/event_engine/windows/win_socket.cc \
src/core/lib/event_engine/windows/windows_endpoint.cc \
diff --git a/Package.swift b/Package.swift
index 0296a882cb5..b9d7ac6e773 100644
--- a/Package.swift
+++ b/Package.swift
@@ -1164,6 +1164,8 @@ let package = Package(
"src/core/lib/event_engine/trace.h",
"src/core/lib/event_engine/utils.cc",
"src/core/lib/event_engine/utils.h",
+ "src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc",
+ "src/core/lib/event_engine/windows/grpc_polled_fd_windows.h",
"src/core/lib/event_engine/windows/iocp.cc",
"src/core/lib/event_engine/windows/iocp.h",
"src/core/lib/event_engine/windows/win_socket.cc",
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index d976f2f2c1f..373d26d111b 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -738,6 +738,7 @@ libs:
- src/core/lib/event_engine/time_util.h
- src/core/lib/event_engine/trace.h
- src/core/lib/event_engine/utils.h
+ - src/core/lib/event_engine/windows/grpc_polled_fd_windows.h
- src/core/lib/event_engine/windows/iocp.h
- src/core/lib/event_engine/windows/win_socket.h
- src/core/lib/event_engine/windows/windows_endpoint.h
@@ -1560,6 +1561,7 @@ libs:
- src/core/lib/event_engine/time_util.cc
- src/core/lib/event_engine/trace.cc
- src/core/lib/event_engine/utils.cc
+ - src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc
- src/core/lib/event_engine/windows/iocp.cc
- src/core/lib/event_engine/windows/win_socket.cc
- src/core/lib/event_engine/windows/windows_endpoint.cc
@@ -2170,6 +2172,7 @@ libs:
- src/core/lib/event_engine/time_util.h
- src/core/lib/event_engine/trace.h
- src/core/lib/event_engine/utils.h
+ - src/core/lib/event_engine/windows/grpc_polled_fd_windows.h
- src/core/lib/event_engine/windows/iocp.h
- src/core/lib/event_engine/windows/win_socket.h
- src/core/lib/event_engine/windows/windows_endpoint.h
@@ -2605,6 +2608,7 @@ libs:
- src/core/lib/event_engine/time_util.cc
- src/core/lib/event_engine/trace.cc
- src/core/lib/event_engine/utils.cc
+ - src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc
- src/core/lib/event_engine/windows/iocp.cc
- src/core/lib/event_engine/windows/win_socket.cc
- src/core/lib/event_engine/windows/windows_endpoint.cc
@@ -4200,6 +4204,7 @@ libs:
- src/core/lib/event_engine/time_util.h
- src/core/lib/event_engine/trace.h
- src/core/lib/event_engine/utils.h
+ - src/core/lib/event_engine/windows/grpc_polled_fd_windows.h
- src/core/lib/event_engine/windows/iocp.h
- src/core/lib/event_engine/windows/win_socket.h
- src/core/lib/event_engine/windows/windows_endpoint.h
@@ -4511,6 +4516,7 @@ libs:
- src/core/lib/event_engine/time_util.cc
- src/core/lib/event_engine/trace.cc
- src/core/lib/event_engine/utils.cc
+ - src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc
- src/core/lib/event_engine/windows/iocp.cc
- src/core/lib/event_engine/windows/win_socket.cc
- src/core/lib/event_engine/windows/windows_endpoint.cc
@@ -12135,6 +12141,7 @@ targets:
- test/core/event_engine/test_suite/tests/timer_test.h
- test/core/util/fake_udp_and_tcp_server.h
- test/cpp/util/get_grpc_test_runfile_dir.h
+ - test/cpp/util/windows/manifest_file.h
src:
- test/core/event_engine/event_engine_test_utils.cc
- test/core/event_engine/test_suite/event_engine_test_framework.cc
@@ -12146,6 +12153,7 @@ targets:
- test/core/event_engine/test_suite/tests/timer_test.cc
- test/core/util/fake_udp_and_tcp_server.cc
- test/cpp/util/get_grpc_test_runfile_dir.cc
+ - test/cpp/util/windows/manifest_file.cc
deps:
- gtest
- grpc++_test_util
@@ -15936,6 +15944,7 @@ targets:
- src/core/lib/event_engine/time_util.h
- src/core/lib/event_engine/trace.h
- src/core/lib/event_engine/utils.h
+ - src/core/lib/event_engine/windows/grpc_polled_fd_windows.h
- src/core/lib/event_engine/windows/iocp.h
- src/core/lib/event_engine/windows/win_socket.h
- src/core/lib/event_engine/windows/windows_endpoint.h
@@ -16228,6 +16237,7 @@ targets:
- src/core/lib/event_engine/time_util.cc
- src/core/lib/event_engine/trace.cc
- src/core/lib/event_engine/utils.cc
+ - src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc
- src/core/lib/event_engine/windows/iocp.cc
- src/core/lib/event_engine/windows/win_socket.cc
- src/core/lib/event_engine/windows/windows_endpoint.cc
diff --git a/config.m4 b/config.m4
index 2b0b5814e1e..34445d3ba06 100644
--- a/config.m4
+++ b/config.m4
@@ -565,6 +565,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/event_engine/time_util.cc \
src/core/lib/event_engine/trace.cc \
src/core/lib/event_engine/utils.cc \
+ src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc \
src/core/lib/event_engine/windows/iocp.cc \
src/core/lib/event_engine/windows/win_socket.cc \
src/core/lib/event_engine/windows/windows_endpoint.cc \
diff --git a/config.w32 b/config.w32
index 87fca3a97d7..bcc0cb68f50 100644
--- a/config.w32
+++ b/config.w32
@@ -530,6 +530,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\event_engine\\time_util.cc " +
"src\\core\\lib\\event_engine\\trace.cc " +
"src\\core\\lib\\event_engine\\utils.cc " +
+ "src\\core\\lib\\event_engine\\windows\\grpc_polled_fd_windows.cc " +
"src\\core\\lib\\event_engine\\windows\\iocp.cc " +
"src\\core\\lib\\event_engine\\windows\\win_socket.cc " +
"src\\core\\lib\\event_engine\\windows\\windows_endpoint.cc " +
diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec
index 38b01e92c86..8277f6d4541 100644
--- a/gRPC-C++.podspec
+++ b/gRPC-C++.podspec
@@ -813,6 +813,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/time_util.h',
'src/core/lib/event_engine/trace.h',
'src/core/lib/event_engine/utils.h',
+ 'src/core/lib/event_engine/windows/grpc_polled_fd_windows.h',
'src/core/lib/event_engine/windows/iocp.h',
'src/core/lib/event_engine/windows/win_socket.h',
'src/core/lib/event_engine/windows/windows_endpoint.h',
@@ -1890,6 +1891,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/time_util.h',
'src/core/lib/event_engine/trace.h',
'src/core/lib/event_engine/utils.h',
+ 'src/core/lib/event_engine/windows/grpc_polled_fd_windows.h',
'src/core/lib/event_engine/windows/iocp.h',
'src/core/lib/event_engine/windows/win_socket.h',
'src/core/lib/event_engine/windows/windows_endpoint.h',
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index 0291f72a5ee..f7eff7c0348 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -1267,6 +1267,8 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/trace.h',
'src/core/lib/event_engine/utils.cc',
'src/core/lib/event_engine/utils.h',
+ 'src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc',
+ 'src/core/lib/event_engine/windows/grpc_polled_fd_windows.h',
'src/core/lib/event_engine/windows/iocp.cc',
'src/core/lib/event_engine/windows/iocp.h',
'src/core/lib/event_engine/windows/win_socket.cc',
@@ -2650,6 +2652,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/time_util.h',
'src/core/lib/event_engine/trace.h',
'src/core/lib/event_engine/utils.h',
+ 'src/core/lib/event_engine/windows/grpc_polled_fd_windows.h',
'src/core/lib/event_engine/windows/iocp.h',
'src/core/lib/event_engine/windows/win_socket.h',
'src/core/lib/event_engine/windows/windows_endpoint.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index ae972119229..6ab68794256 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -1170,6 +1170,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/event_engine/trace.h )
s.files += %w( src/core/lib/event_engine/utils.cc )
s.files += %w( src/core/lib/event_engine/utils.h )
+ s.files += %w( src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc )
+ s.files += %w( src/core/lib/event_engine/windows/grpc_polled_fd_windows.h )
s.files += %w( src/core/lib/event_engine/windows/iocp.cc )
s.files += %w( src/core/lib/event_engine/windows/iocp.h )
s.files += %w( src/core/lib/event_engine/windows/win_socket.cc )
diff --git a/grpc.gyp b/grpc.gyp
index 5a0c623053e..f84cabfd6d2 100644
--- a/grpc.gyp
+++ b/grpc.gyp
@@ -792,6 +792,7 @@
'src/core/lib/event_engine/time_util.cc',
'src/core/lib/event_engine/trace.cc',
'src/core/lib/event_engine/utils.cc',
+ 'src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc',
'src/core/lib/event_engine/windows/iocp.cc',
'src/core/lib/event_engine/windows/win_socket.cc',
'src/core/lib/event_engine/windows/windows_endpoint.cc',
@@ -1310,6 +1311,7 @@
'src/core/lib/event_engine/time_util.cc',
'src/core/lib/event_engine/trace.cc',
'src/core/lib/event_engine/utils.cc',
+ 'src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc',
'src/core/lib/event_engine/windows/iocp.cc',
'src/core/lib/event_engine/windows/win_socket.cc',
'src/core/lib/event_engine/windows/windows_endpoint.cc',
@@ -2059,6 +2061,7 @@
'src/core/lib/event_engine/time_util.cc',
'src/core/lib/event_engine/trace.cc',
'src/core/lib/event_engine/utils.cc',
+ 'src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc',
'src/core/lib/event_engine/windows/iocp.cc',
'src/core/lib/event_engine/windows/win_socket.cc',
'src/core/lib/event_engine/windows/windows_endpoint.cc',
diff --git a/package.xml b/package.xml
index fbf8f025506..75636d930ee 100644
--- a/package.xml
+++ b/package.xml
@@ -1152,6 +1152,8 @@
+
+
diff --git a/src/core/BUILD b/src/core/BUILD
index 1f32636fa2b..6acf3ffe99d 100644
--- a/src/core/BUILD
+++ b/src/core/BUILD
@@ -2116,6 +2116,7 @@ grpc_cc_library(
"absl/strings",
],
deps = [
+ "ares_resolver",
"channel_args_endpoint_config",
"common_event_engine_closures",
"error",
@@ -2125,6 +2126,7 @@ grpc_cc_library(
"event_engine_trace",
"event_engine_utils",
"init_internally",
+ "iomgr_port",
"posix_event_engine_timer_manager",
"time",
"windows_endpoint",
@@ -2419,12 +2421,14 @@ grpc_cc_library(
name = "ares_resolver",
srcs = [
"lib/event_engine/ares_resolver.cc",
+ "lib/event_engine/windows/grpc_polled_fd_windows.cc",
],
hdrs = [
"lib/event_engine/ares_resolver.h",
"lib/event_engine/grpc_polled_fd.h",
"lib/event_engine/nameser.h",
"lib/event_engine/posix_engine/grpc_polled_fd_posix.h",
+ "lib/event_engine/windows/grpc_polled_fd_windows.h",
],
external_deps = [
"absl/base:core_headers",
@@ -2440,6 +2444,7 @@ grpc_cc_library(
"cares",
],
deps = [
+ "common_event_engine_closures",
"error",
"event_engine_time_util",
"grpc_sockaddr",
@@ -2448,6 +2453,8 @@ grpc_cc_library(
"posix_event_engine_event_poller",
"posix_event_engine_tcp_socket_utils",
"resolved_address",
+ "slice",
+ "windows_iocp",
"//:debug_location",
"//:event_engine_base_hdrs",
"//:gpr",
diff --git a/src/core/lib/event_engine/ares_resolver.cc b/src/core/lib/event_engine/ares_resolver.cc
index 08ab6185a35..d88f0a9c245 100644
--- a/src/core/lib/event_engine/ares_resolver.cc
+++ b/src/core/lib/event_engine/ares_resolver.cc
@@ -62,10 +62,12 @@
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
+#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/grpc_polled_fd.h"
#include "src/core/lib/event_engine/time_util.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/host_port.h"
+#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
@@ -188,6 +190,18 @@ AresResolver::CreateAresResolver(
std::move(polled_fd_factory), std::move(event_engine), channel);
}
+AresResolver::AresResolver(
+ std::unique_ptr polled_fd_factory,
+ std::shared_ptr event_engine, ares_channel channel)
+ : grpc_core::InternallyRefCounted(
+ GRPC_TRACE_FLAG_ENABLED(grpc_trace_ares_resolver) ? "AresResolver"
+ : nullptr),
+ channel_(channel),
+ polled_fd_factory_(std::move(polled_fd_factory)),
+ event_engine_(std::move(event_engine)) {
+ polled_fd_factory_->Initialize(&mutex_, event_engine_.get());
+}
+
AresResolver::~AresResolver() {
GPR_ASSERT(fd_node_list_.empty());
GPR_ASSERT(callback_map_.empty());
@@ -206,8 +220,8 @@ void AresResolver::Orphan() {
if (!fd_node->already_shutdown) {
GRPC_ARES_RESOLVER_TRACE_LOG("resolver: %p shutdown fd: %s", this,
fd_node->polled_fd->GetName());
- fd_node->polled_fd->ShutdownLocked(
- absl::CancelledError("AresResolver::Orphan"));
+ GPR_ASSERT(fd_node->polled_fd->ShutdownLocked(
+ absl::CancelledError("AresResolver::Orphan")));
fd_node->already_shutdown = true;
}
}
@@ -339,20 +353,10 @@ void AresResolver::LookupTXT(
MaybeStartTimerLocked();
}
-AresResolver::AresResolver(
- std::unique_ptr polled_fd_factory,
- std::shared_ptr event_engine, ares_channel channel)
- : grpc_core::InternallyRefCounted(
- GRPC_TRACE_FLAG_ENABLED(grpc_trace_ares_resolver) ? "AresResolver"
- : nullptr),
- channel_(channel),
- polled_fd_factory_(std::move(polled_fd_factory)),
- event_engine_(std::move(event_engine)) {}
-
void AresResolver::CheckSocketsLocked() {
FdNodeList new_list;
if (!shutting_down_) {
- ares_socket_t socks[ARES_GETSOCK_MAXNUM];
+ ares_socket_t socks[ARES_GETSOCK_MAXNUM] = {};
int socks_bitmask = ares_getsock(channel_, socks, ARES_GETSOCK_MAXNUM);
for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) {
if (ARES_GETSOCK_READABLE(socks_bitmask, i) ||
@@ -415,13 +419,17 @@ void AresResolver::CheckSocketsLocked() {
// Any remaining fds in fd_node_list_ were not returned by ares_getsock()
// and are therefore no longer in use, so they can be shut down and removed
// from the list.
+ // TODO(yijiem): Since we are keeping the underlying socket opened for both
+ // Posix and Windows, it might be reasonable to also keep the FdNodes alive
+ // till the end. But we need to change the state management of FdNodes in this
+ // file. This may simplify the code a bit.
while (!fd_node_list_.empty()) {
FdNode* fd_node = fd_node_list_.front().get();
if (!fd_node->already_shutdown) {
GRPC_ARES_RESOLVER_TRACE_LOG("resolver: %p shutdown fd: %s", this,
fd_node->polled_fd->GetName());
- fd_node->polled_fd->ShutdownLocked(absl::OkStatus());
- fd_node->already_shutdown = true;
+ fd_node->already_shutdown =
+ fd_node->polled_fd->ShutdownLocked(absl::OkStatus());
}
if (!fd_node->readable_registered && !fd_node->writable_registered) {
GRPC_ARES_RESOLVER_TRACE_LOG("resolver: %p delete fd: %s", this,
diff --git a/src/core/lib/event_engine/ares_resolver.h b/src/core/lib/event_engine/ares_resolver.h
index d90f13d8a8f..df394ca69dc 100644
--- a/src/core/lib/event_engine/ares_resolver.h
+++ b/src/core/lib/event_engine/ares_resolver.h
@@ -16,6 +16,8 @@
#include
+#include
+
#include "src/core/lib/debug/trace.h"
#if GRPC_ARES == 1
@@ -87,8 +89,8 @@ class AresResolver : public grpc_core::InternallyRefCounted {
// close the socket (possibly through ares_destroy).
struct FdNode {
FdNode() = default;
- FdNode(ares_socket_t as, GrpcPolledFd* polled_fd)
- : as(as), polled_fd(polled_fd) {}
+ FdNode(ares_socket_t as, std::unique_ptr pf)
+ : as(as), polled_fd(std::move(pf)) {}
ares_socket_t as;
std::unique_ptr polled_fd;
// true if the readable closure has been registered
diff --git a/src/core/lib/event_engine/grpc_polled_fd.h b/src/core/lib/event_engine/grpc_polled_fd.h
index bb66089d5ad..463a403dc81 100644
--- a/src/core/lib/event_engine/grpc_polled_fd.h
+++ b/src/core/lib/event_engine/grpc_polled_fd.h
@@ -17,6 +17,10 @@
#include
+#include
+
+#include
+
#if GRPC_ARES == 1
#include
@@ -24,7 +28,7 @@
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
-#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/gprpp/sync.h"
namespace grpc_event_engine {
namespace experimental {
@@ -46,8 +50,17 @@ class GrpcPolledFd {
// Indicates if there is data left even after just being read from
virtual bool IsFdStillReadableLocked() = 0;
// Called once and only once. Must cause cancellation of any pending
- // read/write callbacks.
- virtual void ShutdownLocked(grpc_error_handle error) = 0;
+ // read/write callbacks. Return true when the Shutdown is confirmed, false
+ // otherwise.
+ //
+ // TODO(yijiem): On Posix, ShutdownLocked will always succeed. On Windows,
+ // ShutdownLocked only succeeds when error is Cancelled. We could remove these
+ // requirements if we changed the FdNode lifetime model so that:
+ // 1. FdNodes and their underlying socket handles remain alive for
+ // the lifetime of the resolver.
+ // 2. Orphaning the resolver triggers shutdown and subsequent cleanup for
+ // all FdNodes and socket handles.
+ GRPC_MUST_USE_RESULT virtual bool ShutdownLocked(absl::Status error) = 0;
// Get the underlying ares_socket_t that this was created from
virtual ares_socket_t GetWrappedAresSocketLocked() = 0;
// A unique name, for logging
@@ -60,8 +73,14 @@ class GrpcPolledFd {
class GrpcPolledFdFactory {
public:
virtual ~GrpcPolledFdFactory() {}
+ // Optionally initializes the GrpcPolledFdFactory with a grpc_core::Mutex*
+ // for synchronization between the AresResolver and the GrpcPolledFds. The
+ // Windows implementation overrides this.
+ virtual void Initialize(grpc_core::Mutex* mutex,
+ EventEngine* event_engine) = 0;
// Creates a new wrapped fd for the current platform
- virtual GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as) = 0;
+ virtual std::unique_ptr NewGrpcPolledFdLocked(
+ ares_socket_t as) = 0;
// Optionally configures the ares channel after creation
virtual void ConfigureAresChannelLocked(ares_channel channel) = 0;
};
diff --git a/src/core/lib/event_engine/posix_engine/grpc_polled_fd_posix.h b/src/core/lib/event_engine/posix_engine/grpc_polled_fd_posix.h
index c6eed183233..3feea096e84 100644
--- a/src/core/lib/event_engine/posix_engine/grpc_polled_fd_posix.h
+++ b/src/core/lib/event_engine/posix_engine/grpc_polled_fd_posix.h
@@ -17,6 +17,11 @@
#include
+#include
+
+#include
+
+#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/port.h"
#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER)
@@ -42,7 +47,6 @@
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
-#include "src/core/lib/iomgr/error.h"
namespace grpc_event_engine {
namespace experimental {
@@ -80,8 +84,9 @@ class GrpcPolledFdPosix : public GrpcPolledFd {
bytes_available > 0;
}
- void ShutdownLocked(grpc_error_handle error) override {
+ bool ShutdownLocked(absl::Status error) override {
handle_->ShutdownHandle(error);
+ return true;
}
ares_socket_t GetWrappedAresSocketLocked() override { return as_; }
@@ -105,9 +110,12 @@ class GrpcPolledFdFactoryPosix : public GrpcPolledFdFactory {
}
}
- GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as) override {
+ void Initialize(grpc_core::Mutex*, EventEngine*) override {}
+
+ std::unique_ptr NewGrpcPolledFdLocked(
+ ares_socket_t as) override {
owned_fds_.insert(as);
- return new GrpcPolledFdPosix(
+ return std::make_unique(
as,
poller_->CreateHandle(as, "c-ares socket", poller_->CanTrackErrors()));
}
diff --git a/src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc b/src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc
new file mode 100644
index 00000000000..7a8992404a6
--- /dev/null
+++ b/src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc
@@ -0,0 +1,823 @@
+// Copyright 2023 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include
+
+#include "src/core/lib/iomgr/port.h" // IWYU pragma: keep
+
+#if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
+
+#include
+
+#include
+
+#include "absl/functional/any_invocable.h"
+#include "absl/status/status.h"
+
+#include
+
+#include "src/core/lib/address_utils/sockaddr_utils.h"
+#include "src/core/lib/event_engine/ares_resolver.h"
+#include "src/core/lib/event_engine/grpc_polled_fd.h"
+#include "src/core/lib/event_engine/windows/grpc_polled_fd_windows.h"
+#include "src/core/lib/event_engine/windows/win_socket.h"
+#include "src/core/lib/gprpp/debug_location.h"
+#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/slice/slice.h"
+
+// TODO(apolcyn): remove this hack after fixing upstream.
+// Our grpc/c-ares code on Windows uses the ares_set_socket_functions API,
+// which uses "struct iovec" type, which on Windows is defined inside of
+// a c-ares header that is not public.
+// See https://github.com/c-ares/c-ares/issues/206.
+struct iovec {
+ void* iov_base;
+ size_t iov_len;
+};
+
+namespace grpc_event_engine {
+namespace experimental {
+namespace {
+
+constexpr int kRecvFromSourceAddrSize = 200;
+constexpr int kReadBufferSize = 4192;
+
+grpc_slice FlattenIovec(const struct iovec* iov, int iov_count) {
+ int total = 0;
+ for (int i = 0; i < iov_count; i++) {
+ total += iov[i].iov_len;
+ }
+ grpc_slice out = GRPC_SLICE_MALLOC(total);
+ size_t cur = 0;
+ for (int i = 0; i < iov_count; i++) {
+ for (size_t k = 0; k < iov[i].iov_len; k++) {
+ GRPC_SLICE_START_PTR(out)
+ [cur++] = (static_cast(iov[i].iov_base))[k];
+ }
+ }
+ return out;
+}
+
+} // namespace
+
+// c-ares reads and takes action on the error codes of the
+// "virtual socket operations" in this file, via the WSAGetLastError
+// APIs. If code in this file wants to set a specific WSA error that
+// c-ares should read, it must do so by calling SetWSAError() on the
+// WSAErrorContext instance passed to it. A WSAErrorContext must only be
+// instantiated at the top of the virtual socket function callstack.
+class WSAErrorContext {
+ public:
+ explicit WSAErrorContext(){};
+
+ ~WSAErrorContext() {
+ if (error_ != 0) {
+ WSASetLastError(error_);
+ }
+ }
+
+ // Disallow copy and assignment operators
+ WSAErrorContext(const WSAErrorContext&) = delete;
+ WSAErrorContext& operator=(const WSAErrorContext&) = delete;
+
+ void SetWSAError(int error) { error_ = error; }
+
+ private:
+ int error_ = 0;
+};
+
+// c-ares creates its own sockets and is meant to read them when readable and
+// write them when writeable. To fit this socket usage model into the grpc
+// windows poller (which gives notifications when attempted reads and writes
+// are actually fulfilled rather than possible), this GrpcPolledFdWindows
+// class takes advantage of the ares_set_socket_functions API and acts as a
+// virtual socket. It holds its own read and write buffers which are written
+// to and read from c-ares and are used with the grpc windows poller, and it,
+// e.g., manufactures virtual socket error codes when it e.g. needs to tell
+// the c-ares library to wait for an async read.
+class GrpcPolledFdWindows : public GrpcPolledFd {
+ public:
+ GrpcPolledFdWindows(std::unique_ptr winsocket,
+ grpc_core::Mutex* mu, int address_family, int socket_type,
+ EventEngine* event_engine)
+ : name_(absl::StrFormat("c-ares socket: %" PRIdPTR,
+ winsocket->raw_socket())),
+ address_family_(address_family),
+ socket_type_(socket_type),
+ mu_(mu),
+ winsocket_(std::move(winsocket)),
+ read_buf_(grpc_empty_slice()),
+ write_buf_(grpc_empty_slice()),
+ outer_read_closure_([this]() { OnIocpReadable(); }),
+ outer_write_closure_([this]() { OnIocpWriteable(); }),
+ on_tcp_connect_locked_([this]() { OnTcpConnect(); }),
+ event_engine_(event_engine) {}
+
+ ~GrpcPolledFdWindows() override {
+ GRPC_ARES_RESOLVER_TRACE_LOG(
+ "fd:|%s| ~GrpcPolledFdWindows shutdown_called_: %d ", GetName(),
+ shutdown_called_);
+ grpc_core::CSliceUnref(read_buf_);
+ grpc_core::CSliceUnref(write_buf_);
+ GPR_ASSERT(read_closure_ == nullptr);
+ GPR_ASSERT(write_closure_ == nullptr);
+ if (!shutdown_called_) {
+ winsocket_->Shutdown(DEBUG_LOCATION, "~GrpcPolledFdWindows");
+ }
+ }
+
+ void RegisterForOnReadableLocked(
+ absl::AnyInvocable read_closure) override {
+ GPR_ASSERT(read_closure_ == nullptr);
+ read_closure_ = std::move(read_closure);
+ grpc_core::CSliceUnref(read_buf_);
+ GPR_ASSERT(!read_buf_has_data_);
+ read_buf_ = GRPC_SLICE_MALLOC(kReadBufferSize);
+ if (connect_done_) {
+ ContinueRegisterForOnReadableLocked();
+ } else {
+ GPR_ASSERT(pending_continue_register_for_on_readable_locked_ == false);
+ pending_continue_register_for_on_readable_locked_ = true;
+ }
+ }
+
+ void RegisterForOnWriteableLocked(
+ absl::AnyInvocable write_closure) override {
+ if (socket_type_ == SOCK_DGRAM) {
+ GRPC_ARES_RESOLVER_TRACE_LOG(
+ "fd:|%s| RegisterForOnWriteableLocked called", GetName());
+ } else {
+ GPR_ASSERT(socket_type_ == SOCK_STREAM);
+ GRPC_ARES_RESOLVER_TRACE_LOG(
+ "fd:|%s| RegisterForOnWriteableLocked called tcp_write_state_: %d "
+ "connect_done_: %d",
+ GetName(), tcp_write_state_, connect_done_);
+ }
+ GPR_ASSERT(write_closure_ == nullptr);
+ write_closure_ = std::move(write_closure);
+ if (!connect_done_) {
+ GPR_ASSERT(!pending_continue_register_for_on_writeable_locked_);
+ pending_continue_register_for_on_writeable_locked_ = true;
+ } else {
+ ContinueRegisterForOnWriteableLocked();
+ }
+ }
+
+ bool IsFdStillReadableLocked() override { return read_buf_has_data_; }
+
+ bool ShutdownLocked(absl::Status error) override {
+ GPR_ASSERT(!shutdown_called_);
+ if (!absl::IsCancelled(error)) {
+ return false;
+ }
+ GRPC_ARES_RESOLVER_TRACE_LOG("fd:|%s| ShutdownLocked", GetName());
+ shutdown_called_ = true;
+ // The socket is disconnected and closed here since this is an external
+ // cancel request, e.g. a timeout. c-ares shouldn't do anything on the
+ // socket after this point except calling close which should then destroy
+ // the GrpcPolledFdWindows object.
+ winsocket_->Shutdown(DEBUG_LOCATION, "GrpcPolledFdWindows::ShutdownLocked");
+ return true;
+ }
+
+ ares_socket_t GetWrappedAresSocketLocked() override {
+ return winsocket_->raw_socket();
+ }
+
+ const char* GetName() const override { return name_.c_str(); }
+
+ ares_ssize_t RecvFrom(WSAErrorContext* wsa_error_ctx, void* data,
+ ares_socket_t data_len, int /* flags */,
+ struct sockaddr* from, ares_socklen_t* from_len) {
+ GRPC_ARES_RESOLVER_TRACE_LOG(
+ "fd:|%s| RecvFrom called read_buf_has_data:%d Current read buf "
+ "length:|%d|",
+ GetName(), read_buf_has_data_, GRPC_SLICE_LENGTH(read_buf_));
+ if (!read_buf_has_data_) {
+ wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
+ return -1;
+ }
+ ares_ssize_t bytes_read = 0;
+ for (size_t i = 0; i < GRPC_SLICE_LENGTH(read_buf_) && i < data_len; i++) {
+ (static_cast(data))[i] = GRPC_SLICE_START_PTR(read_buf_)[i];
+ bytes_read++;
+ }
+ read_buf_ = grpc_slice_sub_no_ref(read_buf_, bytes_read,
+ GRPC_SLICE_LENGTH(read_buf_));
+ if (GRPC_SLICE_LENGTH(read_buf_) == 0) {
+ read_buf_has_data_ = false;
+ }
+ // c-ares overloads this recv_from virtual socket function to receive
+ // data on both UDP and TCP sockets, and from is nullptr for TCP.
+ if (from != nullptr) {
+ GPR_ASSERT(*from_len <= recv_from_source_addr_len_);
+ memcpy(from, &recv_from_source_addr_, recv_from_source_addr_len_);
+ *from_len = recv_from_source_addr_len_;
+ }
+ return bytes_read;
+ }
+
+ ares_ssize_t SendV(WSAErrorContext* wsa_error_ctx, const struct iovec* iov,
+ int iov_count) {
+ GRPC_ARES_RESOLVER_TRACE_LOG(
+ "fd:|%s| SendV called connect_done_:%d wsa_connect_error_:%d",
+ GetName(), connect_done_, wsa_connect_error_);
+ if (!connect_done_) {
+ wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
+ return -1;
+ }
+ if (wsa_connect_error_ != 0) {
+ wsa_error_ctx->SetWSAError(wsa_connect_error_);
+ return -1;
+ }
+ switch (socket_type_) {
+ case SOCK_DGRAM:
+ return SendVUDP(wsa_error_ctx, iov, iov_count);
+ case SOCK_STREAM:
+ return SendVTCP(wsa_error_ctx, iov, iov_count);
+ default:
+ abort();
+ }
+ }
+
+ int Connect(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
+ ares_socklen_t target_len) {
+ switch (socket_type_) {
+ case SOCK_DGRAM:
+ return ConnectUDP(wsa_error_ctx, target, target_len);
+ case SOCK_STREAM:
+ return ConnectTCP(wsa_error_ctx, target, target_len);
+ default:
+ grpc_core::Crash(
+ absl::StrFormat("Unknown socket_type_: %d", socket_type_));
+ }
+ }
+
+ private:
+ enum WriteState {
+ WRITE_IDLE,
+ WRITE_REQUESTED,
+ WRITE_PENDING,
+ WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY,
+ };
+
+ void ScheduleAndNullReadClosure(absl::Status error) {
+ event_engine_->Run([read_closure = std::move(read_closure_),
+ error]() mutable { read_closure(error); });
+ read_closure_ = nullptr;
+ }
+
+ void ScheduleAndNullWriteClosure(absl::Status error) {
+ event_engine_->Run([write_closure = std::move(write_closure_),
+ error]() mutable { write_closure(error); });
+ write_closure_ = nullptr;
+ }
+
+ void ContinueRegisterForOnReadableLocked() {
+ GRPC_ARES_RESOLVER_TRACE_LOG(
+ "fd:|%s| ContinueRegisterForOnReadableLocked "
+ "wsa_connect_error_:%d",
+ GetName(), wsa_connect_error_);
+ GPR_ASSERT(connect_done_);
+ if (wsa_connect_error_ != 0) {
+ ScheduleAndNullReadClosure(GRPC_WSA_ERROR(wsa_connect_error_, "connect"));
+ return;
+ }
+ WSABUF buffer;
+ buffer.buf = reinterpret_cast(GRPC_SLICE_START_PTR(read_buf_));
+ buffer.len = GRPC_SLICE_LENGTH(read_buf_);
+ recv_from_source_addr_len_ = sizeof(recv_from_source_addr_);
+ DWORD flags = 0;
+ winsocket_->NotifyOnRead(&outer_read_closure_);
+ if (WSARecvFrom(winsocket_->raw_socket(), &buffer, 1, nullptr, &flags,
+ reinterpret_cast(recv_from_source_addr_),
+ &recv_from_source_addr_len_,
+ winsocket_->read_info()->overlapped(), nullptr) != 0) {
+ int wsa_last_error = WSAGetLastError();
+ char* msg = gpr_format_message(wsa_last_error);
+ GRPC_ARES_RESOLVER_TRACE_LOG(
+ "fd:|%s| ContinueRegisterForOnReadableLocked WSARecvFrom error "
+ "code:|%d| "
+ "msg:|%s|",
+ GetName(), wsa_last_error, msg);
+ gpr_free(msg);
+ if (wsa_last_error != WSA_IO_PENDING) {
+ winsocket_->UnregisterReadCallback();
+ ScheduleAndNullReadClosure(
+ GRPC_WSA_ERROR(wsa_last_error, "WSARecvFrom"));
+ return;
+ }
+ }
+ }
+
+ void ContinueRegisterForOnWriteableLocked() {
+ GRPC_ARES_RESOLVER_TRACE_LOG(
+ "fd:|%s| ContinueRegisterForOnWriteableLocked "
+ "wsa_connect_error_:%d",
+ GetName(), wsa_connect_error_);
+ GPR_ASSERT(connect_done_);
+ if (wsa_connect_error_ != 0) {
+ ScheduleAndNullWriteClosure(
+ GRPC_WSA_ERROR(wsa_connect_error_, "connect"));
+ return;
+ }
+ if (socket_type_ == SOCK_DGRAM) {
+ ScheduleAndNullWriteClosure(absl::OkStatus());
+ return;
+ }
+ GPR_ASSERT(socket_type_ == SOCK_STREAM);
+ int wsa_error_code = 0;
+ switch (tcp_write_state_) {
+ case WRITE_IDLE:
+ ScheduleAndNullWriteClosure(absl::OkStatus());
+ break;
+ case WRITE_REQUESTED:
+ tcp_write_state_ = WRITE_PENDING;
+ winsocket_->NotifyOnWrite(&outer_write_closure_);
+ if (SendWriteBuf(nullptr, winsocket_->write_info()->overlapped(),
+ &wsa_error_code) != 0) {
+ winsocket_->UnregisterWriteCallback();
+ ScheduleAndNullWriteClosure(
+ GRPC_WSA_ERROR(wsa_error_code, "WSASend (overlapped)"));
+ return;
+ }
+ break;
+ case WRITE_PENDING:
+ case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY:
+ grpc_core::Crash(
+ absl::StrFormat("Invalid tcp_write_state_: %d", tcp_write_state_));
+ }
+ }
+
+ int SendWriteBuf(LPDWORD bytes_sent_ptr, LPWSAOVERLAPPED overlapped,
+ int* wsa_error_code) {
+ WSABUF buf;
+ buf.len = GRPC_SLICE_LENGTH(write_buf_);
+ buf.buf = reinterpret_cast(GRPC_SLICE_START_PTR(write_buf_));
+ DWORD flags = 0;
+ int out = WSASend(winsocket_->raw_socket(), &buf, 1, bytes_sent_ptr, flags,
+ overlapped, nullptr);
+ *wsa_error_code = WSAGetLastError();
+ GRPC_ARES_RESOLVER_TRACE_LOG(
+ "fd:|%s| SendWriteBuf WSASend buf.len:%d *bytes_sent_ptr:%d "
+ "overlapped:%p "
+ "return:%d *wsa_error_code:%d",
+ GetName(), buf.len, bytes_sent_ptr != nullptr ? *bytes_sent_ptr : 0,
+ overlapped, out, *wsa_error_code);
+ return out;
+ }
+
+ ares_ssize_t SendVUDP(WSAErrorContext* wsa_error_ctx, const struct iovec* iov,
+ int iov_count) {
+ // c-ares doesn't handle retryable errors on writes of UDP sockets.
+ // Therefore, the sendv handler for UDP sockets must only attempt
+ // to write everything inline.
+ GRPC_ARES_RESOLVER_TRACE_LOG("fd:|%s| SendVUDP called", GetName());
+ GPR_ASSERT(GRPC_SLICE_LENGTH(write_buf_) == 0);
+ grpc_core::CSliceUnref(write_buf_);
+ write_buf_ = FlattenIovec(iov, iov_count);
+ DWORD bytes_sent = 0;
+ int wsa_error_code = 0;
+ if (SendWriteBuf(&bytes_sent, nullptr, &wsa_error_code) != 0) {
+ grpc_core::CSliceUnref(write_buf_);
+ write_buf_ = grpc_empty_slice();
+ wsa_error_ctx->SetWSAError(wsa_error_code);
+ char* msg = gpr_format_message(wsa_error_code);
+ GRPC_ARES_RESOLVER_TRACE_LOG(
+ "fd:|%s| SendVUDP SendWriteBuf error code:%d msg:|%s|", GetName(),
+ wsa_error_code, msg);
+ gpr_free(msg);
+ return -1;
+ }
+ write_buf_ = grpc_slice_sub_no_ref(write_buf_, bytes_sent,
+ GRPC_SLICE_LENGTH(write_buf_));
+ return bytes_sent;
+ }
+
+ ares_ssize_t SendVTCP(WSAErrorContext* wsa_error_ctx, const struct iovec* iov,
+ int iov_count) {
+ // The "sendv" handler on TCP sockets buffers up write
+ // requests and returns an artificial WSAEWOULDBLOCK. Writing that buffer
+ // out in the background, and making further send progress in general, will
+ // happen as long as c-ares continues to show interest in writeability on
+ // this fd.
+ GRPC_ARES_RESOLVER_TRACE_LOG("fd:|%s| SendVTCP called tcp_write_state_:%d",
+ GetName(), tcp_write_state_);
+ switch (tcp_write_state_) {
+ case WRITE_IDLE:
+ tcp_write_state_ = WRITE_REQUESTED;
+ grpc_core::CSliceUnref(write_buf_);
+ write_buf_ = FlattenIovec(iov, iov_count);
+ wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
+ return -1;
+ case WRITE_REQUESTED:
+ case WRITE_PENDING:
+ wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
+ return -1;
+ case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY:
+ // c-ares is retrying a send on data that we previously returned
+ // WSAEWOULDBLOCK for, but then subsequently wrote out in the
+ // background. Right now, we assume that c-ares is retrying the same
+ // send again. If c-ares still needs to send even more data, we'll get
+ // to it eventually.
+ grpc_slice currently_attempted = FlattenIovec(iov, iov_count);
+ GPR_ASSERT(GRPC_SLICE_LENGTH(currently_attempted) >=
+ GRPC_SLICE_LENGTH(write_buf_));
+ ares_ssize_t total_sent = 0;
+ for (size_t i = 0; i < GRPC_SLICE_LENGTH(write_buf_); i++) {
+ GPR_ASSERT(GRPC_SLICE_START_PTR(currently_attempted)[i] ==
+ GRPC_SLICE_START_PTR(write_buf_)[i]);
+ total_sent++;
+ }
+ grpc_core::CSliceUnref(currently_attempted);
+ tcp_write_state_ = WRITE_IDLE;
+ return total_sent;
+ }
+ grpc_core::Crash(
+ absl::StrFormat("Unknown tcp_write_state_: %d", tcp_write_state_));
+ }
+
+ void OnTcpConnect() {
+ grpc_core::MutexLock lock(mu_);
+ GRPC_ARES_RESOLVER_TRACE_LOG(
+ "fd:%s InnerOnTcpConnectLocked "
+ "pending_register_for_readable:%d"
+ " pending_register_for_writeable:%d",
+ GetName(), pending_continue_register_for_on_readable_locked_,
+ pending_continue_register_for_on_writeable_locked_);
+ GPR_ASSERT(!connect_done_);
+ connect_done_ = true;
+ GPR_ASSERT(wsa_connect_error_ == 0);
+ if (shutdown_called_) {
+ wsa_connect_error_ = WSA_OPERATION_ABORTED;
+ } else {
+ DWORD transferred_bytes = 0;
+ DWORD flags;
+ BOOL wsa_success = WSAGetOverlappedResult(
+ winsocket_->raw_socket(), winsocket_->write_info()->overlapped(),
+ &transferred_bytes, FALSE, &flags);
+ GPR_ASSERT(transferred_bytes == 0);
+ if (!wsa_success) {
+ wsa_connect_error_ = WSAGetLastError();
+ char* msg = gpr_format_message(wsa_connect_error_);
+ GRPC_ARES_RESOLVER_TRACE_LOG(
+ "fd:%s InnerOnTcpConnectLocked WSA overlapped result code:%d "
+ "msg:|%s|",
+ GetName(), wsa_connect_error_, msg);
+ gpr_free(msg);
+ }
+ }
+ if (pending_continue_register_for_on_readable_locked_) {
+ ContinueRegisterForOnReadableLocked();
+ }
+ if (pending_continue_register_for_on_writeable_locked_) {
+ ContinueRegisterForOnWriteableLocked();
+ }
+ }
+
+ int ConnectUDP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
+ ares_socklen_t target_len) {
+ GRPC_ARES_RESOLVER_TRACE_LOG("fd:%s ConnectUDP", GetName());
+ GPR_ASSERT(!connect_done_);
+ GPR_ASSERT(wsa_connect_error_ == 0);
+ SOCKET s = winsocket_->raw_socket();
+ int out =
+ WSAConnect(s, target, target_len, nullptr, nullptr, nullptr, nullptr);
+ wsa_connect_error_ = WSAGetLastError();
+ wsa_error_ctx->SetWSAError(wsa_connect_error_);
+ connect_done_ = true;
+ char* msg = gpr_format_message(wsa_connect_error_);
+ GRPC_ARES_RESOLVER_TRACE_LOG("fd:%s WSAConnect error code:|%d| msg:|%s|",
+ GetName(), wsa_connect_error_, msg);
+ gpr_free(msg);
+ // c-ares expects a posix-style connect API
+ return out == 0 ? 0 : -1;
+ }
+
+ int ConnectTCP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
+ ares_socklen_t target_len) {
+ GRPC_ARES_RESOLVER_TRACE_LOG("fd:%s ConnectTCP", GetName());
+ LPFN_CONNECTEX ConnectEx;
+ GUID guid = WSAID_CONNECTEX;
+ DWORD ioctl_num_bytes;
+ SOCKET s = winsocket_->raw_socket();
+ if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
+ &ConnectEx, sizeof(ConnectEx), &ioctl_num_bytes, nullptr,
+ nullptr) != 0) {
+ int wsa_last_error = WSAGetLastError();
+ wsa_error_ctx->SetWSAError(wsa_last_error);
+ char* msg = gpr_format_message(wsa_last_error);
+ GRPC_ARES_RESOLVER_TRACE_LOG(
+ "fd:%s WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) error code:%d "
+ "msg:|%s|",
+ GetName(), wsa_last_error, msg);
+ gpr_free(msg);
+ connect_done_ = true;
+ wsa_connect_error_ = wsa_last_error;
+ return -1;
+ }
+ grpc_resolved_address wildcard4_addr;
+ grpc_resolved_address wildcard6_addr;
+ grpc_sockaddr_make_wildcards(0, &wildcard4_addr, &wildcard6_addr);
+ grpc_resolved_address* local_address = nullptr;
+ if (address_family_ == AF_INET) {
+ local_address = &wildcard4_addr;
+ } else {
+ local_address = &wildcard6_addr;
+ }
+ if (bind(s, reinterpret_cast(local_address->addr),
+ static_cast(local_address->len)) != 0) {
+ int wsa_last_error = WSAGetLastError();
+ wsa_error_ctx->SetWSAError(wsa_last_error);
+ char* msg = gpr_format_message(wsa_last_error);
+ GRPC_ARES_RESOLVER_TRACE_LOG("fd:%s bind error code:%d msg:|%s|",
+ GetName(), wsa_last_error, msg);
+ gpr_free(msg);
+ connect_done_ = true;
+ wsa_connect_error_ = wsa_last_error;
+ return -1;
+ }
+ int out = 0;
+ // Register an async OnTcpConnect callback here since it is required by the
+ // WinSocket API.
+ winsocket_->NotifyOnWrite(&on_tcp_connect_locked_);
+ if (ConnectEx(s, target, target_len, nullptr, 0, nullptr,
+ winsocket_->write_info()->overlapped()) == 0) {
+ out = -1;
+ int wsa_last_error = WSAGetLastError();
+ wsa_error_ctx->SetWSAError(wsa_last_error);
+ char* msg = gpr_format_message(wsa_last_error);
+ GRPC_ARES_RESOLVER_TRACE_LOG("fd:%s ConnectEx error code:%d msg:|%s|",
+ GetName(), wsa_last_error, msg);
+ gpr_free(msg);
+ if (wsa_last_error == WSA_IO_PENDING) {
+ // c-ares only understands WSAEINPROGRESS and EWOULDBLOCK error codes on
+ // connect, but an async connect on IOCP socket will give
+ // WSA_IO_PENDING, so we need to convert.
+ wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
+ } else {
+ winsocket_->UnregisterWriteCallback();
+ // By returning a non-retryable error to c-ares at this point,
+ // we're aborting the possibility of any future operations on this fd.
+ connect_done_ = true;
+ wsa_connect_error_ = wsa_last_error;
+ return -1;
+ }
+ }
+ return out;
+ }
+
+ // TODO(apolcyn): improve this error handling to be less conversative.
+ // An e.g. ECONNRESET error here should result in errors when
+ // c-ares reads from this socket later, but it shouldn't necessarily cancel
+ // the entire resolution attempt. Doing so will allow the "inject broken
+ // nameserver list" test to pass on Windows.
+ void OnIocpReadable() {
+ grpc_core::MutexLock lock(mu_);
+ absl::Status error;
+ if (winsocket_->read_info()->result().wsa_error != 0) {
+ // WSAEMSGSIZE would be due to receiving more data
+ // than our read buffer's fixed capacity. Assume that
+ // the connection is TCP and read the leftovers
+ // in subsequent c-ares reads.
+ if (winsocket_->read_info()->result().wsa_error != WSAEMSGSIZE) {
+ error = GRPC_WSA_ERROR(winsocket_->read_info()->result().wsa_error,
+ "OnIocpReadableInner");
+ GRPC_ARES_RESOLVER_TRACE_LOG(
+ "fd:|%s| OnIocpReadableInner winsocket_->read_info.wsa_error "
+ "code:|%d| msg:|%s|",
+ GetName(), winsocket_->read_info()->result().wsa_error,
+ grpc_core::StatusToString(error).c_str());
+ }
+ }
+ if (error.ok()) {
+ read_buf_ = grpc_slice_sub_no_ref(
+ read_buf_, 0, winsocket_->read_info()->result().bytes_transferred);
+ read_buf_has_data_ = true;
+ } else {
+ grpc_core::CSliceUnref(read_buf_);
+ read_buf_ = grpc_empty_slice();
+ }
+ GRPC_ARES_RESOLVER_TRACE_LOG(
+ "fd:|%s| OnIocpReadable finishing. read buf length now:|%d|", GetName(),
+ GRPC_SLICE_LENGTH(read_buf_));
+ ScheduleAndNullReadClosure(error);
+ }
+
+ void OnIocpWriteable() {
+ grpc_core::MutexLock lock(mu_);
+ GRPC_ARES_RESOLVER_TRACE_LOG("OnIocpWriteableInner. fd:|%s|", GetName());
+ GPR_ASSERT(socket_type_ == SOCK_STREAM);
+ absl::Status error;
+ if (winsocket_->write_info()->result().wsa_error != 0) {
+ error = GRPC_WSA_ERROR(winsocket_->write_info()->result().wsa_error,
+ "OnIocpWriteableInner");
+ GRPC_ARES_RESOLVER_TRACE_LOG(
+ "fd:|%s| OnIocpWriteableInner. winsocket_->write_info.wsa_error "
+ "code:|%d| msg:|%s|",
+ GetName(), winsocket_->write_info()->result().wsa_error,
+ grpc_core::StatusToString(error).c_str());
+ }
+ GPR_ASSERT(tcp_write_state_ == WRITE_PENDING);
+ if (error.ok()) {
+ tcp_write_state_ = WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY;
+ write_buf_ = grpc_slice_sub_no_ref(
+ write_buf_, 0, winsocket_->write_info()->result().bytes_transferred);
+ GRPC_ARES_RESOLVER_TRACE_LOG(
+ "fd:|%s| OnIocpWriteableInner. bytes transferred:%d", GetName(),
+ winsocket_->write_info()->result().bytes_transferred);
+ } else {
+ grpc_core::CSliceUnref(write_buf_);
+ write_buf_ = grpc_empty_slice();
+ }
+ ScheduleAndNullWriteClosure(error);
+ }
+
+ const std::string name_;
+ const int address_family_;
+ const int socket_type_;
+ grpc_core::Mutex* mu_;
+ std::unique_ptr winsocket_;
+ char recv_from_source_addr_[kRecvFromSourceAddrSize];
+ ares_socklen_t recv_from_source_addr_len_;
+ grpc_slice read_buf_;
+ bool read_buf_has_data_ = false;
+ grpc_slice write_buf_;
+ absl::AnyInvocable read_closure_;
+ absl::AnyInvocable write_closure_;
+ AnyInvocableClosure outer_read_closure_;
+ AnyInvocableClosure outer_write_closure_;
+ bool shutdown_called_ = false;
+ // State related to TCP sockets
+ AnyInvocableClosure on_tcp_connect_locked_;
+ bool connect_done_ = false;
+ int wsa_connect_error_ = 0;
+ WriteState tcp_write_state_ = WRITE_IDLE;
+ // We don't run register_for_{readable,writeable} logic until
+ // a socket is connected. In the interim, we queue readable/writeable
+ // registrations with the following state.
+ bool pending_continue_register_for_on_readable_locked_ = false;
+ bool pending_continue_register_for_on_writeable_locked_ = false;
+ // This pointer is initialized from the stored pointer inside the shared
+ // pointer owned by the AresResolver and should be valid at the time of use.
+ EventEngine* event_engine_;
+};
+
+// These virtual socket functions are called from within the c-ares
+// library. These methods generally dispatch those socket calls to the
+// appropriate methods. The virtual "socket" and "close" methods are
+// special and instead create/add and remove/destroy GrpcPolledFdWindows
+// objects.
+class CustomSockFuncs {
+ public:
+ static ares_socket_t Socket(int af, int type, int protocol, void* user_data) {
+ if (type != SOCK_DGRAM && type != SOCK_STREAM) {
+ GRPC_ARES_RESOLVER_TRACE_LOG("Socket called with invalid socket type:%d",
+ type);
+ return INVALID_SOCKET;
+ }
+ GrpcPolledFdFactoryWindows* self =
+ static_cast(user_data);
+ SOCKET s = WSASocket(af, type, protocol, nullptr, 0,
+ IOCP::GetDefaultSocketFlags());
+ if (s == INVALID_SOCKET) {
+ GRPC_ARES_RESOLVER_TRACE_LOG(
+ "WSASocket failed with params af:%d type:%d protocol:%d", af, type,
+ protocol);
+ return INVALID_SOCKET;
+ }
+ if (type == SOCK_STREAM) {
+ absl::Status error = PrepareSocket(s);
+ if (!error.ok()) {
+ GRPC_ARES_RESOLVER_TRACE_LOG("WSAIoctl failed with error: %s",
+ grpc_core::StatusToString(error).c_str());
+ return INVALID_SOCKET;
+ }
+ }
+ auto polled_fd = std::make_unique(
+ self->iocp_->Watch(s), self->mu_, af, type, self->event_engine_);
+ GRPC_ARES_RESOLVER_TRACE_LOG(
+ "fd:|%s| created with params af:%d type:%d protocol:%d",
+ polled_fd->GetName(), af, type, protocol);
+ GPR_ASSERT(self->sockets_.insert({s, std::move(polled_fd)}).second);
+ return s;
+ }
+
+ static int Connect(ares_socket_t as, const struct sockaddr* target,
+ ares_socklen_t target_len, void* user_data) {
+ WSAErrorContext wsa_error_ctx;
+ GrpcPolledFdFactoryWindows* self =
+ static_cast(user_data);
+ auto it = self->sockets_.find(as);
+ GPR_ASSERT(it != self->sockets_.end());
+ return it->second->Connect(&wsa_error_ctx, target, target_len);
+ }
+
+ static ares_ssize_t SendV(ares_socket_t as, const struct iovec* iov,
+ int iovec_count, void* user_data) {
+ WSAErrorContext wsa_error_ctx;
+ GrpcPolledFdFactoryWindows* self =
+ static_cast(user_data);
+ auto it = self->sockets_.find(as);
+ GPR_ASSERT(it != self->sockets_.end());
+ return it->second->SendV(&wsa_error_ctx, iov, iovec_count);
+ }
+
+ static ares_ssize_t RecvFrom(ares_socket_t as, void* data, size_t data_len,
+ int flags, struct sockaddr* from,
+ ares_socklen_t* from_len, void* user_data) {
+ WSAErrorContext wsa_error_ctx;
+ GrpcPolledFdFactoryWindows* self =
+ static_cast(user_data);
+ auto it = self->sockets_.find(as);
+ GPR_ASSERT(it != self->sockets_.end());
+ return it->second->RecvFrom(&wsa_error_ctx, data, data_len, flags, from,
+ from_len);
+ }
+
+ static int CloseSocket(SOCKET s, void*) {
+ GRPC_ARES_RESOLVER_TRACE_LOG("c-ares socket: %d CloseSocket", s);
+ return 0;
+ }
+};
+
+// Adapter to hold the ownership of GrpcPolledFdWindows internally.
+class GrpcPolledFdWrapper : public GrpcPolledFd {
+ public:
+ explicit GrpcPolledFdWrapper(GrpcPolledFdWindows* polled_fd)
+ : polled_fd_(polled_fd) {}
+
+ void RegisterForOnReadableLocked(
+ absl::AnyInvocable read_closure) override {
+ polled_fd_->RegisterForOnReadableLocked(std::move(read_closure));
+ }
+
+ void RegisterForOnWriteableLocked(
+ absl::AnyInvocable write_closure) override {
+ polled_fd_->RegisterForOnWriteableLocked(std::move(write_closure));
+ }
+
+ bool IsFdStillReadableLocked() override {
+ return polled_fd_->IsFdStillReadableLocked();
+ }
+
+ bool ShutdownLocked(absl::Status error) override {
+ return polled_fd_->ShutdownLocked(error);
+ }
+
+ ares_socket_t GetWrappedAresSocketLocked() override {
+ return polled_fd_->GetWrappedAresSocketLocked();
+ }
+
+ const char* GetName() const override { return polled_fd_->GetName(); }
+
+ private:
+ GrpcPolledFdWindows* polled_fd_;
+};
+
+GrpcPolledFdFactoryWindows::GrpcPolledFdFactoryWindows(IOCP* iocp)
+ : iocp_(iocp) {}
+
+GrpcPolledFdFactoryWindows::~GrpcPolledFdFactoryWindows() {}
+
+void GrpcPolledFdFactoryWindows::Initialize(grpc_core::Mutex* mutex,
+ EventEngine* event_engine) {
+ mu_ = mutex;
+ event_engine_ = event_engine;
+}
+
+std::unique_ptr GrpcPolledFdFactoryWindows::NewGrpcPolledFdLocked(
+ ares_socket_t as) {
+ auto it = sockets_.find(as);
+ GPR_ASSERT(it != sockets_.end());
+ return std::make_unique(it->second.get());
+}
+
+void GrpcPolledFdFactoryWindows::ConfigureAresChannelLocked(
+ ares_channel channel) {
+ static const struct ares_socket_functions kCustomSockFuncs = {
+ /*asocket=*/&CustomSockFuncs::Socket,
+ /*aclose=*/&CustomSockFuncs::CloseSocket,
+ /*aconnect=*/&CustomSockFuncs::Connect,
+ /*arecvfrom=*/&CustomSockFuncs::RecvFrom,
+ /*asendv=*/&CustomSockFuncs::SendV,
+ };
+ ares_set_socket_functions(channel, &kCustomSockFuncs, this);
+}
+
+} // namespace experimental
+} // namespace grpc_event_engine
+
+#endif // GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
diff --git a/src/core/lib/event_engine/windows/grpc_polled_fd_windows.h b/src/core/lib/event_engine/windows/grpc_polled_fd_windows.h
new file mode 100644
index 00000000000..eda3330cb91
--- /dev/null
+++ b/src/core/lib/event_engine/windows/grpc_polled_fd_windows.h
@@ -0,0 +1,75 @@
+// Copyright 2023 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_GRPC_POLLED_FD_WINDOWS_H
+#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_GRPC_POLLED_FD_WINDOWS_H
+
+#include
+
+#include "src/core/lib/iomgr/port.h" // IWYU pragma: keep
+
+#if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
+
+#include
+
+#include
+
+#include "absl/functional/any_invocable.h"
+#include "absl/status/status.h"
+
+#include
+
+#include "src/core/lib/event_engine/common_closures.h"
+#include "src/core/lib/event_engine/grpc_polled_fd.h"
+#include "src/core/lib/event_engine/windows/iocp.h"
+#include "src/core/lib/event_engine/windows/win_socket.h"
+#include "src/core/lib/gprpp/sync.h"
+
+struct iovec;
+
+namespace grpc_event_engine {
+namespace experimental {
+
+class GrpcPolledFdWindows;
+
+class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory {
+ public:
+ explicit GrpcPolledFdFactoryWindows(IOCP* iocp);
+ ~GrpcPolledFdFactoryWindows() override;
+
+ void Initialize(grpc_core::Mutex* mutex, EventEngine* event_engine) override;
+ std::unique_ptr NewGrpcPolledFdLocked(
+ ares_socket_t as) override;
+ void ConfigureAresChannelLocked(ares_channel channel) override;
+
+ private:
+ friend class CustomSockFuncs;
+
+ // The mutex is owned by the AresResolver which owns this object.
+ grpc_core::Mutex* mu_;
+ // The IOCP object is owned by the WindowsEngine whose ownership is shared by
+ // the AresResolver.
+ IOCP* iocp_;
+ // This pointer is initialized from the stored pointer inside the shared
+ // pointer owned by the AresResolver which owns this object.
+ EventEngine* event_engine_;
+ std::map> sockets_;
+};
+
+} // namespace experimental
+} // namespace grpc_event_engine
+
+#endif // GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
+
+#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_GRPC_POLLED_FD_WINDOWS_H
diff --git a/src/core/lib/event_engine/windows/windows_engine.cc b/src/core/lib/event_engine/windows/windows_engine.cc
index d4336344d8b..24b5c8898e6 100644
--- a/src/core/lib/event_engine/windows/windows_engine.cc
+++ b/src/core/lib/event_engine/windows/windows_engine.cc
@@ -35,6 +35,7 @@
#include "src/core/lib/event_engine/thread_pool/thread_pool.h"
#include "src/core/lib/event_engine/trace.h"
#include "src/core/lib/event_engine/utils.h"
+#include "src/core/lib/event_engine/windows/grpc_polled_fd_windows.h"
#include "src/core/lib/event_engine/windows/iocp.h"
#include "src/core/lib/event_engine/windows/windows_endpoint.h"
#include "src/core/lib/event_engine/windows/windows_engine.h"
@@ -194,10 +195,49 @@ EventEngine::TaskHandle WindowsEventEngine::RunAfterInternal(
return handle;
}
+#if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
+
+WindowsEventEngine::WindowsDNSResolver::WindowsDNSResolver(
+ grpc_core::OrphanablePtr ares_resolver)
+ : ares_resolver_(std::move(ares_resolver)) {}
+
+void WindowsEventEngine::WindowsDNSResolver::LookupHostname(
+ LookupHostnameCallback on_resolve, absl::string_view name,
+ absl::string_view default_port) {
+ ares_resolver_->LookupHostname(name, default_port, std::move(on_resolve));
+}
+
+void WindowsEventEngine::WindowsDNSResolver::LookupSRV(
+ LookupSRVCallback on_resolve, absl::string_view name) {
+ ares_resolver_->LookupSRV(name, std::move(on_resolve));
+}
+
+void WindowsEventEngine::WindowsDNSResolver::LookupTXT(
+ LookupTXTCallback on_resolve, absl::string_view name) {
+ ares_resolver_->LookupTXT(name, std::move(on_resolve));
+}
+
+#endif // GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
+
absl::StatusOr>
WindowsEventEngine::GetDNSResolver(
- EventEngine::DNSResolver::ResolverOptions const& /*options*/) {
+ EventEngine::DNSResolver::ResolverOptions const& options) {
+#if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
+ auto ares_resolver = AresResolver::CreateAresResolver(
+ options.dns_server,
+ std::make_unique(poller()),
+ shared_from_this());
+ if (!ares_resolver.ok()) {
+ return ares_resolver.status();
+ }
+ return std::make_unique(
+ std::move(*ares_resolver));
+#else // GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
+ // TODO(yijiem): Implement a basic A/AAAA-only native resolver in
+ // WindowsEventEngine.
+ (void)options;
grpc_core::Crash("unimplemented");
+#endif // GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
}
bool WindowsEventEngine::IsWorkerThread() { grpc_core::Crash("unimplemented"); }
diff --git a/src/core/lib/event_engine/windows/windows_engine.h b/src/core/lib/event_engine/windows/windows_engine.h
index f9211e46177..150e7facf3f 100644
--- a/src/core/lib/event_engine/windows/windows_engine.h
+++ b/src/core/lib/event_engine/windows/windows_engine.h
@@ -13,8 +13,11 @@
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENGINE_H
#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENGINE_H
+
#include
+#include "src/core/lib/iomgr/port.h" // IWYU pragma: keep
+
#ifdef GPR_WINDOWS
#include
@@ -28,6 +31,7 @@
#include
#include
+#include "src/core/lib/event_engine/ares_resolver.h"
#include "src/core/lib/event_engine/handle_containers.h"
#include "src/core/lib/event_engine/posix_engine/timer_manager.h"
#include "src/core/lib/event_engine/thread_pool/thread_pool.h"
@@ -47,7 +51,11 @@ class WindowsEventEngine : public EventEngine,
public:
class WindowsDNSResolver : public EventEngine::DNSResolver {
public:
- ~WindowsDNSResolver() override;
+ WindowsDNSResolver() = delete;
+#if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
+ explicit WindowsDNSResolver(
+ grpc_core::OrphanablePtr ares_resolver);
+#endif // GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
void LookupHostname(LookupHostnameCallback on_resolve,
absl::string_view name,
absl::string_view default_port) override;
@@ -55,6 +63,11 @@ class WindowsEventEngine : public EventEngine,
absl::string_view name) override;
void LookupTXT(LookupTXTCallback on_resolve,
absl::string_view name) override;
+
+#if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
+ private:
+ grpc_core::OrphanablePtr ares_resolver_;
+#endif // GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
};
WindowsEventEngine();
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 14ad4b18b2d..f63ba634c18 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -539,6 +539,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/event_engine/time_util.cc',
'src/core/lib/event_engine/trace.cc',
'src/core/lib/event_engine/utils.cc',
+ 'src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc',
'src/core/lib/event_engine/windows/iocp.cc',
'src/core/lib/event_engine/windows/win_socket.cc',
'src/core/lib/event_engine/windows/windows_endpoint.cc',
diff --git a/test/core/event_engine/test_suite/BUILD b/test/core/event_engine/test_suite/BUILD
index 385aa7bb6a2..63c37eb70ac 100644
--- a/test/core/event_engine/test_suite/BUILD
+++ b/test/core/event_engine/test_suite/BUILD
@@ -88,6 +88,7 @@ grpc_cc_test(
"//src/core:windows_event_engine",
"//test/core/event_engine:event_engine_test_utils",
"//test/core/event_engine/test_suite/tests:client",
+ "//test/core/event_engine/test_suite/tests:dns",
"//test/core/event_engine/test_suite/tests:server",
"//test/core/event_engine/test_suite/tests:timer",
],
diff --git a/test/core/event_engine/test_suite/tests/BUILD b/test/core/event_engine/test_suite/tests/BUILD
index 4c187e2c5d4..87ebaab6dc2 100644
--- a/test/core/event_engine/test_suite/tests/BUILD
+++ b/test/core/event_engine/test_suite/tests/BUILD
@@ -76,6 +76,7 @@ grpc_cc_library(
"//test/core/util:grpc_test_util_base",
"//test/cpp/util:get_grpc_test_runfile_dir",
"//test/cpp/util:test_util",
+ "//test/cpp/util/windows:manifest_file",
],
alwayslink = 1,
)
diff --git a/test/core/event_engine/test_suite/tests/dns_test.cc b/test/core/event_engine/test_suite/tests/dns_test.cc
index 4c09f49846e..4e585b59a7d 100644
--- a/test/core/event_engine/test_suite/tests/dns_test.cc
+++ b/test/core/event_engine/test_suite/tests/dns_test.cc
@@ -15,6 +15,8 @@
// IWYU pragma: no_include
// IWYU pragma: no_include
+#include
+
#include
#include
#include
@@ -34,6 +36,7 @@
#include
#include "src/core/lib/event_engine/tcp_socket_utils.h"
+#include "src/core/lib/gprpp/crash.h" // IWYU pragma: keep
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "test/core/event_engine/test_suite/event_engine_test_framework.h"
@@ -42,6 +45,10 @@
#include "test/cpp/util/get_grpc_test_runfile_dir.h"
#include "test/cpp/util/subprocess.h"
+#ifdef GPR_WINDOWS
+#include "test/cpp/util/windows/manifest_file.h"
+#endif // GPR_WINDOWS
+
namespace grpc_event_engine {
namespace experimental {
@@ -50,13 +57,6 @@ void InitDNSTests() {}
} // namespace experimental
} // namespace grpc_event_engine
-#ifdef GPR_WINDOWS
-class EventEngineDNSTest : public EventEngineTest {};
-
-// TODO(yijiem): make the test run on Windows
-TEST_F(EventEngineDNSTest, TODO) {}
-#else
-
namespace {
using grpc_event_engine::experimental::EventEngine;
@@ -109,13 +109,36 @@ class EventEngineDNSTest : public EventEngineTest {
std::string health_check_path = kHealthCheckRelPath;
absl::optional runfile_dir = grpc::GetGrpcTestRunFileDir();
if (runfile_dir.has_value()) {
- // We sure need a portable filesystem lib for this to work on Windows.
test_records_path = absl::StrJoin({*runfile_dir, test_records_path}, "/");
dns_server_path = absl::StrJoin({*runfile_dir, dns_server_path}, "/");
dns_resolver_path = absl::StrJoin({*runfile_dir, dns_resolver_path}, "/");
tcp_connect_path = absl::StrJoin({*runfile_dir, tcp_connect_path}, "/");
health_check_path = absl::StrJoin({*runfile_dir, health_check_path}, "/");
+#ifdef GPR_WINDOWS
+// TODO(yijiem): Misusing the GRPC_PORT_ISOLATED_RUNTIME preprocessor symbol as
+// an indication whether the test is running on RBE or not. Find a better way of
+// doing this.
+#ifndef GRPC_PORT_ISOLATED_RUNTIME
+ gpr_log(GPR_ERROR,
+ "You are invoking the test locally with Bazel, you may need to "
+ "invoke Bazel with --enable_runfiles=yes.");
+#endif // GRPC_PORT_ISOLATED_RUNTIME
+ test_records_path = grpc::testing::NormalizeFilePath(test_records_path);
+ dns_server_path =
+ grpc::testing::NormalizeFilePath(dns_server_path + ".exe");
+ dns_resolver_path =
+ grpc::testing::NormalizeFilePath(dns_resolver_path + ".exe");
+ tcp_connect_path =
+ grpc::testing::NormalizeFilePath(tcp_connect_path + ".exe");
+ health_check_path =
+ grpc::testing::NormalizeFilePath(health_check_path + ".exe");
+#endif // GPR_WINDOWS
} else {
+#ifdef GPR_WINDOWS
+ grpc_core::Crash(
+ "The EventEngineDNSTest does not support running without Bazel on "
+ "Windows for now.");
+#endif // GPR_WINDOWS
// Invoke the .py scripts directly where they are in source code if we are
// not running with bazel.
dns_server_path += ".py";
@@ -141,8 +164,11 @@ class EventEngineDNSTest : public EventEngineTest {
tcp_connect_path,
});
int status = health_check.Join();
- // TODO(yijiem): make this portable for Windows
+#ifdef GPR_WINDOWS
+ ASSERT_EQ(status, 0);
+#else
ASSERT_TRUE(WIFEXITED(status) && WEXITSTATUS(status) == 0);
+#endif // GPR_WINDOWS
#endif // GRPC_IOS_EVENT_ENGINE_CLIENT
}
@@ -546,5 +572,3 @@ TEST_F(EventEngineDNSTest, UnparseableHostPortsBadLocalhostWithPort) {
&dns_resolver_signal_, "[localhost]:1");
}
// END
-
-#endif // GPR_WINDOWS
diff --git a/test/cpp/naming/utils/health_check.py b/test/cpp/naming/utils/health_check.py
index a7950c65f6b..e754571db30 100755
--- a/test/cpp/naming/utils/health_check.py
+++ b/test/cpp/naming/utils/health_check.py
@@ -25,7 +25,7 @@ def test_runner_log(msg):
def python_args(arg_list):
- if platform.system() == "Windows":
+ if platform.system() == "Windows" and arg_list[0].endswith(".py"):
return [sys.executable] + arg_list
return arg_list
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index 1cf301f970e..c24e3a24542 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -2169,6 +2169,8 @@ src/core/lib/event_engine/trace.cc \
src/core/lib/event_engine/trace.h \
src/core/lib/event_engine/utils.cc \
src/core/lib/event_engine/utils.h \
+src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc \
+src/core/lib/event_engine/windows/grpc_polled_fd_windows.h \
src/core/lib/event_engine/windows/iocp.cc \
src/core/lib/event_engine/windows/iocp.h \
src/core/lib/event_engine/windows/win_socket.cc \
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index c332115225f..92c23067bd2 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -1945,6 +1945,8 @@ src/core/lib/event_engine/trace.cc \
src/core/lib/event_engine/trace.h \
src/core/lib/event_engine/utils.cc \
src/core/lib/event_engine/utils.h \
+src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc \
+src/core/lib/event_engine/windows/grpc_polled_fd_windows.h \
src/core/lib/event_engine/windows/iocp.cc \
src/core/lib/event_engine/windows/iocp.h \
src/core/lib/event_engine/windows/win_socket.cc \