[chttp2] Bound write sizes based on observed write performance (#34665)

Instead of fixing a target size for writes, try to adapt it a little to
observed bandwidth.

The initial algorithm tries to get large writes within 100-1000ms
maximum delay - this range probably wants to be tuned, but let's see.

The hope here is that on slow connections we can not back buffer so much
and so when we need to send a ping-ack it's possible without great
delay.
pull/34680/head
Craig Tiller 1 year ago committed by GitHub
parent 716333135d
commit 7c59c09f43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 39
      CMakeLists.txt
  3. 2
      Makefile
  4. 2
      Package.swift
  5. 6
      bazel/experiments.bzl
  6. 20
      build_autogenerated.yaml
  7. 1
      config.m4
  8. 1
      config.w32
  9. 26
      fuzztest/core/transport/chttp2/BUILD
  10. 64
      fuzztest/core/transport/chttp2/write_size_policy_test.cc
  11. 2
      gRPC-C++.podspec
  12. 3
      gRPC-Core.podspec
  13. 2
      grpc.gemspec
  14. 2
      grpc.gyp
  15. 2
      package.xml
  16. 15
      src/core/BUILD
  17. 4
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  18. 5
      src/core/ext/transport/chttp2/transport/internal.h
  19. 60
      src/core/ext/transport/chttp2/transport/write_size_policy.cc
  20. 66
      src/core/ext/transport/chttp2/transport/write_size_policy.h
  21. 23
      src/core/ext/transport/chttp2/transport/writing.cc
  22. 30
      src/core/lib/experiments/experiments.cc
  23. 22
      src/core/lib/experiments/experiments.h
  24. 12
      src/core/lib/experiments/experiments.yaml
  25. 4
      src/core/lib/experiments/rollouts.yaml
  26. 1
      src/python/grpcio/grpc_core_dependencies.py
  27. 14
      test/core/transport/chttp2/BUILD
  28. 125
      test/core/transport/chttp2/write_size_policy_test.cc
  29. 2
      tools/doxygen/Doxyfile.c++.internal
  30. 2
      tools/doxygen/Doxyfile.core.internal
  31. 24
      tools/run_tests/generated/tests.json

@ -4056,6 +4056,7 @@ grpc_cc_library(
"//src/core:time",
"//src/core:transport_fwd",
"//src/core:useful",
"//src/core:write_size_policy",
],
)

39
CMakeLists.txt generated

@ -1447,6 +1447,7 @@ if(gRPC_BUILD_TESTS)
endif()
add_dependencies(buildtests_cxx write_buffering_at_end_test)
add_dependencies(buildtests_cxx write_buffering_test)
add_dependencies(buildtests_cxx write_size_policy_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx writes_per_rpc_test)
endif()
@ -1843,6 +1844,7 @@ add_library(grpc
src/core/ext/transport/chttp2/transport/ping_rate_policy.cc
src/core/ext/transport/chttp2/transport/stream_lists.cc
src/core/ext/transport/chttp2/transport/varint.cc
src/core/ext/transport/chttp2/transport/write_size_policy.cc
src/core/ext/transport/chttp2/transport/writing.cc
src/core/ext/transport/inproc/inproc_plugin.cc
src/core/ext/transport/inproc/inproc_transport.cc
@ -2882,6 +2884,7 @@ add_library(grpc_unsecure
src/core/ext/transport/chttp2/transport/ping_rate_policy.cc
src/core/ext/transport/chttp2/transport/stream_lists.cc
src/core/ext/transport/chttp2/transport/varint.cc
src/core/ext/transport/chttp2/transport/write_size_policy.cc
src/core/ext/transport/chttp2/transport/writing.cc
src/core/ext/transport/inproc/inproc_plugin.cc
src/core/ext/transport/inproc/inproc_transport.cc
@ -26199,6 +26202,42 @@ target_link_libraries(write_buffering_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(write_size_policy_test
src/core/ext/transport/chttp2/transport/write_size_policy.cc
src/core/lib/gprpp/time.cc
test/core/transport/chttp2/write_size_policy_test.cc
)
target_compile_features(write_size_policy_test PUBLIC cxx_std_14)
target_include_directories(write_size_policy_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(write_size_policy_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
absl::statusor
gpr
)
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)

2
Makefile generated

@ -1064,6 +1064,7 @@ LIBGRPC_SRC = \
src/core/ext/transport/chttp2/transport/ping_rate_policy.cc \
src/core/ext/transport/chttp2/transport/stream_lists.cc \
src/core/ext/transport/chttp2/transport/varint.cc \
src/core/ext/transport/chttp2/transport/write_size_policy.cc \
src/core/ext/transport/chttp2/transport/writing.cc \
src/core/ext/transport/inproc/inproc_plugin.cc \
src/core/ext/transport/inproc/inproc_transport.cc \
@ -1955,6 +1956,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/transport/chttp2/transport/ping_rate_policy.cc \
src/core/ext/transport/chttp2/transport/stream_lists.cc \
src/core/ext/transport/chttp2/transport/varint.cc \
src/core/ext/transport/chttp2/transport/write_size_policy.cc \
src/core/ext/transport/chttp2/transport/writing.cc \
src/core/ext/transport/inproc/inproc_plugin.cc \
src/core/ext/transport/inproc/inproc_transport.cc \

2
Package.swift generated

@ -323,6 +323,8 @@ let package = Package(
"src/core/ext/transport/chttp2/transport/stream_lists.cc",
"src/core/ext/transport/chttp2/transport/varint.cc",
"src/core/ext/transport/chttp2/transport/varint.h",
"src/core/ext/transport/chttp2/transport/write_size_policy.cc",
"src/core/ext/transport/chttp2/transport/write_size_policy.h",
"src/core/ext/transport/chttp2/transport/writing.cc",
"src/core/ext/transport/inproc/inproc_plugin.cc",
"src/core/ext/transport/inproc/inproc_transport.cc",

@ -83,6 +83,8 @@ EXPERIMENTS = {
"chttp2_batch_requests",
"chttp2_offload_on_rst_stream",
"lazier_stream_updates",
"write_size_cap",
"write_size_policy",
],
"lb_unit_test": [
"pick_first_happy_eyeballs",
@ -165,6 +167,8 @@ EXPERIMENTS = {
"chttp2_batch_requests",
"chttp2_offload_on_rst_stream",
"lazier_stream_updates",
"write_size_cap",
"write_size_policy",
],
"lb_unit_test": [
"pick_first_happy_eyeballs",
@ -257,6 +261,8 @@ EXPERIMENTS = {
"chttp2_batch_requests",
"chttp2_offload_on_rst_stream",
"lazier_stream_updates",
"write_size_cap",
"write_size_policy",
],
"lb_unit_test": [
"pick_first_happy_eyeballs",

@ -314,6 +314,7 @@ libs:
- src/core/ext/transport/chttp2/transport/ping_callbacks.h
- src/core/ext/transport/chttp2/transport/ping_rate_policy.h
- src/core/ext/transport/chttp2/transport/varint.h
- src/core/ext/transport/chttp2/transport/write_size_policy.h
- src/core/ext/transport/inproc/inproc_transport.h
- src/core/ext/transport/inproc/legacy_inproc_transport.h
- src/core/ext/upb-generated/envoy/admin/v3/certs.upb.h
@ -1145,6 +1146,7 @@ libs:
- src/core/ext/transport/chttp2/transport/ping_rate_policy.cc
- src/core/ext/transport/chttp2/transport/stream_lists.cc
- src/core/ext/transport/chttp2/transport/varint.cc
- src/core/ext/transport/chttp2/transport/write_size_policy.cc
- src/core/ext/transport/chttp2/transport/writing.cc
- src/core/ext/transport/inproc/inproc_plugin.cc
- src/core/ext/transport/inproc/inproc_transport.cc
@ -2060,6 +2062,7 @@ libs:
- src/core/ext/transport/chttp2/transport/ping_callbacks.h
- src/core/ext/transport/chttp2/transport/ping_rate_policy.h
- src/core/ext/transport/chttp2/transport/varint.h
- src/core/ext/transport/chttp2/transport/write_size_policy.h
- src/core/ext/transport/inproc/inproc_transport.h
- src/core/ext/transport/inproc/legacy_inproc_transport.h
- src/core/ext/upb-generated/google/api/annotations.upb.h
@ -2502,6 +2505,7 @@ libs:
- src/core/ext/transport/chttp2/transport/ping_rate_policy.cc
- src/core/ext/transport/chttp2/transport/stream_lists.cc
- src/core/ext/transport/chttp2/transport/varint.cc
- src/core/ext/transport/chttp2/transport/write_size_policy.cc
- src/core/ext/transport/chttp2/transport/writing.cc
- src/core/ext/transport/inproc/inproc_plugin.cc
- src/core/ext/transport/inproc/inproc_transport.cc
@ -17320,6 +17324,22 @@ targets:
- grpc_authorization_provider
- grpc_unsecure
- grpc_test_util
- name: write_size_policy_test
gtest: true
build: test
language: c++
headers:
- src/core/ext/transport/chttp2/transport/write_size_policy.h
- src/core/lib/gprpp/time.h
src:
- src/core/ext/transport/chttp2/transport/write_size_policy.cc
- src/core/lib/gprpp/time.cc
- test/core/transport/chttp2/write_size_policy_test.cc
deps:
- gtest
- absl/status:statusor
- gpr
uses_polling: false
- name: writes_per_rpc_test
gtest: true
build: test

1
config.m4 generated

@ -151,6 +151,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/transport/chttp2/transport/ping_rate_policy.cc \
src/core/ext/transport/chttp2/transport/stream_lists.cc \
src/core/ext/transport/chttp2/transport/varint.cc \
src/core/ext/transport/chttp2/transport/write_size_policy.cc \
src/core/ext/transport/chttp2/transport/writing.cc \
src/core/ext/transport/inproc/inproc_plugin.cc \
src/core/ext/transport/inproc/inproc_transport.cc \

1
config.w32 generated

@ -116,6 +116,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\transport\\chttp2\\transport\\ping_rate_policy.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\stream_lists.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\varint.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\write_size_policy.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\writing.cc " +
"src\\core\\ext\\transport\\inproc\\inproc_plugin.cc " +
"src\\core\\ext\\transport\\inproc\\inproc_transport.cc " +

@ -0,0 +1,26 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
load("//fuzztest:grpc_fuzz_test.bzl", "grpc_fuzz_test")
grpc_fuzz_test(
name = "write_size_policy_test",
srcs = ["write_size_policy_test.cc"],
external_deps = [
"fuzztest",
"fuzztest_main",
"gtest",
],
deps = ["//src/core:write_size_policy"],
)

@ -0,0 +1,64 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Test to verify Fuzztest integration
#include "src/core/ext/transport/chttp2/transport/write_size_policy.h"
#include <vector>
#include "fuzztest/fuzztest.h"
#include "gtest/gtest.h"
namespace grpc_core {
struct OneWrite {
uint16_t delay_start;
uint32_t size;
uint16_t write_time;
bool success;
};
void WriteSizePolicyStaysWithinBounds(std::vector<OneWrite> ops) {
ScopedTimeCache time_cache;
uint32_t now = 100;
Chttp2WriteSizePolicy policy;
for (const OneWrite op : ops) {
const auto start_target = policy.WriteTargetSize();
now += op.delay_start;
time_cache.TestOnlySetNow(Timestamp::ProcessEpoch() +
Duration::Milliseconds(now));
policy.BeginWrite(op.size);
now += op.write_time;
time_cache.TestOnlySetNow(Timestamp::ProcessEpoch() +
Duration::Milliseconds(now));
policy.EndWrite(op.success);
if (op.size >= start_target * 7 / 10) {
if (op.write_time < Chttp2WriteSizePolicy::FastWrite().millis()) {
EXPECT_GE(policy.WriteTargetSize(), start_target);
EXPECT_LE(policy.WriteTargetSize(), start_target * 3 / 2);
} else if (op.write_time > Chttp2WriteSizePolicy::SlowWrite().millis()) {
EXPECT_LE(policy.WriteTargetSize(), start_target);
EXPECT_GE(policy.WriteTargetSize(), start_target / 3);
}
} else {
EXPECT_EQ(policy.WriteTargetSize(), start_target);
}
EXPECT_GE(policy.WriteTargetSize(), Chttp2WriteSizePolicy::MinTarget());
EXPECT_LE(policy.WriteTargetSize(), Chttp2WriteSizePolicy::MaxTarget());
}
}
FUZZ_TEST(MyTestSuite, WriteSizePolicyStaysWithinBounds);
} // namespace grpc_core

2
gRPC-C++.podspec generated

@ -385,6 +385,7 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/transport/ping_callbacks.h',
'src/core/ext/transport/chttp2/transport/ping_rate_policy.h',
'src/core/ext/transport/chttp2/transport/varint.h',
'src/core/ext/transport/chttp2/transport/write_size_policy.h',
'src/core/ext/transport/inproc/inproc_transport.h',
'src/core/ext/transport/inproc/legacy_inproc_transport.h',
'src/core/ext/upb-generated/envoy/admin/v3/certs.upb.h',
@ -1457,6 +1458,7 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/transport/ping_callbacks.h',
'src/core/ext/transport/chttp2/transport/ping_rate_policy.h',
'src/core/ext/transport/chttp2/transport/varint.h',
'src/core/ext/transport/chttp2/transport/write_size_policy.h',
'src/core/ext/transport/inproc/inproc_transport.h',
'src/core/ext/transport/inproc/legacy_inproc_transport.h',
'src/core/ext/upb-generated/envoy/admin/v3/certs.upb.h',

3
gRPC-Core.podspec generated

@ -426,6 +426,8 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/transport/stream_lists.cc',
'src/core/ext/transport/chttp2/transport/varint.cc',
'src/core/ext/transport/chttp2/transport/varint.h',
'src/core/ext/transport/chttp2/transport/write_size_policy.cc',
'src/core/ext/transport/chttp2/transport/write_size_policy.h',
'src/core/ext/transport/chttp2/transport/writing.cc',
'src/core/ext/transport/inproc/inproc_plugin.cc',
'src/core/ext/transport/inproc/inproc_transport.cc',
@ -2214,6 +2216,7 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/transport/ping_callbacks.h',
'src/core/ext/transport/chttp2/transport/ping_rate_policy.h',
'src/core/ext/transport/chttp2/transport/varint.h',
'src/core/ext/transport/chttp2/transport/write_size_policy.h',
'src/core/ext/transport/inproc/inproc_transport.h',
'src/core/ext/transport/inproc/legacy_inproc_transport.h',
'src/core/ext/upb-generated/envoy/admin/v3/certs.upb.h',

2
grpc.gemspec generated

@ -329,6 +329,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/transport/chttp2/transport/stream_lists.cc )
s.files += %w( src/core/ext/transport/chttp2/transport/varint.cc )
s.files += %w( src/core/ext/transport/chttp2/transport/varint.h )
s.files += %w( src/core/ext/transport/chttp2/transport/write_size_policy.cc )
s.files += %w( src/core/ext/transport/chttp2/transport/write_size_policy.h )
s.files += %w( src/core/ext/transport/chttp2/transport/writing.cc )
s.files += %w( src/core/ext/transport/inproc/inproc_plugin.cc )
s.files += %w( src/core/ext/transport/inproc/inproc_transport.cc )

2
grpc.gyp generated

@ -382,6 +382,7 @@
'src/core/ext/transport/chttp2/transport/ping_rate_policy.cc',
'src/core/ext/transport/chttp2/transport/stream_lists.cc',
'src/core/ext/transport/chttp2/transport/varint.cc',
'src/core/ext/transport/chttp2/transport/write_size_policy.cc',
'src/core/ext/transport/chttp2/transport/writing.cc',
'src/core/ext/transport/inproc/inproc_plugin.cc',
'src/core/ext/transport/inproc/inproc_transport.cc',
@ -1215,6 +1216,7 @@
'src/core/ext/transport/chttp2/transport/ping_rate_policy.cc',
'src/core/ext/transport/chttp2/transport/stream_lists.cc',
'src/core/ext/transport/chttp2/transport/varint.cc',
'src/core/ext/transport/chttp2/transport/write_size_policy.cc',
'src/core/ext/transport/chttp2/transport/writing.cc',
'src/core/ext/transport/inproc/inproc_plugin.cc',
'src/core/ext/transport/inproc/inproc_transport.cc',

2
package.xml generated

@ -311,6 +311,8 @@
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/stream_lists.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/varint.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/varint.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/write_size_policy.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/write_size_policy.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/writing.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/inproc/inproc_plugin.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/inproc/inproc_transport.cc" role="src" />

@ -5717,6 +5717,21 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "write_size_policy",
srcs = [
"ext/transport/chttp2/transport/write_size_policy.cc",
],
hdrs = [
"ext/transport/chttp2/transport/write_size_policy.h",
],
deps = [
"time",
"//:gpr",
"//:gpr_platform",
],
)
grpc_cc_library(
name = "ping_rate_policy",
srcs = [

@ -70,6 +70,7 @@
#include "src/core/ext/transport/chttp2/transport/ping_callbacks.h"
#include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h"
#include "src/core/ext/transport/chttp2/transport/varint.h"
#include "src/core/ext/transport/chttp2/transport/write_size_policy.h"
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/context.h"
@ -1086,6 +1087,7 @@ static void write_action(grpc_chttp2_transport* t) {
gpr_log(GPR_INFO, "%s[%p]: Write %" PRIdPTR " bytes",
t->is_client ? "CLIENT" : "SERVER", t, t->outbuf.Length());
}
t->write_size_policy.BeginWrite(t->outbuf.Length());
grpc_endpoint_write(t->ep, t->outbuf.c_slice_buffer(),
grpc_core::InitTransportClosure<write_action_end>(
t->Ref(), &t->write_action_end_locked),
@ -1109,6 +1111,8 @@ static void write_action_end(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
static void write_action_end_locked(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
grpc_error_handle error) {
t->write_size_policy.EndWrite(error.ok());
bool closed = false;
if (!error.ok()) {
close_transport_locked(t.get(), error);

@ -55,6 +55,7 @@
#include "src/core/ext/transport/chttp2/transport/ping_abuse_policy.h"
#include "src/core/ext/transport/chttp2/transport/ping_callbacks.h"
#include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h"
#include "src/core/ext/transport/chttp2/transport/write_size_policy.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/debug/trace.h"
@ -485,9 +486,11 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
grpc_core::Duration settings_timeout;
/// how much data are we willing to buffer when the WRITE_BUFFER_HINT is set?
///
uint32_t write_buffer_size = grpc_core::chttp2::kDefaultWindow;
/// policy for how much data we're willing to put into one http2 write
grpc_core::Chttp2WriteSizePolicy write_size_policy;
bool reading_paused_on_pending_induced_frames = false;
/// Based on channel args, preferred_rx_crypto_frame_sizes are advertised to
/// the peer

@ -0,0 +1,60 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/ext/transport/chttp2/transport/write_size_policy.h"
#include <algorithm>
#include <grpc/support/log.h>
namespace grpc_core {
size_t Chttp2WriteSizePolicy::WriteTargetSize() { return current_target_; }
void Chttp2WriteSizePolicy::BeginWrite(size_t size) {
GPR_ASSERT(experiment_start_time_ == Timestamp::InfFuture());
if (size < current_target_ * 7 / 10) {
// If we were trending fast but stopped getting enough data to verify, then
// reset back to the default state.
if (state_ < 0) state_ = 0;
return;
}
experiment_start_time_ = Timestamp::Now();
}
void Chttp2WriteSizePolicy::EndWrite(bool success) {
if (experiment_start_time_ == Timestamp::InfFuture()) return;
const auto elapsed = Timestamp::Now() - experiment_start_time_;
experiment_start_time_ = Timestamp::InfFuture();
if (!success) return;
if (elapsed < FastWrite()) {
--state_;
if (state_ == -2) {
state_ = 0;
current_target_ = std::min(current_target_ * 3 / 2, MaxTarget());
}
} else if (elapsed > SlowWrite()) {
++state_;
if (state_ == 2) {
state_ = 0;
current_target_ = std::max(current_target_ / 3, MinTarget());
}
} else {
state_ = 0;
}
}
} // namespace grpc_core

@ -0,0 +1,66 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_WRITE_SIZE_POLICY_H
#define GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_WRITE_SIZE_POLICY_H
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <stdint.h>
#include "src/core/lib/gprpp/time.h"
namespace grpc_core {
class Chttp2WriteSizePolicy {
public:
// Smallest possible WriteTargetSize
static constexpr size_t MinTarget() { return 32 * 1024; }
// Largest possible WriteTargetSize
static constexpr size_t MaxTarget() { return 16 * 1024 * 1024; }
// How long should a write take to be considered "fast"
static constexpr Duration FastWrite() { return Duration::Milliseconds(100); }
// How long should a write take to be considered "slow"
static constexpr Duration SlowWrite() { return Duration::Seconds(1); }
// If a read is slow, what target time should we use to try and adjust back
// to?
static constexpr Duration TargetWriteTime() {
return Duration::Milliseconds(300);
}
// What size should be targetted for the next write.
size_t WriteTargetSize();
// Notify the policy that a write of some size has begun.
// EndWrite must be called when the write completes.
void BeginWrite(size_t size);
// Notify the policy that a write of some size has ended.
void EndWrite(bool success);
private:
size_t current_target_ = 128 * 1024;
Timestamp experiment_start_time_ = Timestamp::InfFuture();
// State varies from -2...2
// Every time we do a write faster than kFastWrite, we decrement
// Every time we do a write slower than kSlowWrite, we increment
// If we hit -2, we increase the target size and reset state to 0
// If we hit 2, we decrease the target size and reset state to 0
// In this way, we need two consecutive fast/slow operations to adjust,
// denoising the signal significantly
int8_t state_ = 0;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_WRITE_SIZE_POLICY_H

@ -22,6 +22,7 @@
#include <stddef.h>
#include <algorithm>
#include <limits>
#include <memory>
#include <string>
#include <utility>
@ -50,11 +51,13 @@
#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h"
#include "src/core/ext/transport/chttp2/transport/ping_callbacks.h"
#include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h"
#include "src/core/ext/transport/chttp2/transport/write_size_policy.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/match.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -374,9 +377,13 @@ class WriteContext {
return result_;
}
size_t target_write_size() const { return target_write_size_; }
private:
grpc_chttp2_transport* const t_;
size_t target_write_size_ = 1024 * 1024;
size_t target_write_size_ = grpc_core::IsWriteSizePolicyEnabled()
? t_->write_size_policy.WriteTargetSize()
: 1024 * 1024;
// stats histogram counters: we increment these throughout this function,
// and at the end publish to the central stats histograms
@ -406,11 +413,15 @@ class DataSendContext {
}
uint32_t max_outgoing() const {
return static_cast<uint32_t>(std::min(
t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
static_cast<uint32_t>(
std::min(static_cast<int64_t>(stream_remote_window()),
t_->flow_control.remote_window()))));
return grpc_core::Clamp<uint32_t>(
std::min<int64_t>(
{t_->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
stream_remote_window(), t_->flow_control.remote_window(),
grpc_core::IsWriteSizeCapEnabled()
? static_cast<int64_t>(write_context_->target_write_size())
: std::numeric_limits<uint32_t>::max()}),
0, std::numeric_limits<uint32_t>::max());
}
bool AnyOutgoing() const { return max_outgoing() > 0; }

@ -163,6 +163,12 @@ const char* const description_work_serializer_dispatch =
"callback, instead of running things inline in the first thread that "
"successfully enqueues work.";
const char* const additional_constraints_work_serializer_dispatch = "{}";
const char* const description_write_size_cap =
"Limit outgoing writes proportional to the target write size";
const char* const additional_constraints_write_size_cap = "{}";
const char* const description_write_size_policy =
"Try to size writes such that they don't create too large of a backlog";
const char* const additional_constraints_write_size_policy = "{}";
const char* const description_wrr_delegate_to_pick_first =
"Change WRR code to delegate to pick_first as per dualstack backend "
"design.";
@ -262,6 +268,10 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_work_serializer_clears_time_cache, true, true},
{"work_serializer_dispatch", description_work_serializer_dispatch,
additional_constraints_work_serializer_dispatch, false, true},
{"write_size_cap", description_write_size_cap,
additional_constraints_write_size_cap, true, true},
{"write_size_policy", description_write_size_policy,
additional_constraints_write_size_policy, true, true},
{"wrr_delegate_to_pick_first", description_wrr_delegate_to_pick_first,
additional_constraints_wrr_delegate_to_pick_first, true, true},
};
@ -411,6 +421,12 @@ const char* const description_work_serializer_dispatch =
"callback, instead of running things inline in the first thread that "
"successfully enqueues work.";
const char* const additional_constraints_work_serializer_dispatch = "{}";
const char* const description_write_size_cap =
"Limit outgoing writes proportional to the target write size";
const char* const additional_constraints_write_size_cap = "{}";
const char* const description_write_size_policy =
"Try to size writes such that they don't create too large of a backlog";
const char* const additional_constraints_write_size_policy = "{}";
const char* const description_wrr_delegate_to_pick_first =
"Change WRR code to delegate to pick_first as per dualstack backend "
"design.";
@ -510,6 +526,10 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_work_serializer_clears_time_cache, true, true},
{"work_serializer_dispatch", description_work_serializer_dispatch,
additional_constraints_work_serializer_dispatch, false, true},
{"write_size_cap", description_write_size_cap,
additional_constraints_write_size_cap, true, true},
{"write_size_policy", description_write_size_policy,
additional_constraints_write_size_policy, true, true},
{"wrr_delegate_to_pick_first", description_wrr_delegate_to_pick_first,
additional_constraints_wrr_delegate_to_pick_first, true, true},
};
@ -659,6 +679,12 @@ const char* const description_work_serializer_dispatch =
"callback, instead of running things inline in the first thread that "
"successfully enqueues work.";
const char* const additional_constraints_work_serializer_dispatch = "{}";
const char* const description_write_size_cap =
"Limit outgoing writes proportional to the target write size";
const char* const additional_constraints_write_size_cap = "{}";
const char* const description_write_size_policy =
"Try to size writes such that they don't create too large of a backlog";
const char* const additional_constraints_write_size_policy = "{}";
const char* const description_wrr_delegate_to_pick_first =
"Change WRR code to delegate to pick_first as per dualstack backend "
"design.";
@ -758,6 +784,10 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_work_serializer_clears_time_cache, true, true},
{"work_serializer_dispatch", description_work_serializer_dispatch,
additional_constraints_work_serializer_dispatch, false, true},
{"write_size_cap", description_write_size_cap,
additional_constraints_write_size_cap, true, true},
{"write_size_policy", description_write_size_policy,
additional_constraints_write_size_policy, true, true},
{"wrr_delegate_to_pick_first", description_wrr_delegate_to_pick_first,
additional_constraints_wrr_delegate_to_pick_first, true, true},
};

@ -118,6 +118,10 @@ inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE
inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; }
inline bool IsWorkSerializerDispatchEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WRITE_SIZE_CAP
inline bool IsWriteSizeCapEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WRITE_SIZE_POLICY
inline bool IsWriteSizePolicyEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WRR_DELEGATE_TO_PICK_FIRST
inline bool IsWrrDelegateToPickFirstEnabled() { return true; }
@ -183,6 +187,10 @@ inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE
inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; }
inline bool IsWorkSerializerDispatchEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WRITE_SIZE_CAP
inline bool IsWriteSizeCapEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WRITE_SIZE_POLICY
inline bool IsWriteSizePolicyEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WRR_DELEGATE_TO_PICK_FIRST
inline bool IsWrrDelegateToPickFirstEnabled() { return true; }
@ -248,6 +256,10 @@ inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE
inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; }
inline bool IsWorkSerializerDispatchEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WRITE_SIZE_CAP
inline bool IsWriteSizeCapEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WRITE_SIZE_POLICY
inline bool IsWriteSizePolicyEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WRR_DELEGATE_TO_PICK_FIRST
inline bool IsWrrDelegateToPickFirstEnabled() { return true; }
#endif
@ -292,6 +304,8 @@ enum ExperimentIds {
kExperimentIdUnconstrainedMaxQuotaBufferSize,
kExperimentIdWorkSerializerClearsTimeCache,
kExperimentIdWorkSerializerDispatch,
kExperimentIdWriteSizeCap,
kExperimentIdWriteSizePolicy,
kExperimentIdWrrDelegateToPickFirst,
kNumExperiments
};
@ -448,6 +462,14 @@ inline bool IsWorkSerializerClearsTimeCacheEnabled() {
inline bool IsWorkSerializerDispatchEnabled() {
return IsExperimentEnabled(kExperimentIdWorkSerializerDispatch);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_WRITE_SIZE_CAP
inline bool IsWriteSizeCapEnabled() {
return IsExperimentEnabled(kExperimentIdWriteSizeCap);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_WRITE_SIZE_POLICY
inline bool IsWriteSizePolicyEnabled() {
return IsExperimentEnabled(kExperimentIdWriteSizePolicy);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_WRR_DELEGATE_TO_PICK_FIRST
inline bool IsWrrDelegateToPickFirstEnabled() {
return IsExperimentEnabled(kExperimentIdWrrDelegateToPickFirst);

@ -278,6 +278,18 @@
expiry: 2024/02/10
owner: ctiller@google.com
test_tags: ["core_end2end_test", "cpp_end2end_test", "xds_end2end_test", "lb_unit_test"]
- name: write_size_cap
description:
Limit outgoing writes proportional to the target write size
expiry: 2024/03/03
owner: ctiller@google.com
test_tags: [flow_control_test]
- name: write_size_policy
description:
Try to size writes such that they don't create too large of a backlog
expiry: 2024/03/03
owner: ctiller@google.com
test_tags: [flow_control_test]
- name: wrr_delegate_to_pick_first
description:
Change WRR code to delegate to pick_first as per dualstack

@ -122,5 +122,9 @@
default: true
- name: work_serializer_dispatch
default: false
- name: write_size_cap
default: true
- name: write_size_policy
default: true
- name: wrr_delegate_to_pick_first
default: true

@ -125,6 +125,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/transport/chttp2/transport/ping_rate_policy.cc',
'src/core/ext/transport/chttp2/transport/stream_lists.cc',
'src/core/ext/transport/chttp2/transport/varint.cc',
'src/core/ext/transport/chttp2/transport/write_size_policy.cc',
'src/core/ext/transport/chttp2/transport/writing.cc',
'src/core/ext/transport/inproc/inproc_plugin.cc',
'src/core/ext/transport/inproc/inproc_transport.cc',

@ -234,6 +234,20 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "write_size_policy_test",
srcs = ["write_size_policy_test.cc"],
external_deps = [
"gtest",
"absl/random",
],
language = "C++",
uses_polling = False,
deps = [
"//src/core:write_size_policy",
],
)
grpc_cc_test(
name = "flow_control_test",
srcs = ["flow_control_test.cc"],

@ -0,0 +1,125 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/ext/transport/chttp2/transport/write_size_policy.h"
#include <memory>
#include "gtest/gtest.h"
namespace grpc_core {
namespace {
TEST(WriteSizePolicyTest, InitialValue) {
Chttp2WriteSizePolicy policy;
EXPECT_EQ(policy.WriteTargetSize(), 131072);
}
TEST(WriteSizePolicyTest, FastWritesOpenThingsUp) {
ScopedTimeCache time_cache;
auto timestamp = [&time_cache](int i) {
time_cache.TestOnlySetNow(Timestamp::ProcessEpoch() +
Duration::Milliseconds(i));
};
Chttp2WriteSizePolicy policy;
EXPECT_EQ(policy.WriteTargetSize(), 131072);
timestamp(10);
policy.BeginWrite(131072);
timestamp(20);
policy.EndWrite(true);
EXPECT_EQ(policy.WriteTargetSize(), 131072);
timestamp(30);
policy.BeginWrite(131072);
timestamp(40);
policy.EndWrite(true);
EXPECT_EQ(policy.WriteTargetSize(), 196608);
timestamp(50);
policy.BeginWrite(196608);
timestamp(60);
policy.EndWrite(true);
EXPECT_EQ(policy.WriteTargetSize(), 196608);
timestamp(70);
policy.BeginWrite(196608);
timestamp(80);
policy.EndWrite(true);
EXPECT_EQ(policy.WriteTargetSize(), 294912);
}
TEST(WriteSizePolicyTest, SlowWritesCloseThingsUp) {
ScopedTimeCache time_cache;
auto timestamp = [&time_cache](int i) {
time_cache.TestOnlySetNow(Timestamp::ProcessEpoch() +
Duration::Milliseconds(i));
};
Chttp2WriteSizePolicy policy;
EXPECT_EQ(policy.WriteTargetSize(), 131072);
timestamp(10000);
policy.BeginWrite(131072);
timestamp(20000);
policy.EndWrite(true);
EXPECT_EQ(policy.WriteTargetSize(), 131072);
timestamp(30000);
policy.BeginWrite(131072);
timestamp(40000);
policy.EndWrite(true);
EXPECT_EQ(policy.WriteTargetSize(), 43690);
timestamp(50000);
policy.BeginWrite(43690);
timestamp(60000);
policy.EndWrite(true);
EXPECT_EQ(policy.WriteTargetSize(), 43690);
timestamp(70000);
policy.BeginWrite(43690);
timestamp(80000);
policy.EndWrite(true);
EXPECT_EQ(policy.WriteTargetSize(), 32768);
}
TEST(WriteSizePolicyTest, MediumWritesJustHangOut) {
ScopedTimeCache time_cache;
auto timestamp = [&time_cache](int i) {
time_cache.TestOnlySetNow(Timestamp::ProcessEpoch() +
Duration::Milliseconds(i));
};
Chttp2WriteSizePolicy policy;
EXPECT_EQ(policy.WriteTargetSize(), 131072);
timestamp(500);
policy.BeginWrite(131072);
timestamp(1000);
policy.EndWrite(true);
EXPECT_EQ(policy.WriteTargetSize(), 131072);
timestamp(1500);
policy.BeginWrite(131072);
timestamp(2000);
policy.EndWrite(true);
EXPECT_EQ(policy.WriteTargetSize(), 131072);
timestamp(2500);
policy.BeginWrite(131072);
timestamp(3000);
policy.EndWrite(true);
EXPECT_EQ(policy.WriteTargetSize(), 131072);
timestamp(3500);
policy.BeginWrite(131072);
timestamp(4000);
policy.EndWrite(true);
EXPECT_EQ(policy.WriteTargetSize(), 131072);
}
} // namespace
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -1326,6 +1326,8 @@ src/core/ext/transport/chttp2/transport/ping_rate_policy.h \
src/core/ext/transport/chttp2/transport/stream_lists.cc \
src/core/ext/transport/chttp2/transport/varint.cc \
src/core/ext/transport/chttp2/transport/varint.h \
src/core/ext/transport/chttp2/transport/write_size_policy.cc \
src/core/ext/transport/chttp2/transport/write_size_policy.h \
src/core/ext/transport/chttp2/transport/writing.cc \
src/core/ext/transport/inproc/inproc_plugin.cc \
src/core/ext/transport/inproc/inproc_transport.cc \

@ -1102,6 +1102,8 @@ src/core/ext/transport/chttp2/transport/ping_rate_policy.h \
src/core/ext/transport/chttp2/transport/stream_lists.cc \
src/core/ext/transport/chttp2/transport/varint.cc \
src/core/ext/transport/chttp2/transport/varint.h \
src/core/ext/transport/chttp2/transport/write_size_policy.cc \
src/core/ext/transport/chttp2/transport/write_size_policy.h \
src/core/ext/transport/chttp2/transport/writing.cc \
src/core/ext/transport/inproc/inproc_plugin.cc \
src/core/ext/transport/inproc/inproc_transport.cc \

@ -11227,6 +11227,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "write_size_policy_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save