@@ -234,7 +234,11 @@ class TcpZerocopySendCtx {
   // buffers that were sent with the corresponding call to sendmsg().
   void NoteSend(TcpZerocopySendRecord* record) {
     record->Ref();
-    AssociateSeqWithSendRecord(last_send_, record);
+    {
+      MutexLock guard(&lock_);
+      is_in_write_ = true;
+      AssociateSeqWithSendRecordLocked(last_send_, record);
+    }
     ++last_send_;
   }
@@ -255,8 +259,8 @@ class TcpZerocopySendCtx {
   // Simply associate this send record (and the underlying sent data buffers)
   // with the implicit sequence number for this zerocopy sendmsg().
-  void AssociateSeqWithSendRecord(uint32_t seq, TcpZerocopySendRecord* record) {
-    MutexLock guard(&lock_);
+  void AssociateSeqWithSendRecordLocked(uint32_t seq,
+                                        TcpZerocopySendRecord* record) {
     ctx_lookup_.emplace(seq, record);
   }
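Note on the rename above: the "Locked" suffix marks a helper that expects the
caller to already hold lock_, which is why the MutexLock moves out of the
helper and up into NoteSend(). Below is a minimal sketch of that convention,
using absl::Mutex and its thread-safety annotations purely for illustration;
the class and member names (SeqTable, Note, AssociateLocked) are invented and
are not part of gRPC.

#include <cstdint>
#include <unordered_map>

#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"

class SeqTable {
 public:
  void Note(uint32_t seq, void* record) {
    absl::MutexLock lock(&mu_);
    AssociateLocked(seq, record);  // lock is already held here
  }

 private:
  // Callers must hold mu_; with clang's -Wthread-safety, calling this without
  // the lock becomes a compile-time error instead of a latent data race.
  void AssociateLocked(uint32_t seq, void* record)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    table_.emplace(seq, record);
  }

  absl::Mutex mu_;
  std::unordered_map<uint32_t, void*> table_ ABSL_GUARDED_BY(mu_);
};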
@@ -314,7 +318,107 @@ class TcpZerocopySendCtx {
   // zerocopy is not useful for small transfers.
   size_t threshold_bytes() const { return threshold_bytes_; }

+  // Expected to be called by the handler that reads messages from the error
+  // queue. It is used to indicate that some OMem memory is now available. It
+  // returns true to tell the caller to mark the file descriptor as
+  // immediately writable.
+  //
+  // If a write is currently in progress on the socket (i.e. we have issued a
+  // sendmsg() and are about to check its return value) then we set the omem
+  // state to CHECK to let the sending thread know that some tcp_omem was
+  // concurrently freed even if sendmsg() returns ENOBUFS. In this case, since
+  // there is already an active send thread, we do not need to mark the
+  // socket writable, so we return false.
+  //
+  // If there was no write in progress on the socket, and the socket was not
+  // marked as FULL, then we need not mark the socket writable now that some
+  // tcp_omem memory is freed, since it was not considered blocked on
+  // tcp_omem to begin with. So in this case, return false.
+  //
+  // But, if a write was not in progress and the omem state was FULL, then we
+  // need to mark the socket writable since it is no longer blocked by
+  // tcp_omem. In this case, return true.
+  //
+  // Please refer to the STATE TRANSITION DIAGRAM below for more details.
+  //
+  bool UpdateZeroCopyOMemStateAfterFree() {
+    MutexLock guard(&lock_);
+    if (is_in_write_) {
+      zcopy_enobuf_state_ = OMemState::CHECK;
+      return false;
+    }
+    GPR_DEBUG_ASSERT(zcopy_enobuf_state_ != OMemState::CHECK);
+    if (zcopy_enobuf_state_ == OMemState::FULL) {
+      // A previous sendmsg attempt was blocked by ENOBUFS. Return true to
+      // mark the fd as writable so the next write attempt can be made.
+      zcopy_enobuf_state_ = OMemState::OPEN;
+      return true;
+    } else if (zcopy_enobuf_state_ == OMemState::OPEN) {
+      // No need to mark the fd as writable because the previous write
+      // attempt did not encounter ENOBUFS.
+      return false;
+    } else {
+      // This state should never be reached because it implies that the
+      // previous state was CHECK while is_in_write_ is false. That would mean
+      // that after the previous sendmsg returned and cleared is_in_write_, it
+      // did not move the zero-copy state from CHECK back to OPEN.
+      GPR_ASSERT(false && "OMem state error!");
+    }
+  }
+  // Expected to be called by the thread calling sendmsg after the syscall
+  // invocation is complete. If ENOBUFS is seen, it checks whether the error
+  // handler (which processes Tx zero-copy completions) has already run and
+  // freed up some OMem, and returns true to indicate that the write can be
+  // attempted again immediately. If ENOBUFS was seen but no Tx zero-copy
+  // completions have been received between the sendmsg() and us taking this
+  // lock, then tcp_omem is still full from our point of view; we do not
+  // signal that the socket is writable with respect to the availability of
+  // tcp_omem, and the function returns false. This indicates that another
+  // write should not be attempted immediately and the calling thread should
+  // wait until the socket is writable again. If ENOBUFS was not seen, then
+  // again return false, because the next write should be attempted only when
+  // the socket is writable again.
+  //
+  // Please refer to the STATE TRANSITION DIAGRAM below for more details.
+  //
+  bool UpdateZeroCopyOMemStateAfterSend(bool seen_enobuf) {
+    MutexLock guard(&lock_);
+    is_in_write_ = false;
+    if (seen_enobuf) {
+      if (zcopy_enobuf_state_ == OMemState::CHECK) {
+        zcopy_enobuf_state_ = OMemState::OPEN;
+        return true;
+      } else {
+        zcopy_enobuf_state_ = OMemState::FULL;
+      }
+    } else if (zcopy_enobuf_state_ != OMemState::OPEN) {
+      zcopy_enobuf_state_ = OMemState::OPEN;
+    }
+    return false;
+  }
+
  private:
+  //                      STATE TRANSITION DIAGRAM
+  //
+  // sendmsg succeeds      Tx-zero copy succeeds and there is no active sendmsg
+  //      ----<<--+  +------<<-------------------------------------+
+  //      |       |  |                                             |
+  //      |       |  v       sendmsg returns ENOBUFS               |
+  //      +-----> OPEN  ------------->>-------------------------> FULL
+  //                ^                                              |
+  //                |                                              |
+  //                | sendmsg completes                            |
+  //                +----<<---------- CHECK <-------<<-------------+
+  //                                        Tx-zero copy succeeds and there is
+  //                                        an active sendmsg
+  //
+  enum class OMemState : int8_t {
+    OPEN,   // Everything is clear and omem is not full.
+    FULL,   // The last sendmsg() has returned with an errno of ENOBUFS.
+    CHECK,  // Error queue is read while is_in_write_ was true, so we should
+            // check this state after the sendmsg.
+  };
+
   TcpZerocopySendRecord* ReleaseSendRecordLocked(uint32_t seq) {
     auto iter = ctx_lookup_.find(seq);
     GPR_DEBUG_ASSERT(iter != ctx_lookup_.end());
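The two UpdateZeroCopyOMemState* methods added above, together with
is_in_write_ and the OPEN/FULL/CHECK enum, form a small state machine shared
by the sending thread and the error-queue thread. The standalone model below
restates just that handshake using std::mutex; every name in it is invented
for illustration, and the authoritative version is the diff above.

#include <mutex>

class OmemStateModel {
 public:
  // Sending thread, before issuing sendmsg() (cf. NoteSend above).
  void BeginSend() {
    std::lock_guard<std::mutex> g(mu_);
    in_write_ = true;
  }

  // Error-queue thread, after zerocopy completions freed some tcp_omem
  // (cf. UpdateZeroCopyOMemStateAfterFree). Returns true if the fd should be
  // marked writable.
  bool AfterFree() {
    std::lock_guard<std::mutex> g(mu_);
    if (in_write_) {
      state_ = State::kCheck;  // tell the sender to re-check after sendmsg()
      return false;            // the active sender will handle it
    }
    if (state_ == State::kFull) {
      state_ = State::kOpen;   // a sender blocked on ENOBUFS can now proceed
      return true;
    }
    return false;              // nothing was blocked on tcp_omem
  }

  // Sending thread, after sendmsg() returned
  // (cf. UpdateZeroCopyOMemStateAfterSend). Returns true if the write can be
  // retried immediately.
  bool AfterSend(bool seen_enobuf) {
    std::lock_guard<std::mutex> g(mu_);
    in_write_ = false;
    if (seen_enobuf) {
      if (state_ == State::kCheck) {
        state_ = State::kOpen;  // omem was freed while we were in sendmsg()
        return true;
      }
      state_ = State::kFull;    // stay blocked until AfterFree() fires
      return false;
    }
    state_ = State::kOpen;
    return false;
  }

 private:
  enum class State { kOpen, kFull, kCheck };
  std::mutex mu_;
  bool in_write_ = false;
  State state_ = State::kOpen;
};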
@@ -351,6 +455,8 @@ class TcpZerocopySendCtx {
   size_t threshold_bytes_ = kDefaultSendBytesThreshold;
   std::unordered_map<uint32_t, TcpZerocopySendRecord*> ctx_lookup_;
   bool memory_limited_ = false;
+  bool is_in_write_ = false;
+  OMemState zcopy_enobuf_state_;
 };

 }  // namespace grpc_core
@@ -1126,6 +1232,9 @@ static void process_zerocopy(grpc_tcp* tcp, struct cmsghdr* cmsg) {
     GPR_DEBUG_ASSERT(record);
     UnrefMaybePutZerocopySendRecord(tcp, record, seq, "CALLBACK RCVD");
   }
+  if (tcp->tcp_zerocopy_send_ctx.UpdateZeroCopyOMemStateAfterFree()) {
+    grpc_fd_set_writable(tcp->em_fd);
+  }
 }

 // Whether the cmsg received from error queue is of the IPv4 or IPv6 levels.
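process_zerocopy() handles the cmsgs read from the socket error queue, which
is where MSG_ZEROCOPY completions (and therefore newly freed tcp_omem) are
reported, so it is the natural place for the new UpdateZeroCopyOMemStateAfterFree()
call. For readers unfamiliar with that kernel interface, here is a generic
Linux sketch of draining those completions; it is not gRPC's implementation,
and handle_completed_range() is a hypothetical placeholder for the
application's bookkeeping.

#include <cstdint>
#include <cstdio>
#include <cstring>
#include <linux/errqueue.h>
#include <netinet/in.h>
#include <sys/socket.h>

// Hypothetical placeholder: the application releases the buffers it pinned
// for sends numbered lo..hi (inclusive).
static void handle_completed_range(uint32_t lo, uint32_t hi) {
  std::printf("zerocopy sends [%u, %u] completed\n",
              static_cast<unsigned>(lo), static_cast<unsigned>(hi));
}

static void drain_zerocopy_completions(int fd) {
  msghdr msg;
  char control[256];
  std::memset(&msg, 0, sizeof(msg));
  msg.msg_control = control;
  msg.msg_controllen = sizeof(control);
  // Reading with MSG_ERRQUEUE drains the per-socket error queue and never
  // consumes regular data; it is non-blocking by design.
  while (recvmsg(fd, &msg, MSG_ERRQUEUE) >= 0) {
    for (cmsghdr* cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr;
         cmsg = CMSG_NXTHDR(&msg, cmsg)) {
      if ((cmsg->cmsg_level == SOL_IP && cmsg->cmsg_type == IP_RECVERR) ||
          (cmsg->cmsg_level == SOL_IPV6 && cmsg->cmsg_type == IPV6_RECVERR)) {
        sock_extended_err* serr =
            reinterpret_cast<sock_extended_err*>(CMSG_DATA(cmsg));
        if (serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY && serr->ee_errno == 0) {
          // The kernel no longer references the buffers for sequence numbers
          // [ee_info, ee_data]; they may be reused or freed.
          handle_completed_range(serr->ee_info, serr->ee_data);
        }
      }
    }
    msg.msg_controllen = sizeof(control);  // reset for the next recvmsg()
  }
}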
@@ -1436,10 +1545,14 @@ static bool do_tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record,
       GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size);
       sent_length = tcp_send(tcp->fd, &msg, &saved_errno, MSG_ZEROCOPY);
     }
+    if (tcp->tcp_zerocopy_send_ctx.UpdateZeroCopyOMemStateAfterSend(
+            saved_errno == ENOBUFS)) {
+      grpc_fd_set_writable(tcp->em_fd);
+    }
     if (sent_length < 0) {
       // If this particular send failed, drop ref taken earlier in this method.
       tcp->tcp_zerocopy_send_ctx.UndoSend();
-      if (saved_errno == EAGAIN) {
+      if (saved_errno == EAGAIN || saved_errno == ENOBUFS) {
         record->UnwindIfThrottled(unwind_slice_idx, unwind_byte_idx);
         return false;
       } else if (saved_errno == EPIPE) {
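In the hunk above, UpdateZeroCopyOMemStateAfterSend(saved_errno == ENOBUFS) is
called after every sendmsg() attempt and before the return value is examined,
so a completion that freed tcp_omem while the syscall was in flight is not
lost; a true return marks the fd writable so the write is retried immediately.
This relies on errno being snapshotted at the call site, which the tcp_send()
helper in the context lines does via its saved_errno out-parameter. A generic
sketch of that snapshot pattern, with an invented helper name:

#include <cerrno>
#include <sys/socket.h>

// Hypothetical helper: retry on EINTR and capture errno right away, before
// any later call (logging, stats, state updates) can clobber it.
static ssize_t send_with_saved_errno(int fd, msghdr* msg, int flags,
                                     int* saved_errno) {
  ssize_t ret;
  do {
    ret = sendmsg(fd, msg, flags);
  } while (ret < 0 && errno == EINTR);
  *saved_errno = (ret < 0) ? errno : 0;
  return ret;
}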
@@ -1547,7 +1660,7 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error_handle* error) {
     }
     if (sent_length < 0) {
-      if (saved_errno == EAGAIN) {
+      if (saved_errno == EAGAIN || saved_errno == ENOBUFS) {
         tcp->outgoing_byte_idx = unwind_byte_idx;
         // unref all and forget about all slices that have been written to this
         // point
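The same relaxation is applied to the regular (non-zerocopy) tcp_flush() path:
ENOBUFS is handled like EAGAIN, i.e. as transient backpressure to wait out
rather than a fatal write error. A generic, self-contained sketch of that
pattern (not gRPC code; a real event loop would re-arm its poller on the fd
instead of blocking in poll() as done here):

#include <cerrno>
#include <cstddef>
#include <poll.h>
#include <sys/socket.h>

// Writes all of buf, waiting for writability on EAGAIN/ENOBUFS.
// Returns false on a hard error such as EPIPE or ECONNRESET.
static bool write_all(int fd, const char* buf, size_t len) {
  size_t off = 0;
  while (off < len) {
    ssize_t n = send(fd, buf + off, len - off, 0);
    if (n >= 0) {
      off += static_cast<size_t>(n);
      continue;
    }
    if (errno == EINTR) continue;
    if (errno == EAGAIN || errno == ENOBUFS) {
      pollfd pfd;
      pfd.fd = fd;
      pfd.events = POLLOUT;
      pfd.revents = 0;
      if (poll(&pfd, 1, -1) < 0) return false;  // back off until writable
      continue;
    }
    return false;  // hard error: surface to the caller
  }
  return true;
}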
@@ -1609,7 +1722,6 @@ static void tcp_handle_write(void* arg /* grpc_tcp */,
     TCP_UNREF(tcp, "write");
     return;
   }
   bool flush_result =
       tcp->current_zerocopy_send != nullptr
           ? tcp_flush_zerocopy(tcp, tcp->current_zerocopy_send, &error)