From 6c8619bbe7a0eb8ca65782886e8253ebbec87b54 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 7 Jul 2016 10:41:48 -0700 Subject: [PATCH] Better fix for flow control bug --- src/core/ext/transport/chttp2/transport/internal.h | 2 +- src/core/ext/transport/chttp2/transport/parsing.c | 14 ++++++-------- .../ext/transport/chttp2/transport/stream_lists.c | 9 ++++++++- src/core/ext/transport/chttp2/transport/writing.c | 14 ++++++++++++-- test/cpp/end2end/end2end_test.cc | 3 +++ 5 files changed, 30 insertions(+), 12 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 2a12afad6ce..6b47d702aea 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -630,7 +630,7 @@ int grpc_chttp2_list_pop_check_read_ops( void grpc_chttp2_list_add_writing_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing); -void grpc_chttp2_list_flush_writing_stalled_by_transport( +bool grpc_chttp2_list_flush_writing_stalled_by_transport( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing); void grpc_chttp2_list_add_stalled_by_transport( diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index efc27775f00..a8ce1db8477 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -87,8 +87,8 @@ void grpc_chttp2_prepare_to_read( transport_global->settings[GRPC_SENT_SETTINGS], sizeof(transport_parsing->last_sent_settings)); transport_parsing->max_frame_size = - transport_global->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]; + transport_global + ->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]; /* update the parsing view of incoming window */ while (grpc_chttp2_list_pop_unannounced_incoming_window_available( @@ -154,11 +154,8 @@ void grpc_chttp2_publish_reads( transport_parsing, outgoing_window); is_zero = transport_global->outgoing_window <= 0; if (was_zero && !is_zero) { - while (grpc_chttp2_list_pop_stalled_by_transport(transport_global, - &stream_global)) { - grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, - false, "transport.read_flow_control"); - } + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "new_global_flow_control"); } if (transport_parsing->incoming_window < @@ -169,7 +166,8 @@ void grpc_chttp2_publish_reads( announce_incoming_window, announce_bytes); GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", transport_parsing, incoming_window, announce_bytes); - grpc_chttp2_initiate_write(exec_ctx, transport_global, false, "global incoming window"); + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "global incoming window"); } /* for each stream that saw an update, fixup global state */ diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c index aaa4768c7b0..2eb5f5f632e 100644 --- a/src/core/ext/transport/chttp2/transport/stream_lists.c +++ b/src/core/ext/transport/chttp2/transport/stream_lists.c @@ -329,6 +329,7 @@ void grpc_chttp2_list_add_writing_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing) { grpc_chttp2_stream *stream = STREAM_FROM_WRITING(stream_writing); + gpr_log(GPR_DEBUG, "writing stalled %d", stream->global.id); if (!stream->included[GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT]) { GRPC_CHTTP2_STREAM_REF(&stream->global, "chttp2_writing_stalled"); } @@ -336,22 +337,28 @@ void grpc_chttp2_list_add_writing_stalled_by_transport( GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT); } -void grpc_chttp2_list_flush_writing_stalled_by_transport( +bool grpc_chttp2_list_flush_writing_stalled_by_transport( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing) { grpc_chttp2_stream *stream; + bool out = false; grpc_chttp2_transport *transport = TRANSPORT_FROM_WRITING(transport_writing); while (stream_list_pop(transport, &stream, GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT)) { + gpr_log(GPR_DEBUG, "move %d from writing stalled to just stalled", + stream->global.id); grpc_chttp2_list_add_stalled_by_transport(transport_writing, &stream->writing); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &stream->global, "chttp2_writing_stalled"); + out = true; } + return out; } void grpc_chttp2_list_add_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing) { + gpr_log(GPR_DEBUG, "stalled %d", stream_writing->id); stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), STREAM_FROM_WRITING(stream_writing), GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index cbc57d10ad3..e0d87725e9d 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -75,6 +75,13 @@ int grpc_chttp2_unlocking_check_writes( GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window, transport_global, outgoing_window); + if (transport_writing->outgoing_window > 0) { + while (grpc_chttp2_list_pop_stalled_by_transport(transport_global, + &stream_global)) { + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, + false, "transport.read_flow_control"); + } + } /* for each grpc_chttp2_stream that's become writable, frame it's data (according to available window sizes) and add to the output buffer */ @@ -328,8 +335,11 @@ void grpc_chttp2_cleanup_writing( grpc_chttp2_stream_writing *stream_writing; grpc_chttp2_stream_global *stream_global; - grpc_chttp2_list_flush_writing_stalled_by_transport(exec_ctx, - transport_writing); + if (grpc_chttp2_list_flush_writing_stalled_by_transport(exec_ctx, + transport_writing)) { + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "resume_stalled_stream"); + } while (grpc_chttp2_list_pop_written_stream( transport_global, transport_writing, &stream_global, &stream_writing)) { diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 354a59cedd5..0f87ae3e440 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -1166,6 +1166,9 @@ TEST_P(ProxyEnd2endTest, HugeResponse) { request.mutable_param()->set_response_message_length(kResponseSize); ClientContext context; + std::chrono::system_clock::time_point deadline = + std::chrono::system_clock::now() + std::chrono::seconds(20); + context.set_deadline(deadline); Status s = stub_->Echo(&context, request, &response); EXPECT_EQ(kResponseSize, response.message().size()); EXPECT_TRUE(s.ok());