From b2e52d6e240c57fb8ea804aec5f6dbd04130cbbe Mon Sep 17 00:00:00 2001 From: Vignesh Babu Date: Thu, 12 Jan 2023 10:48:46 -0800 Subject: [PATCH] Fix bug in Rcv Lowat (#32081) * Fix bug in Rcv Lowat * fix * update --- src/core/lib/iomgr/tcp_posix.cc | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 623534ef0db..f427c2d15b9 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -835,17 +835,18 @@ static void update_rcvlowat(grpc_tcp* tcp) int remaining = std::min(static_cast<int>(tcp->incoming_buffer->length), tcp->min_progress_size); + remaining = std::min(remaining, kRcvLowatMax); // Setting SO_RCVLOWAT for small quantities does not save on CPU. - if (remaining < kRcvLowatThreshold) { + if (remaining < 2 * kRcvLowatThreshold) { remaining = 0; } - // If zerocopy is off, wake shortly before the full RPC is here. More can - // show up partway through recvmsg() since it takes a while to copy data. - // So an early wakeup aids latency. - if (!tcp->tcp_zerocopy_send_ctx.enabled() && remaining > 0) { + // Decrement remaining by kRcvLowatThreshold. This would have the effect of + // waking up a little early. It would help with latency because some bytes + // may arrive while we execute the recvmsg syscall after waking up. + if (remaining > 0) { remaining -= kRcvLowatThreshold; } @@ -1118,9 +1119,11 @@ static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) { if (GPR_LIKELY(error.ok())) { maybe_make_read_slices(tcp); if (!tcp_do_read(tcp, &tcp_read_error)) { - // We've consumed the edge, request a new one + // Maybe update rcv lowat value based on the number of bytes read in this + // round. 
update_rcvlowat(tcp); tcp->read_mu.Unlock(); + // We've consumed the edge, request a new one notify_on_read(tcp); return; } @@ -1130,6 +1133,12 @@ static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) { grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer); grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer); } + // Update rcv lowat needs to be called at the end of the current read + // operation to ensure the right SO_RCVLOWAT value is set for the next read. + // Otherwise the next endpoint read operation may get stuck indefinitely + // because the previously set rcv lowat value will persist and the socket may + // erroneously be considered to not be ready for read. + update_rcvlowat(tcp); grpc_closure* cb = tcp->read_cb; tcp->read_cb = nullptr; tcp->incoming_buffer = nullptr; @@ -1152,14 +1161,12 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer, grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer); TCP_REF(tcp, "read"); if (tcp->is_first_read) { - update_rcvlowat(tcp); tcp->read_mu.Unlock(); // Endpoint read called for the very first time. Register read callback with // the polling engine tcp->is_first_read = false; notify_on_read(tcp); } else if (!urgent && tcp->inq == 0) { - update_rcvlowat(tcp); tcp->read_mu.Unlock(); // Upper layer asked to read more but we know there is no pending data // to read from previous reads. So, wait for POLLIN.