pull/501/head
Craig Tiller 10 years ago
parent 14a65f9760
commit 0db1befae1
  1. 8
      include/grpc++/server.h
  2. 4
      include/grpc++/server_builder.h
  3. 0
      include/grpc/support/cpu.h
  4. 33
      src/cpp/server/server.cc
  5. 25
      src/cpp/server/server_builder.cc

@ -70,15 +70,15 @@ class Server {
friend class ServerBuilder; friend class ServerBuilder;
// ServerBuilder use only // ServerBuilder use only
Server(ThreadPoolInterface* thread_pool, ServerCredentials* creds); Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, ServerCredentials* creds);
Server(); Server();
// Register a service. This call does not take ownership of the service. // Register a service. This call does not take ownership of the service.
// The service must exist for the lifetime of the Server instance. // The service must exist for the lifetime of the Server instance.
void RegisterService(RpcService* service); bool RegisterService(RpcService* service);
// Add a listening port. Can be called multiple times. // Add a listening port. Can be called multiple times.
void AddPort(const grpc::string& addr); int AddPort(const grpc::string& addr);
// Start the server. // Start the server.
void Start(); bool Start();
void AllowOneRpc(); void AllowOneRpc();
void HandleQueueClosed(); void HandleQueueClosed();

@ -57,7 +57,7 @@ class ServerBuilder {
// BuildAndStart(). // BuildAndStart().
void RegisterService(SynchronousService* service); void RegisterService(SynchronousService* service);
void ReigsterAsyncService(AsynchronousService *service); void RegisterAsyncService(AsynchronousService *service);
// Add a listening port. Can be called multiple times. // Add a listening port. Can be called multiple times.
void AddPort(const grpc::string& addr); void AddPort(const grpc::string& addr);
@ -78,7 +78,7 @@ class ServerBuilder {
std::vector<AsynchronousService*> async_services_; std::vector<AsynchronousService*> async_services_;
std::vector<grpc::string> ports_; std::vector<grpc::string> ports_;
std::shared_ptr<ServerCredentials> creds_; std::shared_ptr<ServerCredentials> creds_;
ThreadPoolInterface* thread_pool_; ThreadPoolInterface* thread_pool_ = nullptr;
}; };
} // namespace grpc } // namespace grpc

@ -38,24 +38,20 @@
#include <grpc/grpc_security.h> #include <grpc/grpc_security.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include "src/cpp/server/server_rpc_handler.h" #include "src/cpp/server/server_rpc_handler.h"
#include "src/cpp/server/thread_pool.h"
#include <grpc++/async_server_context.h> #include <grpc++/async_server_context.h>
#include <grpc++/completion_queue.h> #include <grpc++/completion_queue.h>
#include <grpc++/impl/rpc_service_method.h> #include <grpc++/impl/rpc_service_method.h>
#include <grpc++/server_credentials.h> #include <grpc++/server_credentials.h>
#include <grpc++/thread_pool_interface.h>
namespace grpc { namespace grpc {
// TODO(rocking): consider a better default value like num of cores. Server::Server(ThreadPoolInterface *thread_pool, bool thread_pool_owned, ServerCredentials *creds)
static const int kNumThreads = 4;
Server::Server(ThreadPoolInterface *thread_pool, ServerCredentials *creds)
: started_(false), : started_(false),
shutdown_(false), shutdown_(false),
num_running_cb_(0), num_running_cb_(0),
thread_pool_(thread_pool == nullptr ? new ThreadPool(kNumThreads) thread_pool_(thread_pool),
: thread_pool), thread_pool_owned_(thread_pool_owned),
thread_pool_owned_(thread_pool == nullptr),
secure_(creds != nullptr) { secure_(creds != nullptr) {
if (creds) { if (creds) {
server_ = server_ =
@ -82,31 +78,35 @@ Server::~Server() {
} }
} }
void Server::RegisterService(RpcService *service) { bool Server::RegisterService(RpcService *service) {
for (int i = 0; i < service->GetMethodCount(); ++i) { for (int i = 0; i < service->GetMethodCount(); ++i) {
RpcServiceMethod *method = service->GetMethod(i); RpcServiceMethod *method = service->GetMethod(i);
if (method_map_.find(method->name()) != method_map_.end()) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name());
return false;
}
method_map_.insert(std::make_pair(method->name(), method)); method_map_.insert(std::make_pair(method->name(), method));
} }
} }
void Server::AddPort(const grpc::string &addr) { int Server::AddPort(const grpc::string &addr) {
GPR_ASSERT(!started_); GPR_ASSERT(!started_);
int success;
if (secure_) { if (secure_) {
success = grpc_server_add_secure_http2_port(server_, addr.c_str()); return grpc_server_add_secure_http2_port(server_, addr.c_str());
} else { } else {
success = grpc_server_add_http2_port(server_, addr.c_str()); return grpc_server_add_http2_port(server_, addr.c_str());
} }
GPR_ASSERT(success);
} }
void Server::Start() { bool Server::Start() {
GPR_ASSERT(!started_); GPR_ASSERT(!started_);
started_ = true; started_ = true;
grpc_server_start(server_); grpc_server_start(server_);
// Start processing rpcs. // Start processing rpcs.
ScheduleCallback(); ScheduleCallback();
return true;
} }
void Server::AllowOneRpc() { void Server::AllowOneRpc() {
@ -141,8 +141,7 @@ void Server::ScheduleCallback() {
std::unique_lock<std::mutex> lock(mu_); std::unique_lock<std::mutex> lock(mu_);
num_running_cb_++; num_running_cb_++;
} }
std::function<void()> callback = std::bind(&Server::RunRpc, this); thread_pool_->ScheduleCallback(std::bind(&Server::RunRpc, this));
thread_pool_->ScheduleCallback(callback);
} }
void Server::RunRpc() { void Server::RunRpc() {

@ -33,13 +33,15 @@
#include <grpc++/server_builder.h> #include <grpc++/server_builder.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc++/impl/service_type.h> #include <grpc++/impl/service_type.h>
#include <grpc++/server.h> #include <grpc++/server.h>
#include "src/cpp/server/thread_pool.h"
namespace grpc { namespace grpc {
ServerBuilder::ServerBuilder() : thread_pool_(nullptr) {} ServerBuilder::ServerBuilder() {}
void ServerBuilder::RegisterService(SynchronousService *service) { void ServerBuilder::RegisterService(SynchronousService *service) {
services_.push_back(service->service()); services_.push_back(service->service());
@ -64,14 +66,27 @@ void ServerBuilder::SetThreadPool(ThreadPoolInterface *thread_pool) {
} }
std::unique_ptr<Server> ServerBuilder::BuildAndStart() { std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
std::unique_ptr<Server> server(new Server(thread_pool_, creds_.get())); bool thread_pool_owned = false;
if (!thread_pool_ && services_.size()) {
int cores = gpr_cpu_num_cores();
if (!cores) cores = 4;
thread_pool_ = new ThreadPool(cores);
thread_pool_owned = true;
}
std::unique_ptr<Server> server(new Server(thread_pool_, thread_pool_owned, creds_.get()));
for (auto *service : services_) { for (auto *service : services_) {
server->RegisterService(service); if (!server->RegisterService(service)) {
return nullptr;
}
} }
for (auto &port : ports_) { for (auto &port : ports_) {
server->AddPort(port); if (!server->AddPort(port)) {
return nullptr;
}
}
if (!server->Start()) {
return nullptr;
} }
server->Start();
return server; return server;
} }

Loading…
Cancel
Save