Exploratory work towards splitting parsing from the transport lock

pull/2149/head
Craig Tiller 10 years ago
parent f3fba749aa
commit e0617624ba
  1. 179
      src/core/transport/chttp2_transport.c

@ -104,6 +104,10 @@ typedef enum {
/* streams that have finished reading: we wait until unlock to coalesce
all changes into one callback */
FINISHED_READ_OP,
MAYBE_FINISH_READ_AFTER_PARSE,
PARSER_CHECK_WINDOW_UPDATES_AFTER_PARSE,
OTHER_CHECK_WINDOW_UPDATES_AFTER_PARSE,
NEW_OUTGOING_WINDOW,
STREAM_LIST_COUNT /* must be last */
} stream_list_id;
@ -229,6 +233,7 @@ struct transport {
/* basic state management - what are we doing at the moment? */
gpr_uint8 reading;
gpr_uint8 parsing;
gpr_uint8 writing;
/** are we calling back (via cb) with a channel-level event */
gpr_uint8 calling_back_channel;
@ -254,6 +259,7 @@ struct transport {
/* window management */
gpr_uint32 outgoing_window;
gpr_uint32 outgoing_window_update;
gpr_uint32 incoming_window;
gpr_uint32 connection_window_target;
@ -319,6 +325,7 @@ struct stream {
gpr_uint32 incoming_window;
gpr_int64 outgoing_window;
gpr_uint32 outgoing_window_update;
/* when the application requests writes be closed, the write_closed is
'queued'; when the close is flow controlled into the send path, we are
'sending' it; when the write has been performed it is 'sent' */
@ -395,7 +402,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
grpc_endpoint_cb_status error);
static void schedule_cb(transport *t, op_closure closure, int success);
static void maybe_finish_read(transport *t, stream *s);
static void maybe_finish_read(transport *t, stream *s, int is_parser);
static void maybe_join_window_updates(transport *t, stream *s);
static void finish_reads(transport *t);
static void add_to_pollset_locked(transport *t, grpc_pollset *pollset);
@ -652,8 +659,8 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
ref_transport(t);
lock(t);
if (!server_data) {
lock(t);
s->id = 0;
s->outgoing_window = 0;
s->incoming_window = 0;
@ -675,9 +682,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
if (initial_op) perform_op_locked(t, s, initial_op);
if (!server_data) {
unlock(t);
}
unlock(t);
return 0;
}
@ -694,16 +699,11 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
gpr_mu_lock(&t->mu);
/* stop parsing if we're currently parsing this stream */
if (t->deframe_state == DTS_FRAME && t->incoming_stream_id == s->id &&
s->id != 0) {
become_skip_parser(t);
}
GPR_ASSERT(s->published_state == GRPC_STREAM_CLOSED || s->id == 0);
for (i = 0; i < STREAM_LIST_COUNT; i++) {
stream_list_remove(t, s, i);
}
remove_from_stream_map(t, s);
gpr_mu_unlock(&t->mu);
@ -835,7 +835,9 @@ static void unlock(transport *t) {
finalize_cancellations(t);
}
finish_reads(t);
if (!t->parsing) {
finish_reads(t);
}
/* gather any callbacks that need to be made */
if (!t->calling_back_ops) {
@ -850,7 +852,7 @@ static void unlock(transport *t) {
t->cb = NULL; /* no more callbacks */
t->error_state = ERROR_STATE_NOTIFIED;
}
if (t->num_pending_goaways) {
if (!t->parsing && t->num_pending_goaways) {
goaways = t->pending_goaways;
num_goaways = t->num_pending_goaways;
t->pending_goaways = NULL;
@ -930,8 +932,10 @@ static int prepare_write(transport *t) {
gpr_uint32 window_delta;
/* simple writes are queued to qbuf, and flushed here */
gpr_slice_buffer_swap(&t->qbuf, &t->outbuf);
GPR_ASSERT(t->qbuf.count == 0);
if (!t->parsing) {
gpr_slice_buffer_swap(&t->qbuf, &t->outbuf);
GPR_ASSERT(t->qbuf.count == 0);
}
if (t->dirtied_local_settings && !t->sent_local_settings) {
gpr_slice_buffer_add(
@ -976,26 +980,28 @@ static int prepare_write(transport *t) {
}
}
/* for each stream that wants to update its window, add that window here */
while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) {
window_delta =
t->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
s->incoming_window;
if (!s->read_closed && window_delta) {
gpr_slice_buffer_add(
&t->outbuf, grpc_chttp2_window_update_create(s->id, window_delta));
FLOWCTL_TRACE(t, s, incoming, s->id, window_delta);
s->incoming_window += window_delta;
if (!t->parsing) {
/* for each stream that wants to update its window, add that window here */
while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) {
window_delta =
t->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
s->incoming_window;
if (!s->read_closed && window_delta) {
gpr_slice_buffer_add(
&t->outbuf, grpc_chttp2_window_update_create(s->id, window_delta));
FLOWCTL_TRACE(t, s, incoming, s->id, window_delta);
s->incoming_window += window_delta;
}
}
}
/* if the transport is ready to send a window update, do so here also */
if (t->incoming_window < t->connection_window_target * 3 / 4) {
window_delta = t->connection_window_target - t->incoming_window;
gpr_slice_buffer_add(&t->outbuf,
grpc_chttp2_window_update_create(0, window_delta));
FLOWCTL_TRACE(t, t, incoming, 0, window_delta);
t->incoming_window += window_delta;
/* if the transport is ready to send a window update, do so here also */
if (t->incoming_window < t->connection_window_target * 3 / 4) {
window_delta = t->connection_window_target - t->incoming_window;
gpr_slice_buffer_add(&t->outbuf,
grpc_chttp2_window_update_create(0, window_delta));
FLOWCTL_TRACE(t, t, incoming, 0, window_delta);
t->incoming_window += window_delta;
}
}
return t->outbuf.length > 0 || !stream_list_empty(t, WRITING);
@ -1031,7 +1037,7 @@ static void finish_write_common(transport *t, int success) {
if (!t->is_client) {
s->read_closed = 1;
}
maybe_finish_read(t, s);
maybe_finish_read(t, s, 0);
}
t->outbuf.count = 0;
t->outbuf.length = 0;
@ -1089,7 +1095,7 @@ static void add_goaway(transport *t, gpr_uint32 goaway_error,
static void maybe_start_some_streams(transport *t) {
/* start streams where we have free stream ids and free concurrency */
while (t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
while (!t->parsing && t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
grpc_chttp2_stream_map_size(&t->stream_map) <
t->settings[PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) {
@ -1169,7 +1175,7 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
s->publish_state = op->recv_state;
gpr_free(s->old_incoming_metadata);
s->old_incoming_metadata = NULL;
maybe_finish_read(t, s);
maybe_finish_read(t, s, 0);
maybe_join_window_updates(t, s);
}
@ -1231,7 +1237,7 @@ static void finalize_cancellations(transport *t) {
while ((s = stream_list_remove_head(t, CANCELLED))) {
s->read_closed = 1;
s->write_state = WRITE_STATE_SENT_CLOSE;
maybe_finish_read(t, s);
maybe_finish_read(t, s, 0);
}
}
@ -1249,7 +1255,8 @@ static void add_incoming_metadata(transport *t, stream *s, grpc_mdelem *elem) {
static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
grpc_status_code local_status,
grpc_chttp2_error_code error_code,
grpc_mdstr *optional_message, int send_rst) {
grpc_mdstr *optional_message, int send_rst,
int is_parser) {
int had_outgoing;
char buffer[GPR_LTOA_MIN_BUFSIZE];
@ -1299,7 +1306,7 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
add_metadata_batch(t, s);
}
}
maybe_finish_read(t, s);
maybe_finish_read(t, s, is_parser);
}
if (!id) send_rst = 0;
if (send_rst) {
@ -1314,8 +1321,10 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
static void cancel_stream_id(transport *t, gpr_uint32 id,
grpc_status_code local_status,
grpc_chttp2_error_code error_code, int send_rst) {
lock(t);
cancel_stream_inner(t, lookup_stream(t, id), id, local_status, error_code,
NULL, send_rst);
NULL, send_rst, 1);
unlock(t);
}
static void cancel_stream(transport *t, stream *s,
@ -1323,7 +1332,7 @@ static void cancel_stream(transport *t, stream *s,
grpc_chttp2_error_code error_code,
grpc_mdstr *optional_message, int send_rst) {
cancel_stream_inner(t, s, s->id, local_status, error_code, optional_message,
send_rst);
send_rst, 0);
}
static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *stream) {
@ -1343,13 +1352,19 @@ static void drop_connection(transport *t) {
end_all_the_calls(t);
}
static void maybe_finish_read(transport *t, stream *s) {
if (s->incoming_sopb) {
static void maybe_finish_read(transport *t, stream *s, int is_parser) {
if (is_parser) {
stream_list_join(t, s, MAYBE_FINISH_READ_AFTER_PARSE);
} else if (s->incoming_sopb) {
stream_list_join(t, s, FINISHED_READ_OP);
}
}
static void maybe_join_window_updates(transport *t, stream *s) {
if (t->parsing) {
stream_list_join(t, s, OTHER_CHECK_WINDOW_UPDATES_AFTER_PARSE);
return;
}
if (s->incoming_sopb != NULL &&
s->incoming_window <
t->settings[LOCAL_SETTINGS]
@ -1378,7 +1393,7 @@ static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) {
s->incoming_window -= t->incoming_frame_size;
/* if the stream incoming window is getting low, schedule an update */
maybe_join_window_updates(t, s);
stream_list_join(t, s, PARSER_CHECK_WINDOW_UPDATES_AFTER_PARSE);
return GRPC_CHTTP2_PARSE_OK;
}
@ -1475,7 +1490,7 @@ static void on_header(void *tp, grpc_mdelem *md) {
} else {
add_incoming_metadata(t, s, md);
}
maybe_finish_read(t, s);
maybe_finish_read(t, s, 1);
}
static int init_header_frame_parser(transport *t, int is_continuation) {
@ -1667,9 +1682,11 @@ static int init_frame_parser(transport *t) {
}
}
/*
static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) {
return window + window_update < MAX_WINDOW;
}
*/
static void add_metadata_batch(transport *t, stream *s) {
grpc_metadata_batch b;
@ -1695,18 +1712,17 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
case GRPC_CHTTP2_PARSE_OK:
if (st.end_of_stream) {
t->incoming_stream->read_closed = 1;
maybe_finish_read(t, t->incoming_stream);
maybe_finish_read(t, t->incoming_stream, 1);
}
if (st.need_flush_reads) {
maybe_finish_read(t, t->incoming_stream);
maybe_finish_read(t, t->incoming_stream, 1);
}
if (st.metadata_boundary) {
add_metadata_batch(t, t->incoming_stream);
maybe_finish_read(t, t->incoming_stream);
maybe_finish_read(t, t->incoming_stream, 1);
}
if (st.ack_settings) {
gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create());
maybe_start_some_streams(t);
}
if (st.send_ping_ack) {
gpr_slice_buffer_add(
@ -1737,13 +1753,8 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
if (st.initial_window_update) {
for (i = 0; i < t->stream_map.count; i++) {
stream *s = (stream *)(t->stream_map.values[i]);
int was_window_empty = s->outgoing_window <= 0;
FLOWCTL_TRACE(t, s, outgoing, s->id, st.initial_window_update);
s->outgoing_window += st.initial_window_update;
if (was_window_empty && s->outgoing_window > 0 && s->outgoing_sopb &&
s->outgoing_sopb->nops > 0) {
stream_list_join(t, s, WRITABLE);
}
s->outgoing_window_update += st.initial_window_update;
stream_list_join(t, s, NEW_OUTGOING_WINDOW);
}
}
if (st.window_update) {
@ -1751,30 +1762,12 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
/* if there was a stream id, this is for some stream */
stream *s = lookup_stream(t, t->incoming_stream_id);
if (s) {
int was_window_empty = s->outgoing_window <= 0;
if (!is_window_update_legal(st.window_update, s->outgoing_window)) {
cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
GRPC_CHTTP2_FLOW_CONTROL_ERROR),
GRPC_CHTTP2_FLOW_CONTROL_ERROR, NULL, 1);
} else {
FLOWCTL_TRACE(t, s, outgoing, s->id, st.window_update);
s->outgoing_window += st.window_update;
/* if this window update makes outgoing ops writable again,
flag that */
if (was_window_empty && s->outgoing_sopb &&
s->outgoing_sopb->nops > 0) {
stream_list_join(t, s, WRITABLE);
}
}
s->outgoing_window_update += st.window_update;
stream_list_join(t, s, NEW_OUTGOING_WINDOW);
}
} else {
/* transport level window update */
if (!is_window_update_legal(st.window_update, t->outgoing_window)) {
drop_connection(t);
} else {
FLOWCTL_TRACE(t, t, outgoing, 0, st.window_update);
t->outgoing_window += st.window_update;
}
t->outgoing_window_update += st.window_update;
}
}
return 1;
@ -1979,6 +1972,7 @@ static int process_read(transport *t, gpr_slice slice) {
static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
grpc_endpoint_cb_status error) {
transport *t = tp;
stream *s;
size_t i;
int keep_reading = 0;
@ -1998,11 +1992,40 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
unref_transport(t);
break;
case GRPC_ENDPOINT_CB_OK:
lock(t);
gpr_mu_lock(&t->mu);
GPR_ASSERT(!t->parsing);
t->parsing = 1;
gpr_mu_unlock(&t->mu);
if (t->cb) {
for (i = 0; i < nslices && process_read(t, slices[i]); i++)
;
}
lock(t);
t->parsing = 0;
while ((s = stream_list_remove_head(t, MAYBE_FINISH_READ_AFTER_PARSE))) {
maybe_finish_read(t, s, 0);
}
while ((s = stream_list_remove_head(t, PARSER_CHECK_WINDOW_UPDATES_AFTER_PARSE))) {
maybe_join_window_updates(t, s);
}
while ((s = stream_list_remove_head(t, OTHER_CHECK_WINDOW_UPDATES_AFTER_PARSE))) {
maybe_join_window_updates(t, s);
}
while ((s = stream_list_remove_head(t, NEW_OUTGOING_WINDOW))) {
int was_window_empty = s->outgoing_window <= 0;
FLOWCTL_TRACE(t, s, outgoing, s->id, s->outgoing_window_update);
s->outgoing_window += s->outgoing_window_update;
s->outgoing_window_update = 0;
/* if this window update makes outgoing ops writable again,
flag that */
if (was_window_empty && s->outgoing_sopb &&
s->outgoing_sopb->nops > 0) {
stream_list_join(t, s, WRITABLE);
}
}
t->outgoing_window += t->outgoing_window_update;
t->outgoing_window_update = 0;
maybe_start_some_streams(t);
unlock(t);
keep_reading = 1;
break;

Loading…
Cancel
Save