Minor changes to GrpcRpcManager

pull/7466/head
Sree Kuchibhotla 9 years ago
parent 8600438d54
commit 0ba41907a2
  1. 9
      src/cpp/rpcmanager/grpc_rpc_manager.cc
  2. 8
      src/cpp/rpcmanager/grpc_rpc_manager.h
  3. 9
      test/cpp/rpcmanager/grpc_rpc_manager_test.cc
  4. 4
      test/cpp/rpcmanager/grpc_rpc_manager_test.h

@ -83,7 +83,7 @@ void GrpcRpcManager::Wait() {
} }
// For testing only // For testing only
void GrpcRpcManager::Shutdown() { void GrpcRpcManager::ShutdownRpcManager() {
std::unique_lock<grpc::mutex> lock(mu_); std::unique_lock<grpc::mutex> lock(mu_);
shutdown_ = true; shutdown_ = true;
} }
@ -131,9 +131,10 @@ void GrpcRpcManager::MaybeCreatePoller() {
void GrpcRpcManager::MainWorkLoop() { void GrpcRpcManager::MainWorkLoop() {
bool is_work_found = false; bool is_work_found = false;
void *tag;
do { do {
PollForWork(is_work_found); PollForWork(is_work_found, &tag);
// Decrement num_pollers since this thread is no longer polling // Decrement num_pollers since this thread is no longer polling
{ {
@ -146,7 +147,7 @@ void GrpcRpcManager::MainWorkLoop() {
MaybeCreatePoller(); MaybeCreatePoller();
// Do actual work // Do actual work
DoWork(); DoWork(tag);
} }
// Continue to loop if this thread can continue as a poller // Continue to loop if this thread can continue as a poller
@ -158,7 +159,7 @@ void GrpcRpcManager::MainWorkLoop() {
grpc::unique_lock<grpc::mutex> lock(mu_); grpc::unique_lock<grpc::mutex> lock(mu_);
num_threads_--; num_threads_--;
if (num_threads_ == 0) { if (num_threads_ == 0) {
shutdown_cv_.notify_one(); shutdown_cv_.notify_all();
} }
} }

@ -50,12 +50,12 @@ class GrpcRpcManager {
// This function MUST be called before using the object // This function MUST be called before using the object
void Initialize(); void Initialize();
virtual void PollForWork(bool& is_work_found) = 0; virtual void PollForWork(bool& is_work_found, void **tag) = 0;
virtual void DoWork() = 0; virtual void DoWork(void *tag) = 0;
// Use this for testing purposes only // Use the following two functions for testing purposes only
void Wait(); void Wait();
void Shutdown(); void ShutdownRpcManager();
private: private:
// Helper wrapper class around std::thread. This takes a GrpcRpcManager object // Helper wrapper class around std::thread. This takes a GrpcRpcManager object

@ -45,14 +45,15 @@ using grpc::testing::GrpcRpcManagerTest;
// TODO: sreek - Rewrite this test. Find a better test case // TODO: sreek - Rewrite this test. Find a better test case
void GrpcRpcManagerTest::PollForWork(bool& is_work_found) { void GrpcRpcManagerTest::PollForWork(bool& is_work_found, void **tag) {
{ {
std::unique_lock<grpc::mutex> lock(mu_); std::unique_lock<grpc::mutex> lock(mu_);
std::cout << "Poll: " << std::this_thread::get_id() << std::endl; std::cout << "Poll: " << std::this_thread::get_id() << std::endl;
} }
is_work_found = true; is_work_found = true;
*tag = NULL;
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); std::this_thread::sleep_for(std::chrono::milliseconds(10));
{ {
std::unique_lock<grpc::mutex> lock(mu_); std::unique_lock<grpc::mutex> lock(mu_);
@ -60,12 +61,12 @@ void GrpcRpcManagerTest::PollForWork(bool& is_work_found) {
if (num_calls_ > 50) { if (num_calls_ > 50) {
std::cout << "poll: False" << std::endl; std::cout << "poll: False" << std::endl;
is_work_found = false; is_work_found = false;
Shutdown(); ShutdownRpcManager();
} }
} }
} }
void GrpcRpcManagerTest::DoWork() { void GrpcRpcManagerTest::DoWork(void *tag) {
{ {
std::unique_lock<grpc::mutex> lock(mu_); std::unique_lock<grpc::mutex> lock(mu_);
std::cout << "Work: " << std::this_thread::get_id() << std::endl; std::cout << "Work: " << std::this_thread::get_id() << std::endl;

@ -43,8 +43,8 @@ class GrpcRpcManagerTest GRPC_FINAL : public GrpcRpcManager {
GrpcRpcManagerTest(int min_pollers, int max_pollers, int max_threads) GrpcRpcManagerTest(int min_pollers, int max_pollers, int max_threads)
: GrpcRpcManager(min_pollers, max_pollers, max_threads), num_calls_(0){}; : GrpcRpcManager(min_pollers, max_pollers, max_threads), num_calls_(0){};
void PollForWork(bool &is_work_found) GRPC_OVERRIDE; void PollForWork(bool &is_work_found, void **tag) GRPC_OVERRIDE;
void DoWork() GRPC_OVERRIDE; void DoWork(void *tag) GRPC_OVERRIDE;
private: private:
grpc::mutex mu_; grpc::mutex mu_;

Loading…
Cancel
Save