[EventEngine] WindowsDNSResolver Implementation (#34400)

Design is documented at
[go/windows-dns-resolver-issue](http://go/windows-dns-resolver-issue)
(note that the design doc is slightly outdated regarding the shared
ownership model of the virtual socket that was implemented in
13bd2b404e).

Passed `//test/cpp/naming:resolver_component_tests_runner_invoker` and
`//test/cpp/naming:cancel_ares_query_test`:
```
C:\Users\yijiem\projects\grpc>bazel --output_base=C:\bazel6 test --dynamic_mode=off --verbose_failures --test_env=GRPC_EXPERIMENTS=event_engine_dns --test_env=GRPC_VERBOSITY=debug --test_env=GRPC_TRACE=cares_resolver --enable_runfiles=yes --nocache_test_results //test/cpp/naming:resolver_component_tests_runner_invoker
INFO: Analyzed target //test/cpp/naming:resolver_component_tests_runner_invoker (1 packages loaded, 8 targets configured).
INFO: Found 1 test target...
INFO: From Compiling src/core/lib/event_engine/windows/windows_engine.cc:
C:\bazel6\execroot\com_github_grpc_grpc\src/core/lib/channel/channel_args.h(287): warning C4312: 'reinterpret_cast': conversion from 'int' to 'void *' of greater size
Target //test/cpp/naming:resolver_component_tests_runner_invoker up-to-date:
  bazel-bin/test/cpp/naming/resolver_component_tests_runner_invoker.exe
INFO: Elapsed time: 230.374s, Critical Path: 228.54s
INFO: 9 processes: 2 internal, 7 local.
INFO: Build completed successfully, 9 total actions
//test/cpp/naming:resolver_component_tests_runner_invoker                PASSED in 221.2s

Executed 1 out of 1 test: 1 test passes.
```

```
C:\Users\yijiem\projects\grpc>bazel --output_base=C:\bazel6 test --dynamic_mode=off --verbose_failures --test_env=GRPC_EXPERIMENTS=event_engine_dns --test_env=GRPC_VERBOSITY=debug --test_env=GRPC_TRACE=cares_resolver --enable_runfiles=yes --nocache_test_results //test/cpp/naming:cancel_ares_query_test
INFO: Analyzed target //test/cpp/naming:cancel_ares_query_test (0 packages loaded, 0 targets configured).
INFO: Found 1 test target...
Target //test/cpp/naming:cancel_ares_query_test up-to-date:
  bazel-bin/test/cpp/naming/cancel_ares_query_test.exe
INFO: Elapsed time: 49.656s, Critical Path: 48.00s
INFO: 6 processes: 2 internal, 4 local.
INFO: Build completed successfully, 6 total actions
//test/cpp/naming:cancel_ares_query_test                                 PASSED in 43.0s

Executed 1 out of 1 test: 1 test passes.
```

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
veblush-patch-3
Yijie Ma 1 year ago committed by GitHub
parent b0e0659bab
commit 720d7a0653
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      CMakeLists.txt
  2. 2
      Makefile
  3. 2
      Package.swift
  4. 10
      build_autogenerated.yaml
  5. 1
      config.m4
  6. 1
      config.w32
  7. 2
      gRPC-C++.podspec
  8. 3
      gRPC-Core.podspec
  9. 2
      grpc.gemspec
  10. 3
      grpc.gyp
  11. 2
      package.xml
  12. 7
      src/core/BUILD
  13. 38
      src/core/lib/event_engine/ares_resolver.cc
  14. 6
      src/core/lib/event_engine/ares_resolver.h
  15. 27
      src/core/lib/event_engine/grpc_polled_fd.h
  16. 16
      src/core/lib/event_engine/posix_engine/grpc_polled_fd_posix.h
  17. 823
      src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc
  18. 75
      src/core/lib/event_engine/windows/grpc_polled_fd_windows.h
  19. 42
      src/core/lib/event_engine/windows/windows_engine.cc
  20. 15
      src/core/lib/event_engine/windows/windows_engine.h
  21. 1
      src/python/grpcio/grpc_core_dependencies.py
  22. 1
      test/core/event_engine/test_suite/BUILD
  23. 1
      test/core/event_engine/test_suite/tests/BUILD
  24. 46
      test/core/event_engine/test_suite/tests/dns_test.cc
  25. 2
      test/cpp/naming/utils/health_check.py
  26. 2
      tools/doxygen/Doxyfile.c++.internal
  27. 2
      tools/doxygen/Doxyfile.core.internal

5
CMakeLists.txt generated

@ -2266,6 +2266,7 @@ add_library(grpc
src/core/lib/event_engine/time_util.cc
src/core/lib/event_engine/trace.cc
src/core/lib/event_engine/utils.cc
src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc
src/core/lib/event_engine/windows/iocp.cc
src/core/lib/event_engine/windows/win_socket.cc
src/core/lib/event_engine/windows/windows_endpoint.cc
@ -2991,6 +2992,7 @@ add_library(grpc_unsecure
src/core/lib/event_engine/time_util.cc
src/core/lib/event_engine/trace.cc
src/core/lib/event_engine/utils.cc
src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc
src/core/lib/event_engine/windows/iocp.cc
src/core/lib/event_engine/windows/win_socket.cc
src/core/lib/event_engine/windows/windows_endpoint.cc
@ -4949,6 +4951,7 @@ add_library(grpc_authorization_provider
src/core/lib/event_engine/time_util.cc
src/core/lib/event_engine/trace.cc
src/core/lib/event_engine/utils.cc
src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc
src/core/lib/event_engine/windows/iocp.cc
src/core/lib/event_engine/windows/win_socket.cc
src/core/lib/event_engine/windows/windows_endpoint.cc
@ -18101,6 +18104,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
test/core/event_engine/test_suite/tests/timer_test.cc
test/core/util/fake_udp_and_tcp_server.cc
test/cpp/util/get_grpc_test_runfile_dir.cc
test/cpp/util/windows/manifest_file.cc
)
target_compile_features(posix_event_engine_test PUBLIC cxx_std_14)
target_include_directories(posix_event_engine_test
@ -24368,6 +24372,7 @@ add_executable(test_core_transport_chaotic_good_frame_test
src/core/lib/event_engine/time_util.cc
src/core/lib/event_engine/trace.cc
src/core/lib/event_engine/utils.cc
src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc
src/core/lib/event_engine/windows/iocp.cc
src/core/lib/event_engine/windows/win_socket.cc
src/core/lib/event_engine/windows/windows_endpoint.cc

2
Makefile generated

@ -1474,6 +1474,7 @@ LIBGRPC_SRC = \
src/core/lib/event_engine/time_util.cc \
src/core/lib/event_engine/trace.cc \
src/core/lib/event_engine/utils.cc \
src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc \
src/core/lib/event_engine/windows/iocp.cc \
src/core/lib/event_engine/windows/win_socket.cc \
src/core/lib/event_engine/windows/windows_endpoint.cc \
@ -2051,6 +2052,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/event_engine/time_util.cc \
src/core/lib/event_engine/trace.cc \
src/core/lib/event_engine/utils.cc \
src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc \
src/core/lib/event_engine/windows/iocp.cc \
src/core/lib/event_engine/windows/win_socket.cc \
src/core/lib/event_engine/windows/windows_endpoint.cc \

2
Package.swift generated

@ -1164,6 +1164,8 @@ let package = Package(
"src/core/lib/event_engine/trace.h",
"src/core/lib/event_engine/utils.cc",
"src/core/lib/event_engine/utils.h",
"src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc",
"src/core/lib/event_engine/windows/grpc_polled_fd_windows.h",
"src/core/lib/event_engine/windows/iocp.cc",
"src/core/lib/event_engine/windows/iocp.h",
"src/core/lib/event_engine/windows/win_socket.cc",

@ -738,6 +738,7 @@ libs:
- src/core/lib/event_engine/time_util.h
- src/core/lib/event_engine/trace.h
- src/core/lib/event_engine/utils.h
- src/core/lib/event_engine/windows/grpc_polled_fd_windows.h
- src/core/lib/event_engine/windows/iocp.h
- src/core/lib/event_engine/windows/win_socket.h
- src/core/lib/event_engine/windows/windows_endpoint.h
@ -1560,6 +1561,7 @@ libs:
- src/core/lib/event_engine/time_util.cc
- src/core/lib/event_engine/trace.cc
- src/core/lib/event_engine/utils.cc
- src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc
- src/core/lib/event_engine/windows/iocp.cc
- src/core/lib/event_engine/windows/win_socket.cc
- src/core/lib/event_engine/windows/windows_endpoint.cc
@ -2170,6 +2172,7 @@ libs:
- src/core/lib/event_engine/time_util.h
- src/core/lib/event_engine/trace.h
- src/core/lib/event_engine/utils.h
- src/core/lib/event_engine/windows/grpc_polled_fd_windows.h
- src/core/lib/event_engine/windows/iocp.h
- src/core/lib/event_engine/windows/win_socket.h
- src/core/lib/event_engine/windows/windows_endpoint.h
@ -2605,6 +2608,7 @@ libs:
- src/core/lib/event_engine/time_util.cc
- src/core/lib/event_engine/trace.cc
- src/core/lib/event_engine/utils.cc
- src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc
- src/core/lib/event_engine/windows/iocp.cc
- src/core/lib/event_engine/windows/win_socket.cc
- src/core/lib/event_engine/windows/windows_endpoint.cc
@ -4200,6 +4204,7 @@ libs:
- src/core/lib/event_engine/time_util.h
- src/core/lib/event_engine/trace.h
- src/core/lib/event_engine/utils.h
- src/core/lib/event_engine/windows/grpc_polled_fd_windows.h
- src/core/lib/event_engine/windows/iocp.h
- src/core/lib/event_engine/windows/win_socket.h
- src/core/lib/event_engine/windows/windows_endpoint.h
@ -4511,6 +4516,7 @@ libs:
- src/core/lib/event_engine/time_util.cc
- src/core/lib/event_engine/trace.cc
- src/core/lib/event_engine/utils.cc
- src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc
- src/core/lib/event_engine/windows/iocp.cc
- src/core/lib/event_engine/windows/win_socket.cc
- src/core/lib/event_engine/windows/windows_endpoint.cc
@ -12135,6 +12141,7 @@ targets:
- test/core/event_engine/test_suite/tests/timer_test.h
- test/core/util/fake_udp_and_tcp_server.h
- test/cpp/util/get_grpc_test_runfile_dir.h
- test/cpp/util/windows/manifest_file.h
src:
- test/core/event_engine/event_engine_test_utils.cc
- test/core/event_engine/test_suite/event_engine_test_framework.cc
@ -12146,6 +12153,7 @@ targets:
- test/core/event_engine/test_suite/tests/timer_test.cc
- test/core/util/fake_udp_and_tcp_server.cc
- test/cpp/util/get_grpc_test_runfile_dir.cc
- test/cpp/util/windows/manifest_file.cc
deps:
- gtest
- grpc++_test_util
@ -15936,6 +15944,7 @@ targets:
- src/core/lib/event_engine/time_util.h
- src/core/lib/event_engine/trace.h
- src/core/lib/event_engine/utils.h
- src/core/lib/event_engine/windows/grpc_polled_fd_windows.h
- src/core/lib/event_engine/windows/iocp.h
- src/core/lib/event_engine/windows/win_socket.h
- src/core/lib/event_engine/windows/windows_endpoint.h
@ -16228,6 +16237,7 @@ targets:
- src/core/lib/event_engine/time_util.cc
- src/core/lib/event_engine/trace.cc
- src/core/lib/event_engine/utils.cc
- src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc
- src/core/lib/event_engine/windows/iocp.cc
- src/core/lib/event_engine/windows/win_socket.cc
- src/core/lib/event_engine/windows/windows_endpoint.cc

1
config.m4 generated

@ -565,6 +565,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/event_engine/time_util.cc \
src/core/lib/event_engine/trace.cc \
src/core/lib/event_engine/utils.cc \
src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc \
src/core/lib/event_engine/windows/iocp.cc \
src/core/lib/event_engine/windows/win_socket.cc \
src/core/lib/event_engine/windows/windows_endpoint.cc \

1
config.w32 generated

@ -530,6 +530,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\event_engine\\time_util.cc " +
"src\\core\\lib\\event_engine\\trace.cc " +
"src\\core\\lib\\event_engine\\utils.cc " +
"src\\core\\lib\\event_engine\\windows\\grpc_polled_fd_windows.cc " +
"src\\core\\lib\\event_engine\\windows\\iocp.cc " +
"src\\core\\lib\\event_engine\\windows\\win_socket.cc " +
"src\\core\\lib\\event_engine\\windows\\windows_endpoint.cc " +

2
gRPC-C++.podspec generated

@ -813,6 +813,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/time_util.h',
'src/core/lib/event_engine/trace.h',
'src/core/lib/event_engine/utils.h',
'src/core/lib/event_engine/windows/grpc_polled_fd_windows.h',
'src/core/lib/event_engine/windows/iocp.h',
'src/core/lib/event_engine/windows/win_socket.h',
'src/core/lib/event_engine/windows/windows_endpoint.h',
@ -1890,6 +1891,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/time_util.h',
'src/core/lib/event_engine/trace.h',
'src/core/lib/event_engine/utils.h',
'src/core/lib/event_engine/windows/grpc_polled_fd_windows.h',
'src/core/lib/event_engine/windows/iocp.h',
'src/core/lib/event_engine/windows/win_socket.h',
'src/core/lib/event_engine/windows/windows_endpoint.h',

3
gRPC-Core.podspec generated

@ -1267,6 +1267,8 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/trace.h',
'src/core/lib/event_engine/utils.cc',
'src/core/lib/event_engine/utils.h',
'src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc',
'src/core/lib/event_engine/windows/grpc_polled_fd_windows.h',
'src/core/lib/event_engine/windows/iocp.cc',
'src/core/lib/event_engine/windows/iocp.h',
'src/core/lib/event_engine/windows/win_socket.cc',
@ -2650,6 +2652,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/time_util.h',
'src/core/lib/event_engine/trace.h',
'src/core/lib/event_engine/utils.h',
'src/core/lib/event_engine/windows/grpc_polled_fd_windows.h',
'src/core/lib/event_engine/windows/iocp.h',
'src/core/lib/event_engine/windows/win_socket.h',
'src/core/lib/event_engine/windows/windows_endpoint.h',

2
grpc.gemspec generated

@ -1170,6 +1170,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/event_engine/trace.h )
s.files += %w( src/core/lib/event_engine/utils.cc )
s.files += %w( src/core/lib/event_engine/utils.h )
s.files += %w( src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc )
s.files += %w( src/core/lib/event_engine/windows/grpc_polled_fd_windows.h )
s.files += %w( src/core/lib/event_engine/windows/iocp.cc )
s.files += %w( src/core/lib/event_engine/windows/iocp.h )
s.files += %w( src/core/lib/event_engine/windows/win_socket.cc )

3
grpc.gyp generated

@ -792,6 +792,7 @@
'src/core/lib/event_engine/time_util.cc',
'src/core/lib/event_engine/trace.cc',
'src/core/lib/event_engine/utils.cc',
'src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc',
'src/core/lib/event_engine/windows/iocp.cc',
'src/core/lib/event_engine/windows/win_socket.cc',
'src/core/lib/event_engine/windows/windows_endpoint.cc',
@ -1310,6 +1311,7 @@
'src/core/lib/event_engine/time_util.cc',
'src/core/lib/event_engine/trace.cc',
'src/core/lib/event_engine/utils.cc',
'src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc',
'src/core/lib/event_engine/windows/iocp.cc',
'src/core/lib/event_engine/windows/win_socket.cc',
'src/core/lib/event_engine/windows/windows_endpoint.cc',
@ -2059,6 +2061,7 @@
'src/core/lib/event_engine/time_util.cc',
'src/core/lib/event_engine/trace.cc',
'src/core/lib/event_engine/utils.cc',
'src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc',
'src/core/lib/event_engine/windows/iocp.cc',
'src/core/lib/event_engine/windows/win_socket.cc',
'src/core/lib/event_engine/windows/windows_endpoint.cc',

2
package.xml generated

@ -1152,6 +1152,8 @@
<file baseinstalldir="/" name="src/core/lib/event_engine/trace.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/utils.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/utils.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/windows/grpc_polled_fd_windows.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/windows/iocp.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/windows/iocp.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/windows/win_socket.cc" role="src" />

@ -2116,6 +2116,7 @@ grpc_cc_library(
"absl/strings",
],
deps = [
"ares_resolver",
"channel_args_endpoint_config",
"common_event_engine_closures",
"error",
@ -2125,6 +2126,7 @@ grpc_cc_library(
"event_engine_trace",
"event_engine_utils",
"init_internally",
"iomgr_port",
"posix_event_engine_timer_manager",
"time",
"windows_endpoint",
@ -2419,12 +2421,14 @@ grpc_cc_library(
name = "ares_resolver",
srcs = [
"lib/event_engine/ares_resolver.cc",
"lib/event_engine/windows/grpc_polled_fd_windows.cc",
],
hdrs = [
"lib/event_engine/ares_resolver.h",
"lib/event_engine/grpc_polled_fd.h",
"lib/event_engine/nameser.h",
"lib/event_engine/posix_engine/grpc_polled_fd_posix.h",
"lib/event_engine/windows/grpc_polled_fd_windows.h",
],
external_deps = [
"absl/base:core_headers",
@ -2440,6 +2444,7 @@ grpc_cc_library(
"cares",
],
deps = [
"common_event_engine_closures",
"error",
"event_engine_time_util",
"grpc_sockaddr",
@ -2448,6 +2453,8 @@ grpc_cc_library(
"posix_event_engine_event_poller",
"posix_event_engine_tcp_socket_utils",
"resolved_address",
"slice",
"windows_iocp",
"//:debug_location",
"//:event_engine_base_hdrs",
"//:gpr",

@ -62,10 +62,12 @@
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/grpc_polled_fd.h"
#include "src/core/lib/event_engine/time_util.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
@ -188,6 +190,18 @@ AresResolver::CreateAresResolver(
std::move(polled_fd_factory), std::move(event_engine), channel);
}
AresResolver::AresResolver(
std::unique_ptr<GrpcPolledFdFactory> polled_fd_factory,
std::shared_ptr<EventEngine> event_engine, ares_channel channel)
: grpc_core::InternallyRefCounted<AresResolver>(
GRPC_TRACE_FLAG_ENABLED(grpc_trace_ares_resolver) ? "AresResolver"
: nullptr),
channel_(channel),
polled_fd_factory_(std::move(polled_fd_factory)),
event_engine_(std::move(event_engine)) {
polled_fd_factory_->Initialize(&mutex_, event_engine_.get());
}
AresResolver::~AresResolver() {
GPR_ASSERT(fd_node_list_.empty());
GPR_ASSERT(callback_map_.empty());
@ -206,8 +220,8 @@ void AresResolver::Orphan() {
if (!fd_node->already_shutdown) {
GRPC_ARES_RESOLVER_TRACE_LOG("resolver: %p shutdown fd: %s", this,
fd_node->polled_fd->GetName());
fd_node->polled_fd->ShutdownLocked(
absl::CancelledError("AresResolver::Orphan"));
GPR_ASSERT(fd_node->polled_fd->ShutdownLocked(
absl::CancelledError("AresResolver::Orphan")));
fd_node->already_shutdown = true;
}
}
@ -339,20 +353,10 @@ void AresResolver::LookupTXT(
MaybeStartTimerLocked();
}
AresResolver::AresResolver(
std::unique_ptr<GrpcPolledFdFactory> polled_fd_factory,
std::shared_ptr<EventEngine> event_engine, ares_channel channel)
: grpc_core::InternallyRefCounted<AresResolver>(
GRPC_TRACE_FLAG_ENABLED(grpc_trace_ares_resolver) ? "AresResolver"
: nullptr),
channel_(channel),
polled_fd_factory_(std::move(polled_fd_factory)),
event_engine_(std::move(event_engine)) {}
void AresResolver::CheckSocketsLocked() {
FdNodeList new_list;
if (!shutting_down_) {
ares_socket_t socks[ARES_GETSOCK_MAXNUM];
ares_socket_t socks[ARES_GETSOCK_MAXNUM] = {};
int socks_bitmask = ares_getsock(channel_, socks, ARES_GETSOCK_MAXNUM);
for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) {
if (ARES_GETSOCK_READABLE(socks_bitmask, i) ||
@ -415,13 +419,17 @@ void AresResolver::CheckSocketsLocked() {
// Any remaining fds in fd_node_list_ were not returned by ares_getsock()
// and are therefore no longer in use, so they can be shut down and removed
// from the list.
// TODO(yijiem): Since we are keeping the underlying socket opened for both
// Posix and Windows, it might be reasonable to also keep the FdNodes alive
// till the end. But we need to change the state management of FdNodes in this
// file. This may simplify the code a bit.
while (!fd_node_list_.empty()) {
FdNode* fd_node = fd_node_list_.front().get();
if (!fd_node->already_shutdown) {
GRPC_ARES_RESOLVER_TRACE_LOG("resolver: %p shutdown fd: %s", this,
fd_node->polled_fd->GetName());
fd_node->polled_fd->ShutdownLocked(absl::OkStatus());
fd_node->already_shutdown = true;
fd_node->already_shutdown =
fd_node->polled_fd->ShutdownLocked(absl::OkStatus());
}
if (!fd_node->readable_registered && !fd_node->writable_registered) {
GRPC_ARES_RESOLVER_TRACE_LOG("resolver: %p delete fd: %s", this,

@ -16,6 +16,8 @@
#include <grpc/support/port_platform.h>
#include <utility>
#include "src/core/lib/debug/trace.h"
#if GRPC_ARES == 1
@ -87,8 +89,8 @@ class AresResolver : public grpc_core::InternallyRefCounted<AresResolver> {
// close the socket (possibly through ares_destroy).
struct FdNode {
FdNode() = default;
FdNode(ares_socket_t as, GrpcPolledFd* polled_fd)
: as(as), polled_fd(polled_fd) {}
FdNode(ares_socket_t as, std::unique_ptr<GrpcPolledFd> pf)
: as(as), polled_fd(std::move(pf)) {}
ares_socket_t as;
std::unique_ptr<GrpcPolledFd> polled_fd;
// true if the readable closure has been registered

@ -17,6 +17,10 @@
#include <grpc/support/port_platform.h>
#include <memory>
#include <grpc/event_engine/event_engine.h>
#if GRPC_ARES == 1
#include <ares.h>
@ -24,7 +28,7 @@
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/gprpp/sync.h"
namespace grpc_event_engine {
namespace experimental {
@ -46,8 +50,17 @@ class GrpcPolledFd {
// Indicates if there is data left even after just being read from
virtual bool IsFdStillReadableLocked() = 0;
// Called once and only once. Must cause cancellation of any pending
// read/write callbacks.
virtual void ShutdownLocked(grpc_error_handle error) = 0;
// read/write callbacks. Return true when the Shutdown is confirmed, false
// otherwise.
//
// TODO(yijiem): On Posix, ShutdownLocked will always succeed. On Windows,
// ShutdownLocked only succeeds when error is Cancelled. We could remove these
// requirements if we changed the FdNode lifetime model so that:
// 1. FdNodes and their underlying socket handles remain alive for
// the lifetime of the resolver.
// 2. Orphaning the resolver triggers shutdown and subsequent cleanup for
// all FdNodes and socket handles.
GRPC_MUST_USE_RESULT virtual bool ShutdownLocked(absl::Status error) = 0;
// Get the underlying ares_socket_t that this was created from
virtual ares_socket_t GetWrappedAresSocketLocked() = 0;
// A unique name, for logging
@ -60,8 +73,14 @@ class GrpcPolledFd {
class GrpcPolledFdFactory {
public:
virtual ~GrpcPolledFdFactory() {}
// Optionally initializes the GrpcPolledFdFactory with a grpc_core::Mutex*
// for synchronization between the AresResolver and the GrpcPolledFds. The
// Windows implementation overrides this.
virtual void Initialize(grpc_core::Mutex* mutex,
EventEngine* event_engine) = 0;
// Creates a new wrapped fd for the current platform
virtual GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as) = 0;
virtual std::unique_ptr<GrpcPolledFd> NewGrpcPolledFdLocked(
ares_socket_t as) = 0;
// Optionally configures the ares channel after creation
virtual void ConfigureAresChannelLocked(ares_channel channel) = 0;
};

@ -17,6 +17,11 @@
#include <grpc/support/port_platform.h>
#include <memory>
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/port.h"
#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER)
@ -42,7 +47,6 @@
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/iomgr/error.h"
namespace grpc_event_engine {
namespace experimental {
@ -80,8 +84,9 @@ class GrpcPolledFdPosix : public GrpcPolledFd {
bytes_available > 0;
}
void ShutdownLocked(grpc_error_handle error) override {
bool ShutdownLocked(absl::Status error) override {
handle_->ShutdownHandle(error);
return true;
}
ares_socket_t GetWrappedAresSocketLocked() override { return as_; }
@ -105,9 +110,12 @@ class GrpcPolledFdFactoryPosix : public GrpcPolledFdFactory {
}
}
GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as) override {
void Initialize(grpc_core::Mutex*, EventEngine*) override {}
std::unique_ptr<GrpcPolledFd> NewGrpcPolledFdLocked(
ares_socket_t as) override {
owned_fds_.insert(as);
return new GrpcPolledFdPosix(
return std::make_unique<GrpcPolledFdPosix>(
as,
poller_->CreateHandle(as, "c-ares socket", poller_->CanTrackErrors()));
}

@ -0,0 +1,823 @@
// Copyright 2023 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/port.h" // IWYU pragma: keep
#if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
#include <winsock2.h>
#include <ares.h>
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include <grpc/support/log_windows.h>
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/event_engine/ares_resolver.h"
#include "src/core/lib/event_engine/grpc_polled_fd.h"
#include "src/core/lib/event_engine/windows/grpc_polled_fd_windows.h"
#include "src/core/lib/event_engine/windows/win_socket.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/slice/slice.h"
// TODO(apolcyn): remove this hack after fixing upstream.
// Our grpc/c-ares code on Windows uses the ares_set_socket_functions API,
// which uses "struct iovec" type, which on Windows is defined inside of
// a c-ares header that is not public.
// See https://github.com/c-ares/c-ares/issues/206.
struct iovec {
void* iov_base;
size_t iov_len;
};
namespace grpc_event_engine {
namespace experimental {
namespace {
constexpr int kRecvFromSourceAddrSize = 200;
constexpr int kReadBufferSize = 4192;
grpc_slice FlattenIovec(const struct iovec* iov, int iov_count) {
int total = 0;
for (int i = 0; i < iov_count; i++) {
total += iov[i].iov_len;
}
grpc_slice out = GRPC_SLICE_MALLOC(total);
size_t cur = 0;
for (int i = 0; i < iov_count; i++) {
for (size_t k = 0; k < iov[i].iov_len; k++) {
GRPC_SLICE_START_PTR(out)
[cur++] = (static_cast<char*>(iov[i].iov_base))[k];
}
}
return out;
}
} // namespace
// c-ares reads and takes action on the error codes of the
// "virtual socket operations" in this file, via the WSAGetLastError
// APIs. If code in this file wants to set a specific WSA error that
// c-ares should read, it must do so by calling SetWSAError() on the
// WSAErrorContext instance passed to it. A WSAErrorContext must only be
// instantiated at the top of the virtual socket function callstack.
class WSAErrorContext {
public:
explicit WSAErrorContext(){};
~WSAErrorContext() {
if (error_ != 0) {
WSASetLastError(error_);
}
}
// Disallow copy and assignment operators
WSAErrorContext(const WSAErrorContext&) = delete;
WSAErrorContext& operator=(const WSAErrorContext&) = delete;
void SetWSAError(int error) { error_ = error; }
private:
int error_ = 0;
};
// c-ares creates its own sockets and is meant to read them when readable and
// write them when writeable. To fit this socket usage model into the grpc
// windows poller (which gives notifications when attempted reads and writes
// are actually fulfilled rather than possible), this GrpcPolledFdWindows
// class takes advantage of the ares_set_socket_functions API and acts as a
// virtual socket. It holds its own read and write buffers which are written
// to and read from c-ares and are used with the grpc windows poller, and it,
// e.g., manufactures virtual socket error codes when it e.g. needs to tell
// the c-ares library to wait for an async read.
class GrpcPolledFdWindows : public GrpcPolledFd {
public:
GrpcPolledFdWindows(std::unique_ptr<WinSocket> winsocket,
grpc_core::Mutex* mu, int address_family, int socket_type,
EventEngine* event_engine)
: name_(absl::StrFormat("c-ares socket: %" PRIdPTR,
winsocket->raw_socket())),
address_family_(address_family),
socket_type_(socket_type),
mu_(mu),
winsocket_(std::move(winsocket)),
read_buf_(grpc_empty_slice()),
write_buf_(grpc_empty_slice()),
outer_read_closure_([this]() { OnIocpReadable(); }),
outer_write_closure_([this]() { OnIocpWriteable(); }),
on_tcp_connect_locked_([this]() { OnTcpConnect(); }),
event_engine_(event_engine) {}
~GrpcPolledFdWindows() override {
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| ~GrpcPolledFdWindows shutdown_called_: %d ", GetName(),
shutdown_called_);
grpc_core::CSliceUnref(read_buf_);
grpc_core::CSliceUnref(write_buf_);
GPR_ASSERT(read_closure_ == nullptr);
GPR_ASSERT(write_closure_ == nullptr);
if (!shutdown_called_) {
winsocket_->Shutdown(DEBUG_LOCATION, "~GrpcPolledFdWindows");
}
}
void RegisterForOnReadableLocked(
absl::AnyInvocable<void(absl::Status)> read_closure) override {
GPR_ASSERT(read_closure_ == nullptr);
read_closure_ = std::move(read_closure);
grpc_core::CSliceUnref(read_buf_);
GPR_ASSERT(!read_buf_has_data_);
read_buf_ = GRPC_SLICE_MALLOC(kReadBufferSize);
if (connect_done_) {
ContinueRegisterForOnReadableLocked();
} else {
GPR_ASSERT(pending_continue_register_for_on_readable_locked_ == false);
pending_continue_register_for_on_readable_locked_ = true;
}
}
void RegisterForOnWriteableLocked(
absl::AnyInvocable<void(absl::Status)> write_closure) override {
if (socket_type_ == SOCK_DGRAM) {
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| RegisterForOnWriteableLocked called", GetName());
} else {
GPR_ASSERT(socket_type_ == SOCK_STREAM);
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| RegisterForOnWriteableLocked called tcp_write_state_: %d "
"connect_done_: %d",
GetName(), tcp_write_state_, connect_done_);
}
GPR_ASSERT(write_closure_ == nullptr);
write_closure_ = std::move(write_closure);
if (!connect_done_) {
GPR_ASSERT(!pending_continue_register_for_on_writeable_locked_);
pending_continue_register_for_on_writeable_locked_ = true;
} else {
ContinueRegisterForOnWriteableLocked();
}
}
bool IsFdStillReadableLocked() override { return read_buf_has_data_; }
bool ShutdownLocked(absl::Status error) override {
GPR_ASSERT(!shutdown_called_);
if (!absl::IsCancelled(error)) {
return false;
}
GRPC_ARES_RESOLVER_TRACE_LOG("fd:|%s| ShutdownLocked", GetName());
shutdown_called_ = true;
// The socket is disconnected and closed here since this is an external
// cancel request, e.g. a timeout. c-ares shouldn't do anything on the
// socket after this point except calling close which should then destroy
// the GrpcPolledFdWindows object.
winsocket_->Shutdown(DEBUG_LOCATION, "GrpcPolledFdWindows::ShutdownLocked");
return true;
}
ares_socket_t GetWrappedAresSocketLocked() override {
return winsocket_->raw_socket();
}
const char* GetName() const override { return name_.c_str(); }
ares_ssize_t RecvFrom(WSAErrorContext* wsa_error_ctx, void* data,
ares_socket_t data_len, int /* flags */,
struct sockaddr* from, ares_socklen_t* from_len) {
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| RecvFrom called read_buf_has_data:%d Current read buf "
"length:|%d|",
GetName(), read_buf_has_data_, GRPC_SLICE_LENGTH(read_buf_));
if (!read_buf_has_data_) {
wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
return -1;
}
ares_ssize_t bytes_read = 0;
for (size_t i = 0; i < GRPC_SLICE_LENGTH(read_buf_) && i < data_len; i++) {
(static_cast<char*>(data))[i] = GRPC_SLICE_START_PTR(read_buf_)[i];
bytes_read++;
}
read_buf_ = grpc_slice_sub_no_ref(read_buf_, bytes_read,
GRPC_SLICE_LENGTH(read_buf_));
if (GRPC_SLICE_LENGTH(read_buf_) == 0) {
read_buf_has_data_ = false;
}
// c-ares overloads this recv_from virtual socket function to receive
// data on both UDP and TCP sockets, and from is nullptr for TCP.
if (from != nullptr) {
GPR_ASSERT(*from_len <= recv_from_source_addr_len_);
memcpy(from, &recv_from_source_addr_, recv_from_source_addr_len_);
*from_len = recv_from_source_addr_len_;
}
return bytes_read;
}
ares_ssize_t SendV(WSAErrorContext* wsa_error_ctx, const struct iovec* iov,
int iov_count) {
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| SendV called connect_done_:%d wsa_connect_error_:%d",
GetName(), connect_done_, wsa_connect_error_);
if (!connect_done_) {
wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
return -1;
}
if (wsa_connect_error_ != 0) {
wsa_error_ctx->SetWSAError(wsa_connect_error_);
return -1;
}
switch (socket_type_) {
case SOCK_DGRAM:
return SendVUDP(wsa_error_ctx, iov, iov_count);
case SOCK_STREAM:
return SendVTCP(wsa_error_ctx, iov, iov_count);
default:
abort();
}
}
int Connect(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
ares_socklen_t target_len) {
switch (socket_type_) {
case SOCK_DGRAM:
return ConnectUDP(wsa_error_ctx, target, target_len);
case SOCK_STREAM:
return ConnectTCP(wsa_error_ctx, target, target_len);
default:
grpc_core::Crash(
absl::StrFormat("Unknown socket_type_: %d", socket_type_));
}
}
private:
enum WriteState {
WRITE_IDLE,
WRITE_REQUESTED,
WRITE_PENDING,
WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY,
};
void ScheduleAndNullReadClosure(absl::Status error) {
event_engine_->Run([read_closure = std::move(read_closure_),
error]() mutable { read_closure(error); });
read_closure_ = nullptr;
}
void ScheduleAndNullWriteClosure(absl::Status error) {
event_engine_->Run([write_closure = std::move(write_closure_),
error]() mutable { write_closure(error); });
write_closure_ = nullptr;
}
void ContinueRegisterForOnReadableLocked() {
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| ContinueRegisterForOnReadableLocked "
"wsa_connect_error_:%d",
GetName(), wsa_connect_error_);
GPR_ASSERT(connect_done_);
if (wsa_connect_error_ != 0) {
ScheduleAndNullReadClosure(GRPC_WSA_ERROR(wsa_connect_error_, "connect"));
return;
}
WSABUF buffer;
buffer.buf = reinterpret_cast<char*>(GRPC_SLICE_START_PTR(read_buf_));
buffer.len = GRPC_SLICE_LENGTH(read_buf_);
recv_from_source_addr_len_ = sizeof(recv_from_source_addr_);
DWORD flags = 0;
winsocket_->NotifyOnRead(&outer_read_closure_);
if (WSARecvFrom(winsocket_->raw_socket(), &buffer, 1, nullptr, &flags,
reinterpret_cast<sockaddr*>(recv_from_source_addr_),
&recv_from_source_addr_len_,
winsocket_->read_info()->overlapped(), nullptr) != 0) {
int wsa_last_error = WSAGetLastError();
char* msg = gpr_format_message(wsa_last_error);
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| ContinueRegisterForOnReadableLocked WSARecvFrom error "
"code:|%d| "
"msg:|%s|",
GetName(), wsa_last_error, msg);
gpr_free(msg);
if (wsa_last_error != WSA_IO_PENDING) {
winsocket_->UnregisterReadCallback();
ScheduleAndNullReadClosure(
GRPC_WSA_ERROR(wsa_last_error, "WSARecvFrom"));
return;
}
}
}
void ContinueRegisterForOnWriteableLocked() {
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| ContinueRegisterForOnWriteableLocked "
"wsa_connect_error_:%d",
GetName(), wsa_connect_error_);
GPR_ASSERT(connect_done_);
if (wsa_connect_error_ != 0) {
ScheduleAndNullWriteClosure(
GRPC_WSA_ERROR(wsa_connect_error_, "connect"));
return;
}
if (socket_type_ == SOCK_DGRAM) {
ScheduleAndNullWriteClosure(absl::OkStatus());
return;
}
GPR_ASSERT(socket_type_ == SOCK_STREAM);
int wsa_error_code = 0;
switch (tcp_write_state_) {
case WRITE_IDLE:
ScheduleAndNullWriteClosure(absl::OkStatus());
break;
case WRITE_REQUESTED:
tcp_write_state_ = WRITE_PENDING;
winsocket_->NotifyOnWrite(&outer_write_closure_);
if (SendWriteBuf(nullptr, winsocket_->write_info()->overlapped(),
&wsa_error_code) != 0) {
winsocket_->UnregisterWriteCallback();
ScheduleAndNullWriteClosure(
GRPC_WSA_ERROR(wsa_error_code, "WSASend (overlapped)"));
return;
}
break;
case WRITE_PENDING:
case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY:
grpc_core::Crash(
absl::StrFormat("Invalid tcp_write_state_: %d", tcp_write_state_));
}
}
int SendWriteBuf(LPDWORD bytes_sent_ptr, LPWSAOVERLAPPED overlapped,
int* wsa_error_code) {
WSABUF buf;
buf.len = GRPC_SLICE_LENGTH(write_buf_);
buf.buf = reinterpret_cast<char*>(GRPC_SLICE_START_PTR(write_buf_));
DWORD flags = 0;
int out = WSASend(winsocket_->raw_socket(), &buf, 1, bytes_sent_ptr, flags,
overlapped, nullptr);
*wsa_error_code = WSAGetLastError();
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| SendWriteBuf WSASend buf.len:%d *bytes_sent_ptr:%d "
"overlapped:%p "
"return:%d *wsa_error_code:%d",
GetName(), buf.len, bytes_sent_ptr != nullptr ? *bytes_sent_ptr : 0,
overlapped, out, *wsa_error_code);
return out;
}
ares_ssize_t SendVUDP(WSAErrorContext* wsa_error_ctx, const struct iovec* iov,
int iov_count) {
// c-ares doesn't handle retryable errors on writes of UDP sockets.
// Therefore, the sendv handler for UDP sockets must only attempt
// to write everything inline.
GRPC_ARES_RESOLVER_TRACE_LOG("fd:|%s| SendVUDP called", GetName());
GPR_ASSERT(GRPC_SLICE_LENGTH(write_buf_) == 0);
grpc_core::CSliceUnref(write_buf_);
write_buf_ = FlattenIovec(iov, iov_count);
DWORD bytes_sent = 0;
int wsa_error_code = 0;
if (SendWriteBuf(&bytes_sent, nullptr, &wsa_error_code) != 0) {
grpc_core::CSliceUnref(write_buf_);
write_buf_ = grpc_empty_slice();
wsa_error_ctx->SetWSAError(wsa_error_code);
char* msg = gpr_format_message(wsa_error_code);
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| SendVUDP SendWriteBuf error code:%d msg:|%s|", GetName(),
wsa_error_code, msg);
gpr_free(msg);
return -1;
}
write_buf_ = grpc_slice_sub_no_ref(write_buf_, bytes_sent,
GRPC_SLICE_LENGTH(write_buf_));
return bytes_sent;
}
ares_ssize_t SendVTCP(WSAErrorContext* wsa_error_ctx, const struct iovec* iov,
int iov_count) {
// The "sendv" handler on TCP sockets buffers up write
// requests and returns an artificial WSAEWOULDBLOCK. Writing that buffer
// out in the background, and making further send progress in general, will
// happen as long as c-ares continues to show interest in writeability on
// this fd.
GRPC_ARES_RESOLVER_TRACE_LOG("fd:|%s| SendVTCP called tcp_write_state_:%d",
GetName(), tcp_write_state_);
switch (tcp_write_state_) {
case WRITE_IDLE:
tcp_write_state_ = WRITE_REQUESTED;
grpc_core::CSliceUnref(write_buf_);
write_buf_ = FlattenIovec(iov, iov_count);
wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
return -1;
case WRITE_REQUESTED:
case WRITE_PENDING:
wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
return -1;
case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY:
// c-ares is retrying a send on data that we previously returned
// WSAEWOULDBLOCK for, but then subsequently wrote out in the
// background. Right now, we assume that c-ares is retrying the same
// send again. If c-ares still needs to send even more data, we'll get
// to it eventually.
grpc_slice currently_attempted = FlattenIovec(iov, iov_count);
GPR_ASSERT(GRPC_SLICE_LENGTH(currently_attempted) >=
GRPC_SLICE_LENGTH(write_buf_));
ares_ssize_t total_sent = 0;
for (size_t i = 0; i < GRPC_SLICE_LENGTH(write_buf_); i++) {
GPR_ASSERT(GRPC_SLICE_START_PTR(currently_attempted)[i] ==
GRPC_SLICE_START_PTR(write_buf_)[i]);
total_sent++;
}
grpc_core::CSliceUnref(currently_attempted);
tcp_write_state_ = WRITE_IDLE;
return total_sent;
}
grpc_core::Crash(
absl::StrFormat("Unknown tcp_write_state_: %d", tcp_write_state_));
}
void OnTcpConnect() {
grpc_core::MutexLock lock(mu_);
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:%s InnerOnTcpConnectLocked "
"pending_register_for_readable:%d"
" pending_register_for_writeable:%d",
GetName(), pending_continue_register_for_on_readable_locked_,
pending_continue_register_for_on_writeable_locked_);
GPR_ASSERT(!connect_done_);
connect_done_ = true;
GPR_ASSERT(wsa_connect_error_ == 0);
if (shutdown_called_) {
wsa_connect_error_ = WSA_OPERATION_ABORTED;
} else {
DWORD transferred_bytes = 0;
DWORD flags;
BOOL wsa_success = WSAGetOverlappedResult(
winsocket_->raw_socket(), winsocket_->write_info()->overlapped(),
&transferred_bytes, FALSE, &flags);
GPR_ASSERT(transferred_bytes == 0);
if (!wsa_success) {
wsa_connect_error_ = WSAGetLastError();
char* msg = gpr_format_message(wsa_connect_error_);
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:%s InnerOnTcpConnectLocked WSA overlapped result code:%d "
"msg:|%s|",
GetName(), wsa_connect_error_, msg);
gpr_free(msg);
}
}
if (pending_continue_register_for_on_readable_locked_) {
ContinueRegisterForOnReadableLocked();
}
if (pending_continue_register_for_on_writeable_locked_) {
ContinueRegisterForOnWriteableLocked();
}
}
int ConnectUDP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
ares_socklen_t target_len) {
GRPC_ARES_RESOLVER_TRACE_LOG("fd:%s ConnectUDP", GetName());
GPR_ASSERT(!connect_done_);
GPR_ASSERT(wsa_connect_error_ == 0);
SOCKET s = winsocket_->raw_socket();
int out =
WSAConnect(s, target, target_len, nullptr, nullptr, nullptr, nullptr);
wsa_connect_error_ = WSAGetLastError();
wsa_error_ctx->SetWSAError(wsa_connect_error_);
connect_done_ = true;
char* msg = gpr_format_message(wsa_connect_error_);
GRPC_ARES_RESOLVER_TRACE_LOG("fd:%s WSAConnect error code:|%d| msg:|%s|",
GetName(), wsa_connect_error_, msg);
gpr_free(msg);
// c-ares expects a posix-style connect API
return out == 0 ? 0 : -1;
}
int ConnectTCP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
ares_socklen_t target_len) {
GRPC_ARES_RESOLVER_TRACE_LOG("fd:%s ConnectTCP", GetName());
LPFN_CONNECTEX ConnectEx;
GUID guid = WSAID_CONNECTEX;
DWORD ioctl_num_bytes;
SOCKET s = winsocket_->raw_socket();
if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
&ConnectEx, sizeof(ConnectEx), &ioctl_num_bytes, nullptr,
nullptr) != 0) {
int wsa_last_error = WSAGetLastError();
wsa_error_ctx->SetWSAError(wsa_last_error);
char* msg = gpr_format_message(wsa_last_error);
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:%s WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) error code:%d "
"msg:|%s|",
GetName(), wsa_last_error, msg);
gpr_free(msg);
connect_done_ = true;
wsa_connect_error_ = wsa_last_error;
return -1;
}
grpc_resolved_address wildcard4_addr;
grpc_resolved_address wildcard6_addr;
grpc_sockaddr_make_wildcards(0, &wildcard4_addr, &wildcard6_addr);
grpc_resolved_address* local_address = nullptr;
if (address_family_ == AF_INET) {
local_address = &wildcard4_addr;
} else {
local_address = &wildcard6_addr;
}
if (bind(s, reinterpret_cast<struct sockaddr*>(local_address->addr),
static_cast<int>(local_address->len)) != 0) {
int wsa_last_error = WSAGetLastError();
wsa_error_ctx->SetWSAError(wsa_last_error);
char* msg = gpr_format_message(wsa_last_error);
GRPC_ARES_RESOLVER_TRACE_LOG("fd:%s bind error code:%d msg:|%s|",
GetName(), wsa_last_error, msg);
gpr_free(msg);
connect_done_ = true;
wsa_connect_error_ = wsa_last_error;
return -1;
}
int out = 0;
// Register an async OnTcpConnect callback here since it is required by the
// WinSocket API.
winsocket_->NotifyOnWrite(&on_tcp_connect_locked_);
if (ConnectEx(s, target, target_len, nullptr, 0, nullptr,
winsocket_->write_info()->overlapped()) == 0) {
out = -1;
int wsa_last_error = WSAGetLastError();
wsa_error_ctx->SetWSAError(wsa_last_error);
char* msg = gpr_format_message(wsa_last_error);
GRPC_ARES_RESOLVER_TRACE_LOG("fd:%s ConnectEx error code:%d msg:|%s|",
GetName(), wsa_last_error, msg);
gpr_free(msg);
if (wsa_last_error == WSA_IO_PENDING) {
// c-ares only understands WSAEINPROGRESS and EWOULDBLOCK error codes on
// connect, but an async connect on IOCP socket will give
// WSA_IO_PENDING, so we need to convert.
wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
} else {
winsocket_->UnregisterWriteCallback();
// By returning a non-retryable error to c-ares at this point,
// we're aborting the possibility of any future operations on this fd.
connect_done_ = true;
wsa_connect_error_ = wsa_last_error;
return -1;
}
}
return out;
}
// TODO(apolcyn): improve this error handling to be less conversative.
// An e.g. ECONNRESET error here should result in errors when
// c-ares reads from this socket later, but it shouldn't necessarily cancel
// the entire resolution attempt. Doing so will allow the "inject broken
// nameserver list" test to pass on Windows.
void OnIocpReadable() {
grpc_core::MutexLock lock(mu_);
absl::Status error;
if (winsocket_->read_info()->result().wsa_error != 0) {
// WSAEMSGSIZE would be due to receiving more data
// than our read buffer's fixed capacity. Assume that
// the connection is TCP and read the leftovers
// in subsequent c-ares reads.
if (winsocket_->read_info()->result().wsa_error != WSAEMSGSIZE) {
error = GRPC_WSA_ERROR(winsocket_->read_info()->result().wsa_error,
"OnIocpReadableInner");
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| OnIocpReadableInner winsocket_->read_info.wsa_error "
"code:|%d| msg:|%s|",
GetName(), winsocket_->read_info()->result().wsa_error,
grpc_core::StatusToString(error).c_str());
}
}
if (error.ok()) {
read_buf_ = grpc_slice_sub_no_ref(
read_buf_, 0, winsocket_->read_info()->result().bytes_transferred);
read_buf_has_data_ = true;
} else {
grpc_core::CSliceUnref(read_buf_);
read_buf_ = grpc_empty_slice();
}
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| OnIocpReadable finishing. read buf length now:|%d|", GetName(),
GRPC_SLICE_LENGTH(read_buf_));
ScheduleAndNullReadClosure(error);
}
void OnIocpWriteable() {
grpc_core::MutexLock lock(mu_);
GRPC_ARES_RESOLVER_TRACE_LOG("OnIocpWriteableInner. fd:|%s|", GetName());
GPR_ASSERT(socket_type_ == SOCK_STREAM);
absl::Status error;
if (winsocket_->write_info()->result().wsa_error != 0) {
error = GRPC_WSA_ERROR(winsocket_->write_info()->result().wsa_error,
"OnIocpWriteableInner");
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| OnIocpWriteableInner. winsocket_->write_info.wsa_error "
"code:|%d| msg:|%s|",
GetName(), winsocket_->write_info()->result().wsa_error,
grpc_core::StatusToString(error).c_str());
}
GPR_ASSERT(tcp_write_state_ == WRITE_PENDING);
if (error.ok()) {
tcp_write_state_ = WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY;
write_buf_ = grpc_slice_sub_no_ref(
write_buf_, 0, winsocket_->write_info()->result().bytes_transferred);
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| OnIocpWriteableInner. bytes transferred:%d", GetName(),
winsocket_->write_info()->result().bytes_transferred);
} else {
grpc_core::CSliceUnref(write_buf_);
write_buf_ = grpc_empty_slice();
}
ScheduleAndNullWriteClosure(error);
}
const std::string name_;
const int address_family_;
const int socket_type_;
grpc_core::Mutex* mu_;
std::unique_ptr<WinSocket> winsocket_;
char recv_from_source_addr_[kRecvFromSourceAddrSize];
ares_socklen_t recv_from_source_addr_len_;
grpc_slice read_buf_;
bool read_buf_has_data_ = false;
grpc_slice write_buf_;
absl::AnyInvocable<void(absl::Status)> read_closure_;
absl::AnyInvocable<void(absl::Status)> write_closure_;
AnyInvocableClosure outer_read_closure_;
AnyInvocableClosure outer_write_closure_;
bool shutdown_called_ = false;
// State related to TCP sockets
AnyInvocableClosure on_tcp_connect_locked_;
bool connect_done_ = false;
int wsa_connect_error_ = 0;
WriteState tcp_write_state_ = WRITE_IDLE;
// We don't run register_for_{readable,writeable} logic until
// a socket is connected. In the interim, we queue readable/writeable
// registrations with the following state.
bool pending_continue_register_for_on_readable_locked_ = false;
bool pending_continue_register_for_on_writeable_locked_ = false;
// This pointer is initialized from the stored pointer inside the shared
// pointer owned by the AresResolver and should be valid at the time of use.
EventEngine* event_engine_;
};
// These virtual socket functions are called from within the c-ares
// library. These methods generally dispatch those socket calls to the
// appropriate methods. The virtual "socket" and "close" methods are
// special and instead create/add and remove/destroy GrpcPolledFdWindows
// objects.
class CustomSockFuncs {
public:
static ares_socket_t Socket(int af, int type, int protocol, void* user_data) {
if (type != SOCK_DGRAM && type != SOCK_STREAM) {
GRPC_ARES_RESOLVER_TRACE_LOG("Socket called with invalid socket type:%d",
type);
return INVALID_SOCKET;
}
GrpcPolledFdFactoryWindows* self =
static_cast<GrpcPolledFdFactoryWindows*>(user_data);
SOCKET s = WSASocket(af, type, protocol, nullptr, 0,
IOCP::GetDefaultSocketFlags());
if (s == INVALID_SOCKET) {
GRPC_ARES_RESOLVER_TRACE_LOG(
"WSASocket failed with params af:%d type:%d protocol:%d", af, type,
protocol);
return INVALID_SOCKET;
}
if (type == SOCK_STREAM) {
absl::Status error = PrepareSocket(s);
if (!error.ok()) {
GRPC_ARES_RESOLVER_TRACE_LOG("WSAIoctl failed with error: %s",
grpc_core::StatusToString(error).c_str());
return INVALID_SOCKET;
}
}
auto polled_fd = std::make_unique<GrpcPolledFdWindows>(
self->iocp_->Watch(s), self->mu_, af, type, self->event_engine_);
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| created with params af:%d type:%d protocol:%d",
polled_fd->GetName(), af, type, protocol);
GPR_ASSERT(self->sockets_.insert({s, std::move(polled_fd)}).second);
return s;
}
static int Connect(ares_socket_t as, const struct sockaddr* target,
ares_socklen_t target_len, void* user_data) {
WSAErrorContext wsa_error_ctx;
GrpcPolledFdFactoryWindows* self =
static_cast<GrpcPolledFdFactoryWindows*>(user_data);
auto it = self->sockets_.find(as);
GPR_ASSERT(it != self->sockets_.end());
return it->second->Connect(&wsa_error_ctx, target, target_len);
}
static ares_ssize_t SendV(ares_socket_t as, const struct iovec* iov,
int iovec_count, void* user_data) {
WSAErrorContext wsa_error_ctx;
GrpcPolledFdFactoryWindows* self =
static_cast<GrpcPolledFdFactoryWindows*>(user_data);
auto it = self->sockets_.find(as);
GPR_ASSERT(it != self->sockets_.end());
return it->second->SendV(&wsa_error_ctx, iov, iovec_count);
}
static ares_ssize_t RecvFrom(ares_socket_t as, void* data, size_t data_len,
int flags, struct sockaddr* from,
ares_socklen_t* from_len, void* user_data) {
WSAErrorContext wsa_error_ctx;
GrpcPolledFdFactoryWindows* self =
static_cast<GrpcPolledFdFactoryWindows*>(user_data);
auto it = self->sockets_.find(as);
GPR_ASSERT(it != self->sockets_.end());
return it->second->RecvFrom(&wsa_error_ctx, data, data_len, flags, from,
from_len);
}
static int CloseSocket(SOCKET s, void*) {
GRPC_ARES_RESOLVER_TRACE_LOG("c-ares socket: %d CloseSocket", s);
return 0;
}
};
// Adapter to hold the ownership of GrpcPolledFdWindows internally.
class GrpcPolledFdWrapper : public GrpcPolledFd {
public:
explicit GrpcPolledFdWrapper(GrpcPolledFdWindows* polled_fd)
: polled_fd_(polled_fd) {}
void RegisterForOnReadableLocked(
absl::AnyInvocable<void(absl::Status)> read_closure) override {
polled_fd_->RegisterForOnReadableLocked(std::move(read_closure));
}
void RegisterForOnWriteableLocked(
absl::AnyInvocable<void(absl::Status)> write_closure) override {
polled_fd_->RegisterForOnWriteableLocked(std::move(write_closure));
}
bool IsFdStillReadableLocked() override {
return polled_fd_->IsFdStillReadableLocked();
}
bool ShutdownLocked(absl::Status error) override {
return polled_fd_->ShutdownLocked(error);
}
ares_socket_t GetWrappedAresSocketLocked() override {
return polled_fd_->GetWrappedAresSocketLocked();
}
const char* GetName() const override { return polled_fd_->GetName(); }
private:
GrpcPolledFdWindows* polled_fd_;
};
GrpcPolledFdFactoryWindows::GrpcPolledFdFactoryWindows(IOCP* iocp)
: iocp_(iocp) {}
GrpcPolledFdFactoryWindows::~GrpcPolledFdFactoryWindows() {}
void GrpcPolledFdFactoryWindows::Initialize(grpc_core::Mutex* mutex,
EventEngine* event_engine) {
mu_ = mutex;
event_engine_ = event_engine;
}
std::unique_ptr<GrpcPolledFd> GrpcPolledFdFactoryWindows::NewGrpcPolledFdLocked(
ares_socket_t as) {
auto it = sockets_.find(as);
GPR_ASSERT(it != sockets_.end());
return std::make_unique<GrpcPolledFdWrapper>(it->second.get());
}
void GrpcPolledFdFactoryWindows::ConfigureAresChannelLocked(
ares_channel channel) {
static const struct ares_socket_functions kCustomSockFuncs = {
/*asocket=*/&CustomSockFuncs::Socket,
/*aclose=*/&CustomSockFuncs::CloseSocket,
/*aconnect=*/&CustomSockFuncs::Connect,
/*arecvfrom=*/&CustomSockFuncs::RecvFrom,
/*asendv=*/&CustomSockFuncs::SendV,
};
ares_set_socket_functions(channel, &kCustomSockFuncs, this);
}
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)

@ -0,0 +1,75 @@
// Copyright 2023 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_GRPC_POLLED_FD_WINDOWS_H
#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_GRPC_POLLED_FD_WINDOWS_H
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/port.h" // IWYU pragma: keep
#if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
#include <memory>
#include <ares.h>
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/event_engine/common_closures.h"
#include "src/core/lib/event_engine/grpc_polled_fd.h"
#include "src/core/lib/event_engine/windows/iocp.h"
#include "src/core/lib/event_engine/windows/win_socket.h"
#include "src/core/lib/gprpp/sync.h"
struct iovec;
namespace grpc_event_engine {
namespace experimental {
class GrpcPolledFdWindows;
class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory {
public:
explicit GrpcPolledFdFactoryWindows(IOCP* iocp);
~GrpcPolledFdFactoryWindows() override;
void Initialize(grpc_core::Mutex* mutex, EventEngine* event_engine) override;
std::unique_ptr<GrpcPolledFd> NewGrpcPolledFdLocked(
ares_socket_t as) override;
void ConfigureAresChannelLocked(ares_channel channel) override;
private:
friend class CustomSockFuncs;
// The mutex is owned by the AresResolver which owns this object.
grpc_core::Mutex* mu_;
// The IOCP object is owned by the WindowsEngine whose ownership is shared by
// the AresResolver.
IOCP* iocp_;
// This pointer is initialized from the stored pointer inside the shared
// pointer owned by the AresResolver which owns this object.
EventEngine* event_engine_;
std::map<SOCKET, std::unique_ptr<GrpcPolledFdWindows>> sockets_;
};
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_GRPC_POLLED_FD_WINDOWS_H

@ -35,6 +35,7 @@
#include "src/core/lib/event_engine/thread_pool/thread_pool.h"
#include "src/core/lib/event_engine/trace.h"
#include "src/core/lib/event_engine/utils.h"
#include "src/core/lib/event_engine/windows/grpc_polled_fd_windows.h"
#include "src/core/lib/event_engine/windows/iocp.h"
#include "src/core/lib/event_engine/windows/windows_endpoint.h"
#include "src/core/lib/event_engine/windows/windows_engine.h"
@ -194,10 +195,49 @@ EventEngine::TaskHandle WindowsEventEngine::RunAfterInternal(
return handle;
}
#if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
WindowsEventEngine::WindowsDNSResolver::WindowsDNSResolver(
grpc_core::OrphanablePtr<AresResolver> ares_resolver)
: ares_resolver_(std::move(ares_resolver)) {}
void WindowsEventEngine::WindowsDNSResolver::LookupHostname(
LookupHostnameCallback on_resolve, absl::string_view name,
absl::string_view default_port) {
ares_resolver_->LookupHostname(name, default_port, std::move(on_resolve));
}
void WindowsEventEngine::WindowsDNSResolver::LookupSRV(
LookupSRVCallback on_resolve, absl::string_view name) {
ares_resolver_->LookupSRV(name, std::move(on_resolve));
}
void WindowsEventEngine::WindowsDNSResolver::LookupTXT(
LookupTXTCallback on_resolve, absl::string_view name) {
ares_resolver_->LookupTXT(name, std::move(on_resolve));
}
#endif // GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
absl::StatusOr<std::unique_ptr<EventEngine::DNSResolver>>
WindowsEventEngine::GetDNSResolver(
EventEngine::DNSResolver::ResolverOptions const& /*options*/) {
EventEngine::DNSResolver::ResolverOptions const& options) {
#if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
auto ares_resolver = AresResolver::CreateAresResolver(
options.dns_server,
std::make_unique<GrpcPolledFdFactoryWindows>(poller()),
shared_from_this());
if (!ares_resolver.ok()) {
return ares_resolver.status();
}
return std::make_unique<WindowsEventEngine::WindowsDNSResolver>(
std::move(*ares_resolver));
#else // GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
// TODO(yijiem): Implement a basic A/AAAA-only native resolver in
// WindowsEventEngine.
(void)options;
grpc_core::Crash("unimplemented");
#endif // GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
}
bool WindowsEventEngine::IsWorkerThread() { grpc_core::Crash("unimplemented"); }

@ -13,8 +13,11 @@
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENGINE_H
#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENGINE_H
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/port.h" // IWYU pragma: keep
#ifdef GPR_WINDOWS
#include <memory>
@ -28,6 +31,7 @@
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/slice_buffer.h>
#include "src/core/lib/event_engine/ares_resolver.h"
#include "src/core/lib/event_engine/handle_containers.h"
#include "src/core/lib/event_engine/posix_engine/timer_manager.h"
#include "src/core/lib/event_engine/thread_pool/thread_pool.h"
@ -47,7 +51,11 @@ class WindowsEventEngine : public EventEngine,
public:
class WindowsDNSResolver : public EventEngine::DNSResolver {
public:
~WindowsDNSResolver() override;
WindowsDNSResolver() = delete;
#if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
explicit WindowsDNSResolver(
grpc_core::OrphanablePtr<AresResolver> ares_resolver);
#endif // GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
void LookupHostname(LookupHostnameCallback on_resolve,
absl::string_view name,
absl::string_view default_port) override;
@ -55,6 +63,11 @@ class WindowsEventEngine : public EventEngine,
absl::string_view name) override;
void LookupTXT(LookupTXTCallback on_resolve,
absl::string_view name) override;
#if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
private:
grpc_core::OrphanablePtr<AresResolver> ares_resolver_;
#endif // GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
};
WindowsEventEngine();

@ -539,6 +539,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/event_engine/time_util.cc',
'src/core/lib/event_engine/trace.cc',
'src/core/lib/event_engine/utils.cc',
'src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc',
'src/core/lib/event_engine/windows/iocp.cc',
'src/core/lib/event_engine/windows/win_socket.cc',
'src/core/lib/event_engine/windows/windows_endpoint.cc',

@ -88,6 +88,7 @@ grpc_cc_test(
"//src/core:windows_event_engine",
"//test/core/event_engine:event_engine_test_utils",
"//test/core/event_engine/test_suite/tests:client",
"//test/core/event_engine/test_suite/tests:dns",
"//test/core/event_engine/test_suite/tests:server",
"//test/core/event_engine/test_suite/tests:timer",
],

@ -76,6 +76,7 @@ grpc_cc_library(
"//test/core/util:grpc_test_util_base",
"//test/cpp/util:get_grpc_test_runfile_dir",
"//test/cpp/util:test_util",
"//test/cpp/util/windows:manifest_file",
],
alwayslink = 1,
)

@ -15,6 +15,8 @@
// IWYU pragma: no_include <ratio>
// IWYU pragma: no_include <arpa/inet.h>
#include <grpc/support/port_platform.h>
#include <cstdlib>
#include <cstring>
#include <memory>
@ -34,6 +36,7 @@
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/gprpp/crash.h" // IWYU pragma: keep
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "test/core/event_engine/test_suite/event_engine_test_framework.h"
@ -42,6 +45,10 @@
#include "test/cpp/util/get_grpc_test_runfile_dir.h"
#include "test/cpp/util/subprocess.h"
#ifdef GPR_WINDOWS
#include "test/cpp/util/windows/manifest_file.h"
#endif // GPR_WINDOWS
namespace grpc_event_engine {
namespace experimental {
@ -50,13 +57,6 @@ void InitDNSTests() {}
} // namespace experimental
} // namespace grpc_event_engine
#ifdef GPR_WINDOWS
class EventEngineDNSTest : public EventEngineTest {};
// TODO(yijiem): make the test run on Windows
TEST_F(EventEngineDNSTest, TODO) {}
#else
namespace {
using grpc_event_engine::experimental::EventEngine;
@ -109,13 +109,36 @@ class EventEngineDNSTest : public EventEngineTest {
std::string health_check_path = kHealthCheckRelPath;
absl::optional<std::string> runfile_dir = grpc::GetGrpcTestRunFileDir();
if (runfile_dir.has_value()) {
// We sure need a portable filesystem lib for this to work on Windows.
test_records_path = absl::StrJoin({*runfile_dir, test_records_path}, "/");
dns_server_path = absl::StrJoin({*runfile_dir, dns_server_path}, "/");
dns_resolver_path = absl::StrJoin({*runfile_dir, dns_resolver_path}, "/");
tcp_connect_path = absl::StrJoin({*runfile_dir, tcp_connect_path}, "/");
health_check_path = absl::StrJoin({*runfile_dir, health_check_path}, "/");
#ifdef GPR_WINDOWS
// TODO(yijiem): Misusing the GRPC_PORT_ISOLATED_RUNTIME preprocessor symbol as
// an indication whether the test is running on RBE or not. Find a better way of
// doing this.
#ifndef GRPC_PORT_ISOLATED_RUNTIME
gpr_log(GPR_ERROR,
"You are invoking the test locally with Bazel, you may need to "
"invoke Bazel with --enable_runfiles=yes.");
#endif // GRPC_PORT_ISOLATED_RUNTIME
test_records_path = grpc::testing::NormalizeFilePath(test_records_path);
dns_server_path =
grpc::testing::NormalizeFilePath(dns_server_path + ".exe");
dns_resolver_path =
grpc::testing::NormalizeFilePath(dns_resolver_path + ".exe");
tcp_connect_path =
grpc::testing::NormalizeFilePath(tcp_connect_path + ".exe");
health_check_path =
grpc::testing::NormalizeFilePath(health_check_path + ".exe");
#endif // GPR_WINDOWS
} else {
#ifdef GPR_WINDOWS
grpc_core::Crash(
"The EventEngineDNSTest does not support running without Bazel on "
"Windows for now.");
#endif // GPR_WINDOWS
// Invoke the .py scripts directly where they are in source code if we are
// not running with bazel.
dns_server_path += ".py";
@ -141,8 +164,11 @@ class EventEngineDNSTest : public EventEngineTest {
tcp_connect_path,
});
int status = health_check.Join();
// TODO(yijiem): make this portable for Windows
#ifdef GPR_WINDOWS
ASSERT_EQ(status, 0);
#else
ASSERT_TRUE(WIFEXITED(status) && WEXITSTATUS(status) == 0);
#endif // GPR_WINDOWS
#endif // GRPC_IOS_EVENT_ENGINE_CLIENT
}
@ -546,5 +572,3 @@ TEST_F(EventEngineDNSTest, UnparseableHostPortsBadLocalhostWithPort) {
&dns_resolver_signal_, "[localhost]:1");
}
// END
#endif // GPR_WINDOWS

@ -25,7 +25,7 @@ def test_runner_log(msg):
def python_args(arg_list):
if platform.system() == "Windows":
if platform.system() == "Windows" and arg_list[0].endswith(".py"):
return [sys.executable] + arg_list
return arg_list

@ -2169,6 +2169,8 @@ src/core/lib/event_engine/trace.cc \
src/core/lib/event_engine/trace.h \
src/core/lib/event_engine/utils.cc \
src/core/lib/event_engine/utils.h \
src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc \
src/core/lib/event_engine/windows/grpc_polled_fd_windows.h \
src/core/lib/event_engine/windows/iocp.cc \
src/core/lib/event_engine/windows/iocp.h \
src/core/lib/event_engine/windows/win_socket.cc \

@ -1945,6 +1945,8 @@ src/core/lib/event_engine/trace.cc \
src/core/lib/event_engine/trace.h \
src/core/lib/event_engine/utils.cc \
src/core/lib/event_engine/utils.h \
src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc \
src/core/lib/event_engine/windows/grpc_polled_fd_windows.h \
src/core/lib/event_engine/windows/iocp.cc \
src/core/lib/event_engine/windows/iocp.h \
src/core/lib/event_engine/windows/win_socket.cc \

Loading…
Cancel
Save