diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 5465d3306a5..48a10058331 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -237,6 +237,9 @@ struct transport { /* state for a stream that's not yet been created */ grpc_stream_op_buffer new_stream_sopb; + /* stream ops that need to be destroyed, but outside of the lock */ + grpc_stream_op_buffer nuke_later_sopb; + /* active parser */ void *parser_data; stream *incoming_stream; @@ -370,6 +373,8 @@ static void unref_transport(transport *t) { } gpr_free(t->pending_goaways); + grpc_sopb_destroy(&t->nuke_later_sopb); + gpr_free(t); } @@ -416,6 +421,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, t->cap_pending_goaways = 0; gpr_slice_buffer_init(&t->outbuf); gpr_slice_buffer_init(&t->qbuf); + grpc_sopb_init(&t->nuke_later_sopb); if (is_client) { gpr_slice_buffer_add(&t->qbuf, gpr_slice_from_copied_string(CLIENT_CONNECT_STRING)); @@ -555,6 +561,11 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, return 0; } +static void schedule_nuke_sopb(transport *t, grpc_stream_op_buffer *sopb) { + grpc_sopb_append(&t->nuke_later_sopb, sopb->ops, sopb->nops); + sopb->nops = 0; +} + static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { transport *t = (transport *)gt; stream *s = (stream *)gs; @@ -681,6 +692,11 @@ static void unlock(transport *t) { int i; pending_goaway *goaways = NULL; grpc_endpoint *ep = t->ep; + grpc_stream_op_buffer nuke_now = t->nuke_later_sopb; + + if (nuke_now.nops) { + memset(&t->nuke_later_sopb, 0, sizeof(t->nuke_later_sopb)); + } /* see if we need to trigger a write - and if so, get the data ready */ if (ep && !t->writing) { @@ -750,6 +766,10 @@ static void unlock(transport *t) { unref_transport(t); } + if (nuke_now.nops) { + grpc_sopb_destroy(&nuke_now); + } + gpr_free(goaways); } @@ -1006,9 +1026,9 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, if (s) { /* clear out any unreported input & output: nobody cares anymore */ - grpc_sopb_reset(&s->parser.incoming_sopb); had_outgoing = s->outgoing_sopb.nops != 0; - grpc_sopb_reset(&s->outgoing_sopb); + schedule_nuke_sopb(t, &s->parser.incoming_sopb); + schedule_nuke_sopb(t, &s->outgoing_sopb); if (s->cancelled) { send_rst = 0; } else if (!s->read_closed || !s->sent_write_closed || had_outgoing) { @@ -1518,7 +1538,7 @@ static int process_read(transport *t, gpr_slice slice) { dts_fh_0: case DTS_FH_0: GPR_ASSERT(cur < end); - t->incoming_frame_size = ((gpr_uint32) * cur) << 16; + t->incoming_frame_size = ((gpr_uint32)*cur) << 16; if (++cur == end) { t->deframe_state = DTS_FH_1; return 1; @@ -1526,7 +1546,7 @@ static int process_read(transport *t, gpr_slice slice) { /* fallthrough */ case DTS_FH_1: GPR_ASSERT(cur < end); - t->incoming_frame_size |= ((gpr_uint32) * cur) << 8; + t->incoming_frame_size |= ((gpr_uint32)*cur) << 8; if (++cur == end) { t->deframe_state = DTS_FH_2; return 1; @@ -1558,7 +1578,7 @@ static int process_read(transport *t, gpr_slice slice) { /* fallthrough */ case DTS_FH_5: GPR_ASSERT(cur < end); - t->incoming_stream_id = (((gpr_uint32) * cur) << 24) & 0x7f; + t->incoming_stream_id = (((gpr_uint32)*cur) << 24) & 0x7f; if (++cur == end) { t->deframe_state = DTS_FH_6; return 1; @@ -1566,7 +1586,7 @@ static int process_read(transport *t, gpr_slice slice) { /* fallthrough */ case DTS_FH_6: GPR_ASSERT(cur < end); - t->incoming_stream_id |= ((gpr_uint32) * cur) << 16; + t->incoming_stream_id |= ((gpr_uint32)*cur) << 16; if (++cur == end) { t->deframe_state = DTS_FH_7; return 1; @@ -1574,7 +1594,7 @@ static int process_read(transport *t, gpr_slice slice) { /* fallthrough */ case DTS_FH_7: GPR_ASSERT(cur < end); - t->incoming_stream_id |= ((gpr_uint32) * cur) << 8; + t->incoming_stream_id |= ((gpr_uint32)*cur) << 8; if (++cur == end) { t->deframe_state = DTS_FH_8; return 1; @@ -1582,7 +1602,7 @@ static int process_read(transport *t, gpr_slice slice) { /* fallthrough */ case DTS_FH_8: GPR_ASSERT(cur < end); - t->incoming_stream_id |= ((gpr_uint32) * cur); + t->incoming_stream_id |= ((gpr_uint32)*cur); t->deframe_state = DTS_FRAME; if (!init_frame_parser(t)) { return 0; @@ -1738,9 +1758,9 @@ static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) { */ static const grpc_transport_vtable vtable = { - sizeof(stream), init_stream, send_batch, set_allow_window_updates, - add_to_pollset, destroy_stream, abort_stream, goaway, - close_transport, send_ping, destroy_transport}; + sizeof(stream), init_stream, send_batch, set_allow_window_updates, + add_to_pollset, destroy_stream, abort_stream, goaway, close_transport, + send_ping, destroy_transport}; void grpc_create_chttp2_transport(grpc_transport_setup_callback setup, void *arg,