diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 9aee7ca4f19..fed30887896 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -91,6 +91,9 @@ typedef enum { /* streams that are waiting to start because there are too many concurrent streams on the connection */ WAITING_FOR_CONCURRENCY, + /* streams that have finished reading: we wait until unlock to coalesce + all changes into one callback */ + FINISHED_READ_OP, STREAM_LIST_COUNT /* must be last */ } stream_list_id; @@ -137,6 +140,12 @@ typedef enum { DTS_FRAME } deframe_transport_state; +typedef enum { + WRITE_STATE_OPEN, + WRITE_STATE_QUEUED_CLOSE, + WRITE_STATE_SENT_CLOSE +} WRITE_STATE; + typedef struct { stream *head; stream *tail; @@ -181,7 +190,7 @@ typedef struct { typedef struct { void (*cb)(void *user_data, int success); void *user_data; - int status; + int success; } op_closure; typedef struct { @@ -293,12 +302,10 @@ struct stream { /* when the application requests writes be closed, the write_closed is 'queued'; when the close is flow controlled into the send path, we are 'sending' it; when the write has been performed it is 'sent' */ - gpr_uint8 queued_write_closed; - gpr_uint8 sending_write_closed; - gpr_uint8 sent_write_closed; + WRITE_STATE write_state; + gpr_uint8 send_closed; gpr_uint8 read_closed; gpr_uint8 cancelled; - gpr_uint8 allow_window_updates; gpr_uint8 published_close; op_closure send_done_closure; @@ -314,7 +321,10 @@ struct stream { gpr_timespec incoming_deadline; /* sops from application */ - grpc_stream_op_buffer outgoing_sopb; + grpc_stream_op_buffer *outgoing_sopb; + grpc_stream_op_buffer *incoming_sopb; + grpc_stream_state *publish_state; + grpc_stream_state published_state; /* sops that have passed flow control to be written */ grpc_stream_op_buffer writing_sopb; @@ -363,6 +373,13 @@ static void become_skip_parser(transport *t); static void recv_data(void *tp, gpr_slice *slices, size_t nslices, grpc_endpoint_cb_status error); +static void schedule_cb(transport *t, op_closure closure, int success); +static void maybe_finish_read(transport *t, stream *s); +static void maybe_join_window_updates(transport *t, stream *s); +static void finish_reads(transport *t); +static void add_to_pollset_locked(transport *t, grpc_pollset *pollset); + + /* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -582,6 +599,8 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, transport *t = (transport *)gt; stream *s = (stream *)gs; + memset(s, 0, sizeof(*s)); + ref_transport(t); if (!server_data) { @@ -597,20 +616,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; s->incoming_window = t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - s->queued_write_closed = 0; - s->sending_write_closed = 0; - s->sent_write_closed = 0; - s->read_closed = 0; - s->cancelled = 0; - s->allow_window_updates = 0; - s->published_close = 0; - s->incoming_metadata_count = 0; - s->incoming_metadata_capacity = 0; - s->incoming_metadata = NULL; s->incoming_deadline = gpr_inf_future; - memset(&s->links, 0, sizeof(s->links)); - memset(&s->included, 0, sizeof(s->included)); - grpc_sopb_init(&s->outgoing_sopb); grpc_sopb_init(&s->writing_sopb); grpc_sopb_init(&s->callback_sopb); grpc_chttp2_data_parser_init(&s->parser); @@ -647,7 +653,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { gpr_mu_unlock(&t->mu); - grpc_sopb_destroy(&s->outgoing_sopb); + GPR_ASSERT(s->outgoing_sopb == NULL); grpc_sopb_destroy(&s->writing_sopb); grpc_sopb_destroy(&s->callback_sopb); grpc_chttp2_data_parser_destroy(&s->parser); @@ -765,6 +771,8 @@ static void unlock(transport *t) { finalize_cancellations(t); } + finish_reads(t); + /* gather any callbacks that need to be made */ if (!t->calling_back && cb) { perform_callbacks = prepare_callbacks(t); @@ -868,22 +876,23 @@ static int prepare_write(transport *t) { while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) && s->outgoing_window > 0) { window_delta = grpc_chttp2_preencode( - s->outgoing_sopb.ops, &s->outgoing_sopb.nops, + s->outgoing_sopb->ops, &s->outgoing_sopb->nops, GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb); t->outgoing_window -= window_delta; s->outgoing_window -= window_delta; - s->sending_write_closed = - s->queued_write_closed && s->outgoing_sopb.nops == 0; - if (s->writing_sopb.nops > 0 || s->sending_write_closed) { + if (s->write_state == WRITE_STATE_QUEUED_CLOSE && s->outgoing_sopb->nops == 0) { + s->send_closed = 1; + } + if (s->writing_sopb.nops > 0 || s->send_closed) { stream_list_join(t, s, WRITING); } - /* if there are still writes to do and the stream still has window - available, then schedule a further write */ - if (s->outgoing_sopb.nops > 0 && s->outgoing_window > 0) { - GPR_ASSERT(!t->outgoing_window); - stream_list_add_tail(t, s, WRITABLE); + /* we should either exhaust window or have no ops left, but not both */ + GPR_ASSERT(s->outgoing_sopb->nops == 0 || s->outgoing_window <= 0); + if (s->outgoing_sopb->nops == 0) { + s->outgoing_sopb = NULL; + schedule_cb(t, s->send_done_closure, 1); } } @@ -915,10 +924,10 @@ static void finalize_outbuf(transport *t) { while ((s = stream_list_remove_head(t, WRITING))) { grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops, - s->sending_write_closed, s->id, &t->hpack_compressor, + s->send_closed, s->id, &t->hpack_compressor, &t->outbuf); s->writing_sopb.nops = 0; - if (s->sending_write_closed) { + if (s->send_closed) { stream_list_join(t, s, WRITTEN_CLOSED); } } @@ -932,8 +941,10 @@ static void finish_write_common(transport *t, int success) { drop_connection(t); } while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) { - s->sent_write_closed = 1; - if (!s->cancelled) stream_list_join(t, s, PENDING_CALLBACKS); + s->write_state = WRITE_STATE_SENT_CLOSE; + if (!s->cancelled) { + maybe_finish_read(t, s); + } } t->outbuf.count = 0; t->outbuf.length = 0; @@ -998,66 +1009,53 @@ static void perform_op(grpc_transport *gt, grpc_stream *gs, grpc_transport_op *o lock(t); if (op->send_ops) { - abort(); + GPR_ASSERT(s->outgoing_sopb == NULL); + s->send_done_closure.cb = op->on_done_send; + s->send_done_closure.user_data = op->send_user_data; + if (!s->cancelled) { + s->outgoing_sopb = op->send_ops; + if (op->is_last_send && s->write_state == WRITE_STATE_OPEN) { + s->write_state = WRITE_STATE_QUEUED_CLOSE; + } + if (s->id == 0) { + stream_list_join(t, s, WAITING_FOR_CONCURRENCY); + maybe_start_some_streams(t); + } else if (s->outgoing_window > 0) { + stream_list_join(t, s, WRITABLE); + } + } else { + schedule_nuke_sopb(t, op->send_ops); + schedule_cb(t, s->send_done_closure, 0); + } } if (op->recv_ops) { - abort(); + GPR_ASSERT(s->incoming_sopb == NULL); + s->recv_done_closure.cb = op->on_done_recv; + s->recv_done_closure.user_data = op->recv_user_data; + if (!s->cancelled) { + s->incoming_sopb = op->recv_ops; + s->incoming_sopb->nops = 0; + s->publish_state = op->recv_state; + maybe_finish_read(t, s); + maybe_join_window_updates(t, s); + } else { + schedule_cb(t, s->recv_done_closure, 0); + } } if (op->bind_pollset) { - abort(); + add_to_pollset_locked(t, op->bind_pollset); } - if (op->cancel_with_status) { - abort(); + if (op->cancel_with_status != GRPC_STATUS_OK) { + cancel_stream(t, s, op->cancel_with_status, grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status), + 1); } unlock(t); } -#if 0 -static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops, - size_t ops_count, int is_last) { - transport *t = (transport *)gt; - stream *s = (stream *)gs; - - lock(t); - - if (is_last) { - s->queued_write_closed = 1; - } - if (!s->cancelled) { - grpc_sopb_append(&s->outgoing_sopb, ops, ops_count); - if (s->id == 0) { - stream_list_join(t, s, WAITING_FOR_CONCURRENCY); - maybe_start_some_streams(t); - } else { - stream_list_join(t, s, WRITABLE); - } - } else { - grpc_sopb_append(&t->nuke_later_sopb, ops, ops_count); - } - if (is_last && s->outgoing_sopb.nops == 0 && s->read_closed && - !s->published_close) { - stream_list_join(t, s, PENDING_CALLBACKS); - } - - unlock(t); -} -#endif - -static void abort_stream(grpc_transport *gt, grpc_stream *gs, - grpc_status_code status) { - transport *t = (transport *)gt; - stream *s = (stream *)gs; - - lock(t); - cancel_stream(t, s, status, grpc_chttp2_grpc_status_to_http2_error(status), - 1); - unlock(t); -} - static void send_ping(grpc_transport *gt, void (*cb)(void *user_data), void *user_data) { transport *t = (transport *)gt; @@ -1093,8 +1091,8 @@ static void finalize_cancellations(transport *t) { while ((s = stream_list_remove_head(t, CANCELLED))) { s->read_closed = 1; - s->sent_write_closed = 1; - stream_list_join(t, s, PENDING_CALLBACKS); + s->write_state = WRITE_STATE_SENT_CLOSE; + maybe_finish_read(t, s); } } @@ -1118,12 +1116,15 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, if (s) { /* clear out any unreported input & output: nobody cares anymore */ - had_outgoing = s->outgoing_sopb.nops != 0; + had_outgoing = s->outgoing_sopb && s->outgoing_sopb->nops != 0; schedule_nuke_sopb(t, &s->parser.incoming_sopb); - schedule_nuke_sopb(t, &s->outgoing_sopb); + if (s->outgoing_sopb) { + schedule_nuke_sopb(t, s->outgoing_sopb); + schedule_cb(t, s->send_done_closure, 0); + } if (s->cancelled) { send_rst = 0; - } else if (!s->read_closed || !s->sent_write_closed || had_outgoing) { + } else if (!s->read_closed || s->write_state != WRITE_STATE_SENT_CLOSE || had_outgoing) { s->cancelled = 1; stream_list_join(t, s, CANCELLED); @@ -1141,7 +1142,7 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, break; } - stream_list_join(t, s, PENDING_CALLBACKS); + maybe_finish_read(t, s); } } if (!id) send_rst = 0; @@ -1180,8 +1181,14 @@ static void drop_connection(transport *t) { end_all_the_calls(t); } +static void maybe_finish_read(transport *t, stream *s) { + if (s->incoming_sopb) { + stream_list_join(t, s, FINISHED_READ_OP); + } +} + static void maybe_join_window_updates(transport *t, stream *s) { - if (s->allow_window_updates && + if (s->incoming_sopb != NULL && s->incoming_window < t->settings[LOCAL_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] * @@ -1190,6 +1197,7 @@ static void maybe_join_window_updates(transport *t, stream *s) { } } +#if 0 static void set_allow_window_updates(grpc_transport *tp, grpc_stream *sp, int allow) { transport *t = (transport *)tp; @@ -1204,6 +1212,7 @@ static void set_allow_window_updates(grpc_transport *tp, grpc_stream *sp, } unlock(t); } +#endif static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) { if (t->incoming_frame_size > t->incoming_window) { @@ -1301,7 +1310,6 @@ static void on_header(void *tp, grpc_mdelem *md) { grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value))); - stream_list_join(t, s, PENDING_CALLBACKS); if (md->key == t->str_grpc_timeout) { gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout); if (!cached_timeout) { @@ -1320,6 +1328,7 @@ static void on_header(void *tp, grpc_mdelem *md) { } else { add_incoming_metadata(t, s, md); } + maybe_finish_read(t, s); } static int init_header_frame_parser(transport *t, int is_continuation) { @@ -1531,14 +1540,14 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { case GRPC_CHTTP2_PARSE_OK: if (st.end_of_stream) { t->incoming_stream->read_closed = 1; - stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS); + maybe_finish_read(t, t->incoming_stream); } if (st.need_flush_reads) { - stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS); + maybe_finish_read(t, t->incoming_stream); } if (st.metadata_boundary) { add_metadata_batch(t, t->incoming_stream); - stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS); + maybe_finish_read(t, t->incoming_stream); } if (st.ack_settings) { gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create()); @@ -1579,7 +1588,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { int was_window_empty = s->outgoing_window <= 0; s->outgoing_window += st.initial_window_update; if (was_window_empty && s->outgoing_window > 0 && - s->outgoing_sopb.nops > 0) { + s->outgoing_sopb && s->outgoing_sopb->nops > 0) { stream_list_join(t, s, WRITABLE); } } @@ -1598,7 +1607,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { s->outgoing_window += st.window_update; /* if this window update makes outgoing ops writable again, flag that */ - if (was_window_empty && s->outgoing_sopb.nops) { + if (was_window_empty && s->outgoing_sopb && s->outgoing_sopb->nops > 0) { stream_list_join(t, s, WRITABLE); } } @@ -1860,45 +1869,49 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed, return GRPC_STREAM_OPEN; } +static void finish_reads(transport *t) { + stream *s; + + while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) { + int publish = 0; + GPR_ASSERT(s->incoming_sopb); + *s->publish_state = compute_state(s->write_state == WRITE_STATE_SENT_CLOSE, + s->read_closed); + if (*s->publish_state != s->published_state) { + s->published_state = *s->publish_state; + publish = 1; + } + if (s->parser.incoming_sopb.nops > 0) { + grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb); + publish = 1; + } + if (publish) { + schedule_cb(t, s->recv_done_closure, 1); + } + } +} + +static void schedule_cb(transport *t, op_closure closure, int success) { + if (t->pending_callbacks.capacity == t->pending_callbacks.count) { + t->pending_callbacks.capacity = GPR_MAX(t->pending_callbacks.capacity * 2, 8); + t->pending_callbacks.callbacks = gpr_realloc(t->pending_callbacks.callbacks, t->pending_callbacks.capacity * sizeof(*t->pending_callbacks.callbacks)); + } + closure.success = success; + t->pending_callbacks.callbacks[t->pending_callbacks.count++] = closure; +} + static int prepare_callbacks(transport *t) { op_closure_array temp = t->pending_callbacks; t->pending_callbacks = t->executing_callbacks; t->executing_callbacks = temp; return t->executing_callbacks.count > 0; - -#if 0 - stream *s; - int n = 0; - while ((s = stream_list_remove_head(t, PENDING_CALLBACKS))) { - int execute = 1; - - s->callback_state = compute_state(s->sent_write_closed, s->read_closed); - if (s->callback_state == GRPC_STREAM_CLOSED) { - remove_from_stream_map(t, s); - if (s->published_close) { - execute = 0; - } else if (s->incoming_metadata_count) { - add_metadata_batch(t, s); - } - s->published_close = 1; - } - - grpc_sopb_swap(&s->parser.incoming_sopb, &s->callback_sopb); - - if (execute) { - stream_list_add_tail(t, s, EXECUTING_CALLBACKS); - n = 1; - } - } - return n; -#endif } static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) { size_t i; for (i = 0; i < t->executing_callbacks.count; i++) { op_closure c = t->executing_callbacks.callbacks[i]; - c.cb(c.user_data, c.status); + c.cb(c.user_data, c.success); } t->executing_callbacks.count = 0; } @@ -1907,12 +1920,20 @@ static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb) { cb->closed(t->cb_user_data, &t->base); } -static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) { - transport *t = (transport *)gt; - lock(t); +/* + * POLLSET STUFF + */ + +static void add_to_pollset_locked(transport *t, grpc_pollset *pollset) { if (t->ep) { grpc_endpoint_add_to_pollset(t->ep, pollset); } +} + +static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) { + transport *t = (transport *)gt; + lock(t); + add_to_pollset_locked(t, pollset); unlock(t); }