@ -33,21 +33,10 @@
namespace grpc_event_engine {
namespace experimental {
// TODO(hork): The previous implementation required internal ref counting. Add
// this when it becomes necessary.
// TODO(hork): The previous implementation required a 2-phase shutdown. Add this
// when it becomes necessary.
namespace {
constexpr int64 _t kDefaultTargetReadSize = 8192 ;
constexpr size_t kDefaultTargetReadSize = 8192 ;
constexpr int kMaxWSABUFCount = 16 ;
void AbortOnEvent ( absl : : Status ) {
grpc_core : : Crash (
" INTERNAL ERROR: Asked to handle read/write event with an invalid "
" callback " ) ;
}
} // namespace
WindowsEndpoint : : WindowsEndpoint (
@ -55,15 +44,13 @@ WindowsEndpoint::WindowsEndpoint(
std : : unique_ptr < WinSocket > socket , MemoryAllocator & & allocator ,
const EndpointConfig & /* config */ , Executor * executor )
: peer_address_ ( peer_address ) ,
socket_ ( std : : move ( socket ) ) ,
allocator_ ( std : : move ( allocator ) ) ,
handle_read_event_ ( this ) ,
handle_write_event_ ( this ) ,
executor_ ( executor ) {
executor_ ( executor ) ,
io_state_ ( std : : make_shared < AsyncIOState > ( this , std : : move ( socket ) ) ) {
char addr [ EventEngine : : ResolvedAddress : : MAX_SIZE_BYTES ] ;
int addr_len = sizeof ( addr ) ;
if ( getsockname ( socket _- > socket ( ) , reinterpret_cast < sockaddr * > ( addr ) ,
& addr_len ) < 0 ) {
if ( getsockname ( io_state _- > socket - > raw_socket ( ) ,
reinterpret_cast < sockaddr * > ( addr ) , & addr_len ) < 0 ) {
grpc_core : : Crash ( absl : : StrFormat (
" Unrecoverable error: Failed to get local socket name. %s " ,
GRPC_WSA_ERROR ( WSAGetLastError ( ) , " getsockname " ) . ToString ( ) . c_str ( ) ) ) ;
@ -75,50 +62,67 @@ WindowsEndpoint::WindowsEndpoint(
}
WindowsEndpoint : : ~ WindowsEndpoint ( ) {
socket_ - > MaybeShutdown ( absl : : OkStatus ( ) ) ;
GRPC_EVENT_ENGINE_ENDPOINT_TRACE ( " WindowsEndpoint::%p destoyed " , this ) ;
}
void WindowsEndpoint : : Read ( absl : : AnyInvocable < void ( absl : : Status ) > on_read ,
SliceBuffer * buffer , const ReadArgs * args ) {
// TODO(hork): last_read_buffer from iomgr: Is it only garbage, or optimized?
SliceBuffer * buffer , const ReadArgs * /* args */ ) {
GRPC_EVENT_ENGINE_ENDPOINT_TRACE ( " WindowsEndpoint::%p reading " , this ) ;
if ( io_state_ - > socket - > IsShutdown ( ) ) {
executor_ - > Run ( [ on_read = std : : move ( on_read ) ] ( ) mutable {
on_read ( absl : : UnavailableError ( " Socket is shutting down. " ) ) ;
} ) ;
return ;
}
// Prepare the WSABUF struct
WSABUF wsa_buffers [ kMaxWSABUFCount ] ;
int min_read_size = kDefaultTargetReadSize ;
if ( args ! = nullptr & & args - > read_hint_bytes > 0 ) {
min_read_size = args - > read_hint_bytes ;
}
// TODO(hork): introduce a last_read_buffer to save unused sliced.
buffer - > Clear ( ) ;
// TODO(hork): sometimes args->read_hint_bytes is 1, which is not useful.
// Choose an appropriate size.
size_t min_read_size = kDefaultTargetReadSize ;
if ( buffer - > Length ( ) < min_read_size & & buffer - > Count ( ) < kMaxWSABUFCount ) {
buffer - > AppendIndexed ( Slice ( allocator_ . MakeSlice ( min_read_size ) ) ) ;
}
GPR_ASSERT ( buffer - > Count ( ) < = kMaxWSABUFCount ) ;
for ( in t i = 0 ; i < buffer - > Count ( ) ; i + + ) {
Slice tmp = buffer - > RefSlice ( i ) ;
wsa_buffers [ i ] . buf = ( char * ) tmp . begin ( ) ;
wsa_buffers [ i ] . len = tmp . size ( ) ;
for ( size_ t i = 0 ; i < buffer - > Count ( ) ; i + + ) {
auto & slice = buffer - > MutableSliceAt ( i ) ;
wsa_buffers [ i ] . buf = ( char * ) slice . begin ( ) ;
wsa_buffers [ i ] . len = slice . size ( ) ;
}
DWORD bytes_read = 0 ;
DWORD flags = 0 ;
// First let's try a synchronous, non-blocking read.
int status = WSARecv ( socket_ - > socket ( ) , wsa_buffers , ( DWORD ) buffer - > Count ( ) ,
& bytes_read , & flags , nullptr , nullptr ) ;
int status =
WSARecv ( io_state_ - > socket - > raw_socket ( ) , wsa_buffers ,
( DWORD ) buffer - > Count ( ) , & bytes_read , & flags , nullptr , nullptr ) ;
int wsa_error = status = = 0 ? 0 : WSAGetLastError ( ) ;
// Did we get data immediately ? Yay.
if ( wsa_error ! = WSAEWOULDBLOCK ) {
io_state_ - > socket - > read_info ( ) - > SetResult (
{ /*wsa_error=*/ wsa_error , /*bytes_read=*/ bytes_read } ) ;
absl : : Status result ;
if ( bytes_read = = 0 ) {
result = absl : : UnavailableError ( " End of TCP stream " ) ;
grpc_core : : StatusSetInt ( & result , grpc_core : : StatusIntProperty : : kRpcStatus ,
GRPC_STATUS_UNAVAILABLE ) ;
buffer - > Clear ( ) ;
} else {
result = absl : : OkStatus ( ) ;
// prune slicebuffer
if ( bytes_read ! = buffer - > Length ( ) ) {
buffer - > RemoveLastNBytes ( buffer - > Length ( ) - bytes_read ) ;
}
executor_ - > Run ( [ on_read = std : : move ( on_read ) ] ( ) mutable {
on_read ( absl : : OkStatus ( ) ) ;
} ) ;
}
executor_ - > Run (
[ result , on_read = std : : move ( on_read ) ] ( ) mutable { on_read ( result ) ; } ) ;
return ;
}
// Otherwise, let's retry, by queuing a read.
memset ( socket_ - > read_info ( ) - > overlapped ( ) , 0 , sizeof ( OVERLAPPED ) ) ;
status =
WSARecv ( socket_ - > socket ( ) , wsa_buffers , ( DWORD ) buffer - > Count ( ) ,
& bytes_read , & flags , socket_ - > read_info ( ) - > overlapped ( ) , nullptr ) ;
memset ( io_state_ - > socket - > read_info ( ) - > overlapped ( ) , 0 , sizeof ( OVERLAPPED ) ) ;
status = WSARecv ( io_state_ - > socket - > raw_socket ( ) , wsa_buffers ,
( DWORD ) buffer - > Count ( ) , & bytes_read , & flags ,
io_state_ - > socket - > read_info ( ) - > overlapped ( ) , nullptr ) ;
wsa_error = status = = 0 ? 0 : WSAGetLastError ( ) ;
if ( wsa_error ! = 0 & & wsa_error ! = WSA_IO_PENDING ) {
// Async read returned immediately with an error
@ -129,16 +133,21 @@ void WindowsEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
} ) ;
return ;
}
handle_read_event_ . Prime ( buffer , std : : move ( on_read ) ) ;
socket_ - > NotifyOnRead ( & handle_read_event_ ) ;
io_state_ - > handle_read_event . Prime ( io_state_ , buffer , std : : move ( on_read ) ) ;
io_state_ - > socket - > NotifyOnRead ( & io_state_ - > handle_read_event ) ;
}
void WindowsEndpoint : : Write ( absl : : AnyInvocable < void ( absl : : Status ) > on_writable ,
SliceBuffer * data , const WriteArgs * /* args */ ) {
GRPC_EVENT_ENGINE_ENDPOINT_TRACE ( " WindowsEndpoint::%p writing " , this ) ;
if ( io_state_ - > socket - > IsShutdown ( ) ) {
executor_ - > Run ( [ on_writable = std : : move ( on_writable ) ] ( ) mutable {
on_writable ( absl : : UnavailableError ( " Socket is shutting down. " ) ) ;
} ) ;
return ;
}
if ( grpc_event_engine_endpoint_data_trace . enabled ( ) ) {
for ( int i = 0 ; i < data - > Count ( ) ; i + + ) {
for ( size_ t i = 0 ; i < data - > Count ( ) ; i + + ) {
auto str = data - > RefSlice ( i ) . as_string_view ( ) ;
gpr_log ( GPR_INFO , " WindowsEndpoint::%p WRITE (peer=%s): %.*s " , this ,
peer_address_string_ . c_str ( ) , str . length ( ) , str . data ( ) ) ;
@ -146,16 +155,16 @@ void WindowsEndpoint::Write(absl::AnyInvocable<void(absl::Status)> on_writable,
}
GPR_ASSERT ( data - > Count ( ) < = UINT_MAX ) ;
absl : : InlinedVector < WSABUF , kMaxWSABUFCount > buffers ( data - > Count ( ) ) ;
for ( in t i = 0 ; i < data - > Count ( ) ; i + + ) {
auto slice = data - > RefSlice ( i ) ;
for ( size_ t i = 0 ; i < data - > Count ( ) ; i + + ) {
auto & slice = data - > MutableSliceAt ( i ) ;
GPR_ASSERT ( slice . size ( ) < = ULONG_MAX ) ;
buffers [ i ] . len = slice . size ( ) ;
buffers [ i ] . buf = ( char * ) slice . begin ( ) ;
}
// First, let's try a synchronous, non-blocking write.
DWORD bytes_sent ;
int status = WSASend ( socket _- > socket ( ) , buffers . data ( ) , ( DWORD ) buffers . size ( ) ,
& bytes_sent , 0 , nullptr , nullptr ) ;
int status = WSASend ( io_state _- > socket - > raw_socket ( ) , buffers . data ( ) ,
( DWORD ) buffers . size ( ) , & bytes_sent , 0 , nullptr , nullptr ) ;
size_t async_buffers_offset = 0 ;
if ( status = = 0 ) {
if ( bytes_sent = = data - > Length ( ) ) {
@ -166,7 +175,7 @@ void WindowsEndpoint::Write(absl::AnyInvocable<void(absl::Status)> on_writable,
}
// The data was not completely delivered, we should send the rest of it by
// doing an async write operation.
for ( in t i = 0 ; i < data - > Count ( ) ; i + + ) {
for ( size_ t i = 0 ; i < data - > Count ( ) ; i + + ) {
if ( buffers [ i ] . len > bytes_sent ) {
buffers [ i ] . buf + = bytes_sent ;
buffers [ i ] . len - = bytes_sent ;
@ -187,9 +196,10 @@ void WindowsEndpoint::Write(absl::AnyInvocable<void(absl::Status)> on_writable,
return ;
}
}
auto write_info = socket_ - > write_info ( ) ;
auto write_info = io_state_ - > socket - > write_info ( ) ;
memset ( write_info - > overlapped ( ) , 0 , sizeof ( OVERLAPPED ) ) ;
status = WSASend ( socket_ - > socket ( ) , & buffers [ async_buffers_offset ] ,
status =
WSASend ( io_state_ - > socket - > raw_socket ( ) , & buffers [ async_buffers_offset ] ,
( DWORD ) ( data - > Count ( ) - async_buffers_offset ) , nullptr , 0 ,
write_info - > overlapped ( ) , nullptr ) ;
@ -204,8 +214,8 @@ void WindowsEndpoint::Write(absl::AnyInvocable<void(absl::Status)> on_writable,
}
// As all is now setup, we can now ask for the IOCP notification. It may
// trigger the callback immediately however, but no matter.
handle_write_event_ . Prime ( data , std : : move ( on_writable ) ) ;
socket_ - > NotifyOnWrite ( & handle_write_event_ ) ;
io_state_ - > handle_write_event . Prime ( io_state_ , data , std : : move ( on_writable ) ) ;
io_state_ - > socket - > NotifyOnWrite ( & io_state_ - > handle_write_event ) ;
}
const EventEngine : : ResolvedAddress & WindowsEndpoint : : GetPeerAddress ( ) const {
return peer_address_ ;
@ -214,38 +224,70 @@ const EventEngine::ResolvedAddress& WindowsEndpoint::GetLocalAddress() const {
return local_address_ ;
}
// ---- Handle{Read|Write}Closure
// ---- Handle{Read|Write}Closure ----
namespace {
void AbortOnEvent ( absl : : Status ) {
grpc_core : : Crash (
" INTERNAL ERROR: Asked to handle read/write event with an invalid "
" callback " ) ;
}
} // namespace
void WindowsEndpoint : : HandleReadClosure : : Reset ( ) {
cb_ = & AbortOnEvent ;
buffer_ = nullptr ;
}
void WindowsEndpoint : : HandleWriteClosure : : Reset ( ) {
cb_ = & AbortOnEvent ;
buffer_ = nullptr ;
}
void WindowsEndpoint : : HandleReadClosure : : Prime (
std : : shared_ptr < AsyncIOState > io_state , SliceBuffer * buffer ,
absl : : AnyInvocable < void ( absl : : Status ) > cb ) {
io_state_ = std : : move ( io_state ) ;
cb_ = std : : move ( cb ) ;
buffer_ = buffer ;
}
WindowsEndpoint : : BaseEventClosure : : BaseEventClosure ( WindowsEndpoint * endpoint )
: cb_ ( & AbortOnEvent ) , endpoint_ ( endpoint ) { }
void WindowsEndpoint : : HandleWriteClosure : : Prime (
std : : shared_ptr < AsyncIOState > io_state , SliceBuffer * buffer ,
absl : : AnyInvocable < void ( absl : : Status ) > cb ) {
io_state_ = std : : move ( io_state ) ;
cb_ = std : : move ( cb ) ;
buffer_ = buffer ;
}
void WindowsEndpoint : : HandleReadClosure : : Run ( ) {
// Deletes the shared_ptr when this closure returns
auto io_state = std : : move ( io_state_ ) ;
GRPC_EVENT_ENGINE_ENDPOINT_TRACE ( " WindowsEndpoint::%p Handling Read Event " ,
endpoint_ ) ;
io_state - > endpoint ) ;
absl : : Status status ;
auto * read_info = endpoint_ - > socket_ - > read_info ( ) ;
auto cb_cleanup = absl : : MakeCleanup ( [ this , & status ] ( ) {
auto cb = std : : move ( cb_ ) ;
cb_ = & AbortOnEvent ;
Reset ( ) ;
cb ( status ) ;
} ) ;
if ( read_info - > wsa_error ( ) ! = 0 ) {
status = GRPC_WSA_ERROR ( read_info - > wsa_error ( ) , " Async Read Error " ) ;
const auto result = io_state - > socket - > read_info ( ) - > result ( ) ;
if ( result . wsa_error ! = 0 ) {
status = GRPC_WSA_ERROR ( result . wsa_error , " Async Read Error " ) ;
buffer_ - > Clear ( ) ;
return ;
}
if ( read_info - > bytes_transferred ( ) > 0 ) {
GPR_ASSERT ( read_info - > bytes_transferred ( ) < = buffer_ - > Length ( ) ) ;
if ( read_info - > bytes_transferred ( ) ! = buffer_ - > Length ( ) ) {
buffer_ - > RemoveLastNBytes ( buffer_ - > Length ( ) -
read_info - > bytes_transferred ( ) ) ;
if ( result . bytes_transferred > 0 ) {
GPR_ASSERT ( result . bytes_transferred < = buffer_ - > Length ( ) ) ;
if ( result . bytes_transferred ! = buffer_ - > Length ( ) ) {
buffer_ - > RemoveLastNBytes ( buffer_ - > Length ( ) - result . bytes_transferred ) ;
}
GPR_ASSERT ( read_info - > bytes_transferred ( ) = = buffer_ - > Length ( ) ) ;
GPR_ASSERT ( result . bytes_transferred = = buffer_ - > Length ( ) ) ;
if ( grpc_event_engine_endpoint_data_trace . enabled ( ) ) {
for ( in t i = 0 ; i < buffer_ - > Count ( ) ; i + + ) {
for ( size_ t i = 0 ; i < buffer_ - > Count ( ) ; i + + ) {
auto str = buffer_ - > RefSlice ( i ) . as_string_view ( ) ;
gpr_log ( GPR_INFO , " WindowsEndpoint::%p READ (peer=%s): %.*s " , this ,
endpoint_ - > peer_address_string_ . c_str ( ) , str . length ( ) ,
gpr_log ( GPR_INFO , " WindowsEndpoint::%p READ (peer=%s): %.*s " ,
io_state - > endpoint ,
io_state - > endpoint - > peer_address_string_ . c_str ( ) , str . length ( ) ,
str . data ( ) ) ;
}
}
@ -253,25 +295,36 @@ void WindowsEndpoint::HandleReadClosure::Run() {
}
// Either the endpoint is shut down or we've seen the end of the stream
buffer_ - > Clear ( ) ;
// TODO(hork): different error message if shut down
status = absl : : UnavailableError ( " End of TCP stream " ) ;
}
void WindowsEndpoint : : HandleWriteClosure : : Run ( ) {
// Deletes the shared_ptr when this closure returns
auto io_state = std : : move ( io_state_ ) ;
GRPC_EVENT_ENGINE_ENDPOINT_TRACE ( " WindowsEndpoint::%p Handling Write Event " ,
endpoint_ ) ;
auto * write_info = endpoint_ - > socket_ - > write_info ( ) ;
io_state_ - > endpoint ) ;
auto cb = std : : move ( cb_ ) ;
cb_ = & AbortOnEvent ;
const auto result = io_state - > socket - > write_info ( ) - > result ( ) ;
Reset ( ) ;
absl : : Status status ;
if ( write_info - > wsa_error ( ) ! = 0 ) {
status = GRPC_WSA_ERROR ( write_info - > wsa_error ( ) , " WSASend " ) ;
if ( result . wsa_error ! = 0 ) {
status = GRPC_WSA_ERROR ( result . wsa_error , " WSASend " ) ;
} else {
GPR_ASSERT ( write_info - > bytes_transferred ( ) = = buffer_ - > Length ( ) ) ;
GPR_ASSERT ( result . bytes_transferred = = buffer_ - > Length ( ) ) ;
}
cb ( status ) ;
}
// ---- AsyncIOState ----
WindowsEndpoint : : AsyncIOState : : AsyncIOState ( WindowsEndpoint * endpoint ,
std : : unique_ptr < WinSocket > socket )
: endpoint ( endpoint ) , socket ( std : : move ( socket ) ) { }
WindowsEndpoint : : AsyncIOState : : ~ AsyncIOState ( ) {
socket - > Shutdown ( DEBUG_LOCATION , " ~AsyncIOState " ) ;
}
} // namespace experimental
} // namespace grpc_event_engine