From a58f83c947be52d4b622aa0d32435323b81a8e5e Mon Sep 17 00:00:00 2001 From: youyuanwu <48816116+youyuanwu@users.noreply.github.com> Date: Wed, 14 Feb 2024 13:14:04 -0800 Subject: [PATCH] [EventEngine] Support AF_UNIX for windows (#34801) #22285 Unix domain socket has been added to windows: [AF_UNIX comes to Windows](https://devblogs.microsoft.com/commandline/af_unix-comes-to-windows/) Golang net pkg has adopted this long ago: https://go-review.googlesource.com/c/go/+/125456 https://go-review.googlesource.com/c/sys/+/132555 grpc-go already support this. AF_UNIX on windows is seamlessly integrated with winsock API. The modification needed are: * Set the right address family AF_UNIX depending on the address config, instead of using AF_INET6 all the time. * Ignore socket options for tcp. Closes #34801 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/34801 from youyuanwu:dev 3d9b5c097bc6394f113be4e55173a4e8d4d85e9d PiperOrigin-RevId: 607083164 --- .../binder/client/binder_connector.cc | 7 ++ src/core/lib/address_utils/parse_address.cc | 9 ++- src/core/lib/address_utils/sockaddr_utils.cc | 9 ++- .../posix_engine/tcp_socket_utils.cc | 7 ++ src/core/lib/event_engine/tcp_socket_utils.cc | 7 ++ .../lib/event_engine/windows/win_socket.cc | 4 ++ .../lib/event_engine/windows/win_socket.h | 3 + .../event_engine/windows/windows_engine.cc | 22 +++++- .../event_engine/windows/windows_listener.cc | 58 ++++++++++++++-- .../event_engine/windows/windows_listener.h | 14 ++++ src/core/lib/iomgr/port.h | 3 + src/core/lib/iomgr/tcp_client_windows.cc | 29 ++++++-- src/core/lib/iomgr/tcp_server_windows.cc | 69 ++++++++++++++++--- src/core/lib/iomgr/unix_sockets_posix.cc | 13 ++++ src/core/resolver/binder/binder_resolver.cc | 7 ++ test/core/address_utils/parse_address_test.cc | 9 ++- .../core/address_utils/sockaddr_utils_test.cc | 9 ++- .../resolvers/binder_resolver_test.cc | 9 ++- test/core/end2end/end2end_test_suites.cc | 40 ++++++++--- test/core/end2end/tests/default_host.cc | 3 +- .../event_engine/tcp_socket_utils_test.cc | 9 ++- tools/run_tests/run_tests.py | 4 ++ 22 files changed, 306 insertions(+), 38 deletions(-) diff --git a/src/core/ext/transport/binder/client/binder_connector.cc b/src/core/ext/transport/binder/client/binder_connector.cc index c948e372ad5..0c3db89927c 100644 --- a/src/core/ext/transport/binder/client/binder_connector.cc +++ b/src/core/ext/transport/binder/client/binder_connector.cc @@ -20,7 +20,14 @@ #include "src/core/lib/iomgr/port.h" #ifdef GRPC_HAVE_UNIX_SOCKET +#ifdef GPR_WINDOWS +// clang-format off +#include +#include +// clang-format on +#else #include +#endif // GPR_WINDOWS #endif #include diff --git a/src/core/lib/address_utils/parse_address.cc b/src/core/lib/address_utils/parse_address.cc index deb2b079723..4806d170988 100644 --- a/src/core/lib/address_utils/parse_address.cc +++ b/src/core/lib/address_utils/parse_address.cc @@ -30,8 +30,15 @@ #include #include #ifdef GRPC_HAVE_UNIX_SOCKET +#ifdef GPR_WINDOWS +// clang-format off +#include +#include +// clang-format on +#else #include -#endif +#endif // GPR_WINDOWS +#endif // GRPC_HAVE_UNIX_SOCKET #include #include "absl/status/status.h" diff --git a/src/core/lib/address_utils/sockaddr_utils.cc b/src/core/lib/address_utils/sockaddr_utils.cc index a1442afe5da..c82efe927a1 100644 --- a/src/core/lib/address_utils/sockaddr_utils.cc +++ b/src/core/lib/address_utils/sockaddr_utils.cc @@ -44,8 +44,15 @@ #include "src/core/lib/uri/uri_parser.h" #ifdef GRPC_HAVE_UNIX_SOCKET +#ifdef GPR_WINDOWS +// clang-format off +#include +#include +// clang-format on +#else #include -#endif +#endif // GPR_WINDOWS +#endif // GRPC_HAVE_UNIX_SOCKET #ifdef GRPC_HAVE_UNIX_SOCKET static absl::StatusOr grpc_sockaddr_to_uri_unix_if_possible( diff --git a/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc b/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc index 76e1b0a57e1..ab98713b62a 100644 --- a/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc +++ b/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc @@ -58,8 +58,15 @@ #include "src/core/lib/gprpp/strerror.h" #ifdef GRPC_HAVE_UNIX_SOCKET +#ifdef GPR_WINDOWS +// clang-format off +#include +#include +// clang-format on +#else #include // IWYU pragma: keep #include +#endif // GPR_WINDOWS #endif namespace grpc_event_engine { diff --git a/src/core/lib/event_engine/tcp_socket_utils.cc b/src/core/lib/event_engine/tcp_socket_utils.cc index a5fa15223d1..c3bd35fc905 100644 --- a/src/core/lib/event_engine/tcp_socket_utils.cc +++ b/src/core/lib/event_engine/tcp_socket_utils.cc @@ -32,8 +32,15 @@ #endif // GRPC_POSIX_SOCKET_UTILS_COMMON #ifdef GRPC_HAVE_UNIX_SOCKET +#ifdef GPR_WINDOWS +// clang-format off +#include +#include +// clang-format on +#else #include // IWYU pragma: keep #include +#endif // GPR_WINDOWS #endif #ifdef GRPC_HAVE_VSOCK diff --git a/src/core/lib/event_engine/windows/win_socket.cc b/src/core/lib/event_engine/windows/win_socket.cc index 22ee56650d2..da919e2ba7c 100644 --- a/src/core/lib/event_engine/windows/win_socket.cc +++ b/src/core/lib/event_engine/windows/win_socket.cc @@ -208,6 +208,10 @@ static grpc_error_handle enable_socket_low_latency(SOCKET sock) { } // namespace +absl::Status SetSocketNonBlock(SOCKET sock) { + return grpc_tcp_set_non_block(sock); +} + absl::Status PrepareSocket(SOCKET sock) { absl::Status err; err = grpc_tcp_set_non_block(sock); diff --git a/src/core/lib/event_engine/windows/win_socket.h b/src/core/lib/event_engine/windows/win_socket.h index bc8e8061c81..c6d52f41def 100644 --- a/src/core/lib/event_engine/windows/win_socket.h +++ b/src/core/lib/event_engine/windows/win_socket.h @@ -133,6 +133,9 @@ class WinSocket { // Attempt to configure default socket settings absl::Status PrepareSocket(SOCKET sock); +// Set non block option for socket. +absl::Status SetSocketNonBlock(SOCKET sock); + } // namespace experimental } // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/windows/windows_engine.cc b/src/core/lib/event_engine/windows/windows_engine.cc index 5503f3fc85b..f4431ae4ef4 100644 --- a/src/core/lib/event_engine/windows/windows_engine.cc +++ b/src/core/lib/event_engine/windows/windows_engine.cc @@ -306,7 +306,10 @@ EventEngine::ConnectionHandle WindowsEventEngine::Connect( if (ResolvedAddressToV4Mapped(addr, &addr6_v4mapped)) { address = addr6_v4mapped; } - SOCKET sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, nullptr, 0, + const int addr_family = + (address.address()->sa_family == AF_UNIX) ? AF_UNIX : AF_INET6; + const int protocol = addr_family == AF_UNIX ? 0 : IPPROTO_TCP; + SOCKET sock = WSASocket(addr_family, SOCK_STREAM, protocol, nullptr, 0, IOCP::GetDefaultSocketFlags()); if (sock == INVALID_SOCKET) { Run([on_connect = std::move(on_connect), @@ -315,7 +318,11 @@ EventEngine::ConnectionHandle WindowsEventEngine::Connect( }); return EventEngine::ConnectionHandle::kInvalid; } - status = PrepareSocket(sock); + if (addr_family == AF_UNIX) { + status = SetSocketNonBlock(sock); + } else { + status = PrepareSocket(sock); + } if (!status.ok()) { Run([on_connect = std::move(on_connect), status]() mutable { on_connect(status); @@ -340,7 +347,16 @@ EventEngine::ConnectionHandle WindowsEventEngine::Connect( return EventEngine::ConnectionHandle::kInvalid; } // bind the local address - auto local_address = ResolvedAddressMakeWild6(0); + ResolvedAddress local_address; + if (addr_family == AF_UNIX) { + // For ConnectEx() to work for AF_UNIX, the sock needs to be bound to + // the local address of an unnamed socket. + sockaddr addr = {}; + addr.sa_family = AF_UNIX; + local_address = ResolvedAddress(&addr, sizeof(addr)); + } else { + local_address = ResolvedAddressMakeWild6(0); + } istatus = bind(sock, local_address.address(), local_address.size()); if (istatus != 0) { Run([on_connect = std::move(on_connect), diff --git a/src/core/lib/event_engine/windows/windows_listener.cc b/src/core/lib/event_engine/windows/windows_listener.cc index 71116a63326..d74cfc0e8bc 100644 --- a/src/core/lib/event_engine/windows/windows_listener.cc +++ b/src/core/lib/event_engine/windows/windows_listener.cc @@ -27,6 +27,7 @@ #include "src/core/lib/gprpp/crash.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/port.h" namespace grpc_event_engine { namespace experimental { @@ -67,11 +68,39 @@ void WindowsEventEngineListener::SinglePortSocketListener:: // ---- SinglePortSocketListener ---- +// TODO(hork): This may be refactored to share with posix engine. +void UnlinkIfUnixDomainSocket( + const EventEngine::ResolvedAddress& resolved_addr) { +#ifdef GRPC_HAVE_UNIX_SOCKET + if (resolved_addr.address()->sa_family != AF_UNIX) { + return; + } + struct sockaddr_un* un = reinterpret_cast( + const_cast(resolved_addr.address())); + // There is nothing to unlink for an abstract unix socket. + if (un->sun_path[0] == '\0' && un->sun_path[1] != '\0') { + return; + } + // For windows we need to remove the file instead of unlink. + DWORD attr = ::GetFileAttributesA(un->sun_path); + if (attr == INVALID_FILE_ATTRIBUTES) { + return; + } + if (attr & FILE_ATTRIBUTE_DIRECTORY || attr & FILE_ATTRIBUTE_READONLY) { + return; + } + ::DeleteFileA(un->sun_path); +#else + (void)resolved_addr; +#endif +} + WindowsEventEngineListener::SinglePortSocketListener:: ~SinglePortSocketListener() { grpc_core::MutexLock lock(&io_state_->mu); io_state_->listener_socket->Shutdown(DEBUG_LOCATION, "~SinglePortSocketListener"); + UnlinkIfUnixDomainSocket(listener_sockname()); GRPC_EVENT_ENGINE_TRACE("~SinglePortSocketListener::%p", this); } @@ -109,7 +138,11 @@ absl::Status WindowsEventEngineListener::SinglePortSocketListener::Start() { absl::Status WindowsEventEngineListener::SinglePortSocketListener::StartLocked() { - SOCKET accept_socket = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0, + const EventEngine::ResolvedAddress addr = listener_sockname(); + const int addr_family = + (addr.address()->sa_family == AF_UNIX) ? AF_UNIX : AF_INET6; + const int protocol = addr_family == AF_UNIX ? 0 : IPPROTO_TCP; + SOCKET accept_socket = WSASocket(addr_family, SOCK_STREAM, protocol, NULL, 0, IOCP::GetDefaultSocketFlags()); if (accept_socket == INVALID_SOCKET) { return GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket"); @@ -118,11 +151,17 @@ WindowsEventEngineListener::SinglePortSocketListener::StartLocked() { if (accept_socket != INVALID_SOCKET) closesocket(accept_socket); return error; }; - auto error = PrepareSocket(accept_socket); + absl::Status error; + if (addr_family == AF_UNIX) { + error = SetSocketNonBlock(accept_socket); + } else { + error = PrepareSocket(accept_socket); + } if (!error.ok()) return fail(error); // Start the "accept" asynchronously. io_state_->listener_socket->NotifyOnRead(&io_state_->on_accept_cb); - DWORD addrlen = sizeof(sockaddr_in6) + 16; + DWORD addrlen = + sizeof(addresses_) / 2; // half of the buffer is for remote addr. DWORD bytes_received = 0; int success = AcceptEx(io_state_->listener_socket->raw_socket(), accept_socket, @@ -238,8 +277,14 @@ WindowsEventEngineListener::SinglePortSocketListener::PrepareListenerSocket( if (sock != INVALID_SOCKET) closesocket(sock); return error; }; - auto error = PrepareSocket(sock); + absl::Status error; + if (addr.address()->sa_family == AF_UNIX) { + error = SetSocketNonBlock(sock); + } else { + error = PrepareSocket(sock); + } if (!error.ok()) return fail(error); + UnlinkIfUnixDomainSocket(addr); if (bind(sock, addr.address(), addr.size()) == SOCKET_ERROR) { return fail(GRPC_WSA_ERROR(WSAGetLastError(), "bind")); } @@ -313,7 +358,10 @@ absl::StatusOr WindowsEventEngineListener::Bind( out_addr = ResolvedAddressMakeWild6(out_port); } // open the socket - SOCKET sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, nullptr, 0, + const int addr_family = + (out_addr.address()->sa_family == AF_UNIX) ? AF_UNIX : AF_INET6; + const int protocol = addr_family == AF_UNIX ? 0 : IPPROTO_TCP; + SOCKET sock = WSASocket(addr_family, SOCK_STREAM, protocol, nullptr, 0, IOCP::GetDefaultSocketFlags()); if (sock == INVALID_SOCKET) { auto error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket"); diff --git a/src/core/lib/event_engine/windows/windows_listener.h b/src/core/lib/event_engine/windows/windows_listener.h index a84238c3b7c..2773f3afbf3 100644 --- a/src/core/lib/event_engine/windows/windows_listener.h +++ b/src/core/lib/event_engine/windows/windows_listener.h @@ -30,6 +30,14 @@ #include "src/core/lib/event_engine/thread_pool/thread_pool.h" #include "src/core/lib/event_engine/windows/iocp.h" #include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_HAVE_UNIX_SOCKET +// clang-format off +#include +#include +// clang-format on +#endif namespace grpc_event_engine { namespace experimental { @@ -120,9 +128,15 @@ class WindowsEventEngineListener : public EventEngine::Listener { // The cached AcceptEx for that port. LPFN_ACCEPTEX AcceptEx; + // Buffer to hold the local and remote address. // This seemingly magic number comes from AcceptEx's documentation. each // address buffer needs to have at least 16 more bytes at their end. +#ifdef GRPC_HAVE_UNIX_SOCKET + // unix addr is larger than ip addr. + uint8_t addresses_[(sizeof(sockaddr_un) + 16) * 2] = {}; +#else uint8_t addresses_[(sizeof(sockaddr_in6) + 16) * 2] = {}; +#endif // The parent listener WindowsEventEngineListener* listener_; // shared state for asynchronous cleanup of overlapped operations diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index 3385408309b..f4102a524a2 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -30,6 +30,9 @@ #endif #if defined(GPR_WINDOWS) #define GRPC_WINSOCK_SOCKET 1 +#ifndef __MINGW32__ +#define GRPC_HAVE_UNIX_SOCKET 1 +#endif // __MINGW32__ #define GRPC_WINDOWS_SOCKETUTILS 1 #define GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER 1 #elif defined(GPR_ANDROID) diff --git a/src/core/lib/iomgr/tcp_client_windows.cc b/src/core/lib/iomgr/tcp_client_windows.cc index 7a4d7ca3103..637c6e3cb55 100644 --- a/src/core/lib/iomgr/tcp_client_windows.cc +++ b/src/core/lib/iomgr/tcp_client_windows.cc @@ -145,6 +145,8 @@ static int64_t tcp_connect(grpc_closure* on_done, grpc_endpoint** endpoint, grpc_error_handle error; async_connect* ac = NULL; absl::StatusOr addr_uri; + int addr_family; + int protocol; addr_uri = grpc_sockaddr_to_uri(addr); if (!addr_uri.ok()) { @@ -159,14 +161,25 @@ static int64_t tcp_connect(grpc_closure* on_done, grpc_endpoint** endpoint, addr = &addr6_v4mapped; } - sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0, + // extract family + addr_family = + (grpc_sockaddr_get_family(addr) == AF_UNIX) ? AF_UNIX : AF_INET6; + protocol = addr_family == AF_UNIX ? 0 : IPPROTO_TCP; + + sock = WSASocket(addr_family, SOCK_STREAM, protocol, NULL, 0, grpc_get_default_wsa_socket_flags()); if (sock == INVALID_SOCKET) { error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket"); goto failure; } - error = grpc_tcp_prepare_socket(sock); + if (addr_family == AF_UNIX) { + // tcp settings for af_unix are skipped. + error = grpc_tcp_set_non_block(sock); + } else { + error = grpc_tcp_prepare_socket(sock); + } + if (!error.ok()) { goto failure; } @@ -183,7 +196,15 @@ static int64_t tcp_connect(grpc_closure* on_done, grpc_endpoint** endpoint, goto failure; } - grpc_sockaddr_make_wildcard6(0, &local_address); + if (addr_family == AF_UNIX) { + // For ConnectEx() to work for AF_UNIX, the sock needs to be bound to + // the local address of an unnamed socket. + local_address = {}; + ((grpc_sockaddr*)local_address.addr)->sa_family = AF_UNIX; + local_address.len = sizeof(grpc_sockaddr); + } else { + grpc_sockaddr_make_wildcard6(0, &local_address); + } status = bind(sock, (grpc_sockaddr*)&local_address.addr, (int)local_address.len); @@ -196,7 +217,6 @@ static int64_t tcp_connect(grpc_closure* on_done, grpc_endpoint** endpoint, info = &socket->write_info; success = ConnectEx(sock, (grpc_sockaddr*)&addr->addr, (int)addr->len, NULL, 0, NULL, &info->overlapped); - // It wouldn't be unusual to get a success immediately. But we'll still get // an IOCP notification, so let's ignore it. if (!success) { @@ -206,7 +226,6 @@ static int64_t tcp_connect(grpc_closure* on_done, grpc_endpoint** endpoint, goto failure; } } - ac = new async_connect(); ac->on_done = on_done; ac->socket = socket; diff --git a/src/core/lib/iomgr/tcp_server_windows.cc b/src/core/lib/iomgr/tcp_server_windows.cc index d7775a1e267..9de6cb0f268 100644 --- a/src/core/lib/iomgr/tcp_server_windows.cc +++ b/src/core/lib/iomgr/tcp_server_windows.cc @@ -78,13 +78,21 @@ using ::grpc_event_engine::experimental::WindowsEventEngineListener; // one listening port typedef struct grpc_tcp_listener grpc_tcp_listener; struct grpc_tcp_listener { + // Buffer to hold the local and remote address. // This seemingly magic number comes from AcceptEx's documentation. each // address buffer needs to have at least 16 more bytes at their end. +#ifdef GRPC_HAVE_UNIX_SOCKET + // unix addr is larger than ip addr. + uint8_t addresses[(sizeof(sockaddr_un) + 16) * 2] = {}; +#else uint8_t addresses[(sizeof(grpc_sockaddr_in6) + 16) * 2]; +#endif // GRPC_HAVE_UNIX_SOCKET // This will hold the socket for the next accept. SOCKET new_socket; // The listener winsocket. grpc_winsocket* socket; + // address of listener + grpc_resolved_address resolved_addr; // The actual TCP port number. int port; unsigned port_index; @@ -125,6 +133,35 @@ struct grpc_tcp_server { WindowsEventEngineListener* ee_listener; }; +// TODO(hork): This may be refactored to share with posix engine and event +// engine. +void unlink_if_unix_domain_socket(const grpc_resolved_address* resolved_addr) { +#ifdef GRPC_HAVE_UNIX_SOCKET + const grpc_sockaddr* addr = + reinterpret_cast(resolved_addr->addr); + if (addr->sa_family != AF_UNIX) { + return; + } + struct sockaddr_un* un = + reinterpret_cast(const_cast(addr)); + // There is nothing to unlink for an abstract unix socket. + if (un->sun_path[0] == '\0' && un->sun_path[1] != '\0') { + return; + } + // For windows we need to remove the file instead of unlink. + DWORD attr = ::GetFileAttributesA(un->sun_path); + if (attr == INVALID_FILE_ATTRIBUTES) { + return; + } + if (attr & FILE_ATTRIBUTE_DIRECTORY || attr & FILE_ATTRIBUTE_READONLY) { + return; + } + ::DeleteFileA(un->sun_path); +#else + (void)resolved_addr; +#endif +} + // Public function. Allocates the proper data structures to hold a // grpc_tcp_server. static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete, @@ -158,6 +195,7 @@ static void destroy_server(void* arg, grpc_error_handle /* error */) { s->head = sp->next; sp->next = NULL; grpc_winsocket_destroy(sp->socket); + unlink_if_unix_domain_socket(&sp->resolved_addr); gpr_free(sp); } gpr_mu_destroy(&s->mu); @@ -222,12 +260,15 @@ static grpc_error_handle prepare_socket(SOCKET sock, grpc_resolved_address sockname_temp; grpc_error_handle error; int sockname_temp_len; - - error = grpc_tcp_prepare_socket(sock); + if (grpc_sockaddr_get_family(addr) == AF_UNIX) { + error = grpc_tcp_set_non_block(sock); + } else { + error = grpc_tcp_prepare_socket(sock); + } if (!error.ok()) { goto failure; } - + unlink_if_unix_domain_socket(addr); if (bind(sock, (const grpc_sockaddr*)addr->addr, (int)addr->len) == SOCKET_ERROR) { error = GRPC_WSA_ERROR(WSAGetLastError(), "bind"); @@ -277,22 +318,28 @@ static void decrement_active_ports_and_notify_locked(grpc_tcp_listener* sp) { static grpc_error_handle start_accept_locked(grpc_tcp_listener* port) { SOCKET sock = INVALID_SOCKET; BOOL success; - DWORD addrlen = sizeof(grpc_sockaddr_in6) + 16; + const DWORD addrlen = sizeof(port->addresses) / 2; DWORD bytes_received = 0; grpc_error_handle error; if (port->shutting_down) { return absl::OkStatus(); } - - sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0, + const int addr_family = + grpc_sockaddr_get_family(&port->resolved_addr) == AF_UNIX ? AF_UNIX + : AF_INET6; + const int protocol = addr_family == AF_UNIX ? 0 : IPPROTO_TCP; + sock = WSASocket(addr_family, SOCK_STREAM, protocol, NULL, 0, grpc_get_default_wsa_socket_flags()); if (sock == INVALID_SOCKET) { error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket"); goto failure; } - - error = grpc_tcp_prepare_socket(sock); + if (addr_family == AF_UNIX) { + error = grpc_tcp_set_non_block(sock); + } else { + error = grpc_tcp_prepare_socket(sock); + } if (!error.ok()) goto failure; // Start the "accept" asynchronously. @@ -463,6 +510,7 @@ static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, SOCKET sock, sp->outstanding_calls = 0; sp->AcceptEx = AcceptEx; sp->new_socket = INVALID_SOCKET; + sp->resolved_addr = *addr; sp->port = port; sp->port_index = port_index; GRPC_CLOSURE_INIT(&sp->on_accept, on_accept, sp, grpc_schedule_on_exec_ctx); @@ -522,7 +570,10 @@ static grpc_error_handle tcp_server_add_port(grpc_tcp_server* s, addr = &wildcard; } - sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0, + const int addr_family = + grpc_sockaddr_get_family(addr) == AF_UNIX ? AF_UNIX : AF_INET6; + const int protocol = addr_family == AF_UNIX ? 0 : IPPROTO_TCP; + sock = WSASocket(addr_family, SOCK_STREAM, protocol, NULL, 0, grpc_get_default_wsa_socket_flags()); if (sock == INVALID_SOCKET) { error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket"); diff --git a/src/core/lib/iomgr/unix_sockets_posix.cc b/src/core/lib/iomgr/unix_sockets_posix.cc index e182e6adf13..01daa2c90cf 100644 --- a/src/core/lib/iomgr/unix_sockets_posix.cc +++ b/src/core/lib/iomgr/unix_sockets_posix.cc @@ -24,7 +24,14 @@ #include #include #include +#ifdef GPR_WINDOWS +// clang-format off +#include +#include +// clang-format on +#else #include +#endif // GPR_WINDOWS #include "absl/strings/str_cat.h" @@ -39,7 +46,11 @@ #include "src/core/lib/transport/error_utils.h" void grpc_create_socketpair_if_unix(int sv[2]) { +#ifdef GPR_WINDOWS + grpc_core::Crash("AF_UNIX socket pairs are not supported on Windows"); +#else GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0); +#endif } absl::StatusOr> @@ -86,10 +97,12 @@ void grpc_unlink_if_unix_domain_socket( return; } +#ifndef GPR_WINDOWS struct stat st; if (stat(un->sun_path, &st) == 0 && (st.st_mode & S_IFMT) == S_IFSOCK) { unlink(un->sun_path); } +#endif } #endif diff --git a/src/core/resolver/binder/binder_resolver.cc b/src/core/resolver/binder/binder_resolver.cc index 5f1916958a6..dfc8e15b2d8 100644 --- a/src/core/resolver/binder/binder_resolver.cc +++ b/src/core/resolver/binder/binder_resolver.cc @@ -24,8 +24,15 @@ #ifdef GRPC_HAVE_UNIX_SOCKET #include +#ifdef GPR_WINDOWS +// clang-format off +#include +#include +// clang-format on +#else #include #include +#endif // GPR_WINDOWS #include #include diff --git a/test/core/address_utils/parse_address_test.cc b/test/core/address_utils/parse_address_test.cc index 07b29c2678a..604d38aa300 100644 --- a/test/core/address_utils/parse_address_test.cc +++ b/test/core/address_utils/parse_address_test.cc @@ -18,8 +18,15 @@ #include "src/core/lib/address_utils/parse_address.h" #ifdef GRPC_HAVE_UNIX_SOCKET +#ifdef GPR_WINDOWS +// clang-format off +#include +#include +// clang-format on +#else #include -#endif +#endif // GPR_WINDOWS +#endif // GRPC_HAVE_UNIX_SOCKET #ifdef GRPC_HAVE_VSOCK #include diff --git a/test/core/address_utils/sockaddr_utils_test.cc b/test/core/address_utils/sockaddr_utils_test.cc index 1b4345c4b8a..74bb201cc1a 100644 --- a/test/core/address_utils/sockaddr_utils_test.cc +++ b/test/core/address_utils/sockaddr_utils_test.cc @@ -30,8 +30,15 @@ #include "src/core/lib/iomgr/port.h" #include "src/core/lib/iomgr/resolved_address.h" #ifdef GRPC_HAVE_UNIX_SOCKET +#ifdef GPR_WINDOWS +// clang-format off +#include +#include +// clang-format on +#else #include -#endif +#endif // GPR_WINDOWS +#endif // GRPC_HAVE_UNIX_SOCKET #include diff --git a/test/core/client_channel/resolvers/binder_resolver_test.cc b/test/core/client_channel/resolvers/binder_resolver_test.cc index 4abae7ee1a6..655f4c09cef 100644 --- a/test/core/client_channel/resolvers/binder_resolver_test.cc +++ b/test/core/client_channel/resolvers/binder_resolver_test.cc @@ -33,8 +33,15 @@ #ifdef GRPC_HAVE_UNIX_SOCKET +#ifdef GPR_WINDOWS +// clang-format off +#include +#include +// clang-format on +#else #include #include +#endif // GPR_WINDOWS #include #include @@ -201,7 +208,7 @@ TEST_F(BinderResolverTest, ValidCases) { "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-._~"); } -#endif +#endif // GRPC_HAVE_UNIX_SOCKET int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); diff --git a/test/core/end2end/end2end_test_suites.cc b/test/core/end2end/end2end_test_suites.cc index 6751b6e0dfc..625d1a8190a 100644 --- a/test/core/end2end/end2end_test_suites.cc +++ b/test/core/end2end/end2end_test_suites.cc @@ -568,6 +568,25 @@ class InsecureFixtureWithPipeForWakeupFd : public InsecureFixture { }; #endif +// Returns the temp directory to create uds in this test. +std::string GetTempDir() { +#ifdef GPR_WINDOWS + // Windows temp dir usually exceeds uds max paht length, + // so we create a short dir for this test. + // TODO: find a better solution. + std::string temp_dir = "C:/tmp/"; + if (CreateDirectoryA(temp_dir.c_str(), NULL) == 0 && + ERROR_ALREADY_EXISTS != GetLastError()) { + Crash(absl::StrCat("Could not create temp dir: ", temp_dir)); + } + return temp_dir; +#else + return "/tmp/"; +#endif // GPR_WINDOWS +} + +const std::string temp_dir = GetTempDir(); + std::vector DefaultConfigs() { return std::vector { #ifdef GRPC_POSIX_SOCKET @@ -662,9 +681,10 @@ std::vector DefaultConfigs() { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); return std::make_unique( absl::StrFormat( - "unix:/tmp/grpc_fullstack_test.%%25.%d.%" PRId64 - ".%" PRId32 ".%" PRId64 ".%" PRId64, - getpid(), now.tv_sec, now.tv_nsec, + "unix:%s" + "grpc_fullstack_test.%%25.%d.%" PRId64 ".%" PRId32 + ".%" PRId64 ".%" PRId64, + temp_dir, getpid(), now.tv_sec, now.tv_nsec, unique.fetch_add(1, std::memory_order_relaxed), Rand()), UDS); }}, @@ -680,9 +700,10 @@ std::vector DefaultConfigs() { gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); return std::make_unique( absl::StrFormat( - "unix:/tmp/grpc_fullstack_test.%d.%" PRId64 ".%" PRId32 - ".%" PRId64 ".%" PRId64, - getpid(), now.tv_sec, now.tv_nsec, + "unix:%s" + "grpc_fullstack_test.%d.%" PRId64 ".%" PRId32 ".%" PRId64 + ".%" PRId64, + temp_dir, getpid(), now.tv_sec, now.tv_nsec, unique.fetch_add(1, std::memory_order_relaxed), Rand()), UDS); }}, @@ -924,9 +945,10 @@ std::vector DefaultConfigs() { [](const ChannelArgs&, const ChannelArgs&) { gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); return std::make_unique(absl::StrFormat( - "unix:/tmp/grpc_fullstack_test.%d.%" PRId64 ".%" PRId32 - ".%" PRId64 ".%" PRId64, - getpid(), now.tv_sec, now.tv_nsec, + "unix:%s" + "grpc_fullstack_test.%d.%" PRId64 ".%" PRId32 ".%" PRId64 + ".%" PRId64, + temp_dir, getpid(), now.tv_sec, now.tv_nsec, unique.fetch_add(1, std::memory_order_relaxed), Rand())); }}, #endif diff --git a/test/core/end2end/tests/default_host.cc b/test/core/end2end/tests/default_host.cc index 8bd53f3bb7a..770c15c4d61 100644 --- a/test/core/end2end/tests/default_host.cc +++ b/test/core/end2end/tests/default_host.cc @@ -65,7 +65,8 @@ CORE_END2END_TEST(CoreClientChannelTest, DefaultHost) { EXPECT_THAT(s.host(), AnyOf(StartsWith("localhost"), StartsWith("127.0.0.1"), StartsWith("[::1]"), StartsWith("grpc_fullstack_test."), - StartsWith("tmp%2Fgrpc_fullstack_test."))); + StartsWith("tmp%2Fgrpc_fullstack_test."), + StartsWith("C:%2Ftmp%2Fgrpc_fullstack_test."))); } EXPECT_FALSE(client_close.was_cancelled()); } diff --git a/test/core/event_engine/tcp_socket_utils_test.cc b/test/core/event_engine/tcp_socket_utils_test.cc index d3b114d8014..703ee19c5b7 100644 --- a/test/core/event_engine/tcp_socket_utils_test.cc +++ b/test/core/event_engine/tcp_socket_utils_test.cc @@ -35,8 +35,15 @@ #include #ifdef GRPC_HAVE_UNIX_SOCKET +#ifdef GPR_WINDOWS +// clang-format off +#include +#include +// clang-format on +#else #include -#endif +#endif // GPR_WINDOWS +#endif // GRPC_HAVE_UNIX_SOCKET #include "absl/status/status.h" #include "absl/status/statusor.h" diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index a072fdd17d0..5eb30557651 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -285,6 +285,7 @@ class CLanguage(object): "cmake", "cmake_ninja_vs2019", "cmake_vs2019", + "cmake_vs2022", ], ) _check_arch(self.args.arch, ["default", "x64", "x86"]) @@ -301,6 +302,8 @@ class CLanguage(object): activate_vs_tools = "2019" elif self.args.compiler == "cmake_vs2019": cmake_generator = "Visual Studio 16 2019" + elif self.args.compiler == "cmake_vs2022": + cmake_generator = "Visual Studio 17 2022" else: print("should never reach here.") sys.exit(1) @@ -1684,6 +1687,7 @@ argp.add_argument( "cmake", "cmake_ninja_vs2019", "cmake_vs2019", + "cmake_vs2022", "mono", ], default="default",