@ -45,6 +45,8 @@
# include <algorithm>
# include <unordered_map>
# include "absl/log/check.h"
# include <grpc/slice.h>
# include <grpc/support/alloc.h>
# include <grpc/support/log.h>
@ -158,7 +160,7 @@ class TcpZerocopySendRecord {
// sendmsg() failed or when tcp_write() is done.
bool Unref ( ) {
const intptr_t prior = ref_ . fetch_sub ( 1 , std : : memory_order_acq_rel ) ;
GPR_DEBUG_ASSERT ( prior > 0 ) ;
DCHECK_GT ( prior , 0 ) ;
if ( prior = = 1 ) {
AllSendsComplete ( ) ;
return true ;
@ -173,9 +175,9 @@ class TcpZerocopySendRecord {
} ;
void AssertEmpty ( ) {
GPR_DEBUG_ASSERT ( buf_ . count = = 0 ) ;
GPR_DEBUG_ASSERT ( buf_ . length = = 0 ) ;
GPR_DEBUG_ASSERT ( ref_ . load ( std : : memory_order_relaxed ) = = 0 ) ;
DCHECK_EQ ( buf_ . count , 0u ) ;
DCHECK_EQ ( buf_ . length , 0u ) ;
DCHECK_EQ ( ref_ . load ( std : : memory_order_relaxed ) , 0 ) ;
}
// When all sendmsg() calls associated with this tcp_write() have been
@ -183,7 +185,7 @@ class TcpZerocopySendRecord {
// for each sendmsg()) and all reference counts have been dropped, drop our
// reference to the underlying data since we no longer need it.
void AllSendsComplete ( ) {
GPR_DEBUG_ASSERT ( ref_ . load ( std : : memory_order_relaxed ) = = 0 ) ;
DCHECK_EQ ( ref_ . load ( std : : memory_order_relaxed ) , 0 ) ;
grpc_slice_buffer_reset_and_unref ( & buf_ ) ;
}
@ -262,7 +264,7 @@ class TcpZerocopySendCtx {
- - last_send_ ;
if ( ReleaseSendRecord ( last_send_ ) - > Unref ( ) ) {
// We should still be holding the ref taken by tcp_write().
GPR_DEBUG_ASSERT ( 0 ) ;
DCHECK ( 0 ) ;
}
}
@ -298,8 +300,8 @@ class TcpZerocopySendCtx {
// max_sends_ tcp_write() instances with zerocopy enabled in flight at the
// same time.
void PutSendRecord ( TcpZerocopySendRecord * record ) {
GPR_DEBUG_ASSERT ( record > = send_records_ & &
record < send_records_ + max_sends_ ) ;
DCHECK ( record > = send_records_ ) ;
DCHECK ( record < send_records_ + max_sends_ ) ;
MutexLock guard ( & lock_ ) ;
PutSendRecordLocked ( record ) ;
}
@ -318,7 +320,7 @@ class TcpZerocopySendCtx {
bool enabled ( ) const { return enabled_ ; }
void set_enabled ( bool enabled ) {
GPR_DEBUG_ASSERT ( ! enabled | | ! memory_limited ( ) ) ;
DCHECK ( ! enabled | | ! memory_limited ( ) ) ;
enabled_ = enabled ;
}
@ -356,7 +358,7 @@ class TcpZerocopySendCtx {
zcopy_enobuf_state_ = OMemState : : CHECK ;
return false ;
}
GPR_DEBUG_ASSERT ( zcopy_enobuf_state_ ! = OMemState : : CHECK ) ;
DCHECK ( zcopy_enobuf_state_ ! = OMemState : : CHECK ) ;
if ( zcopy_enobuf_state_ = = OMemState : : FULL ) {
// A previous sendmsg attempt was blocked by ENOBUFS. Return true to
// mark the fd as writable so the next write attempt could be made.
@ -430,7 +432,7 @@ class TcpZerocopySendCtx {
TcpZerocopySendRecord * ReleaseSendRecordLocked ( uint32_t seq ) {
auto iter = ctx_lookup_ . find ( seq ) ;
GPR_DEBUG_ASSERT ( iter ! = ctx_lookup_ . end ( ) ) ;
DCHECK ( iter ! = ctx_lookup_ . end ( ) ) ;
TcpZerocopySendRecord * record = iter - > second ;
ctx_lookup_ . erase ( iter ) ;
return record ;
@ -448,7 +450,7 @@ class TcpZerocopySendCtx {
}
void PutSendRecordLocked ( TcpZerocopySendRecord * record ) {
GPR_DEBUG_ASSERT ( free_send_records_size_ < max_sends_ ) ;
DCHECK ( free_send_records_size_ < max_sends_ ) ;
free_send_records_ [ free_send_records_size_ ] = record ;
free_send_records_size_ + + ;
}
@ -640,7 +642,7 @@ static void run_poller(void* bp, grpc_error_handle /*error_ignored*/) {
g_backup_poller_mu - > Lock ( ) ;
// last "uncovered" notification is the ref that keeps us polling
if ( g_uncovered_notifications_pending = = 1 ) {
GPR_ASSERT ( g_backup_poller = = p ) ;
CHECK ( g_backup_poller = = p ) ;
g_backup_poller = nullptr ;
g_uncovered_notifications_pending = 0 ;
g_backup_poller_mu - > Unlock ( ) ;
@ -668,7 +670,7 @@ static void drop_uncovered(grpc_tcp* /*tcp*/) {
p = g_backup_poller ;
old_count = g_uncovered_notifications_pending - - ;
g_backup_poller_mu - > Unlock ( ) ;
GPR_ASSERT ( old_count > 1 ) ;
CHECK_GT ( old_count , 1 ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_tcp_trace ) ) {
gpr_log ( GPR_INFO , " BACKUP_POLLER:%p uncover cnt %d->%d " , p , old_count ,
old_count - 1 ) ;
@ -949,8 +951,8 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
iov [ i ] . iov_len = GRPC_SLICE_LENGTH ( tcp - > incoming_buffer - > slices [ i ] ) ;
}
GPR_ASSERT ( tcp - > incoming_buffer - > length ! = 0 ) ;
GPR_DEBUG_ASSERT ( tcp - > min_progress_size > 0 ) ;
CHECK_NE ( tcp - > incoming_buffer - > length , 0u ) ;
DCHECK_GT ( tcp - > min_progress_size , 0 ) ;
do {
// Assume there is something on the queue. If we receive TCP_INQ from
@ -1018,12 +1020,12 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
grpc_core : : global_stats ( ) . IncrementTcpReadSize ( read_bytes ) ;
add_to_estimate ( tcp , static_cast < size_t > ( read_bytes ) ) ;
GPR_DEBUG_ASSERT ( ( size_t ) read_bytes < =
tcp - > incoming_buffer - > length - total_read_bytes ) ;
DCHECK ( ( size_t ) read_bytes < =
tcp - > incoming_buffer - > length - total_read_bytes ) ;
# ifdef GRPC_HAVE_TCP_INQ
if ( tcp - > inq_capable ) {
GPR_DEBUG_ASSERT ( ! ( msg . msg_flags & MSG_CTRUNC ) ) ;
DCHECK ( ! ( msg . msg_flags & MSG_CTRUNC ) ) ;
struct cmsghdr * cmsg = CMSG_FIRSTHDR ( & msg ) ;
for ( ; cmsg ! = nullptr ; cmsg = CMSG_NXTHDR ( & msg , cmsg ) ) {
if ( cmsg - > cmsg_level = = SOL_TCP & & cmsg - > cmsg_type = = TCP_CM_INQ & &
@ -1066,7 +1068,7 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
finish_estimate ( tcp ) ;
}
GPR_DEBUG_ASSERT ( total_read_bytes > 0 ) ;
DCHECK_GT ( total_read_bytes , 0u ) ;
* error = absl : : OkStatus ( ) ;
if ( grpc_core : : IsTcpFrameSizeTuningEnabled ( ) ) {
// Update min progress size based on the total number of bytes read in
@ -1186,7 +1188,7 @@ static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) {
static void tcp_read ( grpc_endpoint * ep , grpc_slice_buffer * incoming_buffer ,
grpc_closure * cb , bool urgent , int min_progress_size ) {
grpc_tcp * tcp = reinterpret_cast < grpc_tcp * > ( ep ) ;
GPR_ASSERT ( tcp - > read_cb = = nullptr ) ;
CHECK_EQ ( tcp - > read_cb , nullptr ) ;
tcp - > read_cb = cb ;
tcp - > read_mu . Lock ( ) ;
tcp - > incoming_buffer = incoming_buffer ;
@ -1267,8 +1269,8 @@ static TcpZerocopySendRecord* tcp_get_send_zerocopy_record(
}
if ( zerocopy_send_record ! = nullptr ) {
zerocopy_send_record - > PrepareForSends ( buf ) ;
GPR_DEBUG_ASSERT ( buf - > count = = 0 ) ;
GPR_DEBUG_ASSERT ( buf - > length = = 0 ) ;
DCHECK_EQ ( buf - > count , 0u ) ;
DCHECK_EQ ( buf - > length , 0u ) ;
tcp - > outgoing_byte_idx = 0 ;
tcp - > outgoing_buffer = nullptr ;
}
@ -1330,10 +1332,10 @@ static void UnrefMaybePutZerocopySendRecord(grpc_tcp* tcp,
uint32_t seq , const char * tag ) ;
// Reads \a cmsg to process zerocopy control messages.
static void process_zerocopy ( grpc_tcp * tcp , struct cmsghdr * cmsg ) {
GPR_DEBUG_ASSERT ( cmsg ) ;
DCHECK ( cmsg ) ;
auto serr = reinterpret_cast < struct sock_extended_err * > ( CMSG_DATA ( cmsg ) ) ;
GPR_DEBUG_ASSERT ( serr - > ee_errno = = 0 ) ;
GPR_DEBUG_ASSERT ( serr - > ee_origin = = SO_EE_ORIGIN_ZEROCOPY ) ;
DCHECK_EQ ( serr - > ee_errno , 0u ) ;
DCHECK ( serr - > ee_origin = = SO_EE_ORIGIN_ZEROCOPY ) ;
const uint32_t lo = serr - > ee_info ;
const uint32_t hi = serr - > ee_data ;
for ( uint32_t seq = lo ; seq < = hi ; + + seq ) {
@ -1343,7 +1345,7 @@ static void process_zerocopy(grpc_tcp* tcp, struct cmsghdr* cmsg) {
// both; if so, batch the unref/put.
TcpZerocopySendRecord * record =
tcp - > tcp_zerocopy_send_ctx . ReleaseSendRecord ( seq ) ;
GPR_DEBUG_ASSERT ( record ) ;
DCHECK ( record ) ;
UnrefMaybePutZerocopySendRecord ( tcp , record , seq , " CALLBACK RCVD " ) ;
}
if ( tcp - > tcp_zerocopy_send_ctx . UpdateZeroCopyOMemStateAfterFree ( ) ) {
@ -1538,14 +1540,14 @@ static bool tcp_write_with_timestamps(grpc_tcp* /*tcp*/, struct msghdr* /*msg*/,
int * /* saved_errno */ ,
int /*additional_flags*/ ) {
gpr_log ( GPR_ERROR , " Write with timestamps not supported for this platform " ) ;
GPR_ASSERT ( 0 ) ;
CHECK ( 0 ) ;
return false ;
}
static void tcp_handle_error ( void * /*arg*/ /* grpc_tcp */ ,
grpc_error_handle /*error*/ ) {
gpr_log ( GPR_ERROR , " Error handling is not supported for this platform " ) ;
GPR_ASSERT ( 0 ) ;
CHECK ( 0 ) ;
}
# endif // GRPC_LINUX_ERRQUEUE
@ -1584,7 +1586,7 @@ msg_iovlen_type TcpZerocopySendRecord::PopulateIovs(size_t* unwind_slice_idx,
+ + ( out_offset_ . slice_idx ) ;
out_offset_ . byte_idx = 0 ;
}
GPR_DEBUG_ASSERT ( iov_size > 0 ) ;
DCHECK_GT ( iov_size , 0u ) ;
return iov_size ;
}
@ -1739,7 +1741,7 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error_handle* error) {
outgoing_slice_idx + + ;
tcp - > outgoing_byte_idx = 0 ;
}
GPR_ASSERT ( iov_size > 0 ) ;
CHECK_GT ( iov_size , 0u ) ;
msg . msg_name = nullptr ;
msg . msg_namelen = 0 ;
@ -1792,7 +1794,7 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error_handle* error) {
}
}
GPR_ASSERT ( tcp - > outgoing_byte_idx = = 0 ) ;
CHECK_EQ ( tcp - > outgoing_byte_idx , 0u ) ;
grpc_core : : EventLog : : Append ( " tcp-write-outstanding " , - sent_length ) ;
tcp - > bytes_counter + = sent_length ;
trailing = sending_length - static_cast < size_t > ( sent_length ) ;
@ -1844,7 +1846,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */,
}
notify_on_write ( tcp ) ;
// tcp_flush does not populate error if it has returned false.
GPR_DEBUG_ASSERT ( error . ok ( ) ) ;
DCHECK ( error . ok ( ) ) ;
} else {
cb = tcp - > write_cb ;
tcp - > write_cb = nullptr ;
@ -1880,8 +1882,8 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
}
}
GPR_ASSERT ( tcp - > write_cb = = nullptr ) ;
GPR_DEBUG_ASSERT ( tcp - > current_zerocopy_send = = nullptr ) ;
CHECK_EQ ( tcp - > write_cb , nullptr ) ;
DCHECK_EQ ( tcp - > current_zerocopy_send , nullptr ) ;
if ( buf - > length = = 0 ) {
grpc_core : : Closure : : Run (
@ -1901,7 +1903,7 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
}
tcp - > outgoing_buffer_arg = arg ;
if ( arg ) {
GPR_ASSERT ( grpc_event_engine_can_track_errors ( ) ) ;
CHECK ( grpc_event_engine_can_track_errors ( ) ) ;
}
bool flush_result =
@ -1988,7 +1990,7 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
tcp - > base . vtable = & vtable ;
tcp - > peer_string = std : : string ( peer_string ) ;
tcp - > fd = grpc_fd_wrapped_fd ( em_fd ) ;
GPR_ASSERT ( options . resource_quota ! = nullptr ) ;
CHECK ( options . resource_quota ! = nullptr ) ;
tcp - > memory_owner =
options . resource_quota - > memory_quota ( ) - > CreateMemoryOwner ( ) ;
tcp - > self_reservation = tcp - > memory_owner . MakeReservation ( sizeof ( grpc_tcp ) ) ;
@ -2079,7 +2081,7 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
int grpc_tcp_fd ( grpc_endpoint * ep ) {
grpc_tcp * tcp = reinterpret_cast < grpc_tcp * > ( ep ) ;
GPR_ASSERT ( ep - > vtable = = & vtable ) ;
CHECK ( ep - > vtable = = & vtable ) ;
return grpc_fd_wrapped_fd ( tcp - > em_fd ) ;
}
@ -2090,7 +2092,7 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd,
grpc_event_engine_endpoint_destroy_and_release_fd ( ep , fd , done ) ;
}
grpc_tcp * tcp = reinterpret_cast < grpc_tcp * > ( ep ) ;
GPR_ASSERT ( ep - > vtable = = & vtable ) ;
CHECK ( ep - > vtable = = & vtable ) ;
tcp - > release_fd = fd ;
tcp - > release_fd_cb = done ;
grpc_slice_buffer_reset_and_unref ( & tcp - > last_read_buffer ) ;