@ -31,6 +31,9 @@
# include "src/core/lib/gpr/string.h"
# include "src/core/lib/gprpp/memory.h"
# include "src/core/lib/iomgr/combiner.h"
# include "src/core/lib/iomgr/iocp_windows.h"
# include "src/core/lib/iomgr/sockaddr_utils.h"
# include "src/core/lib/iomgr/sockaddr_windows.h"
# include "src/core/lib/iomgr/socket_windows.h"
# include "src/core/lib/iomgr/tcp_windows.h"
# include "src/core/lib/slice/slice_internal.h"
@ -50,6 +53,32 @@ struct iovec {
namespace grpc_core {
/* c-ares reads and takes action on the error codes of the
* " virtual socket operations " in this file , via the WSAGetLastError
* APIs . If code in this file wants to set a specific WSA error that
* c - ares should read , it must do so by calling SetWSAError ( ) on the
* WSAErrorContext instance passed to it . A WSAErrorContext must only be
* instantiated at the top of the virtual socket function callstack . */
class WSAErrorContext {
public :
explicit WSAErrorContext ( ) { } ;
~ WSAErrorContext ( ) {
if ( error_ ! = 0 ) {
WSASetLastError ( error_ ) ;
}
}
/* Disallow copy and assignment operators */
WSAErrorContext ( const WSAErrorContext & ) = delete ;
WSAErrorContext & operator = ( const WSAErrorContext & ) = delete ;
void SetWSAError ( int error ) { error_ = error ; }
private :
int error_ = 0 ;
} ;
/* c-ares creates its own sockets and is meant to read them when readable and
* write them when writeable . To fit this socket usage model into the grpc
* windows poller ( which gives notifications when attempted reads and writes are
@ -68,11 +97,14 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY ,
} ;
GrpcPolledFdWindows ( ares_socket_t as , grpc_combiner * combiner )
GrpcPolledFdWindows ( ares_socket_t as , grpc_combiner * combiner ,
int address_family , int socket_type )
: read_buf_ ( grpc_empty_slice ( ) ) ,
write_buf_ ( grpc_empty_slice ( ) ) ,
write_state_ ( WRITE_IDLE ) ,
gotten_into_driver_list_ ( false ) {
tcp_write_state_ ( WRITE_IDLE ) ,
gotten_into_driver_list_ ( false ) ,
address_family_ ( address_family ) ,
socket_type_ ( socket_type ) {
gpr_asprintf ( & name_ , " c-ares socket: % " PRIdPTR , as ) ;
winsocket_ = grpc_winsocket_create ( as , name_ ) ;
combiner_ = GRPC_COMBINER_REF ( combiner , name_ ) ;
@ -82,6 +114,16 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
GRPC_CLOSURE_INIT ( & outer_write_closure_ ,
& GrpcPolledFdWindows : : OnIocpWriteable , this ,
grpc_combiner_scheduler ( combiner_ ) ) ;
GRPC_CLOSURE_INIT ( & on_tcp_connect_locked_ ,
& GrpcPolledFdWindows : : OnTcpConnectLocked , this ,
grpc_combiner_scheduler ( combiner_ ) ) ;
GRPC_CLOSURE_INIT ( & continue_register_for_on_readable_locked_ ,
& GrpcPolledFdWindows : : ContinueRegisterForOnReadableLocked ,
this , grpc_combiner_scheduler ( combiner_ ) ) ;
GRPC_CLOSURE_INIT (
& continue_register_for_on_writeable_locked_ ,
& GrpcPolledFdWindows : : ContinueRegisterForOnWriteableLocked , this ,
grpc_combiner_scheduler ( combiner_ ) ) ;
}
~ GrpcPolledFdWindows ( ) {
@ -111,6 +153,33 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
grpc_slice_unref_internal ( read_buf_ ) ;
GPR_ASSERT ( ! read_buf_has_data_ ) ;
read_buf_ = GRPC_SLICE_MALLOC ( 4192 ) ;
if ( connect_done_ ) {
GRPC_CLOSURE_SCHED ( & continue_register_for_on_readable_locked_ ,
GRPC_ERROR_NONE ) ;
} else {
GPR_ASSERT ( pending_continue_register_for_on_readable_locked_ = = nullptr ) ;
pending_continue_register_for_on_readable_locked_ =
& continue_register_for_on_readable_locked_ ;
}
}
static void ContinueRegisterForOnReadableLocked ( void * arg ,
grpc_error * unused_error ) {
GrpcPolledFdWindows * grpc_polled_fd =
static_cast < GrpcPolledFdWindows * > ( arg ) ;
grpc_polled_fd - > InnerContinueRegisterForOnReadableLocked ( GRPC_ERROR_NONE ) ;
}
void InnerContinueRegisterForOnReadableLocked ( grpc_error * unused_error ) {
GRPC_CARES_TRACE_LOG (
" fd:|%s| InnerContinueRegisterForOnReadableLocked "
" wsa_connect_error_:%d " ,
GetName ( ) , wsa_connect_error_ ) ;
GPR_ASSERT ( connect_done_ ) ;
if ( wsa_connect_error_ ! = 0 ) {
ScheduleAndNullReadClosure ( GRPC_WSA_ERROR ( wsa_connect_error_ , " connect " ) ) ;
return ;
}
WSABUF buffer ;
buffer . buf = ( char * ) GRPC_SLICE_START_PTR ( read_buf_ ) ;
buffer . len = GRPC_SLICE_LENGTH ( read_buf_ ) ;
@ -123,13 +192,14 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
& winsocket_ - > read_info . overlapped , nullptr ) ) {
int wsa_last_error = WSAGetLastError ( ) ;
char * msg = gpr_format_message ( wsa_last_error ) ;
grpc_error * error = GRPC_ERROR_CREATE_FROM_COPIED_STRING ( msg ) ;
GRPC_CARES_TRACE_LOG (
" RegisterForOnReadableLocked: WSARecvFrom error:|%s|. fd:|%s| " , msg ,
GetName ( ) ) ;
" fd:|%s| RegisterForOnReadableLocked WSARecvFrom error code:|%d| "
" msg:|%s| " ,
GetName ( ) , wsa_last_error , msg ) ;
gpr_free ( msg ) ;
if ( wsa_last_error ! = WSA_IO_PENDING ) {
ScheduleAndNullReadClosure ( error ) ;
ScheduleAndNullReadClosure (
GRPC_WSA_ERROR ( wsa_last_error , " WSARecvFrom " ) ) ;
return ;
}
}
@ -137,23 +207,68 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
}
void RegisterForOnWriteableLocked ( grpc_closure * write_closure ) override {
GRPC_CARES_TRACE_LOG (
" RegisterForOnWriteableLocked. fd:|%s|. Current write state: %d " ,
GetName ( ) , write_state_ ) ;
if ( socket_type_ = = SOCK_DGRAM ) {
GRPC_CARES_TRACE_LOG ( " fd:|%s| RegisterForOnWriteableLocked called " ,
GetName ( ) ) ;
} else {
GPR_ASSERT ( socket_type_ = = SOCK_STREAM ) ;
GRPC_CARES_TRACE_LOG (
" fd:|%s| RegisterForOnWriteableLocked called tcp_write_state_: %d " ,
GetName ( ) , tcp_write_state_ ) ;
}
GPR_ASSERT ( write_closure_ = = nullptr ) ;
write_closure_ = write_closure ;
switch ( write_state_ ) {
case WRITE_IDLE :
ScheduleAndNullWriteClosure ( GRPC_ERROR_NONE ) ;
break ;
case WRITE_REQUESTED :
write_state_ = WRITE_PENDING ;
SendWriteBuf ( nullptr , & winsocket_ - > write_info . overlapped ) ;
grpc_socket_notify_on_write ( winsocket_ , & outer_write_closure_ ) ;
break ;
case WRITE_PENDING :
case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY :
abort ( ) ;
if ( connect_done_ ) {
GRPC_CLOSURE_SCHED ( & continue_register_for_on_writeable_locked_ ,
GRPC_ERROR_NONE ) ;
} else {
GPR_ASSERT ( pending_continue_register_for_on_writeable_locked_ = = nullptr ) ;
pending_continue_register_for_on_writeable_locked_ =
& continue_register_for_on_writeable_locked_ ;
}
}
static void ContinueRegisterForOnWriteableLocked ( void * arg ,
grpc_error * unused_error ) {
GrpcPolledFdWindows * grpc_polled_fd =
static_cast < GrpcPolledFdWindows * > ( arg ) ;
grpc_polled_fd - > InnerContinueRegisterForOnWriteableLocked ( GRPC_ERROR_NONE ) ;
}
void InnerContinueRegisterForOnWriteableLocked ( grpc_error * unused_error ) {
GRPC_CARES_TRACE_LOG (
" fd:|%s| InnerContinueRegisterForOnWriteableLocked "
" wsa_connect_error_:%d " ,
GetName ( ) , wsa_connect_error_ ) ;
GPR_ASSERT ( connect_done_ ) ;
if ( wsa_connect_error_ ! = 0 ) {
ScheduleAndNullWriteClosure (
GRPC_WSA_ERROR ( wsa_connect_error_ , " connect " ) ) ;
return ;
}
if ( socket_type_ = = SOCK_DGRAM ) {
ScheduleAndNullWriteClosure ( GRPC_ERROR_NONE ) ;
} else {
GPR_ASSERT ( socket_type_ = = SOCK_STREAM ) ;
int wsa_error_code = 0 ;
switch ( tcp_write_state_ ) {
case WRITE_IDLE :
ScheduleAndNullWriteClosure ( GRPC_ERROR_NONE ) ;
break ;
case WRITE_REQUESTED :
tcp_write_state_ = WRITE_PENDING ;
if ( SendWriteBuf ( nullptr , & winsocket_ - > write_info . overlapped ,
& wsa_error_code ) ! = 0 ) {
ScheduleAndNullWriteClosure (
GRPC_WSA_ERROR ( wsa_error_code , " WSASend (overlapped) " ) ) ;
} else {
grpc_socket_notify_on_write ( winsocket_ , & outer_write_closure_ ) ;
}
break ;
case WRITE_PENDING :
case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY :
abort ( ) ;
}
}
}
@ -171,13 +286,15 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
const char * GetName ( ) override { return name_ ; }
ares_ssize_t RecvFrom ( void * data , ares_socket_t data_len , int flags ,
ares_ssize_t RecvFrom ( WSAErrorContext * wsa_error_ctx , void * data ,
ares_socket_t data_len , int flags ,
struct sockaddr * from , ares_socklen_t * from_len ) {
GRPC_CARES_TRACE_LOG (
" RecvFrom called on fd:|%s|. Current read buf length:|%d| " , GetName ( ) ,
GRPC_SLICE_LENGTH ( read_buf_ ) ) ;
" fd:|%s| RecvFrom called read_buf_has_data:%d Current read buf "
" length:|%d| " ,
GetName ( ) , read_buf_has_data_ , GRPC_SLICE_LENGTH ( read_buf_ ) ) ;
if ( ! read_buf_has_data_ ) {
WSASetLast Error( WSAEWOULDBLOCK ) ;
wsa_error_ctx - > SetWSA Error( WSAEWOULDBLOCK ) ;
return - 1 ;
}
ares_ssize_t bytes_read = 0 ;
@ -215,54 +332,99 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
return out ;
}
int SendWriteBuf ( LPDWORD bytes_sent_ptr , LPWSAOVERLAPPED overlapped ) {
int SendWriteBuf ( LPDWORD bytes_sent_ptr , LPWSAOVERLAPPED overlapped ,
int * wsa_error_code ) {
WSABUF buf ;
buf . len = GRPC_SLICE_LENGTH ( write_buf_ ) ;
buf . buf = ( char * ) GRPC_SLICE_START_PTR ( write_buf_ ) ;
DWORD flags = 0 ;
int out = WSASend ( grpc_winsocket_wrapped_socket ( winsocket_ ) , & buf , 1 ,
bytes_sent_ptr , flags , overlapped , nullptr ) ;
* wsa_error_code = WSAGetLastError ( ) ;
GRPC_CARES_TRACE_LOG (
" WSASend: name:%s. buf len:%d. bytes sent: %d. overlapped %p. return "
" val: %d " ,
GetName ( ) , buf . len , * bytes_sent_ptr , overlapped , out ) ;
" fd:|%s| SendWriteBuf WSASend buf.len:%d *bytes_sent_ptr:%d "
" overlapped:%p "
" return:%d *wsa_error_code:%d " ,
GetName ( ) , buf . len , bytes_sent_ptr ! = nullptr ? * bytes_sent_ptr : 0 ,
overlapped , out , * wsa_error_code ) ;
return out ;
}
ares_ssize_t TrySendWriteBufSyncNonBlocking ( ) {
GPR_ASSERT ( write_state_ = = WRITE_IDLE ) ;
ares_ssize_t SendV ( WSAErrorContext * wsa_error_ctx , const struct iovec * iov ,
int iov_count ) {
GRPC_CARES_TRACE_LOG (
" fd:|%s| SendV called connect_done_:%d wsa_connect_error_:%d " ,
GetName ( ) , connect_done_ , wsa_connect_error_ ) ;
if ( ! connect_done_ ) {
wsa_error_ctx - > SetWSAError ( WSAEWOULDBLOCK ) ;
return - 1 ;
}
if ( wsa_connect_error_ ! = 0 ) {
wsa_error_ctx - > SetWSAError ( wsa_connect_error_ ) ;
return - 1 ;
}
switch ( socket_type_ ) {
case SOCK_DGRAM :
return SendVUDP ( wsa_error_ctx , iov , iov_count ) ;
case SOCK_STREAM :
return SendVTCP ( wsa_error_ctx , iov , iov_count ) ;
default :
abort ( ) ;
}
}
ares_ssize_t SendVUDP ( WSAErrorContext * wsa_error_ctx , const struct iovec * iov ,
int iov_count ) {
// c-ares doesn't handle retryable errors on writes of UDP sockets.
// Therefore, the sendv handler for UDP sockets must only attempt
// to write everything inline.
GRPC_CARES_TRACE_LOG ( " fd:|%s| SendVUDP called " , GetName ( ) ) ;
GPR_ASSERT ( GRPC_SLICE_LENGTH ( write_buf_ ) = = 0 ) ;
grpc_slice_unref_internal ( write_buf_ ) ;
write_buf_ = FlattenIovec ( iov , iov_count ) ;
DWORD bytes_sent = 0 ;
if ( SendWriteBuf ( & bytes_sent , nullptr ) ! = 0 ) {
int wsa_last_error = WSAGetLastError ( ) ;
char * msg = gpr_format_message ( wsa_last_error ) ;
int wsa_error_code = 0 ;
if ( SendWriteBuf ( & bytes_sent , nullptr , & wsa_error_code ) ! = 0 ) {
wsa_error_ctx - > SetWSAError ( wsa_error_code ) ;
char * msg = gpr_format_message ( wsa_error_code ) ;
GRPC_CARES_TRACE_LOG (
" TrySendWriteBufSyncNonBlocking: SendWriteBuf error:|%s|. fd:|%s| " ,
msg , GetName ( ) ) ;
" fd:|%s| SendVUDP SendWriteBuf error code:%d msg:|%s| " , GetName ( ) ,
wsa_error_code , msg ) ;
gpr_free ( msg ) ;
if ( wsa_last_error = = WSA_IO_PENDING ) {
WSASetLastError ( WSAEWOULDBLOCK ) ;
write_state_ = WRITE_REQUESTED ;
}
return - 1 ;
}
write_buf_ = grpc_slice_sub_no_ref ( write_buf_ , bytes_sent ,
GRPC_SLICE_LENGTH ( write_buf_ ) ) ;
return bytes_sent ;
}
ares_ssize_t SendV ( const struct iovec * iov , int iov_count ) {
GRPC_CARES_TRACE_LOG ( " SendV called on fd:|%s|. Current write state: %d " ,
GetName ( ) , write_state_ ) ;
switch ( write_state_ ) {
ares_ssize_t SendVTCP ( WSAErrorContext * wsa_error_ctx , const struct iovec * iov ,
int iov_count ) {
// The "sendv" handler on TCP sockets buffers up write
// requests and returns an artifical WSAEWOULDBLOCK. Writing that buffer out
// in the background, and making further send progress in general, will
// happen as long as c-ares continues to show interest in writeability on
// this fd.
GRPC_CARES_TRACE_LOG ( " fd:|%s| SendVTCP called tcp_write_state_:%d " ,
GetName ( ) , tcp_write_state_ ) ;
switch ( tcp_write_state_ ) {
case WRITE_IDLE :
tcp_write_state_ = WRITE_REQUESTED ;
GPR_ASSERT ( GRPC_SLICE_LENGTH ( write_buf_ ) = = 0 ) ;
grpc_slice_unref_internal ( write_buf_ ) ;
write_buf_ = FlattenIovec ( iov , iov_count ) ;
return TrySendWriteBufSyncNonBlocking ( ) ;
wsa_error_ctx - > SetWSAError ( WSAEWOULDBLOCK ) ;
return - 1 ;
case WRITE_REQUESTED :
case WRITE_PENDING :
WSASetLastError ( WSAEWOULDBLOCK ) ;
wsa_error_ctx - > SetWSA Error( WSAEWOULDBLOCK ) ;
return - 1 ;
case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY :
// c-ares is retrying a send on data that we previously returned
// WSAEWOULDBLOCK for, but then subsequently wrote out in the
// background. Right now, we assume that c-ares is retrying the same
// send again. If c-ares still needs to send even more data, we'll get
// to it eventually.
grpc_slice currently_attempted = FlattenIovec ( iov , iov_count ) ;
GPR_ASSERT ( GRPC_SLICE_LENGTH ( currently_attempted ) > =
GRPC_SLICE_LENGTH ( write_buf_ ) ) ;
@ -272,31 +434,159 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
GRPC_SLICE_START_PTR ( write_buf_ ) [ i ] ) ;
total_sent + + ;
}
grpc_slice_unref_internal ( write_buf_ ) ;
write_buf_ =
grpc_slice_sub_no_ref ( currently_attempted , total_sent ,
GRPC_SLICE_LENGTH ( currently_attempted ) ) ;
write_state_ = WRITE_IDLE ;
total_sent + = TrySendWriteBufSyncNonBlocking ( ) ;
grpc_slice_unref_internal ( currently_attempted ) ;
tcp_write_state_ = WRITE_IDLE ;
return total_sent ;
}
abort ( ) ;
}
int Connect ( const struct sockaddr * target , ares_socklen_t target_len ) {
static void OnTcpConnectLocked ( void * arg , grpc_error * error ) {
GrpcPolledFdWindows * grpc_polled_fd =
static_cast < GrpcPolledFdWindows * > ( arg ) ;
grpc_polled_fd - > InnerOnTcpConnectLocked ( error ) ;
}
void InnerOnTcpConnectLocked ( grpc_error * error ) {
GRPC_CARES_TRACE_LOG (
" fd:%s InnerOnTcpConnectLocked error:|%s| "
" pending_register_for_readable:% " PRIdPTR
" pending_register_for_writeable:% " PRIdPTR ,
GetName ( ) , grpc_error_string ( error ) ,
pending_continue_register_for_on_readable_locked_ ,
pending_continue_register_for_on_writeable_locked_ ) ;
GPR_ASSERT ( ! connect_done_ ) ;
connect_done_ = true ;
GPR_ASSERT ( wsa_connect_error_ = = 0 ) ;
if ( error = = GRPC_ERROR_NONE ) {
DWORD transfered_bytes = 0 ;
DWORD flags ;
BOOL wsa_success = WSAGetOverlappedResult (
grpc_winsocket_wrapped_socket ( winsocket_ ) ,
& winsocket_ - > write_info . overlapped , & transfered_bytes , FALSE , & flags ) ;
GPR_ASSERT ( transfered_bytes = = 0 ) ;
if ( ! wsa_success ) {
wsa_connect_error_ = WSAGetLastError ( ) ;
char * msg = gpr_format_message ( wsa_connect_error_ ) ;
GRPC_CARES_TRACE_LOG (
" fd:%s InnerOnTcpConnectLocked WSA overlapped result code:%d "
" msg:|%s| " ,
GetName ( ) , wsa_connect_error_ , msg ) ;
gpr_free ( msg ) ;
}
} else {
// Spoof up an error code that will cause any future c-ares operations on
// this fd to abort.
wsa_connect_error_ = WSA_OPERATION_ABORTED ;
}
if ( pending_continue_register_for_on_readable_locked_ ! = nullptr ) {
GRPC_CLOSURE_SCHED ( pending_continue_register_for_on_readable_locked_ ,
GRPC_ERROR_NONE ) ;
}
if ( pending_continue_register_for_on_writeable_locked_ ! = nullptr ) {
GRPC_CLOSURE_SCHED ( pending_continue_register_for_on_writeable_locked_ ,
GRPC_ERROR_NONE ) ;
}
}
int Connect ( WSAErrorContext * wsa_error_ctx , const struct sockaddr * target ,
ares_socklen_t target_len ) {
switch ( socket_type_ ) {
case SOCK_DGRAM :
return ConnectUDP ( wsa_error_ctx , target , target_len ) ;
case SOCK_STREAM :
return ConnectTCP ( wsa_error_ctx , target , target_len ) ;
default :
abort ( ) ;
}
}
int ConnectUDP ( WSAErrorContext * wsa_error_ctx , const struct sockaddr * target ,
ares_socklen_t target_len ) {
GRPC_CARES_TRACE_LOG ( " fd:%s ConnectUDP " , GetName ( ) ) ;
GPR_ASSERT ( ! connect_done_ ) ;
GPR_ASSERT ( wsa_connect_error_ = = 0 ) ;
SOCKET s = grpc_winsocket_wrapped_socket ( winsocket_ ) ;
GRPC_CARES_TRACE_LOG ( " Connect: fd:|%s| " , GetName ( ) ) ;
int out =
WSAConnect ( s , target , target_len , nullptr , nullptr , nullptr , nullptr ) ;
if ( out ! = 0 ) {
wsa_connect_error_ = WSAGetLastError ( ) ;
wsa_error_ctx - > SetWSAError ( wsa_connect_error_ ) ;
connect_done_ = true ;
char * msg = gpr_format_message ( wsa_connect_error_ ) ;
GRPC_CARES_TRACE_LOG ( " fd:%s WSAConnect error code:|%d| msg:|%s| " , GetName ( ) ,
wsa_connect_error_ , msg ) ;
gpr_free ( msg ) ;
// c-ares expects a posix-style connect API
return out = = 0 ? 0 : - 1 ;
}
int ConnectTCP ( WSAErrorContext * wsa_error_ctx , const struct sockaddr * target ,
ares_socklen_t target_len ) {
GRPC_CARES_TRACE_LOG ( " fd:%s ConnectTCP " , GetName ( ) ) ;
LPFN_CONNECTEX ConnectEx ;
GUID guid = WSAID_CONNECTEX ;
DWORD ioctl_num_bytes ;
SOCKET s = grpc_winsocket_wrapped_socket ( winsocket_ ) ;
if ( WSAIoctl ( s , SIO_GET_EXTENSION_FUNCTION_POINTER , & guid , sizeof ( guid ) ,
& ConnectEx , sizeof ( ConnectEx ) , & ioctl_num_bytes , nullptr ,
nullptr ) ! = 0 ) {
int wsa_last_error = WSAGetLastError ( ) ;
wsa_error_ctx - > SetWSAError ( wsa_last_error ) ;
char * msg = gpr_format_message ( wsa_last_error ) ;
GRPC_CARES_TRACE_LOG (
" fd:%s WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) error code:%d "
" msg:|%s| " ,
GetName ( ) , wsa_last_error , msg ) ;
gpr_free ( msg ) ;
connect_done_ = true ;
wsa_connect_error_ = wsa_last_error ;
return - 1 ;
}
grpc_resolved_address wildcard4_addr ;
grpc_resolved_address wildcard6_addr ;
grpc_sockaddr_make_wildcards ( 0 , & wildcard4_addr , & wildcard6_addr ) ;
grpc_resolved_address * local_address = nullptr ;
if ( address_family_ = = AF_INET ) {
local_address = & wildcard4_addr ;
} else {
local_address = & wildcard6_addr ;
}
if ( bind ( s , ( struct sockaddr * ) local_address - > addr ,
( int ) local_address - > len ) ! = 0 ) {
int wsa_last_error = WSAGetLastError ( ) ;
wsa_error_ctx - > SetWSAError ( wsa_last_error ) ;
char * msg = gpr_format_message ( wsa_last_error ) ;
GRPC_CARES_TRACE_LOG ( " Connect error code:|%d|, msg:|%s|. fd:|%s| " ,
wsa_last_error , msg , GetName ( ) ) ;
GRPC_CARES_TRACE_LOG ( " fd:%s bind error code:%d msg:|%s| " , GetName ( ) ,
wsa_last_error , msg ) ;
gpr_free ( msg ) ;
// c-ares expects a posix-style connect API
connect_done_ = true ;
wsa_connect_error_ = wsa_last_error ;
return - 1 ;
}
int out = 0 ;
if ( ConnectEx ( s , target , target_len , nullptr , 0 , nullptr ,
& winsocket_ - > write_info . overlapped ) = = 0 ) {
out = - 1 ;
int wsa_last_error = WSAGetLastError ( ) ;
wsa_error_ctx - > SetWSAError ( wsa_last_error ) ;
char * msg = gpr_format_message ( wsa_last_error ) ;
GRPC_CARES_TRACE_LOG ( " fd:%s ConnectEx error code:%d msg:|%s| " , GetName ( ) ,
wsa_last_error , msg ) ;
gpr_free ( msg ) ;
if ( wsa_last_error = = WSA_IO_PENDING ) {
// c-ares only understands WSAEINPROGRESS and EWOULDBLOCK error codes on
// connect, but an async connect on IOCP socket will give
// WSA_IO_PENDING, so we need to convert.
wsa_error_ctx - > SetWSAError ( WSAEWOULDBLOCK ) ;
} else {
// By returning a non-retryable error to c-ares at this point,
// we're aborting the possibility of any future operations on this fd.
connect_done_ = true ;
wsa_connect_error_ = wsa_last_error ;
return - 1 ;
}
}
grpc_socket_notify_on_write ( winsocket_ , & on_tcp_connect_locked_ ) ;
return out ;
}
@ -319,12 +609,13 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
* in subsequent c - ares reads . */
if ( winsocket_ - > read_info . wsa_error ! = WSAEMSGSIZE ) {
GRPC_ERROR_UNREF ( error ) ;
char * msg = gpr_format_message ( winsocket_ - > read_info . wsa_error ) ;
error = GRPC_WSA_ERROR ( winsocket_ - > read_info . wsa_error ,
" OnIocpReadableInner " ) ;
GRPC_CARES_TRACE_LOG (
" OnIocpReadableInner. winsocket error:|%s|. fd:|%s| " , msg ,
GetName ( ) ) ;
error = GRPC_ERROR_CREATE_FROM_COPIED_STRING ( msg ) ;
gpr_free ( msg ) ;
" fd:|%s| OnIocpReadableInner winsocket_->read_info.wsa_error "
" code:|%d| msg:|%s| " ,
GetName ( ) , winsocket_ - > read_info . wsa_error ,
grpc_error_string ( error ) ) ;
}
}
}
@ -337,8 +628,8 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
read_buf_ = grpc_empty_slice ( ) ;
}
GRPC_CARES_TRACE_LOG (
" OnIocpReadable finishing. read buf length now:|%d|. :fd:|%s| " ,
GRPC_SLICE_LENGTH ( read_buf_ ) , GetName ( ) ) ;
" fd:|%s| OnIocpReadable finishing. read buf length now:|%d|" , GetName ( ) ,
GRPC_SLICE_LENGTH ( read_buf_ ) ) ;
ScheduleAndNullReadClosure ( error ) ;
}
@ -349,22 +640,26 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
void OnIocpWriteableInner ( grpc_error * error ) {
GRPC_CARES_TRACE_LOG ( " OnIocpWriteableInner. fd:|%s| " , GetName ( ) ) ;
GPR_ASSERT ( socket_type_ = = SOCK_STREAM ) ;
if ( error = = GRPC_ERROR_NONE ) {
if ( winsocket_ - > write_info . wsa_error ! = 0 ) {
char * msg = gpr_format_message ( winsocket_ - > write_info . wsa_error ) ;
GRPC_CARES_TRACE_LOG (
" OnIocpWriteableInner. winsocket error:|%s|. fd:|%s| " , msg ,
GetName ( ) ) ;
GRPC_ERROR_UNREF ( error ) ;
error = GRPC_ERROR_CREATE_FROM_COPIED_STRING ( msg ) ;
gpr_free ( msg ) ;
error = GRPC_WSA_ERROR ( winsocket_ - > write_info . wsa_error ,
" OnIocpWriteableInner " ) ;
GRPC_CARES_TRACE_LOG (
" fd:|%s| OnIocpWriteableInner. winsocket_->write_info.wsa_error "
" code:|%d| msg:|%s| " ,
GetName ( ) , winsocket_ - > write_info . wsa_error ,
grpc_error_string ( error ) ) ;
}
}
GPR_ASSERT ( write_state_ = = WRITE_PENDING ) ;
GPR_ASSERT ( tcp_ write_state_ = = WRITE_PENDING ) ;
if ( error = = GRPC_ERROR_NONE ) {
write_state_ = WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY ;
tcp_ write_state_ = WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY ;
write_buf_ = grpc_slice_sub_no_ref (
write_buf_ , 0 , winsocket_ - > write_info . bytes_transfered ) ;
GRPC_CARES_TRACE_LOG ( " fd:|%s| OnIocpWriteableInner. bytes transferred:%d " ,
GetName ( ) , winsocket_ - > write_info . bytes_transfered ) ;
} else {
grpc_slice_unref_internal ( write_buf_ ) ;
write_buf_ = grpc_empty_slice ( ) ;
@ -386,9 +681,22 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
grpc_closure outer_read_closure_ ;
grpc_closure outer_write_closure_ ;
grpc_winsocket * winsocket_ ;
WriteState write_state_ ;
// tcp_write_state_ is only used on TCP GrpcPolledFds
WriteState tcp_write_state_ ;
char * name_ = nullptr ;
bool gotten_into_driver_list_ ;
int address_family_ ;
int socket_type_ ;
grpc_closure on_tcp_connect_locked_ ;
bool connect_done_ = false ;
int wsa_connect_error_ = 0 ;
// We don't run register_for_{readable,writeable} logic until
// a socket is connected. In the interim, we queue readable/writeable
// registrations with the following state.
grpc_closure continue_register_for_on_readable_locked_ ;
grpc_closure continue_register_for_on_writeable_locked_ ;
grpc_closure * pending_continue_register_for_on_readable_locked_ = nullptr ;
grpc_closure * pending_continue_register_for_on_writeable_locked_ = nullptr ;
} ;
struct SockToPolledFdEntry {
@ -454,39 +762,53 @@ class SockToPolledFdMap {
* objects .
*/
static ares_socket_t Socket ( int af , int type , int protocol , void * user_data ) {
if ( type ! = SOCK_DGRAM & & type ! = SOCK_STREAM ) {
GRPC_CARES_TRACE_LOG ( " Socket called with invalid socket type:%d " , type ) ;
return INVALID_SOCKET ;
}
SockToPolledFdMap * map = static_cast < SockToPolledFdMap * > ( user_data ) ;
SOCKET s = WSASocket ( af , type , protocol , nullptr , 0 ,
grpc_get_default_wsa_socket_flags ( ) ) ;
if ( s = = INVALID_SOCKET ) {
GRPC_CARES_TRACE_LOG (
" WSASocket failed with params af:%d type:%d protocol:%d " , af , type ,
protocol ) ;
return s ;
}
grpc_tcp_set_non_block ( s ) ;
GrpcPolledFdWindows * polled_fd =
New < GrpcPolledFdWindows > ( s , map - > combiner_ ) ;
New < GrpcPolledFdWindows > ( s , map - > combiner_ , af , type ) ;
GRPC_CARES_TRACE_LOG (
" fd:|%s| created with params af:%d type:%d protocol:%d " ,
polled_fd - > GetName ( ) , af , type , protocol ) ;
map - > AddNewSocket ( s , polled_fd ) ;
return s ;
}
static int Connect ( ares_socket_t as , const struct sockaddr * target ,
ares_socklen_t target_len , void * user_data ) {
WSAErrorContext wsa_error_ctx ;
SockToPolledFdMap * map = static_cast < SockToPolledFdMap * > ( user_data ) ;
GrpcPolledFdWindows * polled_fd = map - > LookupPolledFd ( as ) ;
return polled_fd - > Connect ( target , target_len ) ;
return polled_fd - > Connect ( & wsa_error_ctx , target , target_len ) ;
}
static ares_ssize_t SendV ( ares_socket_t as , const struct iovec * iov ,
int iovec_count , void * user_data ) {
WSAErrorContext wsa_error_ctx ;
SockToPolledFdMap * map = static_cast < SockToPolledFdMap * > ( user_data ) ;
GrpcPolledFdWindows * polled_fd = map - > LookupPolledFd ( as ) ;
return polled_fd - > SendV ( iov , iovec_count ) ;
return polled_fd - > SendV ( & wsa_error_ctx , iov , iovec_count ) ;
}
static ares_ssize_t RecvFrom ( ares_socket_t as , void * data , size_t data_len ,
int flags , struct sockaddr * from ,
ares_socklen_t * from_len , void * user_data ) {
WSAErrorContext wsa_error_ctx ;
SockToPolledFdMap * map = static_cast < SockToPolledFdMap * > ( user_data ) ;
GrpcPolledFdWindows * polled_fd = map - > LookupPolledFd ( as ) ;
return polled_fd - > RecvFrom ( data , data_len , flags , from , from_len ) ;
return polled_fd - > RecvFrom ( & wsa_error_ctx , data , data_len , flags , from ,
from_len ) ;
}
static int CloseSocket ( SOCKET s , void * user_data ) {