From f1021031962159ffa2fe0e9cb68a9ab5080c3854 Mon Sep 17 00:00:00 2001 From: Yang Gao Date: Sat, 18 Apr 2015 00:10:29 -0700 Subject: [PATCH 1/3] Proof of concept fix for flow control error --- src/core/transport/chttp2/frame.h | 1 + src/core/transport/chttp2/frame_settings.c | 4 ++++ src/core/transport/chttp2_transport.c | 12 ++++++++++++ 3 files changed, 17 insertions(+) diff --git a/src/core/transport/chttp2/frame.h b/src/core/transport/chttp2/frame.h index fbb941969e9..cd1d47342f5 100644 --- a/src/core/transport/chttp2/frame.h +++ b/src/core/transport/chttp2/frame.h @@ -54,6 +54,7 @@ typedef struct { gpr_uint8 process_ping_reply; gpr_uint8 goaway; + gpr_uint32 initial_window_update; gpr_uint32 window_update; gpr_uint32 goaway_last_stream_index; gpr_uint32 goaway_error; diff --git a/src/core/transport/chttp2/frame_settings.c b/src/core/transport/chttp2/frame_settings.c index 8d3250c34ff..9b9374e0fd8 100644 --- a/src/core/transport/chttp2/frame_settings.c +++ b/src/core/transport/chttp2/frame_settings.c @@ -218,6 +218,10 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse( return GRPC_CHTTP2_CONNECTION_ERROR; } } + if (parser->id == 4 && parser->incoming_settings[parser->id] != parser->value) { + state->initial_window_update = parser->value - parser->incoming_settings[parser->id]; + gpr_log(GPR_DEBUG, "adding %d for initial_window change", state->window_update); + } parser->incoming_settings[parser->id] = parser->value; if (grpc_http_trace) { gpr_log(GPR_DEBUG, "CHTTP2: got setting %d = %d", parser->id, diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 110a4b544f3..fbd9d8a146b 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -1485,6 +1485,18 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { } } } + if (st.initial_window_update) { + for (i = 0; i < t->stream_map.count; i++) { + stream *s = (stream*)(t->stream_map.values[i]); + /* TODO there are other scenarios */ + if (s->outgoing_window == 0) { + s->outgoing_window += st.initial_window_update; + if (s->outgoing_sopb.nops) { + stream_list_join(t, s, WRITABLE); + } + } + } + } if (st.window_update) { if (t->incoming_stream_id) { /* if there was a stream id, this is for some stream */ From 84b88847776aff3a350d6e59f7a19659e8930354 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 20 Apr 2015 08:47:52 -0700 Subject: [PATCH 2/3] Tidying up proof of concept --- src/core/transport/chttp2/frame.h | 2 +- src/core/transport/chttp2/frame_settings.c | 10 ++++++--- src/core/transport/chttp2_transport.c | 24 +++++++++++----------- 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/src/core/transport/chttp2/frame.h b/src/core/transport/chttp2/frame.h index cd1d47342f5..ac76c4cc9cd 100644 --- a/src/core/transport/chttp2/frame.h +++ b/src/core/transport/chttp2/frame.h @@ -54,7 +54,7 @@ typedef struct { gpr_uint8 process_ping_reply; gpr_uint8 goaway; - gpr_uint32 initial_window_update; + gpr_int64 initial_window_update; gpr_uint32 window_update; gpr_uint32 goaway_last_stream_index; gpr_uint32 goaway_error; diff --git a/src/core/transport/chttp2/frame_settings.c b/src/core/transport/chttp2/frame_settings.c index 9b9374e0fd8..2ffce730d50 100644 --- a/src/core/transport/chttp2/frame_settings.c +++ b/src/core/transport/chttp2/frame_settings.c @@ -218,9 +218,13 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse( return GRPC_CHTTP2_CONNECTION_ERROR; } } - if (parser->id == 4 && parser->incoming_settings[parser->id] != parser->value) { - state->initial_window_update = parser->value - parser->incoming_settings[parser->id]; - gpr_log(GPR_DEBUG, "adding %d for initial_window change", state->window_update); + if (parser->id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE && + parser->incoming_settings[parser->id] != parser->value) { + state->initial_window_update = + (gpr_int64)parser->value - + parser->incoming_settings[parser->id]; + gpr_log(GPR_DEBUG, "adding %d for initial_window change", + (int)state->initial_window_update); } parser->incoming_settings[parser->id] = parser->value; if (grpc_http_trace) { diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index fbd9d8a146b..39297c37a39 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -276,8 +276,8 @@ struct transport { struct stream { gpr_uint32 id; - gpr_uint32 outgoing_window; gpr_uint32 incoming_window; + gpr_int64 outgoing_window; /* when the application requests writes be closed, the write_closed is 'queued'; when the close is flow controlled into the send path, we are 'sending' it; when the write has been performed it is 'sent' */ @@ -852,7 +852,8 @@ static int prepare_write(transport *t) { /* for each stream that's become writable, frame it's data (according to available window sizes) and add to the output buffer */ - while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE))) { + while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) && + s->outgoing_window > 0) { window_delta = grpc_chttp2_preencode( s->outgoing_sopb.ops, &s->outgoing_sopb.nops, GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb); @@ -867,7 +868,7 @@ static int prepare_write(transport *t) { /* if there are still writes to do and the stream still has window available, then schedule a further write */ - if (s->outgoing_sopb.nops && s->outgoing_window) { + if (s->outgoing_sopb.nops && s->outgoing_window > 0) { GPR_ASSERT(!t->outgoing_window); stream_list_add_tail(t, s, WRITABLE); } @@ -1430,8 +1431,8 @@ static int init_frame_parser(transport *t) { } } -static int is_window_update_legal(gpr_uint32 window_update, gpr_uint32 window) { - return window_update < MAX_WINDOW - window; +static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) { + return window + window_update < MAX_WINDOW; } static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { @@ -1488,12 +1489,11 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { if (st.initial_window_update) { for (i = 0; i < t->stream_map.count; i++) { stream *s = (stream*)(t->stream_map.values[i]); - /* TODO there are other scenarios */ - if (s->outgoing_window == 0) { - s->outgoing_window += st.initial_window_update; - if (s->outgoing_sopb.nops) { - stream_list_join(t, s, WRITABLE); - } + int was_window_empty = s->outgoing_window <= 0; + s->outgoing_window += st.initial_window_update; + if (was_window_empty && s->outgoing_window > 0 && + s->outgoing_sopb.nops > 0) { + stream_list_join(t, s, WRITABLE); } } } @@ -1502,7 +1502,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { /* if there was a stream id, this is for some stream */ stream *s = lookup_stream(t, t->incoming_stream_id); if (s) { - int was_window_empty = s->outgoing_window == 0; + int was_window_empty = s->outgoing_window <= 0; if (!is_window_update_legal(st.window_update, s->outgoing_window)) { cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status( GRPC_CHTTP2_FLOW_CONTROL_ERROR), From 42c15a35e54a4ae8722d2558cfaf1bfe7b5a624a Mon Sep 17 00:00:00 2001 From: Yang Gao Date: Mon, 20 Apr 2015 14:34:28 -0700 Subject: [PATCH 3/3] resolve comment --- src/core/transport/chttp2_transport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 39297c37a39..995d64015a2 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -868,7 +868,7 @@ static int prepare_write(transport *t) { /* if there are still writes to do and the stream still has window available, then schedule a further write */ - if (s->outgoing_sopb.nops && s->outgoing_window > 0) { + if (s->outgoing_sopb.nops > 0 && s->outgoing_window > 0) { GPR_ASSERT(!t->outgoing_window); stream_list_add_tail(t, s, WRITABLE); }