fix shutdown crash

pull/7466/head
Sree Kuchibhotla 8 years ago
parent 18d3ace7db
commit 862acb9f3a
  1. 3
      include/grpc++/server.h
  2. 2
      src/cpp/rpcmanager/grpc_rpc_manager.cc
  3. 61
      src/cpp/server/server_cc.cc

@ -208,9 +208,6 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen {
bool shutdown_;
bool shutdown_notified_;
/// The completion queue to use for server shutdown completion notification
CompletionQueue shutdown_cq_;
// TODO (sreek) : Remove num_running_cb_ and callback_cv_;
// The number of threads which are running callbacks.
// int num_running_cb_;

@ -64,8 +64,6 @@ GrpcRpcManager::GrpcRpcManager(int min_pollers, int max_pollers)
GrpcRpcManager::~GrpcRpcManager() {
std::unique_lock<grpc::mutex> lock(mu_);
// ShutdownRpcManager() and Wait() must be called before destroying the object
GPR_ASSERT(shutdown_);
GPR_ASSERT(num_threads_ == 0);
CleanupCompletedThreads();

@ -129,9 +129,7 @@ class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
class ShutdownTag : public CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool *status) {
return false;
}
bool FinalizeResult(void** tag, bool* status) { return false; }
};
class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
@ -196,9 +194,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
cq_ = nullptr;
}
void ResetRequest() {
in_flight_ = false;
}
void ResetRequest() { in_flight_ = false; }
void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
GPR_ASSERT(cq_ && !in_flight_);
@ -301,7 +297,7 @@ class Server::SyncRequestManager : public GrpcRpcManager {
server_cq_(server_cq),
global_callbacks_(global_callbacks) {}
static const int kRpcPollingTimeoutMsec = 500;
static const int kRpcPollingTimeoutMsec = 10;
WorkStatus PollForWork(void** tag, bool* ok) GRPC_OVERRIDE {
*tag = nullptr;
@ -368,6 +364,17 @@ class Server::SyncRequestManager : public GrpcRpcManager {
}
}
void ShutdownAndDrainCompletionQueue() {
server_cq_->Shutdown();
// Drain any pending items from the queue
void* tag;
bool ok;
while (server_cq_->Next(&tag, &ok)) {
// Nothing to be done here
}
}
void Start() {
if (!sync_methods_.empty()) {
for (auto m = sync_methods_.begin(); m != sync_methods_.end(); m++) {
@ -420,23 +427,17 @@ Server::Server(
Server::~Server() {
{
// TODO (sreek) Check if we can just call Shutdown() even in case where
// started_ == false. This will make things much simpler
grpc::unique_lock<grpc::mutex> lock(mu_);
if (started_ && !shutdown_) {
lock.unlock();
Shutdown();
} else if (!started_) {
// TODO (sreek): Check if we can just do this once in ~Server() (i.e
// Do not 'shutdown' queues in Shutdown() function and do it here in the
// destructor
for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
it++) {
(*it)->Shutdown();
// Shutdown the completion queues
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->ShutdownAndDrainCompletionQueue();
}
// TODO (sreek) Delete this
/*
cq_.Shutdown();
*/
}
}
@ -571,8 +572,10 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
if (started_ && !shutdown_) {
shutdown_ = true;
/// The completion queue to use for server shutdown completion notification
CompletionQueue shutdown_cq;
ShutdownTag shutdown_tag; // Dummy shutdown tag
grpc_server_shutdown_and_notify(server_, shutdown_cq_.cq(), &shutdown_tag);
grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
// Shutdown all RpcManagers. This will try to gracefully stop all the
// threads in the RpcManagers (once they process any inflight requests)
@ -580,16 +583,15 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
(*it)->ShutdownRpcManager();
}
shutdown_cq_.Shutdown();
shutdown_cq.Shutdown();
void* tag;
bool ok;
CompletionQueue::NextStatus status =
shutdown_cq_.AsyncNext(&tag, &ok, deadline);
shutdown_cq.AsyncNext(&tag, &ok, deadline);
// If this timed out, it means we are done with the grace-period for
// a clean shutdown. We should force a shutdown now by cancelling all
// inflight calls
// If this timed out, it means we are done with the grace period for a clean
// shutdown. We should force a shutdown now by cancelling all inflight calls
if (status == CompletionQueue::NextStatus::TIMEOUT) {
grpc_server_cancel_all_calls(server_);
}
@ -599,14 +601,13 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
// Wait for threads in all RpcManagers to terminate
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Wait();
(*it)->ShutdownAndDrainCompletionQueue();
}
// Shutdown the completion queues
// TODO (sreek) Move this into SyncRequestManager (or move it to Server
// destructor)
for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
it++) {
(*it)->Shutdown();
// Drain the shutdown queue (if the previous call to AsyncNext() timed out
// and we didn't remove the tag from the queue yet)
while(shutdown_cq.Next(&tag, &ok)) {
// Nothing to be done here
}
/*

Loading…
Cancel
Save