[EventEngine] WindowsEventEngine Listener v2 (#32488)

Continued from https://github.com/grpc/grpc/pull/32295 after landing the
cleanup and logging pieces separately.
pull/32493/head
AJ Heller 2 years ago committed by GitHub
parent 2ce147a131
commit 8f7cd1eeed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      CMakeLists.txt
  2. 2
      Makefile
  3. 8
      build_autogenerated.yaml
  4. 1
      config.m4
  5. 1
      config.w32
  6. 2
      gRPC-C++.podspec
  7. 3
      gRPC-Core.podspec
  8. 2
      grpc.gemspec
  9. 3
      grpc.gyp
  10. 2
      package.xml
  11. 28
      src/core/BUILD
  12. 15
      src/core/lib/event_engine/windows/windows_engine.cc
  13. 365
      src/core/lib/event_engine/windows/windows_listener.cc
  14. 151
      src/core/lib/event_engine/windows/windows_listener.h
  15. 1
      src/python/grpcio/grpc_core_dependencies.py
  16. 7
      test/core/event_engine/test_suite/BUILD
  17. 2
      test/core/event_engine/test_suite/tests/client_test.cc
  18. 2
      tools/doxygen/Doxyfile.c++.internal
  19. 2
      tools/doxygen/Doxyfile.core.internal

4
CMakeLists.txt generated

@ -2207,6 +2207,7 @@ add_library(grpc
src/core/lib/event_engine/windows/win_socket.cc
src/core/lib/event_engine/windows/windows_endpoint.cc
src/core/lib/event_engine/windows/windows_engine.cc
src/core/lib/event_engine/windows/windows_listener.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
src/core/lib/gprpp/load_file.cc
@ -2894,6 +2895,7 @@ add_library(grpc_unsecure
src/core/lib/event_engine/windows/win_socket.cc
src/core/lib/event_engine/windows/windows_endpoint.cc
src/core/lib/event_engine/windows/windows_engine.cc
src/core/lib/event_engine/windows/windows_listener.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
src/core/lib/gprpp/load_file.cc
@ -4411,6 +4413,7 @@ add_library(grpc_authorization_provider
src/core/lib/event_engine/windows/win_socket.cc
src/core/lib/event_engine/windows/windows_endpoint.cc
src/core/lib/event_engine/windows/windows_engine.cc
src/core/lib/event_engine/windows/windows_listener.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
src/core/lib/gprpp/load_file.cc
@ -11500,6 +11503,7 @@ add_executable(frame_test
src/core/lib/event_engine/windows/win_socket.cc
src/core/lib/event_engine/windows/windows_endpoint.cc
src/core/lib/event_engine/windows/windows_engine.cc
src/core/lib/event_engine/windows/windows_listener.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
src/core/lib/gprpp/load_file.cc

2
Makefile generated

@ -1460,6 +1460,7 @@ LIBGRPC_SRC = \
src/core/lib/event_engine/windows/win_socket.cc \
src/core/lib/event_engine/windows/windows_endpoint.cc \
src/core/lib/event_engine/windows/windows_engine.cc \
src/core/lib/event_engine/windows/windows_listener.cc \
src/core/lib/experiments/config.cc \
src/core/lib/experiments/experiments.cc \
src/core/lib/gprpp/load_file.cc \
@ -2000,6 +2001,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/event_engine/windows/win_socket.cc \
src/core/lib/event_engine/windows/windows_endpoint.cc \
src/core/lib/event_engine/windows/windows_engine.cc \
src/core/lib/event_engine/windows/windows_listener.cc \
src/core/lib/experiments/config.cc \
src/core/lib/experiments/experiments.cc \
src/core/lib/gprpp/load_file.cc \

@ -823,6 +823,7 @@ libs:
- src/core/lib/event_engine/windows/win_socket.h
- src/core/lib/event_engine/windows/windows_endpoint.h
- src/core/lib/event_engine/windows/windows_engine.h
- src/core/lib/event_engine/windows/windows_listener.h
- src/core/lib/experiments/config.h
- src/core/lib/experiments/experiments.h
- src/core/lib/gpr/spinlock.h
@ -1600,6 +1601,7 @@ libs:
- src/core/lib/event_engine/windows/win_socket.cc
- src/core/lib/event_engine/windows/windows_endpoint.cc
- src/core/lib/event_engine/windows/windows_engine.cc
- src/core/lib/event_engine/windows/windows_listener.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
- src/core/lib/gprpp/load_file.cc
@ -2161,6 +2163,7 @@ libs:
- src/core/lib/event_engine/windows/win_socket.h
- src/core/lib/event_engine/windows/windows_endpoint.h
- src/core/lib/event_engine/windows/windows_engine.h
- src/core/lib/event_engine/windows/windows_listener.h
- src/core/lib/experiments/config.h
- src/core/lib/experiments/experiments.h
- src/core/lib/gpr/spinlock.h
@ -2551,6 +2554,7 @@ libs:
- src/core/lib/event_engine/windows/win_socket.cc
- src/core/lib/event_engine/windows/windows_endpoint.cc
- src/core/lib/event_engine/windows/windows_engine.cc
- src/core/lib/event_engine/windows/windows_listener.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
- src/core/lib/gprpp/load_file.cc
@ -3620,6 +3624,7 @@ libs:
- src/core/lib/event_engine/windows/win_socket.h
- src/core/lib/event_engine/windows/windows_endpoint.h
- src/core/lib/event_engine/windows/windows_engine.h
- src/core/lib/event_engine/windows/windows_listener.h
- src/core/lib/experiments/config.h
- src/core/lib/experiments/experiments.h
- src/core/lib/gpr/spinlock.h
@ -3889,6 +3894,7 @@ libs:
- src/core/lib/event_engine/windows/win_socket.cc
- src/core/lib/event_engine/windows/windows_endpoint.cc
- src/core/lib/event_engine/windows/windows_engine.cc
- src/core/lib/event_engine/windows/windows_listener.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
- src/core/lib/gprpp/load_file.cc
@ -7437,6 +7443,7 @@ targets:
- src/core/lib/event_engine/windows/win_socket.h
- src/core/lib/event_engine/windows/windows_endpoint.h
- src/core/lib/event_engine/windows/windows_engine.h
- src/core/lib/event_engine/windows/windows_listener.h
- src/core/lib/experiments/config.h
- src/core/lib/experiments/experiments.h
- src/core/lib/gpr/spinlock.h
@ -7689,6 +7696,7 @@ targets:
- src/core/lib/event_engine/windows/win_socket.cc
- src/core/lib/event_engine/windows/windows_endpoint.cc
- src/core/lib/event_engine/windows/windows_engine.cc
- src/core/lib/event_engine/windows/windows_listener.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
- src/core/lib/gprpp/load_file.cc

1
config.m4 generated

@ -542,6 +542,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/event_engine/windows/win_socket.cc \
src/core/lib/event_engine/windows/windows_endpoint.cc \
src/core/lib/event_engine/windows/windows_engine.cc \
src/core/lib/event_engine/windows/windows_listener.cc \
src/core/lib/experiments/config.cc \
src/core/lib/experiments/experiments.cc \
src/core/lib/gpr/alloc.cc \

1
config.w32 generated

@ -508,6 +508,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\event_engine\\windows\\win_socket.cc " +
"src\\core\\lib\\event_engine\\windows\\windows_endpoint.cc " +
"src\\core\\lib\\event_engine\\windows\\windows_engine.cc " +
"src\\core\\lib\\event_engine\\windows\\windows_listener.cc " +
"src\\core\\lib\\experiments\\config.cc " +
"src\\core\\lib\\experiments\\experiments.cc " +
"src\\core\\lib\\gpr\\alloc.cc " +

2
gRPC-C++.podspec generated

@ -775,6 +775,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/windows/win_socket.h',
'src/core/lib/event_engine/windows/windows_endpoint.h',
'src/core/lib/event_engine/windows/windows_engine.h',
'src/core/lib/event_engine/windows/windows_listener.h',
'src/core/lib/experiments/config.h',
'src/core/lib/experiments/experiments.h',
'src/core/lib/gpr/alloc.h',
@ -1708,6 +1709,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/windows/win_socket.h',
'src/core/lib/event_engine/windows/windows_endpoint.h',
'src/core/lib/event_engine/windows/windows_engine.h',
'src/core/lib/event_engine/windows/windows_listener.h',
'src/core/lib/experiments/config.h',
'src/core/lib/experiments/experiments.h',
'src/core/lib/gpr/alloc.h',

3
gRPC-Core.podspec generated

@ -1206,6 +1206,8 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/windows/windows_endpoint.h',
'src/core/lib/event_engine/windows/windows_engine.cc',
'src/core/lib/event_engine/windows/windows_engine.h',
'src/core/lib/event_engine/windows/windows_listener.cc',
'src/core/lib/event_engine/windows/windows_listener.h',
'src/core/lib/experiments/config.cc',
'src/core/lib/experiments/config.h',
'src/core/lib/experiments/experiments.cc',
@ -2394,6 +2396,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/windows/win_socket.h',
'src/core/lib/event_engine/windows/windows_endpoint.h',
'src/core/lib/event_engine/windows/windows_engine.h',
'src/core/lib/event_engine/windows/windows_listener.h',
'src/core/lib/experiments/config.h',
'src/core/lib/experiments/experiments.h',
'src/core/lib/gpr/alloc.h',

2
grpc.gemspec generated

@ -1115,6 +1115,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/event_engine/windows/windows_endpoint.h )
s.files += %w( src/core/lib/event_engine/windows/windows_engine.cc )
s.files += %w( src/core/lib/event_engine/windows/windows_engine.h )
s.files += %w( src/core/lib/event_engine/windows/windows_listener.cc )
s.files += %w( src/core/lib/event_engine/windows/windows_listener.h )
s.files += %w( src/core/lib/experiments/config.cc )
s.files += %w( src/core/lib/experiments/config.h )
s.files += %w( src/core/lib/experiments/experiments.cc )

3
grpc.gyp generated

@ -873,6 +873,7 @@
'src/core/lib/event_engine/windows/win_socket.cc',
'src/core/lib/event_engine/windows/windows_endpoint.cc',
'src/core/lib/event_engine/windows/windows_engine.cc',
'src/core/lib/event_engine/windows/windows_listener.cc',
'src/core/lib/experiments/config.cc',
'src/core/lib/experiments/experiments.cc',
'src/core/lib/gprpp/load_file.cc',
@ -1355,6 +1356,7 @@
'src/core/lib/event_engine/windows/win_socket.cc',
'src/core/lib/event_engine/windows/windows_endpoint.cc',
'src/core/lib/event_engine/windows/windows_engine.cc',
'src/core/lib/event_engine/windows/windows_listener.cc',
'src/core/lib/experiments/config.cc',
'src/core/lib/experiments/experiments.cc',
'src/core/lib/gprpp/load_file.cc',
@ -1861,6 +1863,7 @@
'src/core/lib/event_engine/windows/win_socket.cc',
'src/core/lib/event_engine/windows/windows_endpoint.cc',
'src/core/lib/event_engine/windows/windows_engine.cc',
'src/core/lib/event_engine/windows/windows_listener.cc',
'src/core/lib/experiments/config.cc',
'src/core/lib/experiments/experiments.cc',
'src/core/lib/gprpp/load_file.cc',

2
package.xml generated

@ -1097,6 +1097,8 @@
<file baseinstalldir="/" name="src/core/lib/event_engine/windows/windows_endpoint.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/windows/windows_engine.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/windows/windows_engine.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/windows/windows_listener.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/windows/windows_listener.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/experiments/config.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/experiments/config.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/experiments/experiments.cc" role="src" />

@ -1992,6 +1992,7 @@ grpc_cc_library(
"posix_event_engine_timer_manager",
"time",
"windows_endpoint",
"windows_event_engine_listener",
"windows_iocp",
"//:event_engine_base_hdrs",
"//:gpr",
@ -2055,6 +2056,33 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "windows_event_engine_listener",
srcs = [
"lib/event_engine/windows/windows_listener.cc",
],
hdrs = [
"lib/event_engine/windows/windows_listener.h",
],
external_deps = [
"absl/base:core_headers",
"absl/status",
"absl/status:statusor",
"absl/strings:str_format",
],
deps = [
"common_event_engine_closures",
"error",
"event_engine_tcp_socket_utils",
"event_engine_trace",
"windows_endpoint",
"windows_iocp",
"//:event_engine_base_hdrs",
"//:gpr",
"//:gpr_platform",
],
)
grpc_cc_library(
name = "cf_event_engine",
srcs = ["lib/event_engine/cf_engine/cf_engine.cc"],

@ -37,6 +37,7 @@
#include "src/core/lib/event_engine/windows/iocp.h"
#include "src/core/lib/event_engine/windows/windows_endpoint.h"
#include "src/core/lib/event_engine/windows/windows_engine.h"
#include "src/core/lib/event_engine/windows/windows_listener.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
@ -377,13 +378,15 @@ bool WindowsEventEngine::CancelConnectInternalStateLocked(
absl::StatusOr<std::unique_ptr<EventEngine::Listener>>
WindowsEventEngine::CreateListener(
Listener::AcceptCallback /* on_accept */,
absl::AnyInvocable<void(absl::Status)> /* on_shutdown */,
const EndpointConfig& /* config */,
std::unique_ptr<MemoryAllocatorFactory> /* memory_allocator_factory */) {
grpc_core::Crash("unimplemented");
Listener::AcceptCallback on_accept,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
const EndpointConfig& config,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) {
return std::make_unique<WindowsEventEngineListener>(
&iocp_, std::move(on_accept), std::move(on_shutdown),
std::move(memory_allocator_factory), shared_from_this(), executor_.get(),
config);
}
} // namespace experimental
} // namespace grpc_event_engine

@ -0,0 +1,365 @@
// Copyright 2023 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#ifdef GPR_WINDOWS
#include "absl/status/status.h"
#include "absl/strings/str_format.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/trace.h"
#include "src/core/lib/event_engine/windows/iocp.h"
#include "src/core/lib/event_engine/windows/win_socket.h"
#include "src/core/lib/event_engine/windows/windows_endpoint.h"
#include "src/core/lib/event_engine/windows/windows_listener.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/error.h"
namespace grpc_event_engine {
namespace experimental {
// ---- SinglePortSocketListener::AsyncIOState ----
WindowsEventEngineListener::SinglePortSocketListener::AsyncIOState::
AsyncIOState(SinglePortSocketListener* port_listener,
std::unique_ptr<WinSocket> listener_socket)
: port_listener(port_listener),
listener_socket(std::move(listener_socket)) {}
WindowsEventEngineListener::SinglePortSocketListener::AsyncIOState::
~AsyncIOState() {
closesocket(accept_socket);
}
void WindowsEventEngineListener::SinglePortSocketListener::
OnAcceptCallbackWrapper::Run() {
GPR_ASSERT(io_state_ != nullptr);
grpc_core::ReleasableMutexLock lock(&io_state_->mu);
if (io_state_->listener_socket->IsShutdown()) {
GRPC_EVENT_ENGINE_TRACE(
"SinglePortSocketListener::%p listener socket is shut down. Shutting "
"down listener.",
io_state_->port_listener);
lock.Release();
io_state_.reset();
return;
}
io_state_->port_listener->OnAcceptCallbackLocked();
}
void WindowsEventEngineListener::SinglePortSocketListener::
OnAcceptCallbackWrapper::Prime(std::shared_ptr<AsyncIOState> io_state) {
io_state_ = std::move(io_state);
}
// ---- SinglePortSocketListener ----
WindowsEventEngineListener::SinglePortSocketListener::
~SinglePortSocketListener() {
io_state_->listener_socket->Shutdown(DEBUG_LOCATION,
"~SinglePortSocketListener");
GRPC_EVENT_ENGINE_TRACE("~SinglePortSocketListener::%p", this);
}
absl::StatusOr<
std::unique_ptr<WindowsEventEngineListener::SinglePortSocketListener>>
WindowsEventEngineListener::SinglePortSocketListener::Create(
WindowsEventEngineListener* listener, SOCKET sock,
EventEngine::ResolvedAddress addr) {
// We need to grab the AcceptEx pointer for that port, as it may be
// interface-dependent. We'll cache it to avoid doing that again.
GUID guid = WSAID_ACCEPTEX;
DWORD ioctl_num_bytes;
LPFN_ACCEPTEX AcceptEx;
int status =
WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
&AcceptEx, sizeof(AcceptEx), &ioctl_num_bytes, NULL, NULL);
if (status != 0) {
auto error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
closesocket(sock);
return error;
}
auto result = SinglePortSocketListener::PrepareListenerSocket(sock, addr);
GRPC_RETURN_IF_ERROR(result.status());
GPR_ASSERT(result->port >= 0);
// Using `new` to access non-public constructor
return absl::WrapUnique(new SinglePortSocketListener(
listener, AcceptEx, /*win_socket=*/listener->iocp_->Watch(sock),
result->port, result->hostbyname));
}
absl::Status WindowsEventEngineListener::SinglePortSocketListener::Start() {
grpc_core::MutexLock lock(&io_state_->mu);
return StartLocked();
}
absl::Status
WindowsEventEngineListener::SinglePortSocketListener::StartLocked() {
SOCKET accept_socket = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
IOCP::GetDefaultSocketFlags());
if (accept_socket == INVALID_SOCKET) {
return GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
}
auto fail = [&](absl::Status error) -> absl::Status {
if (accept_socket != INVALID_SOCKET) closesocket(accept_socket);
return error;
};
auto error = PrepareSocket(accept_socket);
if (!error.ok()) return fail(error);
// Start the "accept" asynchronously.
DWORD addrlen = sizeof(sockaddr_in6) + 16;
DWORD bytes_received = 0;
int success =
AcceptEx(io_state_->listener_socket->raw_socket(), accept_socket,
addresses_, 0, addrlen, addrlen, &bytes_received,
io_state_->listener_socket->read_info()->overlapped());
// It is possible to get an accept immediately without delay. However, we
// will still get an IOCP notification for it. So let's just ignore it.
if (success != 0) {
int last_error = WSAGetLastError();
if (last_error != ERROR_IO_PENDING) {
return fail(GRPC_WSA_ERROR(last_error, "AcceptEx"));
}
}
// We're ready to do the accept. Calling NotifyOnRead may immediately process
// an accept that happened in the meantime.
io_state_->accept_socket = accept_socket;
io_state_->listener_socket->NotifyOnRead(&io_state_->on_accept_cb);
GRPC_EVENT_ENGINE_TRACE(
"SinglePortSocketListener::%p listening. listener_socket::%p", this,
io_state_->listener_socket.get());
return absl::OkStatus();
}
void WindowsEventEngineListener::SinglePortSocketListener::
OnAcceptCallbackLocked() {
auto close_socket_and_restart =
[&](bool do_close_socket = true)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(io_state_->mu) {
if (do_close_socket) closesocket(io_state_->accept_socket);
io_state_->accept_socket = INVALID_SOCKET;
GPR_ASSERT(GRPC_LOG_IF_ERROR("SinglePortSocketListener::Start",
StartLocked()));
};
const auto& overlapped_result =
io_state_->listener_socket->read_info()->result();
if (overlapped_result.wsa_error != 0) {
gpr_log(GPR_ERROR, "%s",
GRPC_WSA_ERROR(overlapped_result.wsa_error,
"Skipping on_accept due to error")
.ToString()
.c_str());
return close_socket_and_restart();
}
SOCKET tmp_listener_socket = io_state_->listener_socket->raw_socket();
int err =
setsockopt(io_state_->accept_socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
reinterpret_cast<char*>(&tmp_listener_socket),
sizeof(tmp_listener_socket));
if (err != 0) {
gpr_log(GPR_ERROR, "%s",
GRPC_WSA_ERROR(WSAGetLastError(), "setsockopt").ToString().c_str());
return close_socket_and_restart();
}
EventEngine::ResolvedAddress peer_address;
int peer_name_len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES;
err = getpeername(io_state_->accept_socket,
const_cast<sockaddr*>(peer_address.address()),
&peer_name_len);
if (err != 0) {
gpr_log(
GPR_ERROR, "%s",
GRPC_WSA_ERROR(WSAGetLastError(), "getpeername").ToString().c_str());
return close_socket_and_restart();
}
peer_address =
EventEngine::ResolvedAddress(peer_address.address(), peer_name_len);
auto addr_uri = ResolvedAddressToURI(peer_address);
std::string peer_name = "unknown";
if (!addr_uri.ok()) {
// TODO(hork): test an early exit/restart here with end2end tests
gpr_log(GPR_ERROR, "invalid peer name: %s",
addr_uri.status().ToString().c_str());
} else {
peer_name = *addr_uri;
}
auto endpoint = std::make_unique<WindowsEndpoint>(
peer_address, listener_->iocp_->Watch(io_state_->accept_socket),
listener_->memory_allocator_factory_->CreateMemoryAllocator(
absl::StrFormat("listener endpoint %s", peer_name)),
listener_->config_, listener_->executor_);
listener_->accept_cb_(
std::move(endpoint),
listener_->memory_allocator_factory_->CreateMemoryAllocator(
absl::StrFormat("listener accept cb for %s", peer_name)));
close_socket_and_restart(/*do_close_socket=*/false);
}
WindowsEventEngineListener::SinglePortSocketListener::SinglePortSocketListener(
WindowsEventEngineListener* listener, LPFN_ACCEPTEX AcceptEx,
std::unique_ptr<WinSocket> listener_socket, int port,
EventEngine::ResolvedAddress hostbyname)
: AcceptEx(AcceptEx),
listener_(listener),
io_state_(
std::make_shared<AsyncIOState>(this, std::move(listener_socket))),
port_(port),
listener_sockname_(hostbyname) {
io_state_->on_accept_cb.Prime(io_state_);
}
absl::StatusOr<WindowsEventEngineListener::SinglePortSocketListener::
PrepareListenerSocketResult>
WindowsEventEngineListener::SinglePortSocketListener::PrepareListenerSocket(
SOCKET sock, const EventEngine::ResolvedAddress& addr) {
auto fail = [&](absl::Status error) -> absl::Status {
GPR_ASSERT(!error.ok());
auto addr_uri = ResolvedAddressToURI(addr);
error = grpc_error_set_int(
grpc_error_set_str(
GRPC_ERROR_CREATE_REFERENCING("Failed to prepare server socket",
&error, 1),
grpc_core::StatusStrProperty::kTargetAddress,
addr_uri.ok() ? *addr_uri : addr_uri.status().ToString()),
grpc_core::StatusIntProperty::kFd, static_cast<intptr_t>(sock));
if (sock != INVALID_SOCKET) closesocket(sock);
return error;
};
auto error = PrepareSocket(sock);
if (!error.ok()) return fail(error);
if (bind(sock, addr.address(), addr.size()) == SOCKET_ERROR) {
return fail(GRPC_WSA_ERROR(WSAGetLastError(), "bind"));
}
if (listen(sock, SOMAXCONN) == SOCKET_ERROR) {
return fail(GRPC_WSA_ERROR(WSAGetLastError(), "listen"));
}
int sockname_temp_len = sizeof(struct sockaddr_storage);
EventEngine::ResolvedAddress sockname_temp;
if (getsockname(sock, const_cast<sockaddr*>(sockname_temp.address()),
&sockname_temp_len) == SOCKET_ERROR) {
return fail(GRPC_WSA_ERROR(WSAGetLastError(), "getsockname"));
}
sockname_temp =
EventEngine::ResolvedAddress(sockname_temp.address(), sockname_temp_len);
return PrepareListenerSocketResult{ResolvedAddressGetPort(sockname_temp),
sockname_temp};
}
// ---- WindowsEventEngineListener ----
WindowsEventEngineListener::WindowsEventEngineListener(
IOCP* iocp, AcceptCallback accept_cb,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory,
std::shared_ptr<EventEngine> engine, Executor* executor,
const EndpointConfig& config)
: iocp_(iocp),
config_(config),
engine_(std::move(engine)),
executor_(executor),
memory_allocator_factory_(std::move(memory_allocator_factory)),
accept_cb_(std::move(accept_cb)),
on_shutdown_(std::move(on_shutdown)) {}
WindowsEventEngineListener::~WindowsEventEngineListener() {
GRPC_EVENT_ENGINE_TRACE(
"%s",
absl::StrFormat("WindowsEventEngineListener::%p shutting down", this)
.c_str());
// Shut down each port listener before destroying this EventEngine::Listener
for (auto& port_listener : port_listeners_) {
port_listener.reset();
}
on_shutdown_(absl::OkStatus());
}
absl::StatusOr<int> WindowsEventEngineListener::Bind(
const EventEngine::ResolvedAddress& addr) {
if (started_.load()) {
return absl::FailedPreconditionError(
absl::StrFormat("WindowsEventEngineListener::%p is already started, "
"ports can no longer be bound",
this));
}
int out_port = ResolvedAddressGetPort(addr);
EventEngine::ResolvedAddress out_addr(addr);
EventEngine::ResolvedAddress tmp_addr;
// Check if this is a wildcard port, and if so, try to keep the port the same
// as some previously created listener.
if (out_port == 0) {
grpc_core::MutexLock lock(&socket_listeners_mu_);
for (const auto& port_listener : port_listeners_) {
tmp_addr = port_listener->listener_sockname();
out_port = ResolvedAddressGetPort(tmp_addr);
if (out_port > 0) {
ResolvedAddressSetPort(out_addr, out_port);
break;
}
}
}
if (ResolvedAddressToV4Mapped(out_addr, &tmp_addr)) {
out_addr = tmp_addr;
}
// Treat :: or 0.0.0.0 as a family-agnostic wildcard.
if (ResolvedAddressIsWildcard(out_addr)) {
out_addr = ResolvedAddressMakeWild6(out_port);
}
// open the socket
SOCKET sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, nullptr, 0,
IOCP::GetDefaultSocketFlags());
if (sock == INVALID_SOCKET) {
auto error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
return GRPC_ERROR_CREATE_REFERENCING("Failed to add port to server", &error,
1);
}
auto port_listener = AddSinglePortSocketListener(sock, out_addr);
GRPC_RETURN_IF_ERROR(port_listener.status());
return (*port_listener)->port();
}
absl::Status WindowsEventEngineListener::Start() {
GPR_ASSERT(!started_.exchange(true));
grpc_core::MutexLock lock(&socket_listeners_mu_);
for (auto& port_listener : port_listeners_) {
GRPC_RETURN_IF_ERROR(port_listener->Start());
}
return absl::OkStatus();
}
absl::StatusOr<WindowsEventEngineListener::SinglePortSocketListener*>
WindowsEventEngineListener::AddSinglePortSocketListener(
SOCKET sock, EventEngine::ResolvedAddress addr) {
auto single_port_listener =
SinglePortSocketListener::Create(this, sock, addr);
GRPC_RETURN_IF_ERROR(single_port_listener.status());
auto* single_port_listener_ptr = single_port_listener->get();
grpc_core::MutexLock lock(&socket_listeners_mu_);
port_listeners_.emplace_back(std::move(*single_port_listener));
if (started_.load()) {
gpr_log(GPR_ERROR,
"WindowsEventEngineListener::%p Bind was called concurrently while "
"the Listener was starting. This is invalid usage, all ports must "
"be bound before the Listener is started.",
this);
single_port_listener_ptr->Start();
}
return single_port_listener_ptr;
}
} // namespace experimental
} // namespace grpc_event_engine
#endif // GPR_WINDOWS

@ -0,0 +1,151 @@
// Copyright 2023 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_LISTENER_H
#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_LISTENER_H
#include <grpc/support/port_platform.h>
#ifdef GPR_WINDOWS
#include <list>
#include "absl/base/thread_annotations.h"
#include "absl/status/statusor.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include "src/core/lib/event_engine/common_closures.h"
#include "src/core/lib/event_engine/windows/iocp.h"
#include "src/core/lib/gprpp/sync.h"
namespace grpc_event_engine {
namespace experimental {
class WindowsEventEngineListener : public EventEngine::Listener {
public:
WindowsEventEngineListener(
IOCP* iocp, AcceptCallback accept_cb,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory,
std::shared_ptr<EventEngine> engine, Executor* executor_,
const EndpointConfig& config);
~WindowsEventEngineListener() override;
absl::StatusOr<int> Bind(const EventEngine::ResolvedAddress& addr) override;
absl::Status Start() override;
private:
/// Responsible for listening on a single port.
class SinglePortSocketListener {
public:
~SinglePortSocketListener();
// This factory will create a bound, listening WinSocket, registered with
// the listener's IOCP poller.
static absl::StatusOr<std::unique_ptr<SinglePortSocketListener>> Create(
WindowsEventEngineListener* listener, SOCKET sock,
EventEngine::ResolvedAddress addr);
// Two-stage initialization, allows creation of all bound sockets before the
// listener is started.
absl::Status Start();
absl::Status StartLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(io_state_->mu);
// Accessor methods
EventEngine::ResolvedAddress listener_sockname() {
return listener_sockname_;
};
int port() { return port_; }
private:
struct AsyncIOState;
class OnAcceptCallbackWrapper : public EventEngine::Closure {
public:
void Run() override;
void Prime(std::shared_ptr<AsyncIOState> io_state);
private:
std::shared_ptr<AsyncIOState> io_state_;
};
// A class to manage the data that must outlive the Endpoint.
//
// Once a listener is done and destroyed, there still may be overlapped
// operations pending. To clean up safely, this data must outlive the
// Listener, and be destroyed asynchronously when all pending overlapped
// events are complete.
struct AsyncIOState {
AsyncIOState(SinglePortSocketListener* port_listener,
std::unique_ptr<WinSocket> listener_socket);
~AsyncIOState();
SinglePortSocketListener* const port_listener;
OnAcceptCallbackWrapper on_accept_cb;
// Synchronize accept handling on the same socket.
grpc_core::Mutex mu;
// This will hold the socket for the next accept.
SOCKET accept_socket ABSL_GUARDED_BY(mu) = INVALID_SOCKET;
// The listener winsocket.
std::unique_ptr<WinSocket> listener_socket ABSL_GUARDED_BY(mu);
};
SinglePortSocketListener(WindowsEventEngineListener* listener,
LPFN_ACCEPTEX AcceptEx,
std::unique_ptr<WinSocket> listener_socket,
int port, EventEngine::ResolvedAddress hostbyname);
// Bind a recently-created socket for listening
struct PrepareListenerSocketResult {
int port;
EventEngine::ResolvedAddress hostbyname;
};
static absl::StatusOr<PrepareListenerSocketResult> PrepareListenerSocket(
SOCKET sock, const EventEngine::ResolvedAddress& addr);
void OnAcceptCallbackLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(io_state_->mu);
// The cached AcceptEx for that port.
LPFN_ACCEPTEX AcceptEx;
// This seemingly magic number comes from AcceptEx's documentation. each
// address buffer needs to have at least 16 more bytes at their end.
uint8_t addresses_[(sizeof(sockaddr_in6) + 16) * 2] = {};
// The parent listener
WindowsEventEngineListener* listener_;
// shared state for asynchronous cleanup of overlapped operations
std::shared_ptr<AsyncIOState> io_state_;
// The actual TCP port number.
int port_;
EventEngine::ResolvedAddress listener_sockname_;
};
absl::StatusOr<SinglePortSocketListener*> AddSinglePortSocketListener(
SOCKET sock, EventEngine::ResolvedAddress addr);
IOCP* const iocp_;
const EndpointConfig& config_;
std::shared_ptr<EventEngine> engine_;
Executor* executor_;
const std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory_;
AcceptCallback accept_cb_;
absl::AnyInvocable<void(absl::Status)> on_shutdown_;
std::atomic<bool> started_{false};
grpc_core::Mutex socket_listeners_mu_;
std::list<std::unique_ptr<SinglePortSocketListener>> port_listeners_
ABSL_GUARDED_BY(socket_listeners_mu_);
};
} // namespace experimental
} // namespace grpc_event_engine
#endif
#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_LISTENER_H

@ -517,6 +517,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/event_engine/windows/win_socket.cc',
'src/core/lib/event_engine/windows/windows_endpoint.cc',
'src/core/lib/event_engine/windows/windows_engine.cc',
'src/core/lib/event_engine/windows/windows_listener.cc',
'src/core/lib/experiments/config.cc',
'src/core/lib/experiments/experiments.cc',
'src/core/lib/gpr/alloc.cc',

@ -64,12 +64,11 @@ grpc_cc_test(
],
uses_polling = False,
deps = [
"//test/core/event_engine/test_suite/tests:timer",
"//src/core:windows_event_engine",
"//test/core/event_engine:event_engine_test_utils",
# TODO(hork): enable when the listener (or an oracle) is available
# "//test/core/event_engine/test_suite/tests:client",
# "//test/core/event_engine/test_suite/tests:server",
"//test/core/event_engine/test_suite/tests:client",
"//test/core/event_engine/test_suite/tests:server",
"//test/core/event_engine/test_suite/tests:timer",
],
)

@ -180,7 +180,7 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) {
grpc_core::ExecCtx ctx;
static constexpr int kNumListenerAddresses = 10; // N
static constexpr int kNumConnections = 10; // M
auto oracle_ee = this->NewOracleEventEngine();
std::shared_ptr<EventEngine> oracle_ee(this->NewOracleEventEngine());
std::shared_ptr<EventEngine> test_ee(this->NewEventEngine());
auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
std::unique_ptr<EventEngine::Endpoint> server_endpoint;

@ -2110,6 +2110,8 @@ src/core/lib/event_engine/windows/windows_endpoint.cc \
src/core/lib/event_engine/windows/windows_endpoint.h \
src/core/lib/event_engine/windows/windows_engine.cc \
src/core/lib/event_engine/windows/windows_engine.h \
src/core/lib/event_engine/windows/windows_listener.cc \
src/core/lib/event_engine/windows/windows_listener.h \
src/core/lib/experiments/config.cc \
src/core/lib/experiments/config.h \
src/core/lib/experiments/experiments.cc \

@ -1888,6 +1888,8 @@ src/core/lib/event_engine/windows/windows_endpoint.cc \
src/core/lib/event_engine/windows/windows_endpoint.h \
src/core/lib/event_engine/windows/windows_engine.cc \
src/core/lib/event_engine/windows/windows_engine.h \
src/core/lib/event_engine/windows/windows_listener.cc \
src/core/lib/event_engine/windows/windows_listener.h \
src/core/lib/experiments/config.cc \
src/core/lib/experiments/config.h \
src/core/lib/experiments/experiments.cc \

Loading…
Cancel
Save