diff --git a/CMakeLists.txt b/CMakeLists.txt index 53296baf8be..54707389796 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2207,6 +2207,7 @@ add_library(grpc src/core/lib/event_engine/windows/win_socket.cc src/core/lib/event_engine/windows/windows_endpoint.cc src/core/lib/event_engine/windows/windows_engine.cc + src/core/lib/event_engine/windows/windows_listener.cc src/core/lib/experiments/config.cc src/core/lib/experiments/experiments.cc src/core/lib/gprpp/load_file.cc @@ -2894,6 +2895,7 @@ add_library(grpc_unsecure src/core/lib/event_engine/windows/win_socket.cc src/core/lib/event_engine/windows/windows_endpoint.cc src/core/lib/event_engine/windows/windows_engine.cc + src/core/lib/event_engine/windows/windows_listener.cc src/core/lib/experiments/config.cc src/core/lib/experiments/experiments.cc src/core/lib/gprpp/load_file.cc @@ -4411,6 +4413,7 @@ add_library(grpc_authorization_provider src/core/lib/event_engine/windows/win_socket.cc src/core/lib/event_engine/windows/windows_endpoint.cc src/core/lib/event_engine/windows/windows_engine.cc + src/core/lib/event_engine/windows/windows_listener.cc src/core/lib/experiments/config.cc src/core/lib/experiments/experiments.cc src/core/lib/gprpp/load_file.cc @@ -11500,6 +11503,7 @@ add_executable(frame_test src/core/lib/event_engine/windows/win_socket.cc src/core/lib/event_engine/windows/windows_endpoint.cc src/core/lib/event_engine/windows/windows_engine.cc + src/core/lib/event_engine/windows/windows_listener.cc src/core/lib/experiments/config.cc src/core/lib/experiments/experiments.cc src/core/lib/gprpp/load_file.cc diff --git a/Makefile b/Makefile index 17e6b0e07af..b193285501c 100644 --- a/Makefile +++ b/Makefile @@ -1460,6 +1460,7 @@ LIBGRPC_SRC = \ src/core/lib/event_engine/windows/win_socket.cc \ src/core/lib/event_engine/windows/windows_endpoint.cc \ src/core/lib/event_engine/windows/windows_engine.cc \ + src/core/lib/event_engine/windows/windows_listener.cc \ src/core/lib/experiments/config.cc \ src/core/lib/experiments/experiments.cc \ src/core/lib/gprpp/load_file.cc \ @@ -2000,6 +2001,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/event_engine/windows/win_socket.cc \ src/core/lib/event_engine/windows/windows_endpoint.cc \ src/core/lib/event_engine/windows/windows_engine.cc \ + src/core/lib/event_engine/windows/windows_listener.cc \ src/core/lib/experiments/config.cc \ src/core/lib/experiments/experiments.cc \ src/core/lib/gprpp/load_file.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 3f377381296..6381d79f23d 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -823,6 +823,7 @@ libs: - src/core/lib/event_engine/windows/win_socket.h - src/core/lib/event_engine/windows/windows_endpoint.h - src/core/lib/event_engine/windows/windows_engine.h + - src/core/lib/event_engine/windows/windows_listener.h - src/core/lib/experiments/config.h - src/core/lib/experiments/experiments.h - src/core/lib/gpr/spinlock.h @@ -1600,6 +1601,7 @@ libs: - src/core/lib/event_engine/windows/win_socket.cc - src/core/lib/event_engine/windows/windows_endpoint.cc - src/core/lib/event_engine/windows/windows_engine.cc + - src/core/lib/event_engine/windows/windows_listener.cc - src/core/lib/experiments/config.cc - src/core/lib/experiments/experiments.cc - src/core/lib/gprpp/load_file.cc @@ -2161,6 +2163,7 @@ libs: - src/core/lib/event_engine/windows/win_socket.h - src/core/lib/event_engine/windows/windows_endpoint.h - src/core/lib/event_engine/windows/windows_engine.h + - src/core/lib/event_engine/windows/windows_listener.h - src/core/lib/experiments/config.h - src/core/lib/experiments/experiments.h - src/core/lib/gpr/spinlock.h @@ -2551,6 +2554,7 @@ libs: - src/core/lib/event_engine/windows/win_socket.cc - src/core/lib/event_engine/windows/windows_endpoint.cc - src/core/lib/event_engine/windows/windows_engine.cc + - src/core/lib/event_engine/windows/windows_listener.cc - src/core/lib/experiments/config.cc - src/core/lib/experiments/experiments.cc - src/core/lib/gprpp/load_file.cc @@ -3620,6 +3624,7 @@ libs: - src/core/lib/event_engine/windows/win_socket.h - src/core/lib/event_engine/windows/windows_endpoint.h - src/core/lib/event_engine/windows/windows_engine.h + - src/core/lib/event_engine/windows/windows_listener.h - src/core/lib/experiments/config.h - src/core/lib/experiments/experiments.h - src/core/lib/gpr/spinlock.h @@ -3889,6 +3894,7 @@ libs: - src/core/lib/event_engine/windows/win_socket.cc - src/core/lib/event_engine/windows/windows_endpoint.cc - src/core/lib/event_engine/windows/windows_engine.cc + - src/core/lib/event_engine/windows/windows_listener.cc - src/core/lib/experiments/config.cc - src/core/lib/experiments/experiments.cc - src/core/lib/gprpp/load_file.cc @@ -7437,6 +7443,7 @@ targets: - src/core/lib/event_engine/windows/win_socket.h - src/core/lib/event_engine/windows/windows_endpoint.h - src/core/lib/event_engine/windows/windows_engine.h + - src/core/lib/event_engine/windows/windows_listener.h - src/core/lib/experiments/config.h - src/core/lib/experiments/experiments.h - src/core/lib/gpr/spinlock.h @@ -7689,6 +7696,7 @@ targets: - src/core/lib/event_engine/windows/win_socket.cc - src/core/lib/event_engine/windows/windows_endpoint.cc - src/core/lib/event_engine/windows/windows_engine.cc + - src/core/lib/event_engine/windows/windows_listener.cc - src/core/lib/experiments/config.cc - src/core/lib/experiments/experiments.cc - src/core/lib/gprpp/load_file.cc diff --git a/config.m4 b/config.m4 index a72d56c10ff..f70f215fcb8 100644 --- a/config.m4 +++ b/config.m4 @@ -542,6 +542,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/event_engine/windows/win_socket.cc \ src/core/lib/event_engine/windows/windows_endpoint.cc \ src/core/lib/event_engine/windows/windows_engine.cc \ + src/core/lib/event_engine/windows/windows_listener.cc \ src/core/lib/experiments/config.cc \ src/core/lib/experiments/experiments.cc \ src/core/lib/gpr/alloc.cc \ diff --git a/config.w32 b/config.w32 index 59375a0b674..e997d26b9e3 100644 --- a/config.w32 +++ b/config.w32 @@ -508,6 +508,7 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\event_engine\\windows\\win_socket.cc " + "src\\core\\lib\\event_engine\\windows\\windows_endpoint.cc " + "src\\core\\lib\\event_engine\\windows\\windows_engine.cc " + + "src\\core\\lib\\event_engine\\windows\\windows_listener.cc " + "src\\core\\lib\\experiments\\config.cc " + "src\\core\\lib\\experiments\\experiments.cc " + "src\\core\\lib\\gpr\\alloc.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 300198da9a7..faefec192bb 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -775,6 +775,7 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/windows/win_socket.h', 'src/core/lib/event_engine/windows/windows_endpoint.h', 'src/core/lib/event_engine/windows/windows_engine.h', + 'src/core/lib/event_engine/windows/windows_listener.h', 'src/core/lib/experiments/config.h', 'src/core/lib/experiments/experiments.h', 'src/core/lib/gpr/alloc.h', @@ -1708,6 +1709,7 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/windows/win_socket.h', 'src/core/lib/event_engine/windows/windows_endpoint.h', 'src/core/lib/event_engine/windows/windows_engine.h', + 'src/core/lib/event_engine/windows/windows_listener.h', 'src/core/lib/experiments/config.h', 'src/core/lib/experiments/experiments.h', 'src/core/lib/gpr/alloc.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 3fc4df96905..45ba51d33d6 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -1206,6 +1206,8 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/windows/windows_endpoint.h', 'src/core/lib/event_engine/windows/windows_engine.cc', 'src/core/lib/event_engine/windows/windows_engine.h', + 'src/core/lib/event_engine/windows/windows_listener.cc', + 'src/core/lib/event_engine/windows/windows_listener.h', 'src/core/lib/experiments/config.cc', 'src/core/lib/experiments/config.h', 'src/core/lib/experiments/experiments.cc', @@ -2394,6 +2396,7 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/windows/win_socket.h', 'src/core/lib/event_engine/windows/windows_endpoint.h', 'src/core/lib/event_engine/windows/windows_engine.h', + 'src/core/lib/event_engine/windows/windows_listener.h', 'src/core/lib/experiments/config.h', 'src/core/lib/experiments/experiments.h', 'src/core/lib/gpr/alloc.h', diff --git a/grpc.gemspec b/grpc.gemspec index 53d73afab51..38b7ecdb7df 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -1115,6 +1115,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/event_engine/windows/windows_endpoint.h ) s.files += %w( src/core/lib/event_engine/windows/windows_engine.cc ) s.files += %w( src/core/lib/event_engine/windows/windows_engine.h ) + s.files += %w( src/core/lib/event_engine/windows/windows_listener.cc ) + s.files += %w( src/core/lib/event_engine/windows/windows_listener.h ) s.files += %w( src/core/lib/experiments/config.cc ) s.files += %w( src/core/lib/experiments/config.h ) s.files += %w( src/core/lib/experiments/experiments.cc ) diff --git a/grpc.gyp b/grpc.gyp index b7da1e9e5da..f8314285816 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -873,6 +873,7 @@ 'src/core/lib/event_engine/windows/win_socket.cc', 'src/core/lib/event_engine/windows/windows_endpoint.cc', 'src/core/lib/event_engine/windows/windows_engine.cc', + 'src/core/lib/event_engine/windows/windows_listener.cc', 'src/core/lib/experiments/config.cc', 'src/core/lib/experiments/experiments.cc', 'src/core/lib/gprpp/load_file.cc', @@ -1355,6 +1356,7 @@ 'src/core/lib/event_engine/windows/win_socket.cc', 'src/core/lib/event_engine/windows/windows_endpoint.cc', 'src/core/lib/event_engine/windows/windows_engine.cc', + 'src/core/lib/event_engine/windows/windows_listener.cc', 'src/core/lib/experiments/config.cc', 'src/core/lib/experiments/experiments.cc', 'src/core/lib/gprpp/load_file.cc', @@ -1861,6 +1863,7 @@ 'src/core/lib/event_engine/windows/win_socket.cc', 'src/core/lib/event_engine/windows/windows_endpoint.cc', 'src/core/lib/event_engine/windows/windows_engine.cc', + 'src/core/lib/event_engine/windows/windows_listener.cc', 'src/core/lib/experiments/config.cc', 'src/core/lib/experiments/experiments.cc', 'src/core/lib/gprpp/load_file.cc', diff --git a/package.xml b/package.xml index ab590e51b26..33c71875c2a 100644 --- a/package.xml +++ b/package.xml @@ -1097,6 +1097,8 @@ + + diff --git a/src/core/BUILD b/src/core/BUILD index c675f8c1590..35819274992 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -1992,6 +1992,7 @@ grpc_cc_library( "posix_event_engine_timer_manager", "time", "windows_endpoint", + "windows_event_engine_listener", "windows_iocp", "//:event_engine_base_hdrs", "//:gpr", @@ -2055,6 +2056,33 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "windows_event_engine_listener", + srcs = [ + "lib/event_engine/windows/windows_listener.cc", + ], + hdrs = [ + "lib/event_engine/windows/windows_listener.h", + ], + external_deps = [ + "absl/base:core_headers", + "absl/status", + "absl/status:statusor", + "absl/strings:str_format", + ], + deps = [ + "common_event_engine_closures", + "error", + "event_engine_tcp_socket_utils", + "event_engine_trace", + "windows_endpoint", + "windows_iocp", + "//:event_engine_base_hdrs", + "//:gpr", + "//:gpr_platform", + ], +) + grpc_cc_library( name = "cf_event_engine", srcs = ["lib/event_engine/cf_engine/cf_engine.cc"], diff --git a/src/core/lib/event_engine/windows/windows_engine.cc b/src/core/lib/event_engine/windows/windows_engine.cc index 6d507bc17e6..073511857ce 100644 --- a/src/core/lib/event_engine/windows/windows_engine.cc +++ b/src/core/lib/event_engine/windows/windows_engine.cc @@ -37,6 +37,7 @@ #include "src/core/lib/event_engine/windows/iocp.h" #include "src/core/lib/event_engine/windows/windows_endpoint.h" #include "src/core/lib/event_engine/windows/windows_engine.h" +#include "src/core/lib/event_engine/windows/windows_listener.h" #include "src/core/lib/gprpp/crash.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/time.h" @@ -377,13 +378,15 @@ bool WindowsEventEngine::CancelConnectInternalStateLocked( absl::StatusOr> WindowsEventEngine::CreateListener( - Listener::AcceptCallback /* on_accept */, - absl::AnyInvocable /* on_shutdown */, - const EndpointConfig& /* config */, - std::unique_ptr /* memory_allocator_factory */) { - grpc_core::Crash("unimplemented"); + Listener::AcceptCallback on_accept, + absl::AnyInvocable on_shutdown, + const EndpointConfig& config, + std::unique_ptr memory_allocator_factory) { + return std::make_unique( + &iocp_, std::move(on_accept), std::move(on_shutdown), + std::move(memory_allocator_factory), shared_from_this(), executor_.get(), + config); } - } // namespace experimental } // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/windows/windows_listener.cc b/src/core/lib/event_engine/windows/windows_listener.cc new file mode 100644 index 00000000000..40cd22c7018 --- /dev/null +++ b/src/core/lib/event_engine/windows/windows_listener.cc @@ -0,0 +1,365 @@ +// Copyright 2023 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include + +#ifdef GPR_WINDOWS + +#include "absl/status/status.h" +#include "absl/strings/str_format.h" + +#include "src/core/lib/event_engine/tcp_socket_utils.h" +#include "src/core/lib/event_engine/trace.h" +#include "src/core/lib/event_engine/windows/iocp.h" +#include "src/core/lib/event_engine/windows/win_socket.h" +#include "src/core/lib/event_engine/windows/windows_endpoint.h" +#include "src/core/lib/event_engine/windows/windows_listener.h" +#include "src/core/lib/gprpp/crash.h" +#include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/iomgr/error.h" + +namespace grpc_event_engine { +namespace experimental { + +// ---- SinglePortSocketListener::AsyncIOState ---- + +WindowsEventEngineListener::SinglePortSocketListener::AsyncIOState:: + AsyncIOState(SinglePortSocketListener* port_listener, + std::unique_ptr listener_socket) + : port_listener(port_listener), + listener_socket(std::move(listener_socket)) {} + +WindowsEventEngineListener::SinglePortSocketListener::AsyncIOState:: + ~AsyncIOState() { + closesocket(accept_socket); +} + +void WindowsEventEngineListener::SinglePortSocketListener:: + OnAcceptCallbackWrapper::Run() { + GPR_ASSERT(io_state_ != nullptr); + grpc_core::ReleasableMutexLock lock(&io_state_->mu); + if (io_state_->listener_socket->IsShutdown()) { + GRPC_EVENT_ENGINE_TRACE( + "SinglePortSocketListener::%p listener socket is shut down. Shutting " + "down listener.", + io_state_->port_listener); + lock.Release(); + io_state_.reset(); + return; + } + io_state_->port_listener->OnAcceptCallbackLocked(); +} + +void WindowsEventEngineListener::SinglePortSocketListener:: + OnAcceptCallbackWrapper::Prime(std::shared_ptr io_state) { + io_state_ = std::move(io_state); +} + +// ---- SinglePortSocketListener ---- + +WindowsEventEngineListener::SinglePortSocketListener:: + ~SinglePortSocketListener() { + io_state_->listener_socket->Shutdown(DEBUG_LOCATION, + "~SinglePortSocketListener"); + GRPC_EVENT_ENGINE_TRACE("~SinglePortSocketListener::%p", this); +} + +absl::StatusOr< + std::unique_ptr> +WindowsEventEngineListener::SinglePortSocketListener::Create( + WindowsEventEngineListener* listener, SOCKET sock, + EventEngine::ResolvedAddress addr) { + // We need to grab the AcceptEx pointer for that port, as it may be + // interface-dependent. We'll cache it to avoid doing that again. + GUID guid = WSAID_ACCEPTEX; + DWORD ioctl_num_bytes; + LPFN_ACCEPTEX AcceptEx; + int status = + WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), + &AcceptEx, sizeof(AcceptEx), &ioctl_num_bytes, NULL, NULL); + if (status != 0) { + auto error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket"); + closesocket(sock); + return error; + } + auto result = SinglePortSocketListener::PrepareListenerSocket(sock, addr); + GRPC_RETURN_IF_ERROR(result.status()); + GPR_ASSERT(result->port >= 0); + // Using `new` to access non-public constructor + return absl::WrapUnique(new SinglePortSocketListener( + listener, AcceptEx, /*win_socket=*/listener->iocp_->Watch(sock), + result->port, result->hostbyname)); +} + +absl::Status WindowsEventEngineListener::SinglePortSocketListener::Start() { + grpc_core::MutexLock lock(&io_state_->mu); + return StartLocked(); +} + +absl::Status +WindowsEventEngineListener::SinglePortSocketListener::StartLocked() { + SOCKET accept_socket = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0, + IOCP::GetDefaultSocketFlags()); + if (accept_socket == INVALID_SOCKET) { + return GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket"); + } + auto fail = [&](absl::Status error) -> absl::Status { + if (accept_socket != INVALID_SOCKET) closesocket(accept_socket); + return error; + }; + auto error = PrepareSocket(accept_socket); + if (!error.ok()) return fail(error); + // Start the "accept" asynchronously. + DWORD addrlen = sizeof(sockaddr_in6) + 16; + DWORD bytes_received = 0; + int success = + AcceptEx(io_state_->listener_socket->raw_socket(), accept_socket, + addresses_, 0, addrlen, addrlen, &bytes_received, + io_state_->listener_socket->read_info()->overlapped()); + // It is possible to get an accept immediately without delay. However, we + // will still get an IOCP notification for it. So let's just ignore it. + if (success != 0) { + int last_error = WSAGetLastError(); + if (last_error != ERROR_IO_PENDING) { + return fail(GRPC_WSA_ERROR(last_error, "AcceptEx")); + } + } + // We're ready to do the accept. Calling NotifyOnRead may immediately process + // an accept that happened in the meantime. + io_state_->accept_socket = accept_socket; + io_state_->listener_socket->NotifyOnRead(&io_state_->on_accept_cb); + GRPC_EVENT_ENGINE_TRACE( + "SinglePortSocketListener::%p listening. listener_socket::%p", this, + io_state_->listener_socket.get()); + return absl::OkStatus(); +} + +void WindowsEventEngineListener::SinglePortSocketListener:: + OnAcceptCallbackLocked() { + auto close_socket_and_restart = + [&](bool do_close_socket = true) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(io_state_->mu) { + if (do_close_socket) closesocket(io_state_->accept_socket); + io_state_->accept_socket = INVALID_SOCKET; + GPR_ASSERT(GRPC_LOG_IF_ERROR("SinglePortSocketListener::Start", + StartLocked())); + }; + const auto& overlapped_result = + io_state_->listener_socket->read_info()->result(); + if (overlapped_result.wsa_error != 0) { + gpr_log(GPR_ERROR, "%s", + GRPC_WSA_ERROR(overlapped_result.wsa_error, + "Skipping on_accept due to error") + .ToString() + .c_str()); + return close_socket_and_restart(); + } + SOCKET tmp_listener_socket = io_state_->listener_socket->raw_socket(); + int err = + setsockopt(io_state_->accept_socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, + reinterpret_cast(&tmp_listener_socket), + sizeof(tmp_listener_socket)); + if (err != 0) { + gpr_log(GPR_ERROR, "%s", + GRPC_WSA_ERROR(WSAGetLastError(), "setsockopt").ToString().c_str()); + return close_socket_and_restart(); + } + EventEngine::ResolvedAddress peer_address; + int peer_name_len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES; + err = getpeername(io_state_->accept_socket, + const_cast(peer_address.address()), + &peer_name_len); + if (err != 0) { + gpr_log( + GPR_ERROR, "%s", + GRPC_WSA_ERROR(WSAGetLastError(), "getpeername").ToString().c_str()); + return close_socket_and_restart(); + } + peer_address = + EventEngine::ResolvedAddress(peer_address.address(), peer_name_len); + auto addr_uri = ResolvedAddressToURI(peer_address); + std::string peer_name = "unknown"; + if (!addr_uri.ok()) { + // TODO(hork): test an early exit/restart here with end2end tests + gpr_log(GPR_ERROR, "invalid peer name: %s", + addr_uri.status().ToString().c_str()); + } else { + peer_name = *addr_uri; + } + auto endpoint = std::make_unique( + peer_address, listener_->iocp_->Watch(io_state_->accept_socket), + listener_->memory_allocator_factory_->CreateMemoryAllocator( + absl::StrFormat("listener endpoint %s", peer_name)), + listener_->config_, listener_->executor_); + listener_->accept_cb_( + std::move(endpoint), + listener_->memory_allocator_factory_->CreateMemoryAllocator( + absl::StrFormat("listener accept cb for %s", peer_name))); + close_socket_and_restart(/*do_close_socket=*/false); +} + +WindowsEventEngineListener::SinglePortSocketListener::SinglePortSocketListener( + WindowsEventEngineListener* listener, LPFN_ACCEPTEX AcceptEx, + std::unique_ptr listener_socket, int port, + EventEngine::ResolvedAddress hostbyname) + : AcceptEx(AcceptEx), + listener_(listener), + io_state_( + std::make_shared(this, std::move(listener_socket))), + port_(port), + listener_sockname_(hostbyname) { + io_state_->on_accept_cb.Prime(io_state_); +} + +absl::StatusOr +WindowsEventEngineListener::SinglePortSocketListener::PrepareListenerSocket( + SOCKET sock, const EventEngine::ResolvedAddress& addr) { + auto fail = [&](absl::Status error) -> absl::Status { + GPR_ASSERT(!error.ok()); + auto addr_uri = ResolvedAddressToURI(addr); + error = grpc_error_set_int( + grpc_error_set_str( + GRPC_ERROR_CREATE_REFERENCING("Failed to prepare server socket", + &error, 1), + grpc_core::StatusStrProperty::kTargetAddress, + addr_uri.ok() ? *addr_uri : addr_uri.status().ToString()), + grpc_core::StatusIntProperty::kFd, static_cast(sock)); + if (sock != INVALID_SOCKET) closesocket(sock); + return error; + }; + auto error = PrepareSocket(sock); + if (!error.ok()) return fail(error); + if (bind(sock, addr.address(), addr.size()) == SOCKET_ERROR) { + return fail(GRPC_WSA_ERROR(WSAGetLastError(), "bind")); + } + if (listen(sock, SOMAXCONN) == SOCKET_ERROR) { + return fail(GRPC_WSA_ERROR(WSAGetLastError(), "listen")); + } + int sockname_temp_len = sizeof(struct sockaddr_storage); + EventEngine::ResolvedAddress sockname_temp; + if (getsockname(sock, const_cast(sockname_temp.address()), + &sockname_temp_len) == SOCKET_ERROR) { + return fail(GRPC_WSA_ERROR(WSAGetLastError(), "getsockname")); + } + sockname_temp = + EventEngine::ResolvedAddress(sockname_temp.address(), sockname_temp_len); + return PrepareListenerSocketResult{ResolvedAddressGetPort(sockname_temp), + sockname_temp}; +} + +// ---- WindowsEventEngineListener ---- + +WindowsEventEngineListener::WindowsEventEngineListener( + IOCP* iocp, AcceptCallback accept_cb, + absl::AnyInvocable on_shutdown, + std::unique_ptr memory_allocator_factory, + std::shared_ptr engine, Executor* executor, + const EndpointConfig& config) + : iocp_(iocp), + config_(config), + engine_(std::move(engine)), + executor_(executor), + memory_allocator_factory_(std::move(memory_allocator_factory)), + accept_cb_(std::move(accept_cb)), + on_shutdown_(std::move(on_shutdown)) {} + +WindowsEventEngineListener::~WindowsEventEngineListener() { + GRPC_EVENT_ENGINE_TRACE( + "%s", + absl::StrFormat("WindowsEventEngineListener::%p shutting down", this) + .c_str()); + // Shut down each port listener before destroying this EventEngine::Listener + for (auto& port_listener : port_listeners_) { + port_listener.reset(); + } + on_shutdown_(absl::OkStatus()); +} + +absl::StatusOr WindowsEventEngineListener::Bind( + const EventEngine::ResolvedAddress& addr) { + if (started_.load()) { + return absl::FailedPreconditionError( + absl::StrFormat("WindowsEventEngineListener::%p is already started, " + "ports can no longer be bound", + this)); + } + int out_port = ResolvedAddressGetPort(addr); + EventEngine::ResolvedAddress out_addr(addr); + EventEngine::ResolvedAddress tmp_addr; + // Check if this is a wildcard port, and if so, try to keep the port the same + // as some previously created listener. + if (out_port == 0) { + grpc_core::MutexLock lock(&socket_listeners_mu_); + for (const auto& port_listener : port_listeners_) { + tmp_addr = port_listener->listener_sockname(); + out_port = ResolvedAddressGetPort(tmp_addr); + if (out_port > 0) { + ResolvedAddressSetPort(out_addr, out_port); + break; + } + } + } + if (ResolvedAddressToV4Mapped(out_addr, &tmp_addr)) { + out_addr = tmp_addr; + } + // Treat :: or 0.0.0.0 as a family-agnostic wildcard. + if (ResolvedAddressIsWildcard(out_addr)) { + out_addr = ResolvedAddressMakeWild6(out_port); + } + // open the socket + SOCKET sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, nullptr, 0, + IOCP::GetDefaultSocketFlags()); + if (sock == INVALID_SOCKET) { + auto error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket"); + return GRPC_ERROR_CREATE_REFERENCING("Failed to add port to server", &error, + 1); + } + auto port_listener = AddSinglePortSocketListener(sock, out_addr); + GRPC_RETURN_IF_ERROR(port_listener.status()); + return (*port_listener)->port(); +} + +absl::Status WindowsEventEngineListener::Start() { + GPR_ASSERT(!started_.exchange(true)); + grpc_core::MutexLock lock(&socket_listeners_mu_); + for (auto& port_listener : port_listeners_) { + GRPC_RETURN_IF_ERROR(port_listener->Start()); + } + return absl::OkStatus(); +} + +absl::StatusOr +WindowsEventEngineListener::AddSinglePortSocketListener( + SOCKET sock, EventEngine::ResolvedAddress addr) { + auto single_port_listener = + SinglePortSocketListener::Create(this, sock, addr); + GRPC_RETURN_IF_ERROR(single_port_listener.status()); + auto* single_port_listener_ptr = single_port_listener->get(); + grpc_core::MutexLock lock(&socket_listeners_mu_); + port_listeners_.emplace_back(std::move(*single_port_listener)); + if (started_.load()) { + gpr_log(GPR_ERROR, + "WindowsEventEngineListener::%p Bind was called concurrently while " + "the Listener was starting. This is invalid usage, all ports must " + "be bound before the Listener is started.", + this); + single_port_listener_ptr->Start(); + } + return single_port_listener_ptr; +} + +} // namespace experimental +} // namespace grpc_event_engine + +#endif // GPR_WINDOWS diff --git a/src/core/lib/event_engine/windows/windows_listener.h b/src/core/lib/event_engine/windows/windows_listener.h new file mode 100644 index 00000000000..9047180cbdd --- /dev/null +++ b/src/core/lib/event_engine/windows/windows_listener.h @@ -0,0 +1,151 @@ +// Copyright 2023 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_LISTENER_H +#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_LISTENER_H + +#include + +#ifdef GPR_WINDOWS + +#include + +#include "absl/base/thread_annotations.h" +#include "absl/status/statusor.h" + +#include +#include + +#include "src/core/lib/event_engine/common_closures.h" +#include "src/core/lib/event_engine/windows/iocp.h" +#include "src/core/lib/gprpp/sync.h" + +namespace grpc_event_engine { +namespace experimental { + +class WindowsEventEngineListener : public EventEngine::Listener { + public: + WindowsEventEngineListener( + IOCP* iocp, AcceptCallback accept_cb, + absl::AnyInvocable on_shutdown, + std::unique_ptr memory_allocator_factory, + std::shared_ptr engine, Executor* executor_, + const EndpointConfig& config); + ~WindowsEventEngineListener() override; + absl::StatusOr Bind(const EventEngine::ResolvedAddress& addr) override; + absl::Status Start() override; + + private: + /// Responsible for listening on a single port. + class SinglePortSocketListener { + public: + ~SinglePortSocketListener(); + // This factory will create a bound, listening WinSocket, registered with + // the listener's IOCP poller. + static absl::StatusOr> Create( + WindowsEventEngineListener* listener, SOCKET sock, + EventEngine::ResolvedAddress addr); + + // Two-stage initialization, allows creation of all bound sockets before the + // listener is started. + absl::Status Start(); + absl::Status StartLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(io_state_->mu); + + // Accessor methods + EventEngine::ResolvedAddress listener_sockname() { + return listener_sockname_; + }; + int port() { return port_; } + + private: + struct AsyncIOState; + + class OnAcceptCallbackWrapper : public EventEngine::Closure { + public: + void Run() override; + void Prime(std::shared_ptr io_state); + + private: + std::shared_ptr io_state_; + }; + + // A class to manage the data that must outlive the Endpoint. + // + // Once a listener is done and destroyed, there still may be overlapped + // operations pending. To clean up safely, this data must outlive the + // Listener, and be destroyed asynchronously when all pending overlapped + // events are complete. + struct AsyncIOState { + AsyncIOState(SinglePortSocketListener* port_listener, + std::unique_ptr listener_socket); + ~AsyncIOState(); + SinglePortSocketListener* const port_listener; + OnAcceptCallbackWrapper on_accept_cb; + // Synchronize accept handling on the same socket. + grpc_core::Mutex mu; + // This will hold the socket for the next accept. + SOCKET accept_socket ABSL_GUARDED_BY(mu) = INVALID_SOCKET; + // The listener winsocket. + std::unique_ptr listener_socket ABSL_GUARDED_BY(mu); + }; + + SinglePortSocketListener(WindowsEventEngineListener* listener, + LPFN_ACCEPTEX AcceptEx, + std::unique_ptr listener_socket, + int port, EventEngine::ResolvedAddress hostbyname); + + // Bind a recently-created socket for listening + struct PrepareListenerSocketResult { + int port; + EventEngine::ResolvedAddress hostbyname; + }; + static absl::StatusOr PrepareListenerSocket( + SOCKET sock, const EventEngine::ResolvedAddress& addr); + + void OnAcceptCallbackLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(io_state_->mu); + + // The cached AcceptEx for that port. + LPFN_ACCEPTEX AcceptEx; + // This seemingly magic number comes from AcceptEx's documentation. each + // address buffer needs to have at least 16 more bytes at their end. + uint8_t addresses_[(sizeof(sockaddr_in6) + 16) * 2] = {}; + // The parent listener + WindowsEventEngineListener* listener_; + // shared state for asynchronous cleanup of overlapped operations + std::shared_ptr io_state_; + // The actual TCP port number. + int port_; + EventEngine::ResolvedAddress listener_sockname_; + }; + absl::StatusOr AddSinglePortSocketListener( + SOCKET sock, EventEngine::ResolvedAddress addr); + + IOCP* const iocp_; + const EndpointConfig& config_; + std::shared_ptr engine_; + Executor* executor_; + const std::unique_ptr memory_allocator_factory_; + AcceptCallback accept_cb_; + absl::AnyInvocable on_shutdown_; + std::atomic started_{false}; + grpc_core::Mutex socket_listeners_mu_; + std::list> port_listeners_ + ABSL_GUARDED_BY(socket_listeners_mu_); +}; + +} // namespace experimental +} // namespace grpc_event_engine + +#endif + +#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_LISTENER_H diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 8a7c61c2bc5..03190ced1bb 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -517,6 +517,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/event_engine/windows/win_socket.cc', 'src/core/lib/event_engine/windows/windows_endpoint.cc', 'src/core/lib/event_engine/windows/windows_engine.cc', + 'src/core/lib/event_engine/windows/windows_listener.cc', 'src/core/lib/experiments/config.cc', 'src/core/lib/experiments/experiments.cc', 'src/core/lib/gpr/alloc.cc', diff --git a/test/core/event_engine/test_suite/BUILD b/test/core/event_engine/test_suite/BUILD index 0717497904a..203d3d7dff2 100644 --- a/test/core/event_engine/test_suite/BUILD +++ b/test/core/event_engine/test_suite/BUILD @@ -64,12 +64,11 @@ grpc_cc_test( ], uses_polling = False, deps = [ - "//test/core/event_engine/test_suite/tests:timer", "//src/core:windows_event_engine", "//test/core/event_engine:event_engine_test_utils", - # TODO(hork): enable when the listener (or an oracle) is available - # "//test/core/event_engine/test_suite/tests:client", - # "//test/core/event_engine/test_suite/tests:server", + "//test/core/event_engine/test_suite/tests:client", + "//test/core/event_engine/test_suite/tests:server", + "//test/core/event_engine/test_suite/tests:timer", ], ) diff --git a/test/core/event_engine/test_suite/tests/client_test.cc b/test/core/event_engine/test_suite/tests/client_test.cc index 0de1efeacfe..82809c8bac0 100644 --- a/test/core/event_engine/test_suite/tests/client_test.cc +++ b/test/core/event_engine/test_suite/tests/client_test.cc @@ -180,7 +180,7 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) { grpc_core::ExecCtx ctx; static constexpr int kNumListenerAddresses = 10; // N static constexpr int kNumConnections = 10; // M - auto oracle_ee = this->NewOracleEventEngine(); + std::shared_ptr oracle_ee(this->NewOracleEventEngine()); std::shared_ptr test_ee(this->NewEventEngine()); auto memory_quota = std::make_unique("bar"); std::unique_ptr server_endpoint; diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index fca829fa5a2..20f926b5755 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -2110,6 +2110,8 @@ src/core/lib/event_engine/windows/windows_endpoint.cc \ src/core/lib/event_engine/windows/windows_endpoint.h \ src/core/lib/event_engine/windows/windows_engine.cc \ src/core/lib/event_engine/windows/windows_engine.h \ +src/core/lib/event_engine/windows/windows_listener.cc \ +src/core/lib/event_engine/windows/windows_listener.h \ src/core/lib/experiments/config.cc \ src/core/lib/experiments/config.h \ src/core/lib/experiments/experiments.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 5ba1ac377dd..c75b0e2ffd5 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1888,6 +1888,8 @@ src/core/lib/event_engine/windows/windows_endpoint.cc \ src/core/lib/event_engine/windows/windows_endpoint.h \ src/core/lib/event_engine/windows/windows_engine.cc \ src/core/lib/event_engine/windows/windows_engine.h \ +src/core/lib/event_engine/windows/windows_listener.cc \ +src/core/lib/event_engine/windows/windows_listener.h \ src/core/lib/experiments/config.cc \ src/core/lib/experiments/config.h \ src/core/lib/experiments/experiments.cc \