[iomgr] SO_RCVLOWAT impl for grpc (#30736)

* [stats] Cleanup stats system

* clear out optionality

* fix

* might as well...

* Automated change: Fix sanity tests

* only allocate 8k or 64k blocks for tcp reads

* Automated change: Fix sanity tests

* fix

* Update tcp_posix.cc

* Automated change: Fix sanity tests

* [chttp2] Inform endpoint of min progress size

* respect target, min progress sizes

* cleaner loops

* clean out more unused stuff

* clean out more unused stuff

* Automated change: Fix sanity tests

* update

* [iomgr] SO_RCVLOWAT impl for tcp

* tsan race

* tsan race

* fix

* Automated change: Fix sanity tests

* Introduce flag

* protect with flag

* [experiments] Make output more diffable/readable

* Automated change: Fix sanity tests

* buildifier sized indentations

* add experiment

* Update tcp_posix.cc

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/30822/head
Craig Tiller, committed by GitHub 2 years ago
parent 70dd34b438
commit 8b13c0d581
5 changed files:
  1. bazel/experiments.bzl (2 lines changed)
  2. src/core/lib/experiments/experiments.cc (10 lines changed)
  3. src/core/lib/experiments/experiments.h (3 lines changed)
  4. src/core/lib/experiments/experiments.yaml (11 lines changed)
  5. src/core/lib/iomgr/tcp_posix.cc (52 lines changed)

@@ -19,10 +19,12 @@
EXPERIMENTS = {
"core_end2end_test": [
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
"tcp_read_chunks",
],
"endpoint_test": [
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
"tcp_read_chunks",
],
}

@@ -29,12 +29,16 @@ const char* const description_tcp_frame_size_tuning =
const char* const description_tcp_read_chunks =
"Allocate only 8kb or 64kb chunks for TCP reads to reduce pressure on "
"malloc to recycle arbitrary large blocks.";
const char* const description_tcp_rcv_lowat =
"Use SO_RCVLOWAT to avoid wakeups on the read path.";
} // namespace
GPR_GLOBAL_CONFIG_DEFINE_BOOL(grpc_experimental_enable_tcp_frame_size_tuning,
false, description_tcp_frame_size_tuning);
GPR_GLOBAL_CONFIG_DEFINE_BOOL(grpc_experimental_enable_tcp_read_chunks, false,
description_tcp_read_chunks);
GPR_GLOBAL_CONFIG_DEFINE_BOOL(grpc_experimental_enable_tcp_rcv_lowat, false,
description_tcp_rcv_lowat);
namespace grpc_core {
@@ -48,12 +52,18 @@ bool IsTcpReadChunksEnabled() {
GPR_GLOBAL_CONFIG_GET(grpc_experimental_enable_tcp_read_chunks);
return enabled;
}
bool IsTcpRcvLowatEnabled() {
static const bool enabled =
GPR_GLOBAL_CONFIG_GET(grpc_experimental_enable_tcp_rcv_lowat);
return enabled;
}
const ExperimentMetadata g_experiment_metadata[] = {
{"tcp_frame_size_tuning", description_tcp_frame_size_tuning, false,
IsTcpFrameSizeTuningEnabled},
{"tcp_read_chunks", description_tcp_read_chunks, false,
IsTcpReadChunksEnabled},
{"tcp_rcv_lowat", description_tcp_rcv_lowat, false, IsTcpRcvLowatEnabled},
};
} // namespace grpc_core
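
The GPR_GLOBAL_CONFIG_DEFINE_BOOL entries above are, by the usual gRPC convention, backed by an environment variable with the upper-cased config name, so the new experiment should be switchable at process start without a rebuild. A minimal sketch of opting in, assuming that mapping (GRPC_EXPERIMENTAL_ENABLE_TCP_RCV_LOWAT) holds for this build:

#include <cstdlib>

#include <grpc/grpc.h>

int main() {
  // Assumed mapping: config grpc_experimental_enable_tcp_rcv_lowat is read
  // from the GRPC_EXPERIMENTAL_ENABLE_TCP_RCV_LOWAT environment variable.
  setenv("GRPC_EXPERIMENTAL_ENABLE_TCP_RCV_LOWAT", "true", /*overwrite=*/1);
  grpc_init();  // IsTcpRcvLowatEnabled() latches the value on first query.
  // ... build servers/channels as usual; tcp_posix.cc can then call
  // setsockopt(SO_RCVLOWAT) once it knows the expected read size ...
  grpc_shutdown();
  return 0;
}

Note that the variable has to be set before the first read: the value is cached in the function-local static above, so flipping it later has no effect.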

@@ -25,6 +25,7 @@ namespace grpc_core {
bool IsTcpFrameSizeTuningEnabled();
bool IsTcpReadChunksEnabled();
bool IsTcpRcvLowatEnabled();
struct ExperimentMetadata {
const char* name;
@@ -33,7 +34,7 @@ struct ExperimentMetadata {
bool (*is_enabled)();
};
constexpr const size_t kNumExperiments = 2;
constexpr const size_t kNumExperiments = 3;
extern const ExperimentMetadata g_experiment_metadata[kNumExperiments];
} // namespace grpc_core
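
Since the header now exports three experiments through g_experiment_metadata, a quick way to confirm which ones are active is to walk that table; a sketch using only the fields visible in this diff (name and is_enabled):

#include <cstddef>
#include <cstdio>

#include "src/core/lib/experiments/experiments.h"

void LogExperimentState() {
  for (size_t i = 0; i < grpc_core::kNumExperiments; ++i) {
    const grpc_core::ExperimentMetadata& md = grpc_core::g_experiment_metadata[i];
    std::printf("experiment %s: %s\n", md.name,
                md.is_enabled() ? "enabled" : "disabled");
  }
}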

@@ -32,7 +32,7 @@
number of bytes have been read over the socket.
Buffers are also allocated according to estimated RPC sizes.
default: false
expiry: 2022/09/01
expiry: 2022/10/01
owner: ctiller@google.com
test_tags: ["endpoint_test", "core_end2end_test"]
- name: tcp_read_chunks
@@ -40,6 +40,13 @@
Allocate only 8kb or 64kb chunks for TCP reads to reduce pressure on
malloc to recycle arbitrary large blocks.
default: false
expiry: 2022/09/01
expiry: 2022/10/01
owner: ctiller@google.com
test_tags: ["endpoint_test", "core_end2end_test"]
- name: tcp_rcv_lowat
description:
Use SO_RCVLOWAT to avoid wakeups on the read path.
default: false
expiry: 2022/10/01
owner: ctiller@google.com
test_tags: ["endpoint_test", "core_end2end_test"]

@@ -485,6 +485,7 @@ struct grpc_tcp {
int min_read_chunk_size;
int max_read_chunk_size;
int set_rcvlowat = 0;
/* garbage after the last read */
grpc_slice_buffer last_read_buffer;
@@ -823,6 +824,50 @@ static void tcp_trace_read(grpc_tcp* tcp, grpc_error_handle error)
}
}
static void update_rcvlowat(grpc_tcp* tcp)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
if (!grpc_core::IsTcpRcvLowatEnabled()) return;
// TODO(ctiller): Check if supported by OS.
// TODO(ctiller): Allow some adjustments instead of hardcoding things.
static constexpr int kRcvLowatMax = 16 * 1024 * 1024;
static constexpr int kRcvLowatThreshold = 16 * 1024;
int remaining = std::min(static_cast<int>(tcp->incoming_buffer->length),
tcp->min_progress_size);
remaining = std::min(remaining, kRcvLowatMax);
// Setting SO_RCVLOWAT for small quantities does not save on CPU.
if (remaining < kRcvLowatThreshold) {
remaining = 0;
}
// If zerocopy is off, wake shortly before the full RPC is here. More can
// show up partway through recvmsg() since it takes a while to copy data.
// So an early wakeup aids latency.
if (!tcp->tcp_zerocopy_send_ctx.enabled() && remaining > 0) {
remaining -= kRcvLowatThreshold;
}
// We still do not know the RPC size. Do not set SO_RCVLOWAT.
if (tcp->set_rcvlowat <= 1 && remaining <= 1) return;
// Previous value is still valid. No change needed in SO_RCVLOWAT.
if (tcp->set_rcvlowat == remaining) {
return;
}
if (setsockopt(tcp->fd, SOL_SOCKET, SO_RCVLOWAT, &remaining,
sizeof(remaining)) != 0) {
gpr_log(GPR_ERROR, "%s",
absl::StrCat("Cannot set SO_RCVLOWAT on fd=", tcp->fd,
" err=", strerror(errno))
.c_str());
return;
}
tcp->set_rcvlowat = remaining;
}
/* Returns true if data available to read or error other than EAGAIN. */
#define MAX_READ_IOVEC 64
static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
@@ -1077,6 +1122,7 @@ static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) {
maybe_make_read_slices(tcp);
if (!tcp_do_read(tcp, &tcp_read_error)) {
/* We've consumed the edge, request a new one */
update_rcvlowat(tcp);
tcp->read_mu.Unlock();
notify_on_read(tcp);
return;
@@ -1106,19 +1152,23 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
tcp->frame_size_tuning_enabled ? min_progress_size : 1;
grpc_slice_buffer_reset_and_unref_internal(incoming_buffer);
grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
tcp->read_mu.Unlock();
TCP_REF(tcp, "read");
if (tcp->is_first_read) {
update_rcvlowat(tcp);
tcp->read_mu.Unlock();
/* Endpoint read called for the very first time. Register read callback with
* the polling engine */
tcp->is_first_read = false;
notify_on_read(tcp);
} else if (!urgent && tcp->inq == 0) {
update_rcvlowat(tcp);
tcp->read_mu.Unlock();
/* Upper layer asked to read more but we know there is no pending data
* to read from previous reads. So, wait for POLLIN.
*/
notify_on_read(tcp);
} else {
tcp->read_mu.Unlock();
/* Not the first time. We may or may not have more bytes available. In any
* case call tcp->read_done_closure (i.e tcp_handle_read()) which does the
* right thing (i.e calls tcp_do_read() which either reads the available

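For context on the socket option itself: SO_RCVLOWAT tells the kernel not to report the socket as readable until at least the requested number of bytes is queued (on Linux it has been honored by select/poll/epoll since 2.6.28), which is what lets update_rcvlowat() above skip wakeups until roughly a whole message has arrived. A standalone sketch of the call, independent of the gRPC plumbing:

#include <sys/socket.h>

#include <cerrno>
#include <cstdio>
#include <cstring>

// Ask the kernel to defer readability notifications on `fd` until at least
// `bytes` bytes are queued, mirroring the setsockopt() call in update_rcvlowat().
bool SetRecvLowWatermark(int fd, int bytes) {
  if (setsockopt(fd, SOL_SOCKET, SO_RCVLOWAT, &bytes, sizeof(bytes)) != 0) {
    std::fprintf(stderr, "SO_RCVLOWAT(fd=%d, bytes=%d) failed: %s\n", fd, bytes,
                 std::strerror(errno));
    return false;
  }
  return true;
}

update_rcvlowat() additionally clamps the value to 16 MiB and skips values below 16 KiB, since per its in-line comments setting the option for small quantities does not save CPU.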