From 141bc3b634c4cc80c69282dc1353f684134b0717 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 21 Oct 2022 08:20:19 -0700 Subject: [PATCH] [experiments] Reland enabling tcp_read_chunks for debug builds (#31420) * Revert "Revert "[experiments] Enable tcp_read_chunks for debug builds (#31374)" (#31417)" This reverts commit 987b50a25865c81ce8b4f0cabddca6f26f80739e. * Fix read failure * fixes --- bazel/experiments.bzl | 8 ++- src/core/BUILD | 1 + .../posix_engine/posix_endpoint.cc | 50 ++++++++--------- .../posix_engine/posix_endpoint.h | 3 - src/core/lib/experiments/experiments.cc | 2 +- src/core/lib/experiments/experiments.yaml | 2 +- src/core/lib/iomgr/tcp_posix.cc | 56 +++++++++---------- .../chttp2/graceful_shutdown_test.cc | 2 + 8 files changed, 59 insertions(+), 65 deletions(-) diff --git a/bazel/experiments.bzl b/bazel/experiments.bzl index 196dc0e9d8f..502869db84e 100644 --- a/bazel/experiments.bzl +++ b/bazel/experiments.bzl @@ -21,9 +21,15 @@ EXPERIMENTS = { "core_end2end_tests": [ "new_hpack_huffman_decoder", ], + "endpoint_test": [ + "tcp_read_chunks", + ], "event_engine_client_test": [ "posix_event_engine_enable_polling", ], + "flow_control_test": [ + "tcp_read_chunks", + ], "hpack_test": [ "new_hpack_huffman_decoder", ], @@ -35,7 +41,6 @@ EXPERIMENTS = { "endpoint_test": [ "tcp_frame_size_tuning", "tcp_rcv_lowat", - "tcp_read_chunks", ], "event_engine_client_test": [ "event_engine_client", @@ -45,7 +50,6 @@ EXPERIMENTS = { "peer_state_based_framing", "tcp_frame_size_tuning", "tcp_rcv_lowat", - "tcp_read_chunks", ], "lame_client_test": [ "promise_based_client_call", diff --git a/src/core/BUILD b/src/core/BUILD index 3e5101239c5..6be69d5d123 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -1601,6 +1601,7 @@ grpc_cc_library( "ref_counted", "resource_quota", "slice", + "strerror", "time", "useful", "//:event_engine_base_hdrs", diff --git a/src/core/lib/event_engine/posix_engine/posix_endpoint.cc b/src/core/lib/event_engine/posix_engine/posix_endpoint.cc index fe6f0231dcb..6aa2f2d1196 100644 --- a/src/core/lib/event_engine/posix_engine/posix_endpoint.cc +++ b/src/core/lib/event_engine/posix_engine/posix_endpoint.cc @@ -44,6 +44,7 @@ #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/load_file.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/gprpp/strerror.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/resource_quota/resource_quota.h" #include "src/core/lib/slice/slice.h" @@ -314,39 +315,33 @@ bool PosixEndpointImpl::TcpDoRead(absl::Status& status) { read_bytes = recvmsg(fd_, &msg, 0); } while (read_bytes < 0 && errno == EINTR); + if (read_bytes < 0 && errno == EAGAIN) { + // NB: After calling call_read_cb a parallel call of the read handler may + // be running. + if (total_read_bytes > 0) { + break; + } + FinishEstimate(); + inq_ = 0; + return false; + } + // We have read something in previous reads. We need to deliver those bytes // to the upper layer. - if (read_bytes <= 0 && - total_read_bytes >= static_cast(min_progress_size_)) { + if (read_bytes <= 0 && total_read_bytes >= 1) { inq_ = 1; break; } - if (read_bytes < 0) { - // NB: After calling call_read_cb a parallel call of the read handler may - // be running. - if (errno == EAGAIN) { - if (total_read_bytes > 0) { - break; - } - FinishEstimate(); - inq_ = 0; - return false; - } else { - incoming_buffer_->Clear(); - status = - absl::InternalError(absl::StrCat("recvmsg:", std::strerror(errno))); - return true; - } - } - if (read_bytes == 0) { + if (read_bytes <= 0) { // 0 read size ==> end of stream - // - // We may have read something, i.e., total_read_bytes > 0, but since the - // connection is closed we will drop the data here, because we can't call - // the callback multiple times. incoming_buffer_->Clear(); - status = absl::InternalError("Socket closed"); + if (read_bytes == 0) { + status = absl::InternalError("Socket closed"); + } else { + status = absl::InternalError( + absl::StrCat("recvmsg:", grpc_core::StrError(errno))); + } return true; } @@ -401,7 +396,7 @@ bool PosixEndpointImpl::TcpDoRead(absl::Status& status) { GPR_DEBUG_ASSERT(total_read_bytes > 0); status = absl::OkStatus(); - if (frame_size_tuning_enabled_) { + if (grpc_core::IsTcpFrameSizeTuningEnabled()) { // Update min progress size based on the total number of bytes read in // this round. min_progress_size_ -= total_read_bytes; @@ -582,7 +577,7 @@ void PosixEndpointImpl::Read(absl::AnyInvocable on_read, incoming_buffer_->Clear(); incoming_buffer_->Swap(last_read_buffer_); read_mu_.Unlock(); - if (args != nullptr && frame_size_tuning_enabled_) { + if (args != nullptr && grpc_core::IsTcpFrameSizeTuningEnabled()) { min_progress_size_ = args->read_hint_bytes; } else { min_progress_size_ = 1; @@ -1224,7 +1219,6 @@ PosixEndpointImpl::PosixEndpointImpl(EventHandle* handle, tcp_zerocopy_send_ctx_ = std::make_unique( zerocopy_enabled, options.tcp_tx_zerocopy_max_simultaneous_sends, options.tcp_tx_zerocopy_send_bytes_threshold); - frame_size_tuning_enabled_ = grpc_core::IsTcpFrameSizeTuningEnabled(); #ifdef GRPC_HAVE_TCP_INQ int one = 1; if (setsockopt(fd_, SOL_TCP, TCP_INQ, &one, sizeof(one)) == 0) { diff --git a/src/core/lib/event_engine/posix_engine/posix_endpoint.h b/src/core/lib/event_engine/posix_engine/posix_endpoint.h index e783b816736..dd993805e98 100644 --- a/src/core/lib/event_engine/posix_engine/posix_endpoint.h +++ b/src/core/lib/event_engine/posix_engine/posix_endpoint.h @@ -576,9 +576,6 @@ class PosixEndpointImpl : public grpc_core::RefCounted { std::atomic stop_error_notification_{false}; std::unique_ptr tcp_zerocopy_send_ctx_; TcpZerocopySendRecord* current_zerocopy_send_ = nullptr; - // If true, the size of buffers alloted for tcp reads will be based on the - // specified min_progress_size values conveyed by the upper layers. - bool frame_size_tuning_enabled_ = false; // A hint from upper layers specifying the minimum number of bytes that need // to be read to make meaningful progress. int min_progress_size_ = 1; diff --git a/src/core/lib/experiments/experiments.cc b/src/core/lib/experiments/experiments.cc index c292c08ad9c..86cd52de0b6 100644 --- a/src/core/lib/experiments/experiments.cc +++ b/src/core/lib/experiments/experiments.cc @@ -64,7 +64,7 @@ namespace grpc_core { const ExperimentMetadata g_experiment_metadata[] = { {"tcp_frame_size_tuning", description_tcp_frame_size_tuning, false}, - {"tcp_read_chunks", description_tcp_read_chunks, false}, + {"tcp_read_chunks", description_tcp_read_chunks, kDefaultForDebugOnly}, {"tcp_rcv_lowat", description_tcp_rcv_lowat, false}, {"peer_state_based_framing", description_peer_state_based_framing, false}, {"flow_control_fixes", description_flow_control_fixes, false}, diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index db3d9703bd3..ad959e4a727 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -50,7 +50,7 @@ description: Allocate only 8kb or 64kb chunks for TCP reads to reduce pressure on malloc to recycle arbitrary large blocks. - default: false + default: debug expiry: 2023/01/01 owner: ctiller@google.com test_tags: ["endpoint_test", "flow_control_test"] diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 05d40f65297..d19aba0a1dc 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -21,6 +21,7 @@ #include #include "src/core/lib/gprpp/global_config_generic.h" +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/port.h" #ifdef GRPC_POSIX_SOCKET_TCP @@ -542,7 +543,6 @@ struct grpc_tcp { TcpZerocopySendCtx tcp_zerocopy_send_ctx; TcpZerocopySendRecord* current_zerocopy_send = nullptr; - bool frame_size_tuning_enabled; int min_progress_size; /* A hint from upper layers specifying the minimum number of bytes that need to be read to make meaningful progress */ @@ -820,7 +820,7 @@ static void tcp_trace_read(grpc_tcp* tcp, grpc_error_handle error) for (i = 0; i < tcp->incoming_buffer->count; i++) { char* dump = grpc_dump_slice(tcp->incoming_buffer->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); - gpr_log(GPR_DEBUG, "DATA: %s", dump); + gpr_log(GPR_DEBUG, "READ DATA: %s", dump); gpr_free(dump); } } @@ -928,38 +928,35 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error) read_bytes = recvmsg(tcp->fd, &msg, 0); } while (read_bytes < 0 && errno == EINTR); + if (read_bytes < 0 && errno == EAGAIN) { + /* NB: After calling call_read_cb a parallel call of the read handler may + * be running. */ + if (total_read_bytes > 0) { + break; + } + finish_estimate(tcp); + tcp->inq = 0; + return false; + } + /* We have read something in previous reads. We need to deliver those * bytes to the upper layer. */ - if (read_bytes <= 0 && - total_read_bytes >= static_cast(tcp->min_progress_size)) { + if (read_bytes <= 0 && total_read_bytes >= 1) { tcp->inq = 1; break; } - if (read_bytes < 0) { - /* NB: After calling call_read_cb a parallel call of the read handler may - * be running. */ - if (errno == EAGAIN) { - if (total_read_bytes > 0) { - break; - } - finish_estimate(tcp); - tcp->inq = 0; - return false; + if (read_bytes <= 0) { + /* 0 read size ==> end of stream */ + grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer); + if (read_bytes == 0) { + *error = tcp_annotate_error(absl::InternalError("Socket closed"), tcp); } else { - grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer); - *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "recvmsg"), tcp); - return true; + *error = + tcp_annotate_error(absl::InternalError(absl::StrCat( + "recvmsg:", grpc_core::StrError(errno))), + tcp); } - } - if (read_bytes == 0) { - /* 0 read size ==> end of stream - * - * We may have read something, i.e., total_read_bytes > 0, but - * since the connection is closed we will drop the data here, because we - * can't call the callback multiple times. */ - grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer); - *error = tcp_annotate_error(GRPC_ERROR_CREATE("Socket closed"), tcp); return true; } @@ -1015,7 +1012,7 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error) GPR_DEBUG_ASSERT(total_read_bytes > 0); *error = absl::OkStatus(); - if (tcp->frame_size_tuning_enabled) { + if (grpc_core::IsTcpFrameSizeTuningEnabled()) { // Update min progress size based on the total number of bytes read in // this round. tcp->min_progress_size -= total_read_bytes; @@ -1153,7 +1150,7 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer, tcp->read_mu.Lock(); tcp->incoming_buffer = incoming_buffer; tcp->min_progress_size = - tcp->frame_size_tuning_enabled ? min_progress_size : 1; + grpc_core::IsTcpFrameSizeTuningEnabled() ? min_progress_size : 1; grpc_slice_buffer_reset_and_unref(incoming_buffer); grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer); TCP_REF(tcp, "read"); @@ -1844,7 +1841,7 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) { char* data = grpc_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); - gpr_log(GPR_DEBUG, "DATA: %s", data); + gpr_log(GPR_DEBUG, "WRITE DATA: %s", data); gpr_free(data); } } @@ -1987,7 +1984,6 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, tcp->socket_ts_enabled = false; tcp->ts_capable = true; tcp->outgoing_buffer_arg = nullptr; - tcp->frame_size_tuning_enabled = grpc_core::IsTcpFrameSizeTuningEnabled(); tcp->min_progress_size = 1; if (options.tcp_tx_zero_copy_enabled && !tcp->tcp_zerocopy_send_ctx.memory_limited()) { diff --git a/test/core/transport/chttp2/graceful_shutdown_test.cc b/test/core/transport/chttp2/graceful_shutdown_test.cc index ab71c4c0c9e..e68cb105660 100644 --- a/test/core/transport/chttp2/graceful_shutdown_test.cc +++ b/test/core/transport/chttp2/graceful_shutdown_test.cc @@ -176,9 +176,11 @@ class GracefulShutdownTest : public ::testing::Test { // Waits for \a bytes to show up in read_bytes_ void WaitForReadBytes(absl::string_view bytes) { std::atomic done{false}; + auto start_time = absl::Now(); { MutexLock lock(&mu_); while (!absl::StrContains(read_bytes_, bytes)) { + ASSERT_LT(absl::Now() - start_time, absl::Seconds(60)); read_cv_.WaitWithTimeout(&mu_, absl::Seconds(5)); } }