Revert "[experiments] Enable tcp_read_chunks for debug builds (#31374)" (#31417)

This reverts commit d05830cb02.
pull/31420/head
Craig Tiller 2 years ago committed by GitHub
parent 9098d8b8db
commit 987b50a258
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      bazel/experiments.bzl
  2. 25
      src/core/lib/event_engine/posix_engine/posix_endpoint.cc
  3. 3
      src/core/lib/event_engine/posix_engine/posix_endpoint.h
  4. 2
      src/core/lib/experiments/experiments.cc
  5. 2
      src/core/lib/experiments/experiments.yaml
  6. 29
      src/core/lib/iomgr/tcp_posix.cc

@ -21,15 +21,9 @@ EXPERIMENTS = {
"core_end2end_tests": [
"new_hpack_huffman_decoder",
],
"endpoint_test": [
"tcp_read_chunks",
],
"event_engine_client_test": [
"posix_event_engine_enable_polling",
],
"flow_control_test": [
"tcp_read_chunks",
],
"hpack_test": [
"new_hpack_huffman_decoder",
],
@ -41,6 +35,7 @@ EXPERIMENTS = {
"endpoint_test": [
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
"tcp_read_chunks",
],
"event_engine_client_test": [
"event_engine_client",
@ -50,6 +45,7 @@ EXPERIMENTS = {
"peer_state_based_framing",
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
"tcp_read_chunks",
],
"lame_client_test": [
"promise_based_client_call",

@ -314,6 +314,14 @@ bool PosixEndpointImpl::TcpDoRead(absl::Status& status) {
read_bytes = recvmsg(fd_, &msg, 0);
} while (read_bytes < 0 && errno == EINTR);
// We have read something in previous reads. We need to deliver those bytes
// to the upper layer.
if (read_bytes <= 0 &&
total_read_bytes >= static_cast<size_t>(min_progress_size_)) {
inq_ = 1;
break;
}
if (read_bytes < 0) {
// NB: After calling call_read_cb a parallel call of the read handler may
// be running.
@ -331,16 +339,12 @@ bool PosixEndpointImpl::TcpDoRead(absl::Status& status) {
return true;
}
}
// We have read something in previous reads. We need to deliver those bytes
// to the upper layer.
if (read_bytes <= 0 && total_read_bytes >= 1) {
inq_ = 1;
break;
}
if (read_bytes == 0) {
// 0 read size ==> end of stream
//
// We may have read something, i.e., total_read_bytes > 0, but since the
// connection is closed we will drop the data here, because we can't call
// the callback multiple times.
incoming_buffer_->Clear();
status = absl::InternalError("Socket closed");
return true;
@ -397,7 +401,7 @@ bool PosixEndpointImpl::TcpDoRead(absl::Status& status) {
GPR_DEBUG_ASSERT(total_read_bytes > 0);
status = absl::OkStatus();
if (grpc_core::IsTcpFrameSizeTuningEnabled()) {
if (frame_size_tuning_enabled_) {
// Update min progress size based on the total number of bytes read in
// this round.
min_progress_size_ -= total_read_bytes;
@ -578,7 +582,7 @@ void PosixEndpointImpl::Read(absl::AnyInvocable<void(absl::Status)> on_read,
incoming_buffer_->Clear();
incoming_buffer_->Swap(last_read_buffer_);
read_mu_.Unlock();
if (args != nullptr && grpc_core::IsTcpFrameSizeTuningEnabled()) {
if (args != nullptr && frame_size_tuning_enabled_) {
min_progress_size_ = args->read_hint_bytes;
} else {
min_progress_size_ = 1;
@ -1220,6 +1224,7 @@ PosixEndpointImpl::PosixEndpointImpl(EventHandle* handle,
tcp_zerocopy_send_ctx_ = std::make_unique<TcpZerocopySendCtx>(
zerocopy_enabled, options.tcp_tx_zerocopy_max_simultaneous_sends,
options.tcp_tx_zerocopy_send_bytes_threshold);
frame_size_tuning_enabled_ = grpc_core::IsTcpFrameSizeTuningEnabled();
#ifdef GRPC_HAVE_TCP_INQ
int one = 1;
if (setsockopt(fd_, SOL_TCP, TCP_INQ, &one, sizeof(one)) == 0) {

@ -576,6 +576,9 @@ class PosixEndpointImpl : public grpc_core::RefCounted<PosixEndpointImpl> {
std::atomic<bool> stop_error_notification_{false};
std::unique_ptr<TcpZerocopySendCtx> tcp_zerocopy_send_ctx_;
TcpZerocopySendRecord* current_zerocopy_send_ = nullptr;
// If true, the size of buffers alloted for tcp reads will be based on the
// specified min_progress_size values conveyed by the upper layers.
bool frame_size_tuning_enabled_ = false;
// A hint from upper layers specifying the minimum number of bytes that need
// to be read to make meaningful progress.
int min_progress_size_ = 1;

@ -64,7 +64,7 @@ namespace grpc_core {
const ExperimentMetadata g_experiment_metadata[] = {
{"tcp_frame_size_tuning", description_tcp_frame_size_tuning, false},
{"tcp_read_chunks", description_tcp_read_chunks, kDefaultForDebugOnly},
{"tcp_read_chunks", description_tcp_read_chunks, false},
{"tcp_rcv_lowat", description_tcp_rcv_lowat, false},
{"peer_state_based_framing", description_peer_state_based_framing, false},
{"flow_control_fixes", description_flow_control_fixes, false},

@ -50,7 +50,7 @@
description:
Allocate only 8kb or 64kb chunks for TCP reads to reduce pressure on
malloc to recycle arbitrary large blocks.
default: debug
default: false
expiry: 2023/01/01
owner: ctiller@google.com
test_tags: ["endpoint_test", "flow_control_test"]

@ -21,7 +21,6 @@
#include <grpc/impl/codegen/grpc_types.h>
#include "src/core/lib/gprpp/global_config_generic.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_POSIX_SOCKET_TCP
@ -543,6 +542,7 @@ struct grpc_tcp {
TcpZerocopySendCtx tcp_zerocopy_send_ctx;
TcpZerocopySendRecord* current_zerocopy_send = nullptr;
bool frame_size_tuning_enabled;
int min_progress_size; /* A hint from upper layers specifying the minimum
number of bytes that need to be read to make
meaningful progress */
@ -928,6 +928,14 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
read_bytes = recvmsg(tcp->fd, &msg, 0);
} while (read_bytes < 0 && errno == EINTR);
/* We have read something in previous reads. We need to deliver those
* bytes to the upper layer. */
if (read_bytes <= 0 &&
total_read_bytes >= static_cast<size_t>(tcp->min_progress_size)) {
tcp->inq = 1;
break;
}
if (read_bytes < 0) {
/* NB: After calling call_read_cb a parallel call of the read handler may
* be running. */
@ -944,16 +952,12 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
return true;
}
}
/* We have read something in previous reads. We need to deliver those
* bytes to the upper layer. */
if (read_bytes <= 0 && total_read_bytes >= 1) {
tcp->inq = 1;
break;
}
if (read_bytes == 0) {
/* 0 read size ==> end of stream */
/* 0 read size ==> end of stream
*
* We may have read something, i.e., total_read_bytes > 0, but
* since the connection is closed we will drop the data here, because we
* can't call the callback multiple times. */
grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
*error = tcp_annotate_error(GRPC_ERROR_CREATE("Socket closed"), tcp);
return true;
@ -1011,7 +1015,7 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
GPR_DEBUG_ASSERT(total_read_bytes > 0);
*error = absl::OkStatus();
if (grpc_core::IsTcpFrameSizeTuningEnabled()) {
if (tcp->frame_size_tuning_enabled) {
// Update min progress size based on the total number of bytes read in
// this round.
tcp->min_progress_size -= total_read_bytes;
@ -1149,7 +1153,7 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
tcp->read_mu.Lock();
tcp->incoming_buffer = incoming_buffer;
tcp->min_progress_size =
grpc_core::IsTcpFrameSizeTuningEnabled() ? min_progress_size : 1;
tcp->frame_size_tuning_enabled ? min_progress_size : 1;
grpc_slice_buffer_reset_and_unref(incoming_buffer);
grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
TCP_REF(tcp, "read");
@ -1983,6 +1987,7 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
tcp->socket_ts_enabled = false;
tcp->ts_capable = true;
tcp->outgoing_buffer_arg = nullptr;
tcp->frame_size_tuning_enabled = grpc_core::IsTcpFrameSizeTuningEnabled();
tcp->min_progress_size = 1;
if (options.tcp_tx_zero_copy_enabled &&
!tcp->tcp_zerocopy_send_ctx.memory_limited()) {

Loading…
Cancel
Save