From 88a706eaacccedc73aa7c4bb4182636609a19673 Mon Sep 17 00:00:00 2001 From: Vignesh Babu Date: Mon, 11 Apr 2022 13:32:19 -0700 Subject: [PATCH] Adjusting tcp min read chunk size growth to reduce wastage of incoming buffer size (#29334) * Adjusting tcp min read chunk size growth to reduce wastage of incoming buffer space * syncing tcp-posix to head * fix sanity checks * fix typo * remove un-necessary log * minor fix to handle tcp read error case * renaming stuff * changing to std::atomic * fix sanity checks --- src/core/lib/iomgr/tcp_posix.cc | 24 ++++++++++++++++++++++-- src/core/lib/iomgr/tcp_server_posix.cc | 11 ++++++++++- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 9b291433b70..35377c260d3 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -426,6 +426,9 @@ struct grpc_tcp { on errors anymore */ TcpZerocopySendCtx tcp_zerocopy_send_ctx; TcpZerocopySendRecord* current_zerocopy_send = nullptr; + + bool curr_read_completed; + int curr_min_read_chunk_size; }; struct backup_poller { @@ -776,10 +779,12 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error) /* NB: After calling call_read_cb a parallel call of the read handler may * be running. */ if (errno == EAGAIN) { + tcp->curr_read_completed = true; finish_estimate(tcp); tcp->inq = 0; return false; } else { + tcp->curr_read_completed = false; grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer); *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "recvmsg"), tcp); return true; @@ -791,6 +796,7 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error) * We may have read something, i.e., total_read_bytes > 0, but * since the connection is closed we will drop the data here, because we * can't call the callback multiple times. */ + tcp->curr_read_completed = true; grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer); *error = tcp_annotate_error( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp); @@ -847,6 +853,8 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error) finish_estimate(tcp); } + // There may be more data to be read because recvmsg did not return EAGAIN. + tcp->curr_read_completed = false; GPR_DEBUG_ASSERT(total_read_bytes > 0); if (total_read_bytes < tcp->incoming_buffer->length) { grpc_slice_buffer_trim_end(tcp->incoming_buffer, @@ -871,11 +879,21 @@ static void maybe_make_read_slices(grpc_tcp* tcp) int target_length = static_cast(tcp->target_length); int extra_wanted = target_length - static_cast(tcp->incoming_buffer->length); + if (tcp->curr_read_completed) { + // Set it to false again to start the next block of reads + tcp->curr_read_completed = false; + // Reset curr_min_read_chunk_size for the next block of reads + tcp->curr_min_read_chunk_size = tcp->min_read_chunk_size; + } else { + // Last read is not completed yet. Double the last min read chunk size. + tcp->curr_min_read_chunk_size = + std::min(2 * tcp->curr_min_read_chunk_size, tcp->max_read_chunk_size); + } grpc_slice_buffer_add_indexed( tcp->incoming_buffer, tcp->memory_owner.MakeSlice(grpc_core::MemoryRequest( - tcp->min_read_chunk_size, - grpc_core::Clamp(extra_wanted, tcp->min_read_chunk_size, + tcp->curr_min_read_chunk_size, + grpc_core::Clamp(extra_wanted, tcp->curr_min_read_chunk_size, tcp->max_read_chunk_size)))); maybe_post_reclaimer(tcp); } @@ -1790,6 +1808,8 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, tcp->socket_ts_enabled = false; tcp->ts_capable = true; tcp->outgoing_buffer_arg = nullptr; + tcp->curr_read_completed = true; + tcp->curr_min_read_chunk_size = tcp->min_read_chunk_size; if (tcp_tx_zerocopy_enabled && !tcp->tcp_zerocopy_send_ctx.memory_limited()) { #ifdef GRPC_LINUX_ERRQUEUE const int enable = 1; diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc index 580a1f2cff7..c40ddbf6467 100644 --- a/src/core/lib/iomgr/tcp_server_posix.cc +++ b/src/core/lib/iomgr/tcp_server_posix.cc @@ -29,6 +29,7 @@ #include #include +#include #include #include #include @@ -61,6 +62,8 @@ #include "src/core/lib/iomgr/unix_sockets_posix.h" #include "src/core/lib/resource_quota/api.h" +static std::atomic num_dropped_connections{0}; + static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete, const grpc_channel_args* args, grpc_tcp_server** server) { @@ -221,7 +224,13 @@ static void on_read(void* arg, grpc_error_handle err) { } if (sp->server->memory_quota->IsMemoryPressureHigh()) { - gpr_log(GPR_INFO, "Drop incoming connection: high memory pressure"); + int64_t dropped_connections_count = ++num_dropped_connections; + if (dropped_connections_count % 1000 == 0) { + gpr_log(GPR_INFO, + "Dropped >= %" PRId64 + " new connection attempts due to high memory pressure", + dropped_connections_count); + } close(fd); continue; }