[tcp] Remove read-chunks experiment (#32126)

* [tcp] Remove read-chunks experiment

* Automated change: Fix sanity tests

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
Branch: pull/32131/head
Author: Craig Tiller (committed by GitHub)
Parent: 829eb8f98a
Commit: 356e3fd775
7 changed files:

  bazel/experiments.bzl                                     (4 changed lines)
  src/core/BUILD                                            (1 changed line)
  src/core/lib/event_engine/posix_engine/posix_endpoint.cc  (74 changed lines)
  src/core/lib/experiments/experiments.cc                   (4 changed lines)
  src/core/lib/experiments/experiments.h                    (25 changed lines)
  src/core/lib/experiments/experiments.yaml                 (8 changed lines)
  src/core/lib/iomgr/tcp_posix.cc                           (86 changed lines)

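Note on the change: the tcp_read_chunks experiment shipped default-enabled and is past its 2023/01/01 expiry, so this PR removes the flag while keeping its behavior. The chunked 8KiB/64KiB allocation strategy it guarded becomes the unconditional body of MaybeMakeReadSlices / maybe_make_read_slices, and the older MemoryRequest-based variable-size path is deleted. A minimal standalone sketch of the retained policy follows (PlanChunks and its out-parameters are illustrative names, not grpc APIs; only the constants and the threshold test are copied from the diff):

#include <cstdio>

// Chunk sizes copied from the diff; everything else here is illustrative.
static const int kBigAlloc = 64 * 1024;
static const int kSmallAlloc = 8 * 1024;

// Returns how many 64KiB and 8KiB slices the policy would append to cover
// a shortfall of extra_wanted bytes.
static void PlanChunks(int extra_wanted, bool low_memory_pressure,
                       int* big_chunks, int* small_chunks) {
  *big_chunks = 0;
  *small_chunks = 0;
  // Under high memory pressure, 64KiB slices are used only when at least
  // 64KiB is needed; under low pressure the threshold drops to
  // kSmallAlloc * 3 / 2 = 12KiB, so big slices are handed out sooner.
  if (extra_wanted >= (low_memory_pressure ? kSmallAlloc * 3 / 2 : kBigAlloc)) {
    while (extra_wanted > 0) {
      extra_wanted -= kBigAlloc;
      ++*big_chunks;
    }
  } else {
    while (extra_wanted > 0) {
      extra_wanted -= kSmallAlloc;
      ++*small_chunks;
    }
  }
}

int main() {
  int big = 0, small = 0;
  PlanChunks(100 * 1024, /*low_memory_pressure=*/true, &big, &small);
  std::printf("100KiB short, low pressure: %dx64KiB + %dx8KiB\n", big, small);
  PlanChunks(10 * 1024, /*low_memory_pressure=*/false, &big, &small);
  std::printf("10KiB short, high pressure: %dx64KiB + %dx8KiB\n", big, small);
  return 0;
}

Run it and the first line prints 2x64KiB + 0x8KiB: a 100KiB shortfall under low pressure is rounded up to two big slices rather than sized exactly.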
bazel/experiments.bzl

@@ -48,12 +48,8 @@ EXPERIMENTS = {
     "core_end2end_tests": [
         "new_hpack_huffman_decoder",
     ],
-    "endpoint_test": [
-        "tcp_read_chunks",
-    ],
     "flow_control_test": [
         "flow_control_fixes",
-        "tcp_read_chunks",
     ],
     "hpack_test": [
         "new_hpack_huffman_decoder",

src/core/BUILD

@@ -1738,7 +1738,6 @@ grpc_cc_library(
         "status_helper",
         "strerror",
         "time",
-        "useful",
         "//:event_engine_base_hdrs",
         "//:gpr",
         "//:grpc_public_hdrs",

src/core/lib/event_engine/posix_engine/posix_endpoint.cc

@@ -34,7 +34,6 @@
 #include "absl/types/optional.h"
 
 #include <grpc/event_engine/internal/slice_cast.h>
-#include <grpc/event_engine/memory_request.h>
 #include <grpc/event_engine/slice.h>
 #include <grpc/event_engine/slice_buffer.h>
 #include <grpc/status.h>
@@ -45,7 +44,6 @@
 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
 #include "src/core/lib/event_engine/tcp_socket_utils.h"
 #include "src/core/lib/experiments/experiments.h"
-#include "src/core/lib/gpr/useful.h"
 #include "src/core/lib/gprpp/load_file.h"
 #include "src/core/lib/gprpp/ref_counted_ptr.h"
 #include "src/core/lib/gprpp/status_helper.h"
@@ -516,55 +514,35 @@ void PosixEndpointImpl::UpdateRcvLowat() {
 }
 
 void PosixEndpointImpl::MaybeMakeReadSlices() {
-  if (grpc_core::IsTcpReadChunksEnabled()) {
-    static const int kBigAlloc = 64 * 1024;
-    static const int kSmallAlloc = 8 * 1024;
-    if (incoming_buffer_->Length() < static_cast<size_t>(min_progress_size_)) {
-      size_t allocate_length = min_progress_size_;
-      const size_t target_length = static_cast<size_t>(target_length_);
-      // If memory pressure is low and we think there will be more than
-      // min_progress_size bytes to read, allocate a bit more.
-      const bool low_memory_pressure =
-          memory_owner_.GetPressureInfo().pressure_control_value < 0.8;
-      if (low_memory_pressure && target_length > allocate_length) {
-        allocate_length = target_length;
-      }
-      int extra_wanted =
-          allocate_length - static_cast<int>(incoming_buffer_->Length());
-      if (extra_wanted >=
-          (low_memory_pressure ? kSmallAlloc * 3 / 2 : kBigAlloc)) {
-        while (extra_wanted > 0) {
-          extra_wanted -= kBigAlloc;
-          incoming_buffer_->AppendIndexed(
-              Slice(memory_owner_.MakeSlice(kBigAlloc)));
-        }
-      } else {
-        while (extra_wanted > 0) {
-          extra_wanted -= kSmallAlloc;
-          incoming_buffer_->AppendIndexed(
-              Slice(memory_owner_.MakeSlice(kSmallAlloc)));
-        }
-      }
-      MaybePostReclaimer();
-    }
-  } else {
-    if (incoming_buffer_->Length() < static_cast<size_t>(min_progress_size_) &&
-        incoming_buffer_->Count() < MAX_READ_IOVEC) {
-      int target_length =
-          std::max(static_cast<int>(target_length_), min_progress_size_);
-      int extra_wanted =
-          target_length - static_cast<int>(incoming_buffer_->Length());
-      int min_read_chunk_size =
-          std::max(min_read_chunk_size_, min_progress_size_);
-      int max_read_chunk_size =
-          std::max(max_read_chunk_size_, min_progress_size_);
-      incoming_buffer_->AppendIndexed(
-          Slice(memory_owner_.MakeSlice(grpc_core::MemoryRequest(
-              min_read_chunk_size,
-              grpc_core::Clamp(extra_wanted, min_read_chunk_size,
-                               max_read_chunk_size)))));
-      MaybePostReclaimer();
-    }
+  static const int kBigAlloc = 64 * 1024;
+  static const int kSmallAlloc = 8 * 1024;
+  if (incoming_buffer_->Length() < static_cast<size_t>(min_progress_size_)) {
+    size_t allocate_length = min_progress_size_;
+    const size_t target_length = static_cast<size_t>(target_length_);
+    // If memory pressure is low and we think there will be more than
+    // min_progress_size bytes to read, allocate a bit more.
+    const bool low_memory_pressure =
+        memory_owner_.GetPressureInfo().pressure_control_value < 0.8;
+    if (low_memory_pressure && target_length > allocate_length) {
+      allocate_length = target_length;
+    }
+    int extra_wanted =
+        allocate_length - static_cast<int>(incoming_buffer_->Length());
+    if (extra_wanted >=
+        (low_memory_pressure ? kSmallAlloc * 3 / 2 : kBigAlloc)) {
+      while (extra_wanted > 0) {
+        extra_wanted -= kBigAlloc;
+        incoming_buffer_->AppendIndexed(
+            Slice(memory_owner_.MakeSlice(kBigAlloc)));
+      }
+    } else {
+      while (extra_wanted > 0) {
+        extra_wanted -= kSmallAlloc;
+        incoming_buffer_->AppendIndexed(
+            Slice(memory_owner_.MakeSlice(kSmallAlloc)));
+      }
+    }
+    MaybePostReclaimer();
   }
 }
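For contrast with the branch kept above, the deleted else-branch sized a single slice via grpc_core::MemoryRequest, clamping the shortfall into configured chunk bounds; that is also why the memory_request.h and gpr/useful.h includes (and the BUILD dependency on "useful") go away. A rough sketch of the deleted sizing math, with std::clamp standing in for grpc_core::Clamp and made-up bounds (the real ones derive from channel args and min_progress_size):

#include <algorithm>
#include <cstdio>

// One slice per call; its ideal size is the shortfall clamped to
// [min_read_chunk_size, max_read_chunk_size]. Bounds here are made up.
static int DeletedPathIdealSliceSize(int extra_wanted, int min_read_chunk_size,
                                     int max_read_chunk_size) {
  return std::clamp(extra_wanted, min_read_chunk_size, max_read_chunk_size);
}

int main() {
  // A 100000-byte shortfall with bounds [256, 512 * 1024] requests exactly
  // 100000 bytes: an arbitrary size that malloc cannot easily recycle.
  std::printf("%d\n", DeletedPathIdealSliceSize(100000, 256, 512 * 1024));
  return 0;
}

Replacing such arbitrary sizes with two fixed size classes is the rationale recorded in the (now deleted) experiment description.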

src/core/lib/experiments/experiments.cc

@@ -24,9 +24,6 @@ const char* const description_tcp_frame_size_tuning =
     "would not indicate completion of a read operation until a specified "
     "number of bytes have been read over the socket. Buffers are also "
     "allocated according to estimated RPC sizes.";
-const char* const description_tcp_read_chunks =
-    "Allocate only 8kb or 64kb chunks for TCP reads to reduce pressure on "
-    "malloc to recycle arbitrary large blocks.";
 const char* const description_tcp_rcv_lowat =
     "Use SO_RCVLOWAT to avoid wakeups on the read path.";
 const char* const description_peer_state_based_framing =
const char* const description_peer_state_based_framing =
@@ -57,7 +54,6 @@ namespace grpc_core {
 
 const ExperimentMetadata g_experiment_metadata[] = {
     {"tcp_frame_size_tuning", description_tcp_frame_size_tuning, false},
-    {"tcp_read_chunks", description_tcp_read_chunks, true},
     {"tcp_rcv_lowat", description_tcp_rcv_lowat, false},
     {"peer_state_based_framing", description_peer_state_based_framing, false},
     {"flow_control_fixes", description_flow_control_fixes, true},

src/core/lib/experiments/experiments.h

@@ -26,23 +26,20 @@
 namespace grpc_core {
 
 inline bool IsTcpFrameSizeTuningEnabled() { return IsExperimentEnabled(0); }
-inline bool IsTcpReadChunksEnabled() { return IsExperimentEnabled(1); }
-inline bool IsTcpRcvLowatEnabled() { return IsExperimentEnabled(2); }
-inline bool IsPeerStateBasedFramingEnabled() { return IsExperimentEnabled(3); }
-inline bool IsFlowControlFixesEnabled() { return IsExperimentEnabled(4); }
+inline bool IsTcpRcvLowatEnabled() { return IsExperimentEnabled(1); }
+inline bool IsPeerStateBasedFramingEnabled() { return IsExperimentEnabled(2); }
+inline bool IsFlowControlFixesEnabled() { return IsExperimentEnabled(3); }
 inline bool IsMemoryPressureControllerEnabled() {
-  return IsExperimentEnabled(5);
+  return IsExperimentEnabled(4);
 }
 inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() {
-  return IsExperimentEnabled(6);
+  return IsExperimentEnabled(5);
 }
-inline bool IsNewHpackHuffmanDecoderEnabled() { return IsExperimentEnabled(7); }
-inline bool IsEventEngineClientEnabled() { return IsExperimentEnabled(8); }
-inline bool IsMonitoringExperimentEnabled() { return IsExperimentEnabled(9); }
-inline bool IsPromiseBasedClientCallEnabled() {
-  return IsExperimentEnabled(10);
-}
-inline bool IsFreeLargeAllocatorEnabled() { return IsExperimentEnabled(11); }
+inline bool IsNewHpackHuffmanDecoderEnabled() { return IsExperimentEnabled(6); }
+inline bool IsEventEngineClientEnabled() { return IsExperimentEnabled(7); }
+inline bool IsMonitoringExperimentEnabled() { return IsExperimentEnabled(8); }
+inline bool IsPromiseBasedClientCallEnabled() { return IsExperimentEnabled(9); }
+inline bool IsFreeLargeAllocatorEnabled() { return IsExperimentEnabled(10); }
 
 struct ExperimentMetadata {
   const char* name;
@@ -50,7 +47,7 @@ struct ExperimentMetadata {
   bool default_value;
 };
 
-constexpr const size_t kNumExperiments = 12;
+constexpr const size_t kNumExperiments = 11;
 
 extern const ExperimentMetadata g_experiment_metadata[kNumExperiments];
 
 } // namespace grpc_core
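The renumbering above is mechanical: every generated predicate indexes g_experiment_metadata by position, so deleting entry 1 shifts each later experiment down by one and kNumExperiments drops from 12 to 11. A toy sketch of the pattern, not the generated code (the real IsExperimentEnabled also consults runtime configuration, not just default_value):

#include <cstddef>

struct ExperimentMetadata {
  const char* name;
  const char* description;
  bool default_value;
};

// Two-entry toy table: after the removal, tcp_rcv_lowat occupies index 1,
// where tcp_read_chunks used to sit.
constexpr size_t kNumExperiments = 2;
const ExperimentMetadata g_experiment_metadata[kNumExperiments] = {
    {"tcp_frame_size_tuning", "...", false},
    {"tcp_rcv_lowat", "...", false},
};

// Toy gate keyed only off the default; illustrative, not the real lookup.
bool IsExperimentEnabled(size_t experiment_id) {
  return g_experiment_metadata[experiment_id].default_value;
}

inline bool IsTcpRcvLowatEnabled() { return IsExperimentEnabled(1); }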

src/core/lib/experiments/experiments.yaml

@@ -46,14 +46,6 @@
   expiry: 2023/01/01
   owner: ctiller@google.com
   test_tags: ["endpoint_test", "flow_control_test"]
-- name: tcp_read_chunks
-  description:
-    Allocate only 8kb or 64kb chunks for TCP reads to reduce pressure on
-    malloc to recycle arbitrary large blocks.
-  default: true
-  expiry: 2023/01/01
-  owner: ctiller@google.com
-  test_tags: ["endpoint_test", "flow_control_test"]
 - name: tcp_rcv_lowat
   description:
     Use SO_RCVLOWAT to avoid wakeups on the read path.

src/core/lib/iomgr/tcp_posix.cc

@@ -1045,66 +1045,38 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
 static void maybe_make_read_slices(grpc_tcp* tcp)
     ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
-  if (grpc_core::IsTcpReadChunksEnabled()) {
-    static const int kBigAlloc = 64 * 1024;
-    static const int kSmallAlloc = 8 * 1024;
-    if (tcp->incoming_buffer->length <
-        static_cast<size_t>(tcp->min_progress_size)) {
-      size_t allocate_length = tcp->min_progress_size;
-      const size_t target_length = static_cast<size_t>(tcp->target_length);
-      // If memory pressure is low and we think there will be more than
-      // min_progress_size bytes to read, allocate a bit more.
-      const bool low_memory_pressure =
-          tcp->memory_owner.GetPressureInfo().pressure_control_value < 0.8;
-      if (low_memory_pressure && target_length > allocate_length) {
-        allocate_length = target_length;
-      }
-      int extra_wanted =
-          allocate_length - static_cast<int>(tcp->incoming_buffer->length);
-      if (extra_wanted >=
-          (low_memory_pressure ? kSmallAlloc * 3 / 2 : kBigAlloc)) {
-        while (extra_wanted > 0) {
-          extra_wanted -= kBigAlloc;
-          grpc_slice_buffer_add_indexed(tcp->incoming_buffer,
-                                        tcp->memory_owner.MakeSlice(kBigAlloc));
-          grpc_core::global_stats().IncrementTcpReadAlloc64k();
-        }
-      } else {
-        while (extra_wanted > 0) {
-          extra_wanted -= kSmallAlloc;
-          grpc_slice_buffer_add_indexed(
-              tcp->incoming_buffer, tcp->memory_owner.MakeSlice(kSmallAlloc));
-          grpc_core::global_stats().IncrementTcpReadAlloc8k();
-        }
-      }
-      maybe_post_reclaimer(tcp);
-    }
-  } else {
-    if (tcp->incoming_buffer->length <
-            static_cast<size_t>(tcp->min_progress_size) &&
-        tcp->incoming_buffer->count < MAX_READ_IOVEC) {
-      if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
-        gpr_log(GPR_INFO,
-                "TCP:%p alloc_slices; min_chunk=%d max_chunk=%d target=%lf "
-                "buf_len=%" PRIdPTR,
-                tcp, tcp->min_read_chunk_size, tcp->max_read_chunk_size,
-                tcp->target_length, tcp->incoming_buffer->length);
-      }
-      int target_length = std::max(static_cast<int>(tcp->target_length),
-                                   tcp->min_progress_size);
-      int extra_wanted =
-          target_length - static_cast<int>(tcp->incoming_buffer->length);
-      int min_read_chunk_size =
-          std::max(tcp->min_read_chunk_size, tcp->min_progress_size);
-      int max_read_chunk_size =
-          std::max(tcp->max_read_chunk_size, tcp->min_progress_size);
-      grpc_slice slice = tcp->memory_owner.MakeSlice(grpc_core::MemoryRequest(
-          min_read_chunk_size,
-          grpc_core::Clamp(extra_wanted, min_read_chunk_size,
-                           max_read_chunk_size)));
-      grpc_slice_buffer_add_indexed(tcp->incoming_buffer, slice);
-      maybe_post_reclaimer(tcp);
+  static const int kBigAlloc = 64 * 1024;
+  static const int kSmallAlloc = 8 * 1024;
+  if (tcp->incoming_buffer->length <
+      static_cast<size_t>(tcp->min_progress_size)) {
+    size_t allocate_length = tcp->min_progress_size;
+    const size_t target_length = static_cast<size_t>(tcp->target_length);
+    // If memory pressure is low and we think there will be more than
+    // min_progress_size bytes to read, allocate a bit more.
+    const bool low_memory_pressure =
+        tcp->memory_owner.GetPressureInfo().pressure_control_value < 0.8;
+    if (low_memory_pressure && target_length > allocate_length) {
+      allocate_length = target_length;
+    }
+    int extra_wanted =
+        allocate_length - static_cast<int>(tcp->incoming_buffer->length);
+    if (extra_wanted >=
+        (low_memory_pressure ? kSmallAlloc * 3 / 2 : kBigAlloc)) {
+      while (extra_wanted > 0) {
+        extra_wanted -= kBigAlloc;
+        grpc_slice_buffer_add_indexed(tcp->incoming_buffer,
+                                      tcp->memory_owner.MakeSlice(kBigAlloc));
+        grpc_core::global_stats().IncrementTcpReadAlloc64k();
+      }
+    } else {
+      while (extra_wanted > 0) {
+        extra_wanted -= kSmallAlloc;
+        grpc_slice_buffer_add_indexed(tcp->incoming_buffer,
+                                      tcp->memory_owner.MakeSlice(kSmallAlloc));
+        grpc_core::global_stats().IncrementTcpReadAlloc8k();
+      }
     }
+    maybe_post_reclaimer(tcp);
   }
 }
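One behavioral note on the retained path: because the threshold is kSmallAlloc * 3 / 2 under low memory pressure, a shortfall of exactly 12KiB already takes the 64KiB branch and deliberately over-allocates a single big slice; the IncrementTcpReadAlloc64k / IncrementTcpReadAlloc8k counters make that split observable. A self-contained sketch of the boundary case (the two local counters stand in for the global stats calls):

#include <cstdio>

static const int kBigAlloc = 64 * 1024;
static const int kSmallAlloc = 8 * 1024;

int main() {
  int tcp_read_alloc_64k = 0;  // stands in for IncrementTcpReadAlloc64k()
  int tcp_read_alloc_8k = 0;   // stands in for IncrementTcpReadAlloc8k()
  const bool low_memory_pressure = true;
  int extra_wanted = kSmallAlloc * 3 / 2;  // 12KiB: exactly at the threshold
  if (extra_wanted >= (low_memory_pressure ? kSmallAlloc * 3 / 2 : kBigAlloc)) {
    while (extra_wanted > 0) {
      extra_wanted -= kBigAlloc;
      ++tcp_read_alloc_64k;
    }
  } else {
    while (extra_wanted > 0) {
      extra_wanted -= kSmallAlloc;
      ++tcp_read_alloc_8k;
    }
  }
  // Prints "64k: 1, 8k: 0": a 12KiB want is served by one 64KiB slice.
  std::printf("64k: %d, 8k: %d\n", tcp_read_alloc_64k, tcp_read_alloc_8k);
  return 0;
}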
