Cleanup server_cc.cc

pull/8269/head
Sree Kuchibhotla 8 years ago
parent acd64db4d9
commit 33382d0f53
  1. 6
      include/grpc++/impl/codegen/server_interface.h
  2. 6
      include/grpc++/server.h
  3. 2
      src/cpp/rpcmanager/grpc_rpc_manager.h
  4. 212
      src/cpp/server/server_cc.cc

@ -126,12 +126,6 @@ class ServerInterface : public CallHook {
/// \return true on a successful shutdown.
virtual bool Start(ServerCompletionQueue** cqs, size_t num_cqs) = 0;
/// Process one or more incoming calls.
virtual void RunRpc() = 0;
/// Schedule \a RunRpc to run in the threadpool.
virtual void ScheduleCallback() = 0;
virtual void ShutdownInternal(gpr_timespec deadline) = 0;
virtual int max_receive_message_size() const = 0;

@ -176,12 +176,6 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen {
/// \return true on a successful shutdown.
bool Start(ServerCompletionQueue** cqs, size_t num_cqs) GRPC_OVERRIDE;
/// Process one or more incoming calls.
void RunRpc() GRPC_OVERRIDE;
/// Schedule \a RunRpc to run in the threadpool.
void ScheduleCallback() GRPC_OVERRIDE;
void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE;
void ShutdownInternal(gpr_timespec deadline) GRPC_OVERRIDE;

@ -47,7 +47,7 @@ class GrpcRpcManager {
explicit GrpcRpcManager(int min_pollers, int max_pollers);
virtual ~GrpcRpcManager();
// This function MUST be called before using the object
// Initializes and Starts the Rpc Manager threads
void Initialize();
// The return type of PollForWork() function

@ -118,15 +118,6 @@ class Server::UnimplementedAsyncResponse GRPC_FINAL
UnimplementedAsyncRequest* const request_;
};
// TODO (sreek) - This might no longer be needed
class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) {
delete this;
return false;
}
};
class ShutdownTag : public CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) { return false; }
@ -153,40 +144,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
grpc_metadata_array_destroy(&request_metadata_);
}
// TODO (Sreek) This function is probably no longer needed
static SyncRequest* Wait(CompletionQueue* cq, bool* ok) {
void* tag = nullptr;
*ok = false;
if (!cq->Next(&tag, ok)) {
return nullptr;
}
auto* mrd = static_cast<SyncRequest*>(tag);
GPR_ASSERT(mrd->in_flight_);
return mrd;
}
// TODO (sreek) - This function is probably no longer needed
static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok,
gpr_timespec deadline) {
void* tag = nullptr;
*ok = false;
switch (cq->AsyncNext(&tag, ok, deadline)) {
case CompletionQueue::TIMEOUT:
*req = nullptr;
return true;
case CompletionQueue::SHUTDOWN:
*req = nullptr;
return false;
case CompletionQueue::GOT_EVENT:
*req = static_cast<SyncRequest*>(tag);
GPR_ASSERT((*req)->in_flight_);
return true;
}
GPR_UNREACHABLE_CODE(return false);
}
// TODO (sreek) - Refactor this SetupRequest/TeardownRequest and ResetRequest
// functions
void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
void TeardownRequest() {
@ -194,8 +151,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
cq_ = nullptr;
}
void ResetRequest() { in_flight_ = false; }
void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
GPR_ASSERT(cq_ && !in_flight_);
in_flight_ = true;
@ -319,32 +274,29 @@ class Server::SyncRequestManager : public GrpcRpcManager {
SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
if (!sync_req) {
// No tag. Nothing to work on
// TODO (sreek) - Log a warning here since this is an unlikely case
// No tag. Nothing to work on. This is an unlikley scenario and possibly a
// bug in RPC Manager implementation.
gpr_log(GPR_ERROR, "Sync server. DoWork() was called with NULL tag");
return;
}
if (ok) {
// Calldata takes ownership of the completion queue inside sync_req
SyncRequest::CallData cd(server_, sync_req);
{
sync_req->SetupRequest();
// Prepare for the next request
if (!IsShutdown()) {
sync_req->SetupRequest(); // Create new completion queue for sync_req
sync_req->Request(server_->c_server(), server_cq_->cq());
} else {
sync_req->TeardownRequest();
}
}
GPR_TIMER_SCOPE("cd.Run()", 0);
cd.Run(global_callbacks_);
} else {
sync_req->ResetRequest();
// ok is false. For some reason, the tag was returned but event was not
// successful. In this case, request again unless we are shutting down
if (!IsShutdown()) {
// TODO (sreek) Remove this
// sync_req->Request(server_->c_server(), server_cq_->cq());
}
}
// TODO (sreek) If ok is false here (which it isn't in case of
// grpc_request_registered_call), we should still re-queue the request
// object
}
void AddSyncMethod(RpcServiceMethod* method, void* tag) {
@ -428,8 +380,6 @@ Server::Server(
Server::~Server() {
{
// TODO (sreek) Check if we can just call Shutdown() even in case where
// started_ == false. This will make things much simpler
grpc::unique_lock<grpc::mutex> lock(mu_);
if (started_ && !shutdown_) {
lock.unlock();
@ -442,12 +392,6 @@ Server::~Server() {
}
}
// TODO(sreek) Do thisfor all cqs ?
/*
void* got_tag;
bool ok;
GPR_ASSERT(!cq_.Next(&got_tag, &ok));
*/
grpc_server_destroy(server_);
}
@ -551,19 +495,6 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
(*it)->Start();
}
/* TODO (Sreek) - No longer needed (being done in (*it)->Start above) */
/*
// Start processing rpcs.
if (!sync_methods_->empty()) {
for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
m->SetupRequest();
m->Request(server_, cq_.cq());
}
GrpcRpcManager::Initialize();
}
*/
return true;
}
@ -608,48 +539,8 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
// Drain the shutdown queue (if the previous call to AsyncNext() timed out
// and we didn't remove the tag from the queue yet)
while (shutdown_cq.Next(&tag, &ok)) {
// Nothing to be done here
}
/*
grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
cq_.Shutdown();
lock.unlock();
*/
// TODO (sreek) Delete this
/*
GrpcRpcManager::ShutdownRpcManager();
GrpcRpcManager::Wait();
*/
// Spin, eating requests until the completion queue is completely shutdown.
// If the deadline expires then cancel anything that's pending and keep
// spinning forever until the work is actually drained.
// Since nothing else needs to touch state guarded by mu_, holding it
// through this loop is fine.
//
/*
SyncRequest* request;
bool ok;
while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) {
if (request == NULL) { // deadline expired
grpc_server_cancel_all_calls(server_);
deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
} else if (ok) {
SyncRequest::CallData call_data(this, request);
}
// Nothing to be done here. Just ignore ok and tag values
}
lock.lock();
*/
/* TODO (sreek) - Remove this block */
// Wait for running callbacks to finish.
/*
while (num_running_cb_ != 0) {
callback_cv_.wait(lock);
}
*/
shutdown_notified_ = true;
shutdown_cv_.notify_all();
@ -774,87 +665,6 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
request_->stream()->call_.PerformOps(this);
}
// TODO: sreek - Remove this function
void Server::ScheduleCallback() {
GPR_ASSERT(false);
/*
{
grpc::unique_lock<grpc::mutex> lock(mu_);
num_running_cb_++;
}
thread_pool_->Add(std::bind(&Server::RunRpc, this));
*/
}
// TODO: sreek - Remove this function
void Server::RunRpc() {
GPR_ASSERT(false);
/*
// Wait for one more incoming rpc.
bool ok;
GPR_TIMER_SCOPE("Server::RunRpc", 0);
auto* mrd = SyncRequest::Wait(&cq_, &ok);
if (mrd) {
ScheduleCallback();
if (ok) {
SyncRequest::CallData cd(this, mrd);
{
mrd->SetupRequest();
grpc::unique_lock<grpc::mutex> lock(mu_);
if (!shutdown_) {
mrd->Request(server_, cq_.cq());
} else {
// destroy the structure that was created
mrd->TeardownRequest();
}
}
GPR_TIMER_SCOPE("cd.Run()", 0);
cd.Run(global_callbacks_);
}
}
{
grpc::unique_lock<grpc::mutex> lock(mu_);
num_running_cb_--;
if (shutdown_) {
callback_cv_.notify_all();
}
}
*/
}
/* TODO (sreek) Move this to SyncRequestManager */
/*
void Server::PollForWork(bool& is_work_found, void** tag) {
is_work_found = true;
*tag = nullptr;
auto* mrd = SyncRequest::Wait(&cq_, &is_work_found);
if (is_work_found) {
*tag = mrd;
}
}
void Server::DoWork(void* tag) {
auto* mrd = static_cast<SyncRequest*>(tag);
if (mrd) {
SyncRequest::CallData cd(this, mrd);
{
mrd->SetupRequest();
grpc::unique_lock<grpc::mutex> lock(mu_);
if (!shutdown_) {
mrd->Request(server_, cq_.cq());
} else {
// destroy the structure that was created
mrd->TeardownRequest();
}
}
GPR_TIMER_SCOPE("cd.Run()", 0);
cd.Run(global_callbacks_);
}
}
*/
ServerInitializer* Server::initializer() { return server_initializer_.get(); }
} // namespace grpc

Loading…
Cancel
Save