[chttp2] Fix performance regression for small stream sends (#31180)

* [chttp2] Fix performance regression for small stream sends

* fix

* comment

* comment

* fix
pull/30622/head^2
Craig Tiller 2 years ago committed by GitHub
parent 46d17e1304
commit 03c049b14f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 44
      src/core/ext/transport/chttp2/transport/flow_control.cc
  2. 1
      src/core/ext/transport/chttp2/transport/flow_control.h
  3. 2
      test/core/transport/chttp2/flow_control_test.cc

@ -279,7 +279,16 @@ void TransportFlowControl::UpdateSetting(
grpc_chttp2_settings_parameters[id].max_value);
if (new_desired_value != *desired_value) {
*desired_value = new_desired_value;
(action->*set)(FlowControlAction::Urgency::QUEUE_UPDATE, *desired_value);
// Reaching zero can only happen for initial window size, and if it occurs
// we really want to wake up writes and ensure all the queued stream
// window updates are flushed, since stream flow control operates
// differently at zero window size.
FlowControlAction::Urgency urgency =
FlowControlAction::Urgency::QUEUE_UPDATE;
if (new_desired_value == 0) {
urgency = FlowControlAction::Urgency::UPDATE_IMMEDIATELY;
}
(action->*set)(urgency, *desired_value);
}
} else {
int64_t delta = new_desired_value - *desired_value;
@ -391,13 +400,34 @@ int64_t StreamFlowControl::DesiredAnnounceSize() const {
FlowControlAction StreamFlowControl::UpdateAction(FlowControlAction action) {
const int64_t desired_announce_size = DesiredAnnounceSize();
if (desired_announce_size > 0) {
if ((min_progress_size_ > 0 && announced_window_delta_ <= 0) ||
desired_announce_size >= 8192) {
action.set_send_stream_update(
FlowControlAction::Urgency::UPDATE_IMMEDIATELY);
} else {
action.set_send_stream_update(FlowControlAction::Urgency::QUEUE_UPDATE);
FlowControlAction::Urgency urgency =
FlowControlAction::Urgency::QUEUE_UPDATE;
// Size at which we probably want to wake up and write regardless of whether
// we *have* to.
// Currently set at half the initial window size or 8kb (whichever is
// greater). 8kb means we don't send rapidly unnecessarily when the initial
// window size is small.
const int64_t hurry_up_size =
std::max(static_cast<int64_t>(tfc_->sent_init_window()) / 2,
static_cast<int64_t>(8192));
if (desired_announce_size > hurry_up_size) {
urgency = FlowControlAction::Urgency::UPDATE_IMMEDIATELY;
}
// min_progress_size_ > 0 means we have a reader ready to read.
if (min_progress_size_ > 0) {
// If we're into initial window to receive that data we should wake up and
// send an update.
if (announced_window_delta_ < 0) {
urgency = FlowControlAction::Urgency::UPDATE_IMMEDIATELY;
} else if (announced_window_delta_ == 0 &&
tfc_->sent_init_window() == 0) {
// Special case when initial window size is zero, meaning that
// announced_window_delta cannot become negative (it may already be so
// however).
urgency = FlowControlAction::Urgency::UPDATE_IMMEDIATELY;
}
}
action.set_send_stream_update(urgency);
}
return action;
}

@ -236,6 +236,7 @@ class TransportFlowControl final {
BdpEstimator* bdp_estimator() { return &bdp_estimator_; }
uint32_t acked_init_window() const { return acked_init_window_; }
uint32_t sent_init_window() const { return target_initial_window_size_; }
void SetAckedInitialWindow(uint32_t value) { acked_init_window_ = value; }

@ -160,7 +160,7 @@ TEST(FlowControl, GradualReadsUpdate) {
break;
}
}
EXPECT_GT(immediate_updates, 0);
EXPECT_GE(immediate_updates, 0);
EXPECT_GT(queued_updates, 0);
EXPECT_EQ(immediate_updates + queued_updates, 65535);
}

Loading…
Cancel
Save