@ -27,7 +27,9 @@
# include <errno.h>
# include <limits.h>
# include <netinet/in.h>
# include <stdbool.h>
# include <stdio.h>
# include <stdlib.h>
# include <string.h>
# include <sys/socket.h>
@ -46,6 +48,7 @@
# include "src/core/lib/debug/trace.h"
# include "src/core/lib/gpr/string.h"
# include "src/core/lib/gpr/useful.h"
# include "src/core/lib/iomgr/buffer_list.h"
# include "src/core/lib/iomgr/ev_posix.h"
# include "src/core/lib/iomgr/executor.h"
# include "src/core/lib/profiling/timers.h"
@ -97,17 +100,42 @@ struct grpc_tcp {
grpc_closure read_done_closure ;
grpc_closure write_done_closure ;
grpc_closure error_closure ;
char * peer_string ;
grpc_resource_user * resource_user ;
grpc_resource_user_slice_allocator slice_allocator ;
grpc_core : : TracedBuffer * tb_head ; /* List of traced buffers */
gpr_mu tb_mu ; /* Lock for access to list of traced buffers */
/* grpc_endpoint_write takes an argument which if non-null means that the
* transport layer wants the TCP layer to collect timestamps for this write .
* This arg is forwarded to the timestamps callback function when the ACK
* timestamp is received from the kernel . This arg is a ( void * ) which allows
* users of this API to pass in a pointer to any kind of structure . This
* structure could actually be a tag or any book - keeping object that the user
* can use to distinguish between different traced writes . The only
* requirement from the TCP endpoint layer is that this arg should be non - null
* if the user wants timestamps for the write . */
void * outgoing_buffer_arg ;
/* A counter which starts at 0. It is initialized the first time the socket
* options for collecting timestamps are set , and is incremented with each
* byte sent . */
int bytes_counter ;
bool socket_ts_enabled ; /* True if timestamping options are set on the socket
*/
gpr_atm
stop_error_notification ; /* Set to 1 if we do not want to be notified on
errors anymore */
} ;
struct backup_poller {
gpr_mu * pollset_mu ;
grpc_closure run_poller ;
} ;
} // namespace
# define BACKUP_POLLER_POLLSET(b) ((grpc_pollset*)((b) + 1))
@ -302,6 +330,7 @@ static void tcp_free(grpc_tcp* tcp) {
grpc_slice_buffer_destroy_internal ( & tcp - > last_read_buffer ) ;
grpc_resource_user_unref ( tcp - > resource_user ) ;
gpr_free ( tcp - > peer_string ) ;
gpr_mu_destroy ( & tcp - > tb_mu ) ;
gpr_free ( tcp ) ;
}
@ -347,6 +376,10 @@ static void tcp_destroy(grpc_endpoint* ep) {
grpc_network_status_unregister_endpoint ( ep ) ;
grpc_tcp * tcp = reinterpret_cast < grpc_tcp * > ( ep ) ;
grpc_slice_buffer_reset_and_unref_internal ( & tcp - > last_read_buffer ) ;
if ( grpc_event_engine_can_track_errors ( ) ) {
gpr_atm_no_barrier_store ( & tcp - > stop_error_notification , true ) ;
grpc_fd_set_error ( tcp - > em_fd ) ;
}
TCP_UNREF ( tcp , " destroy " ) ;
}
@ -513,6 +546,235 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
}
}
/* A wrapper around sendmsg. It sends \a msg over \a fd and returns the number
* of bytes sent . */
ssize_t tcp_send ( int fd , const struct msghdr * msg ) {
GPR_TIMER_SCOPE ( " sendmsg " , 1 ) ;
ssize_t sent_length ;
do {
/* TODO(klempner): Cork if this is a partial write */
GRPC_STATS_INC_SYSCALL_WRITE ( ) ;
sent_length = sendmsg ( fd , msg , SENDMSG_FLAGS ) ;
} while ( sent_length < 0 & & errno = = EINTR ) ;
return sent_length ;
}
/** This is to be called if outgoing_buffer_arg is not null. On linux platforms,
* this will call sendmsg with socket options set to collect timestamps inside
* the kernel . On return , sent_length is set to the return value of the sendmsg
* call . Returns false if setting the socket options failed . This is not
* implemented for non - linux platforms currently , and crashes out .
*/
static bool tcp_write_with_timestamps ( grpc_tcp * tcp , struct msghdr * msg ,
size_t sending_length ,
ssize_t * sent_length , grpc_error * * error ) ;
/** The callback function to be invoked when we get an error on the socket. */
static void tcp_handle_error ( void * arg /* grpc_tcp */ , grpc_error * error ) ;
# ifdef GRPC_LINUX_ERRQUEUE
static bool tcp_write_with_timestamps ( grpc_tcp * tcp , struct msghdr * msg ,
size_t sending_length ,
ssize_t * sent_length ,
grpc_error * * error ) {
if ( ! tcp - > socket_ts_enabled ) {
uint32_t opt = grpc_core : : kTimestampingSocketOptions ;
if ( setsockopt ( tcp - > fd , SOL_SOCKET , SO_TIMESTAMPING ,
static_cast < void * > ( & opt ) , sizeof ( opt ) ) ! = 0 ) {
* error = tcp_annotate_error ( GRPC_OS_ERROR ( errno , " setsockopt " ) , tcp ) ;
grpc_slice_buffer_reset_and_unref_internal ( tcp - > outgoing_buffer ) ;
if ( grpc_tcp_trace . enabled ( ) ) {
gpr_log ( GPR_ERROR , " Failed to set timestamping options on the socket. " ) ;
}
return false ;
}
tcp - > bytes_counter = - 1 ;
tcp - > socket_ts_enabled = true ;
}
/* Set control message to indicate that you want timestamps. */
union {
char cmsg_buf [ CMSG_SPACE ( sizeof ( uint32_t ) ) ] ;
struct cmsghdr align ;
} u ;
cmsghdr * cmsg = reinterpret_cast < cmsghdr * > ( u . cmsg_buf ) ;
cmsg - > cmsg_level = SOL_SOCKET ;
cmsg - > cmsg_type = SO_TIMESTAMPING ;
cmsg - > cmsg_len = CMSG_LEN ( sizeof ( uint32_t ) ) ;
* reinterpret_cast < int * > ( CMSG_DATA ( cmsg ) ) =
grpc_core : : kTimestampingRecordingOptions ;
msg - > msg_control = u . cmsg_buf ;
msg - > msg_controllen = CMSG_SPACE ( sizeof ( uint32_t ) ) ;
/* If there was an error on sendmsg the logic in tcp_flush will handle it. */
ssize_t length = tcp_send ( tcp - > fd , msg ) ;
* sent_length = length ;
/* Only save timestamps if all the bytes were taken by sendmsg. */
if ( sending_length = = static_cast < size_t > ( length ) ) {
gpr_mu_lock ( & tcp - > tb_mu ) ;
grpc_core : : TracedBuffer : : AddNewEntry (
& tcp - > tb_head , static_cast < int > ( tcp - > bytes_counter + length ) ,
tcp - > outgoing_buffer_arg ) ;
gpr_mu_unlock ( & tcp - > tb_mu ) ;
tcp - > outgoing_buffer_arg = nullptr ;
}
return true ;
}
/** Reads \a cmsg to derive timestamps from the control messages. If a valid
* timestamp is found , the traced buffer list is updated with this timestamp .
* The caller of this function should be looping on the control messages found
* in \ a msg . \ a cmsg should point to the control message that the caller wants
* processed .
* On return , a pointer to a control message is returned . On the next iteration ,
* CMSG_NXTHDR ( msg , ret_val ) should be passed as \ a cmsg . */
struct cmsghdr * process_timestamp ( grpc_tcp * tcp , msghdr * msg ,
struct cmsghdr * cmsg ) {
auto next_cmsg = CMSG_NXTHDR ( msg , cmsg ) ;
if ( next_cmsg = = nullptr ) {
if ( grpc_tcp_trace . enabled ( ) ) {
gpr_log ( GPR_ERROR , " Received timestamp without extended error " ) ;
}
return cmsg ;
}
if ( ! ( next_cmsg - > cmsg_level = = SOL_IP | | next_cmsg - > cmsg_level = = SOL_IPV6 ) | |
! ( next_cmsg - > cmsg_type = = IP_RECVERR | |
next_cmsg - > cmsg_type = = IPV6_RECVERR ) ) {
if ( grpc_tcp_trace . enabled ( ) ) {
gpr_log ( GPR_ERROR , " Unexpected control message " ) ;
}
return cmsg ;
}
auto tss =
reinterpret_cast < struct grpc_core : : scm_timestamping * > ( CMSG_DATA ( cmsg ) ) ;
auto serr = reinterpret_cast < struct sock_extended_err * > ( CMSG_DATA ( next_cmsg ) ) ;
if ( serr - > ee_errno ! = ENOMSG | |
serr - > ee_origin ! = SO_EE_ORIGIN_TIMESTAMPING ) {
gpr_log ( GPR_ERROR , " Unexpected control message " ) ;
return cmsg ;
}
/* The error handling can potentially be done on another thread so we need
* to protect the traced buffer list . A lock free list might be better . Using
* a simple mutex for now . */
gpr_mu_lock ( & tcp - > tb_mu ) ;
grpc_core : : TracedBuffer : : ProcessTimestamp ( & tcp - > tb_head , serr , tss ) ;
gpr_mu_unlock ( & tcp - > tb_mu ) ;
return next_cmsg ;
}
/** For linux platforms, reads the socket's error queue and processes error
* messages from the queue . Returns true if all the errors processed were
* timestamps . Returns false if any of the errors were not timestamps . For
* non - linux platforms , error processing is not used / enabled currently .
*/
static bool process_errors ( grpc_tcp * tcp ) {
while ( true ) {
struct iovec iov ;
iov . iov_base = nullptr ;
iov . iov_len = 0 ;
struct msghdr msg ;
msg . msg_name = nullptr ;
msg . msg_namelen = 0 ;
msg . msg_iov = & iov ;
msg . msg_iovlen = 0 ;
msg . msg_flags = 0 ;
union {
char rbuf [ 1024 /*CMSG_SPACE(sizeof(scm_timestamping)) +
CMSG_SPACE ( sizeof ( sock_extended_err ) + sizeof ( sockaddr_in ) ) */ ] ;
struct cmsghdr align ;
} aligned_buf ;
memset ( & aligned_buf , 0 , sizeof ( aligned_buf ) ) ;
msg . msg_control = aligned_buf . rbuf ;
msg . msg_controllen = sizeof ( aligned_buf . rbuf ) ;
int r , saved_errno ;
do {
r = recvmsg ( tcp - > fd , & msg , MSG_ERRQUEUE ) ;
saved_errno = errno ;
} while ( r < 0 & & saved_errno = = EINTR ) ;
if ( r = = - 1 & & saved_errno = = EAGAIN ) {
return true ; /* No more errors to process */
}
if ( r = = - 1 ) {
return false ;
}
if ( grpc_tcp_trace . enabled ( ) ) {
if ( ( msg . msg_flags & MSG_CTRUNC ) = = 1 ) {
gpr_log ( GPR_INFO , " Error message was truncated. " ) ;
}
}
if ( msg . msg_controllen = = 0 ) {
/* There was no control message found. It was probably spurious. */
return true ;
}
for ( auto cmsg = CMSG_FIRSTHDR ( & msg ) ; cmsg & & cmsg - > cmsg_len ;
cmsg = CMSG_NXTHDR ( & msg , cmsg ) ) {
if ( cmsg - > cmsg_level ! = SOL_SOCKET | |
cmsg - > cmsg_type ! = SCM_TIMESTAMPING ) {
/* Got a control message that is not a timestamp. Don't know how to
* handle this . */
if ( grpc_tcp_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" unknown control message cmsg_level:%d cmsg_type:%d " ,
cmsg - > cmsg_level , cmsg - > cmsg_type ) ;
}
return false ;
}
process_timestamp ( tcp , & msg , cmsg ) ;
}
}
}
static void tcp_handle_error ( void * arg /* grpc_tcp */ , grpc_error * error ) {
grpc_tcp * tcp = static_cast < grpc_tcp * > ( arg ) ;
if ( grpc_tcp_trace . enabled ( ) ) {
gpr_log ( GPR_INFO , " TCP:%p got_error: %s " , tcp , grpc_error_string ( error ) ) ;
}
if ( error ! = GRPC_ERROR_NONE | |
static_cast < bool > ( gpr_atm_acq_load ( & tcp - > stop_error_notification ) ) ) {
/* We aren't going to register to hear on error anymore, so it is safe to
* unref . */
grpc_core : : TracedBuffer : : Shutdown ( & tcp - > tb_head , GRPC_ERROR_REF ( error ) ) ;
TCP_UNREF ( tcp , " error-tracking " ) ;
return ;
}
/* We are still interested in collecting timestamps, so let's try reading
* them . */
if ( ! process_errors ( tcp ) ) {
/* This was not a timestamps error. This was an actual error. Set the
* read and write closures to be ready .
*/
grpc_fd_set_readable ( tcp - > em_fd ) ;
grpc_fd_set_writable ( tcp - > em_fd ) ;
}
GRPC_CLOSURE_INIT ( & tcp - > error_closure , tcp_handle_error , tcp ,
grpc_schedule_on_exec_ctx ) ;
grpc_fd_notify_on_error ( tcp - > em_fd , & tcp - > error_closure ) ;
}
# else /* GRPC_LINUX_ERRQUEUE */
static bool tcp_write_with_timestamps ( grpc_tcp * tcp , struct msghdr * msg ,
size_t sending_length ,
ssize_t * sent_length ,
grpc_error * * error ) {
gpr_log ( GPR_ERROR , " Write with timestamps not supported for this platform " ) ;
GPR_ASSERT ( 0 ) ;
return false ;
}
static void tcp_handle_error ( void * arg /* grpc_tcp */ , grpc_error * error ) {
gpr_log ( GPR_ERROR , " Error handling is not supported for this platform " ) ;
GPR_ASSERT ( 0 ) ;
}
# endif /* GRPC_LINUX_ERRQUEUE */
/* returns true if done, false if pending; if returning true, *error is set */
# if defined(IOV_MAX) && IOV_MAX < 1000
# define MAX_WRITE_IOVEC IOV_MAX
@ -557,19 +819,20 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
msg . msg_namelen = 0 ;
msg . msg_iov = iov ;
msg . msg_iovlen = iov_size ;
msg . msg_control = nullptr ;
msg . msg_controllen = 0 ;
msg . msg_flags = 0 ;
if ( tcp - > outgoing_buffer_arg ! = nullptr ) {
if ( ! tcp_write_with_timestamps ( tcp , & msg , sending_length , & sent_length ,
error ) )
return true ; /* something went wrong with timestamps */
} else {
msg . msg_control = nullptr ;
msg . msg_controllen = 0 ;
GRPC_STATS_INC_TCP_WRITE_SIZE ( sending_length ) ;
GRPC_STATS_INC_TCP_WRITE_IOV_SIZE ( iov_size ) ;
GRPC_STATS_INC_TCP_WRITE_SIZE ( sending_length ) ;
GRPC_STATS_INC_TCP_WRITE_IOV_SIZE ( iov_size ) ;
GPR_TIMER_SCOPE ( " sendmsg " , 1 ) ;
do {
/* TODO(klempner): Cork if this is a partial write */
GRPC_STATS_INC_SYSCALL_WRITE ( ) ;
sent_length = sendmsg ( tcp - > fd , & msg , SENDMSG_FLAGS ) ;
} while ( sent_length < 0 & & errno = = EINTR ) ;
sent_length = tcp_send ( tcp - > fd , & msg ) ;
}
if ( sent_length < 0 ) {
if ( errno = = EAGAIN ) {
@ -593,6 +856,7 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
}
GPR_ASSERT ( tcp - > outgoing_byte_idx = = 0 ) ;
tcp - > bytes_counter + = sent_length ;
trailing = sending_length - static_cast < size_t > ( sent_length ) ;
while ( trailing > 0 ) {
size_t slice_length ;
@ -607,7 +871,6 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
trailing - = slice_length ;
}
}
if ( outgoing_slice_idx = = tcp - > outgoing_buffer - > count ) {
* error = GRPC_ERROR_NONE ;
grpc_slice_buffer_reset_and_unref_internal ( tcp - > outgoing_buffer ) ;
@ -640,14 +903,13 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) {
const char * str = grpc_error_string ( error ) ;
gpr_log ( GPR_INFO , " write: %s " , str ) ;
}
GRPC_CLOSURE_SCHED ( cb , error ) ;
TCP_UNREF ( tcp , " write " ) ;
}
}
static void tcp_write ( grpc_endpoint * ep , grpc_slice_buffer * buf ,
grpc_closure * cb ) {
grpc_closure * cb , void * arg ) {
GPR_TIMER_SCOPE ( " tcp_write " , 0 ) ;
grpc_tcp * tcp = reinterpret_cast < grpc_tcp * > ( ep ) ;
grpc_error * error = GRPC_ERROR_NONE ;
@ -675,6 +937,10 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
}
tcp - > outgoing_buffer = buf ;
tcp - > outgoing_byte_idx = 0 ;
tcp - > outgoing_buffer_arg = arg ;
if ( arg ) {
GPR_ASSERT ( grpc_event_engine_can_track_errors ( ) ) ;
}
if ( ! tcp_flush ( tcp , & error ) ) {
TCP_REF ( tcp , " write " ) ;
@ -792,6 +1058,8 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
tcp - > bytes_read_this_round = 0 ;
/* Will be set to false by the very first endpoint read function */
tcp - > is_first_read = true ;
tcp - > bytes_counter = - 1 ;
tcp - > socket_ts_enabled = false ;
/* paired with unref in grpc_tcp_destroy */
gpr_ref_init ( & tcp - > refcount , 1 ) ;
gpr_atm_no_barrier_store ( & tcp - > shutdown_count , 0 ) ;
@ -803,6 +1071,19 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
/* Tell network status tracker about new endpoint */
grpc_network_status_register_endpoint ( & tcp - > base ) ;
grpc_resource_quota_unref_internal ( resource_quota ) ;
gpr_mu_init ( & tcp - > tb_mu ) ;
tcp - > tb_head = nullptr ;
/* Start being notified on errors if event engine can track errors. */
if ( grpc_event_engine_can_track_errors ( ) ) {
/* Grab a ref to tcp so that we can safely access the tcp struct when
* processing errors . We unref when we no longer want to track errors
* separately . */
TCP_REF ( tcp , " error-tracking " ) ;
gpr_atm_rel_store ( & tcp - > stop_error_notification , 0 ) ;
GRPC_CLOSURE_INIT ( & tcp - > error_closure , tcp_handle_error , tcp ,
grpc_schedule_on_exec_ctx ) ;
grpc_fd_notify_on_error ( tcp - > em_fd , & tcp - > error_closure ) ;
}
return & tcp - > base ;
}
@ -821,6 +1102,11 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd,
tcp - > release_fd = fd ;
tcp - > release_fd_cb = done ;
grpc_slice_buffer_reset_and_unref_internal ( & tcp - > last_read_buffer ) ;
if ( grpc_event_engine_can_track_errors ( ) ) {
/* Stop errors notification. */
gpr_atm_no_barrier_store ( & tcp - > stop_error_notification , true ) ;
grpc_fd_set_error ( tcp - > em_fd ) ;
}
TCP_UNREF ( tcp , " destroy " ) ;
}