Possible connection window flow control improvements

Instead of incrementally doling out connection level flow control, use the fact that if a stream's initial_window_delta >= 0 then the application has made some commitment to read those bytes.
That means that we should target connection level flow control to be:

target_incoming_window = sum(stream_incoming_window_delta if stream_incoming_window_delta > 0) +
                         max(0, k * bdp - sum(abs(stream_incoming_window_delta) if stream_incoming_window_delta < 0))
pull/9720/head
Craig Tiller 8 years ago
parent 7e54f14d80
commit fd2cd7856b
  1. 12
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 54
      src/core/ext/transport/chttp2/transport/internal.h
  3. 20
      src/core/ext/transport/chttp2/transport/parsing.c
  4. 18
      src/core/ext/transport/chttp2/transport/writing.c

@ -569,6 +569,14 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GRPC_ERROR_UNREF(s->read_closed_error);
GRPC_ERROR_UNREF(s->write_closed_error);
if (s->incoming_window_delta > 0) {
GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA(
"destroy", t, s, s->incoming_window_delta);
} else if (s->incoming_window_delta < 0) {
GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA(
"destroy", t, s, -s->incoming_window_delta);
}
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream");
GPR_TIMER_END("destroy_stream", 0);
@ -2054,8 +2062,8 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
(int64_t)have_already) {
write_type = GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED;
}
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, incoming_window_delta,
add_max_recv_bytes);
GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA("op", t, s,
add_max_recv_bytes);
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, announce_window,
add_max_recv_bytes);
if ((int64_t)s->incoming_window_delta + (int64_t)initial_window_size -

@ -271,10 +271,6 @@ struct grpc_chttp2_transport {
/** data to write next write */
grpc_slice_buffer qbuf;
/** window available to announce to peer */
int64_t announce_incoming_window;
/** how much window would we like to have for incoming_window */
uint32_t connection_window_target;
/** how much data are we willing to buffer when the WRITE_BUFFER_HINT is set?
*/
uint32_t write_buffer_size;
@ -328,6 +324,16 @@ struct grpc_chttp2_transport {
/** window available for peer to send to us */
int64_t incoming_window;
/** calculating what we should give for incoming window:
we track the total amount of flow control over initial window size
across all streams: this is data that we want to receive right now (it
has an outstanding read)
and the total amount of flow control under initial window size across all
streams: this is data we've read early
we want to adjust incoming_window such that:
incoming_window = total_over - max(bdp - total_under, 0) */
int64_t stream_total_over_incoming_window;
int64_t stream_total_under_incoming_window;
/* deframing */
grpc_chttp2_deframe_transport_state deframe_state;
@ -634,6 +640,44 @@ typedef enum {
GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, dst_context, 0, dst_context, dst_var, \
amount)
#define GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE( \
phase, transport, dst_context) \
if (dst_context->incoming_window_delta < 0) { \
transport->stream_total_under_incoming_window += \
dst_context->incoming_window_delta; \
} else if (dst_context->incoming_window_delta > 0) { \
transport->stream_total_over_incoming_window -= \
dst_context->incoming_window_delta; \
}
#define GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE( \
phase, transport, dst_context) \
if (dst_context->incoming_window_delta < 0) { \
transport->stream_total_under_incoming_window -= \
dst_context->incoming_window_delta; \
} else if (dst_context->incoming_window_delta > 0) { \
transport->stream_total_over_incoming_window += \
dst_context->incoming_window_delta; \
}
#define GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA( \
phase, transport, dst_context, amount) \
GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE(phase, transport, \
dst_context); \
GRPC_CHTTP2_FLOW_DEBIT_STREAM(phase, transport, dst_context, \
incoming_window_delta, amount); \
GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE(phase, transport, \
dst_context);
#define GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA( \
phase, transport, dst_context, amount) \
GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE(phase, transport, \
dst_context); \
GRPC_CHTTP2_FLOW_CREDIT_STREAM(phase, transport, dst_context, \
incoming_window_delta, amount); \
GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE(phase, transport, \
dst_context);
#define GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, transport, id, dst_context, \
dst_var, amount) \
do { \
@ -752,4 +796,6 @@ void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_error *error);
uint32_t grpc_chttp2_target_incoming_window(grpc_chttp2_transport *t);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */

@ -376,15 +376,6 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx,
return err;
}
uint32_t target_incoming_window = GPR_MAX(
t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
1024);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", t, incoming_window,
incoming_frame_size);
if (t->incoming_window <= target_incoming_window / 2) {
grpc_chttp2_initiate_write(exec_ctx, t, false, "flow_control");
}
if (s != NULL) {
if (incoming_frame_size >
s->incoming_window_delta +
@ -402,8 +393,8 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx,
return err;
}
GRPC_CHTTP2_FLOW_DEBIT_STREAM("parse", t, s, incoming_window_delta,
incoming_frame_size);
GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA("parse", t, s,
incoming_frame_size);
if ((int64_t)t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] +
(int64_t)s->incoming_window_delta - (int64_t)s->announce_window <=
@ -417,6 +408,13 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx,
s->received_bytes += incoming_frame_size;
}
uint32_t target_incoming_window = grpc_chttp2_target_incoming_window(t);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", t, incoming_window,
incoming_frame_size);
if (t->incoming_window <= target_incoming_window / 2) {
grpc_chttp2_initiate_write(exec_ctx, t, false, "flow_control");
}
return GRPC_ERROR_NONE;
}

@ -152,6 +152,17 @@ static bool stream_ref_if_not_destroyed(gpr_refcount *r) {
return true;
}
uint32_t grpc_chttp2_target_incoming_window(grpc_chttp2_transport *t) {
return (uint32_t)GPR_MAX(
(int64_t)((1u << 31) - 1),
t->stream_total_over_incoming_window +
(int64_t)GPR_MAX(
t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
t->stream_total_under_incoming_window,
0));
}
bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
grpc_chttp2_stream *s;
@ -310,13 +321,12 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
/* if the grpc_chttp2_transport is ready to send a window update, do so here
also; 3/4 is a magic number that will likely get tuned soon */
uint32_t target_incoming_window = GPR_MAX(
t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
1024);
uint32_t target_incoming_window = grpc_chttp2_target_incoming_window(t);
uint32_t threshold_to_send_transport_window_update =
t->outbuf.count > 0 ? 3 * target_incoming_window / 4
: target_incoming_window / 2;
if (t->incoming_window <= threshold_to_send_transport_window_update) {
if (t->incoming_window <= threshold_to_send_transport_window_update &&
t->incoming_window != target_incoming_window) {
maybe_initiate_ping(exec_ctx, t,
GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE);
uint32_t announced = (uint32_t)GPR_CLAMP(

Loading…
Cancel
Save