Merge pull request #12915 from ctiller/flow++

C++ize flow control
pull/13026/head
Craig Tiller 7 years ago committed by GitHub
commit e9cb5d8da4
  1. 1
      BUILD
  2. 1
      build.yaml
  3. 2
      gRPC-Core.podspec
  4. 1
      grpc.gemspec
  5. 1
      package.xml
  6. 153
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  7. 573
      src/core/ext/transport/chttp2/transport/flow_control.cc
  8. 328
      src/core/ext/transport/chttp2/transport/flow_control.h
  9. 4
      src/core/ext/transport/chttp2/transport/frame_settings.cc
  10. 10
      src/core/ext/transport/chttp2/transport/frame_window_update.cc
  11. 153
      src/core/ext/transport/chttp2/transport/internal.h
  12. 17
      src/core/ext/transport/chttp2/transport/parsing.cc
  13. 29
      src/core/ext/transport/chttp2/transport/writing.cc
  14. 11
      src/core/lib/transport/bdp_estimator.h
  15. 2
      test/core/transport/BUILD
  16. 20
      test/core/transport/bdp_estimator_test.cc
  17. 36
      test/cpp/microbenchmarks/bm_chttp2_transport.cc
  18. 21
      test/cpp/microbenchmarks/bm_fullstack_trickle.cc
  19. 1
      tools/doxygen/Doxyfile.core.internal
  20. 2
      tools/run_tests/generated/sources_and_headers.json

@ -1261,6 +1261,7 @@ grpc_cc_library(
"src/core/ext/transport/chttp2/transport/bin_encoder.h", "src/core/ext/transport/chttp2/transport/bin_encoder.h",
"src/core/ext/transport/chttp2/transport/chttp2_transport.h", "src/core/ext/transport/chttp2/transport/chttp2_transport.h",
"src/core/ext/transport/chttp2/transport/frame.h", "src/core/ext/transport/chttp2/transport/frame.h",
"src/core/ext/transport/chttp2/transport/flow_control.h",
"src/core/ext/transport/chttp2/transport/frame_data.h", "src/core/ext/transport/chttp2/transport/frame_data.h",
"src/core/ext/transport/chttp2/transport/frame_goaway.h", "src/core/ext/transport/chttp2/transport/frame_goaway.h",
"src/core/ext/transport/chttp2/transport/frame_ping.h", "src/core/ext/transport/chttp2/transport/frame_ping.h",

@ -778,6 +778,7 @@ filegroups:
- src/core/ext/transport/chttp2/transport/bin_decoder.h - src/core/ext/transport/chttp2/transport/bin_decoder.h
- src/core/ext/transport/chttp2/transport/bin_encoder.h - src/core/ext/transport/chttp2/transport/bin_encoder.h
- src/core/ext/transport/chttp2/transport/chttp2_transport.h - src/core/ext/transport/chttp2/transport/chttp2_transport.h
- src/core/ext/transport/chttp2/transport/flow_control.h
- src/core/ext/transport/chttp2/transport/frame.h - src/core/ext/transport/chttp2/transport/frame.h
- src/core/ext/transport/chttp2/transport/frame_data.h - src/core/ext/transport/chttp2/transport/frame_data.h
- src/core/ext/transport/chttp2/transport/frame_goaway.h - src/core/ext/transport/chttp2/transport/frame_goaway.h

@ -249,6 +249,7 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/transport/bin_decoder.h', 'src/core/ext/transport/chttp2/transport/bin_decoder.h',
'src/core/ext/transport/chttp2/transport/bin_encoder.h', 'src/core/ext/transport/chttp2/transport/bin_encoder.h',
'src/core/ext/transport/chttp2/transport/chttp2_transport.h', 'src/core/ext/transport/chttp2/transport/chttp2_transport.h',
'src/core/ext/transport/chttp2/transport/flow_control.h',
'src/core/ext/transport/chttp2/transport/frame.h', 'src/core/ext/transport/chttp2/transport/frame.h',
'src/core/ext/transport/chttp2/transport/frame_data.h', 'src/core/ext/transport/chttp2/transport/frame_data.h',
'src/core/ext/transport/chttp2/transport/frame_goaway.h', 'src/core/ext/transport/chttp2/transport/frame_goaway.h',
@ -751,6 +752,7 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/transport/bin_decoder.h', 'src/core/ext/transport/chttp2/transport/bin_decoder.h',
'src/core/ext/transport/chttp2/transport/bin_encoder.h', 'src/core/ext/transport/chttp2/transport/bin_encoder.h',
'src/core/ext/transport/chttp2/transport/chttp2_transport.h', 'src/core/ext/transport/chttp2/transport/chttp2_transport.h',
'src/core/ext/transport/chttp2/transport/flow_control.h',
'src/core/ext/transport/chttp2/transport/frame.h', 'src/core/ext/transport/chttp2/transport/frame.h',
'src/core/ext/transport/chttp2/transport/frame_data.h', 'src/core/ext/transport/chttp2/transport/frame_data.h',
'src/core/ext/transport/chttp2/transport/frame_goaway.h', 'src/core/ext/transport/chttp2/transport/frame_goaway.h',

@ -181,6 +181,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/transport/chttp2/transport/bin_decoder.h ) s.files += %w( src/core/ext/transport/chttp2/transport/bin_decoder.h )
s.files += %w( src/core/ext/transport/chttp2/transport/bin_encoder.h ) s.files += %w( src/core/ext/transport/chttp2/transport/bin_encoder.h )
s.files += %w( src/core/ext/transport/chttp2/transport/chttp2_transport.h ) s.files += %w( src/core/ext/transport/chttp2/transport/chttp2_transport.h )
s.files += %w( src/core/ext/transport/chttp2/transport/flow_control.h )
s.files += %w( src/core/ext/transport/chttp2/transport/frame.h ) s.files += %w( src/core/ext/transport/chttp2/transport/frame.h )
s.files += %w( src/core/ext/transport/chttp2/transport/frame_data.h ) s.files += %w( src/core/ext/transport/chttp2/transport/frame_data.h )
s.files += %w( src/core/ext/transport/chttp2/transport/frame_goaway.h ) s.files += %w( src/core/ext/transport/chttp2/transport/frame_goaway.h )

@ -193,6 +193,7 @@
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/bin_decoder.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/bin_decoder.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/bin_encoder.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/bin_encoder.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/chttp2_transport.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/chttp2_transport.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/flow_control.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/frame.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/frame.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/frame_data.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/frame_data.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/frame_goaway.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/frame_goaway.h" role="src" />

@ -54,7 +54,6 @@
#include "src/core/lib/transport/transport.h" #include "src/core/lib/transport/transport.h"
#include "src/core/lib/transport/transport_impl.h" #include "src/core/lib/transport/transport_impl.h"
#define DEFAULT_WINDOW 65535
#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024) #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
#define MAX_WINDOW 0x7fffffffu #define MAX_WINDOW 0x7fffffffu
#define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024) #define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
@ -222,7 +221,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
t->write_cb_pool = next; t->write_cb_pool = next;
} }
t->flow_control.bdp_estimator.Destroy(); t->flow_control.Destroy();
GRPC_ERROR_UNREF(t->closed_with_error); GRPC_ERROR_UNREF(t->closed_with_error);
gpr_free(t->ping_acks); gpr_free(t->ping_acks);
@ -282,10 +281,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->endpoint_reading = 1; t->endpoint_reading = 1;
t->next_stream_id = is_client ? 1 : 2; t->next_stream_id = is_client ? 1 : 2;
t->is_client = is_client; t->is_client = is_client;
t->flow_control.remote_window = DEFAULT_WINDOW;
t->flow_control.announced_window = DEFAULT_WINDOW;
t->flow_control.target_initial_window_size = DEFAULT_WINDOW;
t->flow_control.t = t;
t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->is_first_frame = true; t->is_first_frame = true;
grpc_connectivity_state_init( grpc_connectivity_state_init(
@ -325,8 +320,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
keepalive_watchdog_fired_locked, t, keepalive_watchdog_fired_locked, t,
grpc_combiner_scheduler(t->combiner)); grpc_combiner_scheduler(t->combiner));
t->flow_control.bdp_estimator.Init(t->peer_string);
grpc_chttp2_goaway_parser_init(&t->goaway_parser); grpc_chttp2_goaway_parser_init(&t->goaway_parser);
grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser); grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser);
@ -350,8 +343,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
window -- this should by rights be 0 */ window -- this should by rights be 0 */
t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
t->sent_local_settings = 0; t->sent_local_settings = 0;
t->write_buffer_size = DEFAULT_WINDOW; t->write_buffer_size = grpc_core::chttp2::kDefaultWindow;
t->flow_control.enable_bdp_probe = true;
if (is_client) { if (is_client) {
grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string( grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string(
@ -396,6 +388,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
bool enable_bdp = true;
if (channel_args) { if (channel_args) {
for (i = 0; i < channel_args->num_args; i++) { for (i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key, if (0 == strcmp(channel_args->args[i].key,
@ -456,8 +450,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
&channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE}); &channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE});
} else if (0 == } else if (0 ==
strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) { strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) {
t->flow_control.enable_bdp_probe = enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true);
grpc_channel_arg_get_integer(&channel_args->args[i], {1, 0, 1});
} else if (0 == strcmp(channel_args->args[i].key, } else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_KEEPALIVE_TIME_MS)) { GRPC_ARG_KEEPALIVE_TIME_MS)) {
const int value = grpc_channel_arg_get_integer( const int value = grpc_channel_arg_get_integer(
@ -552,6 +545,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
} }
} }
t->flow_control.Init(exec_ctx, t, enable_bdp);
/* No pings allowed before receiving a header or data frame. */ /* No pings allowed before receiving a header or data frame. */
t->ping_state.pings_before_data_required = 0; t->ping_state.pings_before_data_required = 0;
t->ping_state.is_delayed_ping_timer_set = false; t->ping_state.is_delayed_ping_timer_set = false;
@ -572,15 +567,13 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED; t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
} }
if (t->flow_control.enable_bdp_probe) { if (enable_bdp) {
GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
schedule_bdp_ping_locked(exec_ctx, t); schedule_bdp_ping_locked(exec_ctx, t);
}
grpc_chttp2_act_on_flowctl_action( grpc_chttp2_act_on_flowctl_action(
exec_ctx, exec_ctx, t->flow_control->PeriodicUpdate(exec_ctx), t, NULL);
grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, NULL), t, }
NULL);
grpc_chttp2_initiate_write(exec_ctx, t, grpc_chttp2_initiate_write(exec_ctx, t,
GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE); GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);
@ -718,7 +711,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
post_destructive_reclaimer(exec_ctx, t); post_destructive_reclaimer(exec_ctx, t);
} }
s->flow_control.s = s; s->flow_control.Init(t->flow_control.get(), s);
GPR_TIMER_END("init_stream", 0); GPR_TIMER_END("init_stream", 0);
return 0; return 0;
@ -769,7 +762,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GRPC_ERROR_UNREF(s->write_closed_error); GRPC_ERROR_UNREF(s->write_closed_error);
GRPC_ERROR_UNREF(s->byte_stream_error); GRPC_ERROR_UNREF(s->byte_stream_error);
grpc_chttp2_flowctl_destroy_stream(&t->flow_control, &s->flow_control); s->flow_control.Destroy();
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream"); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream");
@ -1638,13 +1631,10 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
if (s->id != 0) { if (s->id != 0) {
if (!s->read_closed) { if (!s->read_closed) {
already_received = s->frame_storage.length; already_received = s->frame_storage.length;
grpc_chttp2_flowctl_incoming_bs_update( s->flow_control->IncomingByteStreamUpdate(GRPC_HEADER_SIZE_IN_BYTES,
&t->flow_control, &s->flow_control, GRPC_HEADER_SIZE_IN_BYTES, already_received);
already_received); grpc_chttp2_act_on_flowctl_action(exec_ctx,
grpc_chttp2_act_on_flowctl_action( s->flow_control->MakeAction(), t, s);
exec_ctx, grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control,
&s->flow_control),
t, s);
} }
} }
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
@ -2420,49 +2410,44 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
* INPUT PROCESSING - PARSING * INPUT PROCESSING - PARSING
*/ */
void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, template <class F>
grpc_chttp2_flowctl_action action, static void WithUrgency(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_transport *t, grpc_core::chttp2::FlowControlAction::Urgency urgency,
grpc_chttp2_stream *s) { grpc_chttp2_initiate_write_reason reason, F action) {
switch (action.send_stream_update) { switch (urgency) {
case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED: case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED:
break;
case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY:
grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
grpc_chttp2_initiate_write(
exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL);
break; break;
case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE: case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
grpc_chttp2_mark_stream_writable(exec_ctx, t, s); grpc_chttp2_initiate_write(exec_ctx, t, reason);
// fallthrough
case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE:
action();
break; break;
} }
switch (action.send_transport_update) { }
case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED:
break; void grpc_chttp2_act_on_flowctl_action(
case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY: grpc_exec_ctx *exec_ctx, const grpc_core::chttp2::FlowControlAction &action,
grpc_chttp2_initiate_write( grpc_chttp2_transport *t, grpc_chttp2_stream *s) {
exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL); WithUrgency(
break; exec_ctx, t, action.send_stream_update(),
// this is the same as no action b/c every time the transport enters the GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
// writing path it will maybe do an update [exec_ctx, t, s]() { grpc_chttp2_mark_stream_writable(exec_ctx, t, s); });
case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE: WithUrgency(exec_ctx, t, action.send_transport_update(),
break; GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {});
} WithUrgency(exec_ctx, t, action.send_initial_window_update(),
if (action.send_setting_update != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) { GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS,
if (action.initial_window_size > 0) { [exec_ctx, t, &action]() {
queue_setting_update(exec_ctx, t, queue_setting_update(exec_ctx, t,
GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
(uint32_t)action.initial_window_size); action.initial_window_size());
} });
if (action.max_frame_size > 0) { WithUrgency(
queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, exec_ctx, t, action.send_max_frame_size_update(),
(uint32_t)action.max_frame_size); GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [exec_ctx, t, &action]() {
} queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
if (action.send_setting_update == GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY) { action.max_frame_size());
grpc_chttp2_initiate_write(exec_ctx, t, });
GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS);
}
}
} }
static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
@ -2518,7 +2503,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE, grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
GRPC_ERROR_NONE}; GRPC_ERROR_NONE};
for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) { for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
t->flow_control.bdp_estimator->AddIncomingBytes( t->flow_control->bdp_estimator()->AddIncomingBytes(
(int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i])); (int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i]));
errors[1] = errors[1] =
grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]); grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]);
@ -2535,8 +2520,8 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_END("reading_action.parse", 0); GPR_TIMER_END("reading_action.parse", 0);
GPR_TIMER_BEGIN("post_parse_locked", 0); GPR_TIMER_BEGIN("post_parse_locked", 0);
if (t->flow_control.initial_window_update != 0) { if (t->initial_window_update != 0) {
if (t->flow_control.initial_window_update > 0) { if (t->initial_window_update > 0) {
grpc_chttp2_stream *s; grpc_chttp2_stream *s;
while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) { while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
grpc_chttp2_mark_stream_writable(exec_ctx, t, s); grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
@ -2545,7 +2530,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING); GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING);
} }
} }
t->flow_control.initial_window_update = 0; t->initial_window_update = 0;
} }
GPR_TIMER_END("post_parse_locked", 0); GPR_TIMER_END("post_parse_locked", 0);
} }
@ -2568,10 +2553,8 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (keep_reading) { if (keep_reading) {
grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer,
&t->read_action_locked); &t->read_action_locked);
grpc_chttp2_act_on_flowctl_action( grpc_chttp2_act_on_flowctl_action(exec_ctx, t->flow_control->MakeAction(),
exec_ctx, t, NULL);
grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, NULL), t,
NULL);
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading"); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading");
} else { } else {
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action"); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action");
@ -2588,7 +2571,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
// that kicks off finishes, it's unreffed // that kicks off finishes, it's unreffed
static void schedule_bdp_ping_locked(grpc_exec_ctx *exec_ctx, static void schedule_bdp_ping_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) { grpc_chttp2_transport *t) {
t->flow_control.bdp_estimator->SchedulePing(); t->flow_control->bdp_estimator()->SchedulePing();
send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked, send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked,
&t->finish_bdp_ping_locked); &t->finish_bdp_ping_locked);
} }
@ -2604,7 +2587,7 @@ static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) { if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
} }
t->flow_control.bdp_estimator->StartPing(); t->flow_control->bdp_estimator()->StartPing();
} }
static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
@ -2618,7 +2601,10 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
return; return;
} }
grpc_millis next_ping = t->flow_control.bdp_estimator->CompletePing(exec_ctx); grpc_millis next_ping =
t->flow_control->bdp_estimator()->CompletePing(exec_ctx);
grpc_chttp2_act_on_flowctl_action(
exec_ctx, t->flow_control->PeriodicUpdate(exec_ctx), t, nullptr);
GPR_ASSERT(!t->have_next_bdp_ping_timer); GPR_ASSERT(!t->have_next_bdp_ping_timer);
t->have_next_bdp_ping_timer = true; t->have_next_bdp_ping_timer = true;
grpc_timer_init(exec_ctx, &t->next_bdp_ping_timer, next_ping, grpc_timer_init(exec_ctx, &t->next_bdp_ping_timer, next_ping,
@ -2844,13 +2830,10 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
size_t cur_length = s->frame_storage.length; size_t cur_length = s->frame_storage.length;
if (!s->read_closed) { if (!s->read_closed) {
grpc_chttp2_flowctl_incoming_bs_update(&t->flow_control, &s->flow_control, s->flow_control->IncomingByteStreamUpdate(bs->next_action.max_size_hint,
bs->next_action.max_size_hint, cur_length);
cur_length); grpc_chttp2_act_on_flowctl_action(exec_ctx, s->flow_control->MakeAction(),
grpc_chttp2_act_on_flowctl_action( t, s);
exec_ctx, grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control,
&s->flow_control),
t, s);
} }
GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0); GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
if (s->frame_storage.length > 0) { if (s->frame_storage.length > 0) {

@ -16,7 +16,7 @@
* *
*/ */
#include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/flow_control.h"
#include <inttypes.h> #include <inttypes.h>
#include <limits.h> #include <limits.h>
@ -28,38 +28,15 @@
#include <grpc/support/string_util.h> #include <grpc/support/string_util.h>
#include <grpc/support/useful.h> #include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/lib/support/string.h" #include "src/core/lib/support/string.h"
static uint32_t grpc_chttp2_target_announced_window( namespace grpc_core {
const grpc_chttp2_transport_flowctl* tfc); namespace chttp2 {
#ifndef NDEBUG namespace {
typedef struct {
int64_t remote_window;
int64_t target_window;
int64_t announced_window;
int64_t remote_window_delta;
int64_t local_window_delta;
int64_t announced_window_delta;
uint32_t local_init_window;
uint32_t local_max_frame;
} shadow_flow_control;
static void pretrace(shadow_flow_control* shadow_fc,
grpc_chttp2_transport_flowctl* tfc,
grpc_chttp2_stream_flowctl* sfc) {
shadow_fc->remote_window = tfc->remote_window;
shadow_fc->target_window = grpc_chttp2_target_announced_window(tfc);
shadow_fc->announced_window = tfc->announced_window;
if (sfc != NULL) {
shadow_fc->remote_window_delta = sfc->remote_window_delta;
shadow_fc->local_window_delta = sfc->local_window_delta;
shadow_fc->announced_window_delta = sfc->announced_window_delta;
}
}
#define TRACE_PADDING 30 static constexpr const int kTracePadding = 30;
static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) { static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) {
char* str; char* str;
@ -68,7 +45,7 @@ static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) {
} else { } else {
gpr_asprintf(&str, "%" PRId64 "", old_val); gpr_asprintf(&str, "%" PRId64 "", old_val);
} }
char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING); char* str_lp = gpr_leftpad(str, ' ', kTracePadding);
gpr_free(str); gpr_free(str);
return str_lp; return str_lp;
} }
@ -80,47 +57,58 @@ static char* fmt_uint32_diff_str(uint32_t old_val, uint32_t new_val) {
} else { } else {
gpr_asprintf(&str, "%" PRIu32 "", old_val); gpr_asprintf(&str, "%" PRIu32 "", old_val);
} }
char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING); char* str_lp = gpr_leftpad(str, ' ', kTracePadding);
gpr_free(str); gpr_free(str);
return str_lp; return str_lp;
} }
} // namespace
void FlowControlTrace::Init(const char* reason, TransportFlowControl* tfc,
StreamFlowControl* sfc) {
tfc_ = tfc;
sfc_ = sfc;
reason_ = reason;
remote_window_ = tfc->remote_window();
target_window_ = tfc->target_window();
announced_window_ = tfc->announced_window();
if (sfc != nullptr) {
remote_window_delta_ = sfc->remote_window_delta();
local_window_delta_ = sfc->local_window_delta();
announced_window_delta_ = sfc->announced_window_delta();
}
}
static void posttrace(shadow_flow_control* shadow_fc, void FlowControlTrace::Finish() {
grpc_chttp2_transport_flowctl* tfc,
grpc_chttp2_stream_flowctl* sfc, const char* reason) {
uint32_t acked_local_window = uint32_t acked_local_window =
tfc->t->settings[GRPC_SENT_SETTINGS] tfc_->transport()->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
uint32_t remote_window = uint32_t remote_window =
tfc->t->settings[GRPC_PEER_SETTINGS] tfc_->transport()->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
char* trw_str = char* trw_str = fmt_int64_diff_str(remote_window_, tfc_->remote_window());
fmt_int64_diff_str(shadow_fc->remote_window, tfc->remote_window); char* tlw_str = fmt_int64_diff_str(target_window_, tfc_->target_window());
char* tlw_str = fmt_int64_diff_str(shadow_fc->target_window,
grpc_chttp2_target_announced_window(tfc));
char* taw_str = char* taw_str =
fmt_int64_diff_str(shadow_fc->announced_window, tfc->announced_window); fmt_int64_diff_str(announced_window_, tfc_->announced_window());
char* srw_str; char* srw_str;
char* slw_str; char* slw_str;
char* saw_str; char* saw_str;
if (sfc != NULL) { if (sfc_ != nullptr) {
srw_str = fmt_int64_diff_str(shadow_fc->remote_window_delta + remote_window, srw_str = fmt_int64_diff_str(remote_window_delta_ + remote_window,
sfc->remote_window_delta + remote_window); sfc_->remote_window_delta() + remote_window);
slw_str = slw_str = fmt_int64_diff_str(local_window_delta_ + acked_local_window,
fmt_int64_diff_str(shadow_fc->local_window_delta + acked_local_window, local_window_delta_ + acked_local_window);
sfc->local_window_delta + acked_local_window); saw_str = fmt_int64_diff_str(announced_window_delta_ + acked_local_window,
saw_str = fmt_int64_diff_str( announced_window_delta_ + acked_local_window);
shadow_fc->announced_window_delta + acked_local_window,
sfc->announced_window_delta + acked_local_window);
} else { } else {
srw_str = gpr_leftpad("", ' ', TRACE_PADDING); srw_str = gpr_leftpad("", ' ', kTracePadding);
slw_str = gpr_leftpad("", ' ', TRACE_PADDING); slw_str = gpr_leftpad("", ' ', kTracePadding);
saw_str = gpr_leftpad("", ' ', TRACE_PADDING); saw_str = gpr_leftpad("", ' ', kTracePadding);
} }
gpr_log(GPR_DEBUG, gpr_log(GPR_DEBUG,
"%p[%u][%s] | %s | trw:%s, ttw:%s, taw:%s, srw:%s, slw:%s, saw:%s", "%p[%u][%s] | %s | trw:%s, ttw:%s, taw:%s, srw:%s, slw:%s, saw:%s",
tfc, sfc != NULL ? sfc->s->id : 0, tfc->t->is_client ? "cli" : "svr", tfc_, sfc_ != nullptr ? sfc_->stream()->id : 0,
reason, trw_str, tlw_str, taw_str, srw_str, slw_str, saw_str); tfc_->transport()->is_client ? "cli" : "svr", reason_, trw_str,
tlw_str, taw_str, srw_str, slw_str, saw_str);
gpr_free(trw_str); gpr_free(trw_str);
gpr_free(tlw_str); gpr_free(tlw_str);
gpr_free(taw_str); gpr_free(taw_str);
@ -129,13 +117,13 @@ static void posttrace(shadow_flow_control* shadow_fc,
gpr_free(saw_str); gpr_free(saw_str);
} }
static const char* urgency_to_string(grpc_chttp2_flowctl_urgency urgency) { const char* FlowControlAction::UrgencyString(Urgency u) {
switch (urgency) { switch (u) {
case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED: case Urgency::NO_ACTION_NEEDED:
return "no action"; return "no action";
case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY: case Urgency::UPDATE_IMMEDIATELY:
return "update immediately"; return "update immediately";
case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE: case Urgency::QUEUE_UPDATE:
return "queue update"; return "queue update";
default: default:
GPR_UNREACHABLE_CODE(return "unknown"); GPR_UNREACHABLE_CODE(return "unknown");
@ -143,209 +131,132 @@ static const char* urgency_to_string(grpc_chttp2_flowctl_urgency urgency) {
GPR_UNREACHABLE_CODE(return "unknown"); GPR_UNREACHABLE_CODE(return "unknown");
} }
static void trace_action(grpc_chttp2_transport_flowctl* tfc, void FlowControlAction::Trace(grpc_chttp2_transport* t) const {
grpc_chttp2_flowctl_action action) {
char* iw_str = fmt_uint32_diff_str( char* iw_str = fmt_uint32_diff_str(
tfc->t->settings[GRPC_SENT_SETTINGS] t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], initial_window_size_);
action.initial_window_size);
char* mf_str = fmt_uint32_diff_str( char* mf_str = fmt_uint32_diff_str(
tfc->t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
action.max_frame_size); max_frame_size_);
gpr_log(GPR_DEBUG, "t[%s], s[%s], settings[%s] iw:%s mf:%s", gpr_log(GPR_DEBUG, "t[%s], s[%s], iw:%s:%s mf:%s:%s",
urgency_to_string(action.send_transport_update), UrgencyString(send_transport_update_),
urgency_to_string(action.send_stream_update), UrgencyString(send_stream_update_),
urgency_to_string(action.send_setting_update), iw_str, mf_str); UrgencyString(send_initial_window_update_), iw_str,
UrgencyString(send_max_frame_size_update_), mf_str);
gpr_free(iw_str); gpr_free(iw_str);
gpr_free(mf_str); gpr_free(mf_str);
} }
#define PRETRACE(tfc, sfc) \ TransportFlowControl::TransportFlowControl(grpc_exec_ctx* exec_ctx,
shadow_flow_control shadow_fc; \ const grpc_chttp2_transport* t,
GRPC_FLOW_CONTROL_IF_TRACING(pretrace(&shadow_fc, tfc, sfc)) bool enable_bdp_probe)
#define POSTTRACE(tfc, sfc, reason) \ : t_(t),
GRPC_FLOW_CONTROL_IF_TRACING(posttrace(&shadow_fc, tfc, sfc, reason)) enable_bdp_probe_(enable_bdp_probe),
#define TRACEACTION(tfc, action) \ bdp_estimator_(t->peer_string),
GRPC_FLOW_CONTROL_IF_TRACING(trace_action(tfc, action)) pid_controller_(grpc_core::PidController::Args()
#else .set_gain_p(4)
#define PRETRACE(tfc, sfc) .set_gain_i(8)
#define POSTTRACE(tfc, sfc, reason) .set_gain_d(0)
#define TRACEACTION(tfc, action) .set_initial_control_value(TargetLogBdp())
#endif .set_min_control_value(-1)
.set_max_control_value(25)
/* How many bytes of incoming flow control would we like to advertise */ .set_integral_range(10)),
static uint32_t grpc_chttp2_target_announced_window( last_pid_update_(grpc_exec_ctx_now(exec_ctx)) {}
const grpc_chttp2_transport_flowctl* tfc) {
return (uint32_t)GPR_MIN((int64_t)((1u << 31) - 1), uint32_t TransportFlowControl::MaybeSendUpdate(bool writing_anyway) {
tfc->announced_stream_total_over_incoming_window + FlowControlTrace trace("t updt sent", this, nullptr);
tfc->target_initial_window_size); const uint32_t target_announced_window = target_window();
} if ((writing_anyway || announced_window_ <= target_announced_window / 2) &&
announced_window_ != target_announced_window) {
// we have sent data on the wire, we must track this in our bookkeeping for the const uint32_t announce = (uint32_t)GPR_CLAMP(
// remote peer's flow control. target_announced_window - announced_window_, 0, UINT32_MAX);
void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport_flowctl* tfc, announced_window_ += announce;
grpc_chttp2_stream_flowctl* sfc, return announce;
int64_t size) {
PRETRACE(tfc, sfc);
tfc->remote_window -= size;
sfc->remote_window_delta -= size;
POSTTRACE(tfc, sfc, " data sent");
}
static void announced_window_delta_preupdate(grpc_chttp2_transport_flowctl* tfc,
grpc_chttp2_stream_flowctl* sfc) {
if (sfc->announced_window_delta > 0) {
tfc->announced_stream_total_over_incoming_window -=
sfc->announced_window_delta;
} else {
tfc->announced_stream_total_under_incoming_window +=
-sfc->announced_window_delta;
}
}
static void announced_window_delta_postupdate(
grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) {
if (sfc->announced_window_delta > 0) {
tfc->announced_stream_total_over_incoming_window +=
sfc->announced_window_delta;
} else {
tfc->announced_stream_total_under_incoming_window -=
-sfc->announced_window_delta;
} }
return 0;
} }
// We have received data from the wire. We must track this in our own flow grpc_error* TransportFlowControl::ValidateRecvData(
// control bookkeeping. int64_t incoming_frame_size) {
// Returns an error if the incoming frame violates our flow control. if (incoming_frame_size > announced_window_) {
grpc_error* grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl* tfc,
grpc_chttp2_stream_flowctl* sfc,
int64_t incoming_frame_size) {
uint32_t sent_init_window =
tfc->t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
uint32_t acked_init_window =
tfc->t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
PRETRACE(tfc, sfc);
if (incoming_frame_size > tfc->announced_window) {
char* msg; char* msg;
gpr_asprintf(&msg, gpr_asprintf(&msg,
"frame of size %" PRId64 " overflows local window of %" PRId64, "frame of size %" PRId64 " overflows local window of %" PRId64,
incoming_frame_size, tfc->announced_window); incoming_frame_size, announced_window_);
grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
gpr_free(msg); gpr_free(msg);
return err; return err;
} }
return GRPC_ERROR_NONE;
}
if (sfc != NULL) { StreamFlowControl::StreamFlowControl(TransportFlowControl* tfc,
int64_t acked_stream_window = const grpc_chttp2_stream* s)
sfc->announced_window_delta + acked_init_window; : tfc_(tfc), s_(s) {}
int64_t sent_stream_window = sfc->announced_window_delta + sent_init_window;
if (incoming_frame_size > acked_stream_window) {
if (incoming_frame_size <= sent_stream_window) {
gpr_log(
GPR_ERROR,
"Incoming frame of size %" PRId64
" exceeds local window size of %" PRId64
".\n"
"The (un-acked, future) window size would be %" PRId64
" which is not exceeded.\n"
"This would usually cause a disconnection, but allowing it due to"
"broken HTTP2 implementations in the wild.\n"
"See (for example) https://github.com/netty/netty/issues/6520.",
incoming_frame_size, acked_stream_window, sent_stream_window);
} else {
char* msg;
gpr_asprintf(&msg, "frame of size %" PRId64
" overflows local window of %" PRId64,
incoming_frame_size, acked_stream_window);
grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
gpr_free(msg);
return err;
}
}
announced_window_delta_preupdate(tfc, sfc); grpc_error* StreamFlowControl::RecvData(int64_t incoming_frame_size) {
sfc->announced_window_delta -= incoming_frame_size; FlowControlTrace trace(" data recv", tfc_, this);
announced_window_delta_postupdate(tfc, sfc);
sfc->local_window_delta -= incoming_frame_size;
}
tfc->announced_window -= incoming_frame_size; grpc_error* error = GRPC_ERROR_NONE;
error = tfc_->ValidateRecvData(incoming_frame_size);
if (error != GRPC_ERROR_NONE) return error;
POSTTRACE(tfc, sfc, " data recv"); uint32_t sent_init_window =
return GRPC_ERROR_NONE; tfc_->transport()->settings[GRPC_SENT_SETTINGS]
} [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
uint32_t acked_init_window =
// Returns a non zero announce integer if we should send a transport window tfc_->transport()->settings[GRPC_ACKED_SETTINGS]
// update [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
uint32_t grpc_chttp2_flowctl_maybe_send_transport_update(
grpc_chttp2_transport_flowctl* tfc, bool writing_anyway) { int64_t acked_stream_window = announced_window_delta_ + acked_init_window;
PRETRACE(tfc, NULL); int64_t sent_stream_window = announced_window_delta_ + sent_init_window;
uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc); if (incoming_frame_size > acked_stream_window) {
uint32_t threshold_to_send_transport_window_update = if (incoming_frame_size <= sent_stream_window) {
tfc->t->outbuf.count > 0 ? 3 * target_announced_window / 4 gpr_log(GPR_ERROR,
: target_announced_window / 2; "Incoming frame of size %" PRId64
if ((writing_anyway || " exceeds local window size of %" PRId64
tfc->announced_window <= threshold_to_send_transport_window_update) && ".\n"
tfc->announced_window != target_announced_window) { "The (un-acked, future) window size would be %" PRId64
uint32_t announce = (uint32_t)GPR_CLAMP( " which is not exceeded.\n"
target_announced_window - tfc->announced_window, 0, UINT32_MAX); "This would usually cause a disconnection, but allowing it due to"
tfc->announced_window += announce; "broken HTTP2 implementations in the wild.\n"
POSTTRACE(tfc, NULL, "t updt sent"); "See (for example) https://github.com/netty/netty/issues/6520.",
return announce; incoming_frame_size, acked_stream_window, sent_stream_window);
} else {
char* msg;
gpr_asprintf(&msg, "frame of size %" PRId64
" overflows local window of %" PRId64,
incoming_frame_size, acked_stream_window);
grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
gpr_free(msg);
return err;
}
} }
GRPC_FLOW_CONTROL_IF_TRACING(
gpr_log(GPR_DEBUG, "%p[0][%s] will not send transport update", tfc, UpdateAnnouncedWindowDelta(tfc_, -incoming_frame_size);
tfc->t->is_client ? "cli" : "svr")); local_window_delta_ -= incoming_frame_size;
return 0; tfc_->CommitRecvData(incoming_frame_size);
return GRPC_ERROR_NONE;
} }
// Returns a non zero announce integer if we should send a stream window update uint32_t StreamFlowControl::MaybeSendUpdate() {
uint32_t grpc_chttp2_flowctl_maybe_send_stream_update( FlowControlTrace trace("s updt sent", tfc_, this);
grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) { if (local_window_delta_ > announced_window_delta_) {
PRETRACE(tfc, sfc);
if (sfc->local_window_delta > sfc->announced_window_delta) {
uint32_t announce = (uint32_t)GPR_CLAMP( uint32_t announce = (uint32_t)GPR_CLAMP(
sfc->local_window_delta - sfc->announced_window_delta, 0, UINT32_MAX); local_window_delta_ - announced_window_delta_, 0, UINT32_MAX);
announced_window_delta_preupdate(tfc, sfc); UpdateAnnouncedWindowDelta(tfc_, announce);
sfc->announced_window_delta += announce;
announced_window_delta_postupdate(tfc, sfc);
POSTTRACE(tfc, sfc, "s updt sent");
return announce; return announce;
} }
GRPC_FLOW_CONTROL_IF_TRACING(
gpr_log(GPR_DEBUG, "%p[%u][%s] will not send stream update", tfc,
sfc->s->id, tfc->t->is_client ? "cli" : "svr"));
return 0; return 0;
} }
// we have received a WINDOW_UPDATE frame for a transport void StreamFlowControl::IncomingByteStreamUpdate(size_t max_size_hint,
void grpc_chttp2_flowctl_recv_transport_update( size_t have_already) {
grpc_chttp2_transport_flowctl* tfc, uint32_t size) { FlowControlTrace trace("app st recv", tfc_, this);
PRETRACE(tfc, NULL);
tfc->remote_window += size;
POSTTRACE(tfc, NULL, "t updt recv");
}
// we have received a WINDOW_UPDATE frame for a stream
void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_transport_flowctl* tfc,
grpc_chttp2_stream_flowctl* sfc,
uint32_t size) {
PRETRACE(tfc, sfc);
sfc->remote_window_delta += size;
POSTTRACE(tfc, sfc, "s updt recv");
}
void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl* tfc,
grpc_chttp2_stream_flowctl* sfc,
size_t max_size_hint,
size_t have_already) {
PRETRACE(tfc, sfc);
uint32_t max_recv_bytes; uint32_t max_recv_bytes;
uint32_t sent_init_window = uint32_t sent_init_window =
tfc->t->settings[GRPC_SENT_SETTINGS] tfc_->transport()->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
/* clamp max recv hint to an allowable size */ /* clamp max recv hint to an allowable size */
if (max_size_hint >= UINT32_MAX - sent_init_window) { if (max_size_hint >= UINT32_MAX - sent_init_window) {
@ -363,65 +274,18 @@ void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl* tfc,
/* add some small lookahead to keep pipelines flowing */ /* add some small lookahead to keep pipelines flowing */
GPR_ASSERT(max_recv_bytes <= UINT32_MAX - sent_init_window); GPR_ASSERT(max_recv_bytes <= UINT32_MAX - sent_init_window);
if (sfc->local_window_delta < max_recv_bytes) { if (local_window_delta_ < max_recv_bytes) {
uint32_t add_max_recv_bytes = uint32_t add_max_recv_bytes =
(uint32_t)(max_recv_bytes - sfc->local_window_delta); (uint32_t)(max_recv_bytes - local_window_delta_);
sfc->local_window_delta += add_max_recv_bytes; local_window_delta_ += add_max_recv_bytes;
}
POSTTRACE(tfc, sfc, "app st recv");
}
void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl* tfc,
grpc_chttp2_stream_flowctl* sfc) {
announced_window_delta_preupdate(tfc, sfc);
}
// Returns an urgency with which to make an update
static grpc_chttp2_flowctl_urgency delta_is_significant(
const grpc_chttp2_transport_flowctl* tfc, int32_t value,
grpc_chttp2_setting_id setting_id) {
int64_t delta = (int64_t)value -
(int64_t)tfc->t->settings[GRPC_LOCAL_SETTINGS][setting_id];
// TODO(ncteisen): tune this
if (delta != 0 && (delta <= -value / 5 || delta >= value / 5)) {
return GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE;
} else {
return GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED;
}
}
// Takes in a target and uses the pid controller to return a stabilized
// guess at the new bdp.
static double get_pid_controller_guess(grpc_exec_ctx* exec_ctx,
grpc_chttp2_transport_flowctl* tfc,
double target) {
grpc_millis now = grpc_exec_ctx_now(exec_ctx);
if (!tfc->pid_controller_initialized) {
tfc->last_pid_update = now;
tfc->pid_controller_initialized = true;
tfc->pid_controller.Init(grpc_core::PidController::Args()
.set_gain_p(4)
.set_gain_i(8)
.set_gain_d(0)
.set_initial_control_value(target)
.set_min_control_value(-1)
.set_max_control_value(25)
.set_integral_range(10));
return pow(2, target);
} }
double bdp_error = target - tfc->pid_controller->last_control_value();
double dt = (double)(now - tfc->last_pid_update) * 1e-3;
double log2_bdp_guess = tfc->pid_controller->Update(bdp_error, dt);
tfc->last_pid_update = now;
return pow(2, log2_bdp_guess);
} }
// Take in a target and modifies it based on the memory pressure of the system // Take in a target and modifies it based on the memory pressure of the system
static double get_target_under_memory_pressure( static double AdjustForMemoryPressure(grpc_resource_quota* quota,
grpc_chttp2_transport_flowctl* tfc, double target) { double target) {
// do not increase window under heavy memory pressure. // do not increase window under heavy memory pressure.
double memory_pressure = grpc_resource_quota_get_memory_pressure( double memory_pressure = grpc_resource_quota_get_memory_pressure(quota);
grpc_resource_user_quota(grpc_endpoint_get_resource_user(tfc->t->ep)));
static const double kLowMemPressure = 0.1; static const double kLowMemPressure = 0.1;
static const double kZeroTarget = 22; static const double kZeroTarget = 22;
static const double kHighMemPressure = 0.8; static const double kHighMemPressure = 0.8;
@ -436,75 +300,82 @@ static double get_target_under_memory_pressure(
return target; return target;
} }
grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( double TransportFlowControl::TargetLogBdp() {
grpc_exec_ctx* exec_ctx, grpc_chttp2_transport_flowctl* tfc, return AdjustForMemoryPressure(
grpc_chttp2_stream_flowctl* sfc) { grpc_resource_user_quota(grpc_endpoint_get_resource_user(t_->ep)),
grpc_chttp2_flowctl_action action; 1 + log2(bdp_estimator_.EstimateBdp()));
memset(&action, 0, sizeof(action)); }
double TransportFlowControl::SmoothLogBdp(grpc_exec_ctx* exec_ctx,
double value) {
grpc_millis now = grpc_exec_ctx_now(exec_ctx);
double bdp_error = value - pid_controller_.last_control_value();
const double dt = (double)(now - last_pid_update_) * 1e-3;
last_pid_update_ = now;
return pid_controller_.Update(bdp_error, dt);
}
FlowControlAction::Urgency TransportFlowControl::DeltaUrgency(
int32_t value, grpc_chttp2_setting_id setting_id) {
int64_t delta =
(int64_t)value - (int64_t)t_->settings[GRPC_LOCAL_SETTINGS][setting_id];
// TODO(ncteisen): tune this // TODO(ncteisen): tune this
if (sfc != NULL && !sfc->s->read_closed) { if (delta != 0 && (delta <= -value / 5 || delta >= value / 5)) {
uint32_t sent_init_window = return FlowControlAction::Urgency::QUEUE_UPDATE;
tfc->t->settings[GRPC_SENT_SETTINGS] } else {
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; return FlowControlAction::Urgency::NO_ACTION_NEEDED;
if ((int64_t)sfc->local_window_delta >
(int64_t)sfc->announced_window_delta &&
(int64_t)sfc->announced_window_delta + sent_init_window <=
sent_init_window / 2) {
action.send_stream_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY;
} else if (sfc->local_window_delta > sfc->announced_window_delta) {
action.send_stream_update = GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE;
}
} }
if (tfc->enable_bdp_probe) { }
FlowControlAction TransportFlowControl::PeriodicUpdate(
grpc_exec_ctx* exec_ctx) {
FlowControlAction action;
if (enable_bdp_probe_) {
// get bdp estimate and update initial_window accordingly. // get bdp estimate and update initial_window accordingly.
int64_t estimate = -1; // target might change based on how much memory pressure we are under
if (tfc->bdp_estimator->EstimateBdp(&estimate)) { // TODO(ncteisen): experiment with setting target to be huge under low
double target = 1 + log2((double)estimate); // memory pressure.
const double target = pow(2, SmoothLogBdp(exec_ctx, TargetLogBdp()));
// target might change based on how much memory pressure we are under
// TODO(ncteisen): experiment with setting target to be huge under low // Though initial window 'could' drop to 0, we keep the floor at 128
// memory pressure. target_initial_window_size_ = (int32_t)GPR_CLAMP(target, 128, INT32_MAX);
target = get_target_under_memory_pressure(tfc, target);
action.set_send_initial_window_update(
// run our target through the pid controller to stabilize change. DeltaUrgency(target_initial_window_size_,
// TODO(ncteisen): experiment with other controllers here. GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE),
double bdp_guess = get_pid_controller_guess(exec_ctx, tfc, target); target_initial_window_size_);
// Though initial window 'could' drop to 0, we keep the floor at 128
tfc->target_initial_window_size =
(int32_t)GPR_CLAMP(bdp_guess, 128, INT32_MAX);
grpc_chttp2_flowctl_urgency init_window_update_urgency =
delta_is_significant(tfc, tfc->target_initial_window_size,
GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE);
if (init_window_update_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) {
action.send_setting_update = init_window_update_urgency;
action.initial_window_size = (uint32_t)tfc->target_initial_window_size;
}
}
// get bandwidth estimate and update max_frame accordingly. // get bandwidth estimate and update max_frame accordingly.
double bw_dbl = -1; double bw_dbl = bdp_estimator_.EstimateBandwidth();
if (tfc->bdp_estimator->EstimateBandwidth(&bw_dbl)) { // we target the max of BDP or bandwidth in microseconds.
// we target the max of BDP or bandwidth in microseconds. int32_t frame_size = (int32_t)GPR_CLAMP(
int32_t frame_size = (int32_t)GPR_CLAMP( GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000,
GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000, target_initial_window_size_),
tfc->target_initial_window_size), 16384, 16777215);
16384, 16777215); action.set_send_max_frame_size_update(
grpc_chttp2_flowctl_urgency frame_size_urgency = delta_is_significant( DeltaUrgency(frame_size, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE),
tfc, frame_size, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE); frame_size);
if (frame_size_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) {
if (frame_size_urgency > action.send_setting_update) {
action.send_setting_update = frame_size_urgency;
}
action.max_frame_size = (uint32_t)frame_size;
}
}
} }
uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc); return UpdateAction(action);
if (tfc->announced_window < target_announced_window / 2) { }
action.send_transport_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY;
FlowControlAction StreamFlowControl::UpdateAction(FlowControlAction action) {
// TODO(ncteisen): tune this
if (!s_->read_closed) {
uint32_t sent_init_window =
tfc_->transport()->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
if (local_window_delta_ > announced_window_delta_ &&
announced_window_delta_ + sent_init_window <= sent_init_window / 2) {
action.set_send_stream_update(
FlowControlAction::Urgency::UPDATE_IMMEDIATELY);
} else if (local_window_delta_ > announced_window_delta_) {
action.set_send_stream_update(FlowControlAction::Urgency::QUEUE_UPDATE);
}
} }
TRACEACTION(tfc, action);
return action; return action;
} }
} // namespace chttp2
} // namespace grpc_core

@ -0,0 +1,328 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
#include <stdint.h>
#include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/http2_settings.h"
#include "src/core/lib/support/manual_constructor.h"
#include "src/core/lib/transport/bdp_estimator.h"
#include "src/core/lib/transport/pid_controller.h"
struct grpc_chttp2_transport;
struct grpc_chttp2_stream;
extern "C" grpc_tracer_flag grpc_flowctl_trace;
namespace grpc_core {
namespace chttp2 {
static constexpr uint32_t kDefaultWindow = 65535;
class TransportFlowControl;
class StreamFlowControl;
class FlowControlAction {
public:
enum class Urgency : uint8_t {
// Nothing to be done.
NO_ACTION_NEEDED = 0,
// Initiate a write to update the initial window immediately.
UPDATE_IMMEDIATELY,
// Push the flow control update into a send buffer, to be sent
// out the next time a write is initiated.
QUEUE_UPDATE,
};
Urgency send_stream_update() const { return send_stream_update_; }
Urgency send_transport_update() const { return send_transport_update_; }
Urgency send_initial_window_update() const {
return send_initial_window_update_;
}
Urgency send_max_frame_size_update() const {
return send_max_frame_size_update_;
}
uint32_t initial_window_size() const { return initial_window_size_; }
uint32_t max_frame_size() const { return max_frame_size_; }
FlowControlAction& set_send_stream_update(Urgency u) {
send_stream_update_ = u;
return *this;
}
FlowControlAction& set_send_transport_update(Urgency u) {
send_transport_update_ = u;
return *this;
}
FlowControlAction& set_send_initial_window_update(Urgency u,
uint32_t update) {
send_initial_window_update_ = u;
initial_window_size_ = update;
return *this;
}
FlowControlAction& set_send_max_frame_size_update(Urgency u,
uint32_t update) {
send_max_frame_size_update_ = u;
max_frame_size_ = update;
return *this;
}
static const char* UrgencyString(Urgency u);
void Trace(grpc_chttp2_transport* t) const;
private:
Urgency send_stream_update_ = Urgency::NO_ACTION_NEEDED;
Urgency send_transport_update_ = Urgency::NO_ACTION_NEEDED;
Urgency send_initial_window_update_ = Urgency::NO_ACTION_NEEDED;
Urgency send_max_frame_size_update_ = Urgency::NO_ACTION_NEEDED;
uint32_t initial_window_size_ = 0;
uint32_t max_frame_size_ = 0;
};
class FlowControlTrace {
public:
FlowControlTrace(const char* reason, TransportFlowControl* tfc,
StreamFlowControl* sfc) {
if (enabled_) Init(reason, tfc, sfc);
}
~FlowControlTrace() {
if (enabled_) Finish();
}
private:
void Init(const char* reason, TransportFlowControl* tfc,
StreamFlowControl* sfc);
void Finish();
const bool enabled_ = GRPC_TRACER_ON(grpc_flowctl_trace);
TransportFlowControl* tfc_;
StreamFlowControl* sfc_;
const char* reason_;
int64_t remote_window_;
int64_t target_window_;
int64_t announced_window_;
int64_t remote_window_delta_;
int64_t local_window_delta_;
int64_t announced_window_delta_;
};
class TransportFlowControl {
public:
TransportFlowControl(grpc_exec_ctx* exec_ctx, const grpc_chttp2_transport* t,
bool enable_bdp_probe);
~TransportFlowControl() {}
bool bdp_probe() const { return enable_bdp_probe_; }
// returns an announce if we should send a transport update to our peer,
// else returns zero; writing_anyway indicates if a write would happen
// regardless of the send - if it is false and this function returns non-zero,
// this announce will cause a write to occur
uint32_t MaybeSendUpdate(bool writing_anyway);
// Reads the flow control data and returns and actionable struct that will
// tell chttp2 exactly what it needs to do
FlowControlAction MakeAction() { return UpdateAction(FlowControlAction()); }
// Call periodically (at a low-ish rate, 100ms - 10s makes sense)
// to perform more complex flow control calculations and return an action
// to let chttp2 change its parameters
FlowControlAction PeriodicUpdate(grpc_exec_ctx* exec_ctx);
void StreamSentData(int64_t size) { remote_window_ -= size; }
grpc_error* ValidateRecvData(int64_t incoming_frame_size);
void CommitRecvData(int64_t incoming_frame_size) {
announced_window_ -= incoming_frame_size;
}
grpc_error* RecvData(int64_t incoming_frame_size) {
FlowControlTrace trace(" data recv", this, nullptr);
grpc_error* error = ValidateRecvData(incoming_frame_size);
if (error != GRPC_ERROR_NONE) return error;
CommitRecvData(incoming_frame_size);
return GRPC_ERROR_NONE;
}
// we have received a WINDOW_UPDATE frame for a transport
void RecvUpdate(uint32_t size) {
FlowControlTrace trace("t updt recv", this, nullptr);
remote_window_ += size;
}
int64_t remote_window() const { return remote_window_; }
int64_t target_window() const {
return (uint32_t)GPR_MIN((int64_t)((1u << 31) - 1),
announced_stream_total_over_incoming_window_ +
target_initial_window_size_);
}
int64_t announced_window() const { return announced_window_; }
const grpc_chttp2_transport* transport() const { return t_; }
void PreUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) {
if (delta > 0) {
announced_stream_total_over_incoming_window_ -= delta;
} else {
announced_stream_total_under_incoming_window_ += -delta;
}
}
void PostUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) {
if (delta > 0) {
announced_stream_total_over_incoming_window_ += delta;
} else {
announced_stream_total_under_incoming_window_ -= -delta;
}
}
BdpEstimator* bdp_estimator() { return &bdp_estimator_; }
void TestOnlyForceHugeWindow() {
announced_window_ = 1024 * 1024 * 1024;
remote_window_ = 1024 * 1024 * 1024;
}
private:
double TargetLogBdp();
double SmoothLogBdp(grpc_exec_ctx* exec_ctx, double value);
FlowControlAction::Urgency DeltaUrgency(int32_t value,
grpc_chttp2_setting_id setting_id);
FlowControlAction UpdateAction(FlowControlAction action) {
if (announced_window_ < target_window() / 2) {
action.set_send_transport_update(
FlowControlAction::Urgency::UPDATE_IMMEDIATELY);
}
return action;
}
const grpc_chttp2_transport* const t_;
/** Our bookkeeping for the remote peer's available window */
int64_t remote_window_ = kDefaultWindow;
/** calculating what we should give for local window:
we track the total amount of flow control over initial window size
across all streams: this is data that we want to receive right now (it
has an outstanding read)
and the total amount of flow control under initial window size across all
streams: this is data we've read early
we want to adjust incoming_window such that:
incoming_window = total_over - max(bdp - total_under, 0) */
int64_t announced_stream_total_over_incoming_window_ = 0;
int64_t announced_stream_total_under_incoming_window_ = 0;
/** This is out window according to what we have sent to our remote peer. The
* difference between this and target window is what we use to decide when
* to send WINDOW_UPDATE frames. */
int64_t announced_window_ = kDefaultWindow;
int32_t target_initial_window_size_ = kDefaultWindow;
/** should we probe bdp? */
const bool enable_bdp_probe_;
/* bdp estimation */
grpc_core::BdpEstimator bdp_estimator_;
/* pid controller */
grpc_core::PidController pid_controller_;
grpc_millis last_pid_update_ = 0;
};
class StreamFlowControl {
public:
StreamFlowControl(TransportFlowControl* tfc, const grpc_chttp2_stream* s);
~StreamFlowControl() {
tfc_->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
}
FlowControlAction UpdateAction(FlowControlAction action);
FlowControlAction MakeAction() { return UpdateAction(tfc_->MakeAction()); }
// we have sent data on the wire, we must track this in our bookkeeping for
// the remote peer's flow control.
void SentData(int64_t outgoing_frame_size) {
FlowControlTrace tracer(" data sent", tfc_, this);
tfc_->StreamSentData(outgoing_frame_size);
remote_window_delta_ -= outgoing_frame_size;
}
// we have received data from the wire
grpc_error* RecvData(int64_t incoming_frame_size);
// returns an announce if we should send a stream update to our peer, else
// returns zero
uint32_t MaybeSendUpdate();
// we have received a WINDOW_UPDATE frame for a stream
void RecvUpdate(uint32_t size) {
FlowControlTrace trace("s updt recv", tfc_, this);
remote_window_delta_ += size;
}
// the application is asking for a certain amount of bytes
void IncomingByteStreamUpdate(size_t max_size_hint, size_t have_already);
int64_t remote_window_delta() const { return remote_window_delta_; }
int64_t local_window_delta() const { return local_window_delta_; }
int64_t announced_window_delta() const { return announced_window_delta_; }
const grpc_chttp2_stream* stream() const { return s_; }
void TestOnlyForceHugeWindow() {
announced_window_delta_ = 1024 * 1024 * 1024;
local_window_delta_ = 1024 * 1024 * 1024;
remote_window_delta_ = 1024 * 1024 * 1024;
}
private:
TransportFlowControl* const tfc_;
const grpc_chttp2_stream* const s_;
void UpdateAnnouncedWindowDelta(TransportFlowControl* tfc, int64_t change) {
tfc->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
announced_window_delta_ += change;
tfc->PostUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
}
/** window available for us to send to peer, over or under the initial
* window
* size of the transport... ie:
* remote_window = remote_window_delta + transport.initial_window_size */
int64_t remote_window_delta_ = 0;
/** window available for peer to send to us (as a delta on
* transport.initial_window_size)
* local_window = local_window_delta + transport.initial_window_size */
int64_t local_window_delta_ = 0;
/** window available for peer to send to us over this stream that we have
* announced to the peer */
int64_t announced_window_delta_ = 0;
};
} // namespace chttp2
} // namespace grpc_core
#endif

@ -202,13 +202,13 @@ grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p,
} }
if (id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE && if (id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE &&
parser->incoming_settings[id] != parser->value) { parser->incoming_settings[id] != parser->value) {
t->flow_control.initial_window_update += t->initial_window_update +=
(int64_t)parser->value - parser->incoming_settings[id]; (int64_t)parser->value - parser->incoming_settings[id];
if (GRPC_TRACER_ON(grpc_http_trace) || if (GRPC_TRACER_ON(grpc_http_trace) ||
GRPC_TRACER_ON(grpc_flowctl_trace)) { GRPC_TRACER_ON(grpc_flowctl_trace)) {
gpr_log(GPR_DEBUG, "%p[%s] adding %d for initial_window change", gpr_log(GPR_DEBUG, "%p[%s] adding %d for initial_window change",
t, t->is_client ? "cli" : "svr", t, t->is_client ? "cli" : "svr",
(int)t->flow_control.initial_window_update); (int)t->initial_window_update);
} }
} }
parser->incoming_settings[id] = parser->value; parser->incoming_settings[id] = parser->value;

@ -96,8 +96,7 @@ grpc_error *grpc_chttp2_window_update_parser_parse(
if (t->incoming_stream_id != 0) { if (t->incoming_stream_id != 0) {
if (s != NULL) { if (s != NULL) {
grpc_chttp2_flowctl_recv_stream_update( s->flow_control->RecvUpdate(received_update);
&t->flow_control, &s->flow_control, received_update);
if (grpc_chttp2_list_remove_stalled_by_stream(t, s)) { if (grpc_chttp2_list_remove_stalled_by_stream(t, s)) {
grpc_chttp2_mark_stream_writable(exec_ctx, t, s); grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
grpc_chttp2_initiate_write( grpc_chttp2_initiate_write(
@ -106,10 +105,9 @@ grpc_error *grpc_chttp2_window_update_parser_parse(
} }
} }
} else { } else {
bool was_zero = t->flow_control.remote_window <= 0; bool was_zero = t->flow_control->remote_window() <= 0;
grpc_chttp2_flowctl_recv_transport_update(&t->flow_control, t->flow_control->RecvUpdate(received_update);
received_update); bool is_zero = t->flow_control->remote_window() <= 0;
bool is_zero = t->flow_control.remote_window <= 0;
if (was_zero && !is_zero) { if (was_zero && !is_zero) {
grpc_chttp2_initiate_write( grpc_chttp2_initiate_write(
exec_ctx, t, exec_ctx, t,

@ -22,6 +22,7 @@
#include <assert.h> #include <assert.h>
#include <stdbool.h> #include <stdbool.h>
#include "src/core/ext/transport/chttp2/transport/flow_control.h"
#include "src/core/ext/transport/chttp2/transport/frame.h" #include "src/core/ext/transport/chttp2/transport/frame.h"
#include "src/core/ext/transport/chttp2/transport/frame_data.h" #include "src/core/ext/transport/chttp2/transport/frame_data.h"
#include "src/core/ext/transport/chttp2/transport/frame_goaway.h" #include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
@ -38,9 +39,7 @@
#include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/support/manual_constructor.h" #include "src/core/lib/support/manual_constructor.h"
#include "src/core/lib/transport/bdp_estimator.h"
#include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/pid_controller.h"
#include "src/core/lib/transport/transport_impl.h" #include "src/core/lib/transport/transport_impl.h"
#ifdef __cplusplus #ifdef __cplusplus
@ -238,48 +237,6 @@ typedef enum {
GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED, GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED,
} grpc_chttp2_keepalive_state; } grpc_chttp2_keepalive_state;
typedef struct {
/** initial window change. This is tracked as we parse settings frames from
* the remote peer. If there is a positive delta, then we will make all
* streams readable since they may have become unstalled */
int64_t initial_window_update;
/** Our bookkeeping for the remote peer's available window */
int64_t remote_window;
/** calculating what we should give for local window:
we track the total amount of flow control over initial window size
across all streams: this is data that we want to receive right now (it
has an outstanding read)
and the total amount of flow control under initial window size across all
streams: this is data we've read early
we want to adjust incoming_window such that:
incoming_window = total_over - max(bdp - total_under, 0) */
int64_t announced_stream_total_over_incoming_window;
int64_t announced_stream_total_under_incoming_window;
/** This is out window according to what we have sent to our remote peer. The
* difference between this and target window is what we use to decide when
* to send WINDOW_UPDATE frames. */
int64_t announced_window;
int32_t target_initial_window_size;
/** should we probe bdp? */
bool enable_bdp_probe;
/* bdp estimation */
grpc_core::ManualConstructor<grpc_core::BdpEstimator> bdp_estimator;
/* pid controller */
bool pid_controller_initialized;
grpc_core::ManualConstructor<grpc_core::PidController> pid_controller;
grpc_millis last_pid_update;
// pointer back to transport for tracing
const grpc_chttp2_transport *t;
} grpc_chttp2_transport_flowctl;
struct grpc_chttp2_transport { struct grpc_chttp2_transport {
grpc_transport base; /* must be first */ grpc_transport base; /* must be first */
gpr_refcount refs; gpr_refcount refs;
@ -395,7 +352,12 @@ struct grpc_chttp2_transport {
/** parser for goaway frames */ /** parser for goaway frames */
grpc_chttp2_goaway_parser goaway_parser; grpc_chttp2_goaway_parser goaway_parser;
grpc_chttp2_transport_flowctl flow_control; grpc_core::ManualConstructor<grpc_core::chttp2::TransportFlowControl>
flow_control;
/** initial window change. This is tracked as we parse settings frames from
* the remote peer. If there is a positive delta, then we will make all
* streams readable since they may have become unstalled */
int64_t initial_window_update = 0;
/* deframing */ /* deframing */
grpc_chttp2_deframe_transport_state deframe_state; grpc_chttp2_deframe_transport_state deframe_state;
@ -477,25 +439,6 @@ typedef enum {
GPRC_METADATA_PUBLISHED_AT_CLOSE GPRC_METADATA_PUBLISHED_AT_CLOSE
} grpc_published_metadata_method; } grpc_published_metadata_method;
typedef struct {
/** window available for us to send to peer, over or under the initial window
* size of the transport... ie:
* remote_window = remote_window_delta + transport.initial_window_size */
int64_t remote_window_delta;
/** window available for peer to send to us (as a delta on
* transport.initial_window_size)
* local_window = local_window_delta + transport.initial_window_size */
int64_t local_window_delta;
/** window available for peer to send to us over this stream that we have
* announced to the peer */
int64_t announced_window_delta;
// read only pointer back to stream for data
const grpc_chttp2_stream *s;
} grpc_chttp2_stream_flowctl;
struct grpc_chttp2_stream { struct grpc_chttp2_stream {
grpc_chttp2_transport *t; grpc_chttp2_transport *t;
grpc_stream_refcount *refcount; grpc_stream_refcount *refcount;
@ -589,7 +532,8 @@ struct grpc_chttp2_stream {
bool sent_initial_metadata; bool sent_initial_metadata;
bool sent_trailing_metadata; bool sent_trailing_metadata;
grpc_chttp2_stream_flowctl flow_control; grpc_core::ManualConstructor<grpc_core::chttp2::StreamFlowControl>
flow_control;
grpc_slice_buffer flow_controlled_buffer; grpc_slice_buffer flow_controlled_buffer;
@ -700,73 +644,10 @@ bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport *t,
/********* Flow Control ***************/ /********* Flow Control ***************/
// we have sent data on the wire
void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport_flowctl *tfc,
grpc_chttp2_stream_flowctl *sfc,
int64_t size);
// we have received data from the wire
grpc_error *grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl *tfc,
grpc_chttp2_stream_flowctl *sfc,
int64_t incoming_frame_size);
// returns an announce if we should send a transport update to our peer,
// else returns zero
uint32_t grpc_chttp2_flowctl_maybe_send_transport_update(
grpc_chttp2_transport_flowctl *tfc, bool writing_anyway);
// returns an announce if we should send a stream update to our peer, else
// returns zero
uint32_t grpc_chttp2_flowctl_maybe_send_stream_update(
grpc_chttp2_transport_flowctl *tfc, grpc_chttp2_stream_flowctl *sfc);
// we have received a WINDOW_UPDATE frame for a transport
void grpc_chttp2_flowctl_recv_transport_update(
grpc_chttp2_transport_flowctl *tfc, uint32_t size);
// we have received a WINDOW_UPDATE frame for a stream
void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_transport_flowctl *tfc,
grpc_chttp2_stream_flowctl *sfc,
uint32_t size);
// the application is asking for a certain amount of bytes
void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl *tfc,
grpc_chttp2_stream_flowctl *sfc,
size_t max_size_hint,
size_t have_already);
void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl *tfc,
grpc_chttp2_stream_flowctl *sfc);
typedef enum {
// Nothing to be done.
GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED = 0,
// Initiate a write to update the initial window immediately.
GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY,
// Push the flow control update into a send buffer, to be sent
// out the next time a write is initiated.
GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE,
} grpc_chttp2_flowctl_urgency;
typedef struct {
grpc_chttp2_flowctl_urgency send_stream_update;
grpc_chttp2_flowctl_urgency send_transport_update;
grpc_chttp2_flowctl_urgency send_setting_update;
uint32_t initial_window_size;
uint32_t max_frame_size;
} grpc_chttp2_flowctl_action;
// Reads the flow control data and returns and actionable struct that will tell
// chttp2 exactly what it needs to do
grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_flowctl *tfc,
grpc_chttp2_stream_flowctl *sfc);
// Takes in a flow control action and performs all the needed operations. // Takes in a flow control action and performs all the needed operations.
void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, void grpc_chttp2_act_on_flowctl_action(
grpc_chttp2_flowctl_action action, grpc_exec_ctx *exec_ctx, const grpc_core::chttp2::FlowControlAction &action,
grpc_chttp2_transport *t, grpc_chttp2_transport *t, grpc_chttp2_stream *s);
grpc_chttp2_stream *s);
/********* End of Flow Control ***************/ /********* End of Flow Control ***************/
@ -800,16 +681,6 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
extern grpc_tracer_flag grpc_http_trace; extern grpc_tracer_flag grpc_http_trace;
extern grpc_tracer_flag grpc_flowctl_trace; extern grpc_tracer_flag grpc_flowctl_trace;
#ifndef NDEBUG
#define GRPC_FLOW_CONTROL_IF_TRACING(stmt) \
if (!(GRPC_TRACER_ON(grpc_flowctl_trace))) \
; \
else \
stmt
#else
#define GRPC_FLOW_CONTROL_IF_TRACING(stmt)
#endif
#define GRPC_CHTTP2_IF_TRACING(stmt) \ #define GRPC_CHTTP2_IF_TRACING(stmt) \
if (!(GRPC_TRACER_ON(grpc_http_trace))) \ if (!(GRPC_TRACER_ON(grpc_http_trace))) \
; \ ; \

@ -355,14 +355,15 @@ static grpc_error *init_data_frame_parser(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream *s = grpc_chttp2_stream *s =
grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id); grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id);
grpc_error *err = GRPC_ERROR_NONE; grpc_error *err = GRPC_ERROR_NONE;
err = grpc_chttp2_flowctl_recv_data(&t->flow_control, grpc_core::chttp2::FlowControlAction action;
s == NULL ? NULL : &s->flow_control, if (s == nullptr) {
t->incoming_frame_size); err = t->flow_control->RecvData(t->incoming_frame_size);
grpc_chttp2_act_on_flowctl_action( action = t->flow_control->MakeAction();
exec_ctx, } else {
grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, err = s->flow_control->RecvData(t->incoming_frame_size);
s == NULL ? NULL : &s->flow_control), action = s->flow_control->MakeAction();
t, s); }
grpc_chttp2_act_on_flowctl_action(exec_ctx, action, t, s);
if (err != GRPC_ERROR_NONE) { if (err != GRPC_ERROR_NONE) {
goto error_handler; goto error_handler;
} }

@ -146,13 +146,13 @@ static void report_stall(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
s->flow_controlled_bytes_flowed, s->flow_controlled_bytes_flowed,
t->settings[GRPC_ACKED_SETTINGS] t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
t->flow_control.remote_window, t->flow_control->remote_window(),
(uint32_t)GPR_MAX( (uint32_t)GPR_MAX(
0, 0,
s->flow_control.remote_window_delta + s->flow_control->remote_window_delta() +
(int64_t)t->settings[GRPC_PEER_SETTINGS] (int64_t)t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]), [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]),
s->flow_control.remote_window_delta); s->flow_control->remote_window_delta());
} }
static bool stream_ref_if_not_destroyed(gpr_refcount *r) { static bool stream_ref_if_not_destroyed(gpr_refcount *r) {
@ -216,8 +216,7 @@ class WriteContext {
void FlushWindowUpdates(grpc_exec_ctx *exec_ctx) { void FlushWindowUpdates(grpc_exec_ctx *exec_ctx) {
uint32_t transport_announce = uint32_t transport_announce =
grpc_chttp2_flowctl_maybe_send_transport_update(&t_->flow_control, t_->flow_control->MaybeSendUpdate(t_->outbuf.count > 0);
t_->outbuf.count > 0);
if (transport_announce) { if (transport_announce) {
grpc_transport_one_way_stats throwaway_stats; grpc_transport_one_way_stats throwaway_stats;
grpc_slice_buffer_add( grpc_slice_buffer_add(
@ -312,7 +311,7 @@ class DataSendContext {
uint32_t stream_remote_window() const { uint32_t stream_remote_window() const {
return (uint32_t)GPR_MAX( return (uint32_t)GPR_MAX(
0, s_->flow_control.remote_window_delta + 0, s_->flow_control->remote_window_delta() +
(int64_t)t_->settings[GRPC_PEER_SETTINGS] (int64_t)t_->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
} }
@ -320,7 +319,7 @@ class DataSendContext {
uint32_t max_outgoing() const { uint32_t max_outgoing() const {
return (uint32_t)GPR_MIN( return (uint32_t)GPR_MIN(
t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
GPR_MIN(stream_remote_window(), t_->flow_control.remote_window)); GPR_MIN(stream_remote_window(), t_->flow_control->remote_window()));
} }
bool AnyOutgoing() const { return max_outgoing() != 0; } bool AnyOutgoing() const { return max_outgoing() != 0; }
@ -352,8 +351,7 @@ class DataSendContext {
grpc_metadata_batch_is_empty(s_->send_trailing_metadata); grpc_metadata_batch_is_empty(s_->send_trailing_metadata);
grpc_chttp2_encode_data(s_->id, &s_->compressed_data_buffer, send_bytes, grpc_chttp2_encode_data(s_->id, &s_->compressed_data_buffer, send_bytes,
is_last_frame_, &s_->stats.outgoing, &t_->outbuf); is_last_frame_, &s_->stats.outgoing, &t_->outbuf);
grpc_chttp2_flowctl_sent_data(&t_->flow_control, &s_->flow_control, s_->flow_control->SentData(send_bytes);
send_bytes);
if (s_->compressed_data_buffer.length == 0) { if (s_->compressed_data_buffer.length == 0) {
s_->sending_bytes += s_->uncompressed_data_size; s_->sending_bytes += s_->uncompressed_data_size;
} }
@ -400,8 +398,8 @@ class StreamWriteContext {
gpr_log(GPR_DEBUG, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t_, gpr_log(GPR_DEBUG, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t_,
t_->is_client ? "CLIENT" : "SERVER", s->id, t_->is_client ? "CLIENT" : "SERVER", s->id,
s->sent_initial_metadata, s->send_initial_metadata != NULL, s->sent_initial_metadata, s->send_initial_metadata != NULL,
(int)(s->flow_control.local_window_delta - (int)(s->flow_control->local_window_delta() -
s->flow_control.announced_window_delta))); s->flow_control->announced_window_delta())));
} }
void FlushInitialMetadata(grpc_exec_ctx *exec_ctx) { void FlushInitialMetadata(grpc_exec_ctx *exec_ctx) {
@ -447,8 +445,7 @@ class StreamWriteContext {
void FlushWindowUpdates(grpc_exec_ctx *exec_ctx) { void FlushWindowUpdates(grpc_exec_ctx *exec_ctx) {
/* send any window updates */ /* send any window updates */
uint32_t stream_announce = grpc_chttp2_flowctl_maybe_send_stream_update( const uint32_t stream_announce = s_->flow_control->MaybeSendUpdate();
&t_->flow_control, &s_->flow_control);
if (stream_announce == 0) return; if (stream_announce == 0) return;
grpc_slice_buffer_add( grpc_slice_buffer_add(
@ -469,10 +466,10 @@ class StreamWriteContext {
DataSendContext data_send_context(write_context_, t_, s_); DataSendContext data_send_context(write_context_, t_, s_);
if (!data_send_context.AnyOutgoing()) { if (!data_send_context.AnyOutgoing()) {
if (t_->flow_control.remote_window == 0) { if (t_->flow_control->remote_window() <= 0) {
report_stall(t_, s_, "transport"); report_stall(t_, s_, "transport");
grpc_chttp2_list_add_stalled_by_transport(t_, s_); grpc_chttp2_list_add_stalled_by_transport(t_, s_);
} else if (data_send_context.stream_remote_window() == 0) { } else if (data_send_context.stream_remote_window() <= 0) {
report_stall(t_, s_, "stream"); report_stall(t_, s_, "stream");
grpc_chttp2_list_add_stalled_by_stream(t_, s_); grpc_chttp2_list_add_stalled_by_stream(t_, s_);
} }
@ -588,7 +585,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
ctx.FlushQueuedBuffers(exec_ctx); ctx.FlushQueuedBuffers(exec_ctx);
ctx.EnactHpackSettings(exec_ctx); ctx.EnactHpackSettings(exec_ctx);
if (t->flow_control.remote_window > 0) { if (t->flow_control->remote_window() > 0) {
ctx.UpdateStreamsNoLongerStalled(); ctx.UpdateStreamsNoLongerStalled();
} }

@ -40,15 +40,8 @@ class BdpEstimator {
explicit BdpEstimator(const char *name); explicit BdpEstimator(const char *name);
~BdpEstimator() {} ~BdpEstimator() {}
// Returns true if a reasonable estimate could be obtained int64_t EstimateBdp() const { return estimate_; }
bool EstimateBdp(int64_t *estimate_out) const { double EstimateBandwidth() const { return bw_est_; }
*estimate_out = estimate_;
return true;
}
bool EstimateBandwidth(double *bw_out) const {
*bw_out = bw_est_;
return true;
}
void AddIncomingBytes(int64_t num_bytes) { accumulator_ += num_bytes; } void AddIncomingBytes(int64_t num_bytes) { accumulator_ += num_bytes; }

@ -72,7 +72,7 @@ grpc_cc_test(
grpc_cc_test( grpc_cc_test(
name = "pid_controller_test", name = "pid_controller_test",
srcs = ["pid_controller_test.cc"], srcs = ["pid_controller_test.cc"],
language = "C", language = "C++",
deps = [ deps = [
"//:gpr", "//:gpr",
"//:grpc", "//:grpc",

@ -51,8 +51,7 @@ TEST(BdpEstimatorTest, NoOp) { BdpEstimator est("test"); }
TEST(BdpEstimatorTest, EstimateBdpNoSamples) { TEST(BdpEstimatorTest, EstimateBdpNoSamples) {
BdpEstimator est("test"); BdpEstimator est("test");
int64_t estimate; est.EstimateBdp();
est.EstimateBdp(&estimate);
} }
namespace { namespace {
@ -80,16 +79,14 @@ void AddSample(BdpEstimator *estimator, int64_t sample) {
TEST(BdpEstimatorTest, GetEstimate1Sample) { TEST(BdpEstimatorTest, GetEstimate1Sample) {
BdpEstimator est("test"); BdpEstimator est("test");
AddSample(&est, 100); AddSample(&est, 100);
int64_t estimate; est.EstimateBdp();
est.EstimateBdp(&estimate);
} }
TEST(BdpEstimatorTest, GetEstimate2Samples) { TEST(BdpEstimatorTest, GetEstimate2Samples) {
BdpEstimator est("test"); BdpEstimator est("test");
AddSample(&est, 100); AddSample(&est, 100);
AddSample(&est, 100); AddSample(&est, 100);
int64_t estimate; est.EstimateBdp();
est.EstimateBdp(&estimate);
} }
TEST(BdpEstimatorTest, GetEstimate3Samples) { TEST(BdpEstimatorTest, GetEstimate3Samples) {
@ -97,17 +94,10 @@ TEST(BdpEstimatorTest, GetEstimate3Samples) {
AddSample(&est, 100); AddSample(&est, 100);
AddSample(&est, 100); AddSample(&est, 100);
AddSample(&est, 100); AddSample(&est, 100);
int64_t estimate; est.EstimateBdp();
est.EstimateBdp(&estimate);
} }
namespace { namespace {
static int64_t GetEstimate(const BdpEstimator &estimator) {
int64_t out;
EXPECT_TRUE(estimator.EstimateBdp(&out));
return out;
}
int64_t NextPow2(int64_t v) { int64_t NextPow2(int64_t v) {
v--; v--;
v |= v >> 1; v |= v >> 1;
@ -134,7 +124,7 @@ TEST_P(BdpEstimatorRandomTest, GetEstimateRandomValues) {
if (sample > max) max = sample; if (sample > max) max = sample;
AddSample(&est, sample); AddSample(&est, sample);
if (i >= 3) { if (i >= 3) {
EXPECT_LE(GetEstimate(est), GPR_MAX(65536, 2 * NextPow2(max))) EXPECT_LE(est.EstimateBdp(), GPR_MAX(65536, 2 * NextPow2(max)))
<< " min:" << min << " max:" << max << " sample:" << sample; << " min:" << min << " max:" << max << " sample:" << sample;
} }
} }

@ -428,9 +428,8 @@ static void BM_TransportStreamSend(benchmark::State &state) {
return; return;
} }
// force outgoing window to be yuge // force outgoing window to be yuge
s->chttp2_stream()->flow_control.remote_window_delta = s->chttp2_stream()->flow_control->TestOnlyForceHugeWindow();
1024 * 1024 * 1024; f.chttp2_transport()->flow_control->TestOnlyForceHugeWindow();
f.chttp2_transport()->flow_control.remote_window = 1024 * 1024 * 1024;
grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0); grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0);
reset_op(); reset_op();
op.on_complete = c.get(); op.on_complete = c.get();
@ -560,22 +559,21 @@ static void BM_TransportStreamRecv(benchmark::State &state) {
std::unique_ptr<Closure> drain_continue; std::unique_ptr<Closure> drain_continue;
grpc_slice recv_slice; grpc_slice recv_slice;
std::unique_ptr<Closure> c = MakeClosure([&](grpc_exec_ctx *exec_ctx, std::unique_ptr<Closure> c =
grpc_error *error) { MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
if (!state.KeepRunning()) return; if (!state.KeepRunning()) return;
// force outgoing window to be yuge // force outgoing window to be yuge
s.chttp2_stream()->flow_control.local_window_delta = 1024 * 1024 * 1024; s.chttp2_stream()->flow_control->TestOnlyForceHugeWindow();
s.chttp2_stream()->flow_control.announced_window_delta = 1024 * 1024 * 1024; f.chttp2_transport()->flow_control->TestOnlyForceHugeWindow();
f.chttp2_transport()->flow_control.announced_window = 1024 * 1024 * 1024; received = 0;
received = 0; reset_op();
reset_op(); op.on_complete = do_nothing.get();
op.on_complete = do_nothing.get(); op.recv_message = true;
op.recv_message = true; op.payload->recv_message.recv_message = &recv_stream;
op.payload->recv_message.recv_message = &recv_stream; op.payload->recv_message.recv_message_ready = drain_start.get();
op.payload->recv_message.recv_message_ready = drain_start.get(); s.Op(exec_ctx, &op);
s.Op(exec_ctx, &op); f.PushInput(grpc_slice_ref(incoming_data));
f.PushInput(grpc_slice_ref(incoming_data)); });
});
drain_start = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { drain_start = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
if (recv_stream == NULL) { if (recv_stream == NULL) {

@ -142,15 +142,18 @@ class TrickledCHTTP2 : public EndpointPairFixture {
client->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr, client->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr,
server->lists[GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT].head != nullptr, server->lists[GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT].head != nullptr,
server->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr, server->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr,
client->flow_control.remote_window, server->flow_control.remote_window, client->flow_control->remote_window(),
client->flow_control.announced_window, server->flow_control->remote_window(),
server->flow_control.announced_window, client->flow_control->announced_window(),
client_stream ? client_stream->flow_control.remote_window_delta : -1, server->flow_control->announced_window(),
server_stream ? server_stream->flow_control.remote_window_delta : -1, client_stream ? client_stream->flow_control->remote_window_delta() : -1,
client_stream ? client_stream->flow_control.local_window_delta : -1, server_stream ? server_stream->flow_control->remote_window_delta() : -1,
server_stream ? server_stream->flow_control.local_window_delta : -1, client_stream ? client_stream->flow_control->local_window_delta() : -1,
client_stream ? client_stream->flow_control.announced_window_delta : -1, server_stream ? server_stream->flow_control->local_window_delta() : -1,
server_stream ? server_stream->flow_control.announced_window_delta : -1, client_stream ? client_stream->flow_control->announced_window_delta()
: -1,
server_stream ? server_stream->flow_control->announced_window_delta()
: -1,
client->settings[GRPC_PEER_SETTINGS] client->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
client->settings[GRPC_LOCAL_SETTINGS] client->settings[GRPC_LOCAL_SETTINGS]

@ -1020,6 +1020,7 @@ src/core/ext/transport/chttp2/transport/chttp2_plugin.cc \
src/core/ext/transport/chttp2/transport/chttp2_transport.cc \ src/core/ext/transport/chttp2/transport/chttp2_transport.cc \
src/core/ext/transport/chttp2/transport/chttp2_transport.h \ src/core/ext/transport/chttp2/transport/chttp2_transport.h \
src/core/ext/transport/chttp2/transport/flow_control.cc \ src/core/ext/transport/chttp2/transport/flow_control.cc \
src/core/ext/transport/chttp2/transport/flow_control.h \
src/core/ext/transport/chttp2/transport/frame.h \ src/core/ext/transport/chttp2/transport/frame.h \
src/core/ext/transport/chttp2/transport/frame_data.cc \ src/core/ext/transport/chttp2/transport/frame_data.cc \
src/core/ext/transport/chttp2/transport/frame_data.h \ src/core/ext/transport/chttp2/transport/frame_data.h \

@ -9028,6 +9028,7 @@
"src/core/ext/transport/chttp2/transport/bin_decoder.h", "src/core/ext/transport/chttp2/transport/bin_decoder.h",
"src/core/ext/transport/chttp2/transport/bin_encoder.h", "src/core/ext/transport/chttp2/transport/bin_encoder.h",
"src/core/ext/transport/chttp2/transport/chttp2_transport.h", "src/core/ext/transport/chttp2/transport/chttp2_transport.h",
"src/core/ext/transport/chttp2/transport/flow_control.h",
"src/core/ext/transport/chttp2/transport/frame.h", "src/core/ext/transport/chttp2/transport/frame.h",
"src/core/ext/transport/chttp2/transport/frame_data.h", "src/core/ext/transport/chttp2/transport/frame_data.h",
"src/core/ext/transport/chttp2/transport/frame_goaway.h", "src/core/ext/transport/chttp2/transport/frame_goaway.h",
@ -9057,6 +9058,7 @@
"src/core/ext/transport/chttp2/transport/chttp2_transport.cc", "src/core/ext/transport/chttp2/transport/chttp2_transport.cc",
"src/core/ext/transport/chttp2/transport/chttp2_transport.h", "src/core/ext/transport/chttp2/transport/chttp2_transport.h",
"src/core/ext/transport/chttp2/transport/flow_control.cc", "src/core/ext/transport/chttp2/transport/flow_control.cc",
"src/core/ext/transport/chttp2/transport/flow_control.h",
"src/core/ext/transport/chttp2/transport/frame.h", "src/core/ext/transport/chttp2/transport/frame.h",
"src/core/ext/transport/chttp2/transport/frame_data.cc", "src/core/ext/transport/chttp2/transport/frame_data.cc",
"src/core/ext/transport/chttp2/transport/frame_data.h", "src/core/ext/transport/chttp2/transport/frame_data.h",

Loading…
Cancel
Save