Threading robustness

Move server startup to a separate thread.
Where there is no opportunity for failure, do not return bool.
Branch: pull/10583/head
Author: Craig Tiller
Parent: 380c3a7e1e
Commit: 9d9313cfc6
Changed files:

  1. include/grpc++/impl/codegen/server_interface.h (2 lines changed)
  2. include/grpc++/server.h (2 lines changed)
  3. src/core/lib/surface/server.c (19 lines changed)
  4. src/cpp/server/server_builder.cc (5 lines changed)
  5. src/cpp/server/server_cc.cc (4 lines changed)
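
The heart of the change is in src/core/lib/surface/server.c below: grpc_server_start() no longer starts its listeners inline. It takes a ref on the server and schedules a start_listeners closure on gRPC's executor, so the listeners come up on an executor thread instead of the caller's thread. The following is a minimal sketch of that offload pattern, not part of the commit; the my_* names are hypothetical, and the grpc_closure_create / grpc_closure_sched / grpc_executor_scheduler usage is taken directly from the hunks below rather than from any other API documentation.

```cpp
// Sketch only: the "take a ref, schedule a closure on the executor, drop the
// ref when the closure has run" pattern adopted by grpc_server_start() below.
#include <grpc/support/alloc.h>
#include <grpc/support/sync.h>

#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/executor.h"

typedef struct my_state {
  gpr_refcount refs;
  /* ... whatever must stay alive until the deferred work has run ... */
} my_state;

static my_state *my_state_create(void) {
  my_state *s = (my_state *)gpr_malloc(sizeof(*s));
  gpr_ref_init(&s->refs, 1);  // initial ref owned by the caller
  return s;
}

static void my_state_unref(my_state *s) {
  if (gpr_unref(&s->refs)) gpr_free(s);
}

// Runs later, on an executor thread rather than on the caller's thread.
static void my_deferred_work(grpc_exec_ctx *exec_ctx, void *arg,
                             grpc_error *error) {
  my_state *s = (my_state *)arg;
  /* ... do the work that used to run inline ... */
  my_state_unref(s);  // drop the ref taken before scheduling
}

static void my_schedule(grpc_exec_ctx *exec_ctx, my_state *s) {
  gpr_ref(&s->refs);  // keep *s alive until my_deferred_work has run
  grpc_closure_sched(exec_ctx,
                     grpc_closure_create(my_deferred_work, s,
                                         grpc_executor_scheduler),
                     GRPC_ERROR_NONE);
}
```

The ref taken before scheduling and released inside the callback is what keeps the state alive across the hop to the executor thread.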

include/grpc++/impl/codegen/server_interface.h

@@ -124,7 +124,7 @@ class ServerInterface : public CallHook {
   /// \param num_cqs How many completion queues does \a cqs hold.
   ///
   /// \return true on a successful shutdown.
-  virtual bool Start(ServerCompletionQueue** cqs, size_t num_cqs) = 0;
+  virtual void Start(ServerCompletionQueue** cqs, size_t num_cqs) = 0;
   virtual void ShutdownInternal(gpr_timespec deadline) = 0;

include/grpc++/server.h

@@ -179,7 +179,7 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen {
   /// \param num_cqs How many completion queues does \a cqs hold.
   ///
   /// \return true on a successful shutdown.
-  bool Start(ServerCompletionQueue** cqs, size_t num_cqs) override;
+  void Start(ServerCompletionQueue** cqs, size_t num_cqs) override;
   void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) override;

src/core/lib/surface/server.c

@@ -44,6 +44,7 @@
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/iomgr/executor.h"
 #include "src/core/lib/iomgr/iomgr.h"
 #include "src/core/lib/slice/slice_internal.h"
 #include "src/core/lib/support/stack_lockfree.h"
@@ -1077,8 +1078,16 @@ void *grpc_server_register_method(
   return m;
 }
+static void start_listeners(grpc_exec_ctx *exec_ctx, void *s,
+                            grpc_error *error) {
+  grpc_server *server = s;
+  for (listener *l = server->listeners; l; l = l->next) {
+    l->start(exec_ctx, server, l->arg, server->pollsets, server->pollset_count);
+  }
+  server_unref(exec_ctx, server);
+}
 void grpc_server_start(grpc_server *server) {
   listener *l;
   size_t i;
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -1112,10 +1121,10 @@ void grpc_server_start(grpc_server *server) {
                          (size_t)server->max_requested_calls_per_cq, server);
   }
-  for (l = server->listeners; l; l = l->next) {
-    l->start(&exec_ctx, server, l->arg, server->pollsets,
-             server->pollset_count);
-  }
+  server_ref(server);
+  grpc_closure_sched(&exec_ctx, grpc_closure_create(start_listeners, server,
+                                                    grpc_executor_scheduler),
+                     GRPC_ERROR_NONE);
   grpc_exec_ctx_finish(&exec_ctx);
 }
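
Note on the lifetime handling above, following the sketch before the diffs: server_ref() is taken before the closure is scheduled and server_unref() runs at the end of start_listeners(), presumably so the server cannot be torn down between grpc_server_start() returning and the executor thread actually starting the listeners.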

src/cpp/server/server_builder.cc

@@ -337,10 +337,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
   }
   auto cqs_data = cqs_.empty() ? nullptr : &cqs_[0];
-  if (!server->Start(cqs_data, cqs_.size())) {
-    if (added_port) server->Shutdown();
-    return nullptr;
-  }
+  server->Start(cqs_data, cqs_.size());
   for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
     (*plugin)->Finish(initializer);
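
With Start() returning void, ServerBuilder::BuildAndStart() above no longer has a failure branch for it. Caller-side code is unaffected, as in this sketch (not part of the commit; the address and port are placeholders):

```cpp
#include <memory>

#include <grpc++/security/server_credentials.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>

std::unique_ptr<grpc::Server> RunServer(grpc::Service* service) {
  grpc::ServerBuilder builder;
  builder.AddListeningPort("0.0.0.0:50051",
                           grpc::InsecureServerCredentials());
  builder.RegisterService(service);
  // BuildAndStart() now calls Server::Start() unconditionally; any remaining
  // nullptr returns come from the builder's own checks, not from Start().
  return builder.BuildAndStart();
}
```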

src/cpp/server/server_cc.cc

@@ -494,7 +494,7 @@ int Server::AddListeningPort(const grpc::string& addr,
   return port;
 }
-bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
+void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
   GPR_ASSERT(!started_);
   global_callbacks_->PreServerStart(this);
   started_ = true;
@@ -530,8 +530,6 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
   for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
     (*it)->Start();
   }
-  return true;
 }
 void Server::ShutdownInternal(gpr_timespec deadline) {
