@ -36,6 +36,7 @@
# include <sys/types.h>
# include <unistd.h>
# include <algorithm>
# include <unordered_map>
# include <grpc/slice.h>
# include <grpc/support/alloc.h>
@ -49,9 +50,11 @@
# include "src/core/lib/debug/trace.h"
# include "src/core/lib/gpr/string.h"
# include "src/core/lib/gpr/useful.h"
# include "src/core/lib/gprpp/sync.h"
# include "src/core/lib/iomgr/buffer_list.h"
# include "src/core/lib/iomgr/ev_posix.h"
# include "src/core/lib/iomgr/executor.h"
# include "src/core/lib/iomgr/socket_utils_posix.h"
# include "src/core/lib/profiling/timers.h"
# include "src/core/lib/slice/slice_internal.h"
# include "src/core/lib/slice/slice_string_helpers.h"
@ -71,6 +74,15 @@
# define SENDMSG_FLAGS 0
# endif
// TCP zero copy sendmsg flag.
// NB: We define this here as a fallback in case we're using an older set of
// library headers that has not defined MSG_ZEROCOPY. Since this constant is
// part of the kernel, we are guaranteed it will never change/disagree so
// defining it here is safe.
# ifndef MSG_ZEROCOPY
# define MSG_ZEROCOPY 0x4000000
# endif
# ifdef GRPC_MSG_IOVLEN_TYPE
typedef GRPC_MSG_IOVLEN_TYPE msg_iovlen_type ;
# else
@ -79,6 +91,264 @@ typedef size_t msg_iovlen_type;
extern grpc_core : : TraceFlag grpc_tcp_trace ;
namespace grpc_core {
class TcpZerocopySendRecord {
public :
TcpZerocopySendRecord ( ) { grpc_slice_buffer_init ( & buf_ ) ; }
~ TcpZerocopySendRecord ( ) {
AssertEmpty ( ) ;
grpc_slice_buffer_destroy_internal ( & buf_ ) ;
}
// Given the slices that we wish to send, and the current offset into the
// slice buffer (indicating which have already been sent), populate an iovec
// array that will be used for a zerocopy enabled sendmsg().
msg_iovlen_type PopulateIovs ( size_t * unwind_slice_idx ,
size_t * unwind_byte_idx , size_t * sending_length ,
iovec * iov ) ;
// A sendmsg() may not be able to send the bytes that we requested at this
// time, returning EAGAIN (possibly due to backpressure). In this case,
// unwind the offset into the slice buffer so we retry sending these bytes.
void UnwindIfThrottled ( size_t unwind_slice_idx , size_t unwind_byte_idx ) {
out_offset_ . byte_idx = unwind_byte_idx ;
out_offset_ . slice_idx = unwind_slice_idx ;
}
// Update the offset into the slice buffer based on how much we wanted to sent
// vs. what sendmsg() actually sent (which may be lower, possibly due to
// backpressure).
void UpdateOffsetForBytesSent ( size_t sending_length , size_t actually_sent ) ;
// Indicates whether all underlying data has been sent or not.
bool AllSlicesSent ( ) { return out_offset_ . slice_idx = = buf_ . count ; }
// Reset this structure for a new tcp_write() with zerocopy.
void PrepareForSends ( grpc_slice_buffer * slices_to_send ) {
AssertEmpty ( ) ;
out_offset_ . slice_idx = 0 ;
out_offset_ . byte_idx = 0 ;
grpc_slice_buffer_swap ( slices_to_send , & buf_ ) ;
Ref ( ) ;
}
// References: 1 reference per sendmsg(), and 1 for the tcp_write().
void Ref ( ) { ref_ . FetchAdd ( 1 , MemoryOrder : : RELAXED ) ; }
// Unref: called when we get an error queue notification for a sendmsg(), if a
// sendmsg() failed or when tcp_write() is done.
bool Unref ( ) {
const intptr_t prior = ref_ . FetchSub ( 1 , MemoryOrder : : ACQ_REL ) ;
GPR_DEBUG_ASSERT ( prior > 0 ) ;
if ( prior = = 1 ) {
AllSendsComplete ( ) ;
return true ;
}
return false ;
}
private :
struct OutgoingOffset {
size_t slice_idx = 0 ;
size_t byte_idx = 0 ;
} ;
void AssertEmpty ( ) {
GPR_DEBUG_ASSERT ( buf_ . count = = 0 ) ;
GPR_DEBUG_ASSERT ( buf_ . length = = 0 ) ;
GPR_DEBUG_ASSERT ( ref_ . Load ( MemoryOrder : : RELAXED ) = = 0 ) ;
}
// When all sendmsg() calls associated with this tcp_write() have been
// completed (ie. we have received the notifications for each sequence number
// for each sendmsg()) and all reference counts have been dropped, drop our
// reference to the underlying data since we no longer need it.
void AllSendsComplete ( ) {
GPR_DEBUG_ASSERT ( ref_ . Load ( MemoryOrder : : RELAXED ) = = 0 ) ;
grpc_slice_buffer_reset_and_unref_internal ( & buf_ ) ;
}
grpc_slice_buffer buf_ ;
Atomic < intptr_t > ref_ ;
OutgoingOffset out_offset_ ;
} ;
class TcpZerocopySendCtx {
public :
static constexpr int kDefaultMaxSends = 4 ;
static constexpr size_t kDefaultSendBytesThreshold = 16 * 1024 ; // 16KB
TcpZerocopySendCtx ( int max_sends = kDefaultMaxSends ,
size_t send_bytes_threshold = kDefaultSendBytesThreshold )
: max_sends_ ( max_sends ) ,
free_send_records_size_ ( max_sends ) ,
threshold_bytes_ ( send_bytes_threshold ) {
send_records_ = static_cast < TcpZerocopySendRecord * > (
gpr_malloc ( max_sends * sizeof ( * send_records_ ) ) ) ;
free_send_records_ = static_cast < TcpZerocopySendRecord * * > (
gpr_malloc ( max_sends * sizeof ( * free_send_records_ ) ) ) ;
if ( send_records_ = = nullptr | | free_send_records_ = = nullptr ) {
gpr_free ( send_records_ ) ;
gpr_free ( free_send_records_ ) ;
gpr_log ( GPR_INFO , " Disabling TCP TX zerocopy due to memory pressure. \n " ) ;
memory_limited_ = true ;
} else {
for ( int idx = 0 ; idx < max_sends_ ; + + idx ) {
new ( send_records_ + idx ) TcpZerocopySendRecord ( ) ;
free_send_records_ [ idx ] = send_records_ + idx ;
}
}
}
~ TcpZerocopySendCtx ( ) {
if ( send_records_ ! = nullptr ) {
for ( int idx = 0 ; idx < max_sends_ ; + + idx ) {
send_records_ [ idx ] . ~ TcpZerocopySendRecord ( ) ;
}
}
gpr_free ( send_records_ ) ;
gpr_free ( free_send_records_ ) ;
}
// True if we were unable to allocate the various bookkeeping structures at
// transport initialization time. If memory limited, we do not zerocopy.
bool memory_limited ( ) const { return memory_limited_ ; }
// TCP send zerocopy maintains an implicit sequence number for every
// successful sendmsg() with zerocopy enabled; the kernel later gives us an
// error queue notification with this sequence number indicating that the
// underlying data buffers that we sent can now be released. Once that
// notification is received, we can release the buffers associated with this
// zerocopy send record. Here, we associate the sequence number with the data
// buffers that were sent with the corresponding call to sendmsg().
void NoteSend ( TcpZerocopySendRecord * record ) {
record - > Ref ( ) ;
AssociateSeqWithSendRecord ( last_send_ , record ) ;
+ + last_send_ ;
}
// If sendmsg() actually failed, though, we need to revert the sequence number
// that we speculatively bumped before calling sendmsg(). Note that we bump
// this sequence number and perform relevant bookkeeping (see: NoteSend())
// *before* calling sendmsg() since, if we called it *after* sendmsg(), then
// there is a possible race with the release notification which could occur on
// another thread before we do the necessary bookkeeping. Hence, calling
// NoteSend() *before* sendmsg() and implementing an undo function is needed.
void UndoSend ( ) {
- - last_send_ ;
if ( ReleaseSendRecord ( last_send_ ) - > Unref ( ) ) {
// We should still be holding the ref taken by tcp_write().
GPR_DEBUG_ASSERT ( 0 ) ;
}
}
// Simply associate this send record (and the underlying sent data buffers)
// with the implicit sequence number for this zerocopy sendmsg().
void AssociateSeqWithSendRecord ( uint32_t seq , TcpZerocopySendRecord * record ) {
MutexLock guard ( & lock_ ) ;
ctx_lookup_ . emplace ( seq , record ) ;
}
// Get a send record for a send that we wish to do with zerocopy.
TcpZerocopySendRecord * GetSendRecord ( ) {
MutexLock guard ( & lock_ ) ;
return TryGetSendRecordLocked ( ) ;
}
// A given send record corresponds to a single tcp_write() with zerocopy
// enabled. This can result in several sendmsg() calls to flush all of the
// data to wire. Each sendmsg() takes a reference on the
// TcpZerocopySendRecord, and corresponds to a single sequence number.
// ReleaseSendRecord releases a reference on TcpZerocopySendRecord for a
// single sequence number. This is called either when we receive the relevant
// error queue notification (saying that we can discard the underlying
// buffers for this sendmsg()) is received from the kernel - or, in case
// sendmsg() was unsuccessful to begin with.
TcpZerocopySendRecord * ReleaseSendRecord ( uint32_t seq ) {
MutexLock guard ( & lock_ ) ;
return ReleaseSendRecordLocked ( seq ) ;
}
// After all the references to a TcpZerocopySendRecord are released, we can
// add it back to the pool (of size max_sends_). Note that we can only have
// max_sends_ tcp_write() instances with zerocopy enabled in flight at the
// same time.
void PutSendRecord ( TcpZerocopySendRecord * record ) {
GPR_DEBUG_ASSERT ( record > = send_records_ & &
record < send_records_ + max_sends_ ) ;
MutexLock guard ( & lock_ ) ;
PutSendRecordLocked ( record ) ;
}
// Indicate that we are disposing of this zerocopy context. This indicator
// will prevent new zerocopy writes from being issued.
void Shutdown ( ) { shutdown_ . Store ( true , MemoryOrder : : RELEASE ) ; }
// Indicates that there are no inflight tcp_write() instances with zerocopy
// enabled.
bool AllSendRecordsEmpty ( ) {
MutexLock guard ( & lock_ ) ;
return free_send_records_size_ = = max_sends_ ;
}
bool enabled ( ) const { return enabled_ ; }
void set_enabled ( bool enabled ) {
GPR_DEBUG_ASSERT ( ! enabled | | ! memory_limited ( ) ) ;
enabled_ = enabled ;
}
// Only use zerocopy if we are sending at least this many bytes. The
// additional overhead of reading the error queue for notifications means that
// zerocopy is not useful for small transfers.
size_t threshold_bytes ( ) const { return threshold_bytes_ ; }
private :
TcpZerocopySendRecord * ReleaseSendRecordLocked ( uint32_t seq ) {
auto iter = ctx_lookup_ . find ( seq ) ;
GPR_DEBUG_ASSERT ( iter ! = ctx_lookup_ . end ( ) ) ;
TcpZerocopySendRecord * record = iter - > second ;
ctx_lookup_ . erase ( iter ) ;
return record ;
}
TcpZerocopySendRecord * TryGetSendRecordLocked ( ) {
if ( shutdown_ . Load ( MemoryOrder : : ACQUIRE ) ) {
return nullptr ;
}
if ( free_send_records_size_ = = 0 ) {
return nullptr ;
}
free_send_records_size_ - - ;
return free_send_records_ [ free_send_records_size_ ] ;
}
void PutSendRecordLocked ( TcpZerocopySendRecord * record ) {
GPR_DEBUG_ASSERT ( free_send_records_size_ < max_sends_ ) ;
free_send_records_ [ free_send_records_size_ ] = record ;
free_send_records_size_ + + ;
}
TcpZerocopySendRecord * send_records_ ;
TcpZerocopySendRecord * * free_send_records_ ;
int max_sends_ ;
int free_send_records_size_ ;
Mutex lock_ ;
uint32_t last_send_ = 0 ;
Atomic < bool > shutdown_ ;
bool enabled_ = false ;
size_t threshold_bytes_ = kDefaultSendBytesThreshold ;
std : : unordered_map < uint32_t , TcpZerocopySendRecord * > ctx_lookup_ ;
bool memory_limited_ = false ;
} ;
} // namespace grpc_core
using grpc_core : : TcpZerocopySendCtx ;
using grpc_core : : TcpZerocopySendRecord ;
namespace {
struct grpc_tcp {
grpc_endpoint base ;
@ -142,6 +412,8 @@ struct grpc_tcp {
bool ts_capable ; /* Cache whether we can set timestamping options */
gpr_atm stop_error_notification ; /* Set to 1 if we do not want to be notified
on errors anymore */
TcpZerocopySendCtx tcp_zerocopy_send_ctx ;
TcpZerocopySendRecord * current_zerocopy_send = nullptr ;
} ;
struct backup_poller {
@ -151,6 +423,8 @@ struct backup_poller {
} // namespace
static void ZerocopyDisableAndWaitForRemaining ( grpc_tcp * tcp ) ;
# define BACKUP_POLLER_POLLSET(b) ((grpc_pollset*)((b) + 1))
static gpr_atm g_uncovered_notifications_pending ;
@ -339,6 +613,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error);
static void tcp_shutdown ( grpc_endpoint * ep , grpc_error * why ) {
grpc_tcp * tcp = reinterpret_cast < grpc_tcp * > ( ep ) ;
ZerocopyDisableAndWaitForRemaining ( tcp ) ;
grpc_fd_shutdown ( tcp - > em_fd , why ) ;
grpc_resource_user_shutdown ( tcp - > resource_user ) ;
}
@ -357,6 +632,7 @@ static void tcp_free(grpc_tcp* tcp) {
gpr_mu_unlock ( & tcp - > tb_mu ) ;
tcp - > outgoing_buffer_arg = nullptr ;
gpr_mu_destroy ( & tcp - > tb_mu ) ;
tcp - > tcp_zerocopy_send_ctx . ~ TcpZerocopySendCtx ( ) ;
gpr_free ( tcp ) ;
}
@ -390,6 +666,7 @@ static void tcp_destroy(grpc_endpoint* ep) {
grpc_tcp * tcp = reinterpret_cast < grpc_tcp * > ( ep ) ;
grpc_slice_buffer_reset_and_unref_internal ( & tcp - > last_read_buffer ) ;
if ( grpc_event_engine_can_track_errors ( ) ) {
ZerocopyDisableAndWaitForRemaining ( tcp ) ;
gpr_atm_no_barrier_store ( & tcp - > stop_error_notification , true ) ;
grpc_fd_set_error ( tcp - > em_fd ) ;
}
@ -652,13 +929,13 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
/* A wrapper around sendmsg. It sends \a msg over \a fd and returns the number
* of bytes sent . */
ssize_t tcp_send ( int fd , const struct msghdr * msg ) {
ssize_t tcp_send ( int fd , const struct msghdr * msg , int additional_flags = 0 ) {
GPR_TIMER_SCOPE ( " sendmsg " , 1 ) ;
ssize_t sent_length ;
do {
/* TODO(klempner): Cork if this is a partial write */
GRPC_STATS_INC_SYSCALL_WRITE ( ) ;
sent_length = sendmsg ( fd , msg , SENDMSG_FLAGS ) ;
sent_length = sendmsg ( fd , msg , SENDMSG_FLAGS | additional_flags ) ;
} while ( sent_length < 0 & & errno = = EINTR ) ;
return sent_length ;
}
@ -671,16 +948,52 @@ ssize_t tcp_send(int fd, const struct msghdr* msg) {
*/
static bool tcp_write_with_timestamps ( grpc_tcp * tcp , struct msghdr * msg ,
size_t sending_length ,
ssize_t * sent_length ) ;
ssize_t * sent_length ,
int additional_flags = 0 ) ;
/** The callback function to be invoked when we get an error on the socket. */
static void tcp_handle_error ( void * arg /* grpc_tcp */ , grpc_error * error ) ;
static TcpZerocopySendRecord * tcp_get_send_zerocopy_record (
grpc_tcp * tcp , grpc_slice_buffer * buf ) ;
# ifdef GRPC_LINUX_ERRQUEUE
static bool process_errors ( grpc_tcp * tcp ) ;
static TcpZerocopySendRecord * tcp_get_send_zerocopy_record (
grpc_tcp * tcp , grpc_slice_buffer * buf ) {
TcpZerocopySendRecord * zerocopy_send_record = nullptr ;
const bool use_zerocopy =
tcp - > tcp_zerocopy_send_ctx . enabled ( ) & &
tcp - > tcp_zerocopy_send_ctx . threshold_bytes ( ) < buf - > length ;
if ( use_zerocopy ) {
zerocopy_send_record = tcp - > tcp_zerocopy_send_ctx . GetSendRecord ( ) ;
if ( zerocopy_send_record = = nullptr ) {
process_errors ( tcp ) ;
zerocopy_send_record = tcp - > tcp_zerocopy_send_ctx . GetSendRecord ( ) ;
}
if ( zerocopy_send_record ! = nullptr ) {
zerocopy_send_record - > PrepareForSends ( buf ) ;
GPR_DEBUG_ASSERT ( buf - > count = = 0 ) ;
GPR_DEBUG_ASSERT ( buf - > length = = 0 ) ;
tcp - > outgoing_byte_idx = 0 ;
tcp - > outgoing_buffer = nullptr ;
}
}
return zerocopy_send_record ;
}
static void ZerocopyDisableAndWaitForRemaining ( grpc_tcp * tcp ) {
tcp - > tcp_zerocopy_send_ctx . Shutdown ( ) ;
while ( ! tcp - > tcp_zerocopy_send_ctx . AllSendRecordsEmpty ( ) ) {
process_errors ( tcp ) ;
}
}
static bool tcp_write_with_timestamps ( grpc_tcp * tcp , struct msghdr * msg ,
size_t sending_length ,
ssize_t * sent_length ) {
ssize_t * sent_length ,
int additional_flags ) {
if ( ! tcp - > socket_ts_enabled ) {
uint32_t opt = grpc_core : : kTimestampingSocketOptions ;
if ( setsockopt ( tcp - > fd , SOL_SOCKET , SO_TIMESTAMPING ,
@ -708,7 +1021,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
msg - > msg_controllen = CMSG_SPACE ( sizeof ( uint32_t ) ) ;
/* If there was an error on sendmsg the logic in tcp_flush will handle it. */
ssize_t length = tcp_send ( tcp - > fd , msg ) ;
ssize_t length = tcp_send ( tcp - > fd , msg , additional_flags ) ;
* sent_length = length ;
/* Only save timestamps if all the bytes were taken by sendmsg. */
if ( sending_length = = static_cast < size_t > ( length ) ) {
@ -722,6 +1035,43 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
return true ;
}
static void UnrefMaybePutZerocopySendRecord ( grpc_tcp * tcp ,
TcpZerocopySendRecord * record ,
uint32_t seq , const char * tag ) ;
// Reads \a cmsg to process zerocopy control messages.
static void process_zerocopy ( grpc_tcp * tcp , struct cmsghdr * cmsg ) {
GPR_DEBUG_ASSERT ( cmsg ) ;
auto serr = reinterpret_cast < struct sock_extended_err * > ( CMSG_DATA ( cmsg ) ) ;
GPR_DEBUG_ASSERT ( serr - > ee_errno = = 0 ) ;
GPR_DEBUG_ASSERT ( serr - > ee_origin = = SO_EE_ORIGIN_ZEROCOPY ) ;
const uint32_t lo = serr - > ee_info ;
const uint32_t hi = serr - > ee_data ;
for ( uint32_t seq = lo ; seq < = hi ; + + seq ) {
// TODO(arjunroy): It's likely that lo and hi refer to zerocopy sequence
// numbers that are generated by a single call to grpc_endpoint_write; ie.
// we can batch the unref operation. So, check if record is the same for
// both; if so, batch the unref/put.
TcpZerocopySendRecord * record =
tcp - > tcp_zerocopy_send_ctx . ReleaseSendRecord ( seq ) ;
GPR_DEBUG_ASSERT ( record ) ;
UnrefMaybePutZerocopySendRecord ( tcp , record , seq , " CALLBACK RCVD " ) ;
}
}
// Whether the cmsg received from error queue is of the IPv4 or IPv6 levels.
static bool CmsgIsIpLevel ( const cmsghdr & cmsg ) {
return ( cmsg . cmsg_level = = SOL_IPV6 & & cmsg . cmsg_type = = IPV6_RECVERR ) | |
( cmsg . cmsg_level = = SOL_IP & & cmsg . cmsg_type = = IP_RECVERR ) ;
}
static bool CmsgIsZeroCopy ( const cmsghdr & cmsg ) {
if ( ! CmsgIsIpLevel ( cmsg ) ) {
return false ;
}
auto serr = reinterpret_cast < const sock_extended_err * > CMSG_DATA ( & cmsg ) ;
return serr - > ee_errno = = 0 & & serr - > ee_origin = = SO_EE_ORIGIN_ZEROCOPY ;
}
/** Reads \a cmsg to derive timestamps from the control messages. If a valid
* timestamp is found , the traced buffer list is updated with this timestamp .
* The caller of this function should be looping on the control messages found
@ -783,8 +1133,8 @@ struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg,
/** For linux platforms, reads the socket's error queue and processes error
* messages from the queue .
*/
static void process_errors ( grpc_tcp * tcp ) {
while ( true ) {
static bool process_errors ( grpc_tcp * tcp ) {
bool processed_err = false ;
struct iovec iov ;
iov . iov_base = nullptr ;
iov . iov_len = 0 ;
@ -794,7 +1144,6 @@ static void process_errors(grpc_tcp* tcp) {
msg . msg_iov = & iov ;
msg . msg_iovlen = 0 ;
msg . msg_flags = 0 ;
/* Allocate enough space so we don't need to keep increasing this as size
* of OPT_STATS increase */
constexpr size_t cmsg_alloc_space =
@ -806,50 +1155,54 @@ static void process_errors(grpc_tcp* tcp) {
char rbuf [ cmsg_alloc_space ] ;
struct cmsghdr align ;
} aligned_buf ;
memset ( & aligned_buf , 0 , sizeof ( aligned_buf ) ) ;
msg . msg_control = aligned_buf . rbuf ;
msg . msg_controllen = sizeof ( aligned_buf . rbuf ) ;
int r , saved_errno ;
while ( true ) {
do {
r = recvmsg ( tcp - > fd , & msg , MSG_ERRQUEUE ) ;
saved_errno = errno ;
} while ( r < 0 & & saved_errno = = EINTR ) ;
if ( r = = - 1 & & saved_errno = = EAGAIN ) {
return ; /* No more errors to process */
return processed_err ; /* No more errors to process */
}
if ( r = = - 1 ) {
return ;
return processed_err ;
}
if ( ( msg . msg_flags & MSG_CTRUNC ) ! = 0 ) {
if ( GPR_UNLIKELY ( ( msg . msg_flags & MSG_CTRUNC ) ! = 0 ) ) {
gpr_log ( GPR_ERROR , " Error message was truncated. " ) ;
}
if ( msg . msg_controllen = = 0 ) {
/* There was no control message found. It was probably spurious. */
return ;
return processed_err ;
}
bool seen = false ;
for ( auto cmsg = CMSG_FIRSTHDR ( & msg ) ; cmsg & & cmsg - > cmsg_len ;
cmsg = CMSG_NXTHDR ( & msg , cmsg ) ) {
if ( cmsg - > cmsg_level ! = SOL_SOCKET | |
cmsg - > cmsg_type ! = SCM_TIMESTAMPING ) {
/* Got a control message that is not a timestamp. Don't know how to
* handle this . */
if ( CmsgIsZeroCopy ( * cmsg ) ) {
process_zerocopy ( tcp , cmsg ) ;
seen = true ;
processed_err = true ;
} else if ( cmsg - > cmsg_level = = SOL_SOCKET & &
cmsg - > cmsg_type = = SCM_TIMESTAMPING ) {
cmsg = process_timestamp ( tcp , & msg , cmsg ) ;
seen = true ;
processed_err = true ;
} else {
/* Got a control message that is not a timestamp or zerocopy. Don't know
* how to handle this . */
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_tcp_trace ) ) {
gpr_log ( GPR_INFO ,
" unknown control message cmsg_level:%d cmsg_type:%d " ,
cmsg - > cmsg_level , cmsg - > cmsg_type ) ;
}
return ;
return processed_err ;
}
cmsg = process_timestamp ( tcp , & msg , cmsg ) ;
seen = true ;
}
if ( ! seen ) {
return ;
return processed_err ;
}
}
}
@ -870,18 +1223,28 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) {
/* We are still interested in collecting timestamps, so let's try reading
* them . */
process_errors ( tcp ) ;
bool processed = process_errors ( tcp ) ;
/* This might not a timestamps error. Set the read and write closures to be
* ready . */
if ( ! processed ) {
grpc_fd_set_readable ( tcp - > em_fd ) ;
grpc_fd_set_writable ( tcp - > em_fd ) ;
}
grpc_fd_notify_on_error ( tcp - > em_fd , & tcp - > error_closure ) ;
}
# else /* GRPC_LINUX_ERRQUEUE */
static TcpZerocopySendRecord * tcp_get_send_zerocopy_record (
grpc_tcp * tcp , grpc_slice_buffer * buf ) {
return nullptr ;
}
static void ZerocopyDisableAndWaitForRemaining ( grpc_tcp * tcp ) { }
static bool tcp_write_with_timestamps ( grpc_tcp * /*tcp*/ , struct msghdr * /*msg*/ ,
size_t /*sending_length*/ ,
ssize_t * /*sent_length*/ ) {
ssize_t * /*sent_length*/ ,
int /*additional_flags*/ ) {
gpr_log ( GPR_ERROR , " Write with timestamps not supported for this platform " ) ;
GPR_ASSERT ( 0 ) ;
return false ;
@ -907,12 +1270,138 @@ void tcp_shutdown_buffer_list(grpc_tcp* tcp) {
}
}
/* returns true if done, false if pending; if returning true, *error is set */
# if defined(IOV_MAX) && IOV_MAX < 1000
# define MAX_WRITE_IOVEC IOV_MAX
# else
# define MAX_WRITE_IOVEC 1000
# endif
msg_iovlen_type TcpZerocopySendRecord : : PopulateIovs ( size_t * unwind_slice_idx ,
size_t * unwind_byte_idx ,
size_t * sending_length ,
iovec * iov ) {
msg_iovlen_type iov_size ;
* unwind_slice_idx = out_offset_ . slice_idx ;
* unwind_byte_idx = out_offset_ . byte_idx ;
for ( iov_size = 0 ;
out_offset_ . slice_idx ! = buf_ . count & & iov_size ! = MAX_WRITE_IOVEC ;
iov_size + + ) {
iov [ iov_size ] . iov_base =
GRPC_SLICE_START_PTR ( buf_ . slices [ out_offset_ . slice_idx ] ) +
out_offset_ . byte_idx ;
iov [ iov_size ] . iov_len =
GRPC_SLICE_LENGTH ( buf_ . slices [ out_offset_ . slice_idx ] ) -
out_offset_ . byte_idx ;
* sending_length + = iov [ iov_size ] . iov_len ;
+ + ( out_offset_ . slice_idx ) ;
out_offset_ . byte_idx = 0 ;
}
GPR_DEBUG_ASSERT ( iov_size > 0 ) ;
return iov_size ;
}
void TcpZerocopySendRecord : : UpdateOffsetForBytesSent ( size_t sending_length ,
size_t actually_sent ) {
size_t trailing = sending_length - actually_sent ;
while ( trailing > 0 ) {
size_t slice_length ;
out_offset_ . slice_idx - - ;
slice_length = GRPC_SLICE_LENGTH ( buf_ . slices [ out_offset_ . slice_idx ] ) ;
if ( slice_length > trailing ) {
out_offset_ . byte_idx = slice_length - trailing ;
break ;
} else {
trailing - = slice_length ;
}
}
}
// returns true if done, false if pending; if returning true, *error is set
static bool do_tcp_flush_zerocopy ( grpc_tcp * tcp , TcpZerocopySendRecord * record ,
grpc_error * * error ) {
struct msghdr msg ;
struct iovec iov [ MAX_WRITE_IOVEC ] ;
msg_iovlen_type iov_size ;
ssize_t sent_length = 0 ;
size_t sending_length ;
size_t unwind_slice_idx ;
size_t unwind_byte_idx ;
while ( true ) {
sending_length = 0 ;
iov_size = record - > PopulateIovs ( & unwind_slice_idx , & unwind_byte_idx ,
& sending_length , iov ) ;
msg . msg_name = nullptr ;
msg . msg_namelen = 0 ;
msg . msg_iov = iov ;
msg . msg_iovlen = iov_size ;
msg . msg_flags = 0 ;
bool tried_sending_message = false ;
// Before calling sendmsg (with or without timestamps): we
// take a single ref on the zerocopy send record.
tcp - > tcp_zerocopy_send_ctx . NoteSend ( record ) ;
if ( tcp - > outgoing_buffer_arg ! = nullptr ) {
if ( ! tcp - > ts_capable | |
! tcp_write_with_timestamps ( tcp , & msg , sending_length , & sent_length ,
MSG_ZEROCOPY ) ) {
/* We could not set socket options to collect Fathom timestamps.
* Fallback on writing without timestamps . */
tcp - > ts_capable = false ;
tcp_shutdown_buffer_list ( tcp ) ;
} else {
tried_sending_message = true ;
}
}
if ( ! tried_sending_message ) {
msg . msg_control = nullptr ;
msg . msg_controllen = 0 ;
GRPC_STATS_INC_TCP_WRITE_SIZE ( sending_length ) ;
GRPC_STATS_INC_TCP_WRITE_IOV_SIZE ( iov_size ) ;
sent_length = tcp_send ( tcp - > fd , & msg , MSG_ZEROCOPY ) ;
}
if ( sent_length < 0 ) {
// If this particular send failed, drop ref taken earlier in this method.
tcp - > tcp_zerocopy_send_ctx . UndoSend ( ) ;
if ( errno = = EAGAIN ) {
record - > UnwindIfThrottled ( unwind_slice_idx , unwind_byte_idx ) ;
return false ;
} else if ( errno = = EPIPE ) {
* error = tcp_annotate_error ( GRPC_OS_ERROR ( errno , " sendmsg " ) , tcp ) ;
tcp_shutdown_buffer_list ( tcp ) ;
return true ;
} else {
* error = tcp_annotate_error ( GRPC_OS_ERROR ( errno , " sendmsg " ) , tcp ) ;
tcp_shutdown_buffer_list ( tcp ) ;
return true ;
}
}
tcp - > bytes_counter + = sent_length ;
record - > UpdateOffsetForBytesSent ( sending_length ,
static_cast < size_t > ( sent_length ) ) ;
if ( record - > AllSlicesSent ( ) ) {
* error = GRPC_ERROR_NONE ;
return true ;
}
}
}
static void UnrefMaybePutZerocopySendRecord ( grpc_tcp * tcp ,
TcpZerocopySendRecord * record ,
uint32_t seq , const char * tag ) {
if ( record - > Unref ( ) ) {
tcp - > tcp_zerocopy_send_ctx . PutSendRecord ( record ) ;
}
}
static bool tcp_flush_zerocopy ( grpc_tcp * tcp , TcpZerocopySendRecord * record ,
grpc_error * * error ) {
bool done = do_tcp_flush_zerocopy ( tcp , record , error ) ;
if ( done ) {
// Either we encountered an error, or we successfully sent all the bytes.
// In either case, we're done with this record.
UnrefMaybePutZerocopySendRecord ( tcp , record , 0 , " flush_done " ) ;
}
return done ;
}
static bool tcp_flush ( grpc_tcp * tcp , grpc_error * * error ) {
struct msghdr msg ;
struct iovec iov [ MAX_WRITE_IOVEC ] ;
@ -927,7 +1416,7 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
// buffer as we write
size_t outgoing_slice_idx = 0 ;
for ( ; ; ) {
while ( true ) {
sending_length = 0 ;
unwind_slice_idx = outgoing_slice_idx ;
unwind_byte_idx = tcp - > outgoing_byte_idx ;
@ -1027,12 +1516,21 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) {
if ( error ! = GRPC_ERROR_NONE ) {
cb = tcp - > write_cb ;
tcp - > write_cb = nullptr ;
if ( tcp - > current_zerocopy_send ! = nullptr ) {
UnrefMaybePutZerocopySendRecord ( tcp , tcp - > current_zerocopy_send , 0 ,
" handle_write_err " ) ;
tcp - > current_zerocopy_send = nullptr ;
}
grpc_core : : Closure : : Run ( DEBUG_LOCATION , cb , GRPC_ERROR_REF ( error ) ) ;
TCP_UNREF ( tcp , " write " ) ;
return ;
}
if ( ! tcp_flush ( tcp , & error ) ) {
bool flush_result =
tcp - > current_zerocopy_send ! = nullptr
? tcp_flush_zerocopy ( tcp , tcp - > current_zerocopy_send , & error )
: tcp_flush ( tcp , & error ) ;
if ( ! flush_result ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_tcp_trace ) ) {
gpr_log ( GPR_INFO , " write: delayed " ) ;
}
@ -1042,6 +1540,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) {
} else {
cb = tcp - > write_cb ;
tcp - > write_cb = nullptr ;
tcp - > current_zerocopy_send = nullptr ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_tcp_trace ) ) {
const char * str = grpc_error_string ( error ) ;
gpr_log ( GPR_INFO , " write: %s " , str ) ;
@ -1057,6 +1556,7 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
GPR_TIMER_SCOPE ( " tcp_write " , 0 ) ;
grpc_tcp * tcp = reinterpret_cast < grpc_tcp * > ( ep ) ;
grpc_error * error = GRPC_ERROR_NONE ;
TcpZerocopySendRecord * zerocopy_send_record = nullptr ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_tcp_trace ) ) {
size_t i ;
@ -1073,8 +1573,8 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
}
GPR_ASSERT ( tcp - > write_cb = = nullptr ) ;
GPR_DEBUG_ASSERT ( tcp - > current_zerocopy_send = = nullptr ) ;
tcp - > outgoing_buffer_arg = arg ;
if ( buf - > length = = 0 ) {
grpc_core : : Closure : : Run (
DEBUG_LOCATION , cb ,
@ -1085,15 +1585,26 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
tcp_shutdown_buffer_list ( tcp ) ;
return ;
}
zerocopy_send_record = tcp_get_send_zerocopy_record ( tcp , buf ) ;
if ( zerocopy_send_record = = nullptr ) {
// Either not enough bytes, or couldn't allocate a zerocopy context.
tcp - > outgoing_buffer = buf ;
tcp - > outgoing_byte_idx = 0 ;
}
tcp - > outgoing_buffer_arg = arg ;
if ( arg ) {
GPR_ASSERT ( grpc_event_engine_can_track_errors ( ) ) ;
}
if ( ! tcp_flush ( tcp , & error ) ) {
bool flush_result =
zerocopy_send_record ! = nullptr
? tcp_flush_zerocopy ( tcp , zerocopy_send_record , & error )
: tcp_flush ( tcp , & error ) ;
if ( ! flush_result ) {
TCP_REF ( tcp , " write " ) ;
tcp - > write_cb = cb ;
tcp - > current_zerocopy_send = zerocopy_send_record ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_tcp_trace ) ) {
gpr_log ( GPR_INFO , " write: delayed " ) ;
}
@ -1121,6 +1632,7 @@ static void tcp_add_to_pollset_set(grpc_endpoint* ep,
static void tcp_delete_from_pollset_set ( grpc_endpoint * ep ,
grpc_pollset_set * pollset_set ) {
grpc_tcp * tcp = reinterpret_cast < grpc_tcp * > ( ep ) ;
ZerocopyDisableAndWaitForRemaining ( tcp ) ;
grpc_pollset_set_del_fd ( pollset_set , tcp - > em_fd ) ;
}
@ -1172,9 +1684,15 @@ static const grpc_endpoint_vtable vtable = {tcp_read,
grpc_endpoint * grpc_tcp_create ( grpc_fd * em_fd ,
const grpc_channel_args * channel_args ,
const char * peer_string ) {
static constexpr bool kZerocpTxEnabledDefault = false ;
int tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE ;
int tcp_max_read_chunk_size = 4 * 1024 * 1024 ;
int tcp_min_read_chunk_size = 256 ;
bool tcp_tx_zerocopy_enabled = kZerocpTxEnabledDefault ;
int tcp_tx_zerocopy_send_bytes_thresh =
grpc_core : : TcpZerocopySendCtx : : kDefaultSendBytesThreshold ;
int tcp_tx_zerocopy_max_simult_sends =
grpc_core : : TcpZerocopySendCtx : : kDefaultMaxSends ;
grpc_resource_quota * resource_quota = grpc_resource_quota_create ( nullptr ) ;
if ( channel_args ! = nullptr ) {
for ( size_t i = 0 ; i < channel_args - > num_args ; i + + ) {
@ -1199,6 +1717,23 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
resource_quota =
grpc_resource_quota_ref_internal ( static_cast < grpc_resource_quota * > (
channel_args - > args [ i ] . value . pointer . p ) ) ;
} else if ( 0 = = strcmp ( channel_args - > args [ i ] . key ,
GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED ) ) {
tcp_tx_zerocopy_enabled = grpc_channel_arg_get_bool (
& channel_args - > args [ i ] , kZerocpTxEnabledDefault ) ;
} else if ( 0 = = strcmp ( channel_args - > args [ i ] . key ,
GRPC_ARG_TCP_TX_ZEROCOPY_SEND_BYTES_THRESHOLD ) ) {
grpc_integer_options options = {
grpc_core : : TcpZerocopySendCtx : : kDefaultSendBytesThreshold , 0 ,
INT_MAX } ;
tcp_tx_zerocopy_send_bytes_thresh =
grpc_channel_arg_get_integer ( & channel_args - > args [ i ] , options ) ;
} else if ( 0 = = strcmp ( channel_args - > args [ i ] . key ,
GRPC_ARG_TCP_TX_ZEROCOPY_MAX_SIMULT_SENDS ) ) {
grpc_integer_options options = {
grpc_core : : TcpZerocopySendCtx : : kDefaultMaxSends , 0 , INT_MAX } ;
tcp_tx_zerocopy_max_simult_sends =
grpc_channel_arg_get_integer ( & channel_args - > args [ i ] , options ) ;
}
}
}
@ -1215,6 +1750,7 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
tcp - > fd = grpc_fd_wrapped_fd ( em_fd ) ;
tcp - > read_cb = nullptr ;
tcp - > write_cb = nullptr ;
tcp - > current_zerocopy_send = nullptr ;
tcp - > release_fd_cb = nullptr ;
tcp - > release_fd = nullptr ;
tcp - > incoming_buffer = nullptr ;
@ -1228,6 +1764,20 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
tcp - > socket_ts_enabled = false ;
tcp - > ts_capable = true ;
tcp - > outgoing_buffer_arg = nullptr ;
new ( & tcp - > tcp_zerocopy_send_ctx ) TcpZerocopySendCtx (
tcp_tx_zerocopy_max_simult_sends , tcp_tx_zerocopy_send_bytes_thresh ) ;
if ( tcp_tx_zerocopy_enabled & & ! tcp - > tcp_zerocopy_send_ctx . memory_limited ( ) ) {
# ifdef GRPC_LINUX_ERRQUEUE
const int enable = 1 ;
auto err =
setsockopt ( tcp - > fd , SOL_SOCKET , SO_ZEROCOPY , & enable , sizeof ( enable ) ) ;
if ( err = = 0 ) {
tcp - > tcp_zerocopy_send_ctx . set_enabled ( true ) ;
} else {
gpr_log ( GPR_ERROR , " Failed to set zerocopy options on the socket. " ) ;
}
# endif
}
/* paired with unref in grpc_tcp_destroy */
new ( & tcp - > refcount ) grpc_core : : RefCount ( 1 , & grpc_tcp_trace ) ;
gpr_atm_no_barrier_store ( & tcp - > shutdown_count , 0 ) ;
@ -1294,6 +1844,7 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd,
grpc_slice_buffer_reset_and_unref_internal ( & tcp - > last_read_buffer ) ;
if ( grpc_event_engine_can_track_errors ( ) ) {
/* Stop errors notification. */
ZerocopyDisableAndWaitForRemaining ( tcp ) ;
gpr_atm_no_barrier_store ( & tcp - > stop_error_notification , true ) ;
grpc_fd_set_error ( tcp - > em_fd ) ;
}