diff --git a/bazel/experiments.bzl b/bazel/experiments.bzl index 502869db84e..196dc0e9d8f 100644 --- a/bazel/experiments.bzl +++ b/bazel/experiments.bzl @@ -21,15 +21,9 @@ EXPERIMENTS = { "core_end2end_tests": [ "new_hpack_huffman_decoder", ], - "endpoint_test": [ - "tcp_read_chunks", - ], "event_engine_client_test": [ "posix_event_engine_enable_polling", ], - "flow_control_test": [ - "tcp_read_chunks", - ], "hpack_test": [ "new_hpack_huffman_decoder", ], @@ -41,6 +35,7 @@ EXPERIMENTS = { "endpoint_test": [ "tcp_frame_size_tuning", "tcp_rcv_lowat", + "tcp_read_chunks", ], "event_engine_client_test": [ "event_engine_client", @@ -50,6 +45,7 @@ EXPERIMENTS = { "peer_state_based_framing", "tcp_frame_size_tuning", "tcp_rcv_lowat", + "tcp_read_chunks", ], "lame_client_test": [ "promise_based_client_call", diff --git a/src/core/lib/event_engine/posix_engine/posix_endpoint.cc b/src/core/lib/event_engine/posix_engine/posix_endpoint.cc index c43bba2f5ef..fe6f0231dcb 100644 --- a/src/core/lib/event_engine/posix_engine/posix_endpoint.cc +++ b/src/core/lib/event_engine/posix_engine/posix_endpoint.cc @@ -314,6 +314,14 @@ bool PosixEndpointImpl::TcpDoRead(absl::Status& status) { read_bytes = recvmsg(fd_, &msg, 0); } while (read_bytes < 0 && errno == EINTR); + // We have read something in previous reads. We need to deliver those bytes + // to the upper layer. + if (read_bytes <= 0 && + total_read_bytes >= static_cast(min_progress_size_)) { + inq_ = 1; + break; + } + if (read_bytes < 0) { // NB: After calling call_read_cb a parallel call of the read handler may // be running. @@ -331,16 +339,12 @@ bool PosixEndpointImpl::TcpDoRead(absl::Status& status) { return true; } } - - // We have read something in previous reads. We need to deliver those bytes - // to the upper layer. - if (read_bytes <= 0 && total_read_bytes >= 1) { - inq_ = 1; - break; - } - if (read_bytes == 0) { // 0 read size ==> end of stream + // + // We may have read something, i.e., total_read_bytes > 0, but since the + // connection is closed we will drop the data here, because we can't call + // the callback multiple times. incoming_buffer_->Clear(); status = absl::InternalError("Socket closed"); return true; @@ -397,7 +401,7 @@ bool PosixEndpointImpl::TcpDoRead(absl::Status& status) { GPR_DEBUG_ASSERT(total_read_bytes > 0); status = absl::OkStatus(); - if (grpc_core::IsTcpFrameSizeTuningEnabled()) { + if (frame_size_tuning_enabled_) { // Update min progress size based on the total number of bytes read in // this round. min_progress_size_ -= total_read_bytes; @@ -578,7 +582,7 @@ void PosixEndpointImpl::Read(absl::AnyInvocable on_read, incoming_buffer_->Clear(); incoming_buffer_->Swap(last_read_buffer_); read_mu_.Unlock(); - if (args != nullptr && grpc_core::IsTcpFrameSizeTuningEnabled()) { + if (args != nullptr && frame_size_tuning_enabled_) { min_progress_size_ = args->read_hint_bytes; } else { min_progress_size_ = 1; @@ -1220,6 +1224,7 @@ PosixEndpointImpl::PosixEndpointImpl(EventHandle* handle, tcp_zerocopy_send_ctx_ = std::make_unique( zerocopy_enabled, options.tcp_tx_zerocopy_max_simultaneous_sends, options.tcp_tx_zerocopy_send_bytes_threshold); + frame_size_tuning_enabled_ = grpc_core::IsTcpFrameSizeTuningEnabled(); #ifdef GRPC_HAVE_TCP_INQ int one = 1; if (setsockopt(fd_, SOL_TCP, TCP_INQ, &one, sizeof(one)) == 0) { diff --git a/src/core/lib/event_engine/posix_engine/posix_endpoint.h b/src/core/lib/event_engine/posix_engine/posix_endpoint.h index dd993805e98..e783b816736 100644 --- a/src/core/lib/event_engine/posix_engine/posix_endpoint.h +++ b/src/core/lib/event_engine/posix_engine/posix_endpoint.h @@ -576,6 +576,9 @@ class PosixEndpointImpl : public grpc_core::RefCounted { std::atomic stop_error_notification_{false}; std::unique_ptr tcp_zerocopy_send_ctx_; TcpZerocopySendRecord* current_zerocopy_send_ = nullptr; + // If true, the size of buffers alloted for tcp reads will be based on the + // specified min_progress_size values conveyed by the upper layers. + bool frame_size_tuning_enabled_ = false; // A hint from upper layers specifying the minimum number of bytes that need // to be read to make meaningful progress. int min_progress_size_ = 1; diff --git a/src/core/lib/experiments/experiments.cc b/src/core/lib/experiments/experiments.cc index 86cd52de0b6..c292c08ad9c 100644 --- a/src/core/lib/experiments/experiments.cc +++ b/src/core/lib/experiments/experiments.cc @@ -64,7 +64,7 @@ namespace grpc_core { const ExperimentMetadata g_experiment_metadata[] = { {"tcp_frame_size_tuning", description_tcp_frame_size_tuning, false}, - {"tcp_read_chunks", description_tcp_read_chunks, kDefaultForDebugOnly}, + {"tcp_read_chunks", description_tcp_read_chunks, false}, {"tcp_rcv_lowat", description_tcp_rcv_lowat, false}, {"peer_state_based_framing", description_peer_state_based_framing, false}, {"flow_control_fixes", description_flow_control_fixes, false}, diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index ad959e4a727..db3d9703bd3 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -50,7 +50,7 @@ description: Allocate only 8kb or 64kb chunks for TCP reads to reduce pressure on malloc to recycle arbitrary large blocks. - default: debug + default: false expiry: 2023/01/01 owner: ctiller@google.com test_tags: ["endpoint_test", "flow_control_test"] diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index f9802de73b4..05d40f65297 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -21,7 +21,6 @@ #include #include "src/core/lib/gprpp/global_config_generic.h" -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/port.h" #ifdef GRPC_POSIX_SOCKET_TCP @@ -543,6 +542,7 @@ struct grpc_tcp { TcpZerocopySendCtx tcp_zerocopy_send_ctx; TcpZerocopySendRecord* current_zerocopy_send = nullptr; + bool frame_size_tuning_enabled; int min_progress_size; /* A hint from upper layers specifying the minimum number of bytes that need to be read to make meaningful progress */ @@ -928,6 +928,14 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error) read_bytes = recvmsg(tcp->fd, &msg, 0); } while (read_bytes < 0 && errno == EINTR); + /* We have read something in previous reads. We need to deliver those + * bytes to the upper layer. */ + if (read_bytes <= 0 && + total_read_bytes >= static_cast(tcp->min_progress_size)) { + tcp->inq = 1; + break; + } + if (read_bytes < 0) { /* NB: After calling call_read_cb a parallel call of the read handler may * be running. */ @@ -944,16 +952,12 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error) return true; } } - - /* We have read something in previous reads. We need to deliver those - * bytes to the upper layer. */ - if (read_bytes <= 0 && total_read_bytes >= 1) { - tcp->inq = 1; - break; - } - if (read_bytes == 0) { - /* 0 read size ==> end of stream */ + /* 0 read size ==> end of stream + * + * We may have read something, i.e., total_read_bytes > 0, but + * since the connection is closed we will drop the data here, because we + * can't call the callback multiple times. */ grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer); *error = tcp_annotate_error(GRPC_ERROR_CREATE("Socket closed"), tcp); return true; @@ -1011,7 +1015,7 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error) GPR_DEBUG_ASSERT(total_read_bytes > 0); *error = absl::OkStatus(); - if (grpc_core::IsTcpFrameSizeTuningEnabled()) { + if (tcp->frame_size_tuning_enabled) { // Update min progress size based on the total number of bytes read in // this round. tcp->min_progress_size -= total_read_bytes; @@ -1149,7 +1153,7 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer, tcp->read_mu.Lock(); tcp->incoming_buffer = incoming_buffer; tcp->min_progress_size = - grpc_core::IsTcpFrameSizeTuningEnabled() ? min_progress_size : 1; + tcp->frame_size_tuning_enabled ? min_progress_size : 1; grpc_slice_buffer_reset_and_unref(incoming_buffer); grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer); TCP_REF(tcp, "read"); @@ -1983,6 +1987,7 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, tcp->socket_ts_enabled = false; tcp->ts_capable = true; tcp->outgoing_buffer_arg = nullptr; + tcp->frame_size_tuning_enabled = grpc_core::IsTcpFrameSizeTuningEnabled(); tcp->min_progress_size = 1; if (options.tcp_tx_zero_copy_enabled && !tcp->tcp_zerocopy_send_ctx.memory_limited()) {