From 1937b06b785f577ad4b0fa1734eed286d2d67e6f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 16 Jun 2015 08:47:38 -0700 Subject: [PATCH] Implement some missing pieces for chttp2s lock breakup --- src/core/transport/chttp2/incoming_metadata.c | 4 ++ src/core/transport/chttp2/internal.h | 3 +- src/core/transport/chttp2/parsing.c | 4 ++ src/core/transport/chttp2/stream_lists.c | 8 +++ src/core/transport/chttp2_transport.c | 58 ++++++------------- 5 files changed, 36 insertions(+), 41 deletions(-) diff --git a/src/core/transport/chttp2/incoming_metadata.c b/src/core/transport/chttp2/incoming_metadata.c index df799047156..b120b3326f0 100644 --- a/src/core/transport/chttp2/incoming_metadata.c +++ b/src/core/transport/chttp2/incoming_metadata.c @@ -44,6 +44,10 @@ void grpc_chttp2_incoming_metadata_buffer_init(grpc_chttp2_incoming_metadata_buf buffer->deadline = gpr_inf_future; } +void grpc_chttp2_incoming_metadata_buffer_destroy(grpc_chttp2_incoming_metadata_buffer *buffer) { + gpr_free(buffer->elems); +} + void grpc_chttp2_incoming_metadata_buffer_add(grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem *elem) { if (buffer->capacity == buffer->count) { diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index d94bb7965e2..d34fc7e6ff9 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -514,9 +514,9 @@ void grpc_chttp2_terminate_writing( void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing); -/** Process one slice of incoming data */ void grpc_chttp2_prepare_to_read(grpc_chttp2_transport_global *global, grpc_chttp2_transport_parsing *parsing); +/** Process one slice of incoming data */ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice); void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *global, @@ -589,6 +589,7 @@ void grpc_chttp2_list_add_incoming_window_state_changed( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); +/** schedule a closure to run without the transport lock taken */ void grpc_chttp2_schedule_closure( grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure, int success); diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index d6505f396d7..bf66bb42cf3 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -60,6 +60,10 @@ static int init_skip_frame_parser( static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice, int is_last); +void grpc_chttp2_prepare_to_read(grpc_chttp2_transport_global *global, + grpc_chttp2_transport_parsing *parsing) { +} + void grpc_chttp2_publish_reads( grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_parsing *transport_parsing) { diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index 316539efe8e..544174fd67f 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -263,6 +263,14 @@ void grpc_chttp2_list_add_incoming_window_state_changed( void grpc_chttp2_register_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { stream_list_add_tail(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS); } + void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { stream_list_remove(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS); } + +void grpc_chttp2_for_all_streams(grpc_chttp2_transport_global *transport_global, void *user_data, void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data, grpc_chttp2_stream_global *stream_global)) { + grpc_chttp2_stream *s; + for (s = TRANSPORT_FROM_GLOBAL(transport_global)->lists[GRPC_CHTTP2_LIST_ALL_STREAMS].head; s; s = s->links[GRPC_CHTTP2_LIST_ALL_STREAMS].next) { + cb(transport_global, user_data, &s->global); + } +} diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 404a6339c5e..db9a8ef8ee8 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -97,11 +97,6 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, /** Start disconnection chain */ static void drop_connection(grpc_chttp2_transport *t); -/** Schedule a closure to be called outside of the transport lock after the next - unlock() operation */ -static void schedule_cb(grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure, - int success); - /** Perform a transport_op */ static void perform_op_locked(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_transport_op *op); @@ -115,34 +110,6 @@ static void cancel_from_api( static void add_to_pollset_locked(grpc_chttp2_transport *t, grpc_pollset *pollset); -#if 0 - -static void unlock_check_parser(grpc_chttp2_transport *t); - -static void end_all_the_calls(grpc_chttp2_transport *t); - -static void cancel_stream_id(grpc_chttp2_transport *t, gpr_uint32 id, - grpc_status_code local_status, - grpc_chttp2_error_code error_code, int send_rst); -static void cancel_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s, - grpc_status_code local_status, - grpc_chttp2_error_code error_code, - grpc_mdstr *optional_message, int send_rst); -static grpc_chttp2_stream *lookup_stream(grpc_chttp2_transport *t, - gpr_uint32 id); -static void remove_from_stream_map(grpc_chttp2_transport *t, - grpc_chttp2_stream *s); -static void maybe_start_some_streams(grpc_chttp2_transport *t); - -static void parsing_become_skip_parser(grpc_chttp2_transport *t); - -static void maybe_finish_read(grpc_chttp2_transport *t, grpc_chttp2_stream *s, - int is_parser); -static void maybe_join_window_updates(grpc_chttp2_transport *t, - grpc_chttp2_stream *s); -static void add_metadata_batch(grpc_chttp2_transport *t, grpc_chttp2_stream *s); -#endif - /* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -431,6 +398,17 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream( return &s->parsing; } +grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( + grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id) { + grpc_chttp2_stream *accepting; + grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing); + GPR_ASSERT(t->accepting_stream == NULL); + t->accepting_stream = &accepting; + t->channel_callback.cb->accept_stream(t->channel_callback.cb_user_data, &t->base, (void *)(gpr_uintptr)id); + t->accepting_stream = NULL; + return &accepting->parsing; +} + #if 0 static void remove_from_stream_map(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { if (s->global.id == 0) return; @@ -461,7 +439,7 @@ static void unlock(grpc_chttp2_transport *t) { grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) { t->writing_active = 1; ref_transport(t); - schedule_cb(&t->global, &t->writing_action, 1); + grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1); } unlock_check_cancellations(t); /* unlock_check_parser(t); */ @@ -606,7 +584,7 @@ static void perform_op_locked(grpc_chttp2_transport_global *transport_global, gr } } else { grpc_sopb_reset(op->send_ops); - schedule_cb(transport_global, stream_global->send_done_closure, 0); + grpc_chttp2_schedule_closure(transport_global, stream_global->send_done_closure, 0); } } @@ -626,7 +604,7 @@ static void perform_op_locked(grpc_chttp2_transport_global *transport_global, gr } if (op->on_consumed) { - schedule_cb(transport_global, op->on_consumed, 1); + grpc_chttp2_schedule_closure(transport_global, op->on_consumed, 1); } } @@ -728,7 +706,7 @@ static void cancel_stream_inner(grpc_chttp2_transport *t, grpc_chttp2_stream *s, schedule_nuke_sopb(t, s->global.outgoing_sopb); s->global.outgoing_sopb = NULL; stream_list_remove(t, s, WRITABLE); - schedule_cb(t, s->global.send_done_closure, 0); + grpc_chttp2_schedule_closure(t, s->global.send_done_closure, 0); } } if (s->cancelled) { @@ -985,7 +963,7 @@ static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) { t->channel_callback.executing = 1; grpc_iomgr_closure_init(&a->closure, notify_goaways, a); ref_transport(t); - schedule_cb(&t->global, &a->closure, 1); + grpc_chttp2_schedule_closure(&t->global, &a->closure, 1); return; } else if (t->global.goaway_state != GRPC_CHTTP2_ERROR_STATE_NOTIFIED) { return; @@ -995,11 +973,11 @@ static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) { t->global.error_state = GRPC_CHTTP2_ERROR_STATE_NOTIFIED; t->channel_callback.executing = 1; ref_transport(t); - schedule_cb(&t->global, &t->channel_callback.notify_closed, 1); + grpc_chttp2_schedule_closure(&t->global, &t->channel_callback.notify_closed, 1); } } -static void schedule_cb(grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure, +void grpc_chttp2_schedule_closure(grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure, int success) { closure->success = success; closure->next = transport_global->pending_closures;