Initial pass reifying read and global locks

pull/7793/head
Craig Tiller 9 years ago
parent 99bef209bd
commit 67eb59ee01
  1. 314
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 4
      src/core/ext/transport/chttp2/transport/frame.h
  3. 29
      src/core/ext/transport/chttp2/transport/frame_data.c
  4. 4
      src/core/ext/transport/chttp2/transport/frame_data.h
  5. 13
      src/core/ext/transport/chttp2/transport/frame_goaway.c
  6. 4
      src/core/ext/transport/chttp2/transport/frame_goaway.h
  7. 8
      src/core/ext/transport/chttp2/transport/frame_ping.c
  8. 4
      src/core/ext/transport/chttp2/transport/frame_ping.h
  9. 35
      src/core/ext/transport/chttp2/transport/frame_rst_stream.c
  10. 4
      src/core/ext/transport/chttp2/transport/frame_rst_stream.h
  11. 17
      src/core/ext/transport/chttp2/transport/frame_settings.c
  12. 4
      src/core/ext/transport/chttp2/transport/frame_settings.h
  13. 33
      src/core/ext/transport/chttp2/transport/frame_window_update.c
  14. 4
      src/core/ext/transport/chttp2/transport/frame_window_update.h
  15. 28
      src/core/ext/transport/chttp2/transport/hpack_parser.c
  16. 4
      src/core/ext/transport/chttp2/transport/hpack_parser.h
  17. 169
      src/core/ext/transport/chttp2/transport/internal.h
  18. 759
      src/core/ext/transport/chttp2/transport/parsing.c
  19. 57
      src/core/ext/transport/chttp2/transport/stream_lists.c

@ -69,10 +69,6 @@ int grpc_http_write_state_trace = 0;
((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \
writing)))
#define TRANSPORT_FROM_PARSING(tp) \
((grpc_chttp2_transport *)((char *)(tp)-offsetof(grpc_chttp2_transport, \
parsing)))
#define TRANSPORT_FROM_GLOBAL(tg) \
((grpc_chttp2_transport *)((char *)(tg)-offsetof(grpc_chttp2_transport, \
global)))
@ -80,19 +76,13 @@ int grpc_http_write_state_trace = 0;
#define STREAM_FROM_GLOBAL(sg) \
((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, global)))
#define STREAM_FROM_PARSING(sg) \
((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, parsing)))
static const grpc_transport_vtable vtable;
/* forward declarations of various callbacks that we'll build closures around */
static void writing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
static void reading_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
static void parsing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
static void initiate_writing_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
static void initiate_read_flush_locked(grpc_exec_ctx *exec_ctx, void *t,
@ -164,10 +154,9 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
gpr_slice_buffer_destroy(&t->writing.outbuf);
grpc_chttp2_hpack_compressor_destroy(&t->writing.hpack_compressor);
gpr_slice_buffer_destroy(&t->parsing.qbuf);
gpr_slice_buffer_destroy(&t->read_buffer);
grpc_chttp2_hpack_parser_destroy(&t->parsing.hpack_parser);
grpc_chttp2_goaway_parser_destroy(&t->parsing.goaway_parser);
grpc_chttp2_hpack_parser_destroy(&t->global.hpack_parser);
grpc_chttp2_goaway_parser_destroy(&t->global.goaway_parser);
for (i = 0; i < STREAM_LIST_COUNT; i++) {
GPR_ASSERT(t->lists[i].head == NULL);
@ -251,15 +240,14 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->global.next_stream_id = is_client ? 1 : 2;
t->global.is_client = is_client;
t->writing.outgoing_window = DEFAULT_WINDOW;
t->parsing.incoming_window = DEFAULT_WINDOW;
t->global.incoming_window = DEFAULT_WINDOW;
t->global.stream_lookahead = DEFAULT_WINDOW;
t->global.connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
t->global.ping_counter = 1;
t->global.pings.next = t->global.pings.prev = &t->global.pings;
t->parsing.is_client = is_client;
t->parsing.deframe_state =
t->global.deframe_state =
is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->parsing.is_first_frame = true;
t->global.is_first_frame = true;
t->writing.is_client = is_client;
grpc_connectivity_state_init(
&t->channel_callback.state_tracker, GRPC_CHANNEL_READY,
@ -272,8 +260,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_closure_init(&t->writing_action, writing_action, t);
grpc_closure_init(&t->reading_action, reading_action, t);
grpc_closure_init(&t->reading_action_locked, reading_action_locked, t);
grpc_closure_init(&t->parsing_action, parsing_action, t);
grpc_closure_init(&t->post_parse_locked, post_parse_locked, t);
grpc_closure_init(&t->initiate_writing, initiate_writing_locked, t);
grpc_closure_init(&t->terminate_writing, terminate_writing_with_lock, t);
grpc_closure_init(&t->initiate_read_flush_locked, initiate_read_flush_locked,
@ -281,9 +267,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
&t->writing);
gpr_slice_buffer_init(&t->parsing.qbuf);
grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser);
grpc_chttp2_goaway_parser_init(&t->global.goaway_parser);
grpc_chttp2_hpack_parser_init(&t->global.hpack_parser);
gpr_slice_buffer_init(&t->read_buffer);
@ -297,7 +282,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
/* copy in initial settings to all setting sets */
for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) {
t->parsing.settings[i] = grpc_chttp2_settings_parameters[i].default_value;
for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) {
t->global.settings[j][i] =
grpc_chttp2_settings_parameters[i].default_value;
@ -508,13 +492,9 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
gpr_ref_init(&s->global.active_streams, 1);
GRPC_CHTTP2_STREAM_REF(&s->global, "chttp2");
grpc_chttp2_incoming_metadata_buffer_init(&s->parsing.metadata_buffer[0]);
grpc_chttp2_incoming_metadata_buffer_init(&s->parsing.metadata_buffer[1]);
grpc_chttp2_incoming_metadata_buffer_init(
&s->global.received_initial_metadata);
grpc_chttp2_incoming_metadata_buffer_init(
&s->global.received_trailing_metadata);
grpc_chttp2_data_parser_init(&s->parsing.data_parser);
grpc_chttp2_incoming_metadata_buffer_init(&s->global.metadata_buffer[0]);
grpc_chttp2_incoming_metadata_buffer_init(&s->global.metadata_buffer[1]);
grpc_chttp2_data_parser_init(&s->global.data_parser);
gpr_slice_buffer_init(&s->writing.flow_controlled_buffer);
s->global.deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
@ -523,11 +503,10 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
if (server_data) {
GPR_ASSERT(t->executor.parsing_active);
s->global.id = (uint32_t)(uintptr_t)server_data;
s->parsing.id = s->global.id;
s->global.outgoing_window =
t->global.settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
s->parsing.incoming_window = s->global.max_recv_bytes =
s->global.incoming_window = s->global.max_recv_bytes =
t->global.settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
*t->accepting_stream = s;
@ -571,8 +550,6 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
}
grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global,
&s->global);
grpc_chttp2_list_remove_stalled_by_transport(&t->global, &s->global);
grpc_chttp2_list_remove_check_read_ops(&t->global, &s->global);
@ -590,13 +567,9 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GPR_ASSERT(s->global.recv_initial_metadata_ready == NULL);
GPR_ASSERT(s->global.recv_message_ready == NULL);
GPR_ASSERT(s->global.recv_trailing_metadata_finished == NULL);
grpc_chttp2_data_parser_destroy(exec_ctx, &s->parsing.data_parser);
grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[0]);
grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[1]);
grpc_chttp2_incoming_metadata_buffer_destroy(
&s->global.received_initial_metadata);
grpc_chttp2_incoming_metadata_buffer_destroy(
&s->global.received_trailing_metadata);
grpc_chttp2_data_parser_destroy(exec_ctx, &s->global.data_parser);
grpc_chttp2_incoming_metadata_buffer_destroy(&s->global.metadata_buffer[0]);
grpc_chttp2_incoming_metadata_buffer_destroy(&s->global.metadata_buffer[1]);
gpr_slice_buffer_destroy(&s->writing.flow_controlled_buffer);
GRPC_ERROR_UNREF(s->global.read_closed_error);
GRPC_ERROR_UNREF(s->global.write_closed_error);
@ -621,26 +594,26 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
GPR_TIMER_END("destroy_stream", 0);
}
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
grpc_chttp2_transport_parsing *transport_parsing, uint32_t id) {
grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing);
grpc_chttp2_stream_global *grpc_chttp2_parsing_lookup_stream(
grpc_chttp2_transport_global *transport_global, uint32_t id) {
grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global);
grpc_chttp2_stream *s =
grpc_chttp2_stream_map_find(&t->parsing_stream_map, id);
return s ? &s->parsing : NULL;
return s ? &s->global : NULL;
}
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_global *grpc_chttp2_parsing_accept_stream(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
uint32_t id) {
grpc_chttp2_stream *accepting;
grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing);
grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global);
GPR_ASSERT(t->accepting_stream == NULL);
t->accepting_stream = &accepting;
t->channel_callback.accept_stream(exec_ctx,
t->channel_callback.accept_stream_user_data,
&t->base, (void *)(uintptr_t)id);
t->accepting_stream = NULL;
return &accepting->parsing;
return &accepting->global;
}
/*******************************************************************************
@ -913,15 +886,13 @@ static void maybe_start_some_streams(
grpc_chttp2_list_pop_waiting_for_concurrency(transport_global,
&stream_global)) {
/* safe since we can't (legally) be parsing this stream yet */
grpc_chttp2_stream_parsing *stream_parsing =
&STREAM_FROM_GLOBAL(stream_global)->parsing;
GRPC_CHTTP2_IF_TRACING(gpr_log(
GPR_DEBUG, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d",
transport_global->is_client ? "CLI" : "SVR", stream_global,
transport_global->next_stream_id));
GPR_ASSERT(stream_global->id == 0);
stream_global->id = stream_parsing->id = transport_global->next_stream_id;
stream_global->id = transport_global->next_stream_id;
transport_global->next_stream_id += 2;
if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) {
@ -933,7 +904,7 @@ static void maybe_start_some_streams(
stream_global->outgoing_window =
transport_global->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
stream_parsing->incoming_window = stream_incoming_window =
stream_global->incoming_window = stream_incoming_window =
transport_global->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
stream_global->max_recv_bytes =
@ -1271,10 +1242,10 @@ static void ack_ping_locked(grpc_exec_ctx *exec_ctx, void *a,
}
void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_transport_global *transport_global,
const uint8_t *opaque_8bytes) {
ack_ping_args *args = gpr_malloc(sizeof(*args));
args->t = TRANSPORT_FROM_PARSING(transport_parsing);
args->t = TRANSPORT_FROM_GLOBAL(transport_global);
memcpy(args->opaque_8bytes, opaque_8bytes, sizeof(args->opaque_8bytes));
grpc_closure_init(&args->closure, ack_ping_locked, args);
REF_TRANSPORT(args->t, "ack_ping");
@ -1366,7 +1337,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
while (
grpc_chttp2_list_pop_check_read_ops(transport_global, &stream_global)) {
if (stream_global->recv_initial_metadata_ready != NULL &&
stream_global->published_initial_metadata) {
stream_global->published_metadata[0]) {
if (stream_global->seen_error) {
while ((bs = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames)) != NULL) {
@ -1382,7 +1353,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
}
}
grpc_chttp2_incoming_metadata_buffer_publish(
&stream_global->received_initial_metadata,
&stream_global->metadata_buffer[0],
stream_global->recv_initial_metadata);
grpc_exec_ctx_sched(exec_ctx, stream_global->recv_initial_metadata_ready,
GRPC_ERROR_NONE, NULL);
@ -1402,7 +1373,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
grpc_exec_ctx_sched(exec_ctx, stream_global->recv_message_ready,
GRPC_ERROR_NONE, NULL);
stream_global->recv_message_ready = NULL;
} else if (stream_global->published_trailing_metadata) {
} else if (stream_global->published_metadata[1]) {
*stream_global->recv_message = NULL;
grpc_exec_ctx_sched(exec_ctx, stream_global->recv_message_ready,
GRPC_ERROR_NONE, NULL);
@ -1427,7 +1398,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
}
if (stream_global->all_incoming_byte_streams_finished) {
grpc_chttp2_incoming_metadata_buffer_publish(
&stream_global->received_trailing_metadata,
&stream_global->metadata_buffer[0],
stream_global->recv_trailing_metadata);
grpc_chttp2_complete_closure_step(
exec_ctx, transport_global, stream_global,
@ -1458,15 +1429,15 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
GPR_ASSERT(s);
s->global.in_stream_map = false;
if (t->parsing.incoming_stream == &s->parsing) {
t->parsing.incoming_stream = NULL;
grpc_chttp2_parsing_become_skip_parser(exec_ctx, &t->parsing);
if (t->global.incoming_stream == &s->global) {
t->global.incoming_stream = NULL;
grpc_chttp2_parsing_become_skip_parser(exec_ctx, &t->global);
}
if (s->parsing.data_parser.parsing_frame != NULL) {
if (s->global.data_parser.parsing_frame != NULL) {
grpc_chttp2_incoming_byte_stream_finished(
exec_ctx, s->parsing.data_parser.parsing_frame, GRPC_ERROR_REF(error),
exec_ctx, s->global.data_parser.parsing_frame, GRPC_ERROR_REF(error),
0);
s->parsing.data_parser.parsing_frame = NULL;
s->global.data_parser.parsing_frame = NULL;
}
if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
@ -1570,22 +1541,22 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
to the upper layers - drop what we've got, and then publish
what we want - which is safe because we haven't told anyone
about the metadata yet */
if (!stream_global->published_trailing_metadata ||
if (!stream_global->published_metadata[1] ||
stream_global->recv_trailing_metadata_finished != NULL) {
char status_string[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(status, status_string);
grpc_chttp2_incoming_metadata_buffer_add(
&stream_global->received_trailing_metadata,
&stream_global->metadata_buffer[1],
grpc_mdelem_from_metadata_strings(
GRPC_MDSTR_GRPC_STATUS, grpc_mdstr_from_string(status_string)));
if (slice) {
grpc_chttp2_incoming_metadata_buffer_add(
&stream_global->received_trailing_metadata,
&stream_global->metadata_buffer[1],
grpc_mdelem_from_metadata_strings(
GRPC_MDSTR_GRPC_MESSAGE,
grpc_mdstr_from_slice(gpr_slice_ref(*slice))));
}
stream_global->published_trailing_metadata = true;
stream_global->published_metadata[1] = true;
grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
stream_global);
}
@ -1652,8 +1623,8 @@ void grpc_chttp2_mark_stream_closed(
if (close_reads && !stream_global->read_closed) {
stream_global->read_closed_error = GRPC_ERROR_REF(error);
stream_global->read_closed = true;
stream_global->published_initial_metadata = true;
stream_global->published_trailing_metadata = true;
stream_global->published_metadata[0] = true;
stream_global->published_metadata[0] = true;
decrement_active_streams_locked(exec_ctx, transport_global, stream_global);
}
if (close_writes && !stream_global->write_closed) {
@ -1851,7 +1822,7 @@ static void update_global_window(void *args, uint32_t id, void *stream) {
grpc_chttp2_stream_global *stream_global = &s->global;
int was_zero;
int is_zero;
int64_t initial_window_update = t->parsing.initial_window_update;
int64_t initial_window_update = t->global.initial_window_update;
was_zero = stream_global->outgoing_window <= 0;
GRPC_CHTTP2_FLOW_CREDIT_STREAM("settings", transport_global, stream_global,
@ -1868,13 +1839,6 @@ static void update_global_window(void *args, uint32_t id, void *stream) {
* INPUT PROCESSING - PARSING
*/
static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
static void reading_action(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
/* Control flow:
@ -1888,30 +1852,6 @@ static void reading_action(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_END("reading_action", 0);
}
static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
GPR_TIMER_BEGIN("reading_action_locked", 0);
grpc_chttp2_transport *t = tp;
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_transport_parsing *transport_parsing = &t->parsing;
GPR_ASSERT(!t->executor.parsing_active);
if (!t->closed) {
t->executor.parsing_active = 1;
/* merge stream lists */
grpc_chttp2_stream_map_move_into(&t->new_stream_map,
&t->parsing_stream_map);
grpc_chttp2_prepare_to_read(transport_global, transport_parsing);
grpc_exec_ctx_sched(exec_ctx, &t->parsing_action, GRPC_ERROR_REF(error),
NULL);
} else {
post_reading_action_locked(exec_ctx, t, error);
}
GPR_TIMER_END("reading_action_locked", 0);
}
static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
grpc_http_parser parser;
@ -1939,87 +1879,86 @@ static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
return error;
}
static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_transport *t = arg;
grpc_error *err = GRPC_ERROR_NONE;
GPR_TIMER_BEGIN("reading_action.parse", 0);
size_t i = 0;
grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
GRPC_ERROR_NONE};
for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
errors[1] = grpc_chttp2_perform_read(exec_ctx, &t->parsing,
t->read_buffer.slices[i]);
};
if (errors[1] == GRPC_ERROR_NONE) {
err = GRPC_ERROR_REF(error);
} else {
errors[2] = try_http_parsing(exec_ctx, t);
err = GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors,
GPR_ARRAY_SIZE(errors));
}
for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) {
GRPC_ERROR_UNREF(errors[i]);
}
grpc_combiner_execute(exec_ctx, t->executor.combiner, &t->post_parse_locked,
err);
GPR_TIMER_END("reading_action.parse", 0);
}
static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
GPR_TIMER_BEGIN("reading_action_locked", 0);
static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
GPR_TIMER_BEGIN("post_parse_locked", 0);
grpc_chttp2_transport *t = arg;
grpc_chttp2_transport *t = tp;
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_transport_parsing *transport_parsing = &t->parsing;
/* copy parsing qbuf to global qbuf */
if (t->parsing.qbuf.count > 0) {
gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf);
grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
"parsing_qbuf");
}
/* merge stream lists */
grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map);
transport_global->concurrent_stream_count =
(uint32_t)grpc_chttp2_stream_map_size(&t->parsing_stream_map);
if (transport_parsing->initial_window_update != 0) {
update_global_window_args args = {t, exec_ctx};
grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
update_global_window, &args);
transport_parsing->initial_window_update = 0;
}
/* handle higher level things */
grpc_chttp2_publish_reads(exec_ctx, transport_global, transport_parsing);
t->executor.parsing_active = 0;
/* handle delayed transport ops (if there is one) */
if (t->post_parsing_op) {
grpc_transport_op *op = t->post_parsing_op;
t->post_parsing_op = NULL;
perform_transport_op_locked(exec_ctx, op, GRPC_ERROR_NONE);
gpr_free(op);
}
/* if a stream is in the stream map, and gets cancelled, we need to
* ensure we are not parsing before continuing the cancellation to keep
* things in a sane state */
grpc_chttp2_stream_global *stream_global;
while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global,
&stream_global)) {
GPR_ASSERT(stream_global->in_stream_map);
GPR_ASSERT(stream_global->write_closed);
GPR_ASSERT(stream_global->read_closed);
remove_stream(exec_ctx, t, stream_global->id,
removal_error(GRPC_ERROR_NONE, stream_global));
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
}
post_reading_action_locked(exec_ctx, t, error);
GPR_TIMER_END("post_parse_locked", 0);
}
GRPC_ERROR_REF(error);
GPR_ASSERT(!t->executor.parsing_active);
if (!t->closed) {
t->executor.parsing_active = 1;
/* merge stream lists */
grpc_chttp2_stream_map_move_into(&t->new_stream_map,
&t->parsing_stream_map);
GPR_TIMER_BEGIN("reading_action.parse", 0);
size_t i = 0;
grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
GRPC_ERROR_NONE};
for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
errors[1] = grpc_chttp2_perform_read(exec_ctx, &t->global,
t->read_buffer.slices[i]);
};
if (errors[1] != GRPC_ERROR_NONE) {
errors[2] = try_http_parsing(exec_ctx, t);
GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors,
GPR_ARRAY_SIZE(errors));
}
for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) {
GRPC_ERROR_UNREF(errors[i]);
}
GPR_TIMER_END("reading_action.parse", 0);
GPR_TIMER_BEGIN("post_parse_locked", 0);
if (transport_global->initial_window_update != 0) {
update_global_window_args args = {t, exec_ctx};
grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
update_global_window, &args);
transport_global->initial_window_update = 0;
}
/* handle higher level things */
if (transport_global->incoming_window <
transport_global->connection_window_target * 3 / 4) {
int64_t announce_bytes = transport_global->connection_window_target -
transport_global->incoming_window;
GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT(
"parsed", transport_global, announce_incoming_window, announce_bytes);
GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", transport_global,
incoming_window, announce_bytes);
grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
"global incoming window");
}
t->executor.parsing_active = 0;
/* handle delayed transport ops (if there is one) */
if (t->post_parsing_op) {
grpc_transport_op *op = t->post_parsing_op;
t->post_parsing_op = NULL;
perform_transport_op_locked(exec_ctx, op, GRPC_ERROR_NONE);
gpr_free(op);
}
/* if a stream is in the stream map, and gets cancelled, we need to
* ensure we are not parsing before continuing the cancellation to keep
* things in a sane state */
grpc_chttp2_stream_global *stream_global;
while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global,
&stream_global)) {
GPR_ASSERT(stream_global->in_stream_map);
GPR_ASSERT(stream_global->write_closed);
GPR_ASSERT(stream_global->read_closed);
remove_stream(exec_ctx, t, stream_global->id,
removal_error(GRPC_ERROR_NONE, stream_global));
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
}
GPR_TIMER_END("post_parse_locked", 0);
}
static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
GPR_TIMER_BEGIN("post_reading_action_locked", 0);
grpc_chttp2_transport *t = arg;
bool keep_reading = false;
GRPC_ERROR_REF(error);
if (error == GRPC_ERROR_NONE && t->closed) {
@ -2049,6 +1988,10 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_UNREF(error);
GPR_TIMER_END("post_reading_action_locked", 0);
GRPC_ERROR_UNREF(error);
GPR_TIMER_END("reading_action_locked", 0);
}
/*******************************************************************************
@ -2125,13 +2068,10 @@ static void incoming_byte_stream_update_flow_control(
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
max_recv_bytes, add_max_recv_bytes);
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
unannounced_incoming_window_for_parse,
add_max_recv_bytes);
incoming_window, add_max_recv_bytes);
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
unannounced_incoming_window_for_writing,
add_max_recv_bytes);
grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global,
stream_global);
grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
false, "read_incoming_stream");
}
@ -2259,8 +2199,8 @@ void grpc_chttp2_incoming_byte_stream_finished(
}
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, uint32_t frame_size,
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, uint32_t frame_size,
uint32_t flags, grpc_chttp2_incoming_frame_queue *add_to_queue) {
grpc_chttp2_incoming_byte_stream *incoming_byte_stream =
gpr_malloc(sizeof(*incoming_byte_stream));
@ -2271,8 +2211,8 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
gpr_mu_init(&incoming_byte_stream->slice_mu);
gpr_ref_init(&incoming_byte_stream->refs, 2);
incoming_byte_stream->next_message = NULL;
incoming_byte_stream->transport = TRANSPORT_FROM_PARSING(transport_parsing);
incoming_byte_stream->stream = STREAM_FROM_PARSING(stream_parsing);
incoming_byte_stream->transport = TRANSPORT_FROM_GLOBAL(transport_global);
incoming_byte_stream->stream = STREAM_FROM_GLOBAL(stream_global);
gpr_ref(&incoming_byte_stream->stream->global.active_streams);
gpr_slice_buffer_init(&incoming_byte_stream->slices);
incoming_byte_stream->on_next = NULL;

@ -40,8 +40,8 @@
#include "src/core/lib/iomgr/error.h"
/* defined in internal.h */
typedef struct grpc_chttp2_stream_parsing grpc_chttp2_stream_parsing;
typedef struct grpc_chttp2_transport_parsing grpc_chttp2_transport_parsing;
typedef struct grpc_chttp2_stream_global grpc_chttp2_stream_global;
typedef struct grpc_chttp2_transport_global grpc_chttp2_transport_global;
#define GRPC_CHTTP2_FRAME_DATA 0
#define GRPC_CHTTP2_FRAME_HEADER 1

@ -147,8 +147,8 @@ void grpc_chttp2_encode_data(uint32_t id, gpr_slice_buffer *inbuf,
grpc_error *grpc_chttp2_data_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last) {
uint8_t *const beg = GPR_SLICE_START_PTR(slice);
uint8_t *const end = GPR_SLICE_END_PTR(slice);
uint8_t *cur = beg;
@ -158,7 +158,8 @@ grpc_error *grpc_chttp2_data_parser_parse(
char *msg;
if (is_last && p->is_last_frame) {
stream_parsing->received_close = 1;
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global,
true, false, GRPC_ERROR_NONE);
}
if (cur == end) {
@ -171,7 +172,7 @@ grpc_error *grpc_chttp2_data_parser_parse(
return GRPC_ERROR_REF(p->error);
fh_0:
case GRPC_CHTTP2_DATA_FH_0:
stream_parsing->stats.incoming.framing_bytes++;
stream_global->stats.incoming.framing_bytes++;
p->frame_type = *cur;
switch (p->frame_type) {
case 0:
@ -184,7 +185,7 @@ grpc_error *grpc_chttp2_data_parser_parse(
gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type);
p->error = GRPC_ERROR_CREATE(msg);
p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
(intptr_t)stream_parsing->id);
(intptr_t)stream_global->id);
gpr_free(msg);
msg = gpr_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
p->error =
@ -201,7 +202,7 @@ grpc_error *grpc_chttp2_data_parser_parse(
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_1:
stream_parsing->stats.incoming.framing_bytes++;
stream_global->stats.incoming.framing_bytes++;
p->frame_size = ((uint32_t)*cur) << 24;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_2;
@ -209,7 +210,7 @@ grpc_error *grpc_chttp2_data_parser_parse(
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_2:
stream_parsing->stats.incoming.framing_bytes++;
stream_global->stats.incoming.framing_bytes++;
p->frame_size |= ((uint32_t)*cur) << 16;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_3;
@ -217,7 +218,7 @@ grpc_error *grpc_chttp2_data_parser_parse(
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_3:
stream_parsing->stats.incoming.framing_bytes++;
stream_global->stats.incoming.framing_bytes++;
p->frame_size |= ((uint32_t)*cur) << 8;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_4;
@ -225,7 +226,7 @@ grpc_error *grpc_chttp2_data_parser_parse(
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_4:
stream_parsing->stats.incoming.framing_bytes++;
stream_global->stats.incoming.framing_bytes++;
p->frame_size |= ((uint32_t)*cur);
p->state = GRPC_CHTTP2_DATA_FRAME;
++cur;
@ -235,18 +236,16 @@ grpc_error *grpc_chttp2_data_parser_parse(
}
p->parsing_frame = incoming_byte_stream =
grpc_chttp2_incoming_byte_stream_create(
exec_ctx, transport_parsing, stream_parsing, p->frame_size,
exec_ctx, transport_global, stream_global, p->frame_size,
message_flags, &p->incoming_frames);
/* fallthrough */
case GRPC_CHTTP2_DATA_FRAME:
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing,
stream_parsing);
if (cur == end) {
return GRPC_ERROR_NONE;
}
uint32_t remaining = (uint32_t)(end - cur);
if (remaining == p->frame_size) {
stream_parsing->stats.incoming.data_bytes += p->frame_size;
stream_global->stats.incoming.data_bytes += p->frame_size;
grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
@ -256,7 +255,7 @@ grpc_error *grpc_chttp2_data_parser_parse(
p->state = GRPC_CHTTP2_DATA_FH_0;
return GRPC_ERROR_NONE;
} else if (remaining > p->frame_size) {
stream_parsing->stats.incoming.data_bytes += p->frame_size;
stream_global->stats.incoming.data_bytes += p->frame_size;
grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
gpr_slice_sub(slice, (size_t)(cur - beg),
@ -272,7 +271,7 @@ grpc_error *grpc_chttp2_data_parser_parse(
exec_ctx, p->parsing_frame,
gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
p->frame_size -= remaining;
stream_parsing->stats.incoming.data_bytes += remaining;
stream_global->stats.incoming.data_bytes += remaining;
return GRPC_ERROR_NONE;
}
}

@ -94,8 +94,8 @@ grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
frame */
grpc_error *grpc_chttp2_data_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last);
void grpc_chttp2_encode_data(uint32_t id, gpr_slice_buffer *inbuf,
uint32_t write_bytes, int is_eof,

@ -69,8 +69,8 @@ grpc_error *grpc_chttp2_goaway_parser_begin_frame(grpc_chttp2_goaway_parser *p,
grpc_error *grpc_chttp2_goaway_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last) {
uint8_t *const beg = GPR_SLICE_START_PTR(slice);
uint8_t *const end = GPR_SLICE_END_PTR(slice);
uint8_t *cur = beg;
@ -148,12 +148,9 @@ grpc_error *grpc_chttp2_goaway_parser_parse(
p->debug_pos += (uint32_t)(end - cur);
p->state = GRPC_CHTTP2_GOAWAY_DEBUG;
if (is_last) {
transport_parsing->goaway_received = 1;
transport_parsing->goaway_last_stream_index = p->last_stream_id;
gpr_slice_unref(transport_parsing->goaway_text);
transport_parsing->goaway_error = (grpc_status_code)p->error_code;
transport_parsing->goaway_text =
gpr_slice_new(p->debug_data, p->debug_length, gpr_free);
grpc_chttp2_add_incoming_goaway(
exec_ctx, transport_global, (uint32_t)p->error_code,
gpr_slice_new(p->debug_data, p->debug_length, gpr_free));
p->debug_data = NULL;
}
return GRPC_ERROR_NONE;

@ -67,8 +67,8 @@ grpc_error *grpc_chttp2_goaway_parser_begin_frame(
grpc_chttp2_goaway_parser *parser, uint32_t length, uint8_t flags);
grpc_error *grpc_chttp2_goaway_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last);
void grpc_chttp2_goaway_append(uint32_t last_stream_id, uint32_t error_code,
gpr_slice debug_data,

@ -75,8 +75,8 @@ grpc_error *grpc_chttp2_ping_parser_begin_frame(grpc_chttp2_ping_parser *parser,
grpc_error *grpc_chttp2_ping_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last) {
uint8_t *const beg = GPR_SLICE_START_PTR(slice);
uint8_t *const end = GPR_SLICE_END_PTR(slice);
uint8_t *cur = beg;
@ -91,9 +91,9 @@ grpc_error *grpc_chttp2_ping_parser_parse(
if (p->byte == 8) {
GPR_ASSERT(is_last);
if (p->is_ack) {
grpc_chttp2_ack_ping(exec_ctx, transport_parsing, p->opaque_8bytes);
grpc_chttp2_ack_ping(exec_ctx, transport_global, p->opaque_8bytes);
} else {
gpr_slice_buffer_add(&transport_parsing->qbuf,
gpr_slice_buffer_add(&transport_global->qbuf,
grpc_chttp2_ping_create(1, p->opaque_8bytes));
}
}

@ -50,7 +50,7 @@ grpc_error *grpc_chttp2_ping_parser_begin_frame(grpc_chttp2_ping_parser *parser,
uint32_t length, uint8_t flags);
grpc_error *grpc_chttp2_ping_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_PING_H */

@ -39,6 +39,8 @@
#include <grpc/support/string_util.h>
#include "src/core/ext/transport/chttp2/transport/frame.h"
#include "src/core/ext/transport/chttp2/transport/http2_errors.h"
#include "src/core/ext/transport/chttp2/transport/status_conversion.h"
gpr_slice grpc_chttp2_rst_stream_create(uint32_t id, uint32_t code,
grpc_transport_one_way_stats *stats) {
@ -85,8 +87,8 @@ grpc_error *grpc_chttp2_rst_stream_parser_begin_frame(
grpc_error *grpc_chttp2_rst_stream_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last) {
uint8_t *const beg = GPR_SLICE_START_PTR(slice);
uint8_t *const end = GPR_SLICE_END_PTR(slice);
uint8_t *cur = beg;
@ -97,19 +99,30 @@ grpc_error *grpc_chttp2_rst_stream_parser_parse(
cur++;
p->byte++;
}
stream_parsing->stats.incoming.framing_bytes += (uint64_t)(end - cur);
stream_global->stats.incoming.framing_bytes += (uint64_t)(end - cur);
if (p->byte == 4) {
GPR_ASSERT(is_last);
stream_parsing->received_close = 1;
if (stream_parsing->forced_close_error == GRPC_ERROR_NONE) {
stream_parsing->forced_close_error = grpc_error_set_int(
GRPC_ERROR_CREATE("RST_STREAM"), GRPC_ERROR_INT_HTTP2_ERROR,
(intptr_t)((((uint32_t)p->reason_bytes[0]) << 24) |
(((uint32_t)p->reason_bytes[1]) << 16) |
(((uint32_t)p->reason_bytes[2]) << 8) |
(((uint32_t)p->reason_bytes[3]))));
uint32_t reason = (((uint32_t)p->reason_bytes[0]) << 24) |
(((uint32_t)p->reason_bytes[1]) << 16) |
(((uint32_t)p->reason_bytes[2]) << 8) |
(((uint32_t)p->reason_bytes[3]));
grpc_error *error = GRPC_ERROR_NONE;
if (reason != GRPC_CHTTP2_NO_ERROR) {
error = grpc_error_set_int(GRPC_ERROR_CREATE("RST_STREAM"),
GRPC_ERROR_INT_HTTP2_ERROR, reason);
grpc_status_code status_code = grpc_chttp2_http2_error_to_grpc_status(
(grpc_chttp2_error_code)reason, stream_global->deadline);
char *status_details;
gpr_asprintf(&status_details, "Received RST_STREAM with error code %d",
reason);
gpr_slice slice_details = gpr_slice_from_copied_string(status_details);
gpr_free(status_details);
grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global,
status_code, &slice_details);
}
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global,
true, true, error);
}
return GRPC_ERROR_NONE;

@ -51,7 +51,7 @@ grpc_error *grpc_chttp2_rst_stream_parser_begin_frame(
grpc_chttp2_rst_stream_parser *parser, uint32_t length, uint8_t flags);
grpc_error *grpc_chttp2_rst_stream_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_RST_STREAM_H */

@ -145,8 +145,8 @@ grpc_error *grpc_chttp2_settings_parser_begin_frame(
grpc_error *grpc_chttp2_settings_parser_parse(
grpc_exec_ctx *exec_ctx, void *p,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last) {
grpc_chttp2_settings_parser *parser = p;
const uint8_t *cur = GPR_SLICE_START_PTR(slice);
const uint8_t *end = GPR_SLICE_END_PTR(slice);
@ -162,10 +162,9 @@ grpc_error *grpc_chttp2_settings_parser_parse(
if (cur == end) {
parser->state = GRPC_CHTTP2_SPS_ID0;
if (is_last) {
transport_parsing->settings_updated = 1;
memcpy(parser->target_settings, parser->incoming_settings,
GRPC_CHTTP2_NUM_SETTINGS * sizeof(uint32_t));
gpr_slice_buffer_add(&transport_parsing->qbuf,
gpr_slice_buffer_add(&transport_global->qbuf,
grpc_chttp2_settings_ack_create());
}
return GRPC_ERROR_NONE;
@ -226,9 +225,9 @@ grpc_error *grpc_chttp2_settings_parser_parse(
break;
case GRPC_CHTTP2_DISCONNECT_ON_INVALID_VALUE:
grpc_chttp2_goaway_append(
transport_parsing->last_incoming_stream_id, sp->error_value,
transport_global->last_incoming_stream_id, sp->error_value,
gpr_slice_from_static_string("HTTP2 settings error"),
&transport_parsing->qbuf);
&transport_global->qbuf);
gpr_asprintf(&msg, "invalid value %u passed for %s",
parser->value, sp->name);
grpc_error *err = GRPC_ERROR_CREATE(msg);
@ -238,17 +237,17 @@ grpc_error *grpc_chttp2_settings_parser_parse(
}
if (parser->id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE &&
parser->incoming_settings[parser->id] != parser->value) {
transport_parsing->initial_window_update =
transport_global->initial_window_update =
(int64_t)parser->value - parser->incoming_settings[parser->id];
if (grpc_http_trace) {
gpr_log(GPR_DEBUG, "adding %d for initial_window change",
(int)transport_parsing->initial_window_update);
(int)transport_global->initial_window_update);
}
}
parser->incoming_settings[parser->id] = parser->value;
if (grpc_http_trace) {
gpr_log(GPR_DEBUG, "CHTTP2:%s: got setting %d = %d",
transport_parsing->is_client ? "CLI" : "SVR", parser->id,
transport_global->is_client ? "CLI" : "SVR", parser->id,
parser->value);
}
} else if (grpc_http_trace) {

@ -97,7 +97,7 @@ grpc_error *grpc_chttp2_settings_parser_begin_frame(
uint32_t *settings);
grpc_error *grpc_chttp2_settings_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_SETTINGS_H */

@ -81,8 +81,8 @@ grpc_error *grpc_chttp2_window_update_parser_begin_frame(
grpc_error *grpc_chttp2_window_update_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last) {
uint8_t *const beg = GPR_SLICE_START_PTR(slice);
uint8_t *const end = GPR_SLICE_END_PTR(slice);
uint8_t *cur = beg;
@ -94,8 +94,8 @@ grpc_error *grpc_chttp2_window_update_parser_parse(
p->byte++;
}
if (stream_parsing != NULL) {
stream_parsing->stats.incoming.framing_bytes += (uint32_t)(end - cur);
if (stream_global != NULL) {
stream_global->stats.incoming.framing_bytes += (uint32_t)(end - cur);
}
if (p->byte == 4) {
@ -109,17 +109,26 @@ grpc_error *grpc_chttp2_window_update_parser_parse(
}
GPR_ASSERT(is_last);
if (transport_parsing->incoming_stream_id != 0) {
if (stream_parsing != NULL) {
GRPC_CHTTP2_FLOW_CREDIT_STREAM("parse", transport_parsing,
stream_parsing, outgoing_window,
received_update);
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing,
stream_parsing);
if (transport_global->incoming_stream_id != 0) {
if (stream_global != NULL) {
bool was_zero = stream_global->outgoing_window <= 0;
GRPC_CHTTP2_FLOW_CREDIT_STREAM("parse", transport_global, stream_global,
outgoing_window, received_update);
bool is_zero = stream_global->outgoing_window <= 0;
if (was_zero && !is_zero) {
grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
false, "stream.read_flow_control");
}
}
} else {
GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parse", transport_parsing,
bool was_zero = transport_global->outgoing_window <= 0;
GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parse", transport_global,
outgoing_window, received_update);
bool is_zero = transport_global->outgoing_window <= 0;
if (was_zero && !is_zero) {
grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
"new_global_flow_control");
}
}
}

@ -52,7 +52,7 @@ grpc_error *grpc_chttp2_window_update_parser_begin_frame(
grpc_chttp2_window_update_parser *parser, uint32_t length, uint8_t flags);
grpc_error *grpc_chttp2_window_update_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_WINDOW_UPDATE_H */

@ -1496,12 +1496,12 @@ grpc_error *grpc_chttp2_hpack_parser_parse(grpc_chttp2_hpack_parser *p,
grpc_error *grpc_chttp2_header_parser_parse(
grpc_exec_ctx *exec_ctx, void *hpack_parser,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last) {
grpc_chttp2_hpack_parser *parser = hpack_parser;
GPR_TIMER_BEGIN("grpc_chttp2_hpack_parser_parse", 0);
if (stream_parsing != NULL) {
stream_parsing->stats.incoming.header_bytes += GPR_SLICE_LENGTH(slice);
if (stream_global != NULL) {
stream_global->stats.incoming.header_bytes += GPR_SLICE_LENGTH(slice);
}
grpc_error *error = grpc_chttp2_hpack_parser_parse(
parser, GPR_SLICE_START_PTR(slice), GPR_SLICE_END_PTR(slice));
@ -1517,20 +1517,22 @@ grpc_error *grpc_chttp2_header_parser_parse(
}
/* need to check for null stream: this can occur if we receive an invalid
stream id on a header */
if (stream_parsing != NULL) {
if (stream_global != NULL) {
if (parser->is_boundary) {
if (stream_parsing->header_frames_received ==
GPR_ARRAY_SIZE(stream_parsing->got_metadata_on_parse)) {
if (stream_global->header_frames_received ==
GPR_ARRAY_SIZE(stream_global->metadata_buffer)) {
return GRPC_ERROR_CREATE("Too many trailer frames");
}
stream_parsing
->got_metadata_on_parse[stream_parsing->header_frames_received] = 1;
stream_parsing->header_frames_received++;
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing,
stream_parsing);
stream_global
->published_metadata[stream_global->header_frames_received] = true;
stream_global->header_frames_received++;
grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
stream_global);
}
if (parser->is_eof) {
stream_parsing->received_close = 1;
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global,
stream_global, true, false,
GRPC_ERROR_NONE);
}
}
parser->on_header = NULL;

@ -112,7 +112,7 @@ grpc_error *grpc_chttp2_hpack_parser_parse(grpc_chttp2_hpack_parser *p,
the transport */
grpc_error *grpc_chttp2_header_parser_parse(
grpc_exec_ctx *exec_ctx, void *hpack_parser,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HPACK_PARSER_H */

@ -61,11 +61,9 @@ typedef struct grpc_chttp2_stream grpc_chttp2_stream;
typedef enum {
GRPC_CHTTP2_LIST_ALL_STREAMS,
GRPC_CHTTP2_LIST_CHECK_READ_OPS,
GRPC_CHTTP2_LIST_UNANNOUNCED_INCOMING_WINDOW_AVAILABLE,
GRPC_CHTTP2_LIST_WRITABLE,
GRPC_CHTTP2_LIST_WRITING,
GRPC_CHTTP2_LIST_WRITTEN,
GRPC_CHTTP2_LIST_PARSING_SEEN,
GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING,
GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_WRITING,
GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT,
@ -178,7 +176,7 @@ struct grpc_chttp2_incoming_byte_stream {
grpc_closure finished_action;
};
typedef struct {
struct grpc_chttp2_transport_global {
/** data to write next write */
gpr_slice_buffer qbuf;
@ -223,36 +221,7 @@ typedef struct {
/** concurrent stream count: updated when not parsing,
so this is a strict over-estimation on the client */
uint32_t concurrent_stream_count;
} grpc_chttp2_transport_global;
typedef struct {
/** data to write now */
gpr_slice_buffer outbuf;
/** hpack encoding */
grpc_chttp2_hpack_compressor hpack_compressor;
int64_t outgoing_window;
/** is this a client? */
uint8_t is_client;
/** callback for when writing is done */
grpc_closure done_cb;
} grpc_chttp2_transport_writing;
struct grpc_chttp2_transport_parsing {
/** is this transport a client? (boolean) */
uint8_t is_client;
/** were settings updated? */
uint8_t settings_updated;
/** was a settings ack received? */
uint8_t settings_ack_received;
/** was a goaway frame received? */
uint8_t goaway_received;
/** initial window change */
int64_t initial_window_update;
/** data to write later - after parsing */
gpr_slice_buffer qbuf;
/** parser for headers */
grpc_chttp2_hpack_parser hpack_parser;
/** simple one shot parsers */
@ -265,13 +234,12 @@ struct grpc_chttp2_transport_parsing {
/** parser for goaway frames */
grpc_chttp2_goaway_parser goaway_parser;
/** initial window change */
int64_t initial_window_update;
/** window available for peer to send to us */
int64_t incoming_window;
/** next stream id available at the time of beginning parsing */
uint32_t next_stream_id;
uint32_t last_incoming_stream_id;
/* deframing */
grpc_chttp2_deframe_transport_state deframe_state;
uint8_t incoming_frame_type;
@ -282,29 +250,36 @@ struct grpc_chttp2_transport_parsing {
uint32_t incoming_frame_size;
uint32_t incoming_stream_id;
/* current max frame size */
uint32_t max_frame_size;
/* active parser */
void *parser_data;
grpc_chttp2_stream_parsing *incoming_stream;
grpc_chttp2_stream_global *incoming_stream;
grpc_error *(*parser)(grpc_exec_ctx *exec_ctx, void *parser_user_data,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
gpr_slice slice, int is_last);
/* received settings */
uint32_t settings[GRPC_CHTTP2_NUM_SETTINGS];
/* last settings that were sent */
uint32_t last_sent_settings[GRPC_CHTTP2_NUM_SETTINGS];
/* goaway data */
grpc_status_code goaway_error;
uint32_t goaway_last_stream_index;
gpr_slice goaway_text;
};
typedef struct {
/** data to write now */
gpr_slice_buffer outbuf;
/** hpack encoding */
grpc_chttp2_hpack_compressor hpack_compressor;
int64_t outgoing_window;
/** is this a client? */
uint8_t is_client;
/** callback for when writing is done */
grpc_closure done_cb;
} grpc_chttp2_transport_writing;
#if 0
struct grpc_chttp2_transport_parsing {
};
#endif
typedef enum {
/** no writing activity allowed */
@ -360,9 +335,6 @@ struct grpc_chttp2_transport {
set writing_state >= GRPC_WRITING, and only by the writing closure
chain. */
grpc_chttp2_transport_writing writing;
/** state only accessible by the chain of execution that
set parsing_active=1 */
grpc_chttp2_transport_parsing parsing;
/** maps stream id to grpc_chttp2_stream objects;
owned by the parsing thread when parsing */
@ -378,9 +350,6 @@ struct grpc_chttp2_transport {
/** closure to start reading from the endpoint */
grpc_closure reading_action;
grpc_closure reading_action_locked;
grpc_closure post_parse_locked;
/** closure to actually do parsing */
grpc_closure parsing_action;
/** closure to initiate writing */
grpc_closure initiate_writing;
/** closure to finish writing */
@ -410,7 +379,7 @@ struct grpc_chttp2_transport {
grpc_transport_op *post_parsing_op;
};
typedef struct {
struct grpc_chttp2_stream_global {
/** HTTP2 stream id for this stream, or zero if one has not been assigned */
uint32_t id;
@ -424,7 +393,8 @@ typedef struct {
not yet announced to HTTP2 flow control.
As the upper layers offer to read more bytes, this value increases.
As we advertise incoming flow control window, this value decreases. */
uint32_t unannounced_incoming_window_for_parse;
/* TODO(ctiller): remove this, it's equivalent to incoming_window now
uint32_t unannounced_incoming_window_for_parse; */
uint32_t unannounced_incoming_window_for_writing;
/** things the upper layers would like to send */
grpc_metadata_batch *send_initial_metadata;
@ -465,17 +435,26 @@ typedef struct {
/** the error that resulted in this stream being write-closed */
grpc_error *write_closed_error;
bool published_initial_metadata;
bool published_trailing_metadata;
bool published_metadata[2];
bool final_metadata_requested;
grpc_chttp2_incoming_metadata_buffer received_initial_metadata;
grpc_chttp2_incoming_metadata_buffer received_trailing_metadata;
grpc_chttp2_incoming_metadata_buffer metadata_buffer[2];
grpc_chttp2_incoming_frame_queue incoming_frames;
gpr_timespec deadline;
} grpc_chttp2_stream_global;
/** saw some stream level error */
grpc_error *forced_close_error;
/** how many header frames have we received? */
uint8_t header_frames_received;
/** window available for peer to send to us */
int64_t incoming_window;
/** parsing state for data frames */
grpc_chttp2_data_parser data_parser;
/** number of bytes received - reset at end of parse thread execution */
int64_t received_bytes;
};
typedef struct {
/** HTTP2 stream id for this stream, or zero if one has not been assigned */
@ -500,41 +479,19 @@ typedef struct {
grpc_transport_one_way_stats stats;
} grpc_chttp2_stream_writing;
#if 0
struct grpc_chttp2_stream_parsing {
/** saw some stream level error */
grpc_error *forced_close_error;
/** HTTP2 stream id for this stream, or zero if one has not been assigned */
uint32_t id;
/** has this stream received a close */
uint8_t received_close;
/** how many header frames have we received? */
uint8_t header_frames_received;
/** which metadata did we get (on this parse) */
uint8_t got_metadata_on_parse[2];
/** should we raise the seen_error flag in transport_global */
bool seen_error;
bool exceeded_metadata_size;
/** window available for peer to send to us */
int64_t incoming_window;
/** parsing state for data frames */
grpc_chttp2_data_parser data_parser;
/** amount of window given */
int64_t outgoing_window;
/** number of bytes received - reset at end of parse thread execution */
int64_t received_bytes;
/** stats gathered during the parse */
grpc_transport_stream_stats stats;
/** incoming metadata */
grpc_chttp2_incoming_metadata_buffer metadata_buffer[2];
};
#endif
struct grpc_chttp2_stream {
grpc_chttp2_transport *t;
grpc_stream_refcount *refcount;
grpc_chttp2_stream_global global;
grpc_chttp2_stream_writing writing;
grpc_chttp2_stream_parsing parsing;
grpc_closure init_stream;
grpc_closure destroy_stream;
@ -574,16 +531,11 @@ void grpc_chttp2_cleanup_writing(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *global,
grpc_chttp2_transport_writing *writing);
void grpc_chttp2_prepare_to_read(grpc_chttp2_transport_global *global,
grpc_chttp2_transport_parsing *parsing);
/** Process one slice of incoming data; return 1 if the connection is still
viable after reading, or 0 if the connection should be torn down */
grpc_error *grpc_chttp2_perform_read(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
gpr_slice slice);
void grpc_chttp2_publish_reads(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *global,
grpc_chttp2_transport_parsing *parsing);
bool grpc_chttp2_list_add_writable_stream(
grpc_chttp2_transport_global *transport_global,
@ -617,15 +569,6 @@ int grpc_chttp2_list_pop_written_stream(
grpc_chttp2_stream_global **stream_global,
grpc_chttp2_stream_writing **stream_writing);
void grpc_chttp2_list_add_parsing_seen_stream(
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing);
int grpc_chttp2_list_pop_parsing_seen_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_global **stream_global,
grpc_chttp2_stream_parsing **stream_parsing);
void grpc_chttp2_list_add_waiting_for_concurrency(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
@ -659,18 +602,6 @@ void grpc_chttp2_list_remove_stalled_by_transport(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
void grpc_chttp2_list_add_unannounced_incoming_window_available(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
void grpc_chttp2_list_remove_unannounced_incoming_window_available(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
int grpc_chttp2_list_pop_unannounced_incoming_window_available(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_global **stream_global,
grpc_chttp2_stream_parsing **stream_parsing);
void grpc_chttp2_list_add_closed_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
@ -685,10 +616,10 @@ int grpc_chttp2_list_pop_closed_waiting_for_writing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
grpc_chttp2_transport_parsing *transport_parsing, uint32_t id);
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_global *grpc_chttp2_parsing_lookup_stream(
grpc_chttp2_transport_global *transport_global, uint32_t id);
grpc_chttp2_stream_global *grpc_chttp2_parsing_accept_stream(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
uint32_t id);
void grpc_chttp2_add_incoming_goaway(
@ -707,7 +638,7 @@ void grpc_chttp2_for_all_streams(
grpc_chttp2_stream_global *stream_global));
void grpc_chttp2_parsing_become_skip_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing);
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global);
void grpc_chttp2_complete_closure_step(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
@ -835,8 +766,8 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx,
#endif
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, uint32_t frame_size,
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, uint32_t frame_size,
uint32_t flags, grpc_chttp2_incoming_frame_queue *add_to_queue);
void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs,
@ -846,7 +777,7 @@ void grpc_chttp2_incoming_byte_stream_finished(
grpc_error *error, int from_parsing_thread);
void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_parsing *parsing,
grpc_chttp2_transport_global *parsing,
const uint8_t *opaque_8bytes);
/** add a ref to the stream and add it to the writable list;

File diff suppressed because it is too large Load Diff

@ -220,63 +220,6 @@ int grpc_chttp2_list_pop_written_stream(
return r;
}
void grpc_chttp2_list_add_unannounced_incoming_window_available(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
GPR_ASSERT(stream_global->id != 0);
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_UNANNOUNCED_INCOMING_WINDOW_AVAILABLE);
}
void grpc_chttp2_list_remove_unannounced_incoming_window_available(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
stream_list_maybe_remove(
TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_UNANNOUNCED_INCOMING_WINDOW_AVAILABLE);
}
int grpc_chttp2_list_pop_unannounced_incoming_window_available(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_global **stream_global,
grpc_chttp2_stream_parsing **stream_parsing) {
grpc_chttp2_stream *stream;
int r =
stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
GRPC_CHTTP2_LIST_UNANNOUNCED_INCOMING_WINDOW_AVAILABLE);
if (r != 0) {
*stream_global = &stream->global;
*stream_parsing = &stream->parsing;
}
return r;
}
void grpc_chttp2_list_add_parsing_seen_stream(
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing) {
stream_list_add(TRANSPORT_FROM_PARSING(transport_parsing),
STREAM_FROM_PARSING(stream_parsing),
GRPC_CHTTP2_LIST_PARSING_SEEN);
}
int grpc_chttp2_list_pop_parsing_seen_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_global **stream_global,
grpc_chttp2_stream_parsing **stream_parsing) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_PARSING(transport_parsing), &stream,
GRPC_CHTTP2_LIST_PARSING_SEEN);
if (r != 0) {
*stream_global = &stream->global;
*stream_parsing = &stream->parsing;
}
return r;
}
void grpc_chttp2_list_add_waiting_for_concurrency(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {

Loading…
Cancel
Save