diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index eeac3c146c3..d4ba9508185 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -251,7 +251,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); - GPR_ASSERT(op->set_accept_stream == NULL); + GPR_ASSERT(op->set_accept_stream == false); if (op->bind_pollset != NULL) { grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, op->bind_pollset); diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c index b1e7155773f..83fcc3a87f5 100644 --- a/src/core/channel/client_uchannel.c +++ b/src/core/channel/client_uchannel.c @@ -107,7 +107,7 @@ static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); - GPR_ASSERT(op->set_accept_stream == NULL); + GPR_ASSERT(op->set_accept_stream == false); GPR_ASSERT(op->bind_pollset == NULL); if (op->on_connectivity_state_change != NULL) { diff --git a/src/core/surface/server.c b/src/core/surface/server.c index fb5e0d4b9e7..5b13d4ba526 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -407,8 +407,15 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) { maybe_finish_shutdown(exec_ctx, chand->server); chand->finish_destroy_channel_closure.cb = finish_destroy_channel; chand->finish_destroy_channel_closure.cb_arg = chand; - grpc_exec_ctx_enqueue(exec_ctx, &chand->finish_destroy_channel_closure, true, - NULL); + + grpc_transport_op op; + memset(&op, 0, sizeof(op)); + op.set_accept_stream = true; + op.on_consumed = &chand->finish_destroy_channel_closure; + grpc_channel_next_op(exec_ctx, + grpc_channel_stack_element( + grpc_channel_get_channel_stack(chand->channel), 0), + &op); } static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server, @@ -971,7 +978,8 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity"); memset(&op, 0, sizeof(op)); - op.set_accept_stream = accept_stream; + op.set_accept_stream = true; + op.set_accept_stream_fn = accept_stream; op.set_accept_stream_user_data = chand; op.on_connectivity_state_change = &chand->channel_connectivity_changed; op.connectivity_state = &chand->connectivity_state; diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 891aad6ef28..b720d1ab3e5 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -358,6 +358,9 @@ struct grpc_chttp2_transport { /** connectivity tracking */ grpc_connectivity_state_tracker state_tracker; } channel_callback; + + /** Transport op to be applied post-parsing */ + grpc_transport_op *post_parsing_op; }; typedef struct { diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index a1ca78d4c4f..369ff0ad7f6 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -951,12 +951,10 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, unlock(exec_ctx, t); } -static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_transport_op *op) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - int close_transport = 0; - - lock(t); +static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_transport_op *op) { + bool close_transport = false; grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); @@ -975,8 +973,8 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, close_transport = !grpc_chttp2_has_streams(t); } - if (op->set_accept_stream != NULL) { - t->channel_callback.accept_stream = op->set_accept_stream; + if (op->set_accept_stream) { + t->channel_callback.accept_stream = op->set_accept_stream_fn; t->channel_callback.accept_stream_user_data = op->set_accept_stream_user_data; } @@ -997,15 +995,29 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, close_transport_locked(exec_ctx, t); } - unlock(exec_ctx, t); - if (close_transport) { - lock(t); close_transport_locked(exec_ctx, t); - unlock(exec_ctx, t); } } +static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_transport_op *op) { + grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; + + lock(t); + + /* Let's be overly cautious: don't change any state while we're parsing */ + if (t->parsing_active) { + GPR_ASSERT(t->post_parsing_op == NULL); + t->post_parsing_op = gpr_malloc(sizeof(*op)); + memcpy(t->post_parsing_op, op, sizeof(*op)); + } else { + perform_transport_op_locked(exec_ctx, t, op); + } + + unlock(exec_ctx, t); +} + /******************************************************************************* * INPUT PROCESSING */ @@ -1401,6 +1413,13 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success) { /* handle higher level things */ grpc_chttp2_publish_reads(exec_ctx, transport_global, transport_parsing); t->parsing_active = 0; + /* handle delayed transport ops (if there is one) */ + if (t->post_parsing_op) { + grpc_transport_op *op = t->post_parsing_op; + t->post_parsing_op = NULL; + perform_transport_op_locked(exec_ctx, t, op); + gpr_free(op); + } /* if a stream is in the stream map, and gets cancelled, we need to ensure * we are not parsing before continuing the cancellation to keep things in * a sane state */ diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 8902c5d2f65..7a007ef777b 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -139,8 +139,10 @@ typedef struct grpc_transport_op { gpr_slice *goaway_message; /** set the callback for accepting new streams; this is a permanent callback, unlike the other one-shot closures */ - void (*set_accept_stream)(grpc_exec_ctx *exec_ctx, void *user_data, - grpc_transport *transport, const void *server_data); + bool set_accept_stream; + void (*set_accept_stream_fn)(grpc_exec_ctx *exec_ctx, void *user_data, + grpc_transport *transport, + const void *server_data); void *set_accept_stream_user_data; /** add this transport to a pollset */ grpc_pollset *bind_pollset;