Add a flag to control usage of min_progress_size in TCP for receive buffer allocation (#29989)

* use max_frame_size to control encrypted frame sizes on the sender

* Add a flag to control usage of min_progress_size in TCP for receive buffer allocation

* revert changes to secure_endpoint and move them over to another PR

* add min_progress_size to tcp_posix_test

* update tcp_posix_test to assert failure when reads deliver less than min_progress_size, and fix a bug in min_progress_size handling in tcp_posix.cc

* fix corner case

* add a static cast
Author: Vignesh Babu (committed via GitHub)
commit 1cd7371ac7 (parent 9bc16ede7c)
Changed files:
  src/core/lib/iomgr/iomgr.cc (7 changed lines)
  src/core/lib/iomgr/tcp_posix.cc (71 changed lines)
  test/core/iomgr/tcp_posix_test.cc (42 changed lines)

src/core/lib/iomgr/iomgr.cc
@@ -45,6 +45,13 @@ GPR_GLOBAL_CONFIG_DEFINE_BOOL(grpc_abort_on_leaks, false,
"A debugging aid to cause a call to abort() when "
"gRPC objects are leaked past grpc_shutdown()");
GPR_GLOBAL_CONFIG_DEFINE_BOOL(
grpc_experimental_enable_tcp_frame_size_tuning, false,
"If set, enables TCP to use RPC size estimation made by higher layers. TCP "
"would not indicate completion of a read operation until a specified "
"number of bytes have been read over the socket. Buffers are also "
"allocated according to estimated RPC sizes.");
static gpr_mu g_mu;
static gpr_cv g_rcv;
static int g_shutdown;

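[Editor's note] The new config bool above follows gRPC's GPR_GLOBAL_CONFIG convention, so it can be toggled programmatically before grpc_init() (as the updated tcp_posix_test does in main()), and presumably also via the matching upper-case environment variable. A minimal usage sketch, not part of the commit, assuming the standard global_config.h include path:

#include <grpc/grpc.h>

#include "src/core/lib/gprpp/global_config.h"

GPR_GLOBAL_CONFIG_DECLARE_BOOL(grpc_experimental_enable_tcp_frame_size_tuning);

int main() {
  // Set before grpc_init(): tcp_posix.cc caches the value in a static on
  // first use (see ExperimentalTcpFrameSizeTuningEnabled()).
  GPR_GLOBAL_CONFIG_SET(grpc_experimental_enable_tcp_frame_size_tuning, true);
  grpc_init();
  // ... TCP reads now honor min_progress_size hints from higher layers ...
  grpc_shutdown();
  return 0;
}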
src/core/lib/iomgr/tcp_posix.cc
@@ -97,6 +97,8 @@ typedef size_t msg_iovlen_type;
extern grpc_core::TraceFlag grpc_tcp_trace;
GPR_GLOBAL_CONFIG_DECLARE_BOOL(grpc_experimental_enable_tcp_frame_size_tuning);
namespace grpc_core {
class TcpZerocopySendRecord {
@@ -357,6 +359,13 @@ using grpc_core::TcpZerocopySendCtx;
using grpc_core::TcpZerocopySendRecord;
namespace {
bool ExperimentalTcpFrameSizeTuningEnabled() {
static const bool kEnableTcpFrameSizeTuning =
GPR_GLOBAL_CONFIG_GET(grpc_experimental_enable_tcp_frame_size_tuning);
return kEnableTcpFrameSizeTuning;
}
struct grpc_tcp {
grpc_tcp(int max_sends, size_t send_bytes_threshold)
: tcp_zerocopy_send_ctx(max_sends, send_bytes_threshold) {}
@@ -426,6 +435,11 @@ struct grpc_tcp {
on errors anymore */
TcpZerocopySendCtx tcp_zerocopy_send_ctx;
TcpZerocopySendRecord* current_zerocopy_send = nullptr;
bool frame_size_tuning_enabled;
int min_progress_size; /* A hint from upper layers specifying the minimum
number of bytes that need to be read to make
meaningful progress */
};
struct backup_poller {
@@ -736,6 +750,7 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
}
GPR_ASSERT(tcp->incoming_buffer->length != 0);
GPR_DEBUG_ASSERT(tcp->min_progress_size > 0);
do {
/* Assume there is something on the queue. If we receive TCP_INQ from
@@ -767,7 +782,8 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
/* We have read something in previous reads. We need to deliver those
* bytes to the upper layer. */
if (read_bytes <= 0 && total_read_bytes > 0) {
if (read_bytes <= 0 &&
total_read_bytes >= static_cast<size_t>(tcp->min_progress_size)) {
tcp->inq = 1;
break;
}
@@ -776,6 +792,9 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
/* NB: After calling call_read_cb a parallel call of the read handler may
* be running. */
if (errno == EAGAIN) {
if (total_read_bytes > 0) {
break;
}
finish_estimate(tcp);
tcp->inq = 0;
return false;
@@ -848,18 +867,45 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
}
GPR_DEBUG_ASSERT(total_read_bytes > 0);
*error = GRPC_ERROR_NONE;
if (tcp->frame_size_tuning_enabled) {
// Update min progress size based on the total number of bytes read in
// this round.
tcp->min_progress_size -= total_read_bytes;
if (tcp->min_progress_size > 0) {
// There is still some bytes left to be read before we can signal
// the read as complete. Append the bytes read so far into
// last_read_buffer which serves as a staging buffer. Return false
// to indicate tcp_handle_read needs to be scheduled again.
grpc_slice_buffer_move_first(tcp->incoming_buffer, total_read_bytes,
&tcp->last_read_buffer);
return false;
} else {
// The required number of bytes have been read. Append the bytes
// read in this round into last_read_buffer. Then swap last_read_buffer
// and incoming_buffer. Now incoming buffer contains all the bytes
// read since the start of the last tcp_read operation. last_read_buffer
// would contain any spare space left in the incoming buffer. This
// space will be used in the next tcp_read operation.
tcp->min_progress_size = 1;
grpc_slice_buffer_move_first(tcp->incoming_buffer, total_read_bytes,
&tcp->last_read_buffer);
grpc_slice_buffer_swap(&tcp->last_read_buffer, tcp->incoming_buffer);
return true;
}
}
if (total_read_bytes < tcp->incoming_buffer->length) {
grpc_slice_buffer_trim_end(tcp->incoming_buffer,
tcp->incoming_buffer->length - total_read_bytes,
&tcp->last_read_buffer);
}
*error = GRPC_ERROR_NONE;
return true;
}
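[Editor's note, not part of the diff] The block above is the heart of the change: bytes from partial reads are parked in last_read_buffer (a staging buffer) and the read callback is not invoked until at least min_progress_size bytes have accumulated, at which point the buffers are swapped so the callback sees everything read since grpc_endpoint_read was called. A simplified, hypothetical sketch of that staging pattern using std::vector instead of grpc_slice_buffer:

#include <cstddef>
#include <vector>

struct StagedReader {
  std::vector<char> staged;      // plays the role of tcp->last_read_buffer
  size_t min_progress_size = 1;  // hint supplied by higher layers

  // Returns true when enough data has accumulated to complete the read;
  // `out` then receives the whole batch read since the read was issued.
  bool OnBytesRead(const std::vector<char>& just_read, std::vector<char>* out) {
    staged.insert(staged.end(), just_read.begin(), just_read.end());
    if (staged.size() < min_progress_size) {
      return false;  // not enough progress yet; poll the socket again
    }
    out->swap(staged);          // deliver everything staged so far
    staged.clear();
    min_progress_size = 1;      // reset, as tcp_do_read does on completion
    return true;
  }
};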
static void maybe_make_read_slices(grpc_tcp* tcp)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
if (tcp->incoming_buffer->length == 0 &&
if (tcp->incoming_buffer->length <
static_cast<size_t>(tcp->min_progress_size) &&
tcp->incoming_buffer->count < MAX_READ_IOVEC) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO,
@@ -868,15 +914,20 @@ static void maybe_make_read_slices(grpc_tcp* tcp)
tcp, tcp->min_read_chunk_size, tcp->max_read_chunk_size,
tcp->target_length, tcp->incoming_buffer->length);
}
int target_length = static_cast<int>(tcp->target_length);
int target_length =
std::max(static_cast<int>(tcp->target_length), tcp->min_progress_size);
int extra_wanted =
target_length - static_cast<int>(tcp->incoming_buffer->length);
int min_read_chunk_size =
std::max(tcp->min_read_chunk_size, tcp->min_progress_size);
int max_read_chunk_size =
std::max(tcp->max_read_chunk_size, tcp->min_progress_size);
grpc_slice_buffer_add_indexed(
tcp->incoming_buffer,
tcp->memory_owner.MakeSlice(grpc_core::MemoryRequest(
tcp->min_read_chunk_size,
grpc_core::Clamp(extra_wanted, tcp->min_read_chunk_size,
tcp->max_read_chunk_size))));
min_read_chunk_size,
grpc_core::Clamp(extra_wanted, min_read_chunk_size,
max_read_chunk_size))));
maybe_post_reclaimer(tcp);
}
}
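[Editor's note, not part of the diff] With the hunk above, buffer allocation also respects the hint: the target length and both chunk-size bounds are raised to at least min_progress_size so that a single slice can cover the expected frame. A standalone sketch of the size arithmetic (hypothetical helper; grpc_core::Clamp behaves like std::clamp):

#include <algorithm>

// Returns the upper bound of the memory request made for the next read slice.
int ReadSliceSize(int target_length, int already_buffered, int min_chunk,
                  int max_chunk, int min_progress_size) {
  target_length = std::max(target_length, min_progress_size);
  int extra_wanted = target_length - already_buffered;
  int min_read_chunk_size = std::max(min_chunk, min_progress_size);
  int max_read_chunk_size = std::max(max_chunk, min_progress_size);
  return std::clamp(extra_wanted, min_read_chunk_size, max_read_chunk_size);
}

// Example: an 8192-byte target, an empty buffer, chunk bounds [256, 8192], and
// a 16000-byte min_progress_size hint yield a single 16000-byte request,
// where the pre-change code would have capped it at 8192:
//   ReadSliceSize(8192, 0, 256, 8192, 16000) == 16000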
@@ -912,12 +963,14 @@ static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) {
}
static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
grpc_closure* cb, bool urgent, int /*min_progress_size*/) {
grpc_closure* cb, bool urgent, int min_progress_size) {
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
GPR_ASSERT(tcp->read_cb == nullptr);
tcp->read_cb = cb;
tcp->read_mu.Lock();
tcp->incoming_buffer = incoming_buffer;
tcp->min_progress_size =
tcp->frame_size_tuning_enabled ? min_progress_size : 1;
grpc_slice_buffer_reset_and_unref_internal(incoming_buffer);
grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
tcp->read_mu.Unlock();
@@ -1799,6 +1852,8 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
tcp->socket_ts_enabled = false;
tcp->ts_capable = true;
tcp->outgoing_buffer_arg = nullptr;
tcp->frame_size_tuning_enabled = ExperimentalTcpFrameSizeTuningEnabled();
tcp->min_progress_size = 1;
if (tcp_tx_zerocopy_enabled && !tcp->tcp_zerocopy_send_ctx.memory_limited()) {
#ifdef GRPC_LINUX_ERRQUEUE
const int enable = 1;

test/core/iomgr/tcp_posix_test.cc
@@ -47,6 +47,8 @@
static gpr_mu* g_mu;
static grpc_pollset* g_pollset;
GPR_GLOBAL_CONFIG_DECLARE_BOOL(grpc_experimental_enable_tcp_frame_size_tuning);
/*
General test notes:
@@ -151,6 +153,7 @@ static size_t fill_socket_partial(int fd, size_t bytes) {
struct read_socket_state {
grpc_endpoint* ep;
int min_progress_size;
size_t read_bytes;
size_t target_read_bytes;
grpc_slice_buffer incoming;
@@ -183,6 +186,9 @@ static void read_cb(void* user_data, grpc_error_handle error) {
gpr_mu_lock(g_mu);
current_data = state->read_bytes % 256;
// The number of bytes read each time this callback is invoked must be >=
// the min_progress_size.
GPR_ASSERT(state->min_progress_size <= state->incoming.length);
read_bytes = count_slices(state->incoming.slices, state->incoming.count,
&current_data);
state->read_bytes += read_bytes;
@@ -194,13 +200,15 @@ static void read_cb(void* user_data, grpc_error_handle error) {
gpr_mu_unlock(g_mu);
} else {
gpr_mu_unlock(g_mu);
state->min_progress_size = state->target_read_bytes - state->read_bytes;
grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb,
/*urgent=*/false, /*min_progress_size=*/1);
/*urgent=*/false, state->min_progress_size);
}
}
/* Write to a socket, then read from it using the grpc_tcp API. */
static void read_test(size_t num_bytes, size_t slice_size) {
static void read_test(size_t num_bytes, size_t slice_size,
int min_progress_size) {
int sv[2];
grpc_endpoint* ep;
struct read_socket_state state;
@@ -233,11 +241,13 @@ static void read_test(size_t num_bytes, size_t slice_size) {
state.ep = ep;
state.read_bytes = 0;
state.target_read_bytes = written_bytes;
state.min_progress_size =
std::min(min_progress_size, static_cast<int>(written_bytes));
grpc_slice_buffer_init(&state.incoming);
GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false,
/*min_progress_size=*/1);
/*min_progress_size=*/state.min_progress_size);
gpr_mu_lock(g_mu);
while (state.read_bytes < state.target_read_bytes) {
@@ -259,7 +269,7 @@ static void read_test(size_t num_bytes, size_t slice_size) {
/* Write to a socket until it fills up, then read from it using the grpc_tcp
API. */
static void large_read_test(size_t slice_size) {
static void large_read_test(size_t slice_size, int min_progress_size) {
int sv[2];
grpc_endpoint* ep;
struct read_socket_state state;
@@ -291,11 +301,13 @@ static void large_read_test(size_t slice_size) {
state.ep = ep;
state.read_bytes = 0;
state.target_read_bytes = static_cast<size_t>(written_bytes);
state.min_progress_size =
std::min(min_progress_size, static_cast<int>(written_bytes));
grpc_slice_buffer_init(&state.incoming);
GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false,
/*min_progress_size=*/1);
/*min_progress_size=*/state.min_progress_size);
gpr_mu_lock(g_mu);
while (state.read_bytes < state.target_read_bytes) {
@@ -544,11 +556,12 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
state.ep = ep;
state.read_bytes = 0;
state.target_read_bytes = written_bytes;
state.min_progress_size = 1;
grpc_slice_buffer_init(&state.incoming);
GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false,
/*min_progress_size=*/1);
/*min_progress_size=*/state.min_progress_size);
gpr_mu_lock(g_mu);
while (state.read_bytes < state.target_read_bytes) {
@@ -589,14 +602,14 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
void run_tests(void) {
size_t i = 0;
read_test(100, 8192);
read_test(10000, 8192);
read_test(10000, 137);
read_test(10000, 1);
large_read_test(8192);
large_read_test(1);
for (int i = 1; i <= 8192; i = i * 2) {
read_test(100, 8192, i);
read_test(10000, 8192, i);
read_test(10000, 137, i);
read_test(10000, 1, i);
large_read_test(8192, i);
large_read_test(1, i);
}
write_test(100, 8192, false);
write_test(100, 1, false);
write_test(100000, 8192, false);
@@ -658,6 +671,7 @@ static void destroy_pollset(void* p, grpc_error_handle /*error*/) {
int main(int argc, char** argv) {
grpc_closure destroyed;
grpc::testing::TestEnvironment env(&argc, argv);
GPR_GLOBAL_CONFIG_SET(grpc_experimental_enable_tcp_frame_size_tuning, true);
grpc_init();
grpc_core::grpc_tcp_set_write_timestamps_callback(timestamps_verifier);
{
