From d7f12e30a0050b3fecb72fe7ad558b4d837e140f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 3 Mar 2016 10:08:31 -0800 Subject: [PATCH 1/4] Fix accept_stream being called post-channel deletion - Have the server clear the accept_stream callback prior to destroying the channel (required a small transport op protocol change) - Have the transport not enact transport ops until parsing is completed (prevents accept_stream from disappearing mid-parse) --- src/core/channel/client_channel.c | 2 +- src/core/channel/client_uchannel.c | 2 +- src/core/surface/server.c | 14 +++++++-- src/core/transport/chttp2/internal.h | 3 ++ src/core/transport/chttp2_transport.c | 43 +++++++++++++++++++-------- src/core/transport/transport.h | 6 ++-- 6 files changed, 51 insertions(+), 19 deletions(-) diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index eeac3c146c3..d4ba9508185 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -251,7 +251,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); - GPR_ASSERT(op->set_accept_stream == NULL); + GPR_ASSERT(op->set_accept_stream == false); if (op->bind_pollset != NULL) { grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, op->bind_pollset); diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c index b1e7155773f..83fcc3a87f5 100644 --- a/src/core/channel/client_uchannel.c +++ b/src/core/channel/client_uchannel.c @@ -107,7 +107,7 @@ static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); - GPR_ASSERT(op->set_accept_stream == NULL); + GPR_ASSERT(op->set_accept_stream == false); GPR_ASSERT(op->bind_pollset == NULL); if (op->on_connectivity_state_change != NULL) { diff --git a/src/core/surface/server.c b/src/core/surface/server.c index fb5e0d4b9e7..5b13d4ba526 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -407,8 +407,15 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) { maybe_finish_shutdown(exec_ctx, chand->server); chand->finish_destroy_channel_closure.cb = finish_destroy_channel; chand->finish_destroy_channel_closure.cb_arg = chand; - grpc_exec_ctx_enqueue(exec_ctx, &chand->finish_destroy_channel_closure, true, - NULL); + + grpc_transport_op op; + memset(&op, 0, sizeof(op)); + op.set_accept_stream = true; + op.on_consumed = &chand->finish_destroy_channel_closure; + grpc_channel_next_op(exec_ctx, + grpc_channel_stack_element( + grpc_channel_get_channel_stack(chand->channel), 0), + &op); } static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server, @@ -971,7 +978,8 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity"); memset(&op, 0, sizeof(op)); - op.set_accept_stream = accept_stream; + op.set_accept_stream = true; + op.set_accept_stream_fn = accept_stream; op.set_accept_stream_user_data = chand; op.on_connectivity_state_change = &chand->channel_connectivity_changed; op.connectivity_state = &chand->connectivity_state; diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 891aad6ef28..b720d1ab3e5 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -358,6 +358,9 @@ struct grpc_chttp2_transport { /** connectivity tracking */ grpc_connectivity_state_tracker state_tracker; } channel_callback; + + /** Transport op to be applied post-parsing */ + grpc_transport_op *post_parsing_op; }; typedef struct { diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index a1ca78d4c4f..369ff0ad7f6 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -951,12 +951,10 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, unlock(exec_ctx, t); } -static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_transport_op *op) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - int close_transport = 0; - - lock(t); +static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_transport_op *op) { + bool close_transport = false; grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); @@ -975,8 +973,8 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, close_transport = !grpc_chttp2_has_streams(t); } - if (op->set_accept_stream != NULL) { - t->channel_callback.accept_stream = op->set_accept_stream; + if (op->set_accept_stream) { + t->channel_callback.accept_stream = op->set_accept_stream_fn; t->channel_callback.accept_stream_user_data = op->set_accept_stream_user_data; } @@ -997,15 +995,29 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, close_transport_locked(exec_ctx, t); } - unlock(exec_ctx, t); - if (close_transport) { - lock(t); close_transport_locked(exec_ctx, t); - unlock(exec_ctx, t); } } +static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_transport_op *op) { + grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; + + lock(t); + + /* Let's be overly cautious: don't change any state while we're parsing */ + if (t->parsing_active) { + GPR_ASSERT(t->post_parsing_op == NULL); + t->post_parsing_op = gpr_malloc(sizeof(*op)); + memcpy(t->post_parsing_op, op, sizeof(*op)); + } else { + perform_transport_op_locked(exec_ctx, t, op); + } + + unlock(exec_ctx, t); +} + /******************************************************************************* * INPUT PROCESSING */ @@ -1401,6 +1413,13 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success) { /* handle higher level things */ grpc_chttp2_publish_reads(exec_ctx, transport_global, transport_parsing); t->parsing_active = 0; + /* handle delayed transport ops (if there is one) */ + if (t->post_parsing_op) { + grpc_transport_op *op = t->post_parsing_op; + t->post_parsing_op = NULL; + perform_transport_op_locked(exec_ctx, t, op); + gpr_free(op); + } /* if a stream is in the stream map, and gets cancelled, we need to ensure * we are not parsing before continuing the cancellation to keep things in * a sane state */ diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 8902c5d2f65..7a007ef777b 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -139,8 +139,10 @@ typedef struct grpc_transport_op { gpr_slice *goaway_message; /** set the callback for accepting new streams; this is a permanent callback, unlike the other one-shot closures */ - void (*set_accept_stream)(grpc_exec_ctx *exec_ctx, void *user_data, - grpc_transport *transport, const void *server_data); + bool set_accept_stream; + void (*set_accept_stream_fn)(grpc_exec_ctx *exec_ctx, void *user_data, + grpc_transport *transport, + const void *server_data); void *set_accept_stream_user_data; /** add this transport to a pollset */ grpc_pollset *bind_pollset; From 389297dedfe384246d4831a4fc33e7af7b0200e5 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 3 Mar 2016 21:02:54 -0800 Subject: [PATCH 2/4] Document some things --- src/core/transport/transport.h | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 7a007ef777b..5a90bd6d387 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -123,7 +123,8 @@ typedef struct grpc_transport_stream_op { /** Transport op: a set of operations to perform on a transport as a whole */ typedef struct grpc_transport_op { - /** called when processing of this op is done */ + /** Called when processing of this op is done. + Only one transport_op is allowed to be outstanding at any time. */ grpc_closure *on_consumed; /** connectivity monitoring - set connectivity_state to NULL to unsubscribe */ grpc_closure *on_connectivity_state_change; @@ -138,7 +139,9 @@ typedef struct grpc_transport_op { grpc_status_code goaway_status; gpr_slice *goaway_message; /** set the callback for accepting new streams; - this is a permanent callback, unlike the other one-shot closures */ + this is a permanent callback, unlike the other one-shot closures. + If true, the callback is set to set_accept_stream_fn, with its + user_data argument set to set_accept_stream_user_data */ bool set_accept_stream; void (*set_accept_stream_fn)(grpc_exec_ctx *exec_ctx, void *user_data, grpc_transport *transport, From 1bd9ea407fd673f6c795a524c6454bc5db339a85 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 3 Mar 2016 21:09:31 -0800 Subject: [PATCH 3/4] Refine condition --- src/core/transport/chttp2_transport.c | 5 +++-- src/core/transport/transport.h | 3 +-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 369ff0ad7f6..cf1dbfa0e59 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -1006,8 +1006,9 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, lock(t); - /* Let's be overly cautious: don't change any state while we're parsing */ - if (t->parsing_active) { + /* If there's a set_accept_stream ensure that we're not parsing + to avoid changing things out from underneath */ + if (t->parsing_active && t->set_accept_stream) { GPR_ASSERT(t->post_parsing_op == NULL); t->post_parsing_op = gpr_malloc(sizeof(*op)); memcpy(t->post_parsing_op, op, sizeof(*op)); diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 5a90bd6d387..ed6e121c9cb 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -123,8 +123,7 @@ typedef struct grpc_transport_stream_op { /** Transport op: a set of operations to perform on a transport as a whole */ typedef struct grpc_transport_op { - /** Called when processing of this op is done. - Only one transport_op is allowed to be outstanding at any time. */ + /** Called when processing of this op is done. */ grpc_closure *on_consumed; /** connectivity monitoring - set connectivity_state to NULL to unsubscribe */ grpc_closure *on_connectivity_state_change; From 687bf6295f5e3a02f249aced69868e497c1bcc30 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 3 Mar 2016 21:18:48 -0800 Subject: [PATCH 4/4] Fix typo --- src/core/transport/chttp2_transport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index cf1dbfa0e59..643e3feeb97 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -1008,7 +1008,7 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, /* If there's a set_accept_stream ensure that we're not parsing to avoid changing things out from underneath */ - if (t->parsing_active && t->set_accept_stream) { + if (t->parsing_active && op->set_accept_stream) { GPR_ASSERT(t->post_parsing_op == NULL); t->post_parsing_op = gpr_malloc(sizeof(*op)); memcpy(t->post_parsing_op, op, sizeof(*op));