|
|
@ -276,8 +276,8 @@ struct transport { |
|
|
|
struct stream { |
|
|
|
struct stream { |
|
|
|
gpr_uint32 id; |
|
|
|
gpr_uint32 id; |
|
|
|
|
|
|
|
|
|
|
|
gpr_uint32 outgoing_window; |
|
|
|
|
|
|
|
gpr_uint32 incoming_window; |
|
|
|
gpr_uint32 incoming_window; |
|
|
|
|
|
|
|
gpr_int64 outgoing_window; |
|
|
|
/* when the application requests writes be closed, the write_closed is
|
|
|
|
/* when the application requests writes be closed, the write_closed is
|
|
|
|
'queued'; when the close is flow controlled into the send path, we are |
|
|
|
'queued'; when the close is flow controlled into the send path, we are |
|
|
|
'sending' it; when the write has been performed it is 'sent' */ |
|
|
|
'sending' it; when the write has been performed it is 'sent' */ |
|
|
@ -852,7 +852,8 @@ static int prepare_write(transport *t) { |
|
|
|
|
|
|
|
|
|
|
|
/* for each stream that's become writable, frame it's data (according to
|
|
|
|
/* for each stream that's become writable, frame it's data (according to
|
|
|
|
available window sizes) and add to the output buffer */ |
|
|
|
available window sizes) and add to the output buffer */ |
|
|
|
while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE))) { |
|
|
|
while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) && |
|
|
|
|
|
|
|
s->outgoing_window > 0) { |
|
|
|
window_delta = grpc_chttp2_preencode( |
|
|
|
window_delta = grpc_chttp2_preencode( |
|
|
|
s->outgoing_sopb.ops, &s->outgoing_sopb.nops, |
|
|
|
s->outgoing_sopb.ops, &s->outgoing_sopb.nops, |
|
|
|
GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb); |
|
|
|
GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb); |
|
|
@ -867,7 +868,7 @@ static int prepare_write(transport *t) { |
|
|
|
|
|
|
|
|
|
|
|
/* if there are still writes to do and the stream still has window
|
|
|
|
/* if there are still writes to do and the stream still has window
|
|
|
|
available, then schedule a further write */ |
|
|
|
available, then schedule a further write */ |
|
|
|
if (s->outgoing_sopb.nops && s->outgoing_window) { |
|
|
|
if (s->outgoing_sopb.nops && s->outgoing_window > 0) { |
|
|
|
GPR_ASSERT(!t->outgoing_window); |
|
|
|
GPR_ASSERT(!t->outgoing_window); |
|
|
|
stream_list_add_tail(t, s, WRITABLE); |
|
|
|
stream_list_add_tail(t, s, WRITABLE); |
|
|
|
} |
|
|
|
} |
|
|
@ -1430,8 +1431,8 @@ static int init_frame_parser(transport *t) { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static int is_window_update_legal(gpr_uint32 window_update, gpr_uint32 window) { |
|
|
|
static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) { |
|
|
|
return window_update < MAX_WINDOW - window; |
|
|
|
return window + window_update < MAX_WINDOW; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { |
|
|
|
static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { |
|
|
@ -1488,12 +1489,11 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { |
|
|
|
if (st.initial_window_update) { |
|
|
|
if (st.initial_window_update) { |
|
|
|
for (i = 0; i < t->stream_map.count; i++) { |
|
|
|
for (i = 0; i < t->stream_map.count; i++) { |
|
|
|
stream *s = (stream*)(t->stream_map.values[i]); |
|
|
|
stream *s = (stream*)(t->stream_map.values[i]); |
|
|
|
/* TODO there are other scenarios */ |
|
|
|
int was_window_empty = s->outgoing_window <= 0; |
|
|
|
if (s->outgoing_window == 0) { |
|
|
|
s->outgoing_window += st.initial_window_update; |
|
|
|
s->outgoing_window += st.initial_window_update; |
|
|
|
if (was_window_empty && s->outgoing_window > 0 && |
|
|
|
if (s->outgoing_sopb.nops) { |
|
|
|
s->outgoing_sopb.nops > 0) { |
|
|
|
stream_list_join(t, s, WRITABLE); |
|
|
|
stream_list_join(t, s, WRITABLE); |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -1502,7 +1502,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { |
|
|
|
/* if there was a stream id, this is for some stream */ |
|
|
|
/* if there was a stream id, this is for some stream */ |
|
|
|
stream *s = lookup_stream(t, t->incoming_stream_id); |
|
|
|
stream *s = lookup_stream(t, t->incoming_stream_id); |
|
|
|
if (s) { |
|
|
|
if (s) { |
|
|
|
int was_window_empty = s->outgoing_window == 0; |
|
|
|
int was_window_empty = s->outgoing_window <= 0; |
|
|
|
if (!is_window_update_legal(st.window_update, s->outgoing_window)) { |
|
|
|
if (!is_window_update_legal(st.window_update, s->outgoing_window)) { |
|
|
|
cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status( |
|
|
|
cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status( |
|
|
|
GRPC_CHTTP2_FLOW_CONTROL_ERROR), |
|
|
|
GRPC_CHTTP2_FLOW_CONTROL_ERROR), |
|
|
|