[experiments] Reland enabling tcp_read_chunks for debug builds (#31420)

* Revert "Revert "[experiments] Enable tcp_read_chunks for debug builds (#31374)" (#31417)"

This reverts commit 987b50a258.

* Fix read failure

* fixes
pull/31419/head
Craig Tiller 2 years ago committed by GitHub
parent 5f6c357e74
commit 141bc3b634
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      bazel/experiments.bzl
  2. 1
      src/core/BUILD
  3. 50
      src/core/lib/event_engine/posix_engine/posix_endpoint.cc
  4. 3
      src/core/lib/event_engine/posix_engine/posix_endpoint.h
  5. 2
      src/core/lib/experiments/experiments.cc
  6. 2
      src/core/lib/experiments/experiments.yaml
  7. 56
      src/core/lib/iomgr/tcp_posix.cc
  8. 2
      test/core/transport/chttp2/graceful_shutdown_test.cc

@ -21,9 +21,15 @@ EXPERIMENTS = {
"core_end2end_tests": [
"new_hpack_huffman_decoder",
],
"endpoint_test": [
"tcp_read_chunks",
],
"event_engine_client_test": [
"posix_event_engine_enable_polling",
],
"flow_control_test": [
"tcp_read_chunks",
],
"hpack_test": [
"new_hpack_huffman_decoder",
],
@ -35,7 +41,6 @@ EXPERIMENTS = {
"endpoint_test": [
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
"tcp_read_chunks",
],
"event_engine_client_test": [
"event_engine_client",
@ -45,7 +50,6 @@ EXPERIMENTS = {
"peer_state_based_framing",
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
"tcp_read_chunks",
],
"lame_client_test": [
"promise_based_client_call",

@ -1601,6 +1601,7 @@ grpc_cc_library(
"ref_counted",
"resource_quota",
"slice",
"strerror",
"time",
"useful",
"//:event_engine_base_hdrs",

@ -44,6 +44,7 @@
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/load_file.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/strerror.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/slice/slice.h"
@ -314,39 +315,33 @@ bool PosixEndpointImpl::TcpDoRead(absl::Status& status) {
read_bytes = recvmsg(fd_, &msg, 0);
} while (read_bytes < 0 && errno == EINTR);
if (read_bytes < 0 && errno == EAGAIN) {
// NB: After calling call_read_cb a parallel call of the read handler may
// be running.
if (total_read_bytes > 0) {
break;
}
FinishEstimate();
inq_ = 0;
return false;
}
// We have read something in previous reads. We need to deliver those bytes
// to the upper layer.
if (read_bytes <= 0 &&
total_read_bytes >= static_cast<size_t>(min_progress_size_)) {
if (read_bytes <= 0 && total_read_bytes >= 1) {
inq_ = 1;
break;
}
if (read_bytes < 0) {
// NB: After calling call_read_cb a parallel call of the read handler may
// be running.
if (errno == EAGAIN) {
if (total_read_bytes > 0) {
break;
}
FinishEstimate();
inq_ = 0;
return false;
} else {
incoming_buffer_->Clear();
status =
absl::InternalError(absl::StrCat("recvmsg:", std::strerror(errno)));
return true;
}
}
if (read_bytes == 0) {
if (read_bytes <= 0) {
// 0 read size ==> end of stream
//
// We may have read something, i.e., total_read_bytes > 0, but since the
// connection is closed we will drop the data here, because we can't call
// the callback multiple times.
incoming_buffer_->Clear();
status = absl::InternalError("Socket closed");
if (read_bytes == 0) {
status = absl::InternalError("Socket closed");
} else {
status = absl::InternalError(
absl::StrCat("recvmsg:", grpc_core::StrError(errno)));
}
return true;
}
@ -401,7 +396,7 @@ bool PosixEndpointImpl::TcpDoRead(absl::Status& status) {
GPR_DEBUG_ASSERT(total_read_bytes > 0);
status = absl::OkStatus();
if (frame_size_tuning_enabled_) {
if (grpc_core::IsTcpFrameSizeTuningEnabled()) {
// Update min progress size based on the total number of bytes read in
// this round.
min_progress_size_ -= total_read_bytes;
@ -582,7 +577,7 @@ void PosixEndpointImpl::Read(absl::AnyInvocable<void(absl::Status)> on_read,
incoming_buffer_->Clear();
incoming_buffer_->Swap(last_read_buffer_);
read_mu_.Unlock();
if (args != nullptr && frame_size_tuning_enabled_) {
if (args != nullptr && grpc_core::IsTcpFrameSizeTuningEnabled()) {
min_progress_size_ = args->read_hint_bytes;
} else {
min_progress_size_ = 1;
@ -1224,7 +1219,6 @@ PosixEndpointImpl::PosixEndpointImpl(EventHandle* handle,
tcp_zerocopy_send_ctx_ = std::make_unique<TcpZerocopySendCtx>(
zerocopy_enabled, options.tcp_tx_zerocopy_max_simultaneous_sends,
options.tcp_tx_zerocopy_send_bytes_threshold);
frame_size_tuning_enabled_ = grpc_core::IsTcpFrameSizeTuningEnabled();
#ifdef GRPC_HAVE_TCP_INQ
int one = 1;
if (setsockopt(fd_, SOL_TCP, TCP_INQ, &one, sizeof(one)) == 0) {

@ -576,9 +576,6 @@ class PosixEndpointImpl : public grpc_core::RefCounted<PosixEndpointImpl> {
std::atomic<bool> stop_error_notification_{false};
std::unique_ptr<TcpZerocopySendCtx> tcp_zerocopy_send_ctx_;
TcpZerocopySendRecord* current_zerocopy_send_ = nullptr;
// If true, the size of buffers alloted for tcp reads will be based on the
// specified min_progress_size values conveyed by the upper layers.
bool frame_size_tuning_enabled_ = false;
// A hint from upper layers specifying the minimum number of bytes that need
// to be read to make meaningful progress.
int min_progress_size_ = 1;

@ -64,7 +64,7 @@ namespace grpc_core {
const ExperimentMetadata g_experiment_metadata[] = {
{"tcp_frame_size_tuning", description_tcp_frame_size_tuning, false},
{"tcp_read_chunks", description_tcp_read_chunks, false},
{"tcp_read_chunks", description_tcp_read_chunks, kDefaultForDebugOnly},
{"tcp_rcv_lowat", description_tcp_rcv_lowat, false},
{"peer_state_based_framing", description_peer_state_based_framing, false},
{"flow_control_fixes", description_flow_control_fixes, false},

@ -50,7 +50,7 @@
description:
Allocate only 8kb or 64kb chunks for TCP reads to reduce pressure on
malloc to recycle arbitrary large blocks.
default: false
default: debug
expiry: 2023/01/01
owner: ctiller@google.com
test_tags: ["endpoint_test", "flow_control_test"]

@ -21,6 +21,7 @@
#include <grpc/impl/codegen/grpc_types.h>
#include "src/core/lib/gprpp/global_config_generic.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_POSIX_SOCKET_TCP
@ -542,7 +543,6 @@ struct grpc_tcp {
TcpZerocopySendCtx tcp_zerocopy_send_ctx;
TcpZerocopySendRecord* current_zerocopy_send = nullptr;
bool frame_size_tuning_enabled;
int min_progress_size; /* A hint from upper layers specifying the minimum
number of bytes that need to be read to make
meaningful progress */
@ -820,7 +820,7 @@ static void tcp_trace_read(grpc_tcp* tcp, grpc_error_handle error)
for (i = 0; i < tcp->incoming_buffer->count; i++) {
char* dump = grpc_dump_slice(tcp->incoming_buffer->slices[i],
GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "DATA: %s", dump);
gpr_log(GPR_DEBUG, "READ DATA: %s", dump);
gpr_free(dump);
}
}
@ -928,38 +928,35 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
read_bytes = recvmsg(tcp->fd, &msg, 0);
} while (read_bytes < 0 && errno == EINTR);
if (read_bytes < 0 && errno == EAGAIN) {
/* NB: After calling call_read_cb a parallel call of the read handler may
* be running. */
if (total_read_bytes > 0) {
break;
}
finish_estimate(tcp);
tcp->inq = 0;
return false;
}
/* We have read something in previous reads. We need to deliver those
* bytes to the upper layer. */
if (read_bytes <= 0 &&
total_read_bytes >= static_cast<size_t>(tcp->min_progress_size)) {
if (read_bytes <= 0 && total_read_bytes >= 1) {
tcp->inq = 1;
break;
}
if (read_bytes < 0) {
/* NB: After calling call_read_cb a parallel call of the read handler may
* be running. */
if (errno == EAGAIN) {
if (total_read_bytes > 0) {
break;
}
finish_estimate(tcp);
tcp->inq = 0;
return false;
if (read_bytes <= 0) {
/* 0 read size ==> end of stream */
grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
if (read_bytes == 0) {
*error = tcp_annotate_error(absl::InternalError("Socket closed"), tcp);
} else {
grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
*error = tcp_annotate_error(GRPC_OS_ERROR(errno, "recvmsg"), tcp);
return true;
*error =
tcp_annotate_error(absl::InternalError(absl::StrCat(
"recvmsg:", grpc_core::StrError(errno))),
tcp);
}
}
if (read_bytes == 0) {
/* 0 read size ==> end of stream
*
* We may have read something, i.e., total_read_bytes > 0, but
* since the connection is closed we will drop the data here, because we
* can't call the callback multiple times. */
grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
*error = tcp_annotate_error(GRPC_ERROR_CREATE("Socket closed"), tcp);
return true;
}
@ -1015,7 +1012,7 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
GPR_DEBUG_ASSERT(total_read_bytes > 0);
*error = absl::OkStatus();
if (tcp->frame_size_tuning_enabled) {
if (grpc_core::IsTcpFrameSizeTuningEnabled()) {
// Update min progress size based on the total number of bytes read in
// this round.
tcp->min_progress_size -= total_read_bytes;
@ -1153,7 +1150,7 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
tcp->read_mu.Lock();
tcp->incoming_buffer = incoming_buffer;
tcp->min_progress_size =
tcp->frame_size_tuning_enabled ? min_progress_size : 1;
grpc_core::IsTcpFrameSizeTuningEnabled() ? min_progress_size : 1;
grpc_slice_buffer_reset_and_unref(incoming_buffer);
grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
TCP_REF(tcp, "read");
@ -1844,7 +1841,7 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
char* data =
grpc_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "DATA: %s", data);
gpr_log(GPR_DEBUG, "WRITE DATA: %s", data);
gpr_free(data);
}
}
@ -1987,7 +1984,6 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
tcp->socket_ts_enabled = false;
tcp->ts_capable = true;
tcp->outgoing_buffer_arg = nullptr;
tcp->frame_size_tuning_enabled = grpc_core::IsTcpFrameSizeTuningEnabled();
tcp->min_progress_size = 1;
if (options.tcp_tx_zero_copy_enabled &&
!tcp->tcp_zerocopy_send_ctx.memory_limited()) {

@ -176,9 +176,11 @@ class GracefulShutdownTest : public ::testing::Test {
// Waits for \a bytes to show up in read_bytes_
void WaitForReadBytes(absl::string_view bytes) {
std::atomic<bool> done{false};
auto start_time = absl::Now();
{
MutexLock lock(&mu_);
while (!absl::StrContains(read_bytes_, bytes)) {
ASSERT_LT(absl::Now() - start_time, absl::Seconds(60));
read_cv_.WaitWithTimeout(&mu_, absl::Seconds(5));
}
}

Loading…
Cancel
Save