diff --git a/BUILD b/BUILD index 8d73a68c23d..e61d4f73a9a 100644 --- a/BUILD +++ b/BUILD @@ -4056,6 +4056,7 @@ grpc_cc_library( "//src/core:time", "//src/core:transport_fwd", "//src/core:useful", + "//src/core:write_size_policy", ], ) diff --git a/CMakeLists.txt b/CMakeLists.txt index 109dd40e69e..ee8e994b470 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1447,6 +1447,7 @@ if(gRPC_BUILD_TESTS) endif() add_dependencies(buildtests_cxx write_buffering_at_end_test) add_dependencies(buildtests_cxx write_buffering_test) + add_dependencies(buildtests_cxx write_size_policy_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx writes_per_rpc_test) endif() @@ -1843,6 +1844,7 @@ add_library(grpc src/core/ext/transport/chttp2/transport/ping_rate_policy.cc src/core/ext/transport/chttp2/transport/stream_lists.cc src/core/ext/transport/chttp2/transport/varint.cc + src/core/ext/transport/chttp2/transport/write_size_policy.cc src/core/ext/transport/chttp2/transport/writing.cc src/core/ext/transport/inproc/inproc_plugin.cc src/core/ext/transport/inproc/inproc_transport.cc @@ -2882,6 +2884,7 @@ add_library(grpc_unsecure src/core/ext/transport/chttp2/transport/ping_rate_policy.cc src/core/ext/transport/chttp2/transport/stream_lists.cc src/core/ext/transport/chttp2/transport/varint.cc + src/core/ext/transport/chttp2/transport/write_size_policy.cc src/core/ext/transport/chttp2/transport/writing.cc src/core/ext/transport/inproc/inproc_plugin.cc src/core/ext/transport/inproc/inproc_transport.cc @@ -26199,6 +26202,42 @@ target_link_libraries(write_buffering_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(write_size_policy_test + src/core/ext/transport/chttp2/transport/write_size_policy.cc + src/core/lib/gprpp/time.cc + test/core/transport/chttp2/write_size_policy_test.cc +) +target_compile_features(write_size_policy_test PUBLIC cxx_std_14) +target_include_directories(write_size_policy_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(write_size_policy_test + ${_gRPC_ALLTARGETS_LIBRARIES} + gtest + absl::statusor + gpr +) + + endif() if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) diff --git a/Makefile b/Makefile index 6cb5533f11b..f9130018d84 100644 --- a/Makefile +++ b/Makefile @@ -1064,6 +1064,7 @@ LIBGRPC_SRC = \ src/core/ext/transport/chttp2/transport/ping_rate_policy.cc \ src/core/ext/transport/chttp2/transport/stream_lists.cc \ src/core/ext/transport/chttp2/transport/varint.cc \ + src/core/ext/transport/chttp2/transport/write_size_policy.cc \ src/core/ext/transport/chttp2/transport/writing.cc \ src/core/ext/transport/inproc/inproc_plugin.cc \ src/core/ext/transport/inproc/inproc_transport.cc \ @@ -1955,6 +1956,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/ext/transport/chttp2/transport/ping_rate_policy.cc \ src/core/ext/transport/chttp2/transport/stream_lists.cc \ src/core/ext/transport/chttp2/transport/varint.cc \ + src/core/ext/transport/chttp2/transport/write_size_policy.cc \ src/core/ext/transport/chttp2/transport/writing.cc \ src/core/ext/transport/inproc/inproc_plugin.cc \ src/core/ext/transport/inproc/inproc_transport.cc \ diff --git a/Package.swift b/Package.swift index cd38e7314f1..11a1f5896c2 100644 --- a/Package.swift +++ b/Package.swift @@ -323,6 +323,8 @@ let package = Package( "src/core/ext/transport/chttp2/transport/stream_lists.cc", "src/core/ext/transport/chttp2/transport/varint.cc", "src/core/ext/transport/chttp2/transport/varint.h", + "src/core/ext/transport/chttp2/transport/write_size_policy.cc", + "src/core/ext/transport/chttp2/transport/write_size_policy.h", "src/core/ext/transport/chttp2/transport/writing.cc", "src/core/ext/transport/inproc/inproc_plugin.cc", "src/core/ext/transport/inproc/inproc_transport.cc", diff --git a/bazel/experiments.bzl b/bazel/experiments.bzl index 25da9bf19d0..eba454dda85 100644 --- a/bazel/experiments.bzl +++ b/bazel/experiments.bzl @@ -83,6 +83,8 @@ EXPERIMENTS = { "chttp2_batch_requests", "chttp2_offload_on_rst_stream", "lazier_stream_updates", + "write_size_cap", + "write_size_policy", ], "lb_unit_test": [ "pick_first_happy_eyeballs", @@ -165,6 +167,8 @@ EXPERIMENTS = { "chttp2_batch_requests", "chttp2_offload_on_rst_stream", "lazier_stream_updates", + "write_size_cap", + "write_size_policy", ], "lb_unit_test": [ "pick_first_happy_eyeballs", @@ -257,6 +261,8 @@ EXPERIMENTS = { "chttp2_batch_requests", "chttp2_offload_on_rst_stream", "lazier_stream_updates", + "write_size_cap", + "write_size_policy", ], "lb_unit_test": [ "pick_first_happy_eyeballs", diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 1cdc18d4cee..9d877645116 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -314,6 +314,7 @@ libs: - src/core/ext/transport/chttp2/transport/ping_callbacks.h - src/core/ext/transport/chttp2/transport/ping_rate_policy.h - src/core/ext/transport/chttp2/transport/varint.h + - src/core/ext/transport/chttp2/transport/write_size_policy.h - src/core/ext/transport/inproc/inproc_transport.h - src/core/ext/transport/inproc/legacy_inproc_transport.h - src/core/ext/upb-generated/envoy/admin/v3/certs.upb.h @@ -1145,6 +1146,7 @@ libs: - src/core/ext/transport/chttp2/transport/ping_rate_policy.cc - src/core/ext/transport/chttp2/transport/stream_lists.cc - src/core/ext/transport/chttp2/transport/varint.cc + - src/core/ext/transport/chttp2/transport/write_size_policy.cc - src/core/ext/transport/chttp2/transport/writing.cc - src/core/ext/transport/inproc/inproc_plugin.cc - src/core/ext/transport/inproc/inproc_transport.cc @@ -2060,6 +2062,7 @@ libs: - src/core/ext/transport/chttp2/transport/ping_callbacks.h - src/core/ext/transport/chttp2/transport/ping_rate_policy.h - src/core/ext/transport/chttp2/transport/varint.h + - src/core/ext/transport/chttp2/transport/write_size_policy.h - src/core/ext/transport/inproc/inproc_transport.h - src/core/ext/transport/inproc/legacy_inproc_transport.h - src/core/ext/upb-generated/google/api/annotations.upb.h @@ -2502,6 +2505,7 @@ libs: - src/core/ext/transport/chttp2/transport/ping_rate_policy.cc - src/core/ext/transport/chttp2/transport/stream_lists.cc - src/core/ext/transport/chttp2/transport/varint.cc + - src/core/ext/transport/chttp2/transport/write_size_policy.cc - src/core/ext/transport/chttp2/transport/writing.cc - src/core/ext/transport/inproc/inproc_plugin.cc - src/core/ext/transport/inproc/inproc_transport.cc @@ -17320,6 +17324,22 @@ targets: - grpc_authorization_provider - grpc_unsecure - grpc_test_util +- name: write_size_policy_test + gtest: true + build: test + language: c++ + headers: + - src/core/ext/transport/chttp2/transport/write_size_policy.h + - src/core/lib/gprpp/time.h + src: + - src/core/ext/transport/chttp2/transport/write_size_policy.cc + - src/core/lib/gprpp/time.cc + - test/core/transport/chttp2/write_size_policy_test.cc + deps: + - gtest + - absl/status:statusor + - gpr + uses_polling: false - name: writes_per_rpc_test gtest: true build: test diff --git a/config.m4 b/config.m4 index cfb97cb573e..c83eabba573 100644 --- a/config.m4 +++ b/config.m4 @@ -151,6 +151,7 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/transport/chttp2/transport/ping_rate_policy.cc \ src/core/ext/transport/chttp2/transport/stream_lists.cc \ src/core/ext/transport/chttp2/transport/varint.cc \ + src/core/ext/transport/chttp2/transport/write_size_policy.cc \ src/core/ext/transport/chttp2/transport/writing.cc \ src/core/ext/transport/inproc/inproc_plugin.cc \ src/core/ext/transport/inproc/inproc_transport.cc \ diff --git a/config.w32 b/config.w32 index b9eab149ff2..794547b1425 100644 --- a/config.w32 +++ b/config.w32 @@ -116,6 +116,7 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\transport\\chttp2\\transport\\ping_rate_policy.cc " + "src\\core\\ext\\transport\\chttp2\\transport\\stream_lists.cc " + "src\\core\\ext\\transport\\chttp2\\transport\\varint.cc " + + "src\\core\\ext\\transport\\chttp2\\transport\\write_size_policy.cc " + "src\\core\\ext\\transport\\chttp2\\transport\\writing.cc " + "src\\core\\ext\\transport\\inproc\\inproc_plugin.cc " + "src\\core\\ext\\transport\\inproc\\inproc_transport.cc " + diff --git a/fuzztest/core/transport/chttp2/BUILD b/fuzztest/core/transport/chttp2/BUILD new file mode 100644 index 00000000000..b95ff53e774 --- /dev/null +++ b/fuzztest/core/transport/chttp2/BUILD @@ -0,0 +1,26 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +load("//fuzztest:grpc_fuzz_test.bzl", "grpc_fuzz_test") + +grpc_fuzz_test( + name = "write_size_policy_test", + srcs = ["write_size_policy_test.cc"], + external_deps = [ + "fuzztest", + "fuzztest_main", + "gtest", + ], + deps = ["//src/core:write_size_policy"], +) diff --git a/fuzztest/core/transport/chttp2/write_size_policy_test.cc b/fuzztest/core/transport/chttp2/write_size_policy_test.cc new file mode 100644 index 00000000000..cf97ce44cd5 --- /dev/null +++ b/fuzztest/core/transport/chttp2/write_size_policy_test.cc @@ -0,0 +1,64 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Test to verify Fuzztest integration + +#include "src/core/ext/transport/chttp2/transport/write_size_policy.h" + +#include + +#include "fuzztest/fuzztest.h" +#include "gtest/gtest.h" + +namespace grpc_core { + +struct OneWrite { + uint16_t delay_start; + uint32_t size; + uint16_t write_time; + bool success; +}; + +void WriteSizePolicyStaysWithinBounds(std::vector ops) { + ScopedTimeCache time_cache; + uint32_t now = 100; + Chttp2WriteSizePolicy policy; + for (const OneWrite op : ops) { + const auto start_target = policy.WriteTargetSize(); + now += op.delay_start; + time_cache.TestOnlySetNow(Timestamp::ProcessEpoch() + + Duration::Milliseconds(now)); + policy.BeginWrite(op.size); + now += op.write_time; + time_cache.TestOnlySetNow(Timestamp::ProcessEpoch() + + Duration::Milliseconds(now)); + policy.EndWrite(op.success); + if (op.size >= start_target * 7 / 10) { + if (op.write_time < Chttp2WriteSizePolicy::FastWrite().millis()) { + EXPECT_GE(policy.WriteTargetSize(), start_target); + EXPECT_LE(policy.WriteTargetSize(), start_target * 3 / 2); + } else if (op.write_time > Chttp2WriteSizePolicy::SlowWrite().millis()) { + EXPECT_LE(policy.WriteTargetSize(), start_target); + EXPECT_GE(policy.WriteTargetSize(), start_target / 3); + } + } else { + EXPECT_EQ(policy.WriteTargetSize(), start_target); + } + EXPECT_GE(policy.WriteTargetSize(), Chttp2WriteSizePolicy::MinTarget()); + EXPECT_LE(policy.WriteTargetSize(), Chttp2WriteSizePolicy::MaxTarget()); + } +} +FUZZ_TEST(MyTestSuite, WriteSizePolicyStaysWithinBounds); + +} // namespace grpc_core diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index b388efb28c7..d92e113cc3c 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -385,6 +385,7 @@ Pod::Spec.new do |s| 'src/core/ext/transport/chttp2/transport/ping_callbacks.h', 'src/core/ext/transport/chttp2/transport/ping_rate_policy.h', 'src/core/ext/transport/chttp2/transport/varint.h', + 'src/core/ext/transport/chttp2/transport/write_size_policy.h', 'src/core/ext/transport/inproc/inproc_transport.h', 'src/core/ext/transport/inproc/legacy_inproc_transport.h', 'src/core/ext/upb-generated/envoy/admin/v3/certs.upb.h', @@ -1457,6 +1458,7 @@ Pod::Spec.new do |s| 'src/core/ext/transport/chttp2/transport/ping_callbacks.h', 'src/core/ext/transport/chttp2/transport/ping_rate_policy.h', 'src/core/ext/transport/chttp2/transport/varint.h', + 'src/core/ext/transport/chttp2/transport/write_size_policy.h', 'src/core/ext/transport/inproc/inproc_transport.h', 'src/core/ext/transport/inproc/legacy_inproc_transport.h', 'src/core/ext/upb-generated/envoy/admin/v3/certs.upb.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index cdd256ab12f..886f3602d30 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -426,6 +426,8 @@ Pod::Spec.new do |s| 'src/core/ext/transport/chttp2/transport/stream_lists.cc', 'src/core/ext/transport/chttp2/transport/varint.cc', 'src/core/ext/transport/chttp2/transport/varint.h', + 'src/core/ext/transport/chttp2/transport/write_size_policy.cc', + 'src/core/ext/transport/chttp2/transport/write_size_policy.h', 'src/core/ext/transport/chttp2/transport/writing.cc', 'src/core/ext/transport/inproc/inproc_plugin.cc', 'src/core/ext/transport/inproc/inproc_transport.cc', @@ -2214,6 +2216,7 @@ Pod::Spec.new do |s| 'src/core/ext/transport/chttp2/transport/ping_callbacks.h', 'src/core/ext/transport/chttp2/transport/ping_rate_policy.h', 'src/core/ext/transport/chttp2/transport/varint.h', + 'src/core/ext/transport/chttp2/transport/write_size_policy.h', 'src/core/ext/transport/inproc/inproc_transport.h', 'src/core/ext/transport/inproc/legacy_inproc_transport.h', 'src/core/ext/upb-generated/envoy/admin/v3/certs.upb.h', diff --git a/grpc.gemspec b/grpc.gemspec index 296d9a08a44..4ba8ce574bc 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -329,6 +329,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/transport/chttp2/transport/stream_lists.cc ) s.files += %w( src/core/ext/transport/chttp2/transport/varint.cc ) s.files += %w( src/core/ext/transport/chttp2/transport/varint.h ) + s.files += %w( src/core/ext/transport/chttp2/transport/write_size_policy.cc ) + s.files += %w( src/core/ext/transport/chttp2/transport/write_size_policy.h ) s.files += %w( src/core/ext/transport/chttp2/transport/writing.cc ) s.files += %w( src/core/ext/transport/inproc/inproc_plugin.cc ) s.files += %w( src/core/ext/transport/inproc/inproc_transport.cc ) diff --git a/grpc.gyp b/grpc.gyp index b3eb2618ad4..1b0a639d887 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -382,6 +382,7 @@ 'src/core/ext/transport/chttp2/transport/ping_rate_policy.cc', 'src/core/ext/transport/chttp2/transport/stream_lists.cc', 'src/core/ext/transport/chttp2/transport/varint.cc', + 'src/core/ext/transport/chttp2/transport/write_size_policy.cc', 'src/core/ext/transport/chttp2/transport/writing.cc', 'src/core/ext/transport/inproc/inproc_plugin.cc', 'src/core/ext/transport/inproc/inproc_transport.cc', @@ -1215,6 +1216,7 @@ 'src/core/ext/transport/chttp2/transport/ping_rate_policy.cc', 'src/core/ext/transport/chttp2/transport/stream_lists.cc', 'src/core/ext/transport/chttp2/transport/varint.cc', + 'src/core/ext/transport/chttp2/transport/write_size_policy.cc', 'src/core/ext/transport/chttp2/transport/writing.cc', 'src/core/ext/transport/inproc/inproc_plugin.cc', 'src/core/ext/transport/inproc/inproc_transport.cc', diff --git a/package.xml b/package.xml index 5c752179551..6076a544e43 100644 --- a/package.xml +++ b/package.xml @@ -311,6 +311,8 @@ + + diff --git a/src/core/BUILD b/src/core/BUILD index b9bcb0483bd..d851a7996ff 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -5717,6 +5717,21 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "write_size_policy", + srcs = [ + "ext/transport/chttp2/transport/write_size_policy.cc", + ], + hdrs = [ + "ext/transport/chttp2/transport/write_size_policy.h", + ], + deps = [ + "time", + "//:gpr", + "//:gpr_platform", + ], +) + grpc_cc_library( name = "ping_rate_policy", srcs = [ diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 1d511fe8cee..5330a119a07 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -70,6 +70,7 @@ #include "src/core/ext/transport/chttp2/transport/ping_callbacks.h" #include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h" #include "src/core/ext/transport/chttp2/transport/varint.h" +#include "src/core/ext/transport/chttp2/transport/write_size_policy.h" #include "src/core/lib/channel/call_tracer.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/context.h" @@ -1086,6 +1087,7 @@ static void write_action(grpc_chttp2_transport* t) { gpr_log(GPR_INFO, "%s[%p]: Write %" PRIdPTR " bytes", t->is_client ? "CLIENT" : "SERVER", t, t->outbuf.Length()); } + t->write_size_policy.BeginWrite(t->outbuf.Length()); grpc_endpoint_write(t->ep, t->outbuf.c_slice_buffer(), grpc_core::InitTransportClosure( t->Ref(), &t->write_action_end_locked), @@ -1109,6 +1111,8 @@ static void write_action_end(grpc_core::RefCountedPtr t, static void write_action_end_locked( grpc_core::RefCountedPtr t, grpc_error_handle error) { + t->write_size_policy.EndWrite(error.ok()); + bool closed = false; if (!error.ok()) { close_transport_locked(t.get(), error); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 732dc39a330..c7a29994edd 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -55,6 +55,7 @@ #include "src/core/ext/transport/chttp2/transport/ping_abuse_policy.h" #include "src/core/ext/transport/chttp2/transport/ping_callbacks.h" #include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h" +#include "src/core/ext/transport/chttp2/transport/write_size_policy.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channelz.h" #include "src/core/lib/debug/trace.h" @@ -485,9 +486,11 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized { grpc_core::Duration settings_timeout; /// how much data are we willing to buffer when the WRITE_BUFFER_HINT is set? - /// uint32_t write_buffer_size = grpc_core::chttp2::kDefaultWindow; + /// policy for how much data we're willing to put into one http2 write + grpc_core::Chttp2WriteSizePolicy write_size_policy; + bool reading_paused_on_pending_induced_frames = false; /// Based on channel args, preferred_rx_crypto_frame_sizes are advertised to /// the peer diff --git a/src/core/ext/transport/chttp2/transport/write_size_policy.cc b/src/core/ext/transport/chttp2/transport/write_size_policy.cc new file mode 100644 index 00000000000..9aae5197e56 --- /dev/null +++ b/src/core/ext/transport/chttp2/transport/write_size_policy.cc @@ -0,0 +1,60 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "src/core/ext/transport/chttp2/transport/write_size_policy.h" + +#include + +#include + +namespace grpc_core { + +size_t Chttp2WriteSizePolicy::WriteTargetSize() { return current_target_; } + +void Chttp2WriteSizePolicy::BeginWrite(size_t size) { + GPR_ASSERT(experiment_start_time_ == Timestamp::InfFuture()); + if (size < current_target_ * 7 / 10) { + // If we were trending fast but stopped getting enough data to verify, then + // reset back to the default state. + if (state_ < 0) state_ = 0; + return; + } + experiment_start_time_ = Timestamp::Now(); +} + +void Chttp2WriteSizePolicy::EndWrite(bool success) { + if (experiment_start_time_ == Timestamp::InfFuture()) return; + const auto elapsed = Timestamp::Now() - experiment_start_time_; + experiment_start_time_ = Timestamp::InfFuture(); + if (!success) return; + if (elapsed < FastWrite()) { + --state_; + if (state_ == -2) { + state_ = 0; + current_target_ = std::min(current_target_ * 3 / 2, MaxTarget()); + } + } else if (elapsed > SlowWrite()) { + ++state_; + if (state_ == 2) { + state_ = 0; + current_target_ = std::max(current_target_ / 3, MinTarget()); + } + } else { + state_ = 0; + } +} + +} // namespace grpc_core diff --git a/src/core/ext/transport/chttp2/transport/write_size_policy.h b/src/core/ext/transport/chttp2/transport/write_size_policy.h new file mode 100644 index 00000000000..a300dbd4c2e --- /dev/null +++ b/src/core/ext/transport/chttp2/transport/write_size_policy.h @@ -0,0 +1,66 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_WRITE_SIZE_POLICY_H +#define GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_WRITE_SIZE_POLICY_H + +#include + +#include +#include + +#include "src/core/lib/gprpp/time.h" + +namespace grpc_core { + +class Chttp2WriteSizePolicy { + public: + // Smallest possible WriteTargetSize + static constexpr size_t MinTarget() { return 32 * 1024; } + // Largest possible WriteTargetSize + static constexpr size_t MaxTarget() { return 16 * 1024 * 1024; } + // How long should a write take to be considered "fast" + static constexpr Duration FastWrite() { return Duration::Milliseconds(100); } + // How long should a write take to be considered "slow" + static constexpr Duration SlowWrite() { return Duration::Seconds(1); } + // If a read is slow, what target time should we use to try and adjust back + // to? + static constexpr Duration TargetWriteTime() { + return Duration::Milliseconds(300); + } + + // What size should be targetted for the next write. + size_t WriteTargetSize(); + // Notify the policy that a write of some size has begun. + // EndWrite must be called when the write completes. + void BeginWrite(size_t size); + // Notify the policy that a write of some size has ended. + void EndWrite(bool success); + + private: + size_t current_target_ = 128 * 1024; + Timestamp experiment_start_time_ = Timestamp::InfFuture(); + // State varies from -2...2 + // Every time we do a write faster than kFastWrite, we decrement + // Every time we do a write slower than kSlowWrite, we increment + // If we hit -2, we increase the target size and reset state to 0 + // If we hit 2, we decrease the target size and reset state to 0 + // In this way, we need two consecutive fast/slow operations to adjust, + // denoising the signal significantly + int8_t state_ = 0; +}; + +} // namespace grpc_core + +#endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_WRITE_SIZE_POLICY_H diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index 93edac86533..70cb7787342 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -50,11 +51,13 @@ #include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h" #include "src/core/ext/transport/chttp2/transport/ping_callbacks.h" #include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h" +#include "src/core/ext/transport/chttp2/transport/write_size_policy.h" #include "src/core/lib/channel/channelz.h" #include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/stats_data.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/experiments/experiments.h" +#include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/match.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" @@ -374,9 +377,13 @@ class WriteContext { return result_; } + size_t target_write_size() const { return target_write_size_; } + private: grpc_chttp2_transport* const t_; - size_t target_write_size_ = 1024 * 1024; + size_t target_write_size_ = grpc_core::IsWriteSizePolicyEnabled() + ? t_->write_size_policy.WriteTargetSize() + : 1024 * 1024; // stats histogram counters: we increment these throughout this function, // and at the end publish to the central stats histograms @@ -406,11 +413,15 @@ class DataSendContext { } uint32_t max_outgoing() const { - return static_cast(std::min( - t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], - static_cast( - std::min(static_cast(stream_remote_window()), - t_->flow_control.remote_window())))); + return grpc_core::Clamp( + std::min( + {t_->settings[GRPC_PEER_SETTINGS] + [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], + stream_remote_window(), t_->flow_control.remote_window(), + grpc_core::IsWriteSizeCapEnabled() + ? static_cast(write_context_->target_write_size()) + : std::numeric_limits::max()}), + 0, std::numeric_limits::max()); } bool AnyOutgoing() const { return max_outgoing() > 0; } diff --git a/src/core/lib/experiments/experiments.cc b/src/core/lib/experiments/experiments.cc index 3e32dcdb8b8..9afd08dc202 100644 --- a/src/core/lib/experiments/experiments.cc +++ b/src/core/lib/experiments/experiments.cc @@ -163,6 +163,12 @@ const char* const description_work_serializer_dispatch = "callback, instead of running things inline in the first thread that " "successfully enqueues work."; const char* const additional_constraints_work_serializer_dispatch = "{}"; +const char* const description_write_size_cap = + "Limit outgoing writes proportional to the target write size"; +const char* const additional_constraints_write_size_cap = "{}"; +const char* const description_write_size_policy = + "Try to size writes such that they don't create too large of a backlog"; +const char* const additional_constraints_write_size_policy = "{}"; const char* const description_wrr_delegate_to_pick_first = "Change WRR code to delegate to pick_first as per dualstack backend " "design."; @@ -262,6 +268,10 @@ const ExperimentMetadata g_experiment_metadata[] = { additional_constraints_work_serializer_clears_time_cache, true, true}, {"work_serializer_dispatch", description_work_serializer_dispatch, additional_constraints_work_serializer_dispatch, false, true}, + {"write_size_cap", description_write_size_cap, + additional_constraints_write_size_cap, true, true}, + {"write_size_policy", description_write_size_policy, + additional_constraints_write_size_policy, true, true}, {"wrr_delegate_to_pick_first", description_wrr_delegate_to_pick_first, additional_constraints_wrr_delegate_to_pick_first, true, true}, }; @@ -411,6 +421,12 @@ const char* const description_work_serializer_dispatch = "callback, instead of running things inline in the first thread that " "successfully enqueues work."; const char* const additional_constraints_work_serializer_dispatch = "{}"; +const char* const description_write_size_cap = + "Limit outgoing writes proportional to the target write size"; +const char* const additional_constraints_write_size_cap = "{}"; +const char* const description_write_size_policy = + "Try to size writes such that they don't create too large of a backlog"; +const char* const additional_constraints_write_size_policy = "{}"; const char* const description_wrr_delegate_to_pick_first = "Change WRR code to delegate to pick_first as per dualstack backend " "design."; @@ -510,6 +526,10 @@ const ExperimentMetadata g_experiment_metadata[] = { additional_constraints_work_serializer_clears_time_cache, true, true}, {"work_serializer_dispatch", description_work_serializer_dispatch, additional_constraints_work_serializer_dispatch, false, true}, + {"write_size_cap", description_write_size_cap, + additional_constraints_write_size_cap, true, true}, + {"write_size_policy", description_write_size_policy, + additional_constraints_write_size_policy, true, true}, {"wrr_delegate_to_pick_first", description_wrr_delegate_to_pick_first, additional_constraints_wrr_delegate_to_pick_first, true, true}, }; @@ -659,6 +679,12 @@ const char* const description_work_serializer_dispatch = "callback, instead of running things inline in the first thread that " "successfully enqueues work."; const char* const additional_constraints_work_serializer_dispatch = "{}"; +const char* const description_write_size_cap = + "Limit outgoing writes proportional to the target write size"; +const char* const additional_constraints_write_size_cap = "{}"; +const char* const description_write_size_policy = + "Try to size writes such that they don't create too large of a backlog"; +const char* const additional_constraints_write_size_policy = "{}"; const char* const description_wrr_delegate_to_pick_first = "Change WRR code to delegate to pick_first as per dualstack backend " "design."; @@ -758,6 +784,10 @@ const ExperimentMetadata g_experiment_metadata[] = { additional_constraints_work_serializer_clears_time_cache, true, true}, {"work_serializer_dispatch", description_work_serializer_dispatch, additional_constraints_work_serializer_dispatch, false, true}, + {"write_size_cap", description_write_size_cap, + additional_constraints_write_size_cap, true, true}, + {"write_size_policy", description_write_size_policy, + additional_constraints_write_size_policy, true, true}, {"wrr_delegate_to_pick_first", description_wrr_delegate_to_pick_first, additional_constraints_wrr_delegate_to_pick_first, true, true}, }; diff --git a/src/core/lib/experiments/experiments.h b/src/core/lib/experiments/experiments.h index 9bfb8246ee7..ca679950edd 100644 --- a/src/core/lib/experiments/experiments.h +++ b/src/core/lib/experiments/experiments.h @@ -118,6 +118,10 @@ inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; } inline bool IsWorkSerializerDispatchEnabled() { return false; } +#define GRPC_EXPERIMENT_IS_INCLUDED_WRITE_SIZE_CAP +inline bool IsWriteSizeCapEnabled() { return true; } +#define GRPC_EXPERIMENT_IS_INCLUDED_WRITE_SIZE_POLICY +inline bool IsWriteSizePolicyEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_WRR_DELEGATE_TO_PICK_FIRST inline bool IsWrrDelegateToPickFirstEnabled() { return true; } @@ -183,6 +187,10 @@ inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; } inline bool IsWorkSerializerDispatchEnabled() { return false; } +#define GRPC_EXPERIMENT_IS_INCLUDED_WRITE_SIZE_CAP +inline bool IsWriteSizeCapEnabled() { return true; } +#define GRPC_EXPERIMENT_IS_INCLUDED_WRITE_SIZE_POLICY +inline bool IsWriteSizePolicyEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_WRR_DELEGATE_TO_PICK_FIRST inline bool IsWrrDelegateToPickFirstEnabled() { return true; } @@ -248,6 +256,10 @@ inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; } inline bool IsWorkSerializerDispatchEnabled() { return false; } +#define GRPC_EXPERIMENT_IS_INCLUDED_WRITE_SIZE_CAP +inline bool IsWriteSizeCapEnabled() { return true; } +#define GRPC_EXPERIMENT_IS_INCLUDED_WRITE_SIZE_POLICY +inline bool IsWriteSizePolicyEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_WRR_DELEGATE_TO_PICK_FIRST inline bool IsWrrDelegateToPickFirstEnabled() { return true; } #endif @@ -292,6 +304,8 @@ enum ExperimentIds { kExperimentIdUnconstrainedMaxQuotaBufferSize, kExperimentIdWorkSerializerClearsTimeCache, kExperimentIdWorkSerializerDispatch, + kExperimentIdWriteSizeCap, + kExperimentIdWriteSizePolicy, kExperimentIdWrrDelegateToPickFirst, kNumExperiments }; @@ -448,6 +462,14 @@ inline bool IsWorkSerializerClearsTimeCacheEnabled() { inline bool IsWorkSerializerDispatchEnabled() { return IsExperimentEnabled(kExperimentIdWorkSerializerDispatch); } +#define GRPC_EXPERIMENT_IS_INCLUDED_WRITE_SIZE_CAP +inline bool IsWriteSizeCapEnabled() { + return IsExperimentEnabled(kExperimentIdWriteSizeCap); +} +#define GRPC_EXPERIMENT_IS_INCLUDED_WRITE_SIZE_POLICY +inline bool IsWriteSizePolicyEnabled() { + return IsExperimentEnabled(kExperimentIdWriteSizePolicy); +} #define GRPC_EXPERIMENT_IS_INCLUDED_WRR_DELEGATE_TO_PICK_FIRST inline bool IsWrrDelegateToPickFirstEnabled() { return IsExperimentEnabled(kExperimentIdWrrDelegateToPickFirst); diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index ae3f6eb2690..ed9dcf6f653 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -278,6 +278,18 @@ expiry: 2024/02/10 owner: ctiller@google.com test_tags: ["core_end2end_test", "cpp_end2end_test", "xds_end2end_test", "lb_unit_test"] +- name: write_size_cap + description: + Limit outgoing writes proportional to the target write size + expiry: 2024/03/03 + owner: ctiller@google.com + test_tags: [flow_control_test] +- name: write_size_policy + description: + Try to size writes such that they don't create too large of a backlog + expiry: 2024/03/03 + owner: ctiller@google.com + test_tags: [flow_control_test] - name: wrr_delegate_to_pick_first description: Change WRR code to delegate to pick_first as per dualstack diff --git a/src/core/lib/experiments/rollouts.yaml b/src/core/lib/experiments/rollouts.yaml index 3d44c258839..70df2e2a8a6 100644 --- a/src/core/lib/experiments/rollouts.yaml +++ b/src/core/lib/experiments/rollouts.yaml @@ -122,5 +122,9 @@ default: true - name: work_serializer_dispatch default: false +- name: write_size_cap + default: true +- name: write_size_policy + default: true - name: wrr_delegate_to_pick_first default: true diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 2d31b89920c..4cf70ff39f6 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -125,6 +125,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/transport/chttp2/transport/ping_rate_policy.cc', 'src/core/ext/transport/chttp2/transport/stream_lists.cc', 'src/core/ext/transport/chttp2/transport/varint.cc', + 'src/core/ext/transport/chttp2/transport/write_size_policy.cc', 'src/core/ext/transport/chttp2/transport/writing.cc', 'src/core/ext/transport/inproc/inproc_plugin.cc', 'src/core/ext/transport/inproc/inproc_transport.cc', diff --git a/test/core/transport/chttp2/BUILD b/test/core/transport/chttp2/BUILD index b56f8c54fc2..77ce6b4d100 100644 --- a/test/core/transport/chttp2/BUILD +++ b/test/core/transport/chttp2/BUILD @@ -234,6 +234,20 @@ grpc_cc_test( ], ) +grpc_cc_test( + name = "write_size_policy_test", + srcs = ["write_size_policy_test.cc"], + external_deps = [ + "gtest", + "absl/random", + ], + language = "C++", + uses_polling = False, + deps = [ + "//src/core:write_size_policy", + ], +) + grpc_cc_test( name = "flow_control_test", srcs = ["flow_control_test.cc"], diff --git a/test/core/transport/chttp2/write_size_policy_test.cc b/test/core/transport/chttp2/write_size_policy_test.cc new file mode 100644 index 00000000000..de7c59bcd78 --- /dev/null +++ b/test/core/transport/chttp2/write_size_policy_test.cc @@ -0,0 +1,125 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/core/ext/transport/chttp2/transport/write_size_policy.h" + +#include + +#include "gtest/gtest.h" + +namespace grpc_core { +namespace { + +TEST(WriteSizePolicyTest, InitialValue) { + Chttp2WriteSizePolicy policy; + EXPECT_EQ(policy.WriteTargetSize(), 131072); +} + +TEST(WriteSizePolicyTest, FastWritesOpenThingsUp) { + ScopedTimeCache time_cache; + auto timestamp = [&time_cache](int i) { + time_cache.TestOnlySetNow(Timestamp::ProcessEpoch() + + Duration::Milliseconds(i)); + }; + Chttp2WriteSizePolicy policy; + EXPECT_EQ(policy.WriteTargetSize(), 131072); + timestamp(10); + policy.BeginWrite(131072); + timestamp(20); + policy.EndWrite(true); + EXPECT_EQ(policy.WriteTargetSize(), 131072); + timestamp(30); + policy.BeginWrite(131072); + timestamp(40); + policy.EndWrite(true); + EXPECT_EQ(policy.WriteTargetSize(), 196608); + timestamp(50); + policy.BeginWrite(196608); + timestamp(60); + policy.EndWrite(true); + EXPECT_EQ(policy.WriteTargetSize(), 196608); + timestamp(70); + policy.BeginWrite(196608); + timestamp(80); + policy.EndWrite(true); + EXPECT_EQ(policy.WriteTargetSize(), 294912); +} + +TEST(WriteSizePolicyTest, SlowWritesCloseThingsUp) { + ScopedTimeCache time_cache; + auto timestamp = [&time_cache](int i) { + time_cache.TestOnlySetNow(Timestamp::ProcessEpoch() + + Duration::Milliseconds(i)); + }; + Chttp2WriteSizePolicy policy; + EXPECT_EQ(policy.WriteTargetSize(), 131072); + timestamp(10000); + policy.BeginWrite(131072); + timestamp(20000); + policy.EndWrite(true); + EXPECT_EQ(policy.WriteTargetSize(), 131072); + timestamp(30000); + policy.BeginWrite(131072); + timestamp(40000); + policy.EndWrite(true); + EXPECT_EQ(policy.WriteTargetSize(), 43690); + timestamp(50000); + policy.BeginWrite(43690); + timestamp(60000); + policy.EndWrite(true); + EXPECT_EQ(policy.WriteTargetSize(), 43690); + timestamp(70000); + policy.BeginWrite(43690); + timestamp(80000); + policy.EndWrite(true); + EXPECT_EQ(policy.WriteTargetSize(), 32768); +} + +TEST(WriteSizePolicyTest, MediumWritesJustHangOut) { + ScopedTimeCache time_cache; + auto timestamp = [&time_cache](int i) { + time_cache.TestOnlySetNow(Timestamp::ProcessEpoch() + + Duration::Milliseconds(i)); + }; + Chttp2WriteSizePolicy policy; + EXPECT_EQ(policy.WriteTargetSize(), 131072); + timestamp(500); + policy.BeginWrite(131072); + timestamp(1000); + policy.EndWrite(true); + EXPECT_EQ(policy.WriteTargetSize(), 131072); + timestamp(1500); + policy.BeginWrite(131072); + timestamp(2000); + policy.EndWrite(true); + EXPECT_EQ(policy.WriteTargetSize(), 131072); + timestamp(2500); + policy.BeginWrite(131072); + timestamp(3000); + policy.EndWrite(true); + EXPECT_EQ(policy.WriteTargetSize(), 131072); + timestamp(3500); + policy.BeginWrite(131072); + timestamp(4000); + policy.EndWrite(true); + EXPECT_EQ(policy.WriteTargetSize(), 131072); +} + +} // namespace +} // namespace grpc_core + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 914018d3a9c..34909a3976b 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1326,6 +1326,8 @@ src/core/ext/transport/chttp2/transport/ping_rate_policy.h \ src/core/ext/transport/chttp2/transport/stream_lists.cc \ src/core/ext/transport/chttp2/transport/varint.cc \ src/core/ext/transport/chttp2/transport/varint.h \ +src/core/ext/transport/chttp2/transport/write_size_policy.cc \ +src/core/ext/transport/chttp2/transport/write_size_policy.h \ src/core/ext/transport/chttp2/transport/writing.cc \ src/core/ext/transport/inproc/inproc_plugin.cc \ src/core/ext/transport/inproc/inproc_transport.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index fae9f531293..ce8ea8c85cb 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1102,6 +1102,8 @@ src/core/ext/transport/chttp2/transport/ping_rate_policy.h \ src/core/ext/transport/chttp2/transport/stream_lists.cc \ src/core/ext/transport/chttp2/transport/varint.cc \ src/core/ext/transport/chttp2/transport/varint.h \ +src/core/ext/transport/chttp2/transport/write_size_policy.cc \ +src/core/ext/transport/chttp2/transport/write_size_policy.h \ src/core/ext/transport/chttp2/transport/writing.cc \ src/core/ext/transport/inproc/inproc_plugin.cc \ src/core/ext/transport/inproc/inproc_transport.cc \ diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 2e182fefd31..522d4f3189f 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -11227,6 +11227,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "write_size_policy_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false,