diff --git a/bazel/experiments.bzl b/bazel/experiments.bzl index 287c9cfc24d..30906fc81b5 100644 --- a/bazel/experiments.bzl +++ b/bazel/experiments.bzl @@ -21,6 +21,12 @@ EXPERIMENTS = { "core_end2end_tests": [ "new_hpack_huffman_decoder", ], + "endpoint_test": [ + "tcp_read_chunks", + ], + "flow_control_test": [ + "tcp_read_chunks", + ], "hpack_test": [ "new_hpack_huffman_decoder", ], @@ -32,7 +38,6 @@ EXPERIMENTS = { "endpoint_test": [ "tcp_frame_size_tuning", "tcp_rcv_lowat", - "tcp_read_chunks", ], "event_engine_client_test": [ "event_engine_client", @@ -42,7 +47,6 @@ EXPERIMENTS = { "peer_state_based_framing", "tcp_frame_size_tuning", "tcp_rcv_lowat", - "tcp_read_chunks", ], "lame_client_test": [ "promise_based_client_call", diff --git a/src/core/lib/event_engine/posix_engine/posix_endpoint.cc b/src/core/lib/event_engine/posix_engine/posix_endpoint.cc index a2bfa19b538..e4e77f7f464 100644 --- a/src/core/lib/event_engine/posix_engine/posix_endpoint.cc +++ b/src/core/lib/event_engine/posix_engine/posix_endpoint.cc @@ -316,14 +316,6 @@ bool PosixEndpointImpl::TcpDoRead(absl::Status& status) { read_bytes = recvmsg(fd_, &msg, 0); } while (read_bytes < 0 && errno == EINTR); - // We have read something in previous reads. We need to deliver those bytes - // to the upper layer. - if (read_bytes <= 0 && - total_read_bytes >= static_cast(min_progress_size_)) { - inq_ = 1; - break; - } - if (read_bytes < 0) { // NB: After calling call_read_cb a parallel call of the read handler may // be running. @@ -341,12 +333,16 @@ bool PosixEndpointImpl::TcpDoRead(absl::Status& status) { return true; } } + + // We have read something in previous reads. We need to deliver those bytes + // to the upper layer. + if (read_bytes <= 0 && total_read_bytes >= 1) { + inq_ = 1; + break; + } + if (read_bytes == 0) { // 0 read size ==> end of stream - // - // We may have read something, i.e., total_read_bytes > 0, but since the - // connection is closed we will drop the data here, because we can't call - // the callback multiple times. incoming_buffer_->Clear(); status = absl::InternalError("Socket closed"); return true; @@ -403,7 +399,7 @@ bool PosixEndpointImpl::TcpDoRead(absl::Status& status) { GPR_DEBUG_ASSERT(total_read_bytes > 0); status = absl::OkStatus(); - if (frame_size_tuning_enabled_) { + if (grpc_core::IsTcpFrameSizeTuningEnabled()) { // Update min progress size based on the total number of bytes read in // this round. min_progress_size_ -= total_read_bytes; @@ -584,7 +580,7 @@ void PosixEndpointImpl::Read(absl::AnyInvocable on_read, incoming_buffer_->Clear(); incoming_buffer_->Swap(last_read_buffer_); read_mu_.Unlock(); - if (args != nullptr && frame_size_tuning_enabled_) { + if (args != nullptr && grpc_core::IsTcpFrameSizeTuningEnabled()) { min_progress_size_ = args->read_hint_bytes; } else { min_progress_size_ = 1; @@ -1225,7 +1221,6 @@ PosixEndpointImpl::PosixEndpointImpl(EventHandle* handle, tcp_zerocopy_send_ctx_ = std::make_unique( zerocopy_enabled, options.tcp_tx_zerocopy_max_simultaneous_sends, options.tcp_tx_zerocopy_send_bytes_threshold); - frame_size_tuning_enabled_ = grpc_core::IsTcpFrameSizeTuningEnabled(); #ifdef GRPC_HAVE_TCP_INQ int one = 1; if (setsockopt(fd_, SOL_TCP, TCP_INQ, &one, sizeof(one)) == 0) { diff --git a/src/core/lib/event_engine/posix_engine/posix_endpoint.h b/src/core/lib/event_engine/posix_engine/posix_endpoint.h index dbddd946678..b3b5e8512ef 100644 --- a/src/core/lib/event_engine/posix_engine/posix_endpoint.h +++ b/src/core/lib/event_engine/posix_engine/posix_endpoint.h @@ -576,9 +576,6 @@ class PosixEndpointImpl : public grpc_core::RefCounted { std::atomic stop_error_notification_{false}; std::unique_ptr tcp_zerocopy_send_ctx_; TcpZerocopySendRecord* current_zerocopy_send_ = nullptr; - // If true, the size of buffers alloted for tcp reads will be based on the - // specified min_progress_size values conveyed by the upper layers. - bool frame_size_tuning_enabled_ = false; // A hint from upper layers specifying the minimum number of bytes that need // to be read to make meaningful progress. int min_progress_size_ = 1; diff --git a/src/core/lib/experiments/experiments.cc b/src/core/lib/experiments/experiments.cc index bf97b9f5d21..852e430760d 100644 --- a/src/core/lib/experiments/experiments.cc +++ b/src/core/lib/experiments/experiments.cc @@ -62,7 +62,7 @@ namespace grpc_core { const ExperimentMetadata g_experiment_metadata[] = { {"tcp_frame_size_tuning", description_tcp_frame_size_tuning, false}, - {"tcp_read_chunks", description_tcp_read_chunks, false}, + {"tcp_read_chunks", description_tcp_read_chunks, kDefaultForDebugOnly}, {"tcp_rcv_lowat", description_tcp_rcv_lowat, false}, {"peer_state_based_framing", description_peer_state_based_framing, false}, {"flow_control_fixes", description_flow_control_fixes, false}, diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index b7631023464..b7be9ec39f7 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -50,7 +50,7 @@ description: Allocate only 8kb or 64kb chunks for TCP reads to reduce pressure on malloc to recycle arbitrary large blocks. - default: false + default: debug expiry: 2023/01/01 owner: ctiller@google.com test_tags: ["endpoint_test", "flow_control_test"] diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 05d40f65297..f9802de73b4 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -21,6 +21,7 @@ #include #include "src/core/lib/gprpp/global_config_generic.h" +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/port.h" #ifdef GRPC_POSIX_SOCKET_TCP @@ -542,7 +543,6 @@ struct grpc_tcp { TcpZerocopySendCtx tcp_zerocopy_send_ctx; TcpZerocopySendRecord* current_zerocopy_send = nullptr; - bool frame_size_tuning_enabled; int min_progress_size; /* A hint from upper layers specifying the minimum number of bytes that need to be read to make meaningful progress */ @@ -928,14 +928,6 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error) read_bytes = recvmsg(tcp->fd, &msg, 0); } while (read_bytes < 0 && errno == EINTR); - /* We have read something in previous reads. We need to deliver those - * bytes to the upper layer. */ - if (read_bytes <= 0 && - total_read_bytes >= static_cast(tcp->min_progress_size)) { - tcp->inq = 1; - break; - } - if (read_bytes < 0) { /* NB: After calling call_read_cb a parallel call of the read handler may * be running. */ @@ -952,12 +944,16 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error) return true; } } + + /* We have read something in previous reads. We need to deliver those + * bytes to the upper layer. */ + if (read_bytes <= 0 && total_read_bytes >= 1) { + tcp->inq = 1; + break; + } + if (read_bytes == 0) { - /* 0 read size ==> end of stream - * - * We may have read something, i.e., total_read_bytes > 0, but - * since the connection is closed we will drop the data here, because we - * can't call the callback multiple times. */ + /* 0 read size ==> end of stream */ grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer); *error = tcp_annotate_error(GRPC_ERROR_CREATE("Socket closed"), tcp); return true; @@ -1015,7 +1011,7 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error) GPR_DEBUG_ASSERT(total_read_bytes > 0); *error = absl::OkStatus(); - if (tcp->frame_size_tuning_enabled) { + if (grpc_core::IsTcpFrameSizeTuningEnabled()) { // Update min progress size based on the total number of bytes read in // this round. tcp->min_progress_size -= total_read_bytes; @@ -1153,7 +1149,7 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer, tcp->read_mu.Lock(); tcp->incoming_buffer = incoming_buffer; tcp->min_progress_size = - tcp->frame_size_tuning_enabled ? min_progress_size : 1; + grpc_core::IsTcpFrameSizeTuningEnabled() ? min_progress_size : 1; grpc_slice_buffer_reset_and_unref(incoming_buffer); grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer); TCP_REF(tcp, "read"); @@ -1987,7 +1983,6 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, tcp->socket_ts_enabled = false; tcp->ts_capable = true; tcp->outgoing_buffer_arg = nullptr; - tcp->frame_size_tuning_enabled = grpc_core::IsTcpFrameSizeTuningEnabled(); tcp->min_progress_size = 1; if (options.tcp_tx_zero_copy_enabled && !tcp->tcp_zerocopy_send_ctx.memory_limited()) {