@ -29,6 +29,7 @@
# include <ares.h>
# include <ares.h>
# include "absl/functional/any_invocable.h"
# include "absl/functional/any_invocable.h"
# include "absl/log/check.h"
# include "absl/strings/str_format.h"
# include "absl/strings/str_format.h"
# include <grpc/support/alloc.h>
# include <grpc/support/alloc.h>
@ -136,8 +137,8 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
GetName ( ) , shutdown_called_ ) ;
GetName ( ) , shutdown_called_ ) ;
CSliceUnref ( read_buf_ ) ;
CSliceUnref ( read_buf_ ) ;
CSliceUnref ( write_buf_ ) ;
CSliceUnref ( write_buf_ ) ;
GPR_ASSERT ( read_closure_ = = nullptr ) ;
CHECK_EQ ( read_closure_ , nullptr ) ;
GPR_ASSERT ( write_closure_ = = nullptr ) ;
CHECK_EQ ( write_closure_ , nullptr ) ;
if ( ! shutdown_called_ ) {
if ( ! shutdown_called_ ) {
// This can happen if the socket was never seen by grpc ares wrapper
// This can happen if the socket was never seen by grpc ares wrapper
// code, i.e. if we never started I/O polling on it.
// code, i.e. if we never started I/O polling on it.
@ -157,16 +158,16 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
}
}
void RegisterForOnReadableLocked ( grpc_closure * read_closure ) override {
void RegisterForOnReadableLocked ( grpc_closure * read_closure ) override {
GPR_ASSERT ( read_closure_ = = nullptr ) ;
CHECK_EQ ( read_closure_ , nullptr ) ;
read_closure_ = read_closure ;
read_closure_ = read_closure ;
GPR_ASSERT ( GRPC_SLICE_LENGTH ( read_buf_ ) = = 0 ) ;
CHECK_EQ ( GRPC_SLICE_LENGTH ( read_buf_ ) , 0 ) ;
CSliceUnref ( read_buf_ ) ;
CSliceUnref ( read_buf_ ) ;
GPR_ASSERT ( ! read_buf_has_data_ ) ;
CHECK ( ! read_buf_has_data_ ) ;
read_buf_ = GRPC_SLICE_MALLOC ( 4192 ) ;
read_buf_ = GRPC_SLICE_MALLOC ( 4192 ) ;
if ( connect_done_ ) {
if ( connect_done_ ) {
ContinueRegisterForOnReadableLocked ( ) ;
ContinueRegisterForOnReadableLocked ( ) ;
} else {
} else {
GPR_ASSERT ( pending_continue_register_for_on_readable_locked_ = = false ) ;
CHECK ( pending_continue_register_for_on_readable_locked_ = = false ) ;
pending_continue_register_for_on_readable_locked_ = true ;
pending_continue_register_for_on_readable_locked_ = true ;
}
}
}
}
@ -176,7 +177,7 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
" fd:|%s| ContinueRegisterForOnReadableLocked "
" fd:|%s| ContinueRegisterForOnReadableLocked "
" wsa_connect_error_:%d " ,
" wsa_connect_error_:%d " ,
GetName ( ) , wsa_connect_error_ ) ;
GetName ( ) , wsa_connect_error_ ) ;
GPR_ASSERT ( connect_done_ ) ;
CHECK ( connect_done_ ) ;
if ( wsa_connect_error_ ! = 0 ) {
if ( wsa_connect_error_ ! = 0 ) {
ScheduleAndNullReadClosure ( GRPC_WSA_ERROR ( wsa_connect_error_ , " connect " ) ) ;
ScheduleAndNullReadClosure ( GRPC_WSA_ERROR ( wsa_connect_error_ , " connect " ) ) ;
return ;
return ;
@ -212,16 +213,16 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
GRPC_CARES_TRACE_LOG ( " fd:|%s| RegisterForOnWriteableLocked called " ,
GRPC_CARES_TRACE_LOG ( " fd:|%s| RegisterForOnWriteableLocked called " ,
GetName ( ) ) ;
GetName ( ) ) ;
} else {
} else {
GPR_ASSERT ( socket_type_ = = SOCK_STREAM ) ;
CHECK ( socket_type_ = = SOCK_STREAM ) ;
GRPC_CARES_TRACE_LOG (
GRPC_CARES_TRACE_LOG (
" fd:|%s| RegisterForOnWriteableLocked called tcp_write_state_: %d "
" fd:|%s| RegisterForOnWriteableLocked called tcp_write_state_: %d "
" connect_done_: %d " ,
" connect_done_: %d " ,
GetName ( ) , tcp_write_state_ , connect_done_ ) ;
GetName ( ) , tcp_write_state_ , connect_done_ ) ;
}
}
GPR_ASSERT ( write_closure_ = = nullptr ) ;
CHECK_EQ ( write_closure_ , nullptr ) ;
write_closure_ = write_closure ;
write_closure_ = write_closure ;
if ( ! connect_done_ ) {
if ( ! connect_done_ ) {
GPR_ASSERT ( ! pending_continue_register_for_on_writeable_locked_ ) ;
CHECK ( ! pending_continue_register_for_on_writeable_locked_ ) ;
pending_continue_register_for_on_writeable_locked_ = true ;
pending_continue_register_for_on_writeable_locked_ = true ;
// Register an async OnTcpConnect callback here rather than when the
// Register an async OnTcpConnect callback here rather than when the
// connect was initiated, since we are now guaranteed to hold a ref of the
// connect was initiated, since we are now guaranteed to hold a ref of the
@ -237,7 +238,7 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
" fd:|%s| ContinueRegisterForOnWriteableLocked "
" fd:|%s| ContinueRegisterForOnWriteableLocked "
" wsa_connect_error_:%d " ,
" wsa_connect_error_:%d " ,
GetName ( ) , wsa_connect_error_ ) ;
GetName ( ) , wsa_connect_error_ ) ;
GPR_ASSERT ( connect_done_ ) ;
CHECK ( connect_done_ ) ;
if ( wsa_connect_error_ ! = 0 ) {
if ( wsa_connect_error_ ! = 0 ) {
ScheduleAndNullWriteClosure (
ScheduleAndNullWriteClosure (
GRPC_WSA_ERROR ( wsa_connect_error_ , " connect " ) ) ;
GRPC_WSA_ERROR ( wsa_connect_error_ , " connect " ) ) ;
@ -246,7 +247,7 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
if ( socket_type_ = = SOCK_DGRAM ) {
if ( socket_type_ = = SOCK_DGRAM ) {
ScheduleAndNullWriteClosure ( absl : : OkStatus ( ) ) ;
ScheduleAndNullWriteClosure ( absl : : OkStatus ( ) ) ;
} else {
} else {
GPR_ASSERT ( socket_type_ = = SOCK_STREAM ) ;
CHECK ( socket_type_ = = SOCK_STREAM ) ;
int wsa_error_code = 0 ;
int wsa_error_code = 0 ;
switch ( tcp_write_state_ ) {
switch ( tcp_write_state_ ) {
case WRITE_IDLE :
case WRITE_IDLE :
@ -272,7 +273,7 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
bool IsFdStillReadableLocked ( ) override { return read_buf_has_data_ ; }
bool IsFdStillReadableLocked ( ) override { return read_buf_has_data_ ; }
void ShutdownLocked ( grpc_error_handle /* error */ ) override {
void ShutdownLocked ( grpc_error_handle /* error */ ) override {
GPR_ASSERT ( ! shutdown_called_ ) ;
CHECK ( ! shutdown_called_ ) ;
shutdown_called_ = true ;
shutdown_called_ = true ;
on_shutdown_locked_ ( ) ;
on_shutdown_locked_ ( ) ;
grpc_winsocket_shutdown ( winsocket_ ) ;
grpc_winsocket_shutdown ( winsocket_ ) ;
@ -308,7 +309,7 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
// c-ares overloads this recv_from virtual socket function to receive
// c-ares overloads this recv_from virtual socket function to receive
// data on both UDP and TCP sockets, and from is nullptr for TCP.
// data on both UDP and TCP sockets, and from is nullptr for TCP.
if ( from ! = nullptr ) {
if ( from ! = nullptr ) {
GPR_ASSERT ( * from_len < = recv_from_source_addr_len_ ) ;
CHECK ( * from_len < = recv_from_source_addr_len_ ) ;
memcpy ( from , & recv_from_source_addr_ , recv_from_source_addr_len_ ) ;
memcpy ( from , & recv_from_source_addr_ , recv_from_source_addr_len_ ) ;
* from_len = recv_from_source_addr_len_ ;
* from_len = recv_from_source_addr_len_ ;
}
}
@ -377,7 +378,7 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
// Therefore, the sendv handler for UDP sockets must only attempt
// Therefore, the sendv handler for UDP sockets must only attempt
// to write everything inline.
// to write everything inline.
GRPC_CARES_TRACE_LOG ( " fd:|%s| SendVUDP called " , GetName ( ) ) ;
GRPC_CARES_TRACE_LOG ( " fd:|%s| SendVUDP called " , GetName ( ) ) ;
GPR_ASSERT ( GRPC_SLICE_LENGTH ( write_buf_ ) = = 0 ) ;
CHECK_EQ ( GRPC_SLICE_LENGTH ( write_buf_ ) , 0 ) ;
CSliceUnref ( write_buf_ ) ;
CSliceUnref ( write_buf_ ) ;
write_buf_ = FlattenIovec ( iov , iov_count ) ;
write_buf_ = FlattenIovec ( iov , iov_count ) ;
DWORD bytes_sent = 0 ;
DWORD bytes_sent = 0 ;
@ -410,7 +411,7 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
switch ( tcp_write_state_ ) {
switch ( tcp_write_state_ ) {
case WRITE_IDLE :
case WRITE_IDLE :
tcp_write_state_ = WRITE_REQUESTED ;
tcp_write_state_ = WRITE_REQUESTED ;
GPR_ASSERT ( GRPC_SLICE_LENGTH ( write_buf_ ) = = 0 ) ;
CHECK_EQ ( GRPC_SLICE_LENGTH ( write_buf_ ) , 0 ) ;
CSliceUnref ( write_buf_ ) ;
CSliceUnref ( write_buf_ ) ;
write_buf_ = FlattenIovec ( iov , iov_count ) ;
write_buf_ = FlattenIovec ( iov , iov_count ) ;
wsa_error_ctx - > SetWSAError ( WSAEWOULDBLOCK ) ;
wsa_error_ctx - > SetWSAError ( WSAEWOULDBLOCK ) ;
@ -426,11 +427,11 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
// send again. If c-ares still needs to send even more data, we'll get
// send again. If c-ares still needs to send even more data, we'll get
// to it eventually.
// to it eventually.
grpc_slice currently_attempted = FlattenIovec ( iov , iov_count ) ;
grpc_slice currently_attempted = FlattenIovec ( iov , iov_count ) ;
GPR_ASSERT ( GRPC_SLICE_LENGTH ( currently_attempted ) > =
CHECK ( GRPC_SLICE_LENGTH ( currently_attempted ) > =
GRPC_SLICE_LENGTH ( write_buf_ ) ) ;
GRPC_SLICE_LENGTH ( write_buf_ ) ) ;
ares_ssize_t total_sent = 0 ;
ares_ssize_t total_sent = 0 ;
for ( size_t i = 0 ; i < GRPC_SLICE_LENGTH ( write_buf_ ) ; i + + ) {
for ( size_t i = 0 ; i < GRPC_SLICE_LENGTH ( write_buf_ ) ; i + + ) {
GPR_ASSERT ( GRPC_SLICE_START_PTR ( currently_attempted ) [ i ] = =
CHECK ( GRPC_SLICE_START_PTR ( currently_attempted ) [ i ] = =
GRPC_SLICE_START_PTR ( write_buf_ ) [ i ] ) ;
GRPC_SLICE_START_PTR ( write_buf_ ) [ i ] ) ;
total_sent + + ;
total_sent + + ;
}
}
@ -456,9 +457,9 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
GetName ( ) , StatusToString ( error ) . c_str ( ) ,
GetName ( ) , StatusToString ( error ) . c_str ( ) ,
pending_continue_register_for_on_readable_locked_ ,
pending_continue_register_for_on_readable_locked_ ,
pending_continue_register_for_on_writeable_locked_ ) ;
pending_continue_register_for_on_writeable_locked_ ) ;
GPR_ASSERT ( ! connect_done_ ) ;
CHECK ( ! connect_done_ ) ;
connect_done_ = true ;
connect_done_ = true ;
GPR_ASSERT ( wsa_connect_error_ = = 0 ) ;
CHECK_EQ ( wsa_connect_error_ , 0 ) ;
if ( ! error . ok ( ) | | shutdown_called_ ) {
if ( ! error . ok ( ) | | shutdown_called_ ) {
wsa_connect_error_ = WSA_OPERATION_ABORTED ;
wsa_connect_error_ = WSA_OPERATION_ABORTED ;
} else {
} else {
@ -468,7 +469,7 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
WSAGetOverlappedResult ( grpc_winsocket_wrapped_socket ( winsocket_ ) ,
WSAGetOverlappedResult ( grpc_winsocket_wrapped_socket ( winsocket_ ) ,
& winsocket_ - > write_info . overlapped ,
& winsocket_ - > write_info . overlapped ,
& transferred_bytes , FALSE , & flags ) ;
& transferred_bytes , FALSE , & flags ) ;
GPR_ASSERT ( transferred_bytes = = 0 ) ;
CHECK_EQ ( transferred_bytes , 0 ) ;
if ( ! wsa_success ) {
if ( ! wsa_success ) {
wsa_connect_error_ = WSAGetLastError ( ) ;
wsa_connect_error_ = WSAGetLastError ( ) ;
char * msg = gpr_format_message ( wsa_connect_error_ ) ;
char * msg = gpr_format_message ( wsa_connect_error_ ) ;
@ -502,8 +503,8 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
int ConnectUDP ( WSAErrorContext * wsa_error_ctx , const struct sockaddr * target ,
int ConnectUDP ( WSAErrorContext * wsa_error_ctx , const struct sockaddr * target ,
ares_socklen_t target_len ) {
ares_socklen_t target_len ) {
GRPC_CARES_TRACE_LOG ( " fd:%s ConnectUDP " , GetName ( ) ) ;
GRPC_CARES_TRACE_LOG ( " fd:%s ConnectUDP " , GetName ( ) ) ;
GPR_ASSERT ( ! connect_done_ ) ;
CHECK ( ! connect_done_ ) ;
GPR_ASSERT ( wsa_connect_error_ = = 0 ) ;
CHECK_EQ ( wsa_connect_error_ , 0 ) ;
SOCKET s = grpc_winsocket_wrapped_socket ( winsocket_ ) ;
SOCKET s = grpc_winsocket_wrapped_socket ( winsocket_ ) ;
int out =
int out =
WSAConnect ( s , target , target_len , nullptr , nullptr , nullptr , nullptr ) ;
WSAConnect ( s , target , target_len , nullptr , nullptr , nullptr , nullptr ) ;
@ -639,7 +640,7 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
void OnIocpWriteableLocked ( grpc_error_handle error ) {
void OnIocpWriteableLocked ( grpc_error_handle error ) {
GRPC_CARES_TRACE_LOG ( " OnIocpWriteableInner. fd:|%s| " , GetName ( ) ) ;
GRPC_CARES_TRACE_LOG ( " OnIocpWriteableInner. fd:|%s| " , GetName ( ) ) ;
GPR_ASSERT ( socket_type_ = = SOCK_STREAM ) ;
CHECK ( socket_type_ = = SOCK_STREAM ) ;
if ( error . ok ( ) ) {
if ( error . ok ( ) ) {
if ( winsocket_ - > write_info . wsa_error ! = 0 ) {
if ( winsocket_ - > write_info . wsa_error ! = 0 ) {
error = GRPC_WSA_ERROR ( winsocket_ - > write_info . wsa_error ,
error = GRPC_WSA_ERROR ( winsocket_ - > write_info . wsa_error ,
@ -651,7 +652,7 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
StatusToString ( error ) . c_str ( ) ) ;
StatusToString ( error ) . c_str ( ) ) ;
}
}
}
}
GPR_ASSERT ( tcp_write_state_ = = WRITE_PENDING ) ;
CHECK ( tcp_write_state_ = = WRITE_PENDING ) ;
if ( error . ok ( ) ) {
if ( error . ok ( ) ) {
tcp_write_state_ = WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY ;
tcp_write_state_ = WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY ;
write_buf_ = grpc_slice_sub_no_ref (
write_buf_ = grpc_slice_sub_no_ref (
@ -710,7 +711,7 @@ class GrpcPolledFdFactoryWindows final : public GrpcPolledFdFactory {
GrpcPolledFd * NewGrpcPolledFdLocked (
GrpcPolledFd * NewGrpcPolledFdLocked (
ares_socket_t as , grpc_pollset_set * /* driver_pollset_set */ ) override {
ares_socket_t as , grpc_pollset_set * /* driver_pollset_set */ ) override {
auto it = sockets_ . find ( as ) ;
auto it = sockets_ . find ( as ) ;
GPR_ASSERT ( it ! = sockets_ . end ( ) ) ;
CHECK ( it ! = sockets_ . end ( ) ) ;
return it - > second ;
return it - > second ;
}
}
@ -757,7 +758,7 @@ class GrpcPolledFdFactoryWindows final : public GrpcPolledFdFactory {
GRPC_CARES_TRACE_LOG (
GRPC_CARES_TRACE_LOG (
" fd:|%s| created with params af:%d type:%d protocol:%d " ,
" fd:|%s| created with params af:%d type:%d protocol:%d " ,
polled_fd - > GetName ( ) , af , type , protocol ) ;
polled_fd - > GetName ( ) , af , type , protocol ) ;
GPR_ASSERT ( self - > sockets_ . insert ( { s , polled_fd } ) . second ) ;
CHECK ( self - > sockets_ . insert ( { s , polled_fd } ) . second ) ;
return s ;
return s ;
}
}
@ -767,7 +768,7 @@ class GrpcPolledFdFactoryWindows final : public GrpcPolledFdFactory {
GrpcPolledFdFactoryWindows * self =
GrpcPolledFdFactoryWindows * self =
static_cast < GrpcPolledFdFactoryWindows * > ( user_data ) ;
static_cast < GrpcPolledFdFactoryWindows * > ( user_data ) ;
auto it = self - > sockets_ . find ( as ) ;
auto it = self - > sockets_ . find ( as ) ;
GPR_ASSERT ( it ! = self - > sockets_ . end ( ) ) ;
CHECK ( it ! = self - > sockets_ . end ( ) ) ;
return it - > second - > Connect ( & wsa_error_ctx , target , target_len ) ;
return it - > second - > Connect ( & wsa_error_ctx , target , target_len ) ;
}
}
@ -777,7 +778,7 @@ class GrpcPolledFdFactoryWindows final : public GrpcPolledFdFactory {
GrpcPolledFdFactoryWindows * self =
GrpcPolledFdFactoryWindows * self =
static_cast < GrpcPolledFdFactoryWindows * > ( user_data ) ;
static_cast < GrpcPolledFdFactoryWindows * > ( user_data ) ;
auto it = self - > sockets_ . find ( as ) ;
auto it = self - > sockets_ . find ( as ) ;
GPR_ASSERT ( it ! = self - > sockets_ . end ( ) ) ;
CHECK ( it ! = self - > sockets_ . end ( ) ) ;
return it - > second - > SendV ( & wsa_error_ctx , iov , iovec_count ) ;
return it - > second - > SendV ( & wsa_error_ctx , iov , iovec_count ) ;
}
}
@ -788,7 +789,7 @@ class GrpcPolledFdFactoryWindows final : public GrpcPolledFdFactory {
GrpcPolledFdFactoryWindows * self =
GrpcPolledFdFactoryWindows * self =
static_cast < GrpcPolledFdFactoryWindows * > ( user_data ) ;
static_cast < GrpcPolledFdFactoryWindows * > ( user_data ) ;
auto it = self - > sockets_ . find ( as ) ;
auto it = self - > sockets_ . find ( as ) ;
GPR_ASSERT ( it ! = self - > sockets_ . end ( ) ) ;
CHECK ( it ! = self - > sockets_ . end ( ) ) ;
return it - > second - > RecvFrom ( & wsa_error_ctx , data , data_len , flags , from ,
return it - > second - > RecvFrom ( & wsa_error_ctx , data , data_len , flags , from ,
from_len ) ;
from_len ) ;
}
}