From 9c5318aba097dc8634626ba0b106fc8993ce32b5 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 5 Dec 2016 15:07:04 -0800 Subject: [PATCH 1/2] Fix shutdown problems with sync server --- src/core/lib/surface/call.c | 4 ++++ src/cpp/server/server_cc.cc | 12 ++++++------ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 1e0f3eeca52..8ca3cab9d57 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -1551,6 +1551,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } + /* IF this is a server, then GRPC_OP_RECV_INITIAL_METADATA *must* come + from server.c. In that case, it's coming from accept_stream, and in + that case we're not necessarily covered by a poller. */ + stream_op->covered_by_poller = call->is_client; call->received_initial_metadata = 1; call->buffered_metadata[0] = op->data.recv_initial_metadata; grpc_closure_init(&call->receiving_initial_metadata_ready, diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index b7cfd6dbf11..3ec7faddada 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -510,12 +510,6 @@ void Server::ShutdownInternal(gpr_timespec deadline) { ShutdownTag shutdown_tag; // Dummy shutdown tag grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag); - // Shutdown all ThreadManagers. This will try to gracefully stop all the - // threads in the ThreadManagers (once they process any inflight requests) - for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { - (*it)->Shutdown(); // ThreadManager's Shutdown() - } - shutdown_cq.Shutdown(); void* tag; @@ -531,6 +525,12 @@ void Server::ShutdownInternal(gpr_timespec deadline) { // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has // successfully shutdown + // Shutdown all ThreadManagers. This will try to gracefully stop all the + // threads in the ThreadManagers (once they process any inflight requests) + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { + (*it)->Shutdown(); // ThreadManager's Shutdown() + } + // Wait for threads in all ThreadManagers to terminate for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { (*it)->Wait(); From 4ee9bb06b92a7cdd5f87240b0ec882c368d325dd Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 5 Dec 2016 15:37:47 -0800 Subject: [PATCH 2/2] Fix race if connector is shutdown while connecting --- .../ext/transport/chttp2/client/chttp2_connector.c | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c index 213395c20f1..568b114d641 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.c +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c @@ -56,6 +56,7 @@ typedef struct { gpr_refcount refs; bool shutdown; + bool connecting; char *server_name; grpc_chttp2_create_handshakers_func create_handshakers; @@ -103,7 +104,9 @@ static void chttp2_connector_shutdown(grpc_exec_ctx *exec_ctx, } // If handshaking is not yet in progress, shutdown the endpoint. // Otherwise, the handshaker will do this for us. - if (c->endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, c->endpoint); + if (!c->connecting && c->endpoint != NULL) { + grpc_endpoint_shutdown(exec_ctx, c->endpoint); + } gpr_mu_unlock(&c->mu); } @@ -192,6 +195,8 @@ static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { chttp2_connector *c = arg; gpr_mu_lock(&c->mu); + GPR_ASSERT(c->connecting); + c->connecting = false; if (error != GRPC_ERROR_NONE || c->shutdown) { if (error == GRPC_ERROR_NONE) { error = GRPC_ERROR_CREATE("connector shutdown"); @@ -202,6 +207,7 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_closure *notify = c->notify; c->notify = NULL; grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); + if (c->endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, c->endpoint); gpr_mu_unlock(&c->mu); chttp2_connector_unref(exec_ctx, arg); } else { @@ -235,6 +241,8 @@ static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx, GPR_ASSERT(c->endpoint == NULL); chttp2_connector_ref(con); // Ref taken for callback. grpc_closure_init(&c->connected, connected, c); + GPR_ASSERT(!c->connecting); + c->connecting = true; grpc_tcp_client_connect(exec_ctx, &c->connected, &c->endpoint, args->interested_parties, args->channel_args, args->addr, args->deadline);