From 4306eeee397760e11b416f43e881e7dfb87f88b0 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Wed, 21 Sep 2016 09:56:29 -0700 Subject: [PATCH] Minor changes --- src/cpp/rpcmanager/grpc_rpc_manager.cc | 8 +++--- src/cpp/server/server.cc | 34 ++++++++++++++++++++------ 2 files changed, 32 insertions(+), 10 deletions(-) diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.cc b/src/cpp/rpcmanager/grpc_rpc_manager.cc index 4236fcefafc..c47f76b5afc 100644 --- a/src/cpp/rpcmanager/grpc_rpc_manager.cc +++ b/src/cpp/rpcmanager/grpc_rpc_manager.cc @@ -34,6 +34,7 @@ #include #include #include +#include #include "src/cpp/rpcmanager/grpc_rpc_manager.h" @@ -58,7 +59,7 @@ GrpcRpcManager::GrpcRpcManager(int min_pollers, int max_pollers) : shutdown_(false), num_pollers_(0), min_pollers_(min_pollers), - max_pollers_(max_pollers), + max_pollers_(max_pollers == -1 ? INT_MAX: max_pollers), num_threads_(0) {} GrpcRpcManager::~GrpcRpcManager() { @@ -111,6 +112,7 @@ void GrpcRpcManager::Initialize() { // below the maximum threshold, we can let the current thread continue as poller bool GrpcRpcManager::MaybeContinueAsPoller() { std::unique_lock lock(mu_); + if (shutdown_ || num_pollers_ > max_pollers_) { return false; } @@ -169,8 +171,8 @@ void GrpcRpcManager::MainWorkLoop() { } } while (MaybeContinueAsPoller()); - // If we are here, it means that the GrpcRpcManager already has enough threads - // and that the current thread can be terminated + // If we are here, either GrpcRpcManager is shutting down or it already has + // enough threads. In both cases, current thread can be terminated { grpc::unique_lock lock(mu_); num_threads_--; diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 28b874d9fb4..21debcc7482 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -306,7 +306,14 @@ class Server::SyncRequestManager : public GrpcRpcManager { void DoWork(void* tag, bool ok) GRPC_OVERRIDE { SyncRequest* sync_req = static_cast(tag); - if (ok && sync_req) { + + if (!sync_req) { + // No tag. Nothing to work on + // TODO (sreek) - Log a warning here since this is an unlikely case + return; + } + + if (ok) { SyncRequest::CallData cd(server_, sync_req); { sync_req->SetupRequest(); @@ -318,9 +325,13 @@ class Server::SyncRequestManager : public GrpcRpcManager { } GPR_TIMER_SCOPE("cd.Run()", 0); cd.Run(global_callbacks_); + } else { + // ok is false. For some reason, the tag was returned but event was not + // successful. In this case, request again unless we are shutting down + if (!IsShutdown()) { + sync_req->Request(server_->c_server(), server_cq_->cq()); + } } - - // TODO (sreek): If ok == false, log an error } void AddSyncMethod(RpcServiceMethod* method, void* tag) { @@ -395,7 +406,15 @@ Server::~Server() { lock.unlock(); Shutdown(); } else if (!started_) { - // TODO (sreek): Shutdown all cqs + // TODO (sreek): Check if we can just do this once in ~Server() (i.e + // Do not 'shutdown' queues in Shutdown() function and do it here in the + // destructor + for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end(); + it++) { + (*it).Shutdown(); + } + + // TODO (sreek) Delete this /* cq_.Shutdown(); */ @@ -511,7 +530,7 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { (*it)->Start(); } - /* TODO (Sreek) - Do this for all cqs */ + /* TODO (Sreek) - No longer needed (being done in (*it)->Start above) */ /* // Start processing rpcs. if (!sync_methods_->empty()) { @@ -527,7 +546,7 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { return true; } -// TODO (sreek) - Reimplement this +/* TODO (sreek) check if started_ and shutdown_ are needed anymore */ void Server::ShutdownInternal(gpr_timespec deadline) { grpc::unique_lock lock(mu_); if (started_ && !shutdown_) { @@ -564,7 +583,8 @@ void Server::ShutdownInternal(gpr_timespec deadline) { } // Shutdown the completion queues - // TODO (sreek) Move this into SyncRequestManager + // TODO (sreek) Move this into SyncRequestManager (or move it to Server + // destructor) for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end(); it++) { (*it).Shutdown();