diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 21ffd2abf04..2d3d720f2a3 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -423,8 +423,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { gpr_mu_unlock(&t->mu); for (i = 0; i < STREAM_LIST_COUNT; i++) { - GPR_ASSERT(s->links[i].next == NULL); - GPR_ASSERT(s->links[i].prev == NULL); + GPR_ASSERT(!s->included[i]); } GPR_ASSERT(s->global.outgoing_sopb == NULL); @@ -483,26 +482,16 @@ static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); } static void unlock(grpc_chttp2_transport *t) { grpc_iomgr_closure *run_closures; + unlock_check_read_write_state(t); if (!t->writing_active && t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE && grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) { t->writing_active = 1; REF_TRANSPORT(t, "writing"); grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1); } - unlock_check_read_write_state(t); /* unlock_check_parser(t); */ unlock_check_channel_callbacks(t); - if (!t->parsing_active) { - size_t new_stream_count = - grpc_chttp2_stream_map_size(&t->parsing_stream_map) + - grpc_chttp2_stream_map_size(&t->new_stream_map); - if (new_stream_count != t->global.concurrent_stream_count) { - t->global.concurrent_stream_count = new_stream_count; - maybe_start_some_streams(&t->global); - } - } - run_closures = t->global.pending_closures; t->global.pending_closures = NULL; @@ -734,6 +723,7 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed, } static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id) { + size_t new_stream_count; grpc_chttp2_stream *s = grpc_chttp2_stream_map_delete(&t->parsing_stream_map, id); if (!s) { @@ -745,6 +735,14 @@ static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id) { t->parsing.incoming_stream = NULL; grpc_chttp2_parsing_become_skip_parser(&t->parsing); } + + new_stream_count = + grpc_chttp2_stream_map_size(&t->parsing_stream_map) + + grpc_chttp2_stream_map_size(&t->new_stream_map); + if (new_stream_count != t->global.concurrent_stream_count) { + t->global.concurrent_stream_count = new_stream_count; + maybe_start_some_streams(&t->global); + } } static void unlock_check_read_write_state(grpc_chttp2_transport *t) { @@ -752,10 +750,10 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { grpc_chttp2_stream_global *stream_global; grpc_stream_state state; - /* if a stream is in the stream map, and gets cancelled, we need to ensure - we are not parsing before continuing the cancellation to keep things in - a sane state */ if (!t->parsing_active) { + /* if a stream is in the stream map, and gets cancelled, we need to ensure + we are not parsing before continuing the cancellation to keep things in + a sane state */ while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global, &stream_global)) { GPR_ASSERT(stream_global->in_stream_map); @@ -1017,6 +1015,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, /* merge stream lists */ grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); + t->global.concurrent_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map); /* handle higher level things */ grpc_chttp2_publish_reads(&t->global, &t->parsing); t->parsing_active = 0;