From 17be5dc7969f21d99a40cb116f84067eee54923b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 1 Jul 2015 10:36:27 -0700 Subject: [PATCH] Fix a list management bug exposed by new locking scheme in client_channel --- src/core/transport/chttp2/internal.h | 8 ++++++ src/core/transport/chttp2/stream_lists.c | 18 +++++++++++++ src/core/transport/chttp2_transport.c | 34 +++++++++++++++--------- 3 files changed, 48 insertions(+), 12 deletions(-) diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index c8c46f0e544..7f98a5bd711 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -63,6 +63,7 @@ typedef enum { GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE, GRPC_CHTTP2_LIST_PARSING_SEEN, GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING, + GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING, GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED, /** streams that are waiting to start because there are too many concurrent streams on the connection */ @@ -526,6 +527,13 @@ int grpc_chttp2_list_pop_closed_waiting_for_parsing( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); +void grpc_chttp2_list_add_cancelled_waiting_for_writing( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global); +int grpc_chttp2_list_pop_cancelled_waiting_for_writing( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global **stream_global); + void grpc_chttp2_list_add_read_write_state_changed( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index c6ba12fca87..f04e763387c 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -282,6 +282,24 @@ int grpc_chttp2_list_pop_closed_waiting_for_parsing( return r; } +void grpc_chttp2_list_add_cancelled_waiting_for_writing( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global) { + stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), + STREAM_FROM_GLOBAL(stream_global), + GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING); +} + +int grpc_chttp2_list_pop_cancelled_waiting_for_writing( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global **stream_global) { + grpc_chttp2_stream *stream; + int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, + GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING); + *stream_global = &stream->global; + return r; +} + void grpc_chttp2_list_add_incoming_window_updated( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 4bcb317fe79..a7f9af5c0c4 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -765,21 +765,31 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { } } + if (!t->writing_active) { + while (grpc_chttp2_list_pop_cancelled_waiting_for_writing(transport_global, &stream_global)) { + grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global); + } + } + while (grpc_chttp2_list_pop_read_write_state_changed(transport_global, &stream_global)) { if (stream_global->cancelled) { - stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE; - stream_global->read_closed = 1; - if (!stream_global->published_cancelled) { - char buffer[GPR_LTOA_MIN_BUFSIZE]; - gpr_ltoa(stream_global->cancelled_status, buffer); - grpc_chttp2_incoming_metadata_buffer_add( - &stream_global->incoming_metadata, - grpc_mdelem_from_strings(t->metadata_context, "grpc-status", - buffer)); - grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into( - &stream_global->incoming_metadata, &stream_global->incoming_sopb); - stream_global->published_cancelled = 1; + if (t->writing_active && stream_global->write_state != GRPC_WRITE_STATE_SENT_CLOSE) { + grpc_chttp2_list_add_cancelled_waiting_for_writing(transport_global, stream_global); + } else { + stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE; + stream_global->read_closed = 1; + if (!stream_global->published_cancelled) { + char buffer[GPR_LTOA_MIN_BUFSIZE]; + gpr_ltoa(stream_global->cancelled_status, buffer); + grpc_chttp2_incoming_metadata_buffer_add( + &stream_global->incoming_metadata, + grpc_mdelem_from_strings(t->metadata_context, "grpc-status", + buffer)); + grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into( + &stream_global->incoming_metadata, &stream_global->incoming_sopb); + stream_global->published_cancelled = 1; + } } } if (stream_global->write_state == GRPC_WRITE_STATE_SENT_CLOSE &&