Merge pull request #11720 from ncteisen/flow-control-v3

Flow Control
reviewable/pr10155/r3
Noah Eisen 7 years ago committed by GitHub
commit 0efb1e940a
  1. 1
      BUILD
  2. 6
      CMakeLists.txt
  3. 6
      Makefile
  4. 1
      binding.gyp
  5. 1
      build.yaml
  6. 1
      config.m4
  7. 1
      config.w32
  8. 1
      gRPC-Core.podspec
  9. 1
      grpc.gemspec
  10. 1
      package.xml
  11. 213
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  12. 369
      src/core/ext/transport/chttp2/transport/flow_control.c
  13. 10
      src/core/ext/transport/chttp2/transport/frame_settings.c
  14. 12
      src/core/ext/transport/chttp2/transport/frame_window_update.c
  15. 281
      src/core/ext/transport/chttp2/transport/internal.h
  16. 84
      src/core/ext/transport/chttp2/transport/parsing.c
  17. 19
      src/core/ext/transport/chttp2/transport/stream_lists.c
  18. 72
      src/core/ext/transport/chttp2/transport/writing.c
  19. 1
      src/python/grpcio/grpc_core_dependencies.py
  20. 36
      test/cpp/microbenchmarks/bm_chttp2_transport.cc
  21. 27
      test/cpp/microbenchmarks/bm_fullstack_trickle.cc
  22. 1
      tools/doxygen/Doxyfile.core.internal
  23. 1
      tools/run_tests/generated/sources_and_headers.json
  24. 2
      vsprojects/vcxproj/grpc/grpc.vcxproj
  25. 3
      vsprojects/vcxproj/grpc/grpc.vcxproj.filters
  26. 2
      vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj
  27. 3
      vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters
  28. 2
      vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj
  29. 3
      vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj.filters
  30. 2
      vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
  31. 3
      vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters

@ -1217,6 +1217,7 @@ grpc_cc_library(
"src/core/ext/transport/chttp2/transport/bin_encoder.c",
"src/core/ext/transport/chttp2/transport/chttp2_plugin.c",
"src/core/ext/transport/chttp2/transport/chttp2_transport.c",
"src/core/ext/transport/chttp2/transport/flow_control.c",
"src/core/ext/transport/chttp2/transport/frame_data.c",
"src/core/ext/transport/chttp2/transport/frame_goaway.c",
"src/core/ext/transport/chttp2/transport/frame_ping.c",

@ -1079,6 +1079,7 @@ add_library(grpc
src/core/ext/transport/chttp2/transport/bin_encoder.c
src/core/ext/transport/chttp2/transport/chttp2_plugin.c
src/core/ext/transport/chttp2/transport/chttp2_transport.c
src/core/ext/transport/chttp2/transport/flow_control.c
src/core/ext/transport/chttp2/transport/frame_data.c
src/core/ext/transport/chttp2/transport/frame_goaway.c
src/core/ext/transport/chttp2/transport/frame_ping.c
@ -1425,6 +1426,7 @@ add_library(grpc_cronet
src/core/ext/transport/chttp2/transport/bin_encoder.c
src/core/ext/transport/chttp2/transport/chttp2_plugin.c
src/core/ext/transport/chttp2/transport/chttp2_transport.c
src/core/ext/transport/chttp2/transport/flow_control.c
src/core/ext/transport/chttp2/transport/frame_data.c
src/core/ext/transport/chttp2/transport/frame_goaway.c
src/core/ext/transport/chttp2/transport/frame_ping.c
@ -1753,6 +1755,7 @@ add_library(grpc_test_util
src/core/ext/transport/chttp2/transport/bin_encoder.c
src/core/ext/transport/chttp2/transport/chttp2_plugin.c
src/core/ext/transport/chttp2/transport/chttp2_transport.c
src/core/ext/transport/chttp2/transport/flow_control.c
src/core/ext/transport/chttp2/transport/frame_data.c
src/core/ext/transport/chttp2/transport/frame_goaway.c
src/core/ext/transport/chttp2/transport/frame_ping.c
@ -2009,6 +2012,7 @@ add_library(grpc_test_util_unsecure
src/core/ext/transport/chttp2/transport/bin_encoder.c
src/core/ext/transport/chttp2/transport/chttp2_plugin.c
src/core/ext/transport/chttp2/transport/chttp2_transport.c
src/core/ext/transport/chttp2/transport/flow_control.c
src/core/ext/transport/chttp2/transport/frame_data.c
src/core/ext/transport/chttp2/transport/frame_goaway.c
src/core/ext/transport/chttp2/transport/frame_ping.c
@ -2232,6 +2236,7 @@ add_library(grpc_unsecure
src/core/ext/transport/chttp2/transport/bin_encoder.c
src/core/ext/transport/chttp2/transport/chttp2_plugin.c
src/core/ext/transport/chttp2/transport/chttp2_transport.c
src/core/ext/transport/chttp2/transport/flow_control.c
src/core/ext/transport/chttp2/transport/frame_data.c
src/core/ext/transport/chttp2/transport/frame_goaway.c
src/core/ext/transport/chttp2/transport/frame_ping.c
@ -2774,6 +2779,7 @@ add_library(grpc++_cronet
src/core/ext/transport/chttp2/transport/bin_encoder.c
src/core/ext/transport/chttp2/transport/chttp2_plugin.c
src/core/ext/transport/chttp2/transport/chttp2_transport.c
src/core/ext/transport/chttp2/transport/flow_control.c
src/core/ext/transport/chttp2/transport/frame_data.c
src/core/ext/transport/chttp2/transport/frame_goaway.c
src/core/ext/transport/chttp2/transport/frame_ping.c

@ -3026,6 +3026,7 @@ LIBGRPC_SRC = \
src/core/ext/transport/chttp2/transport/bin_encoder.c \
src/core/ext/transport/chttp2/transport/chttp2_plugin.c \
src/core/ext/transport/chttp2/transport/chttp2_transport.c \
src/core/ext/transport/chttp2/transport/flow_control.c \
src/core/ext/transport/chttp2/transport/frame_data.c \
src/core/ext/transport/chttp2/transport/frame_goaway.c \
src/core/ext/transport/chttp2/transport/frame_ping.c \
@ -3370,6 +3371,7 @@ LIBGRPC_CRONET_SRC = \
src/core/ext/transport/chttp2/transport/bin_encoder.c \
src/core/ext/transport/chttp2/transport/chttp2_plugin.c \
src/core/ext/transport/chttp2/transport/chttp2_transport.c \
src/core/ext/transport/chttp2/transport/flow_control.c \
src/core/ext/transport/chttp2/transport/frame_data.c \
src/core/ext/transport/chttp2/transport/frame_goaway.c \
src/core/ext/transport/chttp2/transport/frame_ping.c \
@ -3695,6 +3697,7 @@ LIBGRPC_TEST_UTIL_SRC = \
src/core/ext/transport/chttp2/transport/bin_encoder.c \
src/core/ext/transport/chttp2/transport/chttp2_plugin.c \
src/core/ext/transport/chttp2/transport/chttp2_transport.c \
src/core/ext/transport/chttp2/transport/flow_control.c \
src/core/ext/transport/chttp2/transport/frame_data.c \
src/core/ext/transport/chttp2/transport/frame_goaway.c \
src/core/ext/transport/chttp2/transport/frame_ping.c \
@ -3940,6 +3943,7 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \
src/core/ext/transport/chttp2/transport/bin_encoder.c \
src/core/ext/transport/chttp2/transport/chttp2_plugin.c \
src/core/ext/transport/chttp2/transport/chttp2_transport.c \
src/core/ext/transport/chttp2/transport/flow_control.c \
src/core/ext/transport/chttp2/transport/frame_data.c \
src/core/ext/transport/chttp2/transport/frame_goaway.c \
src/core/ext/transport/chttp2/transport/frame_ping.c \
@ -4139,6 +4143,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/transport/chttp2/transport/bin_encoder.c \
src/core/ext/transport/chttp2/transport/chttp2_plugin.c \
src/core/ext/transport/chttp2/transport/chttp2_transport.c \
src/core/ext/transport/chttp2/transport/flow_control.c \
src/core/ext/transport/chttp2/transport/frame_data.c \
src/core/ext/transport/chttp2/transport/frame_goaway.c \
src/core/ext/transport/chttp2/transport/frame_ping.c \
@ -4664,6 +4669,7 @@ LIBGRPC++_CRONET_SRC = \
src/core/ext/transport/chttp2/transport/bin_encoder.c \
src/core/ext/transport/chttp2/transport/chttp2_plugin.c \
src/core/ext/transport/chttp2/transport/chttp2_transport.c \
src/core/ext/transport/chttp2/transport/flow_control.c \
src/core/ext/transport/chttp2/transport/frame_data.c \
src/core/ext/transport/chttp2/transport/frame_goaway.c \
src/core/ext/transport/chttp2/transport/frame_ping.c \

@ -788,6 +788,7 @@
'src/core/ext/transport/chttp2/transport/bin_encoder.c',
'src/core/ext/transport/chttp2/transport/chttp2_plugin.c',
'src/core/ext/transport/chttp2/transport/chttp2_transport.c',
'src/core/ext/transport/chttp2/transport/flow_control.c',
'src/core/ext/transport/chttp2/transport/frame_data.c',
'src/core/ext/transport/chttp2/transport/frame_goaway.c',
'src/core/ext/transport/chttp2/transport/frame_ping.c',

@ -782,6 +782,7 @@ filegroups:
- src/core/ext/transport/chttp2/transport/bin_encoder.c
- src/core/ext/transport/chttp2/transport/chttp2_plugin.c
- src/core/ext/transport/chttp2/transport/chttp2_transport.c
- src/core/ext/transport/chttp2/transport/flow_control.c
- src/core/ext/transport/chttp2/transport/frame_data.c
- src/core/ext/transport/chttp2/transport/frame_goaway.c
- src/core/ext/transport/chttp2/transport/frame_ping.c

@ -217,6 +217,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/transport/chttp2/transport/bin_encoder.c \
src/core/ext/transport/chttp2/transport/chttp2_plugin.c \
src/core/ext/transport/chttp2/transport/chttp2_transport.c \
src/core/ext/transport/chttp2/transport/flow_control.c \
src/core/ext/transport/chttp2/transport/frame_data.c \
src/core/ext/transport/chttp2/transport/frame_goaway.c \
src/core/ext/transport/chttp2/transport/frame_ping.c \

@ -194,6 +194,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\transport\\chttp2\\transport\\bin_encoder.c " +
"src\\core\\ext\\transport\\chttp2\\transport\\chttp2_plugin.c " +
"src\\core\\ext\\transport\\chttp2\\transport\\chttp2_transport.c " +
"src\\core\\ext\\transport\\chttp2\\transport\\flow_control.c " +
"src\\core\\ext\\transport\\chttp2\\transport\\frame_data.c " +
"src\\core\\ext\\transport\\chttp2\\transport\\frame_goaway.c " +
"src\\core\\ext\\transport\\chttp2\\transport\\frame_ping.c " +

@ -593,6 +593,7 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/transport/bin_encoder.c',
'src/core/ext/transport/chttp2/transport/chttp2_plugin.c',
'src/core/ext/transport/chttp2/transport/chttp2_transport.c',
'src/core/ext/transport/chttp2/transport/flow_control.c',
'src/core/ext/transport/chttp2/transport/frame_data.c',
'src/core/ext/transport/chttp2/transport/frame_goaway.c',
'src/core/ext/transport/chttp2/transport/frame_ping.c',

@ -525,6 +525,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/transport/chttp2/transport/bin_encoder.c )
s.files += %w( src/core/ext/transport/chttp2/transport/chttp2_plugin.c )
s.files += %w( src/core/ext/transport/chttp2/transport/chttp2_transport.c )
s.files += %w( src/core/ext/transport/chttp2/transport/flow_control.c )
s.files += %w( src/core/ext/transport/chttp2/transport/frame_data.c )
s.files += %w( src/core/ext/transport/chttp2/transport/frame_goaway.c )
s.files += %w( src/core/ext/transport/chttp2/transport/frame_ping.c )

@ -539,6 +539,7 @@
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/bin_encoder.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/chttp2_plugin.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/chttp2_transport.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/flow_control.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/frame_data.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/frame_goaway.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/frame_ping.c" role="src" />

@ -114,11 +114,6 @@ static void connectivity_state_set(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state state,
grpc_error *error, const char *reason);
static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
size_t max_size_hint,
size_t have_already);
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
void *byte_stream,
grpc_error *error_ignored);
@ -270,8 +265,9 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->endpoint_reading = 1;
t->next_stream_id = is_client ? 1 : 2;
t->is_client = is_client;
t->outgoing_window = DEFAULT_WINDOW;
t->incoming_window = DEFAULT_WINDOW;
t->flow_control.remote_window = DEFAULT_WINDOW;
t->flow_control.announced_window = DEFAULT_WINDOW;
t->flow_control.t = t;
t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->is_first_frame = true;
grpc_connectivity_state_init(
@ -710,6 +706,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
post_destructive_reclaimer(exec_ctx, t);
}
s->flow_control.s = s;
GPR_TIMER_END("init_stream", 0);
return 0;
@ -766,13 +763,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GRPC_ERROR_UNREF(s->write_closed_error);
GRPC_ERROR_UNREF(s->byte_stream_error);
if (s->incoming_window_delta > 0) {
GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA(
"destroy", t, s, s->incoming_window_delta);
} else if (s->incoming_window_delta < 0) {
GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA(
"destroy", t, s, -s->incoming_window_delta);
}
grpc_chttp2_flowctl_destroy_stream(&t->flow_control, &s->flow_control);
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream");
@ -1476,9 +1467,16 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
s->recv_message_ready = op_payload->recv_message.recv_message_ready;
s->recv_message = op_payload->recv_message.recv_message;
if (s->id != 0) {
already_received = s->frame_storage.length;
incoming_byte_stream_update_flow_control(
exec_ctx, t, s, GRPC_HEADER_SIZE_IN_BYTES, already_received);
if (!s->read_closed) {
already_received = s->frame_storage.length;
grpc_chttp2_flowctl_incoming_bs_update(
&t->flow_control, &s->flow_control, GRPC_HEADER_SIZE_IN_BYTES,
already_received);
grpc_chttp2_act_on_flowctl_action(
exec_ctx,
grpc_chttp2_flowctl_get_action(&t->flow_control, &s->flow_control),
t, s);
}
}
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
}
@ -2228,6 +2226,37 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
* INPUT PROCESSING - PARSING
*/
void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,
grpc_chttp2_flowctl_action action,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
switch (action.send_stream_update) {
case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED:
break;
case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY:
grpc_chttp2_become_writable(exec_ctx, t, s,
GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
"immediate stream flowctl");
break;
case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE:
grpc_chttp2_become_writable(exec_ctx, t, s,
GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK,
"queue stream flowctl");
break;
}
switch (action.send_transport_update) {
case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED:
break;
case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY:
grpc_chttp2_initiate_write(exec_ctx, t, "immediate transport flowctl");
break;
// this is the same as no action b/c every time the transport enters the
// writing path it will maybe do an update
case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE:
break;
}
}
static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
double bdp_dbl) {
// initial window size bounded [1,2^31-1], but we set the min to 128.
@ -2239,9 +2268,10 @@ static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (delta == 0 || (delta > -bdp / 10 && delta < bdp / 10)) {
return;
}
if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) {
gpr_log(GPR_DEBUG, "%s: update initial window size to %d", t->peer_string,
(int)bdp);
if (GRPC_TRACER_ON(grpc_bdp_estimator_trace) ||
GRPC_TRACER_ON(grpc_flowctl_trace)) {
gpr_log(GPR_DEBUG, "%s | %p[%s] | update initial window size to %d",
t->peer_string, t, t->is_client ? "cli" : "svr", (int)bdp);
}
queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
(uint32_t)bdp);
@ -2341,8 +2371,8 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_END("reading_action.parse", 0);
GPR_TIMER_BEGIN("post_parse_locked", 0);
if (t->initial_window_update != 0) {
if (t->initial_window_update > 0) {
if (t->flow_control.initial_window_update != 0) {
if (t->flow_control.initial_window_update > 0) {
grpc_chttp2_stream *s;
while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
grpc_chttp2_become_writable(
@ -2350,7 +2380,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
"unstalled");
}
}
t->initial_window_update = 0;
t->flow_control.initial_window_update = 0;
}
GPR_TIMER_END("post_parse_locked", 0);
}
@ -2624,54 +2654,6 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
}
}
static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
size_t max_size_hint,
size_t have_already) {
uint32_t max_recv_bytes;
uint32_t initial_window_size =
t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
/* clamp max recv hint to an allowable size */
if (max_size_hint >= UINT32_MAX - initial_window_size) {
max_recv_bytes = UINT32_MAX - initial_window_size;
} else {
max_recv_bytes = (uint32_t)max_size_hint;
}
/* account for bytes already received but unknown to higher layers */
if (max_recv_bytes >= have_already) {
max_recv_bytes -= (uint32_t)have_already;
} else {
max_recv_bytes = 0;
}
/* add some small lookahead to keep pipelines flowing */
GPR_ASSERT(max_recv_bytes <= UINT32_MAX - initial_window_size);
if (s->incoming_window_delta < max_recv_bytes && !s->read_closed) {
uint32_t add_max_recv_bytes =
(uint32_t)(max_recv_bytes - s->incoming_window_delta);
grpc_chttp2_stream_write_type write_type =
GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED;
if (s->incoming_window_delta + initial_window_size <
(int64_t)have_already) {
write_type = GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED;
}
GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA("op", t, s,
add_max_recv_bytes);
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, announce_window,
add_max_recv_bytes);
if ((int64_t)s->incoming_window_delta + (int64_t)initial_window_size -
(int64_t)s->announce_window >
(int64_t)initial_window_size / 2) {
write_type = GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK;
}
grpc_chttp2_become_writable(exec_ctx, t, s, write_type,
"read_incoming_stream");
}
}
static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
void *argp,
grpc_error *error_ignored) {
@ -2680,9 +2662,15 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream *s = bs->stream;
size_t cur_length = s->frame_storage.length;
incoming_byte_stream_update_flow_control(
exec_ctx, t, s, bs->next_action.max_size_hint, cur_length);
if (!s->read_closed) {
grpc_chttp2_flowctl_incoming_bs_update(&t->flow_control, &s->flow_control,
bs->next_action.max_size_hint,
cur_length);
grpc_chttp2_act_on_flowctl_action(
exec_ctx,
grpc_chttp2_flowctl_get_action(&t->flow_control, &s->flow_control), t,
s);
}
GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
if (s->frame_storage.length > 0) {
grpc_slice_buffer_swap(&s->frame_storage,
@ -2991,83 +2979,6 @@ static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destructive_reclaimer");
}
/*******************************************************************************
* TRACING
*/
static char *format_flowctl_context_var(const char *context, const char *var,
int64_t val, uint32_t id) {
char *name;
if (context == NULL) {
name = gpr_strdup(var);
} else if (0 == strcmp(context, "t")) {
GPR_ASSERT(id == 0);
gpr_asprintf(&name, "TRANSPORT:%s", var);
} else if (0 == strcmp(context, "s")) {
GPR_ASSERT(id != 0);
gpr_asprintf(&name, "STREAM[%d]:%s", id, var);
} else {
gpr_asprintf(&name, "BAD_CONTEXT[%s][%d]:%s", context, id, var);
}
char *name_fld = gpr_leftpad(name, ' ', 64);
char *value;
gpr_asprintf(&value, "%" PRId64, val);
char *value_fld = gpr_leftpad(value, ' ', 8);
char *result;
gpr_asprintf(&result, "%s %s", name_fld, value_fld);
gpr_free(name);
gpr_free(name_fld);
gpr_free(value);
gpr_free(value_fld);
return result;
}
void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase,
grpc_chttp2_flowctl_op op, const char *context1,
const char *var1, const char *context2,
const char *var2, int is_client,
uint32_t stream_id, int64_t val1, int64_t val2) {
char *tmp_phase;
char *label1 = format_flowctl_context_var(context1, var1, val1, stream_id);
char *label2 = format_flowctl_context_var(context2, var2, val2, stream_id);
char *clisvr = is_client ? "client" : "server";
char *prefix;
tmp_phase = gpr_leftpad(phase, ' ', 8);
gpr_asprintf(&prefix, "FLOW %s: %s ", tmp_phase, clisvr);
gpr_free(tmp_phase);
switch (op) {
case GRPC_CHTTP2_FLOWCTL_MOVE:
if (val2 != 0) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
"%sMOVE %s <- %s giving %" PRId64, prefix, label1, label2,
val1 + val2);
}
break;
case GRPC_CHTTP2_FLOWCTL_CREDIT:
GPR_ASSERT(val2 >= 0);
if (val2 != 0) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
"%sCREDIT %s by %s giving %" PRId64, prefix, label1, label2,
val1 + val2);
}
break;
case GRPC_CHTTP2_FLOWCTL_DEBIT:
GPR_ASSERT(val2 >= 0);
if (val2 != 0) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
"%sDEBIT %s by %s giving %" PRId64, prefix, label1, label2,
val1 - val2);
}
break;
}
gpr_free(label1);
gpr_free(label2);
gpr_free(prefix);
}
/*******************************************************************************
* INTEGRATION GLUE
*/

@ -0,0 +1,369 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
#include "src/core/lib/support/string.h"
static uint32_t grpc_chttp2_target_announced_window(
const grpc_chttp2_transport_flowctl* tfc);
#ifndef NDEBUG
typedef struct {
int64_t remote_window;
int64_t target_window;
int64_t announced_window;
int64_t remote_window_delta;
int64_t local_window_delta;
int64_t announced_window_delta;
} shadow_flow_control;
static void pretrace(shadow_flow_control* shadow_fc,
grpc_chttp2_transport_flowctl* tfc,
grpc_chttp2_stream_flowctl* sfc) {
shadow_fc->remote_window = tfc->remote_window;
shadow_fc->target_window = grpc_chttp2_target_announced_window(tfc);
shadow_fc->announced_window = tfc->announced_window;
if (sfc != NULL) {
shadow_fc->remote_window_delta = sfc->remote_window_delta;
shadow_fc->local_window_delta = sfc->local_window_delta;
shadow_fc->announced_window_delta = sfc->announced_window_delta;
}
}
static char* fmt_str(int64_t old, int64_t new) {
char* str;
if (old != new) {
gpr_asprintf(&str, "%" PRId64 " -> %" PRId64 "", old, new);
} else {
gpr_asprintf(&str, "%" PRId64 "", old);
}
char* str_lp = gpr_leftpad(str, ' ', 30);
gpr_free(str);
return str_lp;
}
static void posttrace(shadow_flow_control* shadow_fc,
grpc_chttp2_transport_flowctl* tfc,
grpc_chttp2_stream_flowctl* sfc, char* reason) {
uint32_t acked_local_window =
tfc->t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
uint32_t remote_window =
tfc->t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
char* trw_str = fmt_str(shadow_fc->remote_window, tfc->remote_window);
char* tlw_str = fmt_str(shadow_fc->target_window,
grpc_chttp2_target_announced_window(tfc));
char* taw_str = fmt_str(shadow_fc->announced_window, tfc->announced_window);
char* srw_str;
char* slw_str;
char* saw_str;
if (sfc != NULL) {
srw_str = fmt_str(shadow_fc->remote_window_delta + remote_window,
sfc->remote_window_delta + remote_window);
slw_str = fmt_str(shadow_fc->local_window_delta + acked_local_window,
sfc->local_window_delta + acked_local_window);
saw_str = fmt_str(shadow_fc->announced_window_delta + acked_local_window,
sfc->announced_window_delta + acked_local_window);
} else {
srw_str = gpr_leftpad("", ' ', 30);
slw_str = gpr_leftpad("", ' ', 30);
saw_str = gpr_leftpad("", ' ', 30);
}
gpr_log(GPR_DEBUG,
"%p[%u][%s] | %s | trw:%s, ttw:%s, taw:%s, srw:%s, slw:%s, saw:%s",
tfc, sfc != NULL ? sfc->s->id : 0, tfc->t->is_client ? "cli" : "svr",
reason, trw_str, tlw_str, taw_str, srw_str, slw_str, saw_str);
gpr_free(trw_str);
gpr_free(tlw_str);
gpr_free(taw_str);
gpr_free(srw_str);
gpr_free(slw_str);
gpr_free(saw_str);
}
static char* urgency_to_string(grpc_chttp2_flowctl_urgency urgency) {
switch (urgency) {
case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED:
return "no action";
case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY:
return "update immediately";
case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE:
return "queue update";
default:
GPR_UNREACHABLE_CODE(return "unknown");
}
GPR_UNREACHABLE_CODE(return "unknown");
}
static void trace_action(grpc_chttp2_flowctl_action action) {
gpr_log(GPR_DEBUG, "transport: %s, stream: %s",
urgency_to_string(action.send_transport_update),
urgency_to_string(action.send_stream_update));
}
#define PRETRACE(tfc, sfc) \
shadow_flow_control shadow_fc; \
GRPC_FLOW_CONTROL_IF_TRACING(pretrace(&shadow_fc, tfc, sfc))
#define POSTTRACE(tfc, sfc, reason) \
GRPC_FLOW_CONTROL_IF_TRACING(posttrace(&shadow_fc, tfc, sfc, reason))
#define TRACEACTION(action) GRPC_FLOW_CONTROL_IF_TRACING(trace_action(action))
#else
#define PRETRACE(tfc, sfc)
#define POSTTRACE(tfc, sfc, reason)
#define TRACEACTION(action)
#endif
/* How many bytes of incoming flow control would we like to advertise */
static uint32_t grpc_chttp2_target_announced_window(
const grpc_chttp2_transport_flowctl* tfc) {
return (uint32_t)GPR_MIN(
(int64_t)((1u << 31) - 1),
tfc->announced_stream_total_over_incoming_window +
tfc->t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
}
// we have sent data on the wire, we must track this in our bookkeeping for the
// remote peer's flow control.
void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport_flowctl* tfc,
grpc_chttp2_stream_flowctl* sfc,
int64_t size) {
PRETRACE(tfc, sfc);
tfc->remote_window -= size;
sfc->remote_window_delta -= size;
POSTTRACE(tfc, sfc, " data sent");
}
static void announced_window_delta_preupdate(grpc_chttp2_transport_flowctl* tfc,
grpc_chttp2_stream_flowctl* sfc) {
if (sfc->announced_window_delta > 0) {
tfc->announced_stream_total_over_incoming_window -=
sfc->announced_window_delta;
} else {
tfc->announced_stream_total_under_incoming_window +=
-sfc->announced_window_delta;
}
}
static void announced_window_delta_postupdate(
grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) {
if (sfc->announced_window_delta > 0) {
tfc->announced_stream_total_over_incoming_window +=
sfc->announced_window_delta;
} else {
tfc->announced_stream_total_under_incoming_window -=
-sfc->announced_window_delta;
}
}
// We have received data from the wire. We must track this in our own flow
// control bookkeeping.
// Returns an error if the incoming frame violates our flow control.
grpc_error* grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl* tfc,
grpc_chttp2_stream_flowctl* sfc,
int64_t incoming_frame_size) {
uint32_t sent_init_window =
tfc->t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
uint32_t acked_init_window =
tfc->t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
PRETRACE(tfc, sfc);
if (incoming_frame_size > tfc->announced_window) {
char* msg;
gpr_asprintf(&msg,
"frame of size %" PRId64 " overflows local window of %" PRId64,
incoming_frame_size, tfc->announced_window);
grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
gpr_free(msg);
return err;
}
if (sfc != NULL) {
int64_t acked_stream_window =
sfc->announced_window_delta + acked_init_window;
int64_t sent_stream_window = sfc->announced_window_delta + sent_init_window;
if (incoming_frame_size > acked_stream_window) {
if (incoming_frame_size <= sent_stream_window) {
gpr_log(
GPR_ERROR,
"Incoming frame of size %" PRId64
" exceeds local window size of %" PRId64
".\n"
"The (un-acked, future) window size would be %" PRId64
" which is not exceeded.\n"
"This would usually cause a disconnection, but allowing it due to"
"broken HTTP2 implementations in the wild.\n"
"See (for example) https://github.com/netty/netty/issues/6520.",
incoming_frame_size, acked_stream_window, sent_stream_window);
} else {
char* msg;
gpr_asprintf(&msg, "frame of size %" PRId64
" overflows local window of %" PRId64,
incoming_frame_size, acked_stream_window);
grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
gpr_free(msg);
return err;
}
}
announced_window_delta_preupdate(tfc, sfc);
sfc->announced_window_delta -= incoming_frame_size;
announced_window_delta_postupdate(tfc, sfc);
sfc->local_window_delta -= incoming_frame_size;
}
tfc->announced_window -= incoming_frame_size;
POSTTRACE(tfc, sfc, " data recv");
return GRPC_ERROR_NONE;
}
// Returns a non zero announce integer if we should send a transport window
// update
uint32_t grpc_chttp2_flowctl_maybe_send_transport_update(
grpc_chttp2_transport_flowctl* tfc) {
PRETRACE(tfc, NULL);
uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc);
uint32_t threshold_to_send_transport_window_update =
tfc->t->outbuf.count > 0 ? 3 * target_announced_window / 4
: target_announced_window / 2;
if (tfc->announced_window <= threshold_to_send_transport_window_update &&
tfc->announced_window != target_announced_window) {
uint32_t announce = (uint32_t)GPR_CLAMP(
target_announced_window - tfc->announced_window, 0, UINT32_MAX);
tfc->announced_window += announce;
POSTTRACE(tfc, NULL, "t updt sent");
return announce;
}
GRPC_FLOW_CONTROL_IF_TRACING(
gpr_log(GPR_DEBUG, "%p[0][%s] will not send transport update", tfc,
tfc->t->is_client ? "cli" : "svr"));
return 0;
}
// Returns a non zero announce integer if we should send a stream window update
uint32_t grpc_chttp2_flowctl_maybe_send_stream_update(
grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) {
PRETRACE(tfc, sfc);
if (sfc->local_window_delta > sfc->announced_window_delta) {
uint32_t announce = (uint32_t)GPR_CLAMP(
sfc->local_window_delta - sfc->announced_window_delta, 0, UINT32_MAX);
announced_window_delta_preupdate(tfc, sfc);
sfc->announced_window_delta += announce;
announced_window_delta_postupdate(tfc, sfc);
POSTTRACE(tfc, sfc, "s updt sent");
return announce;
}
GRPC_FLOW_CONTROL_IF_TRACING(
gpr_log(GPR_DEBUG, "%p[%u][%s] will not send stream update", tfc,
sfc->s->id, tfc->t->is_client ? "cli" : "svr"));
return 0;
}
// we have received a WINDOW_UPDATE frame for a transport
void grpc_chttp2_flowctl_recv_transport_update(
grpc_chttp2_transport_flowctl* tfc, uint32_t size) {
PRETRACE(tfc, NULL);
tfc->remote_window += size;
POSTTRACE(tfc, NULL, "t updt recv");
}
// we have received a WINDOW_UPDATE frame for a stream
void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_transport_flowctl* tfc,
grpc_chttp2_stream_flowctl* sfc,
uint32_t size) {
PRETRACE(tfc, sfc);
sfc->remote_window_delta += size;
POSTTRACE(tfc, sfc, "s updt recv");
}
void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl* tfc,
grpc_chttp2_stream_flowctl* sfc,
size_t max_size_hint,
size_t have_already) {
PRETRACE(tfc, sfc);
uint32_t max_recv_bytes;
uint32_t sent_init_window =
tfc->t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
/* clamp max recv hint to an allowable size */
if (max_size_hint >= UINT32_MAX - sent_init_window) {
max_recv_bytes = UINT32_MAX - sent_init_window;
} else {
max_recv_bytes = (uint32_t)max_size_hint;
}
/* account for bytes already received but unknown to higher layers */
if (max_recv_bytes >= have_already) {
max_recv_bytes -= (uint32_t)have_already;
} else {
max_recv_bytes = 0;
}
/* add some small lookahead to keep pipelines flowing */
GPR_ASSERT(max_recv_bytes <= UINT32_MAX - sent_init_window);
if (sfc->local_window_delta < max_recv_bytes) {
uint32_t add_max_recv_bytes =
(uint32_t)(max_recv_bytes - sfc->local_window_delta);
sfc->local_window_delta += add_max_recv_bytes;
}
POSTTRACE(tfc, sfc, "app st recv");
}
void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl* tfc,
grpc_chttp2_stream_flowctl* sfc) {
announced_window_delta_preupdate(tfc, sfc);
}
grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
const grpc_chttp2_transport_flowctl* tfc,
const grpc_chttp2_stream_flowctl* sfc) {
grpc_chttp2_flowctl_action action;
memset(&action, 0, sizeof(action));
uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc);
if (tfc->announced_window < target_announced_window / 2) {
action.send_transport_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY;
}
if (sfc != NULL && !sfc->s->read_closed) {
uint32_t sent_init_window =
tfc->t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
if ((int64_t)sfc->local_window_delta >
(int64_t)sfc->announced_window_delta &&
(int64_t)sfc->announced_window_delta + sent_init_window <=
sent_init_window / 2) {
action.send_stream_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY;
} else if (sfc->local_window_delta > sfc->announced_window_delta) {
action.send_stream_update = GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE;
}
}
TRACEACTION(action);
return action;
}

@ -201,11 +201,13 @@ grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p,
}
if (id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE &&
parser->incoming_settings[id] != parser->value) {
t->initial_window_update +=
t->flow_control.initial_window_update +=
(int64_t)parser->value - parser->incoming_settings[id];
if (GRPC_TRACER_ON(grpc_http_trace)) {
gpr_log(GPR_DEBUG, "adding %d for initial_window change",
(int)t->initial_window_update);
if (GRPC_TRACER_ON(grpc_http_trace) ||
GRPC_TRACER_ON(grpc_flowctl_trace)) {
gpr_log(GPR_DEBUG, "%p[%s] adding %d for initial_window change",
t, t->is_client ? "cli" : "svr",
(int)t->flow_control.initial_window_update);
}
}
parser->incoming_settings[id] = parser->value;

@ -95,8 +95,8 @@ grpc_error *grpc_chttp2_window_update_parser_parse(
if (t->incoming_stream_id != 0) {
if (s != NULL) {
GRPC_CHTTP2_FLOW_CREDIT_STREAM("parse", t, s, outgoing_window_delta,
received_update);
grpc_chttp2_flowctl_recv_stream_update(
&t->flow_control, &s->flow_control, received_update);
if (grpc_chttp2_list_remove_stalled_by_stream(t, s)) {
grpc_chttp2_become_writable(
exec_ctx, t, s, GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED,
@ -104,10 +104,10 @@ grpc_error *grpc_chttp2_window_update_parser_parse(
}
}
} else {
bool was_zero = t->outgoing_window <= 0;
GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parse", t, outgoing_window,
received_update);
bool is_zero = t->outgoing_window <= 0;
bool was_zero = t->flow_control.remote_window <= 0;
grpc_chttp2_flowctl_recv_transport_update(&t->flow_control,
received_update);
bool is_zero = t->flow_control.remote_window <= 0;
if (was_zero && !is_zero) {
grpc_chttp2_initiate_write(exec_ctx, t, "new_global_flow_control");
}

@ -213,6 +213,35 @@ typedef enum {
GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED,
} grpc_chttp2_keepalive_state;
typedef struct {
/** initial window change. This is tracked as we parse settings frames from
* the remote peer. If there is a positive delta, then we will make all
* streams readable since they may have become unstalled */
int64_t initial_window_update;
/** Our bookkeeping for the remote peer's available window */
int64_t remote_window;
/** calculating what we should give for local window:
we track the total amount of flow control over initial window size
across all streams: this is data that we want to receive right now (it
has an outstanding read)
and the total amount of flow control under initial window size across all
streams: this is data we've read early
we want to adjust incoming_window such that:
incoming_window = total_over - max(bdp - total_under, 0) */
int64_t announced_stream_total_over_incoming_window;
int64_t announced_stream_total_under_incoming_window;
/** This is out window according to what we have sent to our remote peer. The
* difference between this and target window is what we use to decide when
* to send WINDOW_UPDATE frames. */
int64_t announced_window;
// read only pointer back to transport for certain data
const grpc_chttp2_transport *t;
} grpc_chttp2_transport_flowctl;
struct grpc_chttp2_transport {
grpc_transport base; /* must be first */
gpr_refcount refs;
@ -271,7 +300,6 @@ struct grpc_chttp2_transport {
grpc_slice_buffer outbuf;
/** hpack encoding */
grpc_chttp2_hpack_compressor hpack_compressor;
int64_t outgoing_window;
/** is this a client? */
uint8_t is_client;
@ -328,21 +356,14 @@ struct grpc_chttp2_transport {
/** parser for goaway frames */
grpc_chttp2_goaway_parser goaway_parser;
/** initial window change */
int64_t initial_window_update;
grpc_chttp2_transport_flowctl flow_control;
/** window available for peer to send to us */
int64_t incoming_window;
/** calculating what we should give for incoming window:
we track the total amount of flow control over initial window size
across all streams: this is data that we want to receive right now (it
has an outstanding read)
and the total amount of flow control under initial window size across all
streams: this is data we've read early
we want to adjust incoming_window such that:
incoming_window = total_over - max(bdp - total_under, 0) */
int64_t stream_total_over_incoming_window;
int64_t stream_total_under_incoming_window;
/* bdp estimation */
grpc_bdp_estimator bdp_estimator;
/* pid controller */
grpc_pid_controller pid_controller;
gpr_timespec last_pid_update;
/* deframing */
grpc_chttp2_deframe_transport_state deframe_state;
@ -369,11 +390,8 @@ struct grpc_chttp2_transport {
grpc_chttp2_write_cb *write_cb_pool;
/* bdp estimator */
grpc_bdp_estimator bdp_estimator;
grpc_pid_controller pid_controller;
grpc_closure start_bdp_ping_locked;
grpc_closure finish_bdp_ping_locked;
gpr_timespec last_pid_update;
/* if non-NULL, close the transport with this error when writes are finished
*/
@ -422,6 +440,25 @@ typedef enum {
GPRC_METADATA_PUBLISHED_AT_CLOSE
} grpc_published_metadata_method;
typedef struct {
/** window available for us to send to peer, over or under the initial window
* size of the transport... ie:
* remote_window = remote_window_delta + transport.initial_window_size */
int64_t remote_window_delta;
/** window available for peer to send to us (as a delta on
* transport.initial_window_size)
* local_window = local_window_delta + transport.initial_window_size */
int64_t local_window_delta;
/** window available for peer to send to us over this stream that we have
* announced to the peer */
int64_t announced_window_delta;
// read only pointer back to stream for data
const grpc_chttp2_stream *s;
} grpc_chttp2_stream_flowctl;
struct grpc_chttp2_stream {
grpc_chttp2_transport *t;
grpc_stream_refcount *refcount;
@ -435,10 +472,6 @@ struct grpc_chttp2_stream {
/** HTTP2 stream id for this stream, or zero if one has not been assigned */
uint32_t id;
/** window available for us to send to peer, over or under the initial window
* size of the transport... ie:
* outgoing_window = outgoing_window_delta + transport.initial_window_size */
int64_t outgoing_window_delta;
/** things the upper layers would like to send */
grpc_metadata_batch *send_initial_metadata;
grpc_closure *send_initial_metadata_finished;
@ -505,10 +538,6 @@ struct grpc_chttp2_stream {
grpc_error *forced_close_error;
/** how many header frames have we received? */
uint8_t header_frames_received;
/** window available for peer to send to us (as a delta on
* transport.initial_window_size)
* incoming_window = incoming_window_delta + transport.initial_window_size */
int64_t incoming_window_delta;
/** parsing state for data frames */
/* Accessed only by transport thread when stream->pending_byte_stream == false
* Accessed only by application thread when stream->pending_byte_stream ==
@ -519,8 +548,9 @@ struct grpc_chttp2_stream {
bool sent_initial_metadata;
bool sent_trailing_metadata;
/** how much window should we announce? */
uint32_t announce_window;
grpc_chttp2_stream_flowctl flow_control;
grpc_slice_buffer flow_controlled_buffer;
grpc_chttp2_write_cb *on_write_finished_cbs;
@ -621,6 +651,75 @@ bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport *t,
bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
/********* Flow Control ***************/
// we have sent data on the wire
void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport_flowctl *tfc,
grpc_chttp2_stream_flowctl *sfc,
int64_t size);
// we have received data from the wire
grpc_error *grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl *tfc,
grpc_chttp2_stream_flowctl *sfc,
int64_t incoming_frame_size);
// returns an announce if we should send a transport update to our peer,
// else returns zero
uint32_t grpc_chttp2_flowctl_maybe_send_transport_update(
grpc_chttp2_transport_flowctl *tfc);
// returns an announce if we should send a stream update to our peer, else
// returns zero
uint32_t grpc_chttp2_flowctl_maybe_send_stream_update(
grpc_chttp2_transport_flowctl *tfc, grpc_chttp2_stream_flowctl *sfc);
// we have received a WINDOW_UPDATE frame for a transport
void grpc_chttp2_flowctl_recv_transport_update(
grpc_chttp2_transport_flowctl *tfc, uint32_t size);
// we have received a WINDOW_UPDATE frame for a stream
void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_transport_flowctl *tfc,
grpc_chttp2_stream_flowctl *sfc,
uint32_t size);
// the application is asking for a certain amount of bytes
void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl *tfc,
grpc_chttp2_stream_flowctl *sfc,
size_t max_size_hint,
size_t have_already);
void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl *tfc,
grpc_chttp2_stream_flowctl *sfc);
typedef enum {
// Nothing to be done.
GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED = 0,
// Initiate a write to update the initial window immediately.
GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY,
// Push the flow control update into a send buffer, to be sent
// out the next time a write is initiated.
GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE,
} grpc_chttp2_flowctl_urgency;
typedef struct {
grpc_chttp2_flowctl_urgency send_stream_update;
grpc_chttp2_flowctl_urgency send_transport_update;
} grpc_chttp2_flowctl_action;
// Reads the flow control data and returns and actionable struct that will tell
// chttp2 exactly what it needs to do
grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
const grpc_chttp2_transport_flowctl *tfc,
const grpc_chttp2_stream_flowctl *sfc);
// Takes in a flow control action and performs all the needed operations.
void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,
grpc_chttp2_flowctl_action action,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
/********* End of Flow Control ***************/
grpc_chttp2_stream *grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport *t,
uint32_t id);
grpc_chttp2_stream *grpc_chttp2_parsing_accept_stream(grpc_exec_ctx *exec_ctx,
@ -651,126 +750,22 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
extern grpc_tracer_flag grpc_http_trace;
extern grpc_tracer_flag grpc_flowctl_trace;
#ifndef NDEBUG
#define GRPC_FLOW_CONTROL_IF_TRACING(stmt) \
if (!(GRPC_TRACER_ON(grpc_flowctl_trace))) \
; \
else \
stmt
#else
#define GRPC_FLOW_CONTROL_IF_TRACING(stmt)
#endif
#define GRPC_CHTTP2_IF_TRACING(stmt) \
if (!(GRPC_TRACER_ON(grpc_http_trace))) \
; \
else \
stmt
typedef enum {
GRPC_CHTTP2_FLOWCTL_MOVE,
GRPC_CHTTP2_FLOWCTL_CREDIT,
GRPC_CHTTP2_FLOWCTL_DEBIT
} grpc_chttp2_flowctl_op;
#define GRPC_CHTTP2_FLOW_MOVE_COMMON(phase, transport, id1, id2, dst_context, \
dst_var, src_context, src_var) \
do { \
assert(id1 == id2); \
if (GRPC_TRACER_ON(grpc_flowctl_trace)) { \
grpc_chttp2_flowctl_trace( \
__FILE__, __LINE__, phase, GRPC_CHTTP2_FLOWCTL_MOVE, #dst_context, \
#dst_var, #src_context, #src_var, transport->is_client, id1, \
dst_context->dst_var, src_context->src_var); \
} \
dst_context->dst_var += src_context->src_var; \
src_context->src_var = 0; \
} while (0)
#define GRPC_CHTTP2_FLOW_MOVE_STREAM(phase, transport, dst_context, dst_var, \
src_context, src_var) \
GRPC_CHTTP2_FLOW_MOVE_COMMON(phase, transport, dst_context->id, \
src_context->id, dst_context, dst_var, \
src_context, src_var)
#define GRPC_CHTTP2_FLOW_MOVE_TRANSPORT(phase, dst_context, dst_var, \
src_context, src_var) \
GRPC_CHTTP2_FLOW_MOVE_COMMON(phase, dst_context, 0, 0, dst_context, dst_var, \
src_context, src_var)
#define GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, transport, id, dst_context, \
dst_var, amount) \
do { \
if (GRPC_TRACER_ON(grpc_flowctl_trace)) { \
grpc_chttp2_flowctl_trace(__FILE__, __LINE__, phase, \
GRPC_CHTTP2_FLOWCTL_CREDIT, #dst_context, \
#dst_var, NULL, #amount, transport->is_client, \
id, dst_context->dst_var, amount); \
} \
dst_context->dst_var += amount; \
} while (0)
#define GRPC_CHTTP2_FLOW_CREDIT_STREAM(phase, transport, dst_context, dst_var, \
amount) \
GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, transport, dst_context->id, \
dst_context, dst_var, amount)
#define GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT(phase, dst_context, dst_var, amount) \
GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, dst_context, 0, dst_context, dst_var, \
amount)
#define GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE( \
phase, transport, dst_context) \
if (dst_context->incoming_window_delta < 0) { \
transport->stream_total_under_incoming_window += \
dst_context->incoming_window_delta; \
} else if (dst_context->incoming_window_delta > 0) { \
transport->stream_total_over_incoming_window -= \
dst_context->incoming_window_delta; \
}
#define GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE( \
phase, transport, dst_context) \
if (dst_context->incoming_window_delta < 0) { \
transport->stream_total_under_incoming_window -= \
dst_context->incoming_window_delta; \
} else if (dst_context->incoming_window_delta > 0) { \
transport->stream_total_over_incoming_window += \
dst_context->incoming_window_delta; \
}
#define GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA( \
phase, transport, dst_context, amount) \
GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE(phase, transport, \
dst_context); \
GRPC_CHTTP2_FLOW_DEBIT_STREAM(phase, transport, dst_context, \
incoming_window_delta, amount); \
GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE(phase, transport, \
dst_context);
#define GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA( \
phase, transport, dst_context, amount) \
GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE(phase, transport, \
dst_context); \
GRPC_CHTTP2_FLOW_CREDIT_STREAM(phase, transport, dst_context, \
incoming_window_delta, amount); \
GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE(phase, transport, \
dst_context);
#define GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, transport, id, dst_context, \
dst_var, amount) \
do { \
if (GRPC_TRACER_ON(grpc_flowctl_trace)) { \
grpc_chttp2_flowctl_trace(__FILE__, __LINE__, phase, \
GRPC_CHTTP2_FLOWCTL_DEBIT, #dst_context, \
#dst_var, NULL, #amount, transport->is_client, \
id, dst_context->dst_var, amount); \
} \
dst_context->dst_var -= amount; \
} while (0)
#define GRPC_CHTTP2_FLOW_DEBIT_STREAM(phase, transport, dst_context, dst_var, \
amount) \
GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, transport, dst_context->id, \
dst_context, dst_var, amount)
#define GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT(phase, dst_context, dst_var, amount) \
GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, dst_context, 0, dst_context, dst_var, \
amount)
void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase,
grpc_chttp2_flowctl_op op, const char *context1,
const char *var1, const char *context2,
const char *var2, int is_client,
uint32_t stream_id, int64_t val1, int64_t val2);
void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_stream *stream, grpc_error *error);
void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
@ -872,8 +867,6 @@ void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_error *error);
uint32_t grpc_chttp2_target_incoming_window(grpc_chttp2_transport *t);
/** Set the default keepalive configurations, must only be called at
initialization */
void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args,

@ -349,93 +349,25 @@ void grpc_chttp2_parsing_become_skip_parser(grpc_exec_ctx *exec_ctx,
t->parser == grpc_chttp2_header_parser_parse);
}
static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
uint32_t incoming_frame_size = t->incoming_frame_size;
if (incoming_frame_size > t->incoming_window) {
char *msg;
gpr_asprintf(&msg, "frame of size %d overflows incoming window of %" PRId64,
t->incoming_frame_size, t->incoming_window);
grpc_error *err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
gpr_free(msg);
return err;
}
if (s != NULL) {
if (incoming_frame_size >
s->incoming_window_delta +
t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]) {
if (incoming_frame_size <=
s->incoming_window_delta +
t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]) {
gpr_log(
GPR_ERROR,
"Incoming frame of size %d exceeds incoming window size of %" PRId64
".\n"
"The (un-acked, future) window size would be %" PRId64
" which is not exceeded.\n"
"This would usually cause a disconnection, but allowing it due to "
"broken HTTP2 implementations in the wild.\n"
"See (for example) https://github.com/netty/netty/issues/6520.",
t->incoming_frame_size,
s->incoming_window_delta +
t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
s->incoming_window_delta +
t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
} else {
char *msg;
gpr_asprintf(&msg,
"frame of size %d overflows incoming window of %" PRId64,
t->incoming_frame_size,
s->incoming_window_delta +
t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
grpc_error *err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
gpr_free(msg);
return err;
}
}
GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA("parse", t, s,
incoming_frame_size);
if ((int64_t)s->incoming_window_delta - (int64_t)s->announce_window <=
-(int64_t)t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] /
2) {
grpc_chttp2_become_writable(exec_ctx, t, s,
GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED,
"window-update-required");
}
s->received_bytes += incoming_frame_size;
}
uint32_t target_incoming_window = grpc_chttp2_target_incoming_window(t);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", t, incoming_window,
incoming_frame_size);
if (t->incoming_window <= target_incoming_window / 2) {
grpc_chttp2_initiate_write(exec_ctx, t, "flow_control");
}
return GRPC_ERROR_NONE;
}
static grpc_error *init_data_frame_parser(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
grpc_chttp2_stream *s =
grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id);
grpc_error *err = GRPC_ERROR_NONE;
err = update_incoming_window(exec_ctx, t, s);
err = grpc_chttp2_flowctl_recv_data(&t->flow_control,
s == NULL ? NULL : &s->flow_control,
t->incoming_frame_size);
grpc_chttp2_act_on_flowctl_action(
exec_ctx, grpc_chttp2_flowctl_get_action(
&t->flow_control, s == NULL ? NULL : &s->flow_control),
t, s);
if (err != GRPC_ERROR_NONE) {
goto error_handler;
}
if (s == NULL) {
return init_skip_frame_parser(exec_ctx, t, 0);
}
s->received_bytes += t->incoming_frame_size;
s->stats.incoming.framing_bytes += 9;
if (err == GRPC_ERROR_NONE && s->read_closed) {
return init_skip_frame_parser(exec_ctx, t, 0);

@ -150,12 +150,17 @@ void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport *t,
void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
GRPC_FLOW_CONTROL_IF_TRACING(
gpr_log(GPR_DEBUG, "stream %u stalled by transport", s->id));
stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
}
bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport *t,
grpc_chttp2_stream **s) {
return stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
bool ret = stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
GRPC_FLOW_CONTROL_IF_TRACING(if (ret) gpr_log(
GPR_DEBUG, "stream %u un-stalled by transport", (*s)->id));
return ret;
}
void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport *t,
@ -165,15 +170,23 @@ void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport *t,
void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
GRPC_FLOW_CONTROL_IF_TRACING(
gpr_log(GPR_DEBUG, "stream %u stalled by stream", s->id));
stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
}
bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream **s) {
return stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
bool ret = stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
GRPC_FLOW_CONTROL_IF_TRACING(
if (ret) gpr_log(GPR_DEBUG, "stream %u un-stalled by stream", (*s)->id));
return ret;
}
bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
return stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
bool ret = stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
GRPC_FLOW_CONTROL_IF_TRACING(
if (ret) gpr_log(GPR_DEBUG, "stream %u un-stalled by stream", s->id));
return ret;
}

@ -148,15 +148,6 @@ static bool stream_ref_if_not_destroyed(gpr_refcount *r) {
return true;
}
/* How many bytes of incoming flow control would we like to advertise */
uint32_t grpc_chttp2_target_incoming_window(grpc_chttp2_transport *t) {
return (uint32_t)GPR_MIN(
(int64_t)((1u << 31) - 1),
t->stream_total_over_incoming_window +
t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
}
/* How many bytes would we like to put on the wire during a single syscall */
static uint32_t target_write_size(grpc_chttp2_transport *t) {
return 1024 * 1024;
@ -201,7 +192,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
&t->hpack_compressor,
t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
if (t->outgoing_window > 0) {
if (t->flow_control.remote_window > 0) {
while (grpc_chttp2_list_pop_stalled_by_transport(t, &s)) {
if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s) &&
stream_ref_if_not_destroyed(&s->refcount->refs)) {
@ -227,10 +218,12 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
bool sent_initial_metadata = s->sent_initial_metadata;
bool now_writing = false;
GRPC_CHTTP2_IF_TRACING(gpr_log(
GPR_DEBUG, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t,
t->is_client ? "CLIENT" : "SERVER", s->id, sent_initial_metadata,
s->send_initial_metadata != NULL, s->announce_window));
GRPC_CHTTP2_IF_TRACING(
gpr_log(GPR_DEBUG, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t,
t->is_client ? "CLIENT" : "SERVER", s->id,
sent_initial_metadata, s->send_initial_metadata != NULL,
(int)(s->flow_control.local_window_delta -
s->flow_control.announced_window_delta)));
grpc_mdelem *extra_headers_for_trailing_metadata[2];
size_t num_extra_headers_for_trailing_metadata = 0;
@ -287,11 +280,12 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
sent_initial_metadata = true;
}
/* send any window updates */
if (s->announce_window > 0) {
uint32_t announce = s->announce_window;
grpc_slice_buffer_add(&t->outbuf,
grpc_chttp2_window_update_create(
s->id, s->announce_window, &s->stats.outgoing));
uint32_t stream_announce = grpc_chttp2_flowctl_maybe_send_stream_update(
&t->flow_control, &s->flow_control);
if (stream_announce > 0) {
grpc_slice_buffer_add(
&t->outbuf, grpc_chttp2_window_update_create(s->id, stream_announce,
&s->stats.outgoing));
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
if (!t->is_client) {
@ -299,22 +293,21 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
gpr_inf_past(GPR_CLOCK_MONOTONIC);
t->ping_recv_state.ping_strikes = 0;
}
GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, announce_window, announce);
}
if (sent_initial_metadata) {
/* send any body bytes, if allowed by flow control */
if (s->flow_controlled_buffer.length > 0 ||
(s->stream_compression_send_enabled &&
s->compressed_data_buffer->length > 0)) {
uint32_t stream_outgoing_window = (uint32_t)GPR_MAX(
uint32_t stream_remote_window = (uint32_t)GPR_MAX(
0,
s->outgoing_window_delta +
s->flow_control.remote_window_delta +
(int64_t)t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
uint32_t max_outgoing = (uint32_t)GPR_MIN(
t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
GPR_MIN(stream_outgoing_window, t->outgoing_window));
GPR_MIN(stream_remote_window, t->flow_control.remote_window));
if (max_outgoing > 0) {
bool is_last_data_frame = false;
bool is_last_frame = false;
@ -335,10 +328,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
grpc_chttp2_encode_data(s->id, s->compressed_data_buffer,
send_bytes, is_last_frame,
&s->stats.outgoing, &t->outbuf);
GRPC_CHTTP2_FLOW_DEBIT_STREAM(
"write", t, s, outgoing_window_delta, send_bytes);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
send_bytes);
grpc_chttp2_flowctl_sent_data(&t->flow_control,
&s->flow_control, send_bytes);
max_outgoing -= send_bytes;
if (s->compressed_data_buffer->length == 0) {
s->sending_bytes += s->uncompressed_data_size;
@ -367,10 +358,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer,
send_bytes, is_last_frame,
&s->stats.outgoing, &t->outbuf);
GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window_delta,
grpc_chttp2_flowctl_sent_data(&t->flow_control, &s->flow_control,
send_bytes);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
send_bytes);
s->sending_bytes += send_bytes;
}
t->ping_state.pings_before_data_required =
@ -396,10 +385,10 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork");
grpc_chttp2_list_add_writable_stream(t, s);
}
} else if (t->outgoing_window == 0) {
} else if (t->flow_control.remote_window == 0) {
grpc_chttp2_list_add_stalled_by_transport(t, s);
now_writing = true;
} else if (stream_outgoing_window == 0) {
} else if (stream_remote_window == 0) {
grpc_chttp2_list_add_stalled_by_stream(t, s);
now_writing = true;
}
@ -453,22 +442,15 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
}
}
/* if the grpc_chttp2_transport is ready to send a window update, do so here
also; 3/4 is a magic number that will likely get tuned soon */
uint32_t target_incoming_window = grpc_chttp2_target_incoming_window(t);
uint32_t threshold_to_send_transport_window_update =
t->outbuf.count > 0 ? 3 * target_incoming_window / 4
: target_incoming_window / 2;
if (t->incoming_window <= threshold_to_send_transport_window_update &&
t->incoming_window != target_incoming_window) {
uint32_t transport_announce =
grpc_chttp2_flowctl_maybe_send_transport_update(&t->flow_control);
if (transport_announce) {
maybe_initiate_ping(exec_ctx, t,
GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE);
uint32_t announced = (uint32_t)GPR_CLAMP(
target_incoming_window - t->incoming_window, 0, UINT32_MAX);
GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("write", t, incoming_window, announced);
grpc_transport_one_way_stats throwaway_stats;
grpc_slice_buffer_add(&t->outbuf, grpc_chttp2_window_update_create(
0, announced, &throwaway_stats));
grpc_slice_buffer_add(
&t->outbuf, grpc_chttp2_window_update_create(0, transport_announce,
&throwaway_stats));
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
if (!t->is_client) {

@ -193,6 +193,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/transport/chttp2/transport/bin_encoder.c',
'src/core/ext/transport/chttp2/transport/chttp2_plugin.c',
'src/core/ext/transport/chttp2/transport/chttp2_transport.c',
'src/core/ext/transport/chttp2/transport/flow_control.c',
'src/core/ext/transport/chttp2/transport/frame_data.c',
'src/core/ext/transport/chttp2/transport/frame_goaway.c',
'src/core/ext/transport/chttp2/transport/frame_ping.c',

@ -391,8 +391,9 @@ static void BM_TransportStreamSend(benchmark::State &state) {
MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
if (!state.KeepRunning()) return;
// force outgoing window to be yuge
s.chttp2_stream()->outgoing_window_delta = 1024 * 1024 * 1024;
f.chttp2_transport()->outgoing_window = 1024 * 1024 * 1024;
s.chttp2_stream()->flow_control.remote_window_delta =
1024 * 1024 * 1024;
f.chttp2_transport()->flow_control.remote_window = 1024 * 1024 * 1024;
grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0);
reset_op();
op.on_complete = c.get();
@ -517,21 +518,22 @@ static void BM_TransportStreamRecv(benchmark::State &state) {
std::unique_ptr<Closure> drain_continue;
grpc_slice recv_slice;
std::unique_ptr<Closure> c =
MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
if (!state.KeepRunning()) return;
// force outgoing window to be yuge
s.chttp2_stream()->incoming_window_delta = 1024 * 1024 * 1024;
f.chttp2_transport()->incoming_window = 1024 * 1024 * 1024;
received = 0;
reset_op();
op.on_complete = do_nothing.get();
op.recv_message = true;
op.payload->recv_message.recv_message = &recv_stream;
op.payload->recv_message.recv_message_ready = drain_start.get();
s.Op(&op);
f.PushInput(grpc_slice_ref(incoming_data));
});
std::unique_ptr<Closure> c = MakeClosure([&](grpc_exec_ctx *exec_ctx,
grpc_error *error) {
if (!state.KeepRunning()) return;
// force outgoing window to be yuge
s.chttp2_stream()->flow_control.local_window_delta = 1024 * 1024 * 1024;
s.chttp2_stream()->flow_control.announced_window_delta = 1024 * 1024 * 1024;
f.chttp2_transport()->flow_control.announced_window = 1024 * 1024 * 1024;
received = 0;
reset_op();
op.on_complete = do_nothing.get();
op.recv_message = true;
op.payload->recv_message.recv_message = &recv_stream;
op.payload->recv_message.recv_message_ready = drain_start.get();
s.Op(&op);
f.PushInput(grpc_slice_ref(incoming_data));
});
drain_start = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
if (recv_stream == NULL) {

@ -73,11 +73,11 @@ class TrickledCHTTP2 : public EndpointPairFixture {
log_.reset(new std::ofstream(fn.str().c_str()));
write_csv(log_.get(), "t", "iteration", "client_backlog",
"server_backlog", "client_t_stall", "client_s_stall",
"server_t_stall", "server_s_stall", "client_t_outgoing",
"server_t_outgoing", "client_t_incoming", "server_t_incoming",
"client_s_outgoing_delta", "server_s_outgoing_delta",
"client_s_incoming_delta", "server_s_incoming_delta",
"client_s_announce_window", "server_s_announce_window",
"server_t_stall", "server_s_stall", "client_t_remote",
"server_t_remote", "client_t_announced", "server_t_announced",
"client_s_remote_delta", "server_s_remote_delta",
"client_s_local_delta", "server_s_local_delta",
"client_s_announced_delta", "server_s_announced_delta",
"client_peer_iws", "client_local_iws", "client_sent_iws",
"client_acked_iws", "server_peer_iws", "server_local_iws",
"server_sent_iws", "server_acked_iws", "client_queued_bytes",
@ -127,14 +127,15 @@ class TrickledCHTTP2 : public EndpointPairFixture {
client->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr,
server->lists[GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT].head != nullptr,
server->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr,
client->outgoing_window, server->outgoing_window,
client->incoming_window, server->incoming_window,
client_stream ? client_stream->outgoing_window_delta : -1,
server_stream ? server_stream->outgoing_window_delta : -1,
client_stream ? client_stream->incoming_window_delta : -1,
server_stream ? server_stream->incoming_window_delta : -1,
client_stream ? client_stream->announce_window : -1,
server_stream ? server_stream->announce_window : -1,
client->flow_control.remote_window, server->flow_control.remote_window,
client->flow_control.announced_window,
server->flow_control.announced_window,
client_stream ? client_stream->flow_control.remote_window_delta : -1,
server_stream ? server_stream->flow_control.remote_window_delta : -1,
client_stream ? client_stream->flow_control.local_window_delta : -1,
server_stream ? server_stream->flow_control.local_window_delta : -1,
client_stream ? client_stream->flow_control.announced_window_delta : -1,
server_stream ? server_stream->flow_control.announced_window_delta : -1,
client->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
client->settings[GRPC_LOCAL_SETTINGS]

@ -1014,6 +1014,7 @@ src/core/ext/transport/chttp2/transport/bin_encoder.h \
src/core/ext/transport/chttp2/transport/chttp2_plugin.c \
src/core/ext/transport/chttp2/transport/chttp2_transport.c \
src/core/ext/transport/chttp2/transport/chttp2_transport.h \
src/core/ext/transport/chttp2/transport/flow_control.c \
src/core/ext/transport/chttp2/transport/frame.h \
src/core/ext/transport/chttp2/transport/frame_data.c \
src/core/ext/transport/chttp2/transport/frame_data.h \

@ -8808,6 +8808,7 @@
"src/core/ext/transport/chttp2/transport/chttp2_plugin.c",
"src/core/ext/transport/chttp2/transport/chttp2_transport.c",
"src/core/ext/transport/chttp2/transport/chttp2_transport.h",
"src/core/ext/transport/chttp2/transport/flow_control.c",
"src/core/ext/transport/chttp2/transport/frame.h",
"src/core/ext/transport/chttp2/transport/frame_data.c",
"src/core/ext/transport/chttp2/transport/frame_data.h",

@ -784,6 +784,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\chttp2_transport.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\flow_control.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\frame_data.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\frame_goaway.c">

@ -397,6 +397,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\chttp2_transport.c">
<Filter>src\core\ext\transport\chttp2\transport</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\flow_control.c">
<Filter>src\core\ext\transport\chttp2\transport</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\frame_data.c">
<Filter>src\core\ext\transport\chttp2\transport</Filter>
</ClCompile>

@ -571,6 +571,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\chttp2_transport.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\flow_control.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\frame_data.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\frame_goaway.c">

@ -514,6 +514,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\chttp2_transport.c">
<Filter>src\core\ext\transport\chttp2\transport</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\flow_control.c">
<Filter>src\core\ext\transport\chttp2\transport</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\frame_data.c">
<Filter>src\core\ext\transport\chttp2\transport</Filter>
</ClCompile>

@ -559,6 +559,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\chttp2_transport.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\flow_control.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\frame_data.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\frame_goaway.c">

@ -499,6 +499,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\chttp2_transport.c">
<Filter>src\core\ext\transport\chttp2\transport</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\flow_control.c">
<Filter>src\core\ext\transport\chttp2\transport</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\frame_data.c">
<Filter>src\core\ext\transport\chttp2\transport</Filter>
</ClCompile>

@ -752,6 +752,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\chttp2_transport.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\flow_control.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\frame_data.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\frame_goaway.c">

@ -403,6 +403,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\chttp2_transport.c">
<Filter>src\core\ext\transport\chttp2\transport</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\flow_control.c">
<Filter>src\core\ext\transport\chttp2\transport</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\transport\frame_data.c">
<Filter>src\core\ext\transport\chttp2\transport</Filter>
</ClCompile>

Loading…
Cancel
Save