From e8a44e064ab716424567b07b69b3b7acb744385c Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 20 Mar 2017 17:34:09 -0700 Subject: [PATCH] Dynamically adjust TCP read buffer size --- src/core/lib/iomgr/tcp_posix.c | 47 +++++++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 12 deletions(-) diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index a4381f8fc96..9fba8b3d0e2 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -52,6 +52,7 @@ #include #include #include +#include #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/ev_posix.h" @@ -80,7 +81,8 @@ typedef struct { int fd; bool finished_edge; msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */ - size_t slice_size; + double target_length; + double bytes_read_this_round; gpr_refcount refcount; gpr_atm shutdown_count; @@ -108,6 +110,29 @@ typedef struct { grpc_resource_user_slice_allocator slice_allocator; } grpc_tcp; +static void add_to_estimate(grpc_tcp *tcp, size_t bytes) { + tcp->bytes_read_this_round += bytes; +} + +static void finish_estimate(grpc_tcp *tcp) { + if (tcp->bytes_read_this_round > tcp->target_length) { + tcp->target_length = 1.5 * tcp->bytes_read_this_round; + } else { + tcp->target_length = + 0.99 * tcp->target_length + 0.01 * tcp->bytes_read_this_round; + } + tcp->bytes_read_this_round = 0; +} + +static size_t get_target_read_size(grpc_tcp *tcp) { + double pressure = grpc_resource_quota_get_memory_pressure( + grpc_resource_user_quota(tcp->resource_user)); + double target = + tcp->target_length * (pressure > 0.8 ? (1.0 - pressure) / 0.2 : 1.0); + return (((size_t)GPR_CLAMP(target, 1024, 4 * 1024 * 1024)) + 255) & + ~(size_t)255; +} + static grpc_error *tcp_annotate_error(grpc_error *src_error, grpc_tcp *tcp) { return grpc_error_set_str( grpc_error_set_int(src_error, GRPC_ERROR_INT_FD, tcp->fd), @@ -231,9 +256,7 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { /* NB: After calling call_read_cb a parallel call of the read handler may * be running. */ if (errno == EAGAIN) { - if (tcp->iov_size > 1) { - tcp->iov_size /= 2; - } + finish_estimate(tcp); /* We've consumed the edge, request a new one */ grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); } else { @@ -250,14 +273,13 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { tcp_annotate_error(GRPC_ERROR_CREATE("Socket closed"), tcp)); TCP_UNREF(exec_ctx, tcp, "read"); } else { + add_to_estimate(tcp, (size_t)read_bytes); GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length); if ((size_t)read_bytes < tcp->incoming_buffer->length) { grpc_slice_buffer_trim_end( tcp->incoming_buffer, tcp->incoming_buffer->length - (size_t)read_bytes, &tcp->last_read_buffer); - } else if (tcp->iov_size < MAX_READ_IOVEC) { - ++tcp->iov_size; } GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length); call_read_cb(exec_ctx, tcp, GRPC_ERROR_NONE); @@ -282,11 +304,11 @@ static void tcp_read_allocation_done(grpc_exec_ctx *exec_ctx, void *tcpp, } static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { - if (tcp->incoming_buffer->count < (size_t)tcp->iov_size) { - grpc_resource_user_alloc_slices( - exec_ctx, &tcp->slice_allocator, tcp->slice_size, - (size_t)tcp->iov_size - tcp->incoming_buffer->count, - tcp->incoming_buffer); + size_t target_read_size = get_target_read_size(tcp); + if (tcp->incoming_buffer->length < target_read_size && + tcp->incoming_buffer->count < MAX_READ_IOVEC) { + grpc_resource_user_alloc_slices(exec_ctx, &tcp->slice_allocator, + target_read_size, 1, tcp->incoming_buffer); } else { tcp_do_read(exec_ctx, tcp); } @@ -547,7 +569,8 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, tcp->release_fd_cb = NULL; tcp->release_fd = NULL; tcp->incoming_buffer = NULL; - tcp->slice_size = slice_size; + tcp->target_length = slice_size; + tcp->bytes_read_this_round = 0; tcp->iov_size = 1; tcp->finished_edge = true; /* paired with unref in grpc_tcp_destroy */