@@ -835,17 +835,18 @@ static void update_rcvlowat(grpc_tcp* tcp)
   int remaining = std::min(static_cast<int>(tcp->incoming_buffer->length),
                            tcp->min_progress_size);

   remaining = std::min(remaining, kRcvLowatMax);

   // Setting SO_RCVLOWAT for small quantities does not save on CPU.
-  if (remaining < kRcvLowatThreshold) {
+  if (remaining < 2 * kRcvLowatThreshold) {
     remaining = 0;
   }

-  // If zerocopy is off, wake shortly before the full RPC is here. More can
-  // show up partway through recvmsg() since it takes a while to copy data.
-  // So an early wakeup aids latency.
-  if (!tcp->tcp_zerocopy_send_ctx.enabled() && remaining > 0) {
+  // Decrement remaining by kRcvLowatThreshold. This would have the effect of
+  // waking up a little early. It would help with latency because some bytes
+  // may arrive while we execute the recvmsg syscall after waking up.
+  if (remaining > 0) {
     remaining -= kRcvLowatThreshold;
   }
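
For context on what the value computed above controls: SO_RCVLOWAT tells the
kernel not to report the socket as readable until at least that many bytes are
queued, so wakeups are batched for large reads. A minimal sketch of applying
such a value, assuming a plain POSIX fd (the helper name and the clamp-to-1
behavior are illustrative, not gRPC's exact code):

    #include <sys/socket.h>

    // Ask the kernel to delay read-readiness until `lowat` bytes are queued.
    // Linux maps a lowat of 0 back to the default of 1, so clamp defensively.
    static bool set_rcvlowat(int fd, int lowat) {
      if (lowat <= 0) lowat = 1;
      return setsockopt(fd, SOL_SOCKET, SO_RCVLOWAT, &lowat,
                        sizeof(lowat)) == 0;
    }

The threshold logic above then reads naturally: anything below
2 * kRcvLowatThreshold is rounded down to 0 (default wakeups) because tiny
thresholds do not save CPU, and any larger threshold is lowered by
kRcvLowatThreshold so the wakeup fires slightly before the full message has
arrived, overlapping the tail of the transfer with the recvmsg() call.
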
@@ -1118,9 +1119,11 @@ static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) {
   if (GPR_LIKELY(error.ok())) {
     maybe_make_read_slices(tcp);
     if (!tcp_do_read(tcp, &tcp_read_error)) {
-      // We've consumed the edge, request a new one
+      // Maybe update rcv lowat value based on the number of bytes read in this
+      // round.
+      update_rcvlowat(tcp);
       tcp->read_mu.Unlock();
+      // We've consumed the edge, request a new one
       notify_on_read(tcp);
       return;
     }
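
The ordering established here matters for edge-triggered polling: the lowat
update happens while read_mu is still held, before notify_on_read() re-arms
the wakeup, so the next readability edge is already evaluated against the new
threshold. A sketch of the same pattern against raw epoll, with hypothetical
helper names (this is not gRPC's poller code):

    #include <sys/epoll.h>
    #include <sys/socket.h>

    // Re-arm an edge-triggered read watch, updating SO_RCVLOWAT first so the
    // next EPOLLIN edge is generated against the new byte threshold.
    void rearm_read(int epfd, int fd, int bytes_still_needed) {
      int lowat = bytes_still_needed > 0 ? bytes_still_needed : 1;
      setsockopt(fd, SOL_SOCKET, SO_RCVLOWAT, &lowat, sizeof(lowat));
      epoll_event ev{};
      ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
      ev.data.fd = fd;
      epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);  // re-arm after lowat is set
    }
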
@@ -1130,6 +1133,12 @@ static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) {
     grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
     grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
   }
+  // Update rcv lowat needs to be called at the end of the current read
+  // operation to ensure the right SO_RCVLOWAT value is set for the next read.
+  // Otherwise the next endpoint read operation may get stuck indefinitely
+  // because the previously set rcv lowat value will persist and the socket may
+  // be erroneously considered not ready for read.
+  update_rcvlowat(tcp);
   grpc_closure* cb = tcp->read_cb;
   tcp->read_cb = nullptr;
   tcp->incoming_buffer = nullptr;
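
The hazard this comment describes is concrete: if a large threshold survives
past the read that installed it, a small next message can sit in the kernel
queue without ever tripping readability. A toy repro sketch, assuming `fd` is
a connected Linux TCP socket (poll() honors SO_RCVLOWAT for TCP) whose peer
sends a short message:

    #include <poll.h>
    #include <sys/socket.h>

    // Returns true when a stale threshold masks a pending small message:
    // poll() times out even though bytes are queued on `fd`.
    bool stuck_read_demo(int fd) {
      int lowat = 64 * 1024;  // stale value left over from a large read
      setsockopt(fd, SOL_SOCKET, SO_RCVLOWAT, &lowat, sizeof(lowat));
      // ... peer writes a 4-byte message here ...
      pollfd pfd{fd, POLLIN, 0};
      return poll(&pfd, 1, 1000) == 0;  // 4 bytes < lowat: no POLLIN
    }
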
@@ -1152,14 +1161,12 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
   grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
   TCP_REF(tcp, "read");
   if (tcp->is_first_read) {
+    update_rcvlowat(tcp);
     tcp->read_mu.Unlock();
     // Endpoint read called for the very first time. Register read callback with
     // the polling engine
     tcp->is_first_read = false;
     notify_on_read(tcp);
   } else if (!urgent && tcp->inq == 0) {
+    update_rcvlowat(tcp);
     tcp->read_mu.Unlock();
     // Upper layer asked to read more but we know there is no pending data
     // to read from previous reads. So, wait for POLLIN.
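
On the tcp->inq test above: gRPC caches the kernel's hint of how many bytes
remain queued after the last recvmsg(), so it only waits for POLLIN when
nothing is already pending. A sketch of how such a hint can be obtained on
Linux via the TCP_INQ control message (the helper name is an assumption; the
socket must have the hint enabled first, e.g.
setsockopt(fd, IPPROTO_TCP, TCP_INQ, &one, sizeof(one))):

    #include <netinet/in.h>
    #include <netinet/tcp.h>
    #include <string.h>
    #include <sys/socket.h>

    #ifndef TCP_INQ
    #define TCP_INQ 36  // not defined by every libc version
    #define TCP_CM_INQ TCP_INQ
    #endif

    // recvmsg() wrapper that extracts the "bytes still queued" hint into
    // *inq (-1 when the kernel did not attach it).
    ssize_t recv_with_inq(int fd, void* buf, size_t len, int* inq) {
      char control[CMSG_SPACE(sizeof(int))];
      iovec iov{buf, len};
      msghdr msg{};
      msg.msg_iov = &iov;
      msg.msg_iovlen = 1;
      msg.msg_control = control;
      msg.msg_controllen = sizeof(control);
      ssize_t n = recvmsg(fd, &msg, 0);
      *inq = -1;
      for (cmsghdr* c = CMSG_FIRSTHDR(&msg); c != nullptr;
           c = CMSG_NXTHDR(&msg, c)) {
        if (c->cmsg_level == IPPROTO_TCP && c->cmsg_type == TCP_CM_INQ) {
          memcpy(inq, CMSG_DATA(c), sizeof(int));
        }
      }
      return n;
    }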