diff --git a/src/core/transport/chttp2/frame_ping.c b/src/core/transport/chttp2/frame_ping.c index 26004b3b7c6..a85f2bbba30 100644 --- a/src/core/transport/chttp2/frame_ping.c +++ b/src/core/transport/chttp2/frame_ping.c @@ -32,9 +32,11 @@ */ #include "src/core/transport/chttp2/frame_ping.h" +#include "src/core/transport/chttp2/internal.h" #include +#include #include gpr_slice grpc_chttp2_ping_create(gpr_uint8 ack, gpr_uint8 *opaque_8bytes) { @@ -67,12 +69,14 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_begin_frame( } grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse( - void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, + void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice); gpr_uint8 *const end = GPR_SLICE_END_PTR(slice); gpr_uint8 *cur = beg; grpc_chttp2_ping_parser *p = parser; + grpc_chttp2_outstanding_ping *ping; + while (p->byte != 8 && cur != end) { p->opaque_8bytes[p->byte] = *cur; @@ -83,9 +87,18 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse( if (p->byte == 8) { GPR_ASSERT(is_last); if (p->is_ack) { - state->process_ping_reply = 1; + for (ping = transport_parsing->pings.next; ping != &transport_parsing->pings; ping = ping->next) { + if (0 == memcmp(p->opaque_8bytes, ping->id, 8)) { + grpc_iomgr_add_delayed_callback(ping->on_recv, 1); + } + ping->next->prev = ping->prev; + ping->prev->next = ping->next; + gpr_free(ping); + } } else { - state->send_ping_ack = 1; + gpr_slice_buffer_add( + &transport_parsing->qbuf, + grpc_chttp2_ping_create(1, p->opaque_8bytes)); } } diff --git a/src/core/transport/chttp2/frame_rst_stream.c b/src/core/transport/chttp2/frame_rst_stream.c index 3016aac7a20..77da1bcc411 100644 --- a/src/core/transport/chttp2/frame_rst_stream.c +++ b/src/core/transport/chttp2/frame_rst_stream.c @@ -32,6 +32,7 @@ */ #include "src/core/transport/chttp2/frame_rst_stream.h" +#include "src/core/transport/chttp2/internal.h" #include @@ -69,7 +70,7 @@ grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame( } grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse( - void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, + void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice); gpr_uint8 *const end = GPR_SLICE_END_PTR(slice); @@ -84,8 +85,9 @@ grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse( if (p->byte == 4) { GPR_ASSERT(is_last); - state->rst_stream = 1; - state->rst_stream_reason = + stream_parsing->received_close = 1; + stream_parsing->saw_rst_stream = 1; + stream_parsing->rst_stream_reason = (((gpr_uint32)p->reason_bytes[0]) << 24) | (((gpr_uint32)p->reason_bytes[1]) << 16) | (((gpr_uint32)p->reason_bytes[2]) << 8) | diff --git a/src/core/transport/chttp2/frame_settings.c b/src/core/transport/chttp2/frame_settings.c index 2ffce730d50..ed9bf0fd339 100644 --- a/src/core/transport/chttp2/frame_settings.c +++ b/src/core/transport/chttp2/frame_settings.c @@ -32,6 +32,7 @@ */ #include "src/core/transport/chttp2/frame_settings.h" +#include "src/core/transport/chttp2/internal.h" #include @@ -137,7 +138,7 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_begin_frame( } grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse( - void *p, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last) { + void *p, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { grpc_chttp2_settings_parser *parser = p; const gpr_uint8 *cur = GPR_SLICE_START_PTR(slice); const gpr_uint8 *end = GPR_SLICE_END_PTR(slice); @@ -154,7 +155,7 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse( if (is_last) { memcpy(parser->target_settings, parser->incoming_settings, GRPC_CHTTP2_NUM_SETTINGS * sizeof(gpr_uint32)); - state->ack_settings = 1; + gpr_slice_buffer_add(&transport_parsing->qbuf, grpc_chttp2_settings_ack_create()); } return GRPC_CHTTP2_PARSE_OK; } @@ -220,11 +221,11 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse( } if (parser->id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE && parser->incoming_settings[parser->id] != parser->value) { - state->initial_window_update = + transport_parsing->initial_window_update = (gpr_int64)parser->value - parser->incoming_settings[parser->id]; gpr_log(GPR_DEBUG, "adding %d for initial_window change", - (int)state->initial_window_update); + (int)transport_parsing->initial_window_update); } parser->incoming_settings[parser->id] = parser->value; if (grpc_http_trace) { diff --git a/src/core/transport/chttp2/frame_window_update.c b/src/core/transport/chttp2/frame_window_update.c index a8db7d66531..d43a137bd38 100644 --- a/src/core/transport/chttp2/frame_window_update.c +++ b/src/core/transport/chttp2/frame_window_update.c @@ -32,6 +32,7 @@ */ #include "src/core/transport/chttp2/frame_window_update.h" +#include "src/core/transport/chttp2/internal.h" #include @@ -73,7 +74,7 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_begin_frame( } grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse( - void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, + void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice); gpr_uint8 *const end = GPR_SLICE_END_PTR(slice); @@ -92,7 +93,14 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse( return GRPC_CHTTP2_CONNECTION_ERROR; } GPR_ASSERT(is_last); - state->window_update = p->amount; + + if (transport_parsing->incoming_stream_id) { + if (stream_parsing) { + stream_parsing->outgoing_window_update += p->amount; + } + } else { + transport_parsing->outgoing_window_update += p->amount; + } } return GRPC_CHTTP2_PARSE_OK; diff --git a/src/core/transport/chttp2/hpack_parser.c b/src/core/transport/chttp2/hpack_parser.c index a4895438687..edab308381e 100644 --- a/src/core/transport/chttp2/hpack_parser.c +++ b/src/core/transport/chttp2/hpack_parser.c @@ -32,6 +32,7 @@ */ #include "src/core/transport/chttp2/hpack_parser.h" +#include "src/core/transport/chttp2/internal.h" #include #include @@ -1369,7 +1370,7 @@ int grpc_chttp2_hpack_parser_parse(grpc_chttp2_hpack_parser *p, } grpc_chttp2_parse_error grpc_chttp2_header_parser_parse( - void *hpack_parser, grpc_chttp2_parse_state *state, gpr_slice slice, + void *hpack_parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { grpc_chttp2_hpack_parser *parser = hpack_parser; if (!grpc_chttp2_hpack_parser_parse(parser, GPR_SLICE_START_PTR(slice), @@ -1382,9 +1383,12 @@ grpc_chttp2_parse_error grpc_chttp2_header_parser_parse( "end of header frame not aligned with a hpack record boundary"); return GRPC_CHTTP2_CONNECTION_ERROR; } - state->metadata_boundary = parser->is_boundary; - state->end_of_stream = parser->is_eof; - state->need_flush_reads = parser->is_eof; + if (parser->is_boundary) { + grpc_chttp2_parsing_add_metadata_batch(transport_parsing, stream_parsing); + } + if (parser->is_eof) { + stream_parsing->received_close = 1; + } parser->on_header = on_header_not_set; parser->on_header_user_data = NULL; parser->is_boundary = 0xde; diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 5eba01a3e1b..f89d327f8cb 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -162,10 +162,11 @@ typedef enum { } grpc_chttp2_setting_set; /* Outstanding ping request data */ -typedef struct { +typedef struct grpc_chttp2_outstanding_ping { gpr_uint8 id[8]; - void (*cb)(void *user_data); - void *user_data; + grpc_iomgr_closure *on_recv; + struct grpc_chttp2_outstanding_ping *next; + struct grpc_chttp2_outstanding_ping *prev; } grpc_chttp2_outstanding_ping; typedef struct { @@ -181,10 +182,13 @@ typedef struct { /** window available for us to send to peer */ gpr_uint32 outgoing_window; + /** window available for peer to send to us - updated after parse */ + gpr_uint32 incoming_window; /** how much window would we like to have for incoming_window */ gpr_uint32 connection_window_target; - + /** is this transport a client? */ + gpr_uint8 is_client; /** are the local settings dirty and need to be sent? */ gpr_uint8 dirtied_local_settings; /** have local settings been sent? */ @@ -196,6 +200,9 @@ typedef struct { /** last received stream id */ gpr_uint32 last_incoming_stream_id; + + /** pings awaiting responses */ + grpc_chttp2_outstanding_ping pings; } grpc_chttp2_transport_global; typedef struct { @@ -216,6 +223,9 @@ struct grpc_chttp2_transport_parsing { /** was a goaway frame received? */ gpr_uint8 goaway_received; + /** initial window change */ + gpr_int64 initial_window_update; + /** data to write later - after parsing */ gpr_slice_buffer qbuf; /* metadata object cache */ @@ -262,6 +272,11 @@ struct grpc_chttp2_transport_parsing { grpc_status_code goaway_error; gpr_uint32 goaway_last_stream_index; gpr_slice goaway_text; + + gpr_uint64 outgoing_window_update; + + /** pings awaiting responses */ + grpc_chttp2_outstanding_ping pings; }; @@ -306,9 +321,6 @@ struct grpc_chttp2_transport { grpc_chttp2_stream_map stream_map; /* pings */ - grpc_chttp2_outstanding_ping *pings; - size_t ping_count; - size_t ping_capacity; gpr_int64 ping_counter; grpc_chttp2_transport_global global; @@ -339,6 +351,8 @@ typedef struct { /** window available for us to send to peer */ gpr_int64 outgoing_window; + /** window available for peer to send to us - updated after parse */ + gpr_uint32 incoming_window; /** stream ops the transport user would like to send */ grpc_stream_op_buffer *outgoing_sopb; /** when the application requests writes be closed, the write_closed is @@ -367,10 +381,16 @@ struct grpc_chttp2_stream_parsing { gpr_uint8 incoming_window_changed; /** saw an error on this stream during parsing (it should be cancelled) */ gpr_uint8 saw_error; + /** saw a rst_stream */ + gpr_uint8 saw_rst_stream; /** window available for peer to send to us */ gpr_uint32 incoming_window; /** parsing state for data frames */ grpc_chttp2_data_parser data_parser; + /** reason give to rst_stream */ + gpr_uint32 rst_stream_reason; + /* amount of window given */ + gpr_uint64 outgoing_window_update; /* incoming metadata */ grpc_linked_mdelem *incoming_metadata; @@ -440,6 +460,8 @@ void grpc_chttp2_read_write_state_changed(grpc_chttp2_transport_global *transpor grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id); grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id); +void grpc_chttp2_parsing_add_metadata_batch(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing); + #define GRPC_CHTTP2_FLOW_CTL_TRACE(a,b,c,d,e) do {} while (0) #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index 2dd46b31168..e2c39bec3c9 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -275,14 +275,14 @@ static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) { } static grpc_chttp2_parse_error skip_parser(void *parser, - grpc_chttp2_parse_state *st, + grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { return GRPC_CHTTP2_PARSE_OK; } static void skip_header(void *tp, grpc_mdelem *md) { grpc_mdelem_unref(md); } -static int init_skip_frame(grpc_chttp2_transport_parsing *transport_parsing, int is_header) { +static int init_skip_frame_parser(grpc_chttp2_transport_parsing *transport_parsing, int is_header) { if (is_header) { int is_eoh = transport_parsing->expect_continuation_stream_id != 0; transport_parsing->parser = grpc_chttp2_header_parser_parse; @@ -298,7 +298,7 @@ static int init_skip_frame(grpc_chttp2_transport_parsing *transport_parsing, int } static void become_skip_parser(grpc_chttp2_transport_parsing *transport_parsing) { - init_skip_frame(transport_parsing, transport_parsing->parser == grpc_chttp2_header_parser_parse); + init_skip_frame_parser(transport_parsing, transport_parsing->parser == grpc_chttp2_header_parser_parse); } static grpc_chttp2_parse_error update_incoming_window(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing) { @@ -329,7 +329,7 @@ static grpc_chttp2_parse_error update_incoming_window(grpc_chttp2_transport_pars static int init_data_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) { grpc_chttp2_stream_parsing *stream_parsing = grpc_chttp2_parsing_lookup_stream(transport_parsing, transport_parsing->incoming_stream_id); grpc_chttp2_parse_error err = GRPC_CHTTP2_PARSE_OK; - if (!stream_parsing || stream_parsing->received_close) return init_skip_frame(transport_parsing, 0); + if (!stream_parsing || stream_parsing->received_close) return init_skip_frame_parser(transport_parsing, 0); if (err == GRPC_CHTTP2_PARSE_OK) { err = update_incoming_window(transport_parsing, stream_parsing); } @@ -346,7 +346,7 @@ static int init_data_frame_parser(grpc_chttp2_transport_parsing *transport_parsi case GRPC_CHTTP2_STREAM_ERROR: stream_parsing->received_close = 1; stream_parsing->saw_error = 1; - return init_skip_frame(transport_parsing, 0); + return init_skip_frame_parser(transport_parsing, 0); case GRPC_CHTTP2_CONNECTION_ERROR: return 0; } @@ -421,7 +421,7 @@ static int init_header_frame_parser(grpc_chttp2_transport_parsing *transport_par if (!stream_parsing) { if (is_continuation) { gpr_log(GPR_ERROR, "grpc_chttp2_stream disbanded before CONTINUATION received"); - return init_skip_frame(transport_parsing, 1); + return init_skip_frame_parser(transport_parsing, 1); } if (transport_parsing->is_client) { if ((transport_parsing->incoming_stream_id & 1) && @@ -430,22 +430,22 @@ static int init_header_frame_parser(grpc_chttp2_transport_parsing *transport_par } else { gpr_log(GPR_ERROR, "ignoring new grpc_chttp2_stream creation on client"); } - return init_skip_frame(transport_parsing, 1); + return init_skip_frame_parser(transport_parsing, 1); } else if (transport_parsing->last_incoming_stream_id > transport_parsing->incoming_stream_id) { gpr_log(GPR_ERROR, "ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream " "id=%d, new grpc_chttp2_stream id=%d", transport_parsing->last_incoming_stream_id, transport_parsing->incoming_stream_id); - return init_skip_frame(transport_parsing, 1); + return init_skip_frame_parser(transport_parsing, 1); } else if ((transport_parsing->incoming_stream_id & 1) == 0) { gpr_log(GPR_ERROR, "ignoring grpc_chttp2_stream with non-client generated index %d", transport_parsing->incoming_stream_id); - return init_skip_frame(transport_parsing, 1); + return init_skip_frame_parser(transport_parsing, 1); } stream_parsing = transport_parsing->incoming_stream = grpc_chttp2_parsing_accept_stream(transport_parsing, transport_parsing->incoming_stream_id); if (!stream_parsing) { gpr_log(GPR_ERROR, "grpc_chttp2_stream not accepted"); - return init_skip_frame(transport_parsing, 1); + return init_skip_frame_parser(transport_parsing, 1); } } else { transport_parsing->incoming_stream = stream_parsing; @@ -453,7 +453,7 @@ static int init_header_frame_parser(grpc_chttp2_transport_parsing *transport_par if (stream_parsing->received_close) { gpr_log(GPR_ERROR, "skipping already closed grpc_chttp2_stream header"); transport_parsing->incoming_stream = NULL; - return init_skip_frame(transport_parsing, 1); + return init_skip_frame_parser(transport_parsing, 1); } transport_parsing->parser = grpc_chttp2_header_parser_parse; transport_parsing->parser_data = &transport_parsing->hpack_parser; @@ -539,7 +539,7 @@ static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) { } */ -static void add_metadata_batch(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing) { +void grpc_chttp2_parsing_add_metadata_batch(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing) { grpc_metadata_batch b; b.list.head = NULL; @@ -555,15 +555,33 @@ static void add_metadata_batch(grpc_chttp2_transport_parsing *transport_parsing, grpc_sopb_add_metadata(&stream_parsing->data_parser.incoming_sopb, b); } -static int parse_frame_slice(grpc_chttp2_transport_parsing *t, gpr_slice slice, int is_last) { - grpc_chttp2_parse_state st; - size_t i; - memset(&st, 0, sizeof(st)); - switch (transport_parsing->parser(transport_parsing->parser_data, &st, slice, is_last)) { +static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice, int is_last) { + grpc_chttp2_stream_parsing *stream_parsing = transport_parsing->incoming_stream; + switch (transport_parsing->parser(transport_parsing->parser_data, transport_parsing, stream_parsing, slice, is_last)) { case GRPC_CHTTP2_PARSE_OK: if (stream_parsing) { grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing); } + return 1; + case GRPC_CHTTP2_STREAM_ERROR: + become_skip_parser(transport_parsing); + if (stream_parsing) { + stream_parsing->saw_error = 1; + } + return 1; + case GRPC_CHTTP2_CONNECTION_ERROR: + return 0; + } + gpr_log(GPR_ERROR, "should never reach here"); + abort(); + return 0; +} + + + + + +#if 0 if (st.end_of_stream) { transport_parsing->incoming_stream->read_closed = 1; maybe_finish_read(t, transport_parsing->incoming_stream, 1); @@ -576,12 +594,8 @@ static int parse_frame_slice(grpc_chttp2_transport_parsing *t, gpr_slice slice, maybe_finish_read(t, transport_parsing->incoming_stream, 1); } if (st.ack_settings) { - gpr_slice_buffer_add(&transport_parsing->qbuf, grpc_chttp2_settings_ack_create()); } if (st.send_ping_ack) { - gpr_slice_buffer_add( - &transport_parsing->qbuf, - grpc_chttp2_ping_create(1, transport_parsing->simple.ping.opaque_8bytes)); } if (st.goaway) { add_goaway(t, st.goaway_error, st.goaway_text); @@ -624,19 +638,4 @@ static int parse_frame_slice(grpc_chttp2_transport_parsing *t, gpr_slice slice, transport_parsing->global.outgoing_window_update += st.window_update; } } - return 1; - case GRPC_CHTTP2_STREAM_ERROR: - become_skip_parser(transport_parsing); - cancel_stream_id( - t, transport_parsing->incoming_stream_id, - grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_INTERNAL_ERROR), - GRPC_CHTTP2_INTERNAL_ERROR, 1); - return 1; - case GRPC_CHTTP2_CONNECTION_ERROR: - drop_connection(transport_parsing); - return 0; - } - gpr_log(GPR_ERROR, "should never reach here"); - abort(); - return 0; -} +#endif diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index 0265f173f38..3f25cbedac3 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -39,7 +39,7 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing); static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status); -int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_constants *transport_constants, grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing) { +int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing) { grpc_chttp2_stream_global *stream_global; grpc_chttp2_stream_writing *stream_writing; gpr_uint32 window_delta; @@ -75,7 +75,7 @@ int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_constants *transpor if (stream_global->write_state == WRITE_STATE_QUEUED_CLOSE && stream_global->outgoing_sopb->nops == 0) { - if (!transport_constants->is_client && !stream_global->read_closed) { + if (!transport_global->is_client && !stream_global->read_closed) { stream_writing->send_closed = SEND_CLOSED_WITH_RST_STREAM; } else { stream_writing->send_closed = SEND_CLOSED; @@ -158,14 +158,14 @@ static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status) { grpc_chttp2_terminate_writing(transport_writing, write_status == GRPC_ENDPOINT_CB_OK); } -void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_constants *transport_constants, grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing) { +void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing) { grpc_chttp2_stream_writing *stream_writing; grpc_chttp2_stream_global *stream_global; while (grpc_chttp2_list_pop_written_stream(transport_global, transport_writing, &stream_global, &stream_writing)) { if (stream_writing->send_closed != DONT_SEND_CLOSED) { stream_global->write_state = WRITE_STATE_SENT_CLOSE; - if (!transport_constants->is_client) { + if (!transport_global->is_client) { stream_global->read_closed = 1; } grpc_chttp2_read_write_state_changed(transport_global, stream_global);