Each ThreadManager is a resource user

pull/16081/head
Sree Kuchibhotla 7 years ago
parent f2a57cdd75
commit ec1c112cc1
  1. 3
      include/grpcpp/server.h
  2. 2
      src/cpp/server/server_builder.cc
  3. 23
      src/cpp/server/server_cc.cc
  4. 36
      src/cpp/thread_manager/thread_manager.cc
  5. 19
      src/cpp/thread_manager/thread_manager.h
  6. 10
      test/cpp/thread_manager/thread_manager_test.cc

@ -144,7 +144,8 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
Server(int max_message_size, ChannelArguments* args,
std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
sync_server_cqs,
int min_pollers, int max_pollers, int sync_cq_timeout_msec);
grpc_resource_quota* server_rq, int min_pollers, int max_pollers,
int sync_cq_timeout_msec);
/// Start the server.
///

@ -261,7 +261,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
}
std::unique_ptr<Server> server(new Server(
max_receive_message_size_, &args, sync_server_cqs,
max_receive_message_size_, &args, sync_server_cqs, resource_quota_,
sync_server_settings_.min_pollers, sync_server_settings_.max_pollers,
sync_server_settings_.cq_timeout_msec));

@ -266,9 +266,9 @@ class Server::SyncRequestThreadManager : public ThreadManager {
public:
SyncRequestThreadManager(Server* server, CompletionQueue* server_cq,
std::shared_ptr<GlobalCallbacks> global_callbacks,
int min_pollers, int max_pollers,
int cq_timeout_msec)
: ThreadManager(min_pollers, max_pollers),
grpc_resource_quota* rq, int min_pollers,
int max_pollers, int cq_timeout_msec)
: ThreadManager("SyncServer", rq, min_pollers, max_pollers),
server_(server),
server_cq_(server_cq),
cq_timeout_msec_(cq_timeout_msec),
@ -376,7 +376,8 @@ Server::Server(
int max_receive_message_size, ChannelArguments* args,
std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
sync_server_cqs,
int min_pollers, int max_pollers, int sync_cq_timeout_msec)
grpc_resource_quota* server_rq, int min_pollers, int max_pollers,
int sync_cq_timeout_msec)
: max_receive_message_size_(max_receive_message_size),
sync_server_cqs_(std::move(sync_server_cqs)),
started_(false),
@ -392,10 +393,20 @@ Server::Server(
global_callbacks_->UpdateArguments(args);
if (sync_server_cqs_ != nullptr) {
bool default_rq_created = false;
if (server_rq == nullptr) {
server_rq = grpc_resource_quota_create("SyncServer-Default");
default_rq_created = true;
}
for (const auto& it : *sync_server_cqs_) {
sync_req_mgrs_.emplace_back(new SyncRequestThreadManager(
this, it.get(), global_callbacks_, min_pollers, max_pollers,
sync_cq_timeout_msec));
this, it.get(), global_callbacks_, server_rq, min_pollers,
max_pollers, sync_cq_timeout_msec));
}
if (default_rq_created) {
grpc_resource_quota_unref(server_rq);
}
}

@ -48,12 +48,16 @@ ThreadManager::WorkerThread::~WorkerThread() {
thd_.Join();
}
ThreadManager::ThreadManager(int min_pollers, int max_pollers)
ThreadManager::ThreadManager(const char* name,
grpc_resource_quota* resource_quota,
int min_pollers, int max_pollers)
: shutdown_(false),
num_pollers_(0),
min_pollers_(min_pollers),
max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers),
num_threads_(0) {}
num_threads_(0) {
resource_user_ = grpc_resource_user_create(resource_quota, name);
}
ThreadManager::~ThreadManager() {
{
@ -61,6 +65,7 @@ ThreadManager::~ThreadManager() {
GPR_ASSERT(num_threads_ == 0);
}
grpc_resource_user_unref(resource_user_);
CleanupCompletedThreads();
}
@ -113,9 +118,27 @@ void ThreadManager::Initialize() {
}
for (int i = 0; i < min_pollers_; i++) {
// Create a new thread (which ends up calling the MainWorkLoop() function
new WorkerThread(this);
if (!CreateNewThread(this)) {
gpr_log(GPR_ERROR,
"No quota available to create additional threads. Created %d (of "
"%d) threads",
i, min_pollers_);
break;
}
}
}
bool ThreadManager::CreateNewThread(ThreadManager* thd_mgr) {
if (!grpc_resource_user_alloc_threads(thd_mgr->resource_user_, 1)) {
return false;
}
// Create a new thread (which ends up calling the MainWorkLoop() function
new WorkerThread(thd_mgr);
return true;
}
void ThreadManager::ReleaseThread(ThreadManager* thd_mgr) {
grpc_resource_user_free_threads(thd_mgr->resource_user_, 1);
}
void ThreadManager::MainWorkLoop() {
@ -146,7 +169,7 @@ void ThreadManager::MainWorkLoop() {
num_threads_++;
// Drop lock before spawning thread to avoid contention
lock.unlock();
new WorkerThread(this);
CreateNewThread(this);
} else {
// Drop lock for consistency with above branch
lock.unlock();
@ -196,7 +219,10 @@ void ThreadManager::MainWorkLoop() {
}
};
// This thread is exiting. Do some cleanup work (i.e delete already completed
// worker threads and also release 1 thread back to the resource quota)
CleanupCompletedThreads();
ReleaseThread(this);
// If we are here, either ThreadManager is shutting down or it already has
// enough threads.

@ -27,12 +27,14 @@
#include <grpcpp/support/config.h>
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/resource_quota.h"
namespace grpc {
class ThreadManager {
public:
explicit ThreadManager(int min_pollers, int max_pollers);
explicit ThreadManager(const char* name, grpc_resource_quota* resource_quota,
int min_pollers, int max_pollers);
virtual ~ThreadManager();
// Initializes and Starts the Rpc Manager threads
@ -111,6 +113,13 @@ class ThreadManager {
void MarkAsCompleted(WorkerThread* thd);
void CleanupCompletedThreads();
// Checks the resource quota and if available, creates a thread and returns
// true. If quota is not available, returns false (and thread is not created)
static bool CreateNewThread(ThreadManager* thd_mgr);
// Give back a thread to the resource quota
static void ReleaseThread(ThreadManager* thd_mgr);
// Protects shutdown_, num_pollers_ and num_threads_
// TODO: sreek - Change num_pollers and num_threads_ to atomics
std::mutex mu_;
@ -118,6 +127,14 @@ class ThreadManager {
bool shutdown_;
std::condition_variable shutdown_cv_;
// The resource user object to use when requesting quota to create threads
//
// Note: The user of this ThreadManager object must create grpc_resource_quota
// object (that contains the actual max thread quota) and a grpc_resource_user
// object through which quota is requested whenver new threads need to be
// created
grpc_resource_user* resource_user_;
// Number of threads doing polling
int num_pollers_;

@ -32,8 +32,8 @@
namespace grpc {
class ThreadManagerTest final : public grpc::ThreadManager {
public:
ThreadManagerTest()
: ThreadManager(kMinPollers, kMaxPollers),
ThreadManagerTest(const char* name, grpc_resource_quota* rq)
: ThreadManager(name, rq, kMinPollers, kMaxPollers),
num_do_work_(0),
num_poll_for_work_(0),
num_work_found_(0) {}
@ -115,7 +115,11 @@ int main(int argc, char** argv) {
std::srand(std::time(nullptr));
grpc::testing::InitTest(&argc, &argv, true);
grpc::ThreadManagerTest test_rpc_manager;
grpc_resource_quota* rq = grpc_resource_quota_create("Test");
grpc::ThreadManagerTest test_rpc_manager("TestThreadManager", rq);
grpc_resource_quota_unref(rq);
test_rpc_manager.PerformTest();
return 0;

Loading…
Cancel
Save