From 9d9313cfc650e79b44f95d9b7fd9e75e2dc38111 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 11 Apr 2017 15:05:46 -0700 Subject: [PATCH 1/3] Threading robustness Move server startup to a separate thread. Where there is no opportunity for failure, do not return bool. --- .../grpc++/impl/codegen/server_interface.h | 2 +- include/grpc++/server.h | 2 +- src/core/lib/surface/server.c | 19 ++++++++++++++----- src/cpp/server/server_builder.cc | 5 +---- src/cpp/server/server_cc.cc | 4 +--- 5 files changed, 18 insertions(+), 14 deletions(-) diff --git a/include/grpc++/impl/codegen/server_interface.h b/include/grpc++/impl/codegen/server_interface.h index bd1b36e8839..79750117047 100644 --- a/include/grpc++/impl/codegen/server_interface.h +++ b/include/grpc++/impl/codegen/server_interface.h @@ -124,7 +124,7 @@ class ServerInterface : public CallHook { /// \param num_cqs How many completion queues does \a cqs hold. /// /// \return true on a successful shutdown. - virtual bool Start(ServerCompletionQueue** cqs, size_t num_cqs) = 0; + virtual void Start(ServerCompletionQueue** cqs, size_t num_cqs) = 0; virtual void ShutdownInternal(gpr_timespec deadline) = 0; diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 489937712ec..af90a5c70b7 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -179,7 +179,7 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen { /// \param num_cqs How many completion queues does \a cqs hold. /// /// \return true on a successful shutdown. - bool Start(ServerCompletionQueue** cqs, size_t num_cqs) override; + void Start(ServerCompletionQueue** cqs, size_t num_cqs) override; void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) override; diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 191ee75252d..6ac3181482a 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -44,6 +44,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" +#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/support/stack_lockfree.h" @@ -1077,8 +1078,16 @@ void *grpc_server_register_method( return m; } +static void start_listeners(grpc_exec_ctx *exec_ctx, void *s, + grpc_error *error) { + grpc_server *server = s; + for (listener *l = server->listeners; l; l = l->next) { + l->start(exec_ctx, server, l->arg, server->pollsets, server->pollset_count); + } + server_unref(exec_ctx, server); +} + void grpc_server_start(grpc_server *server) { - listener *l; size_t i; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -1112,10 +1121,10 @@ void grpc_server_start(grpc_server *server) { (size_t)server->max_requested_calls_per_cq, server); } - for (l = server->listeners; l; l = l->next) { - l->start(&exec_ctx, server, l->arg, server->pollsets, - server->pollset_count); - } + server_ref(server); + grpc_closure_sched(&exec_ctx, grpc_closure_create(start_listeners, server, + grpc_executor_scheduler), + GRPC_ERROR_NONE); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 4eb4b5a1b21..3a15afc49e5 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -337,10 +337,7 @@ std::unique_ptr ServerBuilder::BuildAndStart() { } auto cqs_data = cqs_.empty() ? nullptr : &cqs_[0]; - if (!server->Start(cqs_data, cqs_.size())) { - if (added_port) server->Shutdown(); - return nullptr; - } + server->Start(cqs_data, cqs_.size()); for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) { (*plugin)->Finish(initializer); diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index ce173a1ee2d..43e9531f17f 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -494,7 +494,7 @@ int Server::AddListeningPort(const grpc::string& addr, return port; } -bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { +void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { GPR_ASSERT(!started_); global_callbacks_->PreServerStart(this); started_ = true; @@ -530,8 +530,6 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { (*it)->Start(); } - - return true; } void Server::ShutdownInternal(gpr_timespec deadline) { From bd3cc405290c8f4d4e1ea42b15652e172b34410f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 12 Apr 2017 08:06:44 -0700 Subject: [PATCH 2/3] Add synchronization so startup completes before shutdown begins --- src/core/lib/surface/server.c | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 6ac3181482a..da4dd703d58 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -212,6 +212,11 @@ struct grpc_server { gpr_mu mu_global; /* mutex for server and channel state */ gpr_mu mu_call; /* mutex for call-specific state */ + /* startup synchronization: flag is protected by mu_global, signals whether + we are doing the listener start routine or not */ + bool starting; + gpr_cv starting_cv; + registered_method *registered_methods; /** one request matcher for unregistered methods */ request_matcher unregistered_request_matcher; @@ -389,6 +394,7 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { grpc_channel_args_destroy(exec_ctx, server->channel_args); gpr_mu_destroy(&server->mu_global); gpr_mu_destroy(&server->mu_call); + gpr_cv_destroy(&server->starting_cv); while ((rm = server->registered_methods) != NULL) { server->registered_methods = rm->next; if (server->started) { @@ -1022,6 +1028,7 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { gpr_mu_init(&server->mu_global); gpr_mu_init(&server->mu_call); + gpr_cv_init(&server->starting_cv); /* decremented by grpc_server_destroy */ gpr_ref_init(&server->internal_refcount, 1); @@ -1084,6 +1091,12 @@ static void start_listeners(grpc_exec_ctx *exec_ctx, void *s, for (listener *l = server->listeners; l; l = l->next) { l->start(exec_ctx, server, l->arg, server->pollsets, server->pollset_count); } + + gpr_mu_lock(&server->mu_global); + server->starting = false; + gpr_cv_signal(&server->starting_cv); + gpr_mu_unlock(&server->mu_global); + server_unref(exec_ctx, server); } @@ -1122,6 +1135,7 @@ void grpc_server_start(grpc_server *server) { } server_ref(server); + server->starting = true; grpc_closure_sched(&exec_ctx, grpc_closure_create(start_listeners, server, grpc_executor_scheduler), GRPC_ERROR_NONE); @@ -1258,8 +1272,14 @@ void grpc_server_shutdown_and_notify(grpc_server *server, GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3, (server, cq, tag)); - /* lock, and gather up some stuff to do */ + /* wait for startup to be finished: locks mu_global */ gpr_mu_lock(&server->mu_global); + while (server->starting) { + gpr_cv_wait(&server->starting_cv, &server->mu_global, + gpr_inf_future(GPR_CLOCK_REALTIME)); + } + + /* stay locked, and gather up some stuff to do */ grpc_cq_begin_op(cq, tag); if (server->shutdown_published) { grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown, From a103f7bbdfd1d8834d8386644d7f5e2d7b8d1a90 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 13 Apr 2017 09:25:44 -0700 Subject: [PATCH 3/3] update comments --- include/grpc++/impl/codegen/server_interface.h | 2 -- include/grpc++/server.h | 2 -- 2 files changed, 4 deletions(-) diff --git a/include/grpc++/impl/codegen/server_interface.h b/include/grpc++/impl/codegen/server_interface.h index 79750117047..31d55529efc 100644 --- a/include/grpc++/impl/codegen/server_interface.h +++ b/include/grpc++/impl/codegen/server_interface.h @@ -122,8 +122,6 @@ class ServerInterface : public CallHook { /// caller is required to keep all completion queues live until the server is /// destroyed. /// \param num_cqs How many completion queues does \a cqs hold. - /// - /// \return true on a successful shutdown. virtual void Start(ServerCompletionQueue** cqs, size_t num_cqs) = 0; virtual void ShutdownInternal(gpr_timespec deadline) = 0; diff --git a/include/grpc++/server.h b/include/grpc++/server.h index af90a5c70b7..6d5f89d360a 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -177,8 +177,6 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen { /// caller is required to keep all completion queues live until the server is /// destroyed. /// \param num_cqs How many completion queues does \a cqs hold. - /// - /// \return true on a successful shutdown. void Start(ServerCompletionQueue** cqs, size_t num_cqs) override; void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) override;