From 67eb59ee01269a8c6d87d614aaf548cccc6f7130 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 16 Aug 2016 11:40:54 -0700 Subject: [PATCH] Initial pass reifying read and global locks --- .../chttp2/transport/chttp2_transport.c | 314 +++----- .../ext/transport/chttp2/transport/frame.h | 4 +- .../transport/chttp2/transport/frame_data.c | 29 +- .../transport/chttp2/transport/frame_data.h | 4 +- .../transport/chttp2/transport/frame_goaway.c | 13 +- .../transport/chttp2/transport/frame_goaway.h | 4 +- .../transport/chttp2/transport/frame_ping.c | 8 +- .../transport/chttp2/transport/frame_ping.h | 4 +- .../chttp2/transport/frame_rst_stream.c | 35 +- .../chttp2/transport/frame_rst_stream.h | 4 +- .../chttp2/transport/frame_settings.c | 17 +- .../chttp2/transport/frame_settings.h | 4 +- .../chttp2/transport/frame_window_update.c | 33 +- .../chttp2/transport/frame_window_update.h | 4 +- .../transport/chttp2/transport/hpack_parser.c | 28 +- .../transport/chttp2/transport/hpack_parser.h | 4 +- .../ext/transport/chttp2/transport/internal.h | 169 ++-- .../ext/transport/chttp2/transport/parsing.c | 759 +++++++----------- .../transport/chttp2/transport/stream_lists.c | 57 -- 19 files changed, 563 insertions(+), 931 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 16946de853d..45ad8d7ed0f 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -69,10 +69,6 @@ int grpc_http_write_state_trace = 0; ((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \ writing))) -#define TRANSPORT_FROM_PARSING(tp) \ - ((grpc_chttp2_transport *)((char *)(tp)-offsetof(grpc_chttp2_transport, \ - parsing))) - #define TRANSPORT_FROM_GLOBAL(tg) \ ((grpc_chttp2_transport *)((char *)(tg)-offsetof(grpc_chttp2_transport, \ global))) @@ -80,19 +76,13 @@ int grpc_http_write_state_trace = 0; #define STREAM_FROM_GLOBAL(sg) \ ((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, global))) -#define STREAM_FROM_PARSING(sg) \ - ((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, parsing))) - static const grpc_transport_vtable vtable; /* forward declarations of various callbacks that we'll build closures around */ static void writing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); static void reading_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); -static void parsing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); static void initiate_writing_locked(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); static void initiate_read_flush_locked(grpc_exec_ctx *exec_ctx, void *t, @@ -164,10 +154,9 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, gpr_slice_buffer_destroy(&t->writing.outbuf); grpc_chttp2_hpack_compressor_destroy(&t->writing.hpack_compressor); - gpr_slice_buffer_destroy(&t->parsing.qbuf); gpr_slice_buffer_destroy(&t->read_buffer); - grpc_chttp2_hpack_parser_destroy(&t->parsing.hpack_parser); - grpc_chttp2_goaway_parser_destroy(&t->parsing.goaway_parser); + grpc_chttp2_hpack_parser_destroy(&t->global.hpack_parser); + grpc_chttp2_goaway_parser_destroy(&t->global.goaway_parser); for (i = 0; i < STREAM_LIST_COUNT; i++) { GPR_ASSERT(t->lists[i].head == NULL); @@ -251,15 +240,14 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->global.next_stream_id = is_client ? 1 : 2; t->global.is_client = is_client; t->writing.outgoing_window = DEFAULT_WINDOW; - t->parsing.incoming_window = DEFAULT_WINDOW; + t->global.incoming_window = DEFAULT_WINDOW; t->global.stream_lookahead = DEFAULT_WINDOW; t->global.connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET; t->global.ping_counter = 1; t->global.pings.next = t->global.pings.prev = &t->global.pings; - t->parsing.is_client = is_client; - t->parsing.deframe_state = + t->global.deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; - t->parsing.is_first_frame = true; + t->global.is_first_frame = true; t->writing.is_client = is_client; grpc_connectivity_state_init( &t->channel_callback.state_tracker, GRPC_CHANNEL_READY, @@ -272,8 +260,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_closure_init(&t->writing_action, writing_action, t); grpc_closure_init(&t->reading_action, reading_action, t); grpc_closure_init(&t->reading_action_locked, reading_action_locked, t); - grpc_closure_init(&t->parsing_action, parsing_action, t); - grpc_closure_init(&t->post_parse_locked, post_parse_locked, t); grpc_closure_init(&t->initiate_writing, initiate_writing_locked, t); grpc_closure_init(&t->terminate_writing, terminate_writing_with_lock, t); grpc_closure_init(&t->initiate_read_flush_locked, initiate_read_flush_locked, @@ -281,9 +267,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing, &t->writing); - gpr_slice_buffer_init(&t->parsing.qbuf); - grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser); - grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser); + grpc_chttp2_goaway_parser_init(&t->global.goaway_parser); + grpc_chttp2_hpack_parser_init(&t->global.hpack_parser); gpr_slice_buffer_init(&t->read_buffer); @@ -297,7 +282,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, /* copy in initial settings to all setting sets */ for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) { - t->parsing.settings[i] = grpc_chttp2_settings_parameters[i].default_value; for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) { t->global.settings[j][i] = grpc_chttp2_settings_parameters[i].default_value; @@ -508,13 +492,9 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, gpr_ref_init(&s->global.active_streams, 1); GRPC_CHTTP2_STREAM_REF(&s->global, "chttp2"); - grpc_chttp2_incoming_metadata_buffer_init(&s->parsing.metadata_buffer[0]); - grpc_chttp2_incoming_metadata_buffer_init(&s->parsing.metadata_buffer[1]); - grpc_chttp2_incoming_metadata_buffer_init( - &s->global.received_initial_metadata); - grpc_chttp2_incoming_metadata_buffer_init( - &s->global.received_trailing_metadata); - grpc_chttp2_data_parser_init(&s->parsing.data_parser); + grpc_chttp2_incoming_metadata_buffer_init(&s->global.metadata_buffer[0]); + grpc_chttp2_incoming_metadata_buffer_init(&s->global.metadata_buffer[1]); + grpc_chttp2_data_parser_init(&s->global.data_parser); gpr_slice_buffer_init(&s->writing.flow_controlled_buffer); s->global.deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); @@ -523,11 +503,10 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, if (server_data) { GPR_ASSERT(t->executor.parsing_active); s->global.id = (uint32_t)(uintptr_t)server_data; - s->parsing.id = s->global.id; s->global.outgoing_window = t->global.settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - s->parsing.incoming_window = s->global.max_recv_bytes = + s->global.incoming_window = s->global.max_recv_bytes = t->global.settings[GRPC_SENT_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; *t->accepting_stream = s; @@ -571,8 +550,6 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); } - grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global, - &s->global); grpc_chttp2_list_remove_stalled_by_transport(&t->global, &s->global); grpc_chttp2_list_remove_check_read_ops(&t->global, &s->global); @@ -590,13 +567,9 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, GPR_ASSERT(s->global.recv_initial_metadata_ready == NULL); GPR_ASSERT(s->global.recv_message_ready == NULL); GPR_ASSERT(s->global.recv_trailing_metadata_finished == NULL); - grpc_chttp2_data_parser_destroy(exec_ctx, &s->parsing.data_parser); - grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[0]); - grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[1]); - grpc_chttp2_incoming_metadata_buffer_destroy( - &s->global.received_initial_metadata); - grpc_chttp2_incoming_metadata_buffer_destroy( - &s->global.received_trailing_metadata); + grpc_chttp2_data_parser_destroy(exec_ctx, &s->global.data_parser); + grpc_chttp2_incoming_metadata_buffer_destroy(&s->global.metadata_buffer[0]); + grpc_chttp2_incoming_metadata_buffer_destroy(&s->global.metadata_buffer[1]); gpr_slice_buffer_destroy(&s->writing.flow_controlled_buffer); GRPC_ERROR_UNREF(s->global.read_closed_error); GRPC_ERROR_UNREF(s->global.write_closed_error); @@ -621,26 +594,26 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, GPR_TIMER_END("destroy_stream", 0); } -grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream( - grpc_chttp2_transport_parsing *transport_parsing, uint32_t id) { - grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing); +grpc_chttp2_stream_global *grpc_chttp2_parsing_lookup_stream( + grpc_chttp2_transport_global *transport_global, uint32_t id) { + grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global); grpc_chttp2_stream *s = grpc_chttp2_stream_map_find(&t->parsing_stream_map, id); - return s ? &s->parsing : NULL; + return s ? &s->global : NULL; } -grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, +grpc_chttp2_stream_global *grpc_chttp2_parsing_accept_stream( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, uint32_t id) { grpc_chttp2_stream *accepting; - grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing); + grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global); GPR_ASSERT(t->accepting_stream == NULL); t->accepting_stream = &accepting; t->channel_callback.accept_stream(exec_ctx, t->channel_callback.accept_stream_user_data, &t->base, (void *)(uintptr_t)id); t->accepting_stream = NULL; - return &accepting->parsing; + return &accepting->global; } /******************************************************************************* @@ -913,15 +886,13 @@ static void maybe_start_some_streams( grpc_chttp2_list_pop_waiting_for_concurrency(transport_global, &stream_global)) { /* safe since we can't (legally) be parsing this stream yet */ - grpc_chttp2_stream_parsing *stream_parsing = - &STREAM_FROM_GLOBAL(stream_global)->parsing; GRPC_CHTTP2_IF_TRACING(gpr_log( GPR_DEBUG, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d", transport_global->is_client ? "CLI" : "SVR", stream_global, transport_global->next_stream_id)); GPR_ASSERT(stream_global->id == 0); - stream_global->id = stream_parsing->id = transport_global->next_stream_id; + stream_global->id = transport_global->next_stream_id; transport_global->next_stream_id += 2; if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) { @@ -933,7 +904,7 @@ static void maybe_start_some_streams( stream_global->outgoing_window = transport_global->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - stream_parsing->incoming_window = stream_incoming_window = + stream_global->incoming_window = stream_incoming_window = transport_global->settings[GRPC_SENT_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; stream_global->max_recv_bytes = @@ -1271,10 +1242,10 @@ static void ack_ping_locked(grpc_exec_ctx *exec_ctx, void *a, } void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport_parsing *transport_parsing, + grpc_chttp2_transport_global *transport_global, const uint8_t *opaque_8bytes) { ack_ping_args *args = gpr_malloc(sizeof(*args)); - args->t = TRANSPORT_FROM_PARSING(transport_parsing); + args->t = TRANSPORT_FROM_GLOBAL(transport_global); memcpy(args->opaque_8bytes, opaque_8bytes, sizeof(args->opaque_8bytes)); grpc_closure_init(&args->closure, ack_ping_locked, args); REF_TRANSPORT(args->t, "ack_ping"); @@ -1366,7 +1337,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, while ( grpc_chttp2_list_pop_check_read_ops(transport_global, &stream_global)) { if (stream_global->recv_initial_metadata_ready != NULL && - stream_global->published_initial_metadata) { + stream_global->published_metadata[0]) { if (stream_global->seen_error) { while ((bs = grpc_chttp2_incoming_frame_queue_pop( &stream_global->incoming_frames)) != NULL) { @@ -1382,7 +1353,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, } } grpc_chttp2_incoming_metadata_buffer_publish( - &stream_global->received_initial_metadata, + &stream_global->metadata_buffer[0], stream_global->recv_initial_metadata); grpc_exec_ctx_sched(exec_ctx, stream_global->recv_initial_metadata_ready, GRPC_ERROR_NONE, NULL); @@ -1402,7 +1373,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, grpc_exec_ctx_sched(exec_ctx, stream_global->recv_message_ready, GRPC_ERROR_NONE, NULL); stream_global->recv_message_ready = NULL; - } else if (stream_global->published_trailing_metadata) { + } else if (stream_global->published_metadata[1]) { *stream_global->recv_message = NULL; grpc_exec_ctx_sched(exec_ctx, stream_global->recv_message_ready, GRPC_ERROR_NONE, NULL); @@ -1427,7 +1398,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, } if (stream_global->all_incoming_byte_streams_finished) { grpc_chttp2_incoming_metadata_buffer_publish( - &stream_global->received_trailing_metadata, + &stream_global->metadata_buffer[0], stream_global->recv_trailing_metadata); grpc_chttp2_complete_closure_step( exec_ctx, transport_global, stream_global, @@ -1458,15 +1429,15 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } GPR_ASSERT(s); s->global.in_stream_map = false; - if (t->parsing.incoming_stream == &s->parsing) { - t->parsing.incoming_stream = NULL; - grpc_chttp2_parsing_become_skip_parser(exec_ctx, &t->parsing); + if (t->global.incoming_stream == &s->global) { + t->global.incoming_stream = NULL; + grpc_chttp2_parsing_become_skip_parser(exec_ctx, &t->global); } - if (s->parsing.data_parser.parsing_frame != NULL) { + if (s->global.data_parser.parsing_frame != NULL) { grpc_chttp2_incoming_byte_stream_finished( - exec_ctx, s->parsing.data_parser.parsing_frame, GRPC_ERROR_REF(error), + exec_ctx, s->global.data_parser.parsing_frame, GRPC_ERROR_REF(error), 0); - s->parsing.data_parser.parsing_frame = NULL; + s->global.data_parser.parsing_frame = NULL; } if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { @@ -1570,22 +1541,22 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, to the upper layers - drop what we've got, and then publish what we want - which is safe because we haven't told anyone about the metadata yet */ - if (!stream_global->published_trailing_metadata || + if (!stream_global->published_metadata[1] || stream_global->recv_trailing_metadata_finished != NULL) { char status_string[GPR_LTOA_MIN_BUFSIZE]; gpr_ltoa(status, status_string); grpc_chttp2_incoming_metadata_buffer_add( - &stream_global->received_trailing_metadata, + &stream_global->metadata_buffer[1], grpc_mdelem_from_metadata_strings( GRPC_MDSTR_GRPC_STATUS, grpc_mdstr_from_string(status_string))); if (slice) { grpc_chttp2_incoming_metadata_buffer_add( - &stream_global->received_trailing_metadata, + &stream_global->metadata_buffer[1], grpc_mdelem_from_metadata_strings( GRPC_MDSTR_GRPC_MESSAGE, grpc_mdstr_from_slice(gpr_slice_ref(*slice)))); } - stream_global->published_trailing_metadata = true; + stream_global->published_metadata[1] = true; grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, stream_global); } @@ -1652,8 +1623,8 @@ void grpc_chttp2_mark_stream_closed( if (close_reads && !stream_global->read_closed) { stream_global->read_closed_error = GRPC_ERROR_REF(error); stream_global->read_closed = true; - stream_global->published_initial_metadata = true; - stream_global->published_trailing_metadata = true; + stream_global->published_metadata[0] = true; + stream_global->published_metadata[0] = true; decrement_active_streams_locked(exec_ctx, transport_global, stream_global); } if (close_writes && !stream_global->write_closed) { @@ -1851,7 +1822,7 @@ static void update_global_window(void *args, uint32_t id, void *stream) { grpc_chttp2_stream_global *stream_global = &s->global; int was_zero; int is_zero; - int64_t initial_window_update = t->parsing.initial_window_update; + int64_t initial_window_update = t->global.initial_window_update; was_zero = stream_global->outgoing_window <= 0; GRPC_CHTTP2_FLOW_CREDIT_STREAM("settings", transport_global, stream_global, @@ -1868,13 +1839,6 @@ static void update_global_window(void *args, uint32_t id, void *stream) { * INPUT PROCESSING - PARSING */ -static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); -static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); -static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); - static void reading_action(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error) { /* Control flow: @@ -1888,30 +1852,6 @@ static void reading_action(grpc_exec_ctx *exec_ctx, void *tp, GPR_TIMER_END("reading_action", 0); } -static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error) { - GPR_TIMER_BEGIN("reading_action_locked", 0); - - grpc_chttp2_transport *t = tp; - grpc_chttp2_transport_global *transport_global = &t->global; - grpc_chttp2_transport_parsing *transport_parsing = &t->parsing; - - GPR_ASSERT(!t->executor.parsing_active); - if (!t->closed) { - t->executor.parsing_active = 1; - /* merge stream lists */ - grpc_chttp2_stream_map_move_into(&t->new_stream_map, - &t->parsing_stream_map); - grpc_chttp2_prepare_to_read(transport_global, transport_parsing); - grpc_exec_ctx_sched(exec_ctx, &t->parsing_action, GRPC_ERROR_REF(error), - NULL); - } else { - post_reading_action_locked(exec_ctx, t, error); - } - - GPR_TIMER_END("reading_action_locked", 0); -} - static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { grpc_http_parser parser; @@ -1939,87 +1879,86 @@ static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, return error; } -static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_chttp2_transport *t = arg; - grpc_error *err = GRPC_ERROR_NONE; - GPR_TIMER_BEGIN("reading_action.parse", 0); - size_t i = 0; - grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE, - GRPC_ERROR_NONE}; - for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) { - errors[1] = grpc_chttp2_perform_read(exec_ctx, &t->parsing, - t->read_buffer.slices[i]); - }; - if (errors[1] == GRPC_ERROR_NONE) { - err = GRPC_ERROR_REF(error); - } else { - errors[2] = try_http_parsing(exec_ctx, t); - err = GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors, - GPR_ARRAY_SIZE(errors)); - } - for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) { - GRPC_ERROR_UNREF(errors[i]); - } - grpc_combiner_execute(exec_ctx, t->executor.combiner, &t->post_parse_locked, - err); - GPR_TIMER_END("reading_action.parse", 0); -} +static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp, + grpc_error *error) { + GPR_TIMER_BEGIN("reading_action_locked", 0); -static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - GPR_TIMER_BEGIN("post_parse_locked", 0); - grpc_chttp2_transport *t = arg; + grpc_chttp2_transport *t = tp; grpc_chttp2_transport_global *transport_global = &t->global; - grpc_chttp2_transport_parsing *transport_parsing = &t->parsing; - /* copy parsing qbuf to global qbuf */ - if (t->parsing.qbuf.count > 0) { - gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf); - grpc_chttp2_initiate_write(exec_ctx, transport_global, false, - "parsing_qbuf"); - } - /* merge stream lists */ - grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); - transport_global->concurrent_stream_count = - (uint32_t)grpc_chttp2_stream_map_size(&t->parsing_stream_map); - if (transport_parsing->initial_window_update != 0) { - update_global_window_args args = {t, exec_ctx}; - grpc_chttp2_stream_map_for_each(&t->parsing_stream_map, - update_global_window, &args); - transport_parsing->initial_window_update = 0; - } - /* handle higher level things */ - grpc_chttp2_publish_reads(exec_ctx, transport_global, transport_parsing); - t->executor.parsing_active = 0; - /* handle delayed transport ops (if there is one) */ - if (t->post_parsing_op) { - grpc_transport_op *op = t->post_parsing_op; - t->post_parsing_op = NULL; - perform_transport_op_locked(exec_ctx, op, GRPC_ERROR_NONE); - gpr_free(op); - } - /* if a stream is in the stream map, and gets cancelled, we need to - * ensure we are not parsing before continuing the cancellation to keep - * things in a sane state */ - grpc_chttp2_stream_global *stream_global; - while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global, - &stream_global)) { - GPR_ASSERT(stream_global->in_stream_map); - GPR_ASSERT(stream_global->write_closed); - GPR_ASSERT(stream_global->read_closed); - remove_stream(exec_ctx, t, stream_global->id, - removal_error(GRPC_ERROR_NONE, stream_global)); - GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); - } - post_reading_action_locked(exec_ctx, t, error); - GPR_TIMER_END("post_parse_locked", 0); -} + GRPC_ERROR_REF(error); + + GPR_ASSERT(!t->executor.parsing_active); + if (!t->closed) { + t->executor.parsing_active = 1; + /* merge stream lists */ + grpc_chttp2_stream_map_move_into(&t->new_stream_map, + &t->parsing_stream_map); + + GPR_TIMER_BEGIN("reading_action.parse", 0); + size_t i = 0; + grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE, + GRPC_ERROR_NONE}; + for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) { + errors[1] = grpc_chttp2_perform_read(exec_ctx, &t->global, + t->read_buffer.slices[i]); + }; + if (errors[1] != GRPC_ERROR_NONE) { + errors[2] = try_http_parsing(exec_ctx, t); + GRPC_ERROR_UNREF(error); + error = GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors, + GPR_ARRAY_SIZE(errors)); + } + for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) { + GRPC_ERROR_UNREF(errors[i]); + } + GPR_TIMER_END("reading_action.parse", 0); + + GPR_TIMER_BEGIN("post_parse_locked", 0); + if (transport_global->initial_window_update != 0) { + update_global_window_args args = {t, exec_ctx}; + grpc_chttp2_stream_map_for_each(&t->parsing_stream_map, + update_global_window, &args); + transport_global->initial_window_update = 0; + } + /* handle higher level things */ + if (transport_global->incoming_window < + transport_global->connection_window_target * 3 / 4) { + int64_t announce_bytes = transport_global->connection_window_target - + transport_global->incoming_window; + GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT( + "parsed", transport_global, announce_incoming_window, announce_bytes); + GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", transport_global, + incoming_window, announce_bytes); + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "global incoming window"); + } + t->executor.parsing_active = 0; + /* handle delayed transport ops (if there is one) */ + if (t->post_parsing_op) { + grpc_transport_op *op = t->post_parsing_op; + t->post_parsing_op = NULL; + perform_transport_op_locked(exec_ctx, op, GRPC_ERROR_NONE); + gpr_free(op); + } + /* if a stream is in the stream map, and gets cancelled, we need to + * ensure we are not parsing before continuing the cancellation to keep + * things in a sane state */ + grpc_chttp2_stream_global *stream_global; + while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global, + &stream_global)) { + GPR_ASSERT(stream_global->in_stream_map); + GPR_ASSERT(stream_global->write_closed); + GPR_ASSERT(stream_global->read_closed); + remove_stream(exec_ctx, t, stream_global->id, + removal_error(GRPC_ERROR_NONE, stream_global)); + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); + } + + GPR_TIMER_END("post_parse_locked", 0); + } -static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { GPR_TIMER_BEGIN("post_reading_action_locked", 0); - grpc_chttp2_transport *t = arg; bool keep_reading = false; GRPC_ERROR_REF(error); if (error == GRPC_ERROR_NONE && t->closed) { @@ -2049,6 +1988,10 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_UNREF(error); GPR_TIMER_END("post_reading_action_locked", 0); + + GRPC_ERROR_UNREF(error); + + GPR_TIMER_END("reading_action_locked", 0); } /******************************************************************************* @@ -2125,13 +2068,10 @@ static void incoming_byte_stream_update_flow_control( GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, max_recv_bytes, add_max_recv_bytes); GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, - unannounced_incoming_window_for_parse, - add_max_recv_bytes); + incoming_window, add_max_recv_bytes); GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, unannounced_incoming_window_for_writing, add_max_recv_bytes); - grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global, - stream_global); grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, false, "read_incoming_stream"); } @@ -2259,8 +2199,8 @@ void grpc_chttp2_incoming_byte_stream_finished( } grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, uint32_t frame_size, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, uint32_t frame_size, uint32_t flags, grpc_chttp2_incoming_frame_queue *add_to_queue) { grpc_chttp2_incoming_byte_stream *incoming_byte_stream = gpr_malloc(sizeof(*incoming_byte_stream)); @@ -2271,8 +2211,8 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( gpr_mu_init(&incoming_byte_stream->slice_mu); gpr_ref_init(&incoming_byte_stream->refs, 2); incoming_byte_stream->next_message = NULL; - incoming_byte_stream->transport = TRANSPORT_FROM_PARSING(transport_parsing); - incoming_byte_stream->stream = STREAM_FROM_PARSING(stream_parsing); + incoming_byte_stream->transport = TRANSPORT_FROM_GLOBAL(transport_global); + incoming_byte_stream->stream = STREAM_FROM_GLOBAL(stream_global); gpr_ref(&incoming_byte_stream->stream->global.active_streams); gpr_slice_buffer_init(&incoming_byte_stream->slices); incoming_byte_stream->on_next = NULL; diff --git a/src/core/ext/transport/chttp2/transport/frame.h b/src/core/ext/transport/chttp2/transport/frame.h index 7776609367b..48e3ac64823 100644 --- a/src/core/ext/transport/chttp2/transport/frame.h +++ b/src/core/ext/transport/chttp2/transport/frame.h @@ -40,8 +40,8 @@ #include "src/core/lib/iomgr/error.h" /* defined in internal.h */ -typedef struct grpc_chttp2_stream_parsing grpc_chttp2_stream_parsing; -typedef struct grpc_chttp2_transport_parsing grpc_chttp2_transport_parsing; +typedef struct grpc_chttp2_stream_global grpc_chttp2_stream_global; +typedef struct grpc_chttp2_transport_global grpc_chttp2_transport_global; #define GRPC_CHTTP2_FRAME_DATA 0 #define GRPC_CHTTP2_FRAME_HEADER 1 diff --git a/src/core/ext/transport/chttp2/transport/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c index 9046fbc453f..5c6d6f8ce1e 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.c +++ b/src/core/ext/transport/chttp2/transport/frame_data.c @@ -147,8 +147,8 @@ void grpc_chttp2_encode_data(uint32_t id, gpr_slice_buffer *inbuf, grpc_error *grpc_chttp2_data_parser_parse( grpc_exec_ctx *exec_ctx, void *parser, - grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last) { uint8_t *const beg = GPR_SLICE_START_PTR(slice); uint8_t *const end = GPR_SLICE_END_PTR(slice); uint8_t *cur = beg; @@ -158,7 +158,8 @@ grpc_error *grpc_chttp2_data_parser_parse( char *msg; if (is_last && p->is_last_frame) { - stream_parsing->received_close = 1; + grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, + true, false, GRPC_ERROR_NONE); } if (cur == end) { @@ -171,7 +172,7 @@ grpc_error *grpc_chttp2_data_parser_parse( return GRPC_ERROR_REF(p->error); fh_0: case GRPC_CHTTP2_DATA_FH_0: - stream_parsing->stats.incoming.framing_bytes++; + stream_global->stats.incoming.framing_bytes++; p->frame_type = *cur; switch (p->frame_type) { case 0: @@ -184,7 +185,7 @@ grpc_error *grpc_chttp2_data_parser_parse( gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type); p->error = GRPC_ERROR_CREATE(msg); p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID, - (intptr_t)stream_parsing->id); + (intptr_t)stream_global->id); gpr_free(msg); msg = gpr_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); p->error = @@ -201,7 +202,7 @@ grpc_error *grpc_chttp2_data_parser_parse( } /* fallthrough */ case GRPC_CHTTP2_DATA_FH_1: - stream_parsing->stats.incoming.framing_bytes++; + stream_global->stats.incoming.framing_bytes++; p->frame_size = ((uint32_t)*cur) << 24; if (++cur == end) { p->state = GRPC_CHTTP2_DATA_FH_2; @@ -209,7 +210,7 @@ grpc_error *grpc_chttp2_data_parser_parse( } /* fallthrough */ case GRPC_CHTTP2_DATA_FH_2: - stream_parsing->stats.incoming.framing_bytes++; + stream_global->stats.incoming.framing_bytes++; p->frame_size |= ((uint32_t)*cur) << 16; if (++cur == end) { p->state = GRPC_CHTTP2_DATA_FH_3; @@ -217,7 +218,7 @@ grpc_error *grpc_chttp2_data_parser_parse( } /* fallthrough */ case GRPC_CHTTP2_DATA_FH_3: - stream_parsing->stats.incoming.framing_bytes++; + stream_global->stats.incoming.framing_bytes++; p->frame_size |= ((uint32_t)*cur) << 8; if (++cur == end) { p->state = GRPC_CHTTP2_DATA_FH_4; @@ -225,7 +226,7 @@ grpc_error *grpc_chttp2_data_parser_parse( } /* fallthrough */ case GRPC_CHTTP2_DATA_FH_4: - stream_parsing->stats.incoming.framing_bytes++; + stream_global->stats.incoming.framing_bytes++; p->frame_size |= ((uint32_t)*cur); p->state = GRPC_CHTTP2_DATA_FRAME; ++cur; @@ -235,18 +236,16 @@ grpc_error *grpc_chttp2_data_parser_parse( } p->parsing_frame = incoming_byte_stream = grpc_chttp2_incoming_byte_stream_create( - exec_ctx, transport_parsing, stream_parsing, p->frame_size, + exec_ctx, transport_global, stream_global, p->frame_size, message_flags, &p->incoming_frames); /* fallthrough */ case GRPC_CHTTP2_DATA_FRAME: - grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, - stream_parsing); if (cur == end) { return GRPC_ERROR_NONE; } uint32_t remaining = (uint32_t)(end - cur); if (remaining == p->frame_size) { - stream_parsing->stats.incoming.data_bytes += p->frame_size; + stream_global->stats.incoming.data_bytes += p->frame_size; grpc_chttp2_incoming_byte_stream_push( exec_ctx, p->parsing_frame, gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); @@ -256,7 +255,7 @@ grpc_error *grpc_chttp2_data_parser_parse( p->state = GRPC_CHTTP2_DATA_FH_0; return GRPC_ERROR_NONE; } else if (remaining > p->frame_size) { - stream_parsing->stats.incoming.data_bytes += p->frame_size; + stream_global->stats.incoming.data_bytes += p->frame_size; grpc_chttp2_incoming_byte_stream_push( exec_ctx, p->parsing_frame, gpr_slice_sub(slice, (size_t)(cur - beg), @@ -272,7 +271,7 @@ grpc_error *grpc_chttp2_data_parser_parse( exec_ctx, p->parsing_frame, gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); p->frame_size -= remaining; - stream_parsing->stats.incoming.data_bytes += remaining; + stream_global->stats.incoming.data_bytes += remaining; return GRPC_ERROR_NONE; } } diff --git a/src/core/ext/transport/chttp2/transport/frame_data.h b/src/core/ext/transport/chttp2/transport/frame_data.h index a21a7942b94..b0c1cad9760 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.h +++ b/src/core/ext/transport/chttp2/transport/frame_data.h @@ -94,8 +94,8 @@ grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser, frame */ grpc_error *grpc_chttp2_data_parser_parse( grpc_exec_ctx *exec_ctx, void *parser, - grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last); void grpc_chttp2_encode_data(uint32_t id, gpr_slice_buffer *inbuf, uint32_t write_bytes, int is_eof, diff --git a/src/core/ext/transport/chttp2/transport/frame_goaway.c b/src/core/ext/transport/chttp2/transport/frame_goaway.c index 299e27ad702..e40c5e393bc 100644 --- a/src/core/ext/transport/chttp2/transport/frame_goaway.c +++ b/src/core/ext/transport/chttp2/transport/frame_goaway.c @@ -69,8 +69,8 @@ grpc_error *grpc_chttp2_goaway_parser_begin_frame(grpc_chttp2_goaway_parser *p, grpc_error *grpc_chttp2_goaway_parser_parse( grpc_exec_ctx *exec_ctx, void *parser, - grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last) { uint8_t *const beg = GPR_SLICE_START_PTR(slice); uint8_t *const end = GPR_SLICE_END_PTR(slice); uint8_t *cur = beg; @@ -148,12 +148,9 @@ grpc_error *grpc_chttp2_goaway_parser_parse( p->debug_pos += (uint32_t)(end - cur); p->state = GRPC_CHTTP2_GOAWAY_DEBUG; if (is_last) { - transport_parsing->goaway_received = 1; - transport_parsing->goaway_last_stream_index = p->last_stream_id; - gpr_slice_unref(transport_parsing->goaway_text); - transport_parsing->goaway_error = (grpc_status_code)p->error_code; - transport_parsing->goaway_text = - gpr_slice_new(p->debug_data, p->debug_length, gpr_free); + grpc_chttp2_add_incoming_goaway( + exec_ctx, transport_global, (uint32_t)p->error_code, + gpr_slice_new(p->debug_data, p->debug_length, gpr_free)); p->debug_data = NULL; } return GRPC_ERROR_NONE; diff --git a/src/core/ext/transport/chttp2/transport/frame_goaway.h b/src/core/ext/transport/chttp2/transport/frame_goaway.h index eb4303405ae..f8212299310 100644 --- a/src/core/ext/transport/chttp2/transport/frame_goaway.h +++ b/src/core/ext/transport/chttp2/transport/frame_goaway.h @@ -67,8 +67,8 @@ grpc_error *grpc_chttp2_goaway_parser_begin_frame( grpc_chttp2_goaway_parser *parser, uint32_t length, uint8_t flags); grpc_error *grpc_chttp2_goaway_parser_parse( grpc_exec_ctx *exec_ctx, void *parser, - grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last); void grpc_chttp2_goaway_append(uint32_t last_stream_id, uint32_t error_code, gpr_slice debug_data, diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c index 1f814ab1bdc..65b7cec986c 100644 --- a/src/core/ext/transport/chttp2/transport/frame_ping.c +++ b/src/core/ext/transport/chttp2/transport/frame_ping.c @@ -75,8 +75,8 @@ grpc_error *grpc_chttp2_ping_parser_begin_frame(grpc_chttp2_ping_parser *parser, grpc_error *grpc_chttp2_ping_parser_parse( grpc_exec_ctx *exec_ctx, void *parser, - grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last) { uint8_t *const beg = GPR_SLICE_START_PTR(slice); uint8_t *const end = GPR_SLICE_END_PTR(slice); uint8_t *cur = beg; @@ -91,9 +91,9 @@ grpc_error *grpc_chttp2_ping_parser_parse( if (p->byte == 8) { GPR_ASSERT(is_last); if (p->is_ack) { - grpc_chttp2_ack_ping(exec_ctx, transport_parsing, p->opaque_8bytes); + grpc_chttp2_ack_ping(exec_ctx, transport_global, p->opaque_8bytes); } else { - gpr_slice_buffer_add(&transport_parsing->qbuf, + gpr_slice_buffer_add(&transport_global->qbuf, grpc_chttp2_ping_create(1, p->opaque_8bytes)); } } diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.h b/src/core/ext/transport/chttp2/transport/frame_ping.h index 5a8723421c2..c2c4fb2ee5d 100644 --- a/src/core/ext/transport/chttp2/transport/frame_ping.h +++ b/src/core/ext/transport/chttp2/transport/frame_ping.h @@ -50,7 +50,7 @@ grpc_error *grpc_chttp2_ping_parser_begin_frame(grpc_chttp2_ping_parser *parser, uint32_t length, uint8_t flags); grpc_error *grpc_chttp2_ping_parser_parse( grpc_exec_ctx *exec_ctx, void *parser, - grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_PING_H */ diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.c b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c index e3a3c9e4a7c..7be664ced47 100644 --- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.c +++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c @@ -39,6 +39,8 @@ #include #include "src/core/ext/transport/chttp2/transport/frame.h" +#include "src/core/ext/transport/chttp2/transport/http2_errors.h" +#include "src/core/ext/transport/chttp2/transport/status_conversion.h" gpr_slice grpc_chttp2_rst_stream_create(uint32_t id, uint32_t code, grpc_transport_one_way_stats *stats) { @@ -85,8 +87,8 @@ grpc_error *grpc_chttp2_rst_stream_parser_begin_frame( grpc_error *grpc_chttp2_rst_stream_parser_parse( grpc_exec_ctx *exec_ctx, void *parser, - grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last) { uint8_t *const beg = GPR_SLICE_START_PTR(slice); uint8_t *const end = GPR_SLICE_END_PTR(slice); uint8_t *cur = beg; @@ -97,19 +99,30 @@ grpc_error *grpc_chttp2_rst_stream_parser_parse( cur++; p->byte++; } - stream_parsing->stats.incoming.framing_bytes += (uint64_t)(end - cur); + stream_global->stats.incoming.framing_bytes += (uint64_t)(end - cur); if (p->byte == 4) { GPR_ASSERT(is_last); - stream_parsing->received_close = 1; - if (stream_parsing->forced_close_error == GRPC_ERROR_NONE) { - stream_parsing->forced_close_error = grpc_error_set_int( - GRPC_ERROR_CREATE("RST_STREAM"), GRPC_ERROR_INT_HTTP2_ERROR, - (intptr_t)((((uint32_t)p->reason_bytes[0]) << 24) | - (((uint32_t)p->reason_bytes[1]) << 16) | - (((uint32_t)p->reason_bytes[2]) << 8) | - (((uint32_t)p->reason_bytes[3])))); + uint32_t reason = (((uint32_t)p->reason_bytes[0]) << 24) | + (((uint32_t)p->reason_bytes[1]) << 16) | + (((uint32_t)p->reason_bytes[2]) << 8) | + (((uint32_t)p->reason_bytes[3])); + grpc_error *error = GRPC_ERROR_NONE; + if (reason != GRPC_CHTTP2_NO_ERROR) { + error = grpc_error_set_int(GRPC_ERROR_CREATE("RST_STREAM"), + GRPC_ERROR_INT_HTTP2_ERROR, reason); + grpc_status_code status_code = grpc_chttp2_http2_error_to_grpc_status( + (grpc_chttp2_error_code)reason, stream_global->deadline); + char *status_details; + gpr_asprintf(&status_details, "Received RST_STREAM with error code %d", + reason); + gpr_slice slice_details = gpr_slice_from_copied_string(status_details); + gpr_free(status_details); + grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, + status_code, &slice_details); } + grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, + true, true, error); } return GRPC_ERROR_NONE; diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.h b/src/core/ext/transport/chttp2/transport/frame_rst_stream.h index 11cf94f3ea7..64a6a273411 100644 --- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.h +++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.h @@ -51,7 +51,7 @@ grpc_error *grpc_chttp2_rst_stream_parser_begin_frame( grpc_chttp2_rst_stream_parser *parser, uint32_t length, uint8_t flags); grpc_error *grpc_chttp2_rst_stream_parser_parse( grpc_exec_ctx *exec_ctx, void *parser, - grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_RST_STREAM_H */ diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.c b/src/core/ext/transport/chttp2/transport/frame_settings.c index 04b96c4cd95..32e6aa545f9 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.c +++ b/src/core/ext/transport/chttp2/transport/frame_settings.c @@ -145,8 +145,8 @@ grpc_error *grpc_chttp2_settings_parser_begin_frame( grpc_error *grpc_chttp2_settings_parser_parse( grpc_exec_ctx *exec_ctx, void *p, - grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last) { grpc_chttp2_settings_parser *parser = p; const uint8_t *cur = GPR_SLICE_START_PTR(slice); const uint8_t *end = GPR_SLICE_END_PTR(slice); @@ -162,10 +162,9 @@ grpc_error *grpc_chttp2_settings_parser_parse( if (cur == end) { parser->state = GRPC_CHTTP2_SPS_ID0; if (is_last) { - transport_parsing->settings_updated = 1; memcpy(parser->target_settings, parser->incoming_settings, GRPC_CHTTP2_NUM_SETTINGS * sizeof(uint32_t)); - gpr_slice_buffer_add(&transport_parsing->qbuf, + gpr_slice_buffer_add(&transport_global->qbuf, grpc_chttp2_settings_ack_create()); } return GRPC_ERROR_NONE; @@ -226,9 +225,9 @@ grpc_error *grpc_chttp2_settings_parser_parse( break; case GRPC_CHTTP2_DISCONNECT_ON_INVALID_VALUE: grpc_chttp2_goaway_append( - transport_parsing->last_incoming_stream_id, sp->error_value, + transport_global->last_incoming_stream_id, sp->error_value, gpr_slice_from_static_string("HTTP2 settings error"), - &transport_parsing->qbuf); + &transport_global->qbuf); gpr_asprintf(&msg, "invalid value %u passed for %s", parser->value, sp->name); grpc_error *err = GRPC_ERROR_CREATE(msg); @@ -238,17 +237,17 @@ grpc_error *grpc_chttp2_settings_parser_parse( } if (parser->id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE && parser->incoming_settings[parser->id] != parser->value) { - transport_parsing->initial_window_update = + transport_global->initial_window_update = (int64_t)parser->value - parser->incoming_settings[parser->id]; if (grpc_http_trace) { gpr_log(GPR_DEBUG, "adding %d for initial_window change", - (int)transport_parsing->initial_window_update); + (int)transport_global->initial_window_update); } } parser->incoming_settings[parser->id] = parser->value; if (grpc_http_trace) { gpr_log(GPR_DEBUG, "CHTTP2:%s: got setting %d = %d", - transport_parsing->is_client ? "CLI" : "SVR", parser->id, + transport_global->is_client ? "CLI" : "SVR", parser->id, parser->value); } } else if (grpc_http_trace) { diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.h b/src/core/ext/transport/chttp2/transport/frame_settings.h index f654c598c8e..c19f8067cce 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.h +++ b/src/core/ext/transport/chttp2/transport/frame_settings.h @@ -97,7 +97,7 @@ grpc_error *grpc_chttp2_settings_parser_begin_frame( uint32_t *settings); grpc_error *grpc_chttp2_settings_parser_parse( grpc_exec_ctx *exec_ctx, void *parser, - grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_SETTINGS_H */ diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.c b/src/core/ext/transport/chttp2/transport/frame_window_update.c index 3cf848fd5cb..26bd73f0afb 100644 --- a/src/core/ext/transport/chttp2/transport/frame_window_update.c +++ b/src/core/ext/transport/chttp2/transport/frame_window_update.c @@ -81,8 +81,8 @@ grpc_error *grpc_chttp2_window_update_parser_begin_frame( grpc_error *grpc_chttp2_window_update_parser_parse( grpc_exec_ctx *exec_ctx, void *parser, - grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last) { uint8_t *const beg = GPR_SLICE_START_PTR(slice); uint8_t *const end = GPR_SLICE_END_PTR(slice); uint8_t *cur = beg; @@ -94,8 +94,8 @@ grpc_error *grpc_chttp2_window_update_parser_parse( p->byte++; } - if (stream_parsing != NULL) { - stream_parsing->stats.incoming.framing_bytes += (uint32_t)(end - cur); + if (stream_global != NULL) { + stream_global->stats.incoming.framing_bytes += (uint32_t)(end - cur); } if (p->byte == 4) { @@ -109,17 +109,26 @@ grpc_error *grpc_chttp2_window_update_parser_parse( } GPR_ASSERT(is_last); - if (transport_parsing->incoming_stream_id != 0) { - if (stream_parsing != NULL) { - GRPC_CHTTP2_FLOW_CREDIT_STREAM("parse", transport_parsing, - stream_parsing, outgoing_window, - received_update); - grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, - stream_parsing); + if (transport_global->incoming_stream_id != 0) { + if (stream_global != NULL) { + bool was_zero = stream_global->outgoing_window <= 0; + GRPC_CHTTP2_FLOW_CREDIT_STREAM("parse", transport_global, stream_global, + outgoing_window, received_update); + bool is_zero = stream_global->outgoing_window <= 0; + if (was_zero && !is_zero) { + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, + false, "stream.read_flow_control"); + } } } else { - GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parse", transport_parsing, + bool was_zero = transport_global->outgoing_window <= 0; + GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parse", transport_global, outgoing_window, received_update); + bool is_zero = transport_global->outgoing_window <= 0; + if (was_zero && !is_zero) { + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "new_global_flow_control"); + } } } diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.h b/src/core/ext/transport/chttp2/transport/frame_window_update.h index 1bcbbf92478..36ae6bd0b19 100644 --- a/src/core/ext/transport/chttp2/transport/frame_window_update.h +++ b/src/core/ext/transport/chttp2/transport/frame_window_update.h @@ -52,7 +52,7 @@ grpc_error *grpc_chttp2_window_update_parser_begin_frame( grpc_chttp2_window_update_parser *parser, uint32_t length, uint8_t flags); grpc_error *grpc_chttp2_window_update_parser_parse( grpc_exec_ctx *exec_ctx, void *parser, - grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_WINDOW_UPDATE_H */ diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index 522455f7dca..a40591c8092 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -1496,12 +1496,12 @@ grpc_error *grpc_chttp2_hpack_parser_parse(grpc_chttp2_hpack_parser *p, grpc_error *grpc_chttp2_header_parser_parse( grpc_exec_ctx *exec_ctx, void *hpack_parser, - grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last) { grpc_chttp2_hpack_parser *parser = hpack_parser; GPR_TIMER_BEGIN("grpc_chttp2_hpack_parser_parse", 0); - if (stream_parsing != NULL) { - stream_parsing->stats.incoming.header_bytes += GPR_SLICE_LENGTH(slice); + if (stream_global != NULL) { + stream_global->stats.incoming.header_bytes += GPR_SLICE_LENGTH(slice); } grpc_error *error = grpc_chttp2_hpack_parser_parse( parser, GPR_SLICE_START_PTR(slice), GPR_SLICE_END_PTR(slice)); @@ -1517,20 +1517,22 @@ grpc_error *grpc_chttp2_header_parser_parse( } /* need to check for null stream: this can occur if we receive an invalid stream id on a header */ - if (stream_parsing != NULL) { + if (stream_global != NULL) { if (parser->is_boundary) { - if (stream_parsing->header_frames_received == - GPR_ARRAY_SIZE(stream_parsing->got_metadata_on_parse)) { + if (stream_global->header_frames_received == + GPR_ARRAY_SIZE(stream_global->metadata_buffer)) { return GRPC_ERROR_CREATE("Too many trailer frames"); } - stream_parsing - ->got_metadata_on_parse[stream_parsing->header_frames_received] = 1; - stream_parsing->header_frames_received++; - grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, - stream_parsing); + stream_global + ->published_metadata[stream_global->header_frames_received] = true; + stream_global->header_frames_received++; + grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, + stream_global); } if (parser->is_eof) { - stream_parsing->received_close = 1; + grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, + stream_global, true, false, + GRPC_ERROR_NONE); } } parser->on_header = NULL; diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.h b/src/core/ext/transport/chttp2/transport/hpack_parser.h index 78eb38db5ec..cbcf12ffed2 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.h +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.h @@ -112,7 +112,7 @@ grpc_error *grpc_chttp2_hpack_parser_parse(grpc_chttp2_hpack_parser *p, the transport */ grpc_error *grpc_chttp2_header_parser_parse( grpc_exec_ctx *exec_ctx, void *hpack_parser, - grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HPACK_PARSER_H */ diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index b72cd61fcf9..f3c3e1d2fe5 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -61,11 +61,9 @@ typedef struct grpc_chttp2_stream grpc_chttp2_stream; typedef enum { GRPC_CHTTP2_LIST_ALL_STREAMS, GRPC_CHTTP2_LIST_CHECK_READ_OPS, - GRPC_CHTTP2_LIST_UNANNOUNCED_INCOMING_WINDOW_AVAILABLE, GRPC_CHTTP2_LIST_WRITABLE, GRPC_CHTTP2_LIST_WRITING, GRPC_CHTTP2_LIST_WRITTEN, - GRPC_CHTTP2_LIST_PARSING_SEEN, GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING, GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_WRITING, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT, @@ -178,7 +176,7 @@ struct grpc_chttp2_incoming_byte_stream { grpc_closure finished_action; }; -typedef struct { +struct grpc_chttp2_transport_global { /** data to write next write */ gpr_slice_buffer qbuf; @@ -223,36 +221,7 @@ typedef struct { /** concurrent stream count: updated when not parsing, so this is a strict over-estimation on the client */ uint32_t concurrent_stream_count; -} grpc_chttp2_transport_global; - -typedef struct { - /** data to write now */ - gpr_slice_buffer outbuf; - /** hpack encoding */ - grpc_chttp2_hpack_compressor hpack_compressor; - int64_t outgoing_window; - /** is this a client? */ - uint8_t is_client; - /** callback for when writing is done */ - grpc_closure done_cb; -} grpc_chttp2_transport_writing; - -struct grpc_chttp2_transport_parsing { - /** is this transport a client? (boolean) */ - uint8_t is_client; - - /** were settings updated? */ - uint8_t settings_updated; - /** was a settings ack received? */ - uint8_t settings_ack_received; - /** was a goaway frame received? */ - uint8_t goaway_received; - /** initial window change */ - int64_t initial_window_update; - - /** data to write later - after parsing */ - gpr_slice_buffer qbuf; /** parser for headers */ grpc_chttp2_hpack_parser hpack_parser; /** simple one shot parsers */ @@ -265,13 +234,12 @@ struct grpc_chttp2_transport_parsing { /** parser for goaway frames */ grpc_chttp2_goaway_parser goaway_parser; + /** initial window change */ + int64_t initial_window_update; + /** window available for peer to send to us */ int64_t incoming_window; - /** next stream id available at the time of beginning parsing */ - uint32_t next_stream_id; - uint32_t last_incoming_stream_id; - /* deframing */ grpc_chttp2_deframe_transport_state deframe_state; uint8_t incoming_frame_type; @@ -282,29 +250,36 @@ struct grpc_chttp2_transport_parsing { uint32_t incoming_frame_size; uint32_t incoming_stream_id; - /* current max frame size */ - uint32_t max_frame_size; - /* active parser */ void *parser_data; - grpc_chttp2_stream_parsing *incoming_stream; + grpc_chttp2_stream_global *incoming_stream; grpc_error *(*parser)(grpc_exec_ctx *exec_ctx, void *parser_user_data, - grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last); - /* received settings */ - uint32_t settings[GRPC_CHTTP2_NUM_SETTINGS]; - /* last settings that were sent */ - uint32_t last_sent_settings[GRPC_CHTTP2_NUM_SETTINGS]; - /* goaway data */ grpc_status_code goaway_error; uint32_t goaway_last_stream_index; gpr_slice goaway_text; +}; +typedef struct { + /** data to write now */ + gpr_slice_buffer outbuf; + /** hpack encoding */ + grpc_chttp2_hpack_compressor hpack_compressor; int64_t outgoing_window; + /** is this a client? */ + uint8_t is_client; + /** callback for when writing is done */ + grpc_closure done_cb; +} grpc_chttp2_transport_writing; + +#if 0 +struct grpc_chttp2_transport_parsing { }; +#endif typedef enum { /** no writing activity allowed */ @@ -360,9 +335,6 @@ struct grpc_chttp2_transport { set writing_state >= GRPC_WRITING, and only by the writing closure chain. */ grpc_chttp2_transport_writing writing; - /** state only accessible by the chain of execution that - set parsing_active=1 */ - grpc_chttp2_transport_parsing parsing; /** maps stream id to grpc_chttp2_stream objects; owned by the parsing thread when parsing */ @@ -378,9 +350,6 @@ struct grpc_chttp2_transport { /** closure to start reading from the endpoint */ grpc_closure reading_action; grpc_closure reading_action_locked; - grpc_closure post_parse_locked; - /** closure to actually do parsing */ - grpc_closure parsing_action; /** closure to initiate writing */ grpc_closure initiate_writing; /** closure to finish writing */ @@ -410,7 +379,7 @@ struct grpc_chttp2_transport { grpc_transport_op *post_parsing_op; }; -typedef struct { +struct grpc_chttp2_stream_global { /** HTTP2 stream id for this stream, or zero if one has not been assigned */ uint32_t id; @@ -424,7 +393,8 @@ typedef struct { not yet announced to HTTP2 flow control. As the upper layers offer to read more bytes, this value increases. As we advertise incoming flow control window, this value decreases. */ - uint32_t unannounced_incoming_window_for_parse; + /* TODO(ctiller): remove this, it's equivalent to incoming_window now + uint32_t unannounced_incoming_window_for_parse; */ uint32_t unannounced_incoming_window_for_writing; /** things the upper layers would like to send */ grpc_metadata_batch *send_initial_metadata; @@ -465,17 +435,26 @@ typedef struct { /** the error that resulted in this stream being write-closed */ grpc_error *write_closed_error; - bool published_initial_metadata; - bool published_trailing_metadata; + bool published_metadata[2]; bool final_metadata_requested; - grpc_chttp2_incoming_metadata_buffer received_initial_metadata; - grpc_chttp2_incoming_metadata_buffer received_trailing_metadata; + grpc_chttp2_incoming_metadata_buffer metadata_buffer[2]; grpc_chttp2_incoming_frame_queue incoming_frames; gpr_timespec deadline; -} grpc_chttp2_stream_global; + + /** saw some stream level error */ + grpc_error *forced_close_error; + /** how many header frames have we received? */ + uint8_t header_frames_received; + /** window available for peer to send to us */ + int64_t incoming_window; + /** parsing state for data frames */ + grpc_chttp2_data_parser data_parser; + /** number of bytes received - reset at end of parse thread execution */ + int64_t received_bytes; +}; typedef struct { /** HTTP2 stream id for this stream, or zero if one has not been assigned */ @@ -500,41 +479,19 @@ typedef struct { grpc_transport_one_way_stats stats; } grpc_chttp2_stream_writing; +#if 0 struct grpc_chttp2_stream_parsing { - /** saw some stream level error */ - grpc_error *forced_close_error; - /** HTTP2 stream id for this stream, or zero if one has not been assigned */ - uint32_t id; - /** has this stream received a close */ - uint8_t received_close; - /** how many header frames have we received? */ - uint8_t header_frames_received; - /** which metadata did we get (on this parse) */ - uint8_t got_metadata_on_parse[2]; - /** should we raise the seen_error flag in transport_global */ - bool seen_error; - bool exceeded_metadata_size; - /** window available for peer to send to us */ - int64_t incoming_window; - /** parsing state for data frames */ - grpc_chttp2_data_parser data_parser; - /** amount of window given */ - int64_t outgoing_window; - /** number of bytes received - reset at end of parse thread execution */ - int64_t received_bytes; - /** stats gathered during the parse */ - grpc_transport_stream_stats stats; /** incoming metadata */ grpc_chttp2_incoming_metadata_buffer metadata_buffer[2]; }; +#endif struct grpc_chttp2_stream { grpc_chttp2_transport *t; grpc_stream_refcount *refcount; grpc_chttp2_stream_global global; grpc_chttp2_stream_writing writing; - grpc_chttp2_stream_parsing parsing; grpc_closure init_stream; grpc_closure destroy_stream; @@ -574,16 +531,11 @@ void grpc_chttp2_cleanup_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing); -void grpc_chttp2_prepare_to_read(grpc_chttp2_transport_global *global, - grpc_chttp2_transport_parsing *parsing); /** Process one slice of incoming data; return 1 if the connection is still viable after reading, or 0 if the connection should be torn down */ grpc_error *grpc_chttp2_perform_read( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, gpr_slice slice); -void grpc_chttp2_publish_reads(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport_global *global, - grpc_chttp2_transport_parsing *parsing); bool grpc_chttp2_list_add_writable_stream( grpc_chttp2_transport_global *transport_global, @@ -617,15 +569,6 @@ int grpc_chttp2_list_pop_written_stream( grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_writing **stream_writing); -void grpc_chttp2_list_add_parsing_seen_stream( - grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing); -int grpc_chttp2_list_pop_parsing_seen_stream( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_global **stream_global, - grpc_chttp2_stream_parsing **stream_parsing); - void grpc_chttp2_list_add_waiting_for_concurrency( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); @@ -659,18 +602,6 @@ void grpc_chttp2_list_remove_stalled_by_transport( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); -void grpc_chttp2_list_add_unannounced_incoming_window_available( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global); -void grpc_chttp2_list_remove_unannounced_incoming_window_available( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global); -int grpc_chttp2_list_pop_unannounced_incoming_window_available( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_global **stream_global, - grpc_chttp2_stream_parsing **stream_parsing); - void grpc_chttp2_list_add_closed_waiting_for_parsing( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); @@ -685,10 +616,10 @@ int grpc_chttp2_list_pop_closed_waiting_for_writing( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); -grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream( - grpc_chttp2_transport_parsing *transport_parsing, uint32_t id); -grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, +grpc_chttp2_stream_global *grpc_chttp2_parsing_lookup_stream( + grpc_chttp2_transport_global *transport_global, uint32_t id); +grpc_chttp2_stream_global *grpc_chttp2_parsing_accept_stream( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, uint32_t id); void grpc_chttp2_add_incoming_goaway( @@ -707,7 +638,7 @@ void grpc_chttp2_for_all_streams( grpc_chttp2_stream_global *stream_global)); void grpc_chttp2_parsing_become_skip_parser( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing); + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global); void grpc_chttp2_complete_closure_step( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, @@ -835,8 +766,8 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, #endif grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, uint32_t frame_size, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, uint32_t frame_size, uint32_t flags, grpc_chttp2_incoming_frame_queue *add_to_queue); void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, @@ -846,7 +777,7 @@ void grpc_chttp2_incoming_byte_stream_finished( grpc_error *error, int from_parsing_thread); void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport_parsing *parsing, + grpc_chttp2_transport_global *parsing, const uint8_t *opaque_8bytes); /** add a ref to the stream and add it to the writable list; diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 0e6d579ba9b..ee01d3beb7f 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -45,226 +45,33 @@ #include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/timeout_encoding.h" -#define TRANSPORT_FROM_PARSING(tp) \ - ((grpc_chttp2_transport *)((char *)(tp)-offsetof(grpc_chttp2_transport, \ - parsing))) - static grpc_error *init_frame_parser( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing); + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global); static grpc_error *init_header_frame_parser( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, int is_continuation); static grpc_error *init_data_frame_parser( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing); + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global); static grpc_error *init_rst_stream_parser( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing); + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global); static grpc_error *init_settings_frame_parser( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing); + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global); static grpc_error *init_window_update_frame_parser( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing); + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global); static grpc_error *init_ping_parser( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing); + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global); static grpc_error *init_goaway_parser( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing); + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global); static grpc_error *init_skip_frame_parser( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, int is_header); static grpc_error *parse_frame_slice( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, - gpr_slice slice, int is_last); - -void grpc_chttp2_prepare_to_read( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_transport_parsing *transport_parsing) { - grpc_chttp2_stream_global *stream_global; - grpc_chttp2_stream_parsing *stream_parsing; - - GPR_TIMER_BEGIN("grpc_chttp2_prepare_to_read", 0); - - transport_parsing->next_stream_id = transport_global->next_stream_id; - memcpy(transport_parsing->last_sent_settings, - transport_global->settings[GRPC_SENT_SETTINGS], - sizeof(transport_parsing->last_sent_settings)); - transport_parsing->max_frame_size = - transport_global->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]; - - /* update the parsing view of incoming window */ - while (grpc_chttp2_list_pop_unannounced_incoming_window_available( - transport_global, transport_parsing, &stream_global, &stream_parsing)) { - GRPC_CHTTP2_FLOW_MOVE_STREAM("parse", transport_parsing, stream_parsing, - incoming_window, stream_global, - unannounced_incoming_window_for_parse); - } - - GPR_TIMER_END("grpc_chttp2_prepare_to_read", 0); -} - -void grpc_chttp2_publish_reads( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, - grpc_chttp2_transport_parsing *transport_parsing) { - grpc_chttp2_stream_global *stream_global; - grpc_chttp2_stream_parsing *stream_parsing; - int was_zero; - int is_zero; - - /* transport_parsing->last_incoming_stream_id is used as - last-grpc_chttp2_stream-id when - sending GOAWAY frame. - https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.8 - says that last-grpc_chttp2_stream-id is peer-initiated grpc_chttp2_stream - ID. So, - since we don't have server pushed streams, client should send - GOAWAY last-grpc_chttp2_stream-id=0 in this case. */ - if (!transport_parsing->is_client) { - transport_global->last_incoming_stream_id = - transport_parsing->last_incoming_stream_id; - } - - /* update global settings */ - if (transport_parsing->settings_updated) { - memcpy(transport_global->settings[GRPC_PEER_SETTINGS], - transport_parsing->settings, sizeof(transport_parsing->settings)); - transport_parsing->settings_updated = 0; - } - - /* update settings based on ack if received */ - if (transport_parsing->settings_ack_received) { - memcpy(transport_global->settings[GRPC_ACKED_SETTINGS], - transport_global->settings[GRPC_SENT_SETTINGS], - GRPC_CHTTP2_NUM_SETTINGS * sizeof(uint32_t)); - transport_parsing->settings_ack_received = 0; - transport_global->sent_local_settings = 0; - } - - /* move goaway to the global state if we received one (it will be - published later */ - if (transport_parsing->goaway_received) { - grpc_chttp2_add_incoming_goaway(exec_ctx, transport_global, - (uint32_t)transport_parsing->goaway_error, - transport_parsing->goaway_text); - transport_parsing->goaway_text = gpr_empty_slice(); - transport_parsing->goaway_received = 0; - } - - /* propagate flow control tokens to global state */ - was_zero = transport_global->outgoing_window <= 0; - GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("parsed", transport_global, outgoing_window, - transport_parsing, outgoing_window); - is_zero = transport_global->outgoing_window <= 0; - if (was_zero && !is_zero) { - grpc_chttp2_initiate_write(exec_ctx, transport_global, false, - "new_global_flow_control"); - } - - if (transport_parsing->incoming_window < - transport_global->connection_window_target * 3 / 4) { - int64_t announce_bytes = transport_global->connection_window_target - - transport_parsing->incoming_window; - GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", transport_global, - announce_incoming_window, announce_bytes); - GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", transport_parsing, - incoming_window, announce_bytes); - grpc_chttp2_initiate_write(exec_ctx, transport_global, false, - "global incoming window"); - } - - /* for each stream that saw an update, fixup global state */ - while (grpc_chttp2_list_pop_parsing_seen_stream( - transport_global, transport_parsing, &stream_global, &stream_parsing)) { - if (stream_parsing->seen_error) { - stream_global->seen_error = true; - stream_global->exceeded_metadata_size = - stream_parsing->exceeded_metadata_size; - grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, - stream_global); - } - - /* flush stats to global stream state */ - grpc_transport_move_stats(&stream_parsing->stats, &stream_global->stats); - - /* update outgoing flow control window */ - was_zero = stream_global->outgoing_window <= 0; - GRPC_CHTTP2_FLOW_MOVE_STREAM("parsed", transport_global, stream_global, - outgoing_window, stream_parsing, - outgoing_window); - is_zero = stream_global->outgoing_window <= 0; - if (was_zero && !is_zero) { - grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, - false, "stream.read_flow_control"); - } - - stream_global->max_recv_bytes -= (uint32_t)GPR_MIN( - stream_global->max_recv_bytes, stream_parsing->received_bytes); - stream_parsing->received_bytes = 0; - - /* publish incoming stream ops */ - if (stream_global->incoming_frames.tail != NULL) { - stream_global->incoming_frames.tail->is_tail = 0; - } - if (stream_parsing->data_parser.incoming_frames.head != NULL) { - grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, - stream_global); - } - grpc_chttp2_incoming_frame_queue_merge( - &stream_global->incoming_frames, - &stream_parsing->data_parser.incoming_frames); - if (stream_global->incoming_frames.tail != NULL) { - stream_global->incoming_frames.tail->is_tail = 1; - } - - if (!stream_global->published_initial_metadata && - stream_parsing->got_metadata_on_parse[0]) { - stream_parsing->got_metadata_on_parse[0] = 0; - stream_global->published_initial_metadata = 1; - GPR_SWAP(grpc_chttp2_incoming_metadata_buffer, - stream_parsing->metadata_buffer[0], - stream_global->received_initial_metadata); - grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, - stream_global); - } - if (!stream_global->published_trailing_metadata && - stream_parsing->got_metadata_on_parse[1]) { - stream_parsing->got_metadata_on_parse[1] = 0; - stream_global->published_trailing_metadata = 1; - GPR_SWAP(grpc_chttp2_incoming_metadata_buffer, - stream_parsing->metadata_buffer[1], - stream_global->received_trailing_metadata); - grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, - stream_global); - } - - if (stream_parsing->forced_close_error != GRPC_ERROR_NONE) { - intptr_t reason; - bool has_reason = grpc_error_get_int(stream_parsing->forced_close_error, - GRPC_ERROR_INT_HTTP2_ERROR, &reason); - if (has_reason && reason != GRPC_CHTTP2_NO_ERROR) { - grpc_status_code status_code = - has_reason - ? grpc_chttp2_http2_error_to_grpc_status( - (grpc_chttp2_error_code)reason, stream_global->deadline) - : GRPC_STATUS_INTERNAL; - const char *status_details = - grpc_error_string(stream_parsing->forced_close_error); - gpr_slice slice_details = gpr_slice_from_copied_string(status_details); - grpc_error_free_string(status_details); - grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, - status_code, &slice_details); - } - grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, - 1, 1, stream_parsing->forced_close_error); - } - - if (stream_parsing->received_close) { - grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, - 1, 0, GRPC_ERROR_NONE); - } - } -} + gpr_slice slice, int is_last); grpc_error *grpc_chttp2_perform_read( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, gpr_slice slice) { uint8_t *beg = GPR_SLICE_START_PTR(slice); uint8_t *end = GPR_SLICE_END_PTR(slice); @@ -273,7 +80,7 @@ grpc_error *grpc_chttp2_perform_read( if (cur == end) return GRPC_ERROR_NONE; - switch (transport_parsing->deframe_state) { + switch (transport_global->deframe_state) { case GRPC_DTS_CLIENT_PREFIX_0: case GRPC_DTS_CLIENT_PREFIX_1: case GRPC_DTS_CLIENT_PREFIX_2: @@ -298,25 +105,25 @@ grpc_error *grpc_chttp2_perform_read( case GRPC_DTS_CLIENT_PREFIX_21: case GRPC_DTS_CLIENT_PREFIX_22: case GRPC_DTS_CLIENT_PREFIX_23: - while (cur != end && transport_parsing->deframe_state != GRPC_DTS_FH_0) { - if (*cur != GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing + while (cur != end && transport_global->deframe_state != GRPC_DTS_FH_0) { + if (*cur != GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_global ->deframe_state]) { char *msg; gpr_asprintf( &msg, "Connect string mismatch: expected '%c' (%d) got '%c' (%d) " "at byte %d", - GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing + GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_global ->deframe_state], (int)(uint8_t)GRPC_CHTTP2_CLIENT_CONNECT_STRING - [transport_parsing->deframe_state], - *cur, (int)*cur, transport_parsing->deframe_state); + [transport_global->deframe_state], + *cur, (int)*cur, transport_global->deframe_state); err = GRPC_ERROR_CREATE(msg); gpr_free(msg); return err; } ++cur; - ++transport_parsing->deframe_state; + ++transport_global->deframe_state; } if (cur == end) { return GRPC_ERROR_NONE; @@ -325,100 +132,104 @@ grpc_error *grpc_chttp2_perform_read( dts_fh_0: case GRPC_DTS_FH_0: GPR_ASSERT(cur < end); - transport_parsing->incoming_frame_size = ((uint32_t)*cur) << 16; + transport_global->incoming_frame_size = ((uint32_t)*cur) << 16; if (++cur == end) { - transport_parsing->deframe_state = GRPC_DTS_FH_1; + transport_global->deframe_state = GRPC_DTS_FH_1; return GRPC_ERROR_NONE; } /* fallthrough */ case GRPC_DTS_FH_1: GPR_ASSERT(cur < end); - transport_parsing->incoming_frame_size |= ((uint32_t)*cur) << 8; + transport_global->incoming_frame_size |= ((uint32_t)*cur) << 8; if (++cur == end) { - transport_parsing->deframe_state = GRPC_DTS_FH_2; + transport_global->deframe_state = GRPC_DTS_FH_2; return GRPC_ERROR_NONE; } /* fallthrough */ case GRPC_DTS_FH_2: GPR_ASSERT(cur < end); - transport_parsing->incoming_frame_size |= *cur; + transport_global->incoming_frame_size |= *cur; if (++cur == end) { - transport_parsing->deframe_state = GRPC_DTS_FH_3; + transport_global->deframe_state = GRPC_DTS_FH_3; return GRPC_ERROR_NONE; } /* fallthrough */ case GRPC_DTS_FH_3: GPR_ASSERT(cur < end); - transport_parsing->incoming_frame_type = *cur; + transport_global->incoming_frame_type = *cur; if (++cur == end) { - transport_parsing->deframe_state = GRPC_DTS_FH_4; + transport_global->deframe_state = GRPC_DTS_FH_4; return GRPC_ERROR_NONE; } /* fallthrough */ case GRPC_DTS_FH_4: GPR_ASSERT(cur < end); - transport_parsing->incoming_frame_flags = *cur; + transport_global->incoming_frame_flags = *cur; if (++cur == end) { - transport_parsing->deframe_state = GRPC_DTS_FH_5; + transport_global->deframe_state = GRPC_DTS_FH_5; return GRPC_ERROR_NONE; } /* fallthrough */ case GRPC_DTS_FH_5: GPR_ASSERT(cur < end); - transport_parsing->incoming_stream_id = (((uint32_t)*cur) & 0x7f) << 24; + transport_global->incoming_stream_id = (((uint32_t)*cur) & 0x7f) << 24; if (++cur == end) { - transport_parsing->deframe_state = GRPC_DTS_FH_6; + transport_global->deframe_state = GRPC_DTS_FH_6; return GRPC_ERROR_NONE; } /* fallthrough */ case GRPC_DTS_FH_6: GPR_ASSERT(cur < end); - transport_parsing->incoming_stream_id |= ((uint32_t)*cur) << 16; + transport_global->incoming_stream_id |= ((uint32_t)*cur) << 16; if (++cur == end) { - transport_parsing->deframe_state = GRPC_DTS_FH_7; + transport_global->deframe_state = GRPC_DTS_FH_7; return GRPC_ERROR_NONE; } /* fallthrough */ case GRPC_DTS_FH_7: GPR_ASSERT(cur < end); - transport_parsing->incoming_stream_id |= ((uint32_t)*cur) << 8; + transport_global->incoming_stream_id |= ((uint32_t)*cur) << 8; if (++cur == end) { - transport_parsing->deframe_state = GRPC_DTS_FH_8; + transport_global->deframe_state = GRPC_DTS_FH_8; return GRPC_ERROR_NONE; } /* fallthrough */ case GRPC_DTS_FH_8: GPR_ASSERT(cur < end); - transport_parsing->incoming_stream_id |= ((uint32_t)*cur); - transport_parsing->deframe_state = GRPC_DTS_FRAME; - err = init_frame_parser(exec_ctx, transport_parsing); + transport_global->incoming_stream_id |= ((uint32_t)*cur); + transport_global->deframe_state = GRPC_DTS_FRAME; + err = init_frame_parser(exec_ctx, transport_global); if (err != GRPC_ERROR_NONE) { return err; } - if (transport_parsing->incoming_stream_id != 0 && - transport_parsing->incoming_stream_id > - transport_parsing->last_incoming_stream_id) { - transport_parsing->last_incoming_stream_id = - transport_parsing->incoming_stream_id; + if (transport_global->incoming_stream_id != 0 && + transport_global->incoming_stream_id > + transport_global->last_incoming_stream_id) { + transport_global->last_incoming_stream_id = + transport_global->incoming_stream_id; } - if (transport_parsing->incoming_frame_size == 0) { - err = parse_frame_slice(exec_ctx, transport_parsing, gpr_empty_slice(), - 1); + if (transport_global->incoming_frame_size == 0) { + err = + parse_frame_slice(exec_ctx, transport_global, gpr_empty_slice(), 1); if (err != GRPC_ERROR_NONE) { return err; } - transport_parsing->incoming_stream = NULL; + transport_global->incoming_stream = NULL; if (++cur == end) { - transport_parsing->deframe_state = GRPC_DTS_FH_0; + transport_global->deframe_state = GRPC_DTS_FH_0; return GRPC_ERROR_NONE; } goto dts_fh_0; /* loop */ - } else if (transport_parsing->incoming_frame_size > - transport_parsing->max_frame_size) { + } else if (transport_global->incoming_frame_size > + transport_global + ->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]) { char *msg; - gpr_asprintf(&msg, "Frame size %d is larger than max frame size %d", - transport_parsing->incoming_frame_size, - transport_parsing->max_frame_size); + gpr_asprintf( + &msg, "Frame size %d is larger than max frame size %d", + transport_global->incoming_frame_size, + transport_global->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]); err = GRPC_ERROR_CREATE(msg); gpr_free(msg); return err; @@ -429,41 +240,41 @@ grpc_error *grpc_chttp2_perform_read( /* fallthrough */ case GRPC_DTS_FRAME: GPR_ASSERT(cur < end); - if ((uint32_t)(end - cur) == transport_parsing->incoming_frame_size) { - err = parse_frame_slice(exec_ctx, transport_parsing, + if ((uint32_t)(end - cur) == transport_global->incoming_frame_size) { + err = parse_frame_slice(exec_ctx, transport_global, gpr_slice_sub_no_ref(slice, (size_t)(cur - beg), (size_t)(end - beg)), 1); if (err != GRPC_ERROR_NONE) { return err; } - transport_parsing->deframe_state = GRPC_DTS_FH_0; - transport_parsing->incoming_stream = NULL; + transport_global->deframe_state = GRPC_DTS_FH_0; + transport_global->incoming_stream = NULL; return GRPC_ERROR_NONE; } else if ((uint32_t)(end - cur) > - transport_parsing->incoming_frame_size) { + transport_global->incoming_frame_size) { size_t cur_offset = (size_t)(cur - beg); err = parse_frame_slice( - exec_ctx, transport_parsing, + exec_ctx, transport_global, gpr_slice_sub_no_ref( slice, cur_offset, - cur_offset + transport_parsing->incoming_frame_size), + cur_offset + transport_global->incoming_frame_size), 1); if (err != GRPC_ERROR_NONE) { return err; } - cur += transport_parsing->incoming_frame_size; - transport_parsing->incoming_stream = NULL; + cur += transport_global->incoming_frame_size; + transport_global->incoming_stream = NULL; goto dts_fh_0; /* loop */ } else { - err = parse_frame_slice(exec_ctx, transport_parsing, + err = parse_frame_slice(exec_ctx, transport_global, gpr_slice_sub_no_ref(slice, (size_t)(cur - beg), (size_t)(end - beg)), 0); if (err != GRPC_ERROR_NONE) { return err; } - transport_parsing->incoming_frame_size -= (uint32_t)(end - cur); + transport_global->incoming_frame_size -= (uint32_t)(end - cur); return GRPC_ERROR_NONE; } GPR_UNREACHABLE_CODE(return 0); @@ -473,72 +284,72 @@ grpc_error *grpc_chttp2_perform_read( } static grpc_error *init_frame_parser( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) { - if (transport_parsing->is_first_frame && - transport_parsing->incoming_frame_type != GRPC_CHTTP2_FRAME_SETTINGS) { + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global) { + if (transport_global->is_first_frame && + transport_global->incoming_frame_type != GRPC_CHTTP2_FRAME_SETTINGS) { char *msg; gpr_asprintf( &msg, "Expected SETTINGS frame as the first frame, got frame type %d", - transport_parsing->incoming_frame_type); + transport_global->incoming_frame_type); grpc_error *err = GRPC_ERROR_CREATE(msg); gpr_free(msg); return err; } - transport_parsing->is_first_frame = false; - if (transport_parsing->expect_continuation_stream_id != 0) { - if (transport_parsing->incoming_frame_type != + transport_global->is_first_frame = false; + if (transport_global->expect_continuation_stream_id != 0) { + if (transport_global->incoming_frame_type != GRPC_CHTTP2_FRAME_CONTINUATION) { char *msg; gpr_asprintf(&msg, "Expected CONTINUATION frame, got frame type %02x", - transport_parsing->incoming_frame_type); + transport_global->incoming_frame_type); grpc_error *err = GRPC_ERROR_CREATE(msg); gpr_free(msg); return err; } - if (transport_parsing->expect_continuation_stream_id != - transport_parsing->incoming_stream_id) { + if (transport_global->expect_continuation_stream_id != + transport_global->incoming_stream_id) { char *msg; gpr_asprintf( &msg, "Expected CONTINUATION frame for grpc_chttp2_stream %08x, got " "grpc_chttp2_stream %08x", - transport_parsing->expect_continuation_stream_id, - transport_parsing->incoming_stream_id); + transport_global->expect_continuation_stream_id, + transport_global->incoming_stream_id); grpc_error *err = GRPC_ERROR_CREATE(msg); gpr_free(msg); return err; } - return init_header_frame_parser(exec_ctx, transport_parsing, 1); + return init_header_frame_parser(exec_ctx, transport_global, 1); } - switch (transport_parsing->incoming_frame_type) { + switch (transport_global->incoming_frame_type) { case GRPC_CHTTP2_FRAME_DATA: - return init_data_frame_parser(exec_ctx, transport_parsing); + return init_data_frame_parser(exec_ctx, transport_global); case GRPC_CHTTP2_FRAME_HEADER: - return init_header_frame_parser(exec_ctx, transport_parsing, 0); + return init_header_frame_parser(exec_ctx, transport_global, 0); case GRPC_CHTTP2_FRAME_CONTINUATION: return GRPC_ERROR_CREATE("Unexpected CONTINUATION frame"); case GRPC_CHTTP2_FRAME_RST_STREAM: - return init_rst_stream_parser(exec_ctx, transport_parsing); + return init_rst_stream_parser(exec_ctx, transport_global); case GRPC_CHTTP2_FRAME_SETTINGS: - return init_settings_frame_parser(exec_ctx, transport_parsing); + return init_settings_frame_parser(exec_ctx, transport_global); case GRPC_CHTTP2_FRAME_WINDOW_UPDATE: - return init_window_update_frame_parser(exec_ctx, transport_parsing); + return init_window_update_frame_parser(exec_ctx, transport_global); case GRPC_CHTTP2_FRAME_PING: - return init_ping_parser(exec_ctx, transport_parsing); + return init_ping_parser(exec_ctx, transport_global); case GRPC_CHTTP2_FRAME_GOAWAY: - return init_goaway_parser(exec_ctx, transport_parsing); + return init_goaway_parser(exec_ctx, transport_global); default: if (grpc_http_trace) { gpr_log(GPR_ERROR, "Unknown frame type %02x", - transport_parsing->incoming_frame_type); + transport_global->incoming_frame_type); } - return init_skip_frame_parser(exec_ctx, transport_parsing, 0); + return init_skip_frame_parser(exec_ctx, transport_global, 0); } } static grpc_error *skip_parser(grpc_exec_ctx *exec_ctx, void *parser, - grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last) { return GRPC_ERROR_NONE; } @@ -546,101 +357,100 @@ static grpc_error *skip_parser(grpc_exec_ctx *exec_ctx, void *parser, static void skip_header(void *tp, grpc_mdelem *md) { GRPC_MDELEM_UNREF(md); } static grpc_error *init_skip_frame_parser( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, int is_header) { if (is_header) { - uint8_t is_eoh = transport_parsing->expect_continuation_stream_id != 0; - transport_parsing->parser = grpc_chttp2_header_parser_parse; - transport_parsing->parser_data = &transport_parsing->hpack_parser; - transport_parsing->hpack_parser.on_header = skip_header; - transport_parsing->hpack_parser.on_header_user_data = NULL; - transport_parsing->hpack_parser.is_boundary = is_eoh; - transport_parsing->hpack_parser.is_eof = - (uint8_t)(is_eoh ? transport_parsing->header_eof : 0); + uint8_t is_eoh = transport_global->expect_continuation_stream_id != 0; + transport_global->parser = grpc_chttp2_header_parser_parse; + transport_global->parser_data = &transport_global->hpack_parser; + transport_global->hpack_parser.on_header = skip_header; + transport_global->hpack_parser.on_header_user_data = NULL; + transport_global->hpack_parser.is_boundary = is_eoh; + transport_global->hpack_parser.is_eof = + (uint8_t)(is_eoh ? transport_global->header_eof : 0); } else { - transport_parsing->parser = skip_parser; + transport_global->parser = skip_parser; } return GRPC_ERROR_NONE; } void grpc_chttp2_parsing_become_skip_parser( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) { + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global) { init_skip_frame_parser( - exec_ctx, transport_parsing, - transport_parsing->parser == grpc_chttp2_header_parser_parse); + exec_ctx, transport_global, + transport_global->parser == grpc_chttp2_header_parser_parse); } static grpc_error *update_incoming_window( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing) { - uint32_t incoming_frame_size = transport_parsing->incoming_frame_size; - if (incoming_frame_size > transport_parsing->incoming_window) { + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global) { + uint32_t incoming_frame_size = transport_global->incoming_frame_size; + if (incoming_frame_size > transport_global->incoming_window) { char *msg; gpr_asprintf(&msg, "frame of size %d overflows incoming window of %" PRId64, - transport_parsing->incoming_frame_size, - transport_parsing->incoming_window); + transport_global->incoming_frame_size, + transport_global->incoming_window); grpc_error *err = GRPC_ERROR_CREATE(msg); gpr_free(msg); return err; } - if (incoming_frame_size > stream_parsing->incoming_window) { + if (incoming_frame_size > stream_global->incoming_window) { char *msg; gpr_asprintf(&msg, "frame of size %d overflows incoming window of %" PRId64, - transport_parsing->incoming_frame_size, - stream_parsing->incoming_window); + transport_global->incoming_frame_size, + stream_global->incoming_window); grpc_error *err = GRPC_ERROR_CREATE(msg); gpr_free(msg); return err; } - GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", transport_parsing, incoming_window, + GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", transport_global, incoming_window, incoming_frame_size); - GRPC_CHTTP2_FLOW_DEBIT_STREAM("parse", transport_parsing, stream_parsing, + GRPC_CHTTP2_FLOW_DEBIT_STREAM("parse", transport_global, stream_global, incoming_window, incoming_frame_size); - stream_parsing->received_bytes += incoming_frame_size; - - grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing); + stream_global->received_bytes += incoming_frame_size; + stream_global->max_recv_bytes -= + (uint32_t)GPR_MIN(stream_global->max_recv_bytes, incoming_frame_size); return GRPC_ERROR_NONE; } static grpc_error *init_data_frame_parser( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) { - grpc_chttp2_stream_parsing *stream_parsing = - grpc_chttp2_parsing_lookup_stream(transport_parsing, - transport_parsing->incoming_stream_id); + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global) { + grpc_chttp2_stream_global *stream_global = grpc_chttp2_parsing_lookup_stream( + transport_global, transport_global->incoming_stream_id); grpc_error *err = GRPC_ERROR_NONE; - if (stream_parsing == NULL) { - return init_skip_frame_parser(exec_ctx, transport_parsing, 0); + if (stream_global == NULL) { + return init_skip_frame_parser(exec_ctx, transport_global, 0); } - stream_parsing->stats.incoming.framing_bytes += 9; - if (stream_parsing->received_close) { - return init_skip_frame_parser(exec_ctx, transport_parsing, 0); + stream_global->stats.incoming.framing_bytes += 9; + if (stream_global->read_closed) { + return init_skip_frame_parser(exec_ctx, transport_global, 0); } if (err == GRPC_ERROR_NONE) { - err = update_incoming_window(exec_ctx, transport_parsing, stream_parsing); + err = update_incoming_window(exec_ctx, transport_global, stream_global); } if (err == GRPC_ERROR_NONE) { err = grpc_chttp2_data_parser_begin_frame( - &stream_parsing->data_parser, transport_parsing->incoming_frame_flags, - stream_parsing->id); + &stream_global->data_parser, transport_global->incoming_frame_flags, + stream_global->id); } if (err == GRPC_ERROR_NONE) { - transport_parsing->incoming_stream = stream_parsing; - transport_parsing->parser = grpc_chttp2_data_parser_parse; - transport_parsing->parser_data = &stream_parsing->data_parser; + transport_global->incoming_stream = stream_global; + transport_global->parser = grpc_chttp2_data_parser_parse; + transport_global->parser_data = &stream_global->data_parser; return GRPC_ERROR_NONE; } else if (grpc_error_get_int(err, GRPC_ERROR_INT_STREAM_ID, NULL)) { /* handle stream errors by closing the stream */ - stream_parsing->received_close = 1; - stream_parsing->forced_close_error = err; + grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, + true, false, GRPC_ERROR_REF(err)); gpr_slice_buffer_add( - &transport_parsing->qbuf, - grpc_chttp2_rst_stream_create(transport_parsing->incoming_stream_id, + &transport_global->qbuf, + grpc_chttp2_rst_stream_create(transport_global->incoming_stream_id, GRPC_CHTTP2_PROTOCOL_ERROR, - &stream_parsing->stats.outgoing)); - return init_skip_frame_parser(exec_ctx, transport_parsing, 0); + &stream_global->stats.outgoing)); + return init_skip_frame_parser(exec_ctx, transport_global, 0); } else { return err; } @@ -649,22 +459,21 @@ static grpc_error *init_data_frame_parser( static void free_timeout(void *p) { gpr_free(p); } static void on_initial_header(void *tp, grpc_mdelem *md) { - grpc_chttp2_transport_parsing *transport_parsing = tp; - grpc_chttp2_stream_parsing *stream_parsing = - transport_parsing->incoming_stream; + grpc_chttp2_transport_global *transport_global = tp; + grpc_chttp2_stream_global *stream_global = transport_global->incoming_stream; GPR_TIMER_BEGIN("on_initial_header", 0); - GPR_ASSERT(stream_parsing); + GPR_ASSERT(stream_global); GRPC_CHTTP2_IF_TRACING(gpr_log( - GPR_INFO, "HTTP:%d:HDR:%s: %s: %s", stream_parsing->id, - transport_parsing->is_client ? "CLI" : "SVR", + GPR_INFO, "HTTP:%d:HDR:%s: %s: %s", stream_global->id, + transport_global->is_client ? "CLI" : "SVR", grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value))); if (md->key == GRPC_MDSTR_GRPC_STATUS && md != GRPC_MDELEM_GRPC_STATUS_0) { /* TODO(ctiller): check for a status like " 0" */ - stream_parsing->seen_error = true; + stream_global->seen_error = true; } if (md->key == GRPC_MDSTR_GRPC_TIMEOUT) { @@ -681,269 +490,260 @@ static void on_initial_header(void *tp, grpc_mdelem *md) { grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); } grpc_chttp2_incoming_metadata_buffer_set_deadline( - &stream_parsing->metadata_buffer[0], + &stream_global->metadata_buffer[0], gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), *cached_timeout)); GRPC_MDELEM_UNREF(md); } else { const size_t new_size = - stream_parsing->metadata_buffer[0].size + GRPC_MDELEM_LENGTH(md); - grpc_chttp2_transport_global *transport_global = - &TRANSPORT_FROM_PARSING(transport_parsing)->global; + stream_global->metadata_buffer[0].size + GRPC_MDELEM_LENGTH(md); const size_t metadata_size_limit = - transport_global->settings[GRPC_LOCAL_SETTINGS] + transport_global->settings[GRPC_ACKED_SETTINGS] [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE]; if (new_size > metadata_size_limit) { - if (!stream_parsing->exceeded_metadata_size) { + if (!stream_global->exceeded_metadata_size) { gpr_log(GPR_DEBUG, "received initial metadata size exceeds limit (%" PRIuPTR " vs. %" PRIuPTR ")", new_size, metadata_size_limit); - stream_parsing->seen_error = true; - stream_parsing->exceeded_metadata_size = true; + stream_global->seen_error = true; + stream_global->exceeded_metadata_size = true; } GRPC_MDELEM_UNREF(md); } else { grpc_chttp2_incoming_metadata_buffer_add( - &stream_parsing->metadata_buffer[0], md); + &stream_global->metadata_buffer[0], md); } } - grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing); - GPR_TIMER_END("on_initial_header", 0); } static void on_trailing_header(void *tp, grpc_mdelem *md) { - grpc_chttp2_transport_parsing *transport_parsing = tp; - grpc_chttp2_stream_parsing *stream_parsing = - transport_parsing->incoming_stream; + grpc_chttp2_transport_global *transport_global = tp; + grpc_chttp2_stream_global *stream_global = transport_global->incoming_stream; GPR_TIMER_BEGIN("on_trailing_header", 0); - GPR_ASSERT(stream_parsing); + GPR_ASSERT(stream_global); GRPC_CHTTP2_IF_TRACING(gpr_log( - GPR_INFO, "HTTP:%d:TRL:%s: %s: %s", stream_parsing->id, - transport_parsing->is_client ? "CLI" : "SVR", + GPR_INFO, "HTTP:%d:TRL:%s: %s: %s", stream_global->id, + transport_global->is_client ? "CLI" : "SVR", grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value))); if (md->key == GRPC_MDSTR_GRPC_STATUS && md != GRPC_MDELEM_GRPC_STATUS_0) { /* TODO(ctiller): check for a status like " 0" */ - stream_parsing->seen_error = true; + stream_global->seen_error = true; } const size_t new_size = - stream_parsing->metadata_buffer[1].size + GRPC_MDELEM_LENGTH(md); - grpc_chttp2_transport_global *transport_global = - &TRANSPORT_FROM_PARSING(transport_parsing)->global; + stream_global->metadata_buffer[1].size + GRPC_MDELEM_LENGTH(md); const size_t metadata_size_limit = - transport_global->settings[GRPC_LOCAL_SETTINGS] + transport_global->settings[GRPC_ACKED_SETTINGS] [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE]; if (new_size > metadata_size_limit) { - if (!stream_parsing->exceeded_metadata_size) { + if (!stream_global->exceeded_metadata_size) { gpr_log(GPR_DEBUG, "received trailing metadata size exceeds limit (%" PRIuPTR " vs. %" PRIuPTR ")", new_size, metadata_size_limit); - stream_parsing->seen_error = true; - stream_parsing->exceeded_metadata_size = true; + stream_global->seen_error = true; + stream_global->exceeded_metadata_size = true; } GRPC_MDELEM_UNREF(md); } else { - grpc_chttp2_incoming_metadata_buffer_add( - &stream_parsing->metadata_buffer[1], md); + grpc_chttp2_incoming_metadata_buffer_add(&stream_global->metadata_buffer[1], + md); } - grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing); - GPR_TIMER_END("on_trailing_header", 0); } static grpc_error *init_header_frame_parser( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, int is_continuation) { - uint8_t is_eoh = (transport_parsing->incoming_frame_flags & + uint8_t is_eoh = (transport_global->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0; int via_accept = 0; - grpc_chttp2_stream_parsing *stream_parsing; + grpc_chttp2_stream_global *stream_global; /* TODO(ctiller): when to increment header_frames_received? */ if (is_eoh) { - transport_parsing->expect_continuation_stream_id = 0; + transport_global->expect_continuation_stream_id = 0; } else { - transport_parsing->expect_continuation_stream_id = - transport_parsing->incoming_stream_id; + transport_global->expect_continuation_stream_id = + transport_global->incoming_stream_id; } if (!is_continuation) { - transport_parsing->header_eof = (transport_parsing->incoming_frame_flags & - GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0; + transport_global->header_eof = (transport_global->incoming_frame_flags & + GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0; } /* could be a new grpc_chttp2_stream or an existing grpc_chttp2_stream */ - stream_parsing = grpc_chttp2_parsing_lookup_stream( - transport_parsing, transport_parsing->incoming_stream_id); - if (stream_parsing == NULL) { + stream_global = grpc_chttp2_parsing_lookup_stream( + transport_global, transport_global->incoming_stream_id); + if (stream_global == NULL) { if (is_continuation) { gpr_log(GPR_ERROR, "grpc_chttp2_stream disbanded before CONTINUATION received"); - return init_skip_frame_parser(exec_ctx, transport_parsing, 1); + return init_skip_frame_parser(exec_ctx, transport_global, 1); } - if (transport_parsing->is_client) { - if ((transport_parsing->incoming_stream_id & 1) && - transport_parsing->incoming_stream_id < - transport_parsing->next_stream_id) { + if (transport_global->is_client) { + if ((transport_global->incoming_stream_id & 1) && + transport_global->incoming_stream_id < + transport_global->next_stream_id) { /* this is an old (probably cancelled) grpc_chttp2_stream */ } else { gpr_log(GPR_ERROR, "ignoring new grpc_chttp2_stream creation on client"); } - return init_skip_frame_parser(exec_ctx, transport_parsing, 1); - } else if (transport_parsing->last_incoming_stream_id > - transport_parsing->incoming_stream_id) { + return init_skip_frame_parser(exec_ctx, transport_global, 1); + } else if (transport_global->last_incoming_stream_id > + transport_global->incoming_stream_id) { gpr_log(GPR_ERROR, "ignoring out of order new grpc_chttp2_stream request on server; " "last grpc_chttp2_stream " "id=%d, new grpc_chttp2_stream id=%d", - transport_parsing->last_incoming_stream_id, - transport_parsing->incoming_stream_id); - return init_skip_frame_parser(exec_ctx, transport_parsing, 1); - } else if ((transport_parsing->incoming_stream_id & 1) == 0) { + transport_global->last_incoming_stream_id, + transport_global->incoming_stream_id); + return init_skip_frame_parser(exec_ctx, transport_global, 1); + } else if ((transport_global->incoming_stream_id & 1) == 0) { gpr_log(GPR_ERROR, "ignoring grpc_chttp2_stream with non-client generated index %d", - transport_parsing->incoming_stream_id); - return init_skip_frame_parser(exec_ctx, transport_parsing, 1); + transport_global->incoming_stream_id); + return init_skip_frame_parser(exec_ctx, transport_global, 1); } - stream_parsing = transport_parsing->incoming_stream = - grpc_chttp2_parsing_accept_stream( - exec_ctx, transport_parsing, transport_parsing->incoming_stream_id); - if (stream_parsing == NULL) { + stream_global = transport_global->incoming_stream = + grpc_chttp2_parsing_accept_stream(exec_ctx, transport_global, + transport_global->incoming_stream_id); + if (stream_global == NULL) { gpr_log(GPR_ERROR, "grpc_chttp2_stream not accepted"); - return init_skip_frame_parser(exec_ctx, transport_parsing, 1); + return init_skip_frame_parser(exec_ctx, transport_global, 1); } via_accept = 1; } else { - transport_parsing->incoming_stream = stream_parsing; + transport_global->incoming_stream = stream_global; } - GPR_ASSERT(stream_parsing != NULL && (via_accept == 0 || via_accept == 1)); - stream_parsing->stats.incoming.framing_bytes += 9; - if (stream_parsing->received_close) { + GPR_ASSERT(stream_global != NULL && (via_accept == 0 || via_accept == 1)); + stream_global->stats.incoming.framing_bytes += 9; + if (stream_global->read_closed) { gpr_log(GPR_ERROR, "skipping already closed grpc_chttp2_stream header"); - transport_parsing->incoming_stream = NULL; - return init_skip_frame_parser(exec_ctx, transport_parsing, 1); + transport_global->incoming_stream = NULL; + return init_skip_frame_parser(exec_ctx, transport_global, 1); } - transport_parsing->parser = grpc_chttp2_header_parser_parse; - transport_parsing->parser_data = &transport_parsing->hpack_parser; - switch (stream_parsing->header_frames_received) { + transport_global->parser = grpc_chttp2_header_parser_parse; + transport_global->parser_data = &transport_global->hpack_parser; + switch (stream_global->header_frames_received) { case 0: - transport_parsing->hpack_parser.on_header = on_initial_header; + transport_global->hpack_parser.on_header = on_initial_header; break; case 1: - transport_parsing->hpack_parser.on_header = on_trailing_header; + transport_global->hpack_parser.on_header = on_trailing_header; break; case 2: gpr_log(GPR_ERROR, "too many header frames received"); - return init_skip_frame_parser(exec_ctx, transport_parsing, 1); + return init_skip_frame_parser(exec_ctx, transport_global, 1); } - transport_parsing->hpack_parser.on_header_user_data = transport_parsing; - transport_parsing->hpack_parser.is_boundary = is_eoh; - transport_parsing->hpack_parser.is_eof = - (uint8_t)(is_eoh ? transport_parsing->header_eof : 0); - if (!is_continuation && (transport_parsing->incoming_frame_flags & + transport_global->hpack_parser.on_header_user_data = transport_global; + transport_global->hpack_parser.is_boundary = is_eoh; + transport_global->hpack_parser.is_eof = + (uint8_t)(is_eoh ? transport_global->header_eof : 0); + if (!is_continuation && (transport_global->incoming_frame_flags & GRPC_CHTTP2_FLAG_HAS_PRIORITY)) { - grpc_chttp2_hpack_parser_set_has_priority(&transport_parsing->hpack_parser); + grpc_chttp2_hpack_parser_set_has_priority(&transport_global->hpack_parser); } return GRPC_ERROR_NONE; } static grpc_error *init_window_update_frame_parser( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) { + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global) { grpc_error *err = grpc_chttp2_window_update_parser_begin_frame( - &transport_parsing->simple.window_update, - transport_parsing->incoming_frame_size, - transport_parsing->incoming_frame_flags); + &transport_global->simple.window_update, + transport_global->incoming_frame_size, + transport_global->incoming_frame_flags); if (err != GRPC_ERROR_NONE) return err; - if (transport_parsing->incoming_stream_id != 0) { - grpc_chttp2_stream_parsing *stream_parsing = - transport_parsing->incoming_stream = grpc_chttp2_parsing_lookup_stream( - transport_parsing, transport_parsing->incoming_stream_id); - if (stream_parsing == NULL) { - return init_skip_frame_parser(exec_ctx, transport_parsing, 0); + if (transport_global->incoming_stream_id != 0) { + grpc_chttp2_stream_global *stream_global = + transport_global->incoming_stream = grpc_chttp2_parsing_lookup_stream( + transport_global, transport_global->incoming_stream_id); + if (stream_global == NULL) { + return init_skip_frame_parser(exec_ctx, transport_global, 0); } - stream_parsing->stats.incoming.framing_bytes += 9; + stream_global->stats.incoming.framing_bytes += 9; } - transport_parsing->parser = grpc_chttp2_window_update_parser_parse; - transport_parsing->parser_data = &transport_parsing->simple.window_update; + transport_global->parser = grpc_chttp2_window_update_parser_parse; + transport_global->parser_data = &transport_global->simple.window_update; return GRPC_ERROR_NONE; } static grpc_error *init_ping_parser( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) { + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global) { grpc_error *err = grpc_chttp2_ping_parser_begin_frame( - &transport_parsing->simple.ping, transport_parsing->incoming_frame_size, - transport_parsing->incoming_frame_flags); + &transport_global->simple.ping, transport_global->incoming_frame_size, + transport_global->incoming_frame_flags); if (err != GRPC_ERROR_NONE) return err; - transport_parsing->parser = grpc_chttp2_ping_parser_parse; - transport_parsing->parser_data = &transport_parsing->simple.ping; + transport_global->parser = grpc_chttp2_ping_parser_parse; + transport_global->parser_data = &transport_global->simple.ping; return GRPC_ERROR_NONE; } static grpc_error *init_rst_stream_parser( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) { + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global) { grpc_error *err = grpc_chttp2_rst_stream_parser_begin_frame( - &transport_parsing->simple.rst_stream, - transport_parsing->incoming_frame_size, - transport_parsing->incoming_frame_flags); + &transport_global->simple.rst_stream, + transport_global->incoming_frame_size, + transport_global->incoming_frame_flags); if (err != GRPC_ERROR_NONE) return err; - grpc_chttp2_stream_parsing *stream_parsing = - transport_parsing->incoming_stream = grpc_chttp2_parsing_lookup_stream( - transport_parsing, transport_parsing->incoming_stream_id); - if (!transport_parsing->incoming_stream) { - return init_skip_frame_parser(exec_ctx, transport_parsing, 0); - } - stream_parsing->stats.incoming.framing_bytes += 9; - transport_parsing->parser = grpc_chttp2_rst_stream_parser_parse; - transport_parsing->parser_data = &transport_parsing->simple.rst_stream; + grpc_chttp2_stream_global *stream_global = transport_global->incoming_stream = + grpc_chttp2_parsing_lookup_stream(transport_global, + transport_global->incoming_stream_id); + if (!transport_global->incoming_stream) { + return init_skip_frame_parser(exec_ctx, transport_global, 0); + } + stream_global->stats.incoming.framing_bytes += 9; + transport_global->parser = grpc_chttp2_rst_stream_parser_parse; + transport_global->parser_data = &transport_global->simple.rst_stream; return GRPC_ERROR_NONE; } static grpc_error *init_goaway_parser( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) { + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global) { grpc_error *err = grpc_chttp2_goaway_parser_begin_frame( - &transport_parsing->goaway_parser, transport_parsing->incoming_frame_size, - transport_parsing->incoming_frame_flags); + &transport_global->goaway_parser, transport_global->incoming_frame_size, + transport_global->incoming_frame_flags); if (err != GRPC_ERROR_NONE) return err; - transport_parsing->parser = grpc_chttp2_goaway_parser_parse; - transport_parsing->parser_data = &transport_parsing->goaway_parser; + transport_global->parser = grpc_chttp2_goaway_parser_parse; + transport_global->parser_data = &transport_global->goaway_parser; return GRPC_ERROR_NONE; } static grpc_error *init_settings_frame_parser( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) { - if (transport_parsing->incoming_stream_id != 0) { + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global) { + if (transport_global->incoming_stream_id != 0) { return GRPC_ERROR_CREATE("Settings frame received for grpc_chttp2_stream"); } grpc_error *err = grpc_chttp2_settings_parser_begin_frame( - &transport_parsing->simple.settings, - transport_parsing->incoming_frame_size, - transport_parsing->incoming_frame_flags, transport_parsing->settings); + &transport_global->simple.settings, transport_global->incoming_frame_size, + transport_global->incoming_frame_flags, + transport_global->settings[GRPC_PEER_SETTINGS]); if (err != GRPC_ERROR_NONE) { return err; } - if (transport_parsing->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) { - transport_parsing->settings_ack_received = 1; + if (transport_global->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) { + memcpy(transport_global->settings[GRPC_ACKED_SETTINGS], + transport_global->settings[GRPC_SENT_SETTINGS], + GRPC_CHTTP2_NUM_SETTINGS * sizeof(uint32_t)); grpc_chttp2_hptbl_set_max_bytes( - &transport_parsing->hpack_parser.table, - transport_parsing - ->last_sent_settings[GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]); - transport_parsing->max_frame_size = - transport_parsing - ->last_sent_settings[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]; + &transport_global->hpack_parser.table, + transport_global->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]); + transport_global->sent_local_settings = 0; } - transport_parsing->parser = grpc_chttp2_settings_parser_parse; - transport_parsing->parser_data = &transport_parsing->simple.settings; + transport_global->parser = grpc_chttp2_settings_parser_parse; + transport_global->parser_data = &transport_global->simple.settings; return GRPC_ERROR_NONE; } @@ -954,33 +754,32 @@ static int is_window_update_legal(int64_t window_update, int64_t window) { */ static grpc_error *parse_frame_slice( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, gpr_slice slice, int is_last) { - grpc_chttp2_stream_parsing *stream_parsing = - transport_parsing->incoming_stream; - grpc_error *err = transport_parsing->parser( - exec_ctx, transport_parsing->parser_data, transport_parsing, - stream_parsing, slice, is_last); + grpc_chttp2_stream_global *stream_global = transport_global->incoming_stream; + grpc_error *err = + transport_global->parser(exec_ctx, transport_global->parser_data, + transport_global, stream_global, slice, is_last); if (err == GRPC_ERROR_NONE) { - if (stream_parsing) { - grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, - stream_parsing); + if (stream_global != NULL) { + grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, + stream_global); } - return GRPC_ERROR_NONE; + return err; } else if (grpc_error_get_int(err, GRPC_ERROR_INT_STREAM_ID, NULL)) { if (grpc_http_trace) { const char *msg = grpc_error_string(err); gpr_log(GPR_ERROR, "%s", msg); grpc_error_free_string(msg); } - grpc_chttp2_parsing_become_skip_parser(exec_ctx, transport_parsing); - if (stream_parsing) { - stream_parsing->forced_close_error = err; + grpc_chttp2_parsing_become_skip_parser(exec_ctx, transport_global); + if (stream_global) { + stream_global->forced_close_error = err; gpr_slice_buffer_add( - &transport_parsing->qbuf, - grpc_chttp2_rst_stream_create(transport_parsing->incoming_stream_id, + &transport_global->qbuf, + grpc_chttp2_rst_stream_create(transport_global->incoming_stream_id, GRPC_CHTTP2_PROTOCOL_ERROR, - &stream_parsing->stats.outgoing)); + &stream_global->stats.outgoing)); } else { GRPC_ERROR_UNREF(err); } diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c index 4dc4968248d..7c31466c800 100644 --- a/src/core/ext/transport/chttp2/transport/stream_lists.c +++ b/src/core/ext/transport/chttp2/transport/stream_lists.c @@ -220,63 +220,6 @@ int grpc_chttp2_list_pop_written_stream( return r; } -void grpc_chttp2_list_add_unannounced_incoming_window_available( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global) { - GPR_ASSERT(stream_global->id != 0); - stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), - STREAM_FROM_GLOBAL(stream_global), - GRPC_CHTTP2_LIST_UNANNOUNCED_INCOMING_WINDOW_AVAILABLE); -} - -void grpc_chttp2_list_remove_unannounced_incoming_window_available( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global) { - stream_list_maybe_remove( - TRANSPORT_FROM_GLOBAL(transport_global), - STREAM_FROM_GLOBAL(stream_global), - GRPC_CHTTP2_LIST_UNANNOUNCED_INCOMING_WINDOW_AVAILABLE); -} - -int grpc_chttp2_list_pop_unannounced_incoming_window_available( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_global **stream_global, - grpc_chttp2_stream_parsing **stream_parsing) { - grpc_chttp2_stream *stream; - int r = - stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, - GRPC_CHTTP2_LIST_UNANNOUNCED_INCOMING_WINDOW_AVAILABLE); - if (r != 0) { - *stream_global = &stream->global; - *stream_parsing = &stream->parsing; - } - return r; -} - -void grpc_chttp2_list_add_parsing_seen_stream( - grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing) { - stream_list_add(TRANSPORT_FROM_PARSING(transport_parsing), - STREAM_FROM_PARSING(stream_parsing), - GRPC_CHTTP2_LIST_PARSING_SEEN); -} - -int grpc_chttp2_list_pop_parsing_seen_stream( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_global **stream_global, - grpc_chttp2_stream_parsing **stream_parsing) { - grpc_chttp2_stream *stream; - int r = stream_list_pop(TRANSPORT_FROM_PARSING(transport_parsing), &stream, - GRPC_CHTTP2_LIST_PARSING_SEEN); - if (r != 0) { - *stream_global = &stream->global; - *stream_parsing = &stream->parsing; - } - return r; -} - void grpc_chttp2_list_add_waiting_for_concurrency( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) {