Pull out flow control objects

pull/11720/head
ncteisen 8 years ago
parent 2d1c61699e
commit 3c909d55fd
  1. 52
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 291
      src/core/ext/transport/chttp2/transport/flow_control.c
  3. 4
      src/core/ext/transport/chttp2/transport/frame_settings.c
  4. 12
      src/core/ext/transport/chttp2/transport/frame_window_update.c
  5. 138
      src/core/ext/transport/chttp2/transport/internal.h
  6. 18
      src/core/ext/transport/chttp2/transport/parsing.c
  7. 27
      src/core/ext/transport/chttp2/transport/writing.c
  8. 15
      test/cpp/microbenchmarks/bm_chttp2_transport.cc
  9. 17
      test/cpp/microbenchmarks/bm_fullstack_trickle.cc

@ -265,8 +265,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->endpoint_reading = 1; t->endpoint_reading = 1;
t->next_stream_id = is_client ? 1 : 2; t->next_stream_id = is_client ? 1 : 2;
t->is_client = is_client; t->is_client = is_client;
t->remote_window = DEFAULT_WINDOW; t->flow_control.remote_window = DEFAULT_WINDOW;
t->announced_window = DEFAULT_WINDOW; t->flow_control.announced_window = DEFAULT_WINDOW;
#ifndef NDEBUG
t->flow_control.t = t;
#endif
t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->is_first_frame = true; t->is_first_frame = true;
grpc_connectivity_state_init( grpc_connectivity_state_init(
@ -705,6 +708,10 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
post_destructive_reclaimer(exec_ctx, t); post_destructive_reclaimer(exec_ctx, t);
} }
#ifndef NDEBUG
s->flow_control.s = s;
#endif
GPR_TIMER_END("init_stream", 0); GPR_TIMER_END("init_stream", 0);
return 0; return 0;
@ -753,7 +760,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GRPC_ERROR_UNREF(s->write_closed_error); GRPC_ERROR_UNREF(s->write_closed_error);
GRPC_ERROR_UNREF(s->byte_stream_error); GRPC_ERROR_UNREF(s->byte_stream_error);
grpc_chttp2_flowctl_destroy_stream(s); grpc_chttp2_flowctl_destroy_stream(&t->flow_control, &s->flow_control);
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream"); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream");
@ -1449,9 +1456,20 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
already_received = s->frame_storage.length + already_received = s->frame_storage.length +
s->unprocessed_incoming_frames_buffer.length; s->unprocessed_incoming_frames_buffer.length;
} }
grpc_chttp2_flowctl_incoming_bs_update(t, s, 5, already_received); if (!s->read_closed) {
grpc_chttp2_flowctl_incoming_bs_update(
&t->flow_control, &s->flow_control,
t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
5, already_received);
grpc_chttp2_flowctl_act_on_action( grpc_chttp2_flowctl_act_on_action(
exec_ctx, grpc_chttp2_flowctl_get_action(t, s), t, s); exec_ctx,
grpc_chttp2_flowctl_get_action(
&t->flow_control, &s->flow_control, false,
t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]),
t, s);
}
} }
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
} }
@ -2252,8 +2270,8 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_END("reading_action.parse", 0); GPR_TIMER_END("reading_action.parse", 0);
GPR_TIMER_BEGIN("post_parse_locked", 0); GPR_TIMER_BEGIN("post_parse_locked", 0);
if (t->initial_window_update != 0) { if (t->flow_control.initial_window_update != 0) {
if (t->initial_window_update > 0) { if (t->flow_control.initial_window_update > 0) {
grpc_chttp2_stream *s; grpc_chttp2_stream *s;
while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) { while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
grpc_chttp2_become_writable( grpc_chttp2_become_writable(
@ -2261,7 +2279,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
"unstalled"); "unstalled");
} }
} }
t->initial_window_update = 0; t->flow_control.initial_window_update = 0;
} }
GPR_TIMER_END("post_parse_locked", 0); GPR_TIMER_END("post_parse_locked", 0);
} }
@ -2543,11 +2561,19 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream *s = bs->stream; grpc_chttp2_stream *s = bs->stream;
size_t cur_length = s->frame_storage.length; size_t cur_length = s->frame_storage.length;
grpc_chttp2_flowctl_incoming_bs_update(t, s, bs->next_action.max_size_hint, if (!s->read_closed) {
cur_length); grpc_chttp2_flowctl_incoming_bs_update(
grpc_chttp2_flowctl_act_on_action(exec_ctx, &t->flow_control, &s->flow_control,
grpc_chttp2_flowctl_get_action(t, s), t, s); t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
bs->next_action.max_size_hint, cur_length);
grpc_chttp2_flowctl_act_on_action(
exec_ctx, grpc_chttp2_flowctl_get_action(
&t->flow_control, &s->flow_control, false,
t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]),
t, s);
}
GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0); GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
if (s->frame_storage.length > 0) { if (s->frame_storage.length > 0) {
grpc_slice_buffer_swap(&s->frame_storage, grpc_slice_buffer_swap(&s->frame_storage,

@ -28,7 +28,7 @@
#include "src/core/lib/support/string.h" #include "src/core/lib/support/string.h"
static uint32_t grpc_chttp2_target_announced_window( static uint32_t grpc_chttp2_target_announced_window(
const grpc_chttp2_transport* t); const grpc_chttp2_transport_flowctl* tfc, uint32_t acked_local_window);
#ifndef NDEBUG #ifndef NDEBUG
@ -41,15 +41,18 @@ typedef struct {
int64_t announced_window_delta; int64_t announced_window_delta;
} shadow_flow_control; } shadow_flow_control;
static void pretrace(shadow_flow_control* sfc, grpc_chttp2_transport* t, static void pretrace(shadow_flow_control* shadow_fc,
grpc_chttp2_stream* s) { grpc_chttp2_transport_flowctl* tfc,
sfc->remote_window = t->remote_window; grpc_chttp2_stream_flowctl* sfc) {
sfc->target_window = grpc_chttp2_target_announced_window(t); shadow_fc->remote_window = tfc->remote_window;
sfc->announced_window = t->announced_window; shadow_fc->target_window = grpc_chttp2_target_announced_window(
if (s != NULL) { tfc, tfc->t->settings[GRPC_ACKED_SETTINGS]
sfc->remote_window_delta = s->remote_window_delta; [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
sfc->local_window_delta = s->local_window_delta; shadow_fc->announced_window = tfc->announced_window;
sfc->announced_window_delta = s->announced_window_delta; if (sfc != NULL) {
shadow_fc->remote_window_delta = sfc->remote_window_delta;
shadow_fc->local_window_delta = sfc->local_window_delta;
shadow_fc->announced_window_delta = sfc->announced_window_delta;
} }
} }
@ -65,35 +68,39 @@ static char* fmt_str(int64_t old, int64_t new) {
return str_lp; return str_lp;
} }
static void posttrace(shadow_flow_control* sfc, grpc_chttp2_transport* t, static void posttrace(shadow_flow_control* shadow_fc,
grpc_chttp2_stream* s, char* reason) { grpc_chttp2_transport_flowctl* tfc,
grpc_chttp2_stream_flowctl* sfc, char* reason) {
uint32_t acked_local_window = uint32_t acked_local_window =
t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; tfc->t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
uint32_t remote_window = uint32_t remote_window =
t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; tfc->t->settings[GRPC_PEER_SETTINGS]
char* trw_str = fmt_str(sfc->remote_window, t->remote_window); [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
char* trw_str = fmt_str(shadow_fc->remote_window, tfc->remote_window);
char* tlw_str = char* tlw_str =
fmt_str(sfc->target_window, grpc_chttp2_target_announced_window(t)); fmt_str(shadow_fc->target_window,
char* taw_str = fmt_str(sfc->announced_window, t->announced_window); grpc_chttp2_target_announced_window(tfc, acked_local_window));
char* taw_str = fmt_str(shadow_fc->announced_window, tfc->announced_window);
char* srw_str; char* srw_str;
char* slw_str; char* slw_str;
char* saw_str; char* saw_str;
if (s != NULL) { if (sfc != NULL) {
srw_str = fmt_str(sfc->remote_window_delta + remote_window, srw_str = fmt_str(shadow_fc->remote_window_delta + remote_window,
s->remote_window_delta + remote_window); sfc->remote_window_delta + remote_window);
slw_str = fmt_str(sfc->local_window_delta + acked_local_window, slw_str = fmt_str(shadow_fc->local_window_delta + acked_local_window,
s->local_window_delta + acked_local_window); sfc->local_window_delta + acked_local_window);
saw_str = fmt_str(sfc->announced_window_delta + acked_local_window, saw_str = fmt_str(shadow_fc->announced_window_delta + acked_local_window,
s->announced_window_delta + acked_local_window); sfc->announced_window_delta + acked_local_window);
} else { } else {
srw_str = gpr_leftpad("", ' ', 30); srw_str = gpr_leftpad("", ' ', 30);
slw_str = gpr_leftpad("", ' ', 30); slw_str = gpr_leftpad("", ' ', 30);
saw_str = gpr_leftpad("", ' ', 30); saw_str = gpr_leftpad("", ' ', 30);
} }
gpr_log(GPR_DEBUG, gpr_log(GPR_DEBUG,
"%p[%u][%s] | %s | trw:%s, ttw:%s, taw:%s, srw:%s, slw:%s, saw:%s", t, "%p[%u][%s] | %s | trw:%s, ttw:%s, taw:%s, srw:%s, slw:%s, saw:%s",
s != NULL ? s->id : 0, t->is_client ? "cli" : "svr", reason, trw_str, tfc, sfc != NULL ? sfc->s->id : 0, tfc->t->is_client ? "cli" : "svr",
tlw_str, taw_str, srw_str, slw_str, saw_str); reason, trw_str, tlw_str, taw_str, srw_str, slw_str, saw_str);
gpr_free(trw_str); gpr_free(trw_str);
gpr_free(tlw_str); gpr_free(tlw_str);
gpr_free(taw_str); gpr_free(taw_str);
@ -122,190 +129,190 @@ static void trace_action(grpc_chttp2_flowctl_action action) {
urgency_to_string(action.send_stream_update)); urgency_to_string(action.send_stream_update));
} }
#define PRETRACE(t, s) \ #define PRETRACE(tfc, sfc) \
shadow_flow_control sfc; \ shadow_flow_control shadow_fc; \
GRPC_FLOW_CONTROL_IF_TRACING(pretrace(&sfc, t, s)) GRPC_FLOW_CONTROL_IF_TRACING(pretrace(&shadow_fc, tfc, sfc))
#define POSTTRACE(t, s, reason) \ #define POSTTRACE(tfc, sfc, reason) \
GRPC_FLOW_CONTROL_IF_TRACING(posttrace(&sfc, t, s, reason)) GRPC_FLOW_CONTROL_IF_TRACING(posttrace(&shadow_fc, tfc, sfc, reason))
#define TRACEACTION(action) GRPC_FLOW_CONTROL_IF_TRACING(trace_action(action)) #define TRACEACTION(action) GRPC_FLOW_CONTROL_IF_TRACING(trace_action(action))
#else #else
#define PRETRACE(t, s) #define PRETRACE(tfc, sfc)
#define POSTTRACE(t, s, reason) #define POSTTRACE(tfc, sfc, reason)
#define TRACEACTION(action) #define TRACEACTION(action)
#endif #endif
/* How many bytes of incoming flow control would we like to advertise */ /* How many bytes of incoming flow control would we like to advertise */
static uint32_t grpc_chttp2_target_announced_window( static uint32_t grpc_chttp2_target_announced_window(
const grpc_chttp2_transport* t) { const grpc_chttp2_transport_flowctl* tfc, uint32_t acked_init_window) {
return (uint32_t)GPR_MIN( return (uint32_t)GPR_MIN(
(int64_t)((1u << 31) - 1), (int64_t)((1u << 31) - 1),
t->announced_stream_total_over_incoming_window + tfc->announced_stream_total_over_incoming_window + acked_init_window);
t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
} }
// we have sent data on the wire, we must track this in our bookkeeping for the // we have sent data on the wire, we must track this in our bookkeeping for the
// remote peer's flow control. // remote peer's flow control.
void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport* t, void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport_flowctl* tfc,
grpc_chttp2_stream* s, int64_t size) { grpc_chttp2_stream_flowctl* sfc,
PRETRACE(t, s); int64_t size) {
t->remote_window -= size; PRETRACE(tfc, sfc);
s->remote_window_delta -= size; tfc->remote_window -= size;
POSTTRACE(t, s, " data sent"); sfc->remote_window_delta -= size;
} POSTTRACE(tfc, sfc, " data sent");
}
static void announced_window_delta_preupdate(grpc_chttp2_transport* t,
grpc_chttp2_stream* s) { static void announced_window_delta_preupdate(grpc_chttp2_transport_flowctl* tfc,
if (s->announced_window_delta > 0) { grpc_chttp2_stream_flowctl* sfc) {
t->announced_stream_total_over_incoming_window -= s->announced_window_delta; if (sfc->announced_window_delta > 0) {
tfc->announced_stream_total_over_incoming_window -=
sfc->announced_window_delta;
} else { } else {
t->announced_stream_total_under_incoming_window += tfc->announced_stream_total_under_incoming_window +=
-s->announced_window_delta; -sfc->announced_window_delta;
} }
} }
static void announced_window_delta_postupdate(grpc_chttp2_transport* t, static void announced_window_delta_postupdate(
grpc_chttp2_stream* s) { grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) {
if (s->announced_window_delta > 0) { if (sfc->announced_window_delta > 0) {
t->announced_stream_total_over_incoming_window += s->announced_window_delta; tfc->announced_stream_total_over_incoming_window +=
sfc->announced_window_delta;
} else { } else {
t->announced_stream_total_under_incoming_window -= tfc->announced_stream_total_under_incoming_window -=
-s->announced_window_delta; -sfc->announced_window_delta;
} }
} }
// We have received data from the wire. We must track this in our own flow // We have received data from the wire. We must track this in our own flow
// control bookkeeping. // control bookkeeping.
// Returns an error if the incoming frame violates our flow control. // Returns an error if the incoming frame violates our flow control.
grpc_error* grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport* t, grpc_error* grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl* tfc,
grpc_chttp2_stream* s, grpc_chttp2_stream_flowctl* sfc,
int64_t incoming_frame_size) { int64_t incoming_frame_size,
PRETRACE(t, s); uint32_t acked_init_window,
if (incoming_frame_size > t->announced_window) { uint32_t sent_init_window) {
PRETRACE(tfc, sfc);
if (incoming_frame_size > tfc->announced_window) {
char* msg; char* msg;
gpr_asprintf(&msg, "frame of size %d overflows local window of %" PRId64, gpr_asprintf(&msg,
t->incoming_frame_size, t->announced_window); "frame of size %" PRId64 " overflows local window of %" PRId64,
incoming_frame_size, tfc->announced_window);
grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
gpr_free(msg); gpr_free(msg);
return err; return err;
} }
// TODO(ncteisen): can this ever be null? ANSWER: only when incoming frame if (sfc != NULL) {
// size is zero?
if (s != NULL) {
int64_t acked_stream_window = int64_t acked_stream_window =
s->announced_window_delta + sfc->announced_window_delta + acked_init_window;
t->settings[GRPC_ACKED_SETTINGS] int64_t sent_stream_window = sfc->announced_window_delta + sent_init_window;
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
int64_t sent_stream_window =
s->announced_window_delta +
t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
if (incoming_frame_size > acked_stream_window) { if (incoming_frame_size > acked_stream_window) {
if (incoming_frame_size <= sent_stream_window) { if (incoming_frame_size <= sent_stream_window) {
gpr_log( gpr_log(
GPR_ERROR, GPR_ERROR,
"Incoming frame of size %d exceeds local window size of %" PRId64 "Incoming frame of size %" PRId64
" exceeds local window size of %" PRId64
".\n" ".\n"
"The (un-acked, future) window size would be %" PRId64 "The (un-acked, future) window size would be %" PRId64
" which is not exceeded.\n" " which is not exceeded.\n"
"This would usually cause a disconnection, but allowing it due to" "This would usually cause a disconnection, but allowing it due to"
"broken HTTP2 implementations in the wild.\n" "broken HTTP2 implementations in the wild.\n"
"See (for example) https://github.com/netty/netty/issues/6520.", "See (for example) https://github.com/netty/netty/issues/6520.",
t->incoming_frame_size, acked_stream_window, sent_stream_window); incoming_frame_size, acked_stream_window, sent_stream_window);
} else { } else {
char* msg; char* msg;
gpr_asprintf(&msg, gpr_asprintf(&msg, "frame of size %" PRId64
"frame of size %d overflows local window of %" PRId64, " overflows local window of %" PRId64,
t->incoming_frame_size, acked_stream_window); incoming_frame_size, acked_stream_window);
grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
gpr_free(msg); gpr_free(msg);
return err; return err;
} }
} }
announced_window_delta_preupdate(t, s); announced_window_delta_preupdate(tfc, sfc);
s->announced_window_delta -= incoming_frame_size; sfc->announced_window_delta -= incoming_frame_size;
announced_window_delta_postupdate(t, s); announced_window_delta_postupdate(tfc, sfc);
s->local_window_delta -= incoming_frame_size; sfc->local_window_delta -= incoming_frame_size;
s->received_bytes += incoming_frame_size;
} }
t->announced_window -= incoming_frame_size; tfc->announced_window -= incoming_frame_size;
POSTTRACE(t, s, " data recv"); POSTTRACE(tfc, sfc, " data recv");
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }
// Returns a non zero announce integer if we should send a transport window // Returns a non zero announce integer if we should send a transport window
// update // update
uint32_t grpc_chttp2_flowctl_maybe_send_transport_update( uint32_t grpc_chttp2_flowctl_maybe_send_transport_update(
grpc_chttp2_transport* t) { grpc_chttp2_transport_flowctl* tfc, uint32_t acked_init_window,
PRETRACE(t, NULL); bool has_outbuf) {
uint32_t target_announced_window = grpc_chttp2_target_announced_window(t); PRETRACE(tfc, NULL);
uint32_t target_announced_window =
grpc_chttp2_target_announced_window(tfc, acked_init_window);
uint32_t threshold_to_send_transport_window_update = uint32_t threshold_to_send_transport_window_update =
t->outbuf.count > 0 ? 3 * target_announced_window / 4 has_outbuf ? 3 * target_announced_window / 4
: target_announced_window / 2; : target_announced_window / 2;
if (t->announced_window <= threshold_to_send_transport_window_update && if (tfc->announced_window <= threshold_to_send_transport_window_update &&
t->announced_window != target_announced_window) { tfc->announced_window != target_announced_window) {
uint32_t announce = (uint32_t)GPR_CLAMP( uint32_t announce = (uint32_t)GPR_CLAMP(
target_announced_window - t->announced_window, 0, UINT32_MAX); target_announced_window - tfc->announced_window, 0, UINT32_MAX);
t->announced_window += announce; tfc->announced_window += announce;
POSTTRACE(t, NULL, "t updt sent"); POSTTRACE(tfc, NULL, "t updt sent");
return announce; return announce;
} }
GRPC_FLOW_CONTROL_IF_TRACING( GRPC_FLOW_CONTROL_IF_TRACING(
gpr_log(GPR_DEBUG, "%p[0][%s] will not to send transport update", t, gpr_log(GPR_DEBUG, "%p[0][%s] will not send transport update", tfc,
t->is_client ? "cli" : "svr")); tfc->t->is_client ? "cli" : "svr"));
return 0; return 0;
} }
// Returns a non zero announce integer if we should send a stream window update // Returns a non zero announce integer if we should send a stream window update
uint32_t grpc_chttp2_flowctl_maybe_send_stream_update(grpc_chttp2_stream* s) { uint32_t grpc_chttp2_flowctl_maybe_send_stream_update(
PRETRACE(s->t, s); grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) {
if (s->local_window_delta > s->announced_window_delta) { PRETRACE(tfc, sfc);
if (sfc->local_window_delta > sfc->announced_window_delta) {
uint32_t announce = (uint32_t)GPR_CLAMP( uint32_t announce = (uint32_t)GPR_CLAMP(
s->local_window_delta - s->announced_window_delta, 0, UINT32_MAX); sfc->local_window_delta - sfc->announced_window_delta, 0, UINT32_MAX);
announced_window_delta_preupdate(s->t, s); announced_window_delta_preupdate(tfc, sfc);
s->announced_window_delta += announce; sfc->announced_window_delta += announce;
announced_window_delta_postupdate(s->t, s); announced_window_delta_postupdate(tfc, sfc);
POSTTRACE(s->t, s, "s updt sent"); POSTTRACE(tfc, sfc, "s updt sent");
return announce; return announce;
} }
GRPC_FLOW_CONTROL_IF_TRACING( GRPC_FLOW_CONTROL_IF_TRACING(
gpr_log(GPR_DEBUG, "%p[%u][%s] will not to send stream update", s->t, gpr_log(GPR_DEBUG, "%p[%u][%s] will not send stream update", tfc,
s->id, s->t->is_client ? "cli" : "svr")); sfc->s->id, tfc->t->is_client ? "cli" : "svr"));
return 0; return 0;
} }
// we have received a WINDOW_UPDATE frame for a transport // we have received a WINDOW_UPDATE frame for a transport
void grpc_chttp2_flowctl_recv_transport_update(grpc_chttp2_transport* t, void grpc_chttp2_flowctl_recv_transport_update(
uint32_t size) { grpc_chttp2_transport_flowctl* tfc, uint32_t size) {
PRETRACE(t, NULL); PRETRACE(tfc, NULL);
t->remote_window += size; tfc->remote_window += size;
POSTTRACE(t, NULL, "t updt recv"); POSTTRACE(tfc, NULL, "t updt recv");
} }
// we have received a WINDOW_UPDATE frame for a stream // we have received a WINDOW_UPDATE frame for a stream
void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_stream* s, void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_transport_flowctl* tfc,
grpc_chttp2_stream_flowctl* sfc,
uint32_t size) { uint32_t size) {
PRETRACE(s->t, s); PRETRACE(tfc, sfc);
s->remote_window_delta += size; sfc->remote_window_delta += size;
POSTTRACE(s->t, s, "s updt recv"); POSTTRACE(tfc, sfc, "s updt recv");
} }
void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport* t, void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl* tfc,
grpc_chttp2_stream* s, grpc_chttp2_stream_flowctl* sfc,
uint32_t sent_init_window,
size_t max_size_hint, size_t max_size_hint,
size_t have_already) { size_t have_already) {
PRETRACE(t, s); PRETRACE(tfc, sfc);
uint32_t max_recv_bytes; uint32_t max_recv_bytes;
uint32_t initial_window_size =
t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
/* clamp max recv hint to an allowable size */ /* clamp max recv hint to an allowable size */
if (max_size_hint >= UINT32_MAX - initial_window_size) { if (max_size_hint >= UINT32_MAX - sent_init_window) {
max_recv_bytes = UINT32_MAX - initial_window_size; max_recv_bytes = UINT32_MAX - sent_init_window;
} else { } else {
max_recv_bytes = (uint32_t)max_size_hint; max_recv_bytes = (uint32_t)max_size_hint;
} }
@ -318,34 +325,38 @@ void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport* t,
} }
/* add some small lookahead to keep pipelines flowing */ /* add some small lookahead to keep pipelines flowing */
GPR_ASSERT(max_recv_bytes <= UINT32_MAX - initial_window_size); GPR_ASSERT(max_recv_bytes <= UINT32_MAX - sent_init_window);
if (s->local_window_delta < max_recv_bytes && !s->read_closed) { if (sfc->local_window_delta < max_recv_bytes) {
uint32_t add_max_recv_bytes = uint32_t add_max_recv_bytes =
(uint32_t)(max_recv_bytes - s->local_window_delta); (uint32_t)(max_recv_bytes - sfc->local_window_delta);
s->local_window_delta += add_max_recv_bytes; sfc->local_window_delta += add_max_recv_bytes;
} }
POSTTRACE(t, s, "app st recv"); POSTTRACE(tfc, sfc, "app st recv");
} }
void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_stream* s) { void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl* tfc,
announced_window_delta_preupdate(s->t, s); grpc_chttp2_stream_flowctl* sfc) {
announced_window_delta_preupdate(tfc, sfc);
} }
grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
const grpc_chttp2_transport* t, const grpc_chttp2_stream* s) { const grpc_chttp2_transport_flowctl* tfc,
const grpc_chttp2_stream_flowctl* sfc, bool stream_read_closed,
uint32_t sent_init_window) {
grpc_chttp2_flowctl_action action; grpc_chttp2_flowctl_action action;
memset(&action, 0, sizeof(action)); memset(&action, 0, sizeof(action));
uint32_t target_announced_window = grpc_chttp2_target_announced_window(t); uint32_t target_announced_window =
int64_t init_window = grpc_chttp2_target_announced_window(tfc, sent_init_window);
t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; if (tfc->announced_window < target_announced_window / 2) {
if (t->announced_window < target_announced_window / 2) {
action.send_transport_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY; action.send_transport_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY;
} }
if (s != NULL && !s->read_closed) { if (sfc != NULL && !stream_read_closed) {
if ((int64_t)s->local_window_delta > (int64_t)s->announced_window_delta && if ((int64_t)sfc->local_window_delta >
(int64_t)s->announced_window_delta <= -init_window / 2) { (int64_t)sfc->announced_window_delta &&
(int64_t)sfc->announced_window_delta + sent_init_window <=
sent_init_window / 2) {
action.send_stream_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY; action.send_stream_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY;
} else if (s->local_window_delta > s->announced_window_delta) { } else if (sfc->local_window_delta > sfc->announced_window_delta) {
action.send_stream_update = GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE; action.send_stream_update = GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE;
} }
} }

@ -201,13 +201,13 @@ grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p,
} }
if (id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE && if (id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE &&
parser->incoming_settings[id] != parser->value) { parser->incoming_settings[id] != parser->value) {
t->initial_window_update += t->flow_control.initial_window_update +=
(int64_t)parser->value - parser->incoming_settings[id]; (int64_t)parser->value - parser->incoming_settings[id];
if (GRPC_TRACER_ON(grpc_http_trace) || if (GRPC_TRACER_ON(grpc_http_trace) ||
GRPC_TRACER_ON(grpc_flowctl_trace)) { GRPC_TRACER_ON(grpc_flowctl_trace)) {
gpr_log(GPR_DEBUG, "%p[%s] adding %d for initial_window change", gpr_log(GPR_DEBUG, "%p[%s] adding %d for initial_window change",
t, t->is_client ? "cli" : "svr", t, t->is_client ? "cli" : "svr",
(int)t->initial_window_update); (int)t->flow_control.initial_window_update);
} }
} }
parser->incoming_settings[id] = parser->value; parser->incoming_settings[id] = parser->value;

@ -95,8 +95,8 @@ grpc_error *grpc_chttp2_window_update_parser_parse(
if (t->incoming_stream_id != 0) { if (t->incoming_stream_id != 0) {
if (s != NULL) { if (s != NULL) {
grpc_chttp2_flowctl_recv_stream_update(s, received_update); grpc_chttp2_flowctl_recv_stream_update(
// TODO(control bits) &t->flow_control, &s->flow_control, received_update);
if (grpc_chttp2_list_remove_stalled_by_stream(t, s)) { if (grpc_chttp2_list_remove_stalled_by_stream(t, s)) {
grpc_chttp2_become_writable( grpc_chttp2_become_writable(
exec_ctx, t, s, GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED, exec_ctx, t, s, GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED,
@ -104,10 +104,10 @@ grpc_error *grpc_chttp2_window_update_parser_parse(
} }
} }
} else { } else {
bool was_zero = t->remote_window <= 0; bool was_zero = t->flow_control.remote_window <= 0;
grpc_chttp2_flowctl_recv_transport_update(t, received_update); grpc_chttp2_flowctl_recv_transport_update(&t->flow_control,
// TODO(control bits) received_update);
bool is_zero = t->remote_window <= 0; bool is_zero = t->flow_control.remote_window <= 0;
if (was_zero && !is_zero) { if (was_zero && !is_zero) {
grpc_chttp2_initiate_write(exec_ctx, t, "new_global_flow_control"); grpc_chttp2_initiate_write(exec_ctx, t, "new_global_flow_control");
} }

@ -213,6 +213,37 @@ typedef enum {
GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED, GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED,
} grpc_chttp2_keepalive_state; } grpc_chttp2_keepalive_state;
typedef struct {
/** initial window change. This is tracked as we parse settings frames from
* the remote peer. If there is a positive delta, then we will make all
* streams readable since they may have become unstalled */
int64_t initial_window_update;
/** Our bookkeeping for the remote peer's available window */
int64_t remote_window;
/** calculating what we should give for local window:
we track the total amount of flow control over initial window size
across all streams: this is data that we want to receive right now (it
has an outstanding read)
and the total amount of flow control under initial window size across all
streams: this is data we've read early
we want to adjust incoming_window such that:
incoming_window = total_over - max(bdp - total_under, 0) */
int64_t announced_stream_total_over_incoming_window;
int64_t announced_stream_total_under_incoming_window;
/** This is out window according to what we have sent to our remote peer. The
* difference between this and target window is what we use to decide when
* to send WINDOW_UPDATE frames. */
int64_t announced_window;
// pointer back to transport for tracing
#ifndef NDEBUG
const grpc_chttp2_transport *t;
#endif
} grpc_chttp2_transport_flowctl;
struct grpc_chttp2_transport { struct grpc_chttp2_transport {
grpc_transport base; /* must be first */ grpc_transport base; /* must be first */
gpr_refcount refs; gpr_refcount refs;
@ -327,31 +358,7 @@ struct grpc_chttp2_transport {
/** parser for goaway frames */ /** parser for goaway frames */
grpc_chttp2_goaway_parser goaway_parser; grpc_chttp2_goaway_parser goaway_parser;
/*********** Flow Control **************/ grpc_chttp2_transport_flowctl flow_control;
/** initial window change. This is tracked as we parse settings frames from
* the remote peer. If there is a positive delta, then we will make all
* streams readable since they may have become unstalled */
int64_t initial_window_update;
/** Our bookkeeping for the remote peer's available window */
int64_t remote_window;
/** calculating what we should give for local window:
we track the total amount of flow control over initial window size
across all streams: this is data that we want to receive right now (it
has an outstanding read)
and the total amount of flow control under initial window size across all
streams: this is data we've read early
we want to adjust incoming_window such that:
incoming_window = total_over - max(bdp - total_under, 0) */
int64_t announced_stream_total_over_incoming_window;
int64_t announced_stream_total_under_incoming_window;
/** This is out window according to what we have sent to our remote peer. The
* difference between this and target window is what we use to decide when
* to send WINDOW_UPDATE frames. */
int64_t announced_window;
/* bdp estimation */ /* bdp estimation */
grpc_bdp_estimator bdp_estimator; grpc_bdp_estimator bdp_estimator;
@ -360,8 +367,6 @@ struct grpc_chttp2_transport {
grpc_pid_controller pid_controller; grpc_pid_controller pid_controller;
gpr_timespec last_pid_update; gpr_timespec last_pid_update;
/*********** End of Flow Control **************/
/* deframing */ /* deframing */
grpc_chttp2_deframe_transport_state deframe_state; grpc_chttp2_deframe_transport_state deframe_state;
uint8_t incoming_frame_type; uint8_t incoming_frame_type;
@ -437,6 +442,27 @@ typedef enum {
GPRC_METADATA_PUBLISHED_AT_CLOSE GPRC_METADATA_PUBLISHED_AT_CLOSE
} grpc_published_metadata_method; } grpc_published_metadata_method;
typedef struct {
/** window available for us to send to peer, over or under the initial window
* size of the transport... ie:
* remote_window = remote_window_delta + transport.initial_window_size */
int64_t remote_window_delta;
/** window available for peer to send to us (as a delta on
* transport.initial_window_size)
* local_window = local_window_delta + transport.initial_window_size */
int64_t local_window_delta;
/** window available for peer to send to us over this stream that we have
* announced to the peer */
int64_t announced_window_delta;
// pointer back to stream for tracing
#ifndef NDEBUG
const grpc_chttp2_stream *s;
#endif
} grpc_chttp2_stream_flowctl;
struct grpc_chttp2_stream { struct grpc_chttp2_stream {
grpc_chttp2_transport *t; grpc_chttp2_transport *t;
grpc_stream_refcount *refcount; grpc_stream_refcount *refcount;
@ -527,23 +553,7 @@ struct grpc_chttp2_stream {
bool sent_initial_metadata; bool sent_initial_metadata;
bool sent_trailing_metadata; bool sent_trailing_metadata;
/*********** Flow Control ***********/ grpc_chttp2_stream_flowctl flow_control;
/** window available for us to send to peer, over or under the initial window
* size of the transport... ie:
* remote_window = remote_window_delta + transport.initial_window_size */
int64_t remote_window_delta;
/** window available for peer to send to us (as a delta on
* transport.initial_window_size)
* local_window = local_window_delta + transport.initial_window_size */
int64_t local_window_delta;
/** window available for peer to send to us over this stream that we have
* announced to the peer */
int64_t announced_window_delta;
/*********** End of Flow Control ***********/
grpc_slice_buffer flow_controlled_buffer; grpc_slice_buffer flow_controlled_buffer;
@ -628,33 +638,43 @@ bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport *t,
/********* Flow Control ***************/ /********* Flow Control ***************/
// we have sent data on the wire // we have sent data on the wire
void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport *t, void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport_flowctl *tfc,
grpc_chttp2_stream *s, int64_t size); grpc_chttp2_stream_flowctl *sfc,
int64_t size);
// we have received data from the wire // we have received data from the wire
grpc_error *grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport *t, grpc_error *grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl *tfc,
grpc_chttp2_stream *s, grpc_chttp2_stream_flowctl *sfc,
int64_t incoming_frame_size); int64_t incoming_frame_size,
uint32_t acked_init_window,
uint32_t sent_init_window);
uint32_t grpc_chttp2_flowctl_maybe_send_transport_update( uint32_t grpc_chttp2_flowctl_maybe_send_transport_update(
grpc_chttp2_transport *t); grpc_chttp2_transport_flowctl *tfc, uint32_t acked_init_window,
bool has_outbuf);
uint32_t grpc_chttp2_flowctl_maybe_send_stream_update(grpc_chttp2_stream *s); uint32_t grpc_chttp2_flowctl_maybe_send_stream_update(
grpc_chttp2_transport_flowctl *tfc, grpc_chttp2_stream_flowctl *sfc);
// we have received a WINDOW_UPDATE frame for a transport // we have received a WINDOW_UPDATE frame for a transport
void grpc_chttp2_flowctl_recv_transport_update(grpc_chttp2_transport *t, void grpc_chttp2_flowctl_recv_transport_update(
uint32_t size); grpc_chttp2_transport_flowctl *tfc, uint32_t size);
// we have received a WINDOW_UPDATE frame for a stream // we have received a WINDOW_UPDATE frame for a stream
void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_stream *s, void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_transport_flowctl *tfc,
grpc_chttp2_stream_flowctl *sfc,
uint32_t size); uint32_t size);
// the application is asking for a certain amount of bytes // the application is asking for a certain amount of bytes
void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport *t, void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl *tfc,
grpc_chttp2_stream *s, grpc_chttp2_stream_flowctl *sfc,
uint32_t initial_window_size,
size_t max_size_hint, size_t max_size_hint,
size_t have_already); size_t have_already);
void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl *tfc,
grpc_chttp2_stream_flowctl *sfc);
typedef enum { typedef enum {
// Nothing to be done. // Nothing to be done.
GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED = 0, GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED = 0,
@ -671,15 +691,15 @@ typedef struct {
} grpc_chttp2_flowctl_action; } grpc_chttp2_flowctl_action;
grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
const grpc_chttp2_transport *t, const grpc_chttp2_stream *s); const grpc_chttp2_transport_flowctl *tfc,
const grpc_chttp2_stream_flowctl *sfc, bool stream_read_closed,
uint32_t acked_init_window);
void grpc_chttp2_flowctl_act_on_action(grpc_exec_ctx *exec_ctx, void grpc_chttp2_flowctl_act_on_action(grpc_exec_ctx *exec_ctx,
grpc_chttp2_flowctl_action action, grpc_chttp2_flowctl_action action,
grpc_chttp2_transport *t, grpc_chttp2_transport *t,
grpc_chttp2_stream *s); grpc_chttp2_stream *s);
void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_stream *s);
/********* End of Flow Control ***************/ /********* End of Flow Control ***************/
grpc_chttp2_stream *grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport *t,

@ -354,15 +354,27 @@ static grpc_error *init_data_frame_parser(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream *s = grpc_chttp2_stream *s =
grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id); grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id);
grpc_error *err = GRPC_ERROR_NONE; grpc_error *err = GRPC_ERROR_NONE;
err = grpc_chttp2_flowctl_recv_data(t, s, t->incoming_frame_size); err = grpc_chttp2_flowctl_recv_data(
grpc_chttp2_flowctl_act_on_action(exec_ctx, &t->flow_control, s == NULL ? NULL : &s->flow_control,
grpc_chttp2_flowctl_get_action(t, s), t, s); t->incoming_frame_size,
t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
grpc_chttp2_flowctl_act_on_action(
exec_ctx, grpc_chttp2_flowctl_get_action(
&t->flow_control, s == NULL ? NULL : &s->flow_control,
s == NULL ? false : s->read_closed,
t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]),
t, s);
if (err != GRPC_ERROR_NONE) { if (err != GRPC_ERROR_NONE) {
goto error_handler; goto error_handler;
} }
if (s == NULL) { if (s == NULL) {
return init_skip_frame_parser(exec_ctx, t, 0); return init_skip_frame_parser(exec_ctx, t, 0);
} }
s->received_bytes += t->incoming_frame_size;
s->stats.incoming.framing_bytes += 9; s->stats.incoming.framing_bytes += 9;
if (err == GRPC_ERROR_NONE && s->read_closed) { if (err == GRPC_ERROR_NONE && s->read_closed) {
return init_skip_frame_parser(exec_ctx, t, 0); return init_skip_frame_parser(exec_ctx, t, 0);

@ -192,7 +192,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
&t->hpack_compressor, &t->hpack_compressor,
t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]); t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
if (t->remote_window > 0) { if (t->flow_control.remote_window > 0) {
while (grpc_chttp2_list_pop_stalled_by_transport(t, &s)) { while (grpc_chttp2_list_pop_stalled_by_transport(t, &s)) {
if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s) && if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s) &&
stream_ref_if_not_destroyed(&s->refcount->refs)) { stream_ref_if_not_destroyed(&s->refcount->refs)) {
@ -222,7 +222,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
gpr_log(GPR_DEBUG, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t, gpr_log(GPR_DEBUG, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t,
t->is_client ? "CLIENT" : "SERVER", s->id, t->is_client ? "CLIENT" : "SERVER", s->id,
sent_initial_metadata, s->send_initial_metadata != NULL, sent_initial_metadata, s->send_initial_metadata != NULL,
(int)(s->local_window_delta - s->announced_window_delta))); (int)(s->flow_control.local_window_delta -
s->flow_control.announced_window_delta)));
grpc_mdelem *extra_headers_for_trailing_metadata[2]; grpc_mdelem *extra_headers_for_trailing_metadata[2];
size_t num_extra_headers_for_trailing_metadata = 0; size_t num_extra_headers_for_trailing_metadata = 0;
@ -279,7 +280,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
sent_initial_metadata = true; sent_initial_metadata = true;
} }
/* send any window updates */ /* send any window updates */
uint32_t stream_announce = grpc_chttp2_flowctl_maybe_send_stream_update(s); uint32_t stream_announce = grpc_chttp2_flowctl_maybe_send_stream_update(
&t->flow_control, &s->flow_control);
if (stream_announce > 0) { if (stream_announce > 0) {
grpc_slice_buffer_add( grpc_slice_buffer_add(
&t->outbuf, grpc_chttp2_window_update_create(s->id, stream_announce, &t->outbuf, grpc_chttp2_window_update_create(s->id, stream_announce,
@ -297,13 +299,13 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
if (s->flow_controlled_buffer.length > 0) { if (s->flow_controlled_buffer.length > 0) {
uint32_t stream_remote_window = (uint32_t)GPR_MAX( uint32_t stream_remote_window = (uint32_t)GPR_MAX(
0, 0,
s->remote_window_delta + s->flow_control.remote_window_delta +
(int64_t)t->settings[GRPC_PEER_SETTINGS] (int64_t)t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
uint32_t max_outgoing = uint32_t max_outgoing = (uint32_t)GPR_MIN(
(uint32_t)GPR_MIN(t->settings[GRPC_PEER_SETTINGS] t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
GPR_MIN(stream_remote_window, t->remote_window)); GPR_MIN(stream_remote_window, t->flow_control.remote_window));
if (max_outgoing > 0) { if (max_outgoing > 0) {
uint32_t send_bytes = uint32_t send_bytes =
(uint32_t)GPR_MIN(max_outgoing, s->flow_controlled_buffer.length); (uint32_t)GPR_MIN(max_outgoing, s->flow_controlled_buffer.length);
@ -316,7 +318,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, send_bytes, grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, send_bytes,
is_last_frame, &s->stats.outgoing, is_last_frame, &s->stats.outgoing,
&t->outbuf); &t->outbuf);
grpc_chttp2_flowctl_sent_data(t, s, send_bytes); grpc_chttp2_flowctl_sent_data(&t->flow_control, &s->flow_control,
send_bytes);
t->ping_state.pings_before_data_required = t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data; t->ping_policy.max_pings_without_data;
if (!t->is_client) { if (!t->is_client) {
@ -339,7 +342,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork"); GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork");
grpc_chttp2_list_add_writable_stream(t, s); grpc_chttp2_list_add_writable_stream(t, s);
} }
} else if (t->remote_window == 0) { } else if (t->flow_control.remote_window == 0) {
grpc_chttp2_list_add_stalled_by_transport(t, s); grpc_chttp2_list_add_stalled_by_transport(t, s);
now_writing = true; now_writing = true;
} else if (stream_remote_window == 0) { } else if (stream_remote_window == 0) {
@ -394,8 +397,10 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
} }
} }
uint32_t transport_announce = uint32_t transport_announce = grpc_chttp2_flowctl_maybe_send_transport_update(
grpc_chttp2_flowctl_maybe_send_transport_update(t); &t->flow_control, t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
t->outbuf.count > 0);
if (transport_announce) { if (transport_announce) {
maybe_initiate_ping(exec_ctx, t, maybe_initiate_ping(exec_ctx, t,
GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE); GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE);

@ -391,8 +391,9 @@ static void BM_TransportStreamSend(benchmark::State &state) {
MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
if (!state.KeepRunning()) return; if (!state.KeepRunning()) return;
// force outgoing window to be yuge // force outgoing window to be yuge
s.chttp2_stream()->remote_window_delta = 1024 * 1024 * 1024; s.chttp2_stream()->flow_control.remote_window_delta =
f.chttp2_transport()->remote_window = 1024 * 1024 * 1024; 1024 * 1024 * 1024;
f.chttp2_transport()->flow_control.remote_window = 1024 * 1024 * 1024;
grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0); grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0);
reset_op(); reset_op();
op.on_complete = c.get(); op.on_complete = c.get();
@ -517,13 +518,13 @@ static void BM_TransportStreamRecv(benchmark::State &state) {
std::unique_ptr<Closure> drain_continue; std::unique_ptr<Closure> drain_continue;
grpc_slice recv_slice; grpc_slice recv_slice;
std::unique_ptr<Closure> c = std::unique_ptr<Closure> c = MakeClosure([&](grpc_exec_ctx *exec_ctx,
MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { grpc_error *error) {
if (!state.KeepRunning()) return; if (!state.KeepRunning()) return;
// force outgoing window to be yuge // force outgoing window to be yuge
s.chttp2_stream()->local_window_delta = 1024 * 1024 * 1024; s.chttp2_stream()->flow_control.local_window_delta = 1024 * 1024 * 1024;
s.chttp2_stream()->announced_window_delta = 1024 * 1024 * 1024; s.chttp2_stream()->flow_control.announced_window_delta = 1024 * 1024 * 1024;
f.chttp2_transport()->announced_window = 1024 * 1024 * 1024; f.chttp2_transport()->flow_control.announced_window = 1024 * 1024 * 1024;
received = 0; received = 0;
reset_op(); reset_op();
op.on_complete = do_nothing.get(); op.on_complete = do_nothing.get();

@ -127,14 +127,15 @@ class TrickledCHTTP2 : public EndpointPairFixture {
client->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr, client->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr,
server->lists[GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT].head != nullptr, server->lists[GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT].head != nullptr,
server->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr, server->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr,
client->remote_window, server->remote_window, client->announced_window, client->flow_control.remote_window, server->flow_control.remote_window,
server->announced_window, client->flow_control.announced_window,
client_stream ? client_stream->remote_window_delta : -1, server->flow_control.announced_window,
server_stream ? server_stream->remote_window_delta : -1, client_stream ? client_stream->flow_control.remote_window_delta : -1,
client_stream ? client_stream->local_window_delta : -1, server_stream ? server_stream->flow_control.remote_window_delta : -1,
server_stream ? server_stream->local_window_delta : -1, client_stream ? client_stream->flow_control.local_window_delta : -1,
client_stream ? client_stream->announced_window_delta : -1, server_stream ? server_stream->flow_control.local_window_delta : -1,
server_stream ? server_stream->announced_window_delta : -1, client_stream ? client_stream->flow_control.announced_window_delta : -1,
server_stream ? server_stream->flow_control.announced_window_delta : -1,
client->settings[GRPC_PEER_SETTINGS] client->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
client->settings[GRPC_LOCAL_SETTINGS] client->settings[GRPC_LOCAL_SETTINGS]

Loading…
Cancel
Save