From 1766a917e87e06094301250df3b689db766f724d Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 22 Jun 2015 16:43:43 -0700 Subject: [PATCH 01/19] Executor sketch --- src/core/transport/chttp2/internal.h | 36 +++- src/core/transport/chttp2_transport.c | 251 +++++++++++++++++--------- 2 files changed, 193 insertions(+), 94 deletions(-) diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 3d1cd56e618..784c85ec4f7 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -290,24 +290,39 @@ struct grpc_chttp2_transport_parsing { grpc_chttp2_outstanding_ping pings; }; +typedef struct grpc_chttp2_executor_action_header { + grpc_chttp2_stream *stream; + void (*action)(grpc_chttp2_transport *t, grpc_chttp2_stream *s, void *arg); + struct grpc_chttp2_executor_action_header *next; + void *arg; +} grpc_chttp2_executor_action_header; + struct grpc_chttp2_transport { grpc_transport base; /* must be first */ + gpr_refcount refs; grpc_endpoint *ep; grpc_mdctx *metadata_context; - gpr_refcount refs; - gpr_mu mu; + struct { + gpr_mu mu; + + /** is a thread currently in the global lock */ + gpr_uint8 global_active; + /** is a thread currently writing */ + gpr_uint8 writing_active; + /** is a thread currently parsing */ + gpr_uint8 parsing_active; + /** is a thread currently executing channel callbacks */ + gpr_uint8 channel_callback_active; + + grpc_chttp2_executor_action_header *pending_actions; + } executor; /** is the transport destroying itself? */ gpr_uint8 destroying; /** has the upper layer closed the transport? */ gpr_uint8 closed; - /** is a thread currently writing */ - gpr_uint8 writing_active; - /** is a thread currently parsing */ - gpr_uint8 parsing_active; - /** is there a read request to the endpoint outstanding? */ gpr_uint8 endpoint_reading; @@ -343,8 +358,6 @@ struct grpc_chttp2_transport { grpc_chttp2_stream **accepting_stream; struct { - /** is a thread currently performing channel callbacks */ - gpr_uint8 executing; /** transport channel-level callback */ const grpc_transport_callbacks *cb; /** user data for cb calls */ @@ -615,6 +628,11 @@ void grpc_chttp2_for_all_streams( void grpc_chttp2_parsing_become_skip_parser( grpc_chttp2_transport_parsing *transport_parsing); +void grpc_chttp2_run_with_global_lock( + grpc_chttp2_transport *transport, grpc_chttp2_stream *optional_stream, + void (*action)(grpc_chttp2_transport *t, grpc_chttp2_stream *s, void *arg), + void *arg, size_t sizeof_arg); + #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" #define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \ (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1) diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 0032ef1d4da..b32d9f0a71d 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -78,8 +78,10 @@ int grpc_flowctl_trace = 0; static const grpc_transport_vtable vtable; +#if 0 static void lock(grpc_chttp2_transport *t); static void unlock(grpc_chttp2_transport *t); +#endif static void unlock_check_channel_callbacks(grpc_chttp2_transport *t); static void unlock_check_read_write_state(grpc_chttp2_transport *t); @@ -101,9 +103,9 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, static void drop_connection(grpc_chttp2_transport *t); /** Perform a transport_op */ -static void perform_op_locked(grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global, - grpc_transport_op *op); +static void perform_op_locked(grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + void *transport_op); /** Cancel a stream: coming from the transport API */ static void cancel_from_api(grpc_chttp2_transport_global *transport_global, @@ -112,12 +114,15 @@ static void cancel_from_api(grpc_chttp2_transport_global *transport_global, /** Add endpoint from this transport to pollset */ static void add_to_pollset_locked(grpc_chttp2_transport *t, - grpc_pollset *pollset); + grpc_chttp2_stream *s_ignored, + void *pollset); /** Start new streams that have been created if we can */ static void maybe_start_some_streams( grpc_chttp2_transport_global *transport_global); +static void finish_global_actions(grpc_chttp2_transport *t); + /* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -125,7 +130,7 @@ static void maybe_start_some_streams( static void destruct_transport(grpc_chttp2_transport *t) { size_t i; - gpr_mu_lock(&t->mu); + gpr_mu_lock(&t->executor.mu); GPR_ASSERT(t->ep == NULL); @@ -151,8 +156,8 @@ static void destruct_transport(grpc_chttp2_transport *t) { grpc_chttp2_stream_map_destroy(&t->parsing_stream_map); grpc_chttp2_stream_map_destroy(&t->new_stream_map); - gpr_mu_unlock(&t->mu); - gpr_mu_destroy(&t->mu); + gpr_mu_unlock(&t->executor.mu); + gpr_mu_destroy(&t->executor.mu); /* callback remaining pings: they're not allowed to call into the transpot, and maybe they hold resources that need to be freed */ @@ -215,7 +220,7 @@ static void init_transport(grpc_chttp2_transport *t, t->ep = ep; /* one ref is for destroy, the other for when ep becomes NULL */ gpr_ref_init(&t->refs, 2); - gpr_mu_init(&t->mu); + gpr_mu_init(&t->executor.mu); grpc_mdctx_ref(mdctx); t->metadata_context = mdctx; t->endpoint_reading = 1; @@ -311,18 +316,19 @@ static void init_transport(grpc_chttp2_transport *t, } } - gpr_mu_lock(&t->mu); - t->channel_callback.executing = 1; + gpr_mu_lock(&t->executor.mu); + t->executor.channel_callback_active = 1; + t->executor.global_active = 1; REF_TRANSPORT(t, "init"); /* matches unref at end of this function */ - gpr_mu_unlock(&t->mu); + gpr_mu_unlock(&t->executor.mu); sr = setup(arg, &t->base, t->metadata_context); - lock(t); t->channel_callback.cb = sr.callbacks; t->channel_callback.cb_user_data = sr.user_data; - t->channel_callback.executing = 0; - unlock(t); + t->executor.channel_callback_active = 0; + + finish_global_actions(t); REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */ recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK); @@ -330,18 +336,18 @@ static void init_transport(grpc_chttp2_transport *t, UNREF_TRANSPORT(t, "init"); } -static void destroy_transport(grpc_transport *gt) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - - lock(t); +static void destroy_transport_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s_ignored, void *arg_ignored) { t->destroying = 1; drop_connection(t); - unlock(t); +} +static void destroy_transport(grpc_transport *gt) { + grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; + grpc_chttp2_run_with_global_lock(t, NULL, destroy_transport_locked, NULL, 0); UNREF_TRANSPORT(t, "destroy"); } -static void close_transport_locked(grpc_chttp2_transport *t) { +static void close_transport_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s_ignored, void *arg_ignored) { if (!t->closed) { t->closed = 1; if (t->ep) { @@ -352,19 +358,32 @@ static void close_transport_locked(grpc_chttp2_transport *t) { static void close_transport(grpc_transport *gt) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - gpr_mu_lock(&t->mu); - close_transport_locked(t); - gpr_mu_unlock(&t->mu); + grpc_chttp2_run_with_global_lock(t, NULL, close_transport_locked, NULL, 0); +} + +typedef struct { + grpc_status_code status; + gpr_slice debug_data; +} goaway_arg; + +static void goaway_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s_ignored, void *a) { + goaway_arg *arg = a; + grpc_chttp2_goaway_append(t->global.last_incoming_stream_id, + grpc_chttp2_grpc_status_to_http2_error(arg->status), + arg->debug_data, &t->global.qbuf); } static void goaway(grpc_transport *gt, grpc_status_code status, gpr_slice debug_data) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - lock(t); - grpc_chttp2_goaway_append(t->global.last_incoming_stream_id, - grpc_chttp2_grpc_status_to_http2_error(status), - debug_data, &t->global.qbuf); - unlock(t); + goaway_arg arg; + arg.status = status; + arg.debug_data = debug_data; + grpc_chttp2_run_with_global_lock(t, NULL, goaway_locked, &arg, sizeof(arg)); +} + +static void finish_init_stream_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, void *arg_ignored) { + grpc_chttp2_register_stream(t, s); } static int init_stream(grpc_transport *gt, grpc_stream *gs, @@ -382,10 +401,8 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, REF_TRANSPORT(t, "stream"); - lock(t); - grpc_chttp2_register_stream(t, s); if (server_data) { - GPR_ASSERT(t->parsing_active); + GPR_ASSERT(t->executor.parsing_active); s->global.id = (gpr_uint32)(gpr_uintptr)server_data; s->global.outgoing_window = t->global @@ -398,8 +415,8 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, s->global.in_stream_map = 1; } - if (initial_op) perform_op_locked(&t->global, &s->global, initial_op); - unlock(t); + grpc_chttp2_run_with_global_lock(t, s, finish_init_stream_locked, NULL, 0); + if (initial_op) grpc_chttp2_run_with_global_lock(t, s, perform_op_locked, initial_op, sizeof(*initial_op)); return 0; } @@ -407,8 +424,10 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; + int i; +#if 0 gpr_mu_lock(&t->mu); GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED || @@ -424,6 +443,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { grpc_chttp2_list_remove_writable_window_update_stream(&t->global, &s->global); gpr_mu_unlock(&t->mu); +#endif for (i = 0; i < STREAM_LIST_COUNT; i++) { GPR_ASSERT(!s->included[i]); @@ -474,36 +494,92 @@ static void remove_from_stream_map(grpc_chttp2_transport *t, grpc_chttp2_stream * LOCK MANAGEMENT */ -/* We take a grpc_chttp2_transport-global lock in response to calls coming in - from above, - and in response to data being received from below. New data to be written - is always queued, as are callbacks to process data. During unlock() we - check our todo lists and initiate callbacks and flush writes. */ +static void finish_global_actions(grpc_chttp2_transport *t) { + grpc_chttp2_executor_action_header *hdr; + grpc_chttp2_executor_action_header *next; + grpc_iomgr_closure *run_closures; -static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); } + for (;;) { + unlock_check_read_write_state(t); + if (!t->executor.writing_active && t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE && + grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) { + t->executor.writing_active = 1; + REF_TRANSPORT(t, "writing"); + grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1); + } + unlock_check_channel_callbacks(t); -static void unlock(grpc_chttp2_transport *t) { - grpc_iomgr_closure *run_closures; + run_closures = t->global.pending_closures; + t->global.pending_closures = NULL; + + gpr_mu_lock(&t->executor.mu); + t->executor.global_active = 0; + gpr_mu_unlock(&t->executor.mu); - unlock_check_read_write_state(t); - if (!t->writing_active && t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE && - grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) { - t->writing_active = 1; - REF_TRANSPORT(t, "writing"); - grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1); + while (run_closures) { + grpc_iomgr_closure *next = run_closures->next; + run_closures->cb(run_closures->cb_arg, run_closures->success); + run_closures = next; + } + + gpr_mu_lock(&t->executor.mu); + if (!t->executor.global_active && t->executor.pending_actions) { + t->executor.global_active = 1; + hdr = t->executor.pending_actions; + t->executor.pending_actions = NULL; + gpr_mu_unlock(&t->executor.mu); + while (hdr != NULL) { + hdr->action(t, hdr->stream, hdr->arg); + next = hdr->next; + gpr_free(hdr); + hdr = next; + } + continue; + } + gpr_mu_unlock(&t->executor.mu); + break; } - /* unlock_check_parser(t); */ - unlock_check_channel_callbacks(t); +} - run_closures = t->global.pending_closures; - t->global.pending_closures = NULL; +void grpc_chttp2_run_with_global_lock(grpc_chttp2_transport *t, grpc_chttp2_stream *optional_stream, + void (*action)(grpc_chttp2_transport *t, grpc_chttp2_stream *s, void *arg), + void *arg, size_t sizeof_arg) { + grpc_chttp2_executor_action_header *hdr; - gpr_mu_unlock(&t->mu); + gpr_mu_lock(&t->executor.mu); + + for (;;) { + if (!t->executor.global_active) { + t->executor.global_active = 1; + GPR_ASSERT(t->executor.pending_actions == NULL); + gpr_mu_unlock(&t->executor.mu); + + action(t, optional_stream, arg); + + finish_global_actions(t); + } else { + gpr_mu_unlock(&t->executor.mu); + + hdr = gpr_malloc(sizeof(*hdr) + sizeof_arg); + hdr->stream = optional_stream; + hdr->action = action; + if (sizeof_arg == 0) { + hdr->arg = arg; + } else { + hdr->arg = hdr + 1; + memcpy(hdr->arg, arg, sizeof_arg); + } - while (run_closures) { - grpc_iomgr_closure *next = run_closures->next; - run_closures->cb(run_closures->cb_arg, run_closures->success); - run_closures = next; + gpr_mu_lock(&t->executor.mu); + if (!t->executor.global_active) { + gpr_free(hdr); + continue; + } + hdr->next = t->executor.pending_actions; + t->executor.pending_actions = hdr; + gpr_mu_unlock(&t->executor.mu); + } + break; } } @@ -526,6 +602,7 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, } } +#if 0 void grpc_chttp2_terminate_writing( grpc_chttp2_transport_writing *transport_writing, int success) { grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing); @@ -553,6 +630,7 @@ void grpc_chttp2_terminate_writing( UNREF_TRANSPORT(t, "writing"); } +#endif static void writing_action(void *gt, int iomgr_success_ignored) { grpc_chttp2_transport *t = gt; @@ -622,9 +700,13 @@ static void maybe_start_some_streams( } } -static void perform_op_locked(grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global, - grpc_transport_op *op) { +static void perform_op_locked(grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + void *transport_op) { + grpc_chttp2_transport_global *transport_global = &t->global; + grpc_chttp2_stream_global *stream_global = &s->global; + grpc_transport_op *op = transport_op; + if (op->cancel_with_status != GRPC_STATUS_OK) { cancel_from_api(transport_global, stream_global, op->cancel_with_status); } @@ -671,7 +753,7 @@ static void perform_op_locked(grpc_chttp2_transport_global *transport_global, } if (op->bind_pollset) { - add_to_pollset_locked(TRANSPORT_FROM_GLOBAL(transport_global), + add_to_pollset_locked(TRANSPORT_FROM_GLOBAL(transport_global), NULL, op->bind_pollset); } @@ -684,17 +766,11 @@ static void perform_op(grpc_transport *gt, grpc_stream *gs, grpc_transport_op *op) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; - - lock(t); - perform_op_locked(&t->global, &s->global, op); - unlock(t); + grpc_chttp2_run_with_global_lock(t, s, perform_op_locked, op, sizeof(*op)); } -static void send_ping(grpc_transport *gt, grpc_iomgr_closure *on_recv) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; +static void send_ping_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s_ignored, void *a) { grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p)); - - lock(t); p->next = &t->global.pings; p->prev = p->next->prev; p->prev->next = p->next->prev = p; @@ -706,9 +782,13 @@ static void send_ping(grpc_transport *gt, grpc_iomgr_closure *on_recv) { p->id[5] = (t->global.ping_counter >> 16) & 0xff; p->id[6] = (t->global.ping_counter >> 8) & 0xff; p->id[7] = t->global.ping_counter & 0xff; - p->on_recv = on_recv; + p->on_recv = *(grpc_iomgr_closure**)a; gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id)); - unlock(t); +} + +static void send_ping(grpc_transport *gt, grpc_iomgr_closure *on_recv) { + grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; + grpc_chttp2_run_with_global_lock(t, NULL, send_ping_locked, &on_recv, sizeof(on_recv)); } /* @@ -751,7 +831,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { grpc_chttp2_stream_global *stream_global; grpc_stream_state state; - if (!t->parsing_active) { + if (!t->executor.parsing_active) { /* if a stream is in the stream map, and gets cancelled, we need to ensure we are not parsing before continuing the cancellation to keep things in a sane state */ @@ -784,7 +864,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { } if (stream_global->write_state == WRITE_STATE_SENT_CLOSE && stream_global->read_closed && stream_global->in_stream_map) { - if (t->parsing_active) { + if (t->executor.parsing_active) { grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global, stream_global); } else { @@ -934,7 +1014,7 @@ static void drop_connection(grpc_chttp2_transport *t) { if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE) { t->global.error_state = GRPC_CHTTP2_ERROR_STATE_SEEN; } - close_transport_locked(t); + close_transport_locked(t, NULL, NULL); end_all_the_calls(t); } @@ -968,6 +1048,7 @@ static grpc_chttp2_stream *lookup_stream(grpc_chttp2_transport *t, gpr_uint32 id #endif /* tcp read callback */ +#if 0 static void recv_data(void *tp, gpr_slice *slices, size_t nslices, grpc_endpoint_cb_status error) { grpc_chttp2_transport *t = tp; @@ -1025,6 +1106,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, break; } } +#endif static void reading_action(void *pt, int iomgr_success_ignored) { grpc_chttp2_transport *t = pt; @@ -1042,6 +1124,10 @@ typedef struct { grpc_iomgr_closure closure; } notify_goaways_args; +static void finished_channel_callbacks_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s_ignored, void *arg_ignored) { + t->executor.channel_callback_active = 0; +} + static void notify_goaways(void *p, int iomgr_success_ignored) { notify_goaways_args *a = p; grpc_chttp2_transport *t = a->t; @@ -1051,10 +1137,7 @@ static void notify_goaways(void *p, int iomgr_success_ignored) { gpr_free(a); - lock(t); - t->channel_callback.executing = 0; - unlock(t); - + grpc_chttp2_run_with_global_lock(t, NULL, finished_channel_callbacks_locked, NULL, 0); UNREF_TRANSPORT(t, "notify_goaways"); } @@ -1062,13 +1145,11 @@ static void notify_closed(void *gt, int iomgr_success_ignored) { grpc_chttp2_transport *t = gt; t->channel_callback.cb->closed(t->channel_callback.cb_user_data, &t->base); - lock(t); - t->channel_callback.executing = 0; - unlock(t); - + grpc_chttp2_run_with_global_lock(t, NULL, finished_channel_callbacks_locked, NULL, 0); UNREF_TRANSPORT(t, "notify_closed"); } +#if 0 static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) { if (t->channel_callback.executing) { return; @@ -1098,6 +1179,7 @@ static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) { 1); } } +#endif void grpc_chttp2_schedule_closure( grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure, @@ -1112,7 +1194,8 @@ void grpc_chttp2_schedule_closure( */ static void add_to_pollset_locked(grpc_chttp2_transport *t, - grpc_pollset *pollset) { + grpc_chttp2_stream *s, + void *pollset) { if (t->ep) { grpc_endpoint_add_to_pollset(t->ep, pollset); } @@ -1120,9 +1203,7 @@ static void add_to_pollset_locked(grpc_chttp2_transport *t, static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - lock(t); - add_to_pollset_locked(t, pollset); - unlock(t); + grpc_chttp2_run_with_global_lock(t, NULL, add_to_pollset_locked, pollset, 0); } /* From 4b074d5274af7f1c2b2067181b398c9d09e4443c Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 23 Jun 2015 07:48:21 -0700 Subject: [PATCH 02/19] CHTTP2 executor --- src/core/transport/chttp2/internal.h | 7 ++ src/core/transport/chttp2_transport.c | 167 ++++++++++++++------------ 2 files changed, 94 insertions(+), 80 deletions(-) diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 784c85ec4f7..61947f73958 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -351,6 +351,13 @@ struct grpc_chttp2_transport { grpc_iomgr_closure writing_action; /** closure to start reading from the endpoint */ grpc_iomgr_closure reading_action; + /** closure to actually do parsing */ + grpc_iomgr_closure parsing_action; + + struct { + size_t nslices; + gpr_slice *slices; + } executor_parsing; /** address to place a newly accepted stream - set and unset by grpc_chttp2_parsing_accept_stream; used by init_stream to diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 3cf08e17721..159072eab4c 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -89,6 +89,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t); /* forward declarations of various callbacks that we'll build closures around */ static void writing_action(void *t, int iomgr_success_ignored); static void reading_action(void *t, int iomgr_success_ignored); +static void parsing_action(void *t, int iomgr_success_ignored); static void notify_closed(void *t, int iomgr_success_ignored); /** Set a transport level setting, and push it to our peer */ @@ -244,6 +245,7 @@ static void init_transport(grpc_chttp2_transport *t, grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor, mdctx); grpc_iomgr_closure_init(&t->writing_action, writing_action, t); grpc_iomgr_closure_init(&t->reading_action, reading_action, t); + grpc_iomgr_closure_init(&t->parsing_action, parsing_action, t); gpr_slice_buffer_init(&t->parsing.qbuf); grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser); @@ -427,24 +429,6 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { int i; -#if 0 - gpr_mu_lock(&t->mu); - - GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED || - s->global.id == 0); - GPR_ASSERT(!s->global.in_stream_map); - grpc_chttp2_unregister_stream(t, s); - if (!t->parsing_active && s->global.id) { - GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map, - s->global.id) == NULL); - } - - grpc_chttp2_list_remove_incoming_window_updated(&t->global, &s->global); - grpc_chttp2_list_remove_writable_window_update_stream(&t->global, &s->global); - - gpr_mu_unlock(&t->mu); -#endif - for (i = 0; i < STREAM_LIST_COUNT; i++) { GPR_ASSERT(!s->included[i]); } @@ -540,7 +524,6 @@ void grpc_chttp2_run_with_global_lock(grpc_chttp2_transport *t, grpc_chttp2_stre for (;;) { if (!t->executor.global_active) { t->executor.global_active = 1; - GPR_ASSERT(t->executor.pending_actions == NULL); gpr_mu_unlock(&t->executor.mu); action(t, optional_stream, arg); @@ -591,12 +574,8 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, } } -#if 0 -void grpc_chttp2_terminate_writing( - grpc_chttp2_transport_writing *transport_writing, int success) { - grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing); - - lock(t); +static void terminate_writing_with_lock(grpc_chttp2_transport *t, grpc_chttp2_stream *s_ignored, void *a) { + int success = *(int*)a; if (!success) { drop_connection(t); @@ -607,7 +586,7 @@ void grpc_chttp2_terminate_writing( /* leave the writing flag up on shutdown to prevent further writes in unlock() from starting */ - t->writing_active = 0; + t->executor.writing_active = 0; if (t->ep && !t->endpoint_reading) { grpc_endpoint_destroy(t->ep); t->ep = NULL; @@ -615,11 +594,14 @@ void grpc_chttp2_terminate_writing( t, "disconnect"); /* safe because we'll still have the ref for write */ } - unlock(t); - UNREF_TRANSPORT(t, "writing"); } -#endif + +void grpc_chttp2_terminate_writing( + grpc_chttp2_transport_writing *transport_writing, int success) { + grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing); + grpc_chttp2_run_with_global_lock(t, NULL, terminate_writing_with_lock, &success, sizeof(success)); +} static void writing_action(void *gt, int iomgr_success_ignored) { grpc_chttp2_transport *t = gt; @@ -879,6 +861,12 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op( &stream_global->incoming_metadata, &stream_global->incoming_sopb, &stream_global->outstanding_metadata); + if (state == GRPC_STREAM_CLOSED) { + GPR_ASSERT(!stream_global->in_stream_map); + grpc_chttp2_unregister_stream(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global)); + grpc_chttp2_list_remove_incoming_window_updated(transport_global, stream_global); + grpc_chttp2_list_remove_writable_window_update_stream(transport_global, stream_global); + } grpc_sopb_swap(stream_global->publish_sopb, &stream_global->incoming_sopb); stream_global->published_state = *stream_global->publish_state = state; grpc_chttp2_schedule_closure(transport_global, @@ -922,66 +910,87 @@ static void drop_connection(grpc_chttp2_transport *t) { end_all_the_calls(t); } +static void recv_data_error_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, void *a) { + size_t i; + + drop_connection(t); + t->endpoint_reading = 0; + if (!t->executor.writing_active && t->ep) { + grpc_endpoint_destroy(t->ep); + t->ep = NULL; + UNREF_TRANSPORT( + t, "disconnect"); /* safe as we still have a ref for read */ + } + UNREF_TRANSPORT(t, "recv_data"); + for (i = 0; i < t->executor_parsing.nslices; i++) gpr_slice_unref(t->executor_parsing.slices[i]); +} + +static void finish_parsing_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s_ignored, void *a) { + size_t i = *(size_t *)a; + + if (i != t->executor_parsing.nslices) { + drop_connection(t); + } + /* merge stream lists */ + grpc_chttp2_stream_map_move_into(&t->new_stream_map, + &t->parsing_stream_map); + t->global.concurrent_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map); + /* handle higher level things */ + grpc_chttp2_publish_reads(&t->global, &t->parsing); + t->executor.parsing_active = 0; + + for (; i < t->executor_parsing.nslices; i++) gpr_slice_unref(t->executor_parsing.slices[i]); + + if (i == t->executor_parsing.nslices) { + grpc_chttp2_schedule_closure(&t->global, &t->reading_action, 1); + } +} + +static void parsing_action(void *pt, int iomgr_success_ignored) { + size_t i; + grpc_chttp2_transport *t = pt; + for (i = 0; i < t->executor_parsing.nslices && grpc_chttp2_perform_read(&t->parsing, t->executor_parsing.slices[i]); + i++) { + gpr_slice_unref(t->executor_parsing.slices[i]); + } + grpc_chttp2_run_with_global_lock(t, NULL, finish_parsing_locked, &i, sizeof(i)); +} + +static void recv_data_ok_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, void *a) { + size_t i; + GPR_ASSERT(!t->executor.parsing_active); + if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE) { + t->executor.parsing_active = 1; + /* merge stream lists */ + grpc_chttp2_stream_map_move_into(&t->new_stream_map, + &t->parsing_stream_map); + grpc_chttp2_prepare_to_read(&t->global, &t->parsing); + /* schedule more work to do unlocked */ + grpc_chttp2_schedule_closure(&t->global, &t->parsing_action, 1); + } else { + for (i = 0; i < t->executor_parsing.nslices; i++) gpr_slice_unref(t->executor_parsing.slices[i]); + } +} + /* tcp read callback */ -#if 0 static void recv_data(void *tp, gpr_slice *slices, size_t nslices, grpc_endpoint_cb_status error) { grpc_chttp2_transport *t = tp; - size_t i; + + t->executor_parsing.slices = slices; + t->executor_parsing.nslices = nslices; switch (error) { case GRPC_ENDPOINT_CB_SHUTDOWN: case GRPC_ENDPOINT_CB_EOF: case GRPC_ENDPOINT_CB_ERROR: - lock(t); - drop_connection(t); - t->endpoint_reading = 0; - if (!t->writing_active && t->ep) { - grpc_endpoint_destroy(t->ep); - t->ep = NULL; - UNREF_TRANSPORT( - t, "disconnect"); /* safe as we still have a ref for read */ - } - unlock(t); - UNREF_TRANSPORT(t, "recv_data"); - for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]); + grpc_chttp2_run_with_global_lock(t, NULL, recv_data_error_locked, NULL, 0); break; case GRPC_ENDPOINT_CB_OK: - lock(t); - i = 0; - GPR_ASSERT(!t->parsing_active); - if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE) { - t->parsing_active = 1; - /* merge stream lists */ - grpc_chttp2_stream_map_move_into(&t->new_stream_map, - &t->parsing_stream_map); - grpc_chttp2_prepare_to_read(&t->global, &t->parsing); - gpr_mu_unlock(&t->mu); - for (; i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]); - i++) { - gpr_slice_unref(slices[i]); - } - gpr_mu_lock(&t->mu); - if (i != nslices) { - drop_connection(t); - } - /* merge stream lists */ - grpc_chttp2_stream_map_move_into(&t->new_stream_map, - &t->parsing_stream_map); - t->global.concurrent_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map); - /* handle higher level things */ - grpc_chttp2_publish_reads(&t->global, &t->parsing); - t->parsing_active = 0; - } - if (i == nslices) { - grpc_chttp2_schedule_closure(&t->global, &t->reading_action, 1); - } - unlock(t); - for (; i < nslices; i++) gpr_slice_unref(slices[i]); + grpc_chttp2_run_with_global_lock(t, NULL, recv_data_ok_locked, NULL, 0); break; } } -#endif static void reading_action(void *pt, int iomgr_success_ignored) { grpc_chttp2_transport *t = pt; @@ -1024,9 +1033,8 @@ static void notify_closed(void *gt, int iomgr_success_ignored) { UNREF_TRANSPORT(t, "notify_closed"); } -#if 0 static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) { - if (t->channel_callback.executing) { + if (t->executor.channel_callback_active) { return; } if (t->global.goaway_state != GRPC_CHTTP2_ERROR_STATE_NONE) { @@ -1037,7 +1045,7 @@ static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) { a->error = t->global.goaway_error; a->text = t->global.goaway_text; t->global.goaway_state = GRPC_CHTTP2_ERROR_STATE_NOTIFIED; - t->channel_callback.executing = 1; + t->executor.channel_callback_active = 1; grpc_iomgr_closure_init(&a->closure, notify_goaways, a); REF_TRANSPORT(t, "notify_goaways"); grpc_chttp2_schedule_closure(&t->global, &a->closure, 1); @@ -1048,13 +1056,12 @@ static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) { } if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_SEEN) { t->global.error_state = GRPC_CHTTP2_ERROR_STATE_NOTIFIED; - t->channel_callback.executing = 1; + t->executor.channel_callback_active = 1; REF_TRANSPORT(t, "notify_closed"); grpc_chttp2_schedule_closure(&t->global, &t->channel_callback.notify_closed, 1); } } -#endif void grpc_chttp2_schedule_closure( grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure, From 55e9953d949755e651a7320b854bead6dbcc2b64 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 23 Jun 2015 12:25:55 -0700 Subject: [PATCH 03/19] Fix some memory issues --- src/core/transport/chttp2_transport.c | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 159072eab4c..0aae67a2c0b 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -505,6 +505,7 @@ static void finish_global_actions(grpc_chttp2_transport *t) { hdr->action(t, hdr->stream, hdr->arg); next = hdr->next; gpr_free(hdr); + UNREF_TRANSPORT(t, "pending_action"); hdr = next; } continue; @@ -519,6 +520,7 @@ void grpc_chttp2_run_with_global_lock(grpc_chttp2_transport *t, grpc_chttp2_stre void *arg, size_t sizeof_arg) { grpc_chttp2_executor_action_header *hdr; + REF_TRANSPORT(t, "run_global"); gpr_mu_lock(&t->executor.mu); for (;;) { @@ -549,10 +551,13 @@ void grpc_chttp2_run_with_global_lock(grpc_chttp2_transport *t, grpc_chttp2_stre } hdr->next = t->executor.pending_actions; t->executor.pending_actions = hdr; + REF_TRANSPORT(t, "pending_action"); gpr_mu_unlock(&t->executor.mu); } break; } + + UNREF_TRANSPORT(t, "run_global"); } /* @@ -921,8 +926,9 @@ static void recv_data_error_locked(grpc_chttp2_transport *t, grpc_chttp2_stream UNREF_TRANSPORT( t, "disconnect"); /* safe as we still have a ref for read */ } - UNREF_TRANSPORT(t, "recv_data"); for (i = 0; i < t->executor_parsing.nslices; i++) gpr_slice_unref(t->executor_parsing.slices[i]); + memset(&t->executor_parsing, 0, sizeof(t->executor_parsing)); + UNREF_TRANSPORT(t, "recv_data"); } static void finish_parsing_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s_ignored, void *a) { @@ -944,6 +950,7 @@ static void finish_parsing_locked(grpc_chttp2_transport *t, grpc_chttp2_stream * if (i == t->executor_parsing.nslices) { grpc_chttp2_schedule_closure(&t->global, &t->reading_action, 1); } + memset(&t->executor_parsing, 0, sizeof(t->executor_parsing)); } static void parsing_action(void *pt, int iomgr_success_ignored) { @@ -969,6 +976,7 @@ static void recv_data_ok_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_schedule_closure(&t->global, &t->parsing_action, 1); } else { for (i = 0; i < t->executor_parsing.nslices; i++) gpr_slice_unref(t->executor_parsing.slices[i]); + memset(&t->executor_parsing, 0, sizeof(t->executor_parsing)); } } From 0cd216cd160926d70f3566bf21f212e3c56a7e89 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 1 Jul 2015 16:11:25 -0700 Subject: [PATCH 04/19] Merge with latest --- src/core/transport/chttp2_transport.c | 37 ++++++++++++++------------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 09d1a8cb390..53c1898843e 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -940,6 +940,19 @@ static void recv_data_error_locked(grpc_chttp2_transport *t, grpc_chttp2_stream UNREF_TRANSPORT(t, "recv_data"); } +/** update window from a settings change */ +static void update_global_window(void *args, gpr_uint32 id, void *stream) { + grpc_chttp2_transport *t = args; + grpc_chttp2_stream *s = stream; + grpc_chttp2_transport_global *transport_global = &t->global; + grpc_chttp2_stream_global *stream_global = &s->global; + + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("settings", transport_global, stream_global, + outgoing_window, + t->parsing.initial_window_update); + stream_global->outgoing_window += t->parsing.initial_window_update; +} + static void finish_parsing_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s_ignored, void *a) { size_t i = *(size_t *)a; @@ -950,20 +963,24 @@ static void finish_parsing_locked(grpc_chttp2_transport *t, grpc_chttp2_stream * grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); t->global.concurrent_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map); + if (t->parsing.initial_window_update != 0) { + grpc_chttp2_stream_map_for_each(&t->parsing_stream_map, + update_global_window, t); + } /* handle higher level things */ grpc_chttp2_publish_reads(&t->global, &t->parsing); t->executor.parsing_active = 0; for (; i < t->executor_parsing.nslices; i++) gpr_slice_unref(t->executor_parsing.slices[i]); - memset(&t->executor_parsing, 0, sizeof(t->executor_parsing)); - if (i == t->executor_parsing.nslices) { grpc_chttp2_schedule_closure(&t->global, &t->reading_action, 1); } else { read_error_locked(t); UNREF_TRANSPORT(t, "recv_data"); } + + memset(&t->executor_parsing, 0, sizeof(t->executor_parsing)); } static void parsing_action(void *pt, int iomgr_success_ignored) { @@ -993,19 +1010,6 @@ static void recv_data_ok_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, } } -/** update window from a settings change */ -static void update_global_window(void *args, gpr_uint32 id, void *stream) { - grpc_chttp2_transport *t = args; - grpc_chttp2_stream *s = stream; - grpc_chttp2_transport_global *transport_global = &t->global; - grpc_chttp2_stream_global *stream_global = &s->global; - - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("settings", transport_global, stream_global, - outgoing_window, - t->parsing.initial_window_update); - stream_global->outgoing_window += t->parsing.initial_window_update; -} - /* tcp read callback */ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, grpc_endpoint_cb_status error) { @@ -1024,9 +1028,6 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, grpc_chttp2_run_with_global_lock(t, NULL, recv_data_ok_locked, NULL, 0); break; } - if (unref) { - UNREF_TRANSPORT(t, "recv_data"); - } } static void reading_action(void *pt, int iomgr_success_ignored) { From 45b135e2f56ade45ecb9e6dc040edda595eb99c0 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 22 Mar 2016 16:02:39 -0700 Subject: [PATCH 05/19] Bring chttp2 executor code back up to compiling --- src/core/transport/chttp2/internal.h | 29 +- src/core/transport/chttp2_transport.c | 479 ++++++++++++-------------- 2 files changed, 240 insertions(+), 268 deletions(-) diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index c2977c7b3f1..7489cc67fbc 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -291,9 +291,13 @@ struct grpc_chttp2_transport_parsing { int64_t outgoing_window; }; +typedef void (*grpc_chttp2_locked_action)(grpc_exec_ctx *ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, void *arg); + typedef struct grpc_chttp2_executor_action_header { grpc_chttp2_stream *stream; - void (*action)(grpc_chttp2_transport *t, grpc_chttp2_stream *s, void *arg); + grpc_chttp2_locked_action action; struct grpc_chttp2_executor_action_header *next; void *arg; } grpc_chttp2_executor_action_header; @@ -311,13 +315,11 @@ struct grpc_chttp2_transport { gpr_mu mu; /** is a thread currently in the global lock */ - uint8_t global_active; + bool global_active; /** is a thread currently writing */ - uint8_t writing_active; + bool writing_active; /** is a thread currently parsing */ - uint8_t parsing_active; - /** is a thread currently executing channel callbacks */ - uint8_t channel_callback_active; + bool parsing_active; grpc_chttp2_executor_action_header *pending_actions; } executor; @@ -352,11 +354,11 @@ struct grpc_chttp2_transport { grpc_chttp2_stream_map new_stream_map; /** closure to execute writing */ - grpc_iomgr_closure writing_action; + grpc_closure writing_action; /** closure to start reading from the endpoint */ - grpc_iomgr_closure reading_action; + grpc_closure reading_action; /** closure to actually do parsing */ - grpc_iomgr_closure parsing_action; + grpc_closure parsing_action; struct { size_t nslices; @@ -659,10 +661,11 @@ void grpc_chttp2_parsing_become_skip_parser( void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, grpc_closure **pclosure, int success); -void grpc_chttp2_run_with_global_lock( - grpc_chttp2_transport *transport, grpc_chttp2_stream *optional_stream, - void (*action)(grpc_chttp2_transport *t, grpc_chttp2_stream *s, void *arg), - void *arg, size_t sizeof_arg); +void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *transport, + grpc_chttp2_stream *optional_stream, + grpc_chttp2_locked_action action, + void *arg, size_t sizeof_arg); #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" #define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \ diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index a7844ea8e53..eb9f7a23650 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -81,9 +81,6 @@ int grpc_flowctl_trace = 0; static const grpc_transport_vtable vtable; -static void unlock_check_channel_callbacks(grpc_chttp2_transport *t); -static void unlock_check_read_write_state(grpc_chttp2_transport *t); - /* forward declarations of various callbacks that we'll build closures around */ static void writing_action(grpc_exec_ctx *exec_ctx, void *t, bool iomgr_success_ignored); @@ -96,9 +93,6 @@ static void parsing_action(grpc_exec_ctx *exec_ctx, void *t, static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, uint32_t value); -/** Endpoint callback to process incoming data */ -static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success); - /** Start disconnection chain */ static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); @@ -132,7 +126,8 @@ static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx, static void maybe_start_some_streams( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global); -static void finish_global_actions(grpc_chttp2_transport *t); +static void finish_global_actions(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t); static void connectivity_state_set( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, @@ -246,9 +241,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, /* ref is dropped at transport close() */ gpr_ref_init(&t->shutdown_ep_refs, 1); gpr_mu_init(&t->executor.mu); - grpc_mdctx_ref(mdctx); t->peer_string = grpc_endpoint_get_peer(ep); - t->metadata_context = mdctx; t->endpoint_reading = 1; t->global.next_stream_id = is_client ? 1 : 2; t->global.is_client = is_client; @@ -280,7 +273,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing, &t->writing); - grpc_closure_init(&t->recv_data, recv_data, t); gpr_slice_buffer_init(&t->read_buffer); if (is_client) { @@ -395,17 +387,19 @@ static void prevent_endpoint_shutdown(grpc_chttp2_transport *t) { gpr_ref(&t->shutdown_ep_refs); } -static void destroy_transport_locked(grpc_chttp2_transport *t, +static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, grpc_chttp2_stream *s_ignored, void *arg_ignored) { t->destroying = 1; - drop_connection(t); + drop_connection(exec_ctx, t); } -static void destroy_transport(grpc_transport *gt) { +static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - grpc_chttp2_run_with_global_lock(t, NULL, destroy_transport_locked, NULL, 0); - UNREF_TRANSPORT(t, "destroy"); + grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, destroy_transport_locked, + NULL, 0); + UNREF_TRANSPORT(exec_ctx, t, "destroy"); } static void allow_endpoint_shutdown_locked(grpc_exec_ctx *exec_ctx, @@ -417,17 +411,6 @@ static void allow_endpoint_shutdown_locked(grpc_exec_ctx *exec_ctx, } } -static void allow_endpoint_shutdown_unlocked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { - if (gpr_unref(&t->shutdown_ep_refs)) { - gpr_mu_lock(&t->mu); - if (t->ep) { - grpc_endpoint_shutdown(exec_ctx, t->ep); - } - gpr_mu_unlock(&t->mu); - } -} - static void destroy_endpoint(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { grpc_endpoint_destroy(exec_ctx, t->ep); @@ -479,34 +462,8 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, } #endif -static void close_transport(grpc_transport *gt) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - grpc_chttp2_run_with_global_lock(t, NULL, close_transport_locked, NULL, 0); -} - -typedef struct { - grpc_status_code status; - gpr_slice debug_data; -} goaway_arg; - -static void goaway_locked(grpc_chttp2_transport *t, - grpc_chttp2_stream *s_ignored, void *a) { - goaway_arg *arg = a; - grpc_chttp2_goaway_append(t->global.last_incoming_stream_id, - grpc_chttp2_grpc_status_to_http2_error(arg->status), - arg->debug_data, &t->global.qbuf); -} - -static void goaway(grpc_transport *gt, grpc_status_code status, - gpr_slice debug_data) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - goaway_arg arg; - arg.status = status; - arg.debug_data = debug_data; - grpc_chttp2_run_with_global_lock(t, NULL, goaway_locked, &arg, sizeof(arg)); -} - -static void finish_init_stream_locked(grpc_chttp2_transport *t, +static void finish_init_stream_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, grpc_chttp2_stream *s, void *arg_ignored) { grpc_chttp2_register_stream(t, s); @@ -549,7 +506,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->global.in_stream_map = 1; } - grpc_chttp2_run_with_global_lock(t, s, finish_init_stream_locked, NULL, 0); + grpc_chttp2_run_with_global_lock(exec_ctx, t, s, finish_init_stream_locked, + NULL, 0); return 0; } @@ -558,10 +516,10 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; + grpc_byte_stream *bs; #if 0 int i; - grpc_byte_stream *bs; GPR_TIMER_BEGIN("destroy_stream", 0); @@ -644,60 +602,48 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( * LOCK MANAGEMENT */ -static void finish_global_actions(grpc_chttp2_transport *t) { +static void finish_global_actions(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t) { grpc_chttp2_executor_action_header *hdr; grpc_chttp2_executor_action_header *next; - grpc_iomgr_closure *run_closures; for (;;) { - unlock_check_read_write_state(t); if (!t->executor.writing_active && !t->closed && - grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) { + grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing, + t->executor.parsing_active)) { t->executor.writing_active = 1; REF_TRANSPORT(t, "writing"); prevent_endpoint_shutdown(t); - grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1); + grpc_exec_ctx_enqueue(exec_ctx, &t->writing_action, true, NULL); } check_read_ops(exec_ctx, &t->global); - unlock_check_channel_callbacks(t); - - run_closures = t->global.pending_closures; - t->global.pending_closures = NULL; - - gpr_mu_lock(&t->executor.mu); - t->executor.global_active = 0; - gpr_mu_unlock(&t->executor.mu); - - while (run_closures) { - grpc_iomgr_closure *next = run_closures->next; - run_closures->cb(run_closures->cb_arg, run_closures->success); - run_closures = next; - } gpr_mu_lock(&t->executor.mu); - if (!t->executor.global_active && t->executor.pending_actions) { - t->executor.global_active = 1; + if (t->executor.pending_actions != NULL) { hdr = t->executor.pending_actions; t->executor.pending_actions = NULL; gpr_mu_unlock(&t->executor.mu); while (hdr != NULL) { - hdr->action(t, hdr->stream, hdr->arg); + hdr->action(exec_ctx, t, hdr->stream, hdr->arg); next = hdr->next; gpr_free(hdr); - UNREF_TRANSPORT(t, "pending_action"); + UNREF_TRANSPORT(exec_ctx, t, "pending_action"); hdr = next; } continue; + } else { + t->executor.global_active = false; } gpr_mu_unlock(&t->executor.mu); break; } } -void grpc_chttp2_run_with_global_lock( - grpc_chttp2_transport *t, grpc_chttp2_stream *optional_stream, - void (*action)(grpc_chttp2_transport *t, grpc_chttp2_stream *s, void *arg), - void *arg, size_t sizeof_arg) { +void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *optional_stream, + grpc_chttp2_locked_action action, + void *arg, size_t sizeof_arg) { grpc_chttp2_executor_action_header *hdr; REF_TRANSPORT(t, "run_global"); @@ -708,9 +654,9 @@ void grpc_chttp2_run_with_global_lock( t->executor.global_active = 1; gpr_mu_unlock(&t->executor.mu); - action(t, optional_stream, arg); + action(exec_ctx, t, optional_stream, arg); - finish_global_actions(t); + finish_global_actions(exec_ctx, t); } else { gpr_mu_unlock(&t->executor.mu); @@ -726,6 +672,7 @@ void grpc_chttp2_run_with_global_lock( gpr_mu_lock(&t->executor.mu); if (!t->executor.global_active) { + /* global lock was released while allocating memory: release & retry */ gpr_free(hdr); continue; } @@ -737,7 +684,7 @@ void grpc_chttp2_run_with_global_lock( break; } - UNREF_TRANSPORT(t, "run_global"); + UNREF_TRANSPORT(exec_ctx, t, "run_global"); } /******************************************************************************* @@ -767,7 +714,8 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, } } -static void terminate_writing_with_lock(grpc_chttp2_transport *t, +static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, grpc_chttp2_stream *s_ignored, void *a) { int success = *(int *)a; @@ -780,6 +728,7 @@ static void terminate_writing_with_lock(grpc_chttp2_transport *t, grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing); + grpc_chttp2_stream_global *stream_global; while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global, &stream_global)) { fail_pending_writes(exec_ctx, stream_global); @@ -794,14 +743,15 @@ static void terminate_writing_with_lock(grpc_chttp2_transport *t, destroy_endpoint(exec_ctx, t); } - UNREF_TRANSPORT(t, "writing"); + UNREF_TRANSPORT(exec_ctx, t, "writing"); } -void grpc_chttp2_terminate_writing( - grpc_chttp2_transport_writing *transport_writing, int success) { +void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, + void *transport_writing, bool success) { grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing); - grpc_chttp2_run_with_global_lock(t, NULL, terminate_writing_with_lock, - &success, sizeof(success)); + grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, + terminate_writing_with_lock, &success, + sizeof(success)); } static void writing_action(grpc_exec_ctx *exec_ctx, void *gt, @@ -915,14 +865,16 @@ static int contains_non_ok_status( static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, bool success) {} -static void perform_stream_op_locked( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op) { - grpc_closure *on_complete; - +static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, void *stream_op) { GPR_TIMER_BEGIN("perform_stream_op_locked", 0); - on_complete = op->on_complete; + grpc_transport_stream_op *op = stream_op; + grpc_chttp2_transport_global *transport_global = &t->global; + grpc_chttp2_stream_global *stream_global = &s->global; + + grpc_closure *on_complete = op->on_complete; if (on_complete == NULL) { on_complete = grpc_closure_create(do_nothing, NULL); } @@ -1039,12 +991,11 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_transport_stream_op *op) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; - grpc_chttp2_run_with_global_lock(t, s, perform_stream_op_locked, op, + grpc_chttp2_run_with_global_lock(exec_ctx, t, s, perform_stream_op_locked, op, sizeof(*op)); } -static void send_ping_locked(grpc_chttp2_transport *t, - grpc_chttp2_stream *s_ignored, void *a) { +static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) { grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p)); p->next = &t->global.pings; p->prev = p->next->prev; @@ -1057,23 +1008,14 @@ static void send_ping_locked(grpc_chttp2_transport *t, p->id[5] = (t->global.ping_counter >> 16) & 0xff; p->id[6] = (t->global.ping_counter >> 8) & 0xff; p->id[7] = t->global.ping_counter & 0xff; - p->on_recv = *(grpc_iomgr_closure **)a; + p->on_recv = on_recv; gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id)); } -static void send_ping(grpc_transport *gt, grpc_iomgr_closure *on_recv) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - grpc_chttp2_run_with_global_lock(t, NULL, send_ping_locked, &on_recv, - sizeof(on_recv)); -} - -void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport_parsing *transport_parsing, - const uint8_t *opaque_8bytes) { +static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_stream *s, void *opaque_8bytes) { grpc_chttp2_outstanding_ping *ping; - grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing); grpc_chttp2_transport_global *transport_global = &t->global; - lock(t); for (ping = transport_global->pings.next; ping != &transport_global->pings; ping = ping->next) { if (0 == memcmp(opaque_8bytes, ping->id, 8)) { @@ -1084,13 +1026,30 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, break; } } - unlock(exec_ctx, t); +} + +void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_parsing *transport_parsing, + const uint8_t *opaque_8bytes) { + grpc_chttp2_run_with_global_lock( + exec_ctx, TRANSPORT_FROM_PARSING(transport_parsing), NULL, + ack_ping_locked, (void *)opaque_8bytes, 8); } static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_transport_op *op) { - bool close_transport = false; + grpc_chttp2_stream *s_unused, + void *stream_op) { + grpc_transport_op *op = stream_op; + bool close_transport = op->disconnect; + + /* If there's a set_accept_stream ensure that we're not parsing + to avoid changing things out from underneath */ + if (t->executor.parsing_active && op->set_accept_stream) { + GPR_ASSERT(t->post_parsing_op == NULL); + t->post_parsing_op = gpr_malloc(sizeof(*op)); + memcpy(t->post_parsing_op, op, sizeof(*op)); + } grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); @@ -1116,47 +1075,31 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, } if (op->bind_pollset) { - add_to_pollset_locked(exec_ctx, t, op->bind_pollset); + add_to_pollset_locked(exec_ctx, t, NULL, op->bind_pollset); } if (op->bind_pollset_set) { - add_to_pollset_set_locked(exec_ctx, t, op->bind_pollset_set); + add_to_pollset_set_locked(exec_ctx, t, NULL, op->bind_pollset_set); } if (op->send_ping) { send_ping_locked(t, op->send_ping); } - if (op->disconnect) { - close_transport_locked(exec_ctx, t); - } - if (close_transport) { - close_transport_locked(exec_ctx, t); + close_transport_locked(exec_ctx, t, NULL, NULL); } } static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_transport_op *op) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - - lock(t); - - /* If there's a set_accept_stream ensure that we're not parsing - to avoid changing things out from underneath */ - if (t->parsing_active && op->set_accept_stream) { - GPR_ASSERT(t->post_parsing_op == NULL); - t->post_parsing_op = gpr_malloc(sizeof(*op)); - memcpy(t->post_parsing_op, op, sizeof(*op)); - } else { - perform_transport_op_locked(exec_ctx, t, op); - } - - unlock(exec_ctx, t); + grpc_chttp2_run_with_global_lock( + exec_ctx, t, NULL, perform_transport_op_locked, op, sizeof(*op)); } /******************************************************************************* - * INPUT PROCESSING + * INPUT PROCESSING - GENERAL */ static void check_read_ops(grpc_exec_ctx *exec_ctx, @@ -1233,7 +1176,7 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { - close_transport_locked(exec_ctx, t); + close_transport_locked(exec_ctx, t, NULL, NULL); } if (grpc_chttp2_list_remove_writable_stream(&t->global, &s->global)) { GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "chttp2_writing"); @@ -1328,7 +1271,7 @@ void grpc_chttp2_mark_stream_closed( } if (close_writes && !stream_global->write_closed) { stream_global->write_closed = 1; - if (TRANSPORT_FROM_GLOBAL(transport_global)->writing_active) { + if (TRANSPORT_FROM_GLOBAL(transport_global)->executor.writing_active) { GRPC_CHTTP2_STREAM_REF(stream_global, "finish_writes"); grpc_chttp2_list_add_closed_waiting_for_writing(transport_global, stream_global); @@ -1338,7 +1281,7 @@ void grpc_chttp2_mark_stream_closed( } if (stream_global->read_closed && stream_global->write_closed) { if (stream_global->id != 0 && - TRANSPORT_FROM_GLOBAL(transport_global)->parsing_active) { + TRANSPORT_FROM_GLOBAL(transport_global)->executor.parsing_active) { grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global, stream_global); } else { @@ -1469,35 +1412,10 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, } static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { - if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE) { - t->global.error_state = GRPC_CHTTP2_ERROR_STATE_SEEN; - } close_transport_locked(exec_ctx, t, NULL, NULL); end_all_the_calls(exec_ctx, t); } -static void read_error_locked(grpc_chttp2_transport *t) { - t->endpoint_reading = 0; - if (!t->executor.writing_active && t->ep) { - grpc_endpoint_destroy(t->ep); - t->ep = NULL; - /* safe as we still have a ref for read */ - UNREF_TRANSPORT(t, "disconnect"); - } -} - -static void recv_data_error_locked(grpc_chttp2_transport *t, - grpc_chttp2_stream *s, void *a) { - size_t i; - - drop_connection(t); - read_error_locked(t); - for (i = 0; i < t->executor_parsing.nslices; i++) - gpr_slice_unref(t->executor_parsing.slices[i]); - memset(&t->executor_parsing, 0, sizeof(t->executor_parsing)); - UNREF_TRANSPORT(t, "recv_data"); -} - /** update window from a settings change */ static void update_global_window(void *args, uint32_t id, void *stream) { grpc_chttp2_transport *t = args; @@ -1518,58 +1436,72 @@ static void update_global_window(void *args, uint32_t id, void *stream) { } } -static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success) { - grpc_chttp2_run_with_global_lock(t, NULL, recv_data_locked, - (void *)(uintptr_t)success, 0); +/******************************************************************************* + * INPUT PROCESSING - PARSING + */ + +static void reading_action_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_unused, void *arg); +static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, bool success); +static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_unused, void *arg); +static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_stream *s_unused, void *arg); + +static void reading_action(grpc_exec_ctx *exec_ctx, void *tp, bool success) { /* Control flow: - recv_data_locked -> + reading_action_locked -> (parse_unlocked -> post_parse_locked)? -> - post_recv_data_locked */ + post_reading_action_locked */ + grpc_chttp2_run_with_global_lock(exec_ctx, tp, NULL, reading_action_locked, + (void *)(uintptr_t)success, 0); } -static void recv_data_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_stream *s_unused, void *arg) { - size_t i; - int keep_reading = 0; +static void reading_action_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_unused, void *arg) { grpc_chttp2_transport_global *transport_global = &t->global; grpc_chttp2_transport_parsing *transport_parsing = &t->parsing; - grpc_chttp2_stream_global *stream_global; bool success = (bool)(uintptr_t)arg; - i = 0; - GPR_ASSERT(!t->parsing_active); + GPR_ASSERT(!t->executor.parsing_active); if (!t->closed) { t->executor.parsing_active = 1; /* merge stream lists */ grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); grpc_chttp2_prepare_to_read(transport_global, transport_parsing); - grpc_exec_ctx_enqueue(exec_ctx, parse_locked, t, NULL); + grpc_exec_ctx_enqueue(exec_ctx, &t->parsing_action, success, NULL); } else { - post_recv_data_locked(exec_ctx, t, s_unused, arg); + post_reading_action_locked(exec_ctx, t, s_unused, arg); } } -static void parse_locked(grpc_exec_ctx *exec_ctx, void *arg, bool success) { - GPR_TIMER_BEGIN("recv_data.parse", 0); +static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, bool success) { + grpc_chttp2_transport *t = arg; + GPR_TIMER_BEGIN("reading_action.parse", 0); + size_t i = 0; for (; i < t->read_buffer.count && - grpc_chttp2_perform_read(exec_ctx, transport_parsing, + grpc_chttp2_perform_read(exec_ctx, &t->parsing, t->read_buffer.slices[i]); i++) ; - GPR_TIMER_END("recv_data.parse", 0); - grpc_chttp2_run_with_global_lock(t, s_unused, post_parse_locked, arg, 0) + if (i != t->read_buffer.count) { + success = false; + } + GPR_TIMER_END("reading_action.parse", 0); + grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, post_parse_locked, + (void *)(uintptr_t)success, 0); } static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s_unused, void *arg) { + grpc_chttp2_transport_global *transport_global = &t->global; + grpc_chttp2_transport_parsing *transport_parsing = &t->parsing; /* copy parsing qbuf to global qbuf */ gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf); - if (i != t->read_buffer.count) { - unlock(exec_ctx, t); - lock(t); - drop_connection(exec_ctx, t); - } /* merge stream lists */ grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); transport_global->concurrent_stream_count = @@ -1581,20 +1513,18 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } /* handle higher level things */ grpc_chttp2_publish_reads(exec_ctx, transport_global, transport_parsing); - t->parsing_active = 0; + t->executor.parsing_active = 0; /* handle delayed transport ops (if there is one) */ if (t->post_parsing_op) { grpc_transport_op *op = t->post_parsing_op; t->post_parsing_op = NULL; - perform_transport_op_locked(exec_ctx, t, op); + perform_transport_op_locked(exec_ctx, t, NULL, op); gpr_free(op); } /* if a stream is in the stream map, and gets cancelled, we need to - * ensure - * we are not parsing before continuing the cancellation to keep - * things - * in - * a sane state */ + * ensure we are not parsing before continuing the cancellation to keep + * things in a sane state */ + grpc_chttp2_stream_global *stream_global; while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global, &stream_global)) { GPR_ASSERT(stream_global->in_stream_map); @@ -1604,28 +1534,37 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); } - post_recv_data_locked(exec_ctx, t, s_unused, arg); + post_reading_action_locked(exec_ctx, t, s_unused, arg); } -static void post_recv_data_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s_unused, void *arg) { - if (!success || i != t->read_buffer.count || t->closed) { +static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_unused, + void *arg) { + bool success = (bool)(uintptr_t)arg; + bool keep_reading = false; + if (!success || t->closed) { drop_connection(exec_ctx, t); - read_error_locked(exec_ctx, t); + t->endpoint_reading = 0; + if (!t->executor.writing_active && t->ep) { + grpc_endpoint_destroy(exec_ctx, t->ep); + t->ep = NULL; + /* safe as we still have a ref for read */ + UNREF_TRANSPORT(exec_ctx, t, "disconnect"); + } } else if (!t->closed) { - keep_reading = 1; + keep_reading = true; REF_TRANSPORT(t, "keep_reading"); prevent_endpoint_shutdown(t); } gpr_slice_buffer_reset_and_unref(&t->read_buffer); if (keep_reading) { - grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->recv_data); + grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->reading_action); allow_endpoint_shutdown_locked(exec_ctx, t); UNREF_TRANSPORT(exec_ctx, t, "keep_reading"); } else { - UNREF_TRANSPORT(exec_ctx, t, "recv_data"); + UNREF_TRANSPORT(exec_ctx, t, "reading_action"); } } @@ -1650,7 +1589,7 @@ static void connectivity_state_set( static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_pollset *pollset) { + grpc_chttp2_stream *s_unused, void *pollset) { if (t->ep) { grpc_endpoint_add_to_pollset(exec_ctx, t->ep, pollset); } @@ -1658,7 +1597,8 @@ static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx, static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_pollset_set *pollset_set) { + grpc_chttp2_stream *s_unused, + void *pollset_set) { if (t->ep) { grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, pollset_set); } @@ -1666,10 +1606,10 @@ static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx, static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_pollset *pollset) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; /* TODO(ctiller): keep pollset alive */ - grpc_chttp2_run_with_global_lock(gt, gs, add_to_pollset_locked, pollset, - NULL); + grpc_chttp2_run_with_global_lock(exec_ctx, (grpc_chttp2_transport *)gt, + (grpc_chttp2_stream *)gs, + add_to_pollset_locked, pollset, 0); } /******************************************************************************* @@ -1716,36 +1656,51 @@ static void incoming_byte_stream_update_flow_control( } } -static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, - gpr_slice *slice, size_t max_size_hint, - grpc_closure *on_complete) { +typedef struct { + grpc_chttp2_incoming_byte_stream *byte_stream; + gpr_slice *slice; + size_t max_size_hint; + grpc_closure *on_complete; +} incoming_byte_stream_next_arg; + +static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + void *argp) { + incoming_byte_stream_next_arg *arg = argp; grpc_chttp2_incoming_byte_stream *bs = - (grpc_chttp2_incoming_byte_stream *)byte_stream; + (grpc_chttp2_incoming_byte_stream *)arg->byte_stream; grpc_chttp2_transport_global *transport_global = &bs->transport->global; grpc_chttp2_stream_global *stream_global = &bs->stream->global; - lock(bs->transport); if (bs->is_tail) { - incoming_byte_stream_update_flow_control(transport_global, stream_global, - max_size_hint, bs->slices.length); + incoming_byte_stream_update_flow_control( + transport_global, stream_global, arg->max_size_hint, bs->slices.length); } if (bs->slices.count > 0) { - *slice = gpr_slice_buffer_take_first(&bs->slices); - unlock(exec_ctx, bs->transport); - return 1; + *arg->slice = gpr_slice_buffer_take_first(&bs->slices); + grpc_exec_ctx_enqueue(exec_ctx, arg->on_complete, true, NULL); } else if (bs->failed) { - grpc_exec_ctx_enqueue(exec_ctx, on_complete, false, NULL); - unlock(exec_ctx, bs->transport); - return 0; + grpc_exec_ctx_enqueue(exec_ctx, arg->on_complete, false, NULL); } else { - bs->on_next = on_complete; - bs->next = slice; - unlock(exec_ctx, bs->transport); - return 0; + bs->on_next = arg->on_complete; + bs->next = arg->slice; } } +static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + gpr_slice *slice, size_t max_size_hint, + grpc_closure *on_complete) { + grpc_chttp2_incoming_byte_stream *bs = + (grpc_chttp2_incoming_byte_stream *)byte_stream; + incoming_byte_stream_next_arg arg = {bs, slice, max_size_hint, on_complete}; + grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream, + incoming_byte_stream_next_locked, &arg, + sizeof(arg)); + return 0; +} + static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream *bs) { if (gpr_unref(&bs->refs)) { gpr_slice_buffer_destroy(&bs->slices); @@ -1758,18 +1713,43 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, incoming_byte_stream_unref((grpc_chttp2_incoming_byte_stream *)byte_stream); } -void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, - grpc_chttp2_incoming_byte_stream *bs, - gpr_slice slice) { - gpr_mu_lock(&bs->transport->mu); +typedef struct { + grpc_chttp2_incoming_byte_stream *byte_stream; + gpr_slice slice; +} incoming_byte_stream_push_arg; + +static void incoming_byte_stream_push_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + void *argp) { + incoming_byte_stream_push_arg *arg = argp; + grpc_chttp2_incoming_byte_stream *bs = arg->byte_stream; if (bs->on_next != NULL) { - *bs->next = slice; + *bs->next = arg->slice; grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, true, NULL); bs->on_next = NULL; } else { - gpr_slice_buffer_add(&bs->slices, slice); + gpr_slice_buffer_add(&bs->slices, arg->slice); } - gpr_mu_unlock(&bs->transport->mu); +} + +void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, + grpc_chttp2_incoming_byte_stream *bs, + gpr_slice slice) { + incoming_byte_stream_push_arg arg = {bs, slice}; + grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream, + incoming_byte_stream_push_locked, &arg, + sizeof(arg)); +} + +static void incoming_byte_stream_finished_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + void *argp) { + grpc_chttp2_incoming_byte_stream *bs = argp; + grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, false, NULL); + bs->on_next = NULL; + bs->failed = 1; } void grpc_chttp2_incoming_byte_stream_finished( @@ -1777,24 +1757,13 @@ void grpc_chttp2_incoming_byte_stream_finished( int from_parsing_thread) { if (!success) { if (from_parsing_thread) { - gpr_mu_lock(&bs->transport->mu); - } - grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, false, NULL); - bs->on_next = NULL; - bs->failed = 1; - if (from_parsing_thread) { - gpr_mu_unlock(&bs->transport->mu); - } - } else { -#ifndef NDEBUG - if (from_parsing_thread) { - gpr_mu_lock(&bs->transport->mu); - } - GPR_ASSERT(bs->on_next == NULL); - if (from_parsing_thread) { - gpr_mu_unlock(&bs->transport->mu); + grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream, + incoming_byte_stream_finished_locked, bs, + 0); + } else { + incoming_byte_stream_finished_locked(exec_ctx, bs->transport, bs->stream, + bs); } -#endif } incoming_byte_stream_unref(bs); } @@ -1943,7 +1912,7 @@ void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx, grpc_transport *transport, gpr_slice *slices, size_t nslices) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport; - REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */ + REF_TRANSPORT(t, "reading_action"); /* matches unref inside reading_action */ gpr_slice_buffer_addn(&t->read_buffer, slices, nslices); - recv_data(exec_ctx, t, 1); + reading_action(exec_ctx, t, 1); } From 0027de17fc5e75af5ab5379f0e689223d0c81a3e Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 22 Mar 2016 16:04:40 -0700 Subject: [PATCH 06/19] Remove unused declaration --- src/core/transport/chttp2/internal.h | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 7489cc67fbc..7c97c7e33d9 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -360,11 +360,6 @@ struct grpc_chttp2_transport { /** closure to actually do parsing */ grpc_closure parsing_action; - struct { - size_t nslices; - gpr_slice *slices; - } executor_parsing; - /** incoming read bytes */ gpr_slice_buffer read_buffer; From 1907fc6ee9b26880f781eeee5459971d79d7337e Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 22 Mar 2016 16:05:54 -0700 Subject: [PATCH 07/19] Swap order of functions to reduce diff --- src/core/transport/chttp2_transport.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index eb9f7a23650..596e01f5c92 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -380,13 +380,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } -/** block grpc_endpoint_shutdown being called until a paired - allow_endpoint_shutdown is made */ -static void prevent_endpoint_shutdown(grpc_chttp2_transport *t) { - GPR_ASSERT(t->ep); - gpr_ref(&t->shutdown_ep_refs); -} - static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s_ignored, @@ -402,6 +395,13 @@ static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { UNREF_TRANSPORT(exec_ctx, t, "destroy"); } +/** block grpc_endpoint_shutdown being called until a paired + allow_endpoint_shutdown is made */ +static void prevent_endpoint_shutdown(grpc_chttp2_transport *t) { + GPR_ASSERT(t->ep); + gpr_ref(&t->shutdown_ep_refs); +} + static void allow_endpoint_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { if (gpr_unref(&t->shutdown_ep_refs)) { From 223ae1c682dc56eefaae40f3fa28dd2b38d624d5 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 22 Mar 2016 16:08:35 -0700 Subject: [PATCH 08/19] Fix inadvertently reverted code --- src/core/transport/chttp2_transport.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 596e01f5c92..a82440701dc 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -1000,14 +1000,14 @@ static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) { p->next = &t->global.pings; p->prev = p->next->prev; p->prev->next = p->next->prev = p; - p->id[0] = (t->global.ping_counter >> 56) & 0xff; - p->id[1] = (t->global.ping_counter >> 48) & 0xff; - p->id[2] = (t->global.ping_counter >> 40) & 0xff; - p->id[3] = (t->global.ping_counter >> 32) & 0xff; - p->id[4] = (t->global.ping_counter >> 24) & 0xff; - p->id[5] = (t->global.ping_counter >> 16) & 0xff; - p->id[6] = (t->global.ping_counter >> 8) & 0xff; - p->id[7] = t->global.ping_counter & 0xff; + p->id[0] = (uint8_t)((t->global.ping_counter >> 56) & 0xff); + p->id[1] = (uint8_t)((t->global.ping_counter >> 48) & 0xff); + p->id[2] = (uint8_t)((t->global.ping_counter >> 40) & 0xff); + p->id[3] = (uint8_t)((t->global.ping_counter >> 32) & 0xff); + p->id[4] = (uint8_t)((t->global.ping_counter >> 24) & 0xff); + p->id[5] = (uint8_t)((t->global.ping_counter >> 16) & 0xff); + p->id[6] = (uint8_t)((t->global.ping_counter >> 8) & 0xff); + p->id[7] = (uint8_t)(t->global.ping_counter & 0xff); p->on_recv = on_recv; gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id)); } From b4f7cbd339ea89e1f736aa22c86ad72ad69decec Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 22 Mar 2016 16:22:37 -0700 Subject: [PATCH 09/19] Fix stack corruption --- src/core/transport/chttp2_transport.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index a82440701dc..83739db1e2f 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -718,7 +718,7 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s_ignored, void *a) { - int success = *(int *)a; + bool success = (bool)(uintptr_t)a; allow_endpoint_shutdown_locked(exec_ctx, t); @@ -750,8 +750,8 @@ void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, void *transport_writing, bool success) { grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing); grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, - terminate_writing_with_lock, &success, - sizeof(success)); + terminate_writing_with_lock, + (void *)(uintptr_t)success, 0); } static void writing_action(grpc_exec_ctx *exec_ctx, void *gt, From a67a83ebde2ab47b230857d47330ac63c7ab62b1 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 22 Mar 2016 21:43:59 -0700 Subject: [PATCH 10/19] Move the most important member first --- src/core/transport/transport.h | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 56ac927e365..908ef0bda45 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -98,6 +98,11 @@ void grpc_transport_move_stats(grpc_transport_stream_stats *from, /* Transport stream op: a set of operations to perform on a transport against a single stream */ typedef struct grpc_transport_stream_op { + /** Should be enqueued when all requested operations (excluding recv_message + and recv_initial_metadata which have their own closures) in a given batch + have been completed. */ + grpc_closure *on_complete; + /** Send initial metadata to the peer, from the provided metadata batch. */ grpc_metadata_batch *send_initial_metadata; @@ -124,11 +129,6 @@ typedef struct grpc_transport_stream_op { /** Collect any stats into provided buffer, zero internal stat counters */ grpc_transport_stream_stats *collect_stats; - /** Should be enqueued when all requested operations (excluding recv_message - and recv_initial_metadata which have their own closures) in a given batch - have been completed. */ - grpc_closure *on_complete; - /** If != GRPC_STATUS_OK, cancel this stream */ grpc_status_code cancel_with_status; From 2c8063cc7200789da67a10c53d98d0d15b1ad378 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 22 Mar 2016 22:12:15 -0700 Subject: [PATCH 11/19] Introduce a new memory reclamation scheme for channel stacks Allows the bottom member of the stack to schedule the actual deallocation, allowing a final transport lock to be entered when destroying a call. --- src/core/census/grpc_filter.c | 4 +- src/core/channel/channel_stack.c | 6 ++- src/core/channel/channel_stack.h | 11 +++-- src/core/channel/client_channel.c | 5 ++- src/core/channel/compress_filter.c | 4 +- src/core/channel/connected_channel.c | 13 +++--- src/core/channel/http_client_filter.c | 4 +- src/core/channel/http_server_filter.c | 4 +- src/core/client_config/subchannel.c | 6 +-- src/core/security/client_auth_filter.c | 4 +- src/core/security/server_auth_filter.c | 4 +- src/core/surface/call.c | 3 +- src/core/surface/lame_client.c | 12 +++--- src/core/surface/server.c | 4 +- src/core/transport/chttp2_transport.c | 57 +++++++++++++------------- src/core/transport/transport.c | 5 ++- src/core/transport/transport.h | 2 +- src/core/transport/transport_impl.h | 2 +- test/core/channel/channel_stack_test.c | 6 +-- 19 files changed, 83 insertions(+), 73 deletions(-) diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c index c8aaf31e2d3..987407f050e 100644 --- a/src/core/census/grpc_filter.c +++ b/src/core/census/grpc_filter.c @@ -134,7 +134,7 @@ static void client_init_call_elem(grpc_exec_ctx *exec_ctx, } static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { + grpc_call_element *elem, void *ignored) { call_data *d = elem->call_data; GPR_ASSERT(d != NULL); /* TODO(hongyu): record rpc client stats and census_rpc_end_op here */ @@ -152,7 +152,7 @@ static void server_init_call_elem(grpc_exec_ctx *exec_ctx, } static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { + grpc_call_element *elem, void *ignored) { call_data *d = elem->call_data; GPR_ASSERT(d != NULL); /* TODO(hongyu): record rpc server stats and census_tracing_end_op here */ diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index 3e616883640..1869597975f 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -213,14 +213,16 @@ void grpc_call_stack_ignore_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_pollset *pollset) {} -void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack) { +void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack, + void *and_free_memory) { grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack); size_t count = stack->count; size_t i; /* destroy per-filter data */ for (i = 0; i < count; i++) { - elems[i].filter->destroy_call_elem(exec_ctx, &elems[i]); + elems[i].filter->destroy_call_elem(exec_ctx, &elems[i], + i == count - 1 ? and_free_memory : NULL); } } diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index 52362f0b20e..4407333affe 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -104,8 +104,12 @@ typedef struct { void (*set_pollset)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_pollset *pollset); /* Destroy per call data. - The filter does not need to do any chaining */ - void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); + The filter does not need to do any chaining. + The bottom filter of a stack will be passed a non-NULL pointer to + \a and_free_memory that should be passed to gpr_free when destruction + is complete. */ + void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *and_free_memory); /* sizeof(per channel data) */ size_t sizeof_channel_data; @@ -223,7 +227,8 @@ void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx, #endif /* Destroy a call stack */ -void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack); +void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack, + void *and_free_memory); /* Ignore set pollset - used by filters to implement the set_pollset method if they don't care about pollsets at all. Does nothing. */ diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index f021a8ae329..27d6e03d286 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -379,9 +379,10 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *and_free_memory) { grpc_subchannel_call_holder_destroy(exec_ctx, elem->call_data); + gpr_free(and_free_memory); } /* Constructor for channel_data */ diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index 3e7ca08fd29..d5d5fc2e350 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -246,8 +246,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *ignored) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; gpr_slice_buffer_destroy(&calld->slices); diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index e7ed3ccfebe..e6e9daa4522 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -37,13 +37,13 @@ #include #include -#include "src/core/support/string.h" -#include "src/core/transport/transport.h" -#include "src/core/profiling/timers.h" #include #include #include #include +#include "src/core/profiling/timers.h" +#include "src/core/support/string.h" +#include "src/core/transport/transport.h" #define MAX_BUFFER_LENGTH 8192 @@ -102,12 +102,13 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *and_free_memory) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_transport_destroy_stream(exec_ctx, chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld)); + TRANSPORT_STREAM_FROM_CALL_DATA(calld), + and_free_memory); } /* Constructor for channel_data */ diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 1aa27208c2e..26f1110ac47 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -151,8 +151,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) {} +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *ignored) {} static grpc_mdelem *scheme_from_args(const grpc_channel_args *args) { unsigned i; diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index 370f8dbe423..6c97a0762d3 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -212,8 +212,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) {} +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *ignored) {} /* Constructor for channel_data */ static void init_channel_elem(grpc_exec_ctx *exec_ctx, diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 8f150a8d81d..7a61df40df7 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -620,9 +620,9 @@ static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, bool success) { grpc_subchannel_call *c = call; GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); - grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c)); - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, c->connection, "subchannel_call"); - gpr_free(c); + grpc_connected_subchannel *connection = c->connection; + grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), c); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, connection, "subchannel_call"); GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); } diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c index 332d4259d28..c72bfc40e68 100644 --- a/src/core/security/client_auth_filter.c +++ b/src/core/security/client_auth_filter.c @@ -277,8 +277,8 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *ignored) { call_data *calld = elem->call_data; grpc_call_credentials_unref(calld->creds); if (calld->host != NULL) { diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c index 3d8e5e8d35d..3d348c8266e 100644 --- a/src/core/security/server_auth_filter.c +++ b/src/core/security/server_auth_filter.c @@ -224,8 +224,8 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_pollset *pollset) {} /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) {} +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *ignored) {} /* Constructor for channel_data */ static void init_channel_elem(grpc_exec_ctx *exec_ctx, diff --git a/src/core/surface/call.c b/src/core/surface/call.c index d9808e6f4cc..e7861b8e3bd 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -375,7 +375,6 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) { if (c->receiving_stream != NULL) { grpc_byte_stream_destroy(exec_ctx, c->receiving_stream); } - grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c)); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call"); gpr_mu_destroy(&c->mu); for (i = 0; i < STATUS_SOURCE_COUNT; i++) { @@ -394,7 +393,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) { if (c->cq) { GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); } - gpr_free(c); + grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), c); GPR_TIMER_END("destroy_call", 0); } diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 58f89946d22..9ef88bba652 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -37,13 +37,13 @@ #include +#include +#include #include "src/core/channel/channel_stack.h" #include "src/core/support/string.h" #include "src/core/surface/api_trace.h" -#include "src/core/surface/channel.h" #include "src/core/surface/call.h" -#include -#include +#include "src/core/surface/channel.h" typedef struct { grpc_linked_mdelem status; @@ -104,8 +104,10 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) {} -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) {} +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *and_free_memory) { + gpr_free(and_free_memory); +} static void init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, diff --git a/src/core/surface/server.c b/src/core/surface/server.c index da93474b261..88554e4224c 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -692,8 +692,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, server_ref(chand->server); } -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *ignored) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 182c713ee78..3a73826c3aa 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -512,26 +512,20 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, return 0; } -static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; +static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, void *arg) { grpc_byte_stream *bs; -#if 0 - int i; - GPR_TIMER_BEGIN("destroy_stream", 0); - gpr_mu_lock(&t->mu); - GPR_ASSERT((s->global.write_closed && s->global.read_closed) || s->global.id == 0); GPR_ASSERT(!s->global.in_stream_map); if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { - close_transport_locked(exec_ctx, t); + close_transport_locked(exec_ctx, t, NULL, NULL); } - if (!t->parsing_active && s->global.id) { + if (!t->executor.parsing_active && s->global.id) { GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map, s->global.id) == NULL); } @@ -539,11 +533,11 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global, &s->global); grpc_chttp2_list_remove_stalled_by_transport(&t->global, &s->global); -#endif - int i; + /* TODO(ctiller): the remainder of this function could be done without the + global lock */ - for (i = 0; i < STREAM_LIST_COUNT; i++) { + for (int i = 0; i < STREAM_LIST_COUNT; i++) { if (s->included[i]) { gpr_log(GPR_ERROR, "%s stream %d still included in list %d", t->global.is_client ? "client" : "server", s->global.id, i); @@ -574,6 +568,17 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, UNREF_TRANSPORT(exec_ctx, t, "stream"); GPR_TIMER_END("destroy_stream", 0); + + gpr_free(arg); +} + +static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, void *and_free_memory) { + grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; + grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; + + grpc_chttp2_run_with_global_lock(exec_ctx, t, s, destroy_stream_locked, + and_free_memory, 0); } grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream( @@ -1509,8 +1514,8 @@ static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, bool success) { GPR_TIMER_BEGIN("reading_action.parse", 0); size_t i = 0; for (; i < t->read_buffer.count && - grpc_chttp2_perform_read(exec_ctx, &t->parsing, - t->read_buffer.slices[i]); + grpc_chttp2_perform_read(exec_ctx, &t->parsing, + t->read_buffer.slices[i]); i++) ; if (i != t->read_buffer.count) { @@ -1602,10 +1607,9 @@ static void connectivity_state_set( grpc_connectivity_state state, const char *reason) { GRPC_CHTTP2_IF_TRACING( gpr_log(GPR_DEBUG, "set connectivity_state=%d", state)); - grpc_connectivity_state_set( - exec_ctx, - &TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker, - state, reason); + grpc_connectivity_state_set(exec_ctx, &TRANSPORT_FROM_GLOBAL(transport_global) + ->channel_callback.state_tracker, + state, reason); } /******************************************************************************* @@ -1915,15 +1919,10 @@ static char *chttp2_get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *t) { return gpr_strdup(((grpc_chttp2_transport *)t)->peer_string); } -static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), - "chttp2", - init_stream, - set_pollset, - perform_stream_op, - perform_transport_op, - destroy_stream, - destroy_transport, - chttp2_get_peer}; +static const grpc_transport_vtable vtable = { + sizeof(grpc_chttp2_stream), "chttp2", init_stream, set_pollset, + perform_stream_op, perform_transport_op, destroy_stream, destroy_transport, + chttp2_get_peer}; grpc_transport *grpc_create_chttp2_transport( grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args, diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index 5b5af0e7d0b..b42980431a3 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -133,8 +133,9 @@ void grpc_transport_set_pollset(grpc_exec_ctx *exec_ctx, void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *transport, - grpc_stream *stream) { - transport->vtable->destroy_stream(exec_ctx, transport, stream); + grpc_stream *stream, void *and_free_memory) { + transport->vtable->destroy_stream(exec_ctx, transport, stream, + and_free_memory); } char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 908ef0bda45..ddb81f9a34b 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -208,7 +208,7 @@ void grpc_transport_set_pollset(grpc_exec_ctx *exec_ctx, caller, but any child memory must be cleaned up) */ void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *transport, - grpc_stream *stream); + grpc_stream *stream, void *and_free_memory); void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op); diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h index d9ecc4d2ba2..9d53971f4d5 100644 --- a/src/core/transport/transport_impl.h +++ b/src/core/transport/transport_impl.h @@ -63,7 +63,7 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_destroy_stream */ void (*destroy_stream)(grpc_exec_ctx *exec_ctx, grpc_transport *self, - grpc_stream *stream); + grpc_stream *stream, void *and_free_memory); /* implementation of grpc_transport_destroy */ void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_transport *self); diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c index e19e9a57aed..ca7c57c94e8 100644 --- a/test/core/channel/channel_stack_test.c +++ b/test/core/channel/channel_stack_test.c @@ -62,8 +62,8 @@ static void call_init_func(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, static void channel_destroy_func(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) {} -static void call_destroy_func(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { +static void call_destroy_func(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *ignored) { ++*(int *)(elem->channel_data); } @@ -87,7 +87,7 @@ static void free_channel(grpc_exec_ctx *exec_ctx, void *arg, bool success) { } static void free_call(grpc_exec_ctx *exec_ctx, void *arg, bool success) { - grpc_call_stack_destroy(exec_ctx, arg); + grpc_call_stack_destroy(exec_ctx, arg, NULL); gpr_free(arg); } From 414217e22c019d8c23e528fb010091244506d896 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 25 Mar 2016 13:31:06 -0700 Subject: [PATCH 12/19] Fix ordering problem: trailing metadata could come before the last message was read --- src/core/surface/completion_queue.c | 4 ++ src/core/transport/chttp2/internal.h | 19 ++++--- src/core/transport/chttp2_transport.c | 74 ++++++++++++++++++++++----- 3 files changed, 76 insertions(+), 21 deletions(-) diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index b22818ea87d..1e7d1b0028d 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -227,6 +227,10 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, #endif GPR_TIMER_BEGIN("grpc_cq_end_op", 0); + GRPC_API_TRACE( + "grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, success=%d, done=%p, " + "done_arg=%p, storage=%p)", + 7, (exec_ctx, cc, tag, success, done, done_arg, storage)); storage->tag = tag; storage->done = done; diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 8c5ed6532b1..7f6c4d3de22 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -416,21 +416,26 @@ typedef struct { grpc_transport_stream_stats *collecting_stats; grpc_transport_stream_stats stats; + /** number of streams that are currently being read */ + gpr_refcount active_streams; + /** when the application requests writes be closed, the write_closed is 'queued'; when the close is flow controlled into the send path, we are 'sending' it; when the write has been performed it is 'sent' */ - uint8_t write_closed; + bool write_closed; /** is this stream reading half-closed (boolean) */ - uint8_t read_closed; + bool read_closed; + /** are all published incoming byte streams closed */ + bool all_incoming_byte_streams_finished; /** is this stream in the stream map? (boolean) */ - uint8_t in_stream_map; + bool in_stream_map; /** has this stream seen an error? if 1, then pending incoming frames can be thrown away */ - uint8_t seen_error; + bool seen_error; - uint8_t published_initial_metadata; - uint8_t published_trailing_metadata; - uint8_t faked_trailing_metadata; + bool published_initial_metadata; + bool published_trailing_metadata; + bool faked_trailing_metadata; grpc_chttp2_incoming_metadata_buffer received_initial_metadata; grpc_chttp2_incoming_metadata_buffer received_trailing_metadata; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 3a73826c3aa..8f660a537fd 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -478,6 +478,10 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, memset(s, 0, sizeof(*s)); s->refcount = refcount; + /* We reserve one 'active stream' that's dropped when the stream is + read-closed. The others are for incoming_byte_streams that are actively + reading */ + gpr_ref_init(&s->global.active_streams, 1); GRPC_CHTTP2_STREAM_REF(&s->global, "chttp2"); grpc_chttp2_incoming_metadata_buffer_init(&s->parsing.metadata_buffer[0]); @@ -1169,7 +1173,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, &stream_global->incoming_frames)) != NULL) { grpc_byte_stream_destroy(exec_ctx, bs); } - if (stream_global->incoming_frames.head == NULL) { + if (stream_global->all_incoming_byte_streams_finished) { grpc_chttp2_incoming_metadata_buffer_publish( &stream_global->received_trailing_metadata, stream_global->recv_trailing_metadata); @@ -1181,6 +1185,15 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, } } +static void decrement_active_streams_locked( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global) { + if ((stream_global->all_incoming_byte_streams_finished = + gpr_unref(&stream_global->active_streams))) { + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + } +} + static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, uint32_t id) { size_t new_stream_count; @@ -1297,6 +1310,7 @@ void grpc_chttp2_mark_stream_closed( stream_global->read_closed = 1; stream_global->published_initial_metadata = 1; stream_global->published_trailing_metadata = 1; + decrement_active_streams_locked(exec_ctx, transport_global, stream_global); } if (close_writes && !stream_global->write_closed) { stream_global->write_closed = 1; @@ -1730,8 +1744,14 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, return 0; } -static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream *bs) { +static void incoming_byte_stream_unref_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + void *argp) { + grpc_chttp2_incoming_byte_stream *bs = argp; if (gpr_unref(&bs->refs)) { + decrement_active_streams_locked(exec_ctx, &bs->transport->global, + &bs->stream->global); gpr_slice_buffer_destroy(&bs->slices); gpr_free(bs); } @@ -1739,7 +1759,10 @@ static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream *bs) { static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream) { - incoming_byte_stream_unref((grpc_chttp2_incoming_byte_stream *)byte_stream); + grpc_chttp2_incoming_byte_stream *bs = + (grpc_chttp2_incoming_byte_stream *)byte_stream; + grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream, + incoming_byte_stream_unref_locked, bs, 0); } typedef struct { @@ -1771,30 +1794,52 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, sizeof(arg)); } -static void incoming_byte_stream_finished_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, - void *argp) { +static void incoming_byte_stream_deactivate_locked( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, + grpc_chttp2_incoming_byte_stream *bs) { + incoming_byte_stream_unref_locked(exec_ctx, t, s, bs); +} + +static void incoming_byte_stream_finished_failed_locked( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, + void *argp) { grpc_chttp2_incoming_byte_stream *bs = argp; grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, false, NULL); bs->on_next = NULL; bs->failed = 1; + incoming_byte_stream_deactivate_locked(exec_ctx, t, s, bs); +} + +static void incoming_byte_stream_finished_ok_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + void *argp) { + grpc_chttp2_incoming_byte_stream *bs = argp; + incoming_byte_stream_deactivate_locked(exec_ctx, t, s, bs); } void grpc_chttp2_incoming_byte_stream_finished( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, int success, int from_parsing_thread) { - if (!success) { - if (from_parsing_thread) { + if (from_parsing_thread) { + if (success) { grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream, - incoming_byte_stream_finished_locked, bs, - 0); + incoming_byte_stream_finished_ok_locked, + bs, 0); + } else { + incoming_byte_stream_finished_ok_locked(exec_ctx, bs->transport, + bs->stream, bs); + } + } else { + if (success) { + grpc_chttp2_run_with_global_lock( + exec_ctx, bs->transport, bs->stream, + incoming_byte_stream_finished_failed_locked, bs, 0); } else { - incoming_byte_stream_finished_locked(exec_ctx, bs->transport, bs->stream, - bs); + incoming_byte_stream_finished_failed_locked(exec_ctx, bs->transport, + bs->stream, bs); } } - incoming_byte_stream_unref(bs); } grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( @@ -1811,6 +1856,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( incoming_byte_stream->next_message = NULL; incoming_byte_stream->transport = TRANSPORT_FROM_PARSING(transport_parsing); incoming_byte_stream->stream = STREAM_FROM_PARSING(stream_parsing); + gpr_ref(&incoming_byte_stream->stream->global.active_streams); gpr_slice_buffer_init(&incoming_byte_stream->slices); incoming_byte_stream->on_next = NULL; incoming_byte_stream->is_tail = 1; From a3b54cdff87181193bf66022d204635ddb24cd80 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 25 Mar 2016 15:59:55 -0700 Subject: [PATCH 13/19] Get asan passing with new locking scheme --- src/core/iomgr/iomgr.c | 6 +- src/core/transport/chttp2/internal.h | 3 + src/core/transport/chttp2/stream_lists.c | 8 +++ src/core/transport/chttp2_transport.c | 82 +++++++++++++----------- src/core/transport/transport.h | 2 +- 5 files changed, 62 insertions(+), 39 deletions(-) diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 3ab4430668e..5a617e1e485 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -168,8 +168,10 @@ bool grpc_iomgr_abort_on_leaks(void) { if (env == NULL) return false; static const char *truthy[] = {"yes", "Yes", "YES", "true", "True", "TRUE", "1"}; + bool should_we = false; for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { - if (0 == strcmp(env, truthy[i])) return true; + if (0 == strcmp(env, truthy[i])) should_we = true; } - return false; + gpr_free(env); + return should_we; } diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 7f6c4d3de22..e447b0d4226 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -594,6 +594,9 @@ int grpc_chttp2_list_pop_waiting_for_concurrency( void grpc_chttp2_list_add_check_read_ops( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); +bool grpc_chttp2_list_remove_check_read_ops( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global); int grpc_chttp2_list_pop_check_read_ops( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index 60fe735cfce..053ded1bc34 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -305,6 +305,14 @@ void grpc_chttp2_list_add_check_read_ops( GRPC_CHTTP2_LIST_CHECK_READ_OPS); } +bool grpc_chttp2_list_remove_check_read_ops( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global) { + return stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global), + STREAM_FROM_GLOBAL(stream_global), + GRPC_CHTTP2_LIST_CHECK_READ_OPS); +} + int grpc_chttp2_list_pop_check_read_ops( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global) { diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 8f660a537fd..d71061452ac 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -140,7 +140,10 @@ static void incoming_byte_stream_update_flow_control( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, size_t max_size_hint, size_t have_already); - +static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + void *byte_stream); static void fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_global *stream_global); @@ -534,12 +537,15 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, s->global.id) == NULL); } + while ( + (bs = grpc_chttp2_incoming_frame_queue_pop(&s->global.incoming_frames))) { + incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs); + } + grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global, &s->global); grpc_chttp2_list_remove_stalled_by_transport(&t->global, &s->global); - - /* TODO(ctiller): the remainder of this function could be done without the - global lock */ + grpc_chttp2_list_remove_check_read_ops(&t->global, &s->global); for (int i = 0; i < STREAM_LIST_COUNT; i++) { if (s->included[i]) { @@ -549,11 +555,6 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, } } - while ( - (bs = grpc_chttp2_incoming_frame_queue_pop(&s->global.incoming_frames))) { - grpc_byte_stream_destroy(exec_ctx, bs); - } - GPR_ASSERT(s->global.send_initial_metadata_finished == NULL); GPR_ASSERT(s->global.send_message_finished == NULL); GPR_ASSERT(s->global.send_trailing_metadata_finished == NULL); @@ -1150,7 +1151,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, while (stream_global->seen_error && (bs = grpc_chttp2_incoming_frame_queue_pop( &stream_global->incoming_frames)) != NULL) { - grpc_byte_stream_destroy(exec_ctx, bs); + incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs); } if (stream_global->incoming_frames.head != NULL) { *stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop( @@ -1171,7 +1172,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, while (stream_global->seen_error && (bs = grpc_chttp2_incoming_frame_queue_pop( &stream_global->incoming_frames)) != NULL) { - grpc_byte_stream_destroy(exec_ctx, bs); + incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs); } if (stream_global->all_incoming_byte_streams_finished) { grpc_chttp2_incoming_metadata_buffer_publish( @@ -1528,8 +1529,8 @@ static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, bool success) { GPR_TIMER_BEGIN("reading_action.parse", 0); size_t i = 0; for (; i < t->read_buffer.count && - grpc_chttp2_perform_read(exec_ctx, &t->parsing, - t->read_buffer.slices[i]); + grpc_chttp2_perform_read(exec_ctx, &t->parsing, + t->read_buffer.slices[i]); i++) ; if (i != t->read_buffer.count) { @@ -1621,9 +1622,10 @@ static void connectivity_state_set( grpc_connectivity_state state, const char *reason) { GRPC_CHTTP2_IF_TRACING( gpr_log(GPR_DEBUG, "set connectivity_state=%d", state)); - grpc_connectivity_state_set(exec_ctx, &TRANSPORT_FROM_GLOBAL(transport_global) - ->channel_callback.state_tracker, - state, reason); + grpc_connectivity_state_set( + exec_ctx, + &TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker, + state, reason); } /******************************************************************************* @@ -1744,25 +1746,34 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, return 0; } -static void incoming_byte_stream_unref_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, - void *argp) { - grpc_chttp2_incoming_byte_stream *bs = argp; +static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, + grpc_chttp2_incoming_byte_stream *bs) { if (gpr_unref(&bs->refs)) { - decrement_active_streams_locked(exec_ctx, &bs->transport->global, - &bs->stream->global); gpr_slice_buffer_destroy(&bs->slices); gpr_free(bs); } } +static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream); + +static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + void *byte_stream) { + grpc_chttp2_incoming_byte_stream *bs = byte_stream; + GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy); + decrement_active_streams_locked(exec_ctx, &bs->transport->global, + &bs->stream->global); + incoming_byte_stream_unref(exec_ctx, bs); +} + static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream) { grpc_chttp2_incoming_byte_stream *bs = (grpc_chttp2_incoming_byte_stream *)byte_stream; grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream, - incoming_byte_stream_unref_locked, bs, 0); + incoming_byte_stream_destroy_locked, bs, 0); } typedef struct { @@ -1794,12 +1805,6 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, sizeof(arg)); } -static void incoming_byte_stream_deactivate_locked( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, - grpc_chttp2_incoming_byte_stream *bs) { - incoming_byte_stream_unref_locked(exec_ctx, t, s, bs); -} - static void incoming_byte_stream_finished_failed_locked( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, void *argp) { @@ -1807,7 +1812,7 @@ static void incoming_byte_stream_finished_failed_locked( grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, false, NULL); bs->on_next = NULL; bs->failed = 1; - incoming_byte_stream_deactivate_locked(exec_ctx, t, s, bs); + incoming_byte_stream_unref(exec_ctx, bs); } static void incoming_byte_stream_finished_ok_locked(grpc_exec_ctx *exec_ctx, @@ -1815,7 +1820,7 @@ static void incoming_byte_stream_finished_ok_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s, void *argp) { grpc_chttp2_incoming_byte_stream *bs = argp; - incoming_byte_stream_deactivate_locked(exec_ctx, t, s, bs); + incoming_byte_stream_unref(exec_ctx, bs); } void grpc_chttp2_incoming_byte_stream_finished( @@ -1965,10 +1970,15 @@ static char *chttp2_get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *t) { return gpr_strdup(((grpc_chttp2_transport *)t)->peer_string); } -static const grpc_transport_vtable vtable = { - sizeof(grpc_chttp2_stream), "chttp2", init_stream, set_pollset, - perform_stream_op, perform_transport_op, destroy_stream, destroy_transport, - chttp2_get_peer}; +static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), + "chttp2", + init_stream, + set_pollset, + perform_stream_op, + perform_transport_op, + destroy_stream, + destroy_transport, + chttp2_get_peer}; grpc_transport *grpc_create_chttp2_transport( grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args, diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index ddb81f9a34b..c7778df47df 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -50,7 +50,7 @@ typedef struct grpc_transport grpc_transport; for a stream. */ typedef struct grpc_stream grpc_stream; -/*#define GRPC_STREAM_REFCOUNT_DEBUG*/ +#define GRPC_STREAM_REFCOUNT_DEBUG typedef struct grpc_stream_refcount { gpr_refcount refs; From 489b4744bee765e11825f0918c4f75bc9d09f1f1 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Sat, 26 Mar 2016 13:38:29 -0700 Subject: [PATCH 14/19] Fix copyrights --- src/core/channel/channel_stack.c | 2 +- src/core/transport/chttp2/frame_rst_stream.c | 2 +- src/core/transport/chttp2/frame_window_update.c | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index 1869597975f..8fbf8438f66 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/src/core/transport/chttp2/frame_rst_stream.c b/src/core/transport/chttp2/frame_rst_stream.c index 8063dfbb211..a5655ce79ba 100644 --- a/src/core/transport/chttp2/frame_rst_stream.c +++ b/src/core/transport/chttp2/frame_rst_stream.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/src/core/transport/chttp2/frame_window_update.c b/src/core/transport/chttp2/frame_window_update.c index 4a6944eef78..f7ae6003c84 100644 --- a/src/core/transport/chttp2/frame_window_update.c +++ b/src/core/transport/chttp2/frame_window_update.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without From d12bdd11cb9e320fe9be2ab13c43d7c4f27094e0 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Sat, 26 Mar 2016 16:01:00 -0700 Subject: [PATCH 15/19] Fix inadvertent race --- src/core/transport/chttp2_transport.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index d71061452ac..227591bf18b 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -1079,6 +1079,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, GPR_ASSERT(t->post_parsing_op == NULL); t->post_parsing_op = gpr_malloc(sizeof(*op)); memcpy(t->post_parsing_op, op, sizeof(*op)); + return; } grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); From caf606f0794818930ff9e531567d95cf8b20b9f8 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Sat, 26 Mar 2016 18:09:56 -0700 Subject: [PATCH 16/19] Turn off debug --- src/core/transport/transport.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index c7778df47df..ddb81f9a34b 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -50,7 +50,7 @@ typedef struct grpc_transport grpc_transport; for a stream. */ typedef struct grpc_stream grpc_stream; -#define GRPC_STREAM_REFCOUNT_DEBUG +/*#define GRPC_STREAM_REFCOUNT_DEBUG*/ typedef struct grpc_stream_refcount { gpr_refcount refs; From 6c8ae9aad5f1aaaf9c043817e184ec26bee75c86 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 6 Apr 2016 15:50:38 -0700 Subject: [PATCH 17/19] Fix new test --- test/core/end2end/tests/filter_causes_close.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/core/end2end/tests/filter_causes_close.c b/test/core/end2end/tests/filter_causes_close.c index ca54167b205..2de03a3b078 100644 --- a/test/core/end2end/tests/filter_causes_close.c +++ b/test/core/end2end/tests/filter_causes_close.c @@ -102,7 +102,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_destroy(f->cq); } -/* Request with a large amount of metadata.*/ +/* Simple request via a server filter that always closes the stream.*/ static void test_request(grpc_end2end_test_config config) { grpc_call *c; grpc_call *s; @@ -235,8 +235,8 @@ static void start_transport_stream_op(grpc_exec_ctx *exec_ctx, static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) {} -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) {} +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *and_free_memory) {} static void init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, From 9859d8d2969ab28541a91b77a321d9c7265c45fd Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 26 Apr 2016 21:07:53 -0700 Subject: [PATCH 18/19] Fix use-after-free --- src/core/lib/surface/call.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index f33688beef0..3f31e77247f 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -373,7 +373,6 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) { if (c->receiving_stream != NULL) { grpc_byte_stream_destroy(exec_ctx, c->receiving_stream); } - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call"); gpr_mu_destroy(&c->mu); for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (c->status[i].details) { @@ -391,7 +390,9 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) { if (c->cq) { GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); } + grpc_channel *channel = c->channel; grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), c); + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call"); GPR_TIMER_END("destroy_call", 0); } From a93e5a49e3d04c2f72e320040a15121e4d88b5d3 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 26 Apr 2016 21:47:49 -0700 Subject: [PATCH 19/19] Fix use-after-free --- .../chttp2/transport/chttp2_transport.c | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 24448714a8d..fcf2abfe66e 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1668,6 +1668,14 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt, * BYTE STREAM */ +static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, + grpc_chttp2_incoming_byte_stream *bs) { + if (gpr_unref(&bs->refs)) { + gpr_slice_buffer_destroy(&bs->slices); + gpr_free(bs); + } +} + static void incoming_byte_stream_update_flow_control( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, size_t max_size_hint, @@ -1738,6 +1746,7 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, bs->on_next = arg->on_complete; bs->next = arg->slice; } + incoming_byte_stream_unref(exec_ctx, bs); } static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, @@ -1747,20 +1756,13 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs = (grpc_chttp2_incoming_byte_stream *)byte_stream; incoming_byte_stream_next_arg arg = {bs, slice, max_size_hint, on_complete}; + gpr_ref(&bs->refs); grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream, incoming_byte_stream_next_locked, &arg, sizeof(arg)); return 0; } -static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, - grpc_chttp2_incoming_byte_stream *bs) { - if (gpr_unref(&bs->refs)) { - gpr_slice_buffer_destroy(&bs->slices); - gpr_free(bs); - } -} - static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream); @@ -1801,12 +1803,14 @@ static void incoming_byte_stream_push_locked(grpc_exec_ctx *exec_ctx, } else { gpr_slice_buffer_add(&bs->slices, arg->slice); } + incoming_byte_stream_unref(exec_ctx, bs); } void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, gpr_slice slice) { incoming_byte_stream_push_arg arg = {bs, slice}; + gpr_ref(&bs->refs); grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream, incoming_byte_stream_push_locked, &arg, sizeof(arg));