Adjusting tcp min read chunk size growth to reduce wastage of incoming buffer space (#29334)

* Adjusting tcp min read chunk size growth to reduce wastage of incoming buffer space

* syncing tcp-posix to head

* fix sanity checks

* fix typo

* remove unnecessary log

* minor fix to handle tcp read error case

* renaming stuff

* changing to std::atomic

* fix sanity checks
Branch: pull/29374/head
Author: Vignesh Babu (committed by GitHub)
Commit: 88a706eaac, parent d76397480c
Changed files:
  src/core/lib/iomgr/tcp_posix.cc (24 changes)
  src/core/lib/iomgr/tcp_server_posix.cc (11 changes)

src/core/lib/iomgr/tcp_posix.cc
@@ -426,6 +426,9 @@ struct grpc_tcp {
      on errors anymore */
   TcpZerocopySendCtx tcp_zerocopy_send_ctx;
   TcpZerocopySendRecord* current_zerocopy_send = nullptr;
+  bool curr_read_completed;
+  int curr_min_read_chunk_size;
 };
 struct backup_poller {
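
For orientation, the two new fields can be read as a small piece of per-endpoint state. A minimal sketch (not the grpc_tcp struct itself; only the field names mirror the diff, the default values are assumed examples):

```cpp
// Sketch only -- not the gRPC struct. Defaults are illustrative assumptions.
struct ReadChunkState {
  // Configured bounds, fixed when the endpoint is created.
  int min_read_chunk_size = 8 * 1024;         // assumed example value
  int max_read_chunk_size = 4 * 1024 * 1024;  // assumed example value

  // True once a block of reads has drained the socket (recvmsg hit EAGAIN)
  // or the peer closed the connection; the next allocation starts small.
  bool curr_read_completed = true;

  // Lower bound for the next read slice; doubles while a block of reads is
  // still in progress, capped at max_read_chunk_size.
  int curr_min_read_chunk_size = min_read_chunk_size;
};
```
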
@@ -776,10 +779,12 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
       /* NB: After calling call_read_cb a parallel call of the read handler may
        * be running. */
       if (errno == EAGAIN) {
+        tcp->curr_read_completed = true;
         finish_estimate(tcp);
         tcp->inq = 0;
         return false;
       } else {
+        tcp->curr_read_completed = false;
         grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
         *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "recvmsg"), tcp);
         return true;
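
The flag is derived directly from the recvmsg() outcome. A hedged, standalone sketch of that classification (the helper name and signature are illustrative, not from the tree; the diff sets the flag inline instead):

```cpp
#include <cerrno>
#include <sys/types.h>

// Illustrative only: decide whether the current block of reads is complete,
// mirroring the branches of tcp_do_read shown above.
bool read_block_completed(ssize_t recvmsg_result, int saved_errno) {
  if (recvmsg_result < 0) {
    // EAGAIN: the socket is drained for now, so the block is complete.
    // Any other errno aborts the read and leaves the flag false.
    return saved_errno == EAGAIN;
  }
  // 0 bytes means the peer closed the connection: nothing more will arrive,
  // so the block is also treated as complete (see the next hunk).
  // A positive result means data was read and more may still be pending.
  return recvmsg_result == 0;
}
```
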
@@ -791,6 +796,7 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
        * We may have read something, i.e., total_read_bytes > 0, but
        * since the connection is closed we will drop the data here, because we
        * can't call the callback multiple times. */
+      tcp->curr_read_completed = true;
       grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
       *error = tcp_annotate_error(
           GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp);
@@ -847,6 +853,8 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
     finish_estimate(tcp);
   }
+  // There may be more data to be read because recvmsg did not return EAGAIN.
+  tcp->curr_read_completed = false;
   GPR_DEBUG_ASSERT(total_read_bytes > 0);
   if (total_read_bytes < tcp->incoming_buffer->length) {
     grpc_slice_buffer_trim_end(tcp->incoming_buffer,
@@ -871,11 +879,21 @@ static void maybe_make_read_slices(grpc_tcp* tcp)
     int target_length = static_cast<int>(tcp->target_length);
     int extra_wanted =
         target_length - static_cast<int>(tcp->incoming_buffer->length);
+    if (tcp->curr_read_completed) {
+      // Set it to false again to start the next block of reads
+      tcp->curr_read_completed = false;
+      // Reset curr_min_read_chunk_size for the next block of reads
+      tcp->curr_min_read_chunk_size = tcp->min_read_chunk_size;
+    } else {
+      // Last read is not completed yet. Double the last min read chunk size.
+      tcp->curr_min_read_chunk_size =
+          std::min(2 * tcp->curr_min_read_chunk_size, tcp->max_read_chunk_size);
+    }
     grpc_slice_buffer_add_indexed(
         tcp->incoming_buffer,
         tcp->memory_owner.MakeSlice(grpc_core::MemoryRequest(
-            tcp->min_read_chunk_size,
-            grpc_core::Clamp(extra_wanted, tcp->min_read_chunk_size,
+            tcp->curr_min_read_chunk_size,
+            grpc_core::Clamp(extra_wanted, tcp->curr_min_read_chunk_size,
                              tcp->max_read_chunk_size))));
     maybe_post_reclaimer(tcp);
   }
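
The allocation policy added to maybe_make_read_slices can be read as a small standalone function. A minimal sketch under assumed bounds (the free function and pointer-based signature are illustrative; only the field names come from the diff):

```cpp
#include <algorithm>

// Illustrative sketch of the slice-sizing policy above, lifted out of grpc_tcp.
// Returns the lower bound to use when allocating the next read slice.
int next_read_chunk_lower_bound(bool* curr_read_completed,
                                int* curr_min_read_chunk_size,
                                int min_read_chunk_size,
                                int max_read_chunk_size) {
  if (*curr_read_completed) {
    // The previous block of reads drained the socket: start small again.
    *curr_read_completed = false;
    *curr_min_read_chunk_size = min_read_chunk_size;
  } else {
    // Still inside a block: the last chunk was too small, so grow
    // geometrically, capped at the configured maximum.
    *curr_min_read_chunk_size =
        std::min(2 * *curr_min_read_chunk_size, max_read_chunk_size);
  }
  return *curr_min_read_chunk_size;
}
```

With, say, an 8 KiB minimum and a 4 MiB maximum (illustrative values), a connection that keeps filling its incoming buffer sees lower bounds of 8 KiB, 16 KiB, 32 KiB, and so on up to the cap, while a connection whose reads complete keeps allocating at 8 KiB instead of holding a large, mostly empty buffer.
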
@@ -1790,6 +1808,8 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
   tcp->socket_ts_enabled = false;
   tcp->ts_capable = true;
   tcp->outgoing_buffer_arg = nullptr;
+  tcp->curr_read_completed = true;
+  tcp->curr_min_read_chunk_size = tcp->min_read_chunk_size;
   if (tcp_tx_zerocopy_enabled && !tcp->tcp_zerocopy_send_ctx.memory_limited()) {
 #ifdef GRPC_LINUX_ERRQUEUE
     const int enable = 1;

src/core/lib/iomgr/tcp_server_posix.cc
@@ -29,6 +29,7 @@
 #include <errno.h>
 #include <fcntl.h>
 #include <inttypes.h>
+#include <netinet/in.h>
 #include <netinet/tcp.h>
 #include <string.h>
@@ -61,6 +62,8 @@
 #include "src/core/lib/iomgr/unix_sockets_posix.h"
 #include "src/core/lib/resource_quota/api.h"
+static std::atomic<int64_t> num_dropped_connections{0};
 static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete,
                                            const grpc_channel_args* args,
                                            grpc_tcp_server** server) {
@@ -221,7 +224,13 @@ static void on_read(void* arg, grpc_error_handle err) {
     }
     if (sp->server->memory_quota->IsMemoryPressureHigh()) {
-      gpr_log(GPR_INFO, "Drop incoming connection: high memory pressure");
+      int64_t dropped_connections_count = ++num_dropped_connections;
+      if (dropped_connections_count % 1000 == 0) {
+        gpr_log(GPR_INFO,
+                "Dropped >= %" PRId64
+                " new connection attempts due to high memory pressure",
+                dropped_connections_count);
+      }
       close(fd);
       continue;
     }
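
The logging change is a standard rate-limiting pattern: count every dropped connection in a process-wide atomic and emit a log line only every 1000th time. A self-contained sketch (gpr_log is replaced with fprintf here; the function name and threshold are illustrative):

```cpp
#include <atomic>
#include <cinttypes>
#include <cstdint>
#include <cstdio>

// Illustrative rate-limited logging for dropped connections.
static std::atomic<int64_t> g_dropped_connections{0};

void record_dropped_connection() {
  const int64_t count = ++g_dropped_connections;  // atomic increment
  if (count % 1000 == 0) {
    std::fprintf(stderr,
                 "Dropped >= %" PRId64
                 " new connection attempts due to high memory pressure\n",
                 count);
  }
}
```

The atomic keeps the counter correct when multiple pollers accept and drop connections concurrently, which is also why one of the squashed commits switches the counter to std::atomic.
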
