@ -26,33 +26,67 @@
# include <grpc/event_engine/memory_allocator.h>
# include <grpc/event_engine/slice_buffer.h>
# include "src/core/lib/event_engine/channel_args_endpoint_config.h"
# include "src/core/lib/event_engine/common_closures.h"
# include "src/core/lib/event_engine/executor/executor.h"
# include "src/core/lib/event_engine/handle_containers.h"
# include "src/core/lib/event_engine/posix_engine/timer_manager.h"
# include "src/core/lib/event_engine/tcp_socket_utils.h"
# include "src/core/lib/event_engine/trace.h"
# include "src/core/lib/event_engine/utils.h"
# include "src/core/lib/event_engine/windows/iocp.h"
# include "src/core/lib/event_engine/windows/windows_endpoint.h"
# include "src/core/lib/event_engine/windows/windows_engine.h"
# include "src/core/lib/gprpp/sync.h"
# include "src/core/lib/gprpp/time.h"
# include "src/core/lib/iomgr/error.h"
namespace grpc_event_engine {
namespace experimental {
// ---- IOCPWorkClosure ----
WindowsEventEngine : : IOCPWorkClosure : : IOCPWorkClosure ( Executor * executor ,
IOCP * iocp )
: executor_ ( executor ) , iocp_ ( iocp ) {
executor_ - > Run ( this ) ;
}
void WindowsEventEngine : : IOCPWorkClosure : : Run ( ) {
auto result = iocp_ - > Work ( std : : chrono : : seconds ( 60 ) , [ this ] {
workers_ . fetch_add ( 1 ) ;
executor_ - > Run ( this ) ;
} ) ;
if ( result = = Poller : : WorkResult : : kDeadlineExceeded ) {
// iocp received no messages. restart the worker
workers_ . fetch_add ( 1 ) ;
executor_ - > Run ( this ) ;
}
if ( workers_ . fetch_sub ( 1 ) = = 1 ) done_signal_ . Notify ( ) ;
}
void WindowsEventEngine : : IOCPWorkClosure : : WaitForShutdown ( ) {
done_signal_ . WaitForNotification ( ) ;
}
// ---- WindowsEventEngine ----
// TODO(hork): The iomgr timer and execution engine can be reused. It should
// be separated out from the posix_engine and instantiated as components. It is
// effectively copied below.
struct WindowsEventEngine : : Closure final : public EventEngine : : Closure {
struct WindowsEventEngine : : Timer Closure final : public EventEngine : : Closure {
absl : : AnyInvocable < void ( ) > cb ;
Timer timer ;
WindowsEventEngine * engine ;
EventEngine : : TaskHandle handle ;
void Run ( ) override {
GRPC_EVENT_ENGINE_TRACE ( " WindowsEventEngine:%p executing callback:%s " ,
engine , HandleToString ( handle ) . c_str ( ) ) ;
GRPC_EVENT_ENGINE_TRACE (
" WindowsEventEngine:%p executing callback:%s " , engine ,
HandleToString < EventEngine : : TaskHandle > ( handle ) . c_str ( ) ) ;
{
grpc_core : : MutexLock lock ( & engine - > mu_ ) ;
grpc_core : : MutexLock lock ( & engine - > task_ mu_) ;
engine - > known_handles_ . erase ( handle ) ;
}
cb ( ) ;
@ -63,7 +97,8 @@ struct WindowsEventEngine::Closure final : public EventEngine::Closure {
WindowsEventEngine : : WindowsEventEngine ( )
: executor_ ( std : : make_shared < ThreadPool > ( ) ) ,
iocp_ ( executor_ . get ( ) ) ,
timer_manager_ ( executor_ ) {
timer_manager_ ( executor_ ) ,
iocp_worker_ ( executor_ . get ( ) , & iocp_ ) {
WSADATA wsaData ;
int status = WSAStartup ( MAKEWORD ( 2 , 0 ) , & wsaData ) ;
GPR_ASSERT ( status = = 0 ) ;
@ -71,25 +106,28 @@ WindowsEventEngine::WindowsEventEngine()
WindowsEventEngine : : ~ WindowsEventEngine ( ) {
{
grpc_core : : MutexLock lock ( & mu_ ) ;
grpc_core : : MutexLock lock ( & task_ mu_) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_event_engine_trace ) ) {
for ( auto handle : known_handles_ ) {
gpr_log ( GPR_ERROR ,
" WindowsEventEngine:%p uncleared TaskHandle at shutdown:%s " ,
this , HandleToString ( handle ) . c_str ( ) ) ;
this , HandleToString < EventEngine : : TaskHandle > ( handle ) . c_str ( ) ) ;
}
}
GPR_ASSERT ( GPR_LIKELY ( known_handles_ . empty ( ) ) ) ;
GPR_ASSERT ( WSACleanup ( ) = = 0 ) ;
timer_manager_ . Shutdown ( ) ;
}
iocp_ . Kick ( ) ;
iocp_worker_ . WaitForShutdown ( ) ;
iocp_ . Shutdown ( ) ;
GPR_ASSERT ( WSACleanup ( ) = = 0 ) ;
timer_manager_ . Shutdown ( ) ;
executor_ - > Quiesce ( ) ;
}
bool WindowsEventEngine : : Cancel ( EventEngine : : TaskHandle handle ) {
grpc_core : : MutexLock lock ( & mu_ ) ;
grpc_core : : MutexLock lock ( & task_ mu_) ;
if ( ! known_handles_ . contains ( handle ) ) return false ;
auto * cd = reinterpret_cast < Closure * > ( handle . keys [ 0 ] ) ;
auto * cd = reinterpret_cast < Timer Closure* > ( handle . keys [ 0 ] ) ;
bool r = timer_manager_ . TimerCancel ( & cd - > timer ) ;
known_handles_ . erase ( handle ) ;
if ( r ) delete cd ;
@ -117,16 +155,17 @@ void WindowsEventEngine::Run(EventEngine::Closure* closure) {
EventEngine : : TaskHandle WindowsEventEngine : : RunAfterInternal (
Duration when , absl : : AnyInvocable < void ( ) > cb ) {
auto when_ts = ToTimestamp ( timer_manager_ . Now ( ) , when ) ;
auto * cd = new Closure ;
auto * cd = new Timer Closure;
cd - > cb = std : : move ( cb ) ;
cd - > engine = this ;
EventEngine : : TaskHandle handle { reinterpret_cast < intptr_t > ( cd ) ,
aba_token_ . fetch_add ( 1 ) } ;
grpc_core : : MutexLock lock ( & mu_ ) ;
grpc_core : : MutexLock lock ( & task_ mu_) ;
known_handles_ . insert ( handle ) ;
cd - > handle = handle ;
GRPC_EVENT_ENGINE_TRACE ( " WindowsEventEngine:%p scheduling callback:%s " , this ,
HandleToString ( handle ) . c_str ( ) ) ;
GRPC_EVENT_ENGINE_TRACE (
" WindowsEventEngine:%p scheduling callback:%s " , this ,
HandleToString < EventEngine : : TaskHandle > ( handle ) . c_str ( ) ) ;
timer_manager_ . TimerInit ( & cd - > timer , when_ts , cd ) ;
return handle ;
}
@ -140,15 +179,194 @@ bool WindowsEventEngine::IsWorkerThread() {
GPR_ASSERT ( false & & " unimplemented " ) ;
}
bool WindowsEventEngine : : CancelConnect ( EventEngine : : ConnectionHandle handle ) {
GPR_ASSERT ( false & & " unimplemented " ) ;
void WindowsEventEngine : : OnConnectCompleted (
std : : shared_ptr < ConnectionState > state ) {
// Connection attempt complete!
grpc_core : : MutexLock lock ( & state - > mu ) ;
state - > on_connected = nullptr ;
{
grpc_core : : MutexLock handle_lock ( & connection_mu_ ) ;
known_connection_handles_ . erase ( state - > connection_handle ) ;
}
// return early if we cannot cancel the connection timeout timer.
if ( ! Cancel ( state - > timer_handle ) ) return ;
auto write_info = state - > socket - > write_info ( ) ;
if ( write_info - > wsa_error ( ) ! = 0 ) {
auto error = GRPC_WSA_ERROR ( write_info - > wsa_error ( ) , " ConnectEx " ) ;
state - > socket - > MaybeShutdown ( error ) ;
state - > on_connected_user_callback ( error ) ;
return ;
}
// This code should be running in an executor thread already, so the callback
// can be run directly.
ChannelArgsEndpointConfig cfg ;
state - > on_connected_user_callback ( std : : make_unique < WindowsEndpoint > (
state - > address , std : : move ( state - > socket ) , std : : move ( state - > allocator ) ,
cfg , executor_ . get ( ) ) ) ;
}
EventEngine : : ConnectionHandle WindowsEventEngine : : Connect (
OnConnectCallback on_connect , const ResolvedAddress & addr ,
const EndpointConfig & args , MemoryAllocator memory_allocator ,
Duration deadline ) {
GPR_ASSERT ( false & & " unimplemented " ) ;
const EndpointConfig & /* args */ , MemoryAllocator memory_allocator ,
Duration timeout ) {
// TODO(hork): utilize the endpoint config
absl : : Status status ;
int istatus ;
auto uri = ResolvedAddressToURI ( addr ) ;
if ( ! uri . ok ( ) ) {
Run ( [ on_connect = std : : move ( on_connect ) , status = uri . status ( ) ] ( ) mutable {
on_connect ( status ) ;
} ) ;
return invalid_connection_handle ;
}
GRPC_EVENT_ENGINE_TRACE ( " EventEngine::%p connecting to %s " , this ,
uri - > c_str ( ) ) ;
// Use dualstack sockets where available.
ResolvedAddress address = addr ;
ResolvedAddress addr6_v4mapped ;
if ( ResolvedAddressToV4Mapped ( addr , & addr6_v4mapped ) ) {
address = addr6_v4mapped ;
}
SOCKET sock = WSASocket ( AF_INET6 , SOCK_STREAM , IPPROTO_TCP , nullptr , 0 ,
IOCP : : GetDefaultSocketFlags ( ) ) ;
if ( sock = = INVALID_SOCKET ) {
Run ( [ on_connect = std : : move ( on_connect ) ,
status = GRPC_WSA_ERROR ( WSAGetLastError ( ) , " WSASocket " ) ] ( ) mutable {
on_connect ( status ) ;
} ) ;
return invalid_connection_handle ;
}
status = PrepareSocket ( sock ) ;
if ( ! status . ok ( ) ) {
Run ( [ on_connect = std : : move ( on_connect ) , status ] ( ) mutable {
on_connect ( status ) ;
} ) ;
return invalid_connection_handle ;
}
// Grab the function pointer for ConnectEx for that specific socket It may
// change depending on the interface.
LPFN_CONNECTEX ConnectEx ;
GUID guid = WSAID_CONNECTEX ;
DWORD ioctl_num_bytes ;
istatus = WSAIoctl ( sock , SIO_GET_EXTENSION_FUNCTION_POINTER , & guid ,
sizeof ( guid ) , & ConnectEx , sizeof ( ConnectEx ) ,
& ioctl_num_bytes , nullptr , nullptr ) ;
if ( istatus ! = 0 ) {
Run ( [ on_connect = std : : move ( on_connect ) ,
status = GRPC_WSA_ERROR (
WSAGetLastError ( ) ,
" WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) " ) ] ( ) mutable {
on_connect ( status ) ;
} ) ;
return invalid_connection_handle ;
}
// bind the local address
auto local_address = ResolvedAddressMakeWild6 ( 0 ) ;
istatus = bind ( sock , local_address . address ( ) , local_address . size ( ) ) ;
if ( istatus ! = 0 ) {
Run ( [ on_connect = std : : move ( on_connect ) ,
status = GRPC_WSA_ERROR ( WSAGetLastError ( ) , " bind " ) ] ( ) mutable {
on_connect ( status ) ;
} ) ;
return invalid_connection_handle ;
}
// Connect
auto watched_socket = iocp_ . Watch ( sock ) ;
auto * info = watched_socket - > write_info ( ) ;
bool success =
ConnectEx ( watched_socket - > socket ( ) , address . address ( ) , address . size ( ) ,
nullptr , 0 , nullptr , info - > overlapped ( ) ) ;
// It wouldn't be unusual to get a success immediately. But we'll still get an
// IOCP notification, so let's ignore it.
if ( ! success ) {
int last_error = WSAGetLastError ( ) ;
if ( last_error ! = ERROR_IO_PENDING ) {
auto status = GRPC_WSA_ERROR ( WSAGetLastError ( ) , " ConnectEx " ) ;
Run ( [ on_connect = std : : move ( on_connect ) , status ] ( ) mutable {
on_connect ( status ) ;
} ) ;
watched_socket - > MaybeShutdown ( status ) ;
return invalid_connection_handle ;
}
}
GPR_ASSERT ( watched_socket ! = nullptr ) ;
auto connection_state = std : : make_shared < ConnectionState > ( ) ;
grpc_core : : MutexLock lock ( & connection_state - > mu ) ;
connection_state - > address = address ;
connection_state - > socket = std : : move ( watched_socket ) ;
connection_state - > on_connected_user_callback = std : : move ( on_connect ) ;
connection_state - > allocator = std : : move ( memory_allocator ) ;
connection_state - > on_connected =
SelfDeletingClosure : : Create ( [ this , connection_state ] ( ) mutable {
OnConnectCompleted ( std : : move ( connection_state ) ) ;
} ) ;
{
grpc_core : : MutexLock conn_lock ( & connection_mu_ ) ;
connection_state - > connection_handle =
ConnectionHandle { reinterpret_cast < intptr_t > ( connection_state . get ( ) ) ,
aba_token_ . fetch_add ( 1 ) } ;
known_connection_handles_ . insert ( connection_state - > connection_handle ) ;
}
connection_state - > timer_handle =
RunAfter ( timeout , [ this , connection_state ] ( ) {
grpc_core : : MutexLock lock ( & connection_state - > mu ) ;
if ( CancelConnectFromDeadlineTimer ( connection_state . get ( ) ) ) {
connection_state - > on_connected_user_callback (
absl : : DeadlineExceededError ( " Connection timed out " ) ) ;
}
// else: The connection attempt could not be canceled. We can assume the
// connection callback will be called.
} ) ;
connection_state - > socket - > NotifyOnWrite ( connection_state - > on_connected ) ;
return connection_state - > connection_handle ;
}
bool WindowsEventEngine : : CancelConnect ( EventEngine : : ConnectionHandle handle ) {
if ( TaskHandleComparator < ConnectionHandle > : : Eq ( ) ( handle ,
invalid_connection_handle ) ) {
GRPC_EVENT_ENGINE_TRACE ( " %s " ,
" Attempted to cancel an invalid connection handle " ) ;
return false ;
}
// Erase the connection handle, which may be unknown
{
grpc_core : : MutexLock lock ( & connection_mu_ ) ;
if ( ! known_connection_handles_ . contains ( handle ) ) {
GRPC_EVENT_ENGINE_TRACE (
" Unknown connection handle: %s " ,
HandleToString < EventEngine : : ConnectionHandle > ( handle ) . c_str ( ) ) ;
return false ;
}
known_connection_handles_ . erase ( handle ) ;
}
auto * connection_state = reinterpret_cast < ConnectionState * > ( handle . keys [ 0 ] ) ;
grpc_core : : MutexLock state_lock ( & connection_state - > mu ) ;
if ( ! Cancel ( connection_state - > timer_handle ) ) return false ;
return CancelConnectInternalStateLocked ( connection_state ) ;
}
bool WindowsEventEngine : : CancelConnectFromDeadlineTimer (
ConnectionState * connection_state ) {
// Erase the connection handle, which is guaranteed to exist.
{
grpc_core : : MutexLock lock ( & connection_mu_ ) ;
GPR_ASSERT ( known_connection_handles_ . erase (
connection_state - > connection_handle ) = = 1 ) ;
}
return CancelConnectInternalStateLocked ( connection_state ) ;
}
bool WindowsEventEngine : : CancelConnectInternalStateLocked (
ConnectionState * connection_state ) {
connection_state - > socket - > MaybeShutdown (
absl : : CancelledError ( " CancelConnect " ) ) ;
// Release the connection_state shared_ptr. connection_state is now invalid.
delete connection_state - > on_connected ;
GRPC_EVENT_ENGINE_TRACE ( " Successfully cancelled connection %s " ,
HandleToString < EventEngine : : ConnectionHandle > (
connection_state - > connection_handle )
. c_str ( ) ) ;
return true ;
}
absl : : StatusOr < std : : unique_ptr < EventEngine : : Listener > >