[chttp2] Tune initial window size/max frame size setting (#30550)

* [chttp2] Tune initial window size/max frame size setting

- Only choose frame sizes and initial window sizes that are powers of two - as a more predictable function to avoid flapping.
- If window size drops below 1kb then snap to 0 to avoid frantically changing a value that needn't be
- Allow the initial window size to drop to zero (we have fuzzing in place that says this is safe)

* copy/paste fix

* fix build

* fix

* Automated change: Fix sanity tests

* fix announce bug

* cleanup

* put this change under an experiment

* more tests

* fix build

* Automated change: Fix sanity tests

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/30867/head
Craig Tiller 3 years ago committed by GitHub
parent 346afa40c6
commit 47a43d68c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 31
      BUILD
  2. 1
      CMakeLists.txt
  3. 4
      bazel/experiments.bzl
  4. 3
      build_autogenerated.yaml
  5. 126
      src/core/ext/transport/chttp2/transport/flow_control.cc
  6. 13
      src/core/ext/transport/chttp2/transport/flow_control.h
  7. 3
      src/core/lib/experiments/experiments.cc
  8. 9
      src/core/lib/experiments/experiments.h
  9. 8
      src/core/lib/experiments/experiments.yaml
  10. 11
      src/core/lib/gpr/useful.h
  11. 12
      test/core/gpr/useful_test.cc
  12. 6
      test/core/transport/chttp2/BUILD
  13. 1
      test/core/transport/chttp2/flow_control_fuzzer.cc

31
BUILD

@ -3060,7 +3060,6 @@ grpc_cc_library(
],
hdrs = [
"src/core/lib/transport/error_utils.h",
"src/core/lib/transport/http2_errors.h",
"src/core/lib/address_utils/parse_address.h",
"src/core/lib/channel/call_finalization.h",
"src/core/lib/channel/call_tracer.h",
@ -3218,6 +3217,8 @@ grpc_cc_library(
"grpc_public_hdrs",
"grpc_sockaddr",
"grpc_trace",
"handshaker_registry",
"http2_errors",
"iomgr_fwd",
"iomgr_port",
"iomgr_timer",
@ -3249,6 +3250,13 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "http2_errors",
hdrs = [
"src/core/lib/transport/http2_errors.h",
],
)
grpc_cc_library(
name = "channel_stack_type",
srcs = [
@ -3834,6 +3842,7 @@ grpc_cc_library(
"grpc_base",
"grpc_codegen",
"grpc_trace",
"http2_errors",
"idle_filter_state",
"loop",
"orphanable",
@ -6613,6 +6622,7 @@ grpc_cc_library(
"experiments",
"gpr",
"grpc_trace",
"http2_settings",
"memory_quota",
"pid_controller",
"time",
@ -6620,6 +6630,21 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "http2_settings",
srcs = [
"src/core/ext/transport/chttp2/transport/http2_settings.cc",
],
hdrs = [
"src/core/ext/transport/chttp2/transport/http2_settings.h",
],
deps = [
"gpr_platform",
"http2_errors",
"useful",
],
)
grpc_cc_library(
name = "grpc_transport_chttp2",
srcs = [
@ -6636,7 +6661,6 @@ grpc_cc_library(
"src/core/ext/transport/chttp2/transport/hpack_encoder.cc",
"src/core/ext/transport/chttp2/transport/hpack_parser.cc",
"src/core/ext/transport/chttp2/transport/hpack_parser_table.cc",
"src/core/ext/transport/chttp2/transport/http2_settings.cc",
"src/core/ext/transport/chttp2/transport/huffsyms.cc",
"src/core/ext/transport/chttp2/transport/parsing.cc",
"src/core/ext/transport/chttp2/transport/stream_lists.cc",
@ -6659,7 +6683,6 @@ grpc_cc_library(
"src/core/ext/transport/chttp2/transport/hpack_encoder.h",
"src/core/ext/transport/chttp2/transport/hpack_parser.h",
"src/core/ext/transport/chttp2/transport/hpack_parser_table.h",
"src/core/ext/transport/chttp2/transport/http2_settings.h",
"src/core/ext/transport/chttp2/transport/huffsyms.h",
"src/core/ext/transport/chttp2/transport/internal.h",
"src/core/ext/transport/chttp2/transport/stream_map.h",
@ -6690,6 +6713,8 @@ grpc_cc_library(
"grpc_trace",
"hpack_constants",
"hpack_encoder_table",
"http2_errors",
"http2_settings",
"httpcli",
"iomgr_fwd",
"iomgr_timer",

1
CMakeLists.txt generated

@ -10024,6 +10024,7 @@ if(gRPC_BUILD_TESTS)
add_executable(flow_control_test
src/core/ext/transport/chttp2/transport/flow_control.cc
src/core/ext/transport/chttp2/transport/http2_settings.cc
src/core/ext/upb-generated/google/protobuf/any.upb.c
src/core/ext/upb-generated/google/rpc/status.upb.c
src/core/lib/debug/trace.cc

@ -18,6 +18,7 @@
EXPERIMENTS = {
"core_end2end_test": [
"flow_control_fixes",
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
"tcp_read_chunks",
@ -27,6 +28,9 @@ EXPERIMENTS = {
"tcp_rcv_lowat",
"tcp_read_chunks",
],
"flow_control_test": [
"flow_control_fixes",
],
"resource_quota_test": [
"memory_pressure_controller",
"periodic_resource_quota_reclamation",

@ -6063,6 +6063,7 @@ targets:
language: c++
headers:
- src/core/ext/transport/chttp2/transport/flow_control.h
- src/core/ext/transport/chttp2/transport/http2_settings.h
- src/core/ext/upb-generated/google/protobuf/any.upb.h
- src/core/ext/upb-generated/google/rpc/status.upb.h
- src/core/lib/debug/trace.h
@ -6108,9 +6109,11 @@ targets:
- src/core/lib/slice/slice_refcount_base.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/lib/transport/bdp_estimator.h
- src/core/lib/transport/http2_errors.h
- src/core/lib/transport/pid_controller.h
src:
- src/core/ext/transport/chttp2/transport/flow_control.cc
- src/core/ext/transport/chttp2/transport/http2_settings.cc
- src/core/ext/upb-generated/google/protobuf/any.upb.c
- src/core/ext/upb-generated/google/rpc/status.upb.c
- src/core/lib/debug/trace.cc

@ -35,19 +35,14 @@
#include <grpc/support/log.h>
#include "src/core/ext/transport/chttp2/transport/http2_settings.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/global_config_env.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/resource_quota/memory_quota.h"
grpc_core::TraceFlag grpc_flowctl_trace(false, "flowctl");
GPR_GLOBAL_CONFIG_DEFINE_BOOL(
grpc_experimental_broad_flow_control_range, false,
"Use an enlarged memory pressure range for scaling flow control when using "
"a resource quota.");
namespace grpc_core {
namespace chttp2 {
@ -275,54 +270,93 @@ TransportFlowControl::TargetInitialWindowSizeBasedOnMemoryPressureAndBdp()
}
void TransportFlowControl::UpdateSetting(
int64_t* desired_value, int64_t new_desired_value,
FlowControlAction* action,
grpc_chttp2_setting_id id, int64_t* desired_value,
uint32_t new_desired_value, FlowControlAction* action,
FlowControlAction& (FlowControlAction::*set)(FlowControlAction::Urgency,
uint32_t)) {
int64_t delta = new_desired_value - *desired_value;
// TODO(ncteisen): tune this
if (delta != 0 &&
(delta <= -*desired_value / 5 || delta >= *desired_value / 5)) {
*desired_value = new_desired_value;
(action->*set)(FlowControlAction::Urgency::QUEUE_UPDATE, *desired_value);
if (IsFlowControlFixesEnabled()) {
new_desired_value =
Clamp(new_desired_value, grpc_chttp2_settings_parameters[id].min_value,
grpc_chttp2_settings_parameters[id].max_value);
if (new_desired_value != *desired_value) {
*desired_value = new_desired_value;
(action->*set)(FlowControlAction::Urgency::QUEUE_UPDATE, *desired_value);
}
} else {
int64_t delta = new_desired_value - *desired_value;
// TODO(ncteisen): tune this
if (delta != 0 &&
(delta <= -*desired_value / 5 || delta >= *desired_value / 5)) {
*desired_value = new_desired_value;
(action->*set)(FlowControlAction::Urgency::QUEUE_UPDATE, *desired_value);
}
}
}
FlowControlAction TransportFlowControl::PeriodicUpdate() {
FlowControlAction action;
if (enable_bdp_probe_) {
// get bdp estimate and update initial_window accordingly.
// target might change based on how much memory pressure we are under
// TODO(ncteisen): experiment with setting target to be huge under low
// memory pressure.
double target = IsMemoryPressureControllerEnabled()
? TargetInitialWindowSizeBasedOnMemoryPressureAndBdp()
: pow(2, SmoothLogBdp(TargetLogBdp()));
if (g_test_only_transport_target_window_estimates_mocker != nullptr) {
// Hook for simulating unusual flow control situations in tests.
target = g_test_only_transport_target_window_estimates_mocker
->ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(
target_initial_window_size_ /* current target */);
if (IsFlowControlFixesEnabled()) {
// get bdp estimate and update initial_window accordingly.
// target might change based on how much memory pressure we are under
// TODO(ncteisen): experiment with setting target to be huge under low
// memory pressure.
uint32_t target = static_cast<uint32_t>(RoundUpToPowerOf2(
Clamp(IsMemoryPressureControllerEnabled()
? TargetInitialWindowSizeBasedOnMemoryPressureAndBdp()
: pow(2, SmoothLogBdp(TargetLogBdp())),
0.0, static_cast<double>(kMaxInitialWindowSize))));
if (target < kMinPositiveInitialWindowSize) target = 0;
if (g_test_only_transport_target_window_estimates_mocker != nullptr) {
// Hook for simulating unusual flow control situations in tests.
target = g_test_only_transport_target_window_estimates_mocker
->ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(
target_initial_window_size_ /* current target */);
}
// Though initial window 'could' drop to 0, we keep the floor at
// kMinInitialWindowSize
UpdateSetting(GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
&target_initial_window_size_, target, &action,
&FlowControlAction::set_send_initial_window_update);
// we target the max of BDP or bandwidth in microseconds.
UpdateSetting(GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, &target_frame_size_,
target, &action,
&FlowControlAction::set_send_max_frame_size_update);
} else {
// get bdp estimate and update initial_window accordingly.
// target might change based on how much memory pressure we are under
// TODO(ncteisen): experiment with setting target to be huge under low
// memory pressure.
double target = IsMemoryPressureControllerEnabled()
? TargetInitialWindowSizeBasedOnMemoryPressureAndBdp()
: pow(2, SmoothLogBdp(TargetLogBdp()));
if (g_test_only_transport_target_window_estimates_mocker != nullptr) {
// Hook for simulating unusual flow control situations in tests.
target = g_test_only_transport_target_window_estimates_mocker
->ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(
target_initial_window_size_ /* current target */);
}
// Though initial window 'could' drop to 0, we keep the floor at
// kMinInitialWindowSize
UpdateSetting(
GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
&target_initial_window_size_,
static_cast<int32_t>(Clamp(target, double(kMinInitialWindowSize),
double(kMaxInitialWindowSize))),
&action, &FlowControlAction::set_send_initial_window_update);
// get bandwidth estimate and update max_frame accordingly.
double bw_dbl = bdp_estimator_.EstimateBandwidth();
// we target the max of BDP or bandwidth in microseconds.
UpdateSetting(
GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, &target_frame_size_,
static_cast<int32_t>(Clamp(
std::max(
static_cast<int32_t>(Clamp(bw_dbl, 0.0, double(INT_MAX))) /
1000,
static_cast<int32_t>(target_initial_window_size_)),
16384, 16777215)),
&action, &FlowControlAction::set_send_max_frame_size_update);
}
// Though initial window 'could' drop to 0, we keep the floor at
// kMinInitialWindowSize
UpdateSetting(
&target_initial_window_size_,
static_cast<int32_t>(Clamp(target, double(kMinInitialWindowSize),
double(kMaxInitialWindowSize))),
&action, &FlowControlAction::set_send_initial_window_update);
// get bandwidth estimate and update max_frame accordingly.
double bw_dbl = bdp_estimator_.EstimateBandwidth();
// we target the max of BDP or bandwidth in microseconds.
UpdateSetting(
&target_frame_size_,
static_cast<int32_t>(Clamp(
std::max(static_cast<int32_t>(Clamp(bw_dbl, 0.0, double(INT_MAX))) /
1000,
static_cast<int32_t>(target_initial_window_size_)),
16384, 16777215)),
&action, &FlowControlAction::set_send_max_frame_size_update);
}
return UpdateAction(action);
}
@ -357,7 +391,7 @@ uint32_t StreamFlowControl::DesiredAnnounceSize() const {
FlowControlAction StreamFlowControl::UpdateAction(FlowControlAction action) {
const int64_t desired_announce_size = DesiredAnnounceSize();
if (desired_announce_size > 0) {
if ((min_progress_size_ > 0 && announced_window_delta_ < 0) ||
if ((min_progress_size_ > 0 && announced_window_delta_ <= 0) ||
desired_announce_size >= 8192) {
action.set_send_stream_update(
FlowControlAction::Urgency::UPDATE_IMMEDIATELY);

@ -33,6 +33,7 @@
#include <grpc/support/log.h>
#include "src/core/ext/transport/chttp2/transport/http2_settings.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/resource_quota/memory_quota.h"
@ -53,13 +54,16 @@ namespace chttp2 {
static constexpr uint32_t kDefaultWindow = 65535;
static constexpr uint32_t kDefaultFrameSize = 16384;
static constexpr int64_t kMaxWindow = static_cast<int64_t>((1u << 31) - 1);
// TODO(ncteisen): Tune this
static constexpr uint32_t kFrameSize = 1024 * 1024;
static constexpr const uint32_t kMinInitialWindowSize = 128;
// If smaller than this, advertise zero window.
static constexpr uint32_t kMinPositiveInitialWindowSize = 1024;
static constexpr const uint32_t kMaxInitialWindowSize = (1u << 30);
// The maximum per-stream flow control window delta to advertise.
static constexpr const int64_t kMaxWindowDelta = (1u << 20);
// TODO(ctiller): clean up when flow_control_fixes is enabled by default
static constexpr uint32_t kFrameSize = 1024 * 1024;
static constexpr const uint32_t kMinInitialWindowSize = 128;
class TransportFlowControl;
class StreamFlowControl;
@ -253,7 +257,8 @@ class TransportFlowControl final {
double TargetLogBdp();
double SmoothLogBdp(double value);
double TargetInitialWindowSizeBasedOnMemoryPressureAndBdp() const;
static void UpdateSetting(int64_t* desired_value, int64_t new_desired_value,
static void UpdateSetting(grpc_chttp2_setting_id id, int64_t* desired_value,
uint32_t new_desired_value,
FlowControlAction* action,
FlowControlAction& (FlowControlAction::*set)(
FlowControlAction::Urgency, uint32_t));

@ -29,6 +29,8 @@ const char* const description_tcp_read_chunks =
"malloc to recycle arbitrary large blocks.";
const char* const description_tcp_rcv_lowat =
"Use SO_RCVLOWAT to avoid wakeups on the read path.";
const char* const description_flow_control_fixes =
"Various fixes for flow control, max frame size setting.";
const char* const description_memory_pressure_controller =
"New memory pressure controller";
const char* const description_periodic_resource_quota_reclamation =
@ -43,6 +45,7 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"tcp_frame_size_tuning", description_tcp_frame_size_tuning, false},
{"tcp_read_chunks", description_tcp_read_chunks, false},
{"tcp_rcv_lowat", description_tcp_rcv_lowat, false},
{"flow_control_fixes", description_flow_control_fixes, false},
{"memory_pressure_controller", description_memory_pressure_controller,
false},
{"periodic_resource_quota_reclamation",

@ -28,14 +28,15 @@ namespace grpc_core {
inline bool IsTcpFrameSizeTuningEnabled() { return IsExperimentEnabled(0); }
inline bool IsTcpReadChunksEnabled() { return IsExperimentEnabled(1); }
inline bool IsTcpRcvLowatEnabled() { return IsExperimentEnabled(2); }
inline bool IsFlowControlFixesEnabled() { return IsExperimentEnabled(3); }
inline bool IsMemoryPressureControllerEnabled() {
return IsExperimentEnabled(3);
return IsExperimentEnabled(4);
}
inline bool IsPeriodicResourceQuotaReclamationEnabled() {
return IsExperimentEnabled(4);
return IsExperimentEnabled(5);
}
inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() {
return IsExperimentEnabled(5);
return IsExperimentEnabled(6);
}
struct ExperimentMetadata {
@ -44,7 +45,7 @@ struct ExperimentMetadata {
bool default_value;
};
constexpr const size_t kNumExperiments = 6;
constexpr const size_t kNumExperiments = 7;
extern const ExperimentMetadata g_experiment_metadata[kNumExperiments];
} // namespace grpc_core

@ -24,6 +24,7 @@
# Well known test tags:
# core_end2end_tests: all tests, fixtures in the core end2end suite
# endpoint_test: endpoint related iomgr tests
# flow_control_test: tests pertaining explicitly to flow control
- name: tcp_frame_size_tuning
description:
@ -50,6 +51,13 @@
expiry: 2022/10/01
owner: ctiller@google.com
test_tags: ["endpoint_test", "core_end2end_test"]
- name: flow_control_fixes
description:
Various fixes for flow control, max frame size setting.
default: false
expiry: 2022/10/01
owner: ctiller@google.com
test_tags: ["core_end2end_test", "flow_control_test"]
- name: memory_pressure_controller
description:
New memory pressure controller

@ -153,6 +153,17 @@ inline uint32_t MixHash32(uint32_t a, uint32_t b) {
return RotateLeft(a, 2u) ^ b;
}
inline uint32_t RoundUpToPowerOf2(uint32_t v) {
v--;
v |= v >> 1;
v |= v >> 2;
v |= v >> 4;
v |= v >> 8;
v |= v >> 16;
v++;
return v;
}
} // namespace grpc_core
#define GPR_ARRAY_SIZE(array) (sizeof(array) / sizeof(*(array)))

@ -76,6 +76,18 @@ TEST(UsefulTest, SaturatingAdd) {
std::numeric_limits<int64_t>::min());
}
TEST(UsefulTest, RoundUpToPowerOf2) {
EXPECT_EQ(RoundUpToPowerOf2(0), 0);
EXPECT_EQ(RoundUpToPowerOf2(1), 1);
EXPECT_EQ(RoundUpToPowerOf2(2), 2);
EXPECT_EQ(RoundUpToPowerOf2(3), 4);
EXPECT_EQ(RoundUpToPowerOf2(4), 4);
EXPECT_EQ(RoundUpToPowerOf2(5), 8);
EXPECT_EQ(RoundUpToPowerOf2(6), 8);
EXPECT_EQ(RoundUpToPowerOf2(7), 8);
EXPECT_EQ(RoundUpToPowerOf2(8), 8);
}
} // namespace grpc_core
int main(int argc, char** argv) {

@ -111,6 +111,7 @@ grpc_cc_test(
"gtest",
],
language = "C++",
tags = ["flow_control_test"],
uses_event_engine = False,
uses_polling = False,
deps = [
@ -266,7 +267,10 @@ grpc_cc_test(
"gtest",
],
language = "C++",
tags = ["no_windows"], # LARGE_MACHINE is not configured for windows RBE
tags = [
"flow_control_test",
"no_windows",
], # LARGE_MACHINE is not configured for windows RBE
deps = [
"//:gpr",
"//:grpc",

@ -341,7 +341,6 @@ void FlowControlFuzzer::PerformAction(FlowControlAction action,
[this, stream]() { streams_to_update_.push(stream->id); });
with_urgency(action.send_transport_update(), []() {});
with_urgency(action.send_initial_window_update(), [this, &action]() {
GPR_ASSERT(action.initial_window_size() >= chttp2::kMinInitialWindowSize);
GPR_ASSERT(action.initial_window_size() <= chttp2::kMaxInitialWindowSize);
queued_initial_window_size_ = action.initial_window_size();
});

Loading…
Cancel
Save