diff --git a/include/grpcpp/server.h b/include/grpcpp/server.h index 81c3907f86d..189cf8accf7 100644 --- a/include/grpcpp/server.h +++ b/include/grpcpp/server.h @@ -144,7 +144,8 @@ class Server : public ServerInterface, private GrpcLibraryCodegen { Server(int max_message_size, ChannelArguments* args, std::shared_ptr>> sync_server_cqs, - int min_pollers, int max_pollers, int sync_cq_timeout_msec); + grpc_resource_quota* server_rq, int min_pollers, int max_pollers, + int sync_cq_timeout_msec); /// Start the server. /// diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index e0b9b7a62bd..0ab3cd0e323 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -261,7 +261,7 @@ std::unique_ptr ServerBuilder::BuildAndStart() { } std::unique_ptr server(new Server( - max_receive_message_size_, &args, sync_server_cqs, + max_receive_message_size_, &args, sync_server_cqs, resource_quota_, sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, sync_server_settings_.cq_timeout_msec)); diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 0d77510e29d..6e6e0bfffe3 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -266,9 +266,9 @@ class Server::SyncRequestThreadManager : public ThreadManager { public: SyncRequestThreadManager(Server* server, CompletionQueue* server_cq, std::shared_ptr global_callbacks, - int min_pollers, int max_pollers, - int cq_timeout_msec) - : ThreadManager(min_pollers, max_pollers), + grpc_resource_quota* rq, int min_pollers, + int max_pollers, int cq_timeout_msec) + : ThreadManager("SyncServer", rq, min_pollers, max_pollers), server_(server), server_cq_(server_cq), cq_timeout_msec_(cq_timeout_msec), @@ -376,7 +376,8 @@ Server::Server( int max_receive_message_size, ChannelArguments* args, std::shared_ptr>> sync_server_cqs, - int min_pollers, int max_pollers, int sync_cq_timeout_msec) + grpc_resource_quota* server_rq, int min_pollers, int max_pollers, + int sync_cq_timeout_msec) : max_receive_message_size_(max_receive_message_size), sync_server_cqs_(std::move(sync_server_cqs)), started_(false), @@ -392,10 +393,20 @@ Server::Server( global_callbacks_->UpdateArguments(args); if (sync_server_cqs_ != nullptr) { + bool default_rq_created = false; + if (server_rq == nullptr) { + server_rq = grpc_resource_quota_create("SyncServer-Default"); + default_rq_created = true; + } + for (const auto& it : *sync_server_cqs_) { sync_req_mgrs_.emplace_back(new SyncRequestThreadManager( - this, it.get(), global_callbacks_, min_pollers, max_pollers, - sync_cq_timeout_msec)); + this, it.get(), global_callbacks_, server_rq, min_pollers, + max_pollers, sync_cq_timeout_msec)); + } + + if (default_rq_created) { + grpc_resource_quota_unref(server_rq); } } diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc index 02ac56a3fdc..c0fa98798a0 100644 --- a/src/cpp/thread_manager/thread_manager.cc +++ b/src/cpp/thread_manager/thread_manager.cc @@ -48,12 +48,16 @@ ThreadManager::WorkerThread::~WorkerThread() { thd_.Join(); } -ThreadManager::ThreadManager(int min_pollers, int max_pollers) +ThreadManager::ThreadManager(const char* name, + grpc_resource_quota* resource_quota, + int min_pollers, int max_pollers) : shutdown_(false), num_pollers_(0), min_pollers_(min_pollers), max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers), - num_threads_(0) {} + num_threads_(0) { + resource_user_ = grpc_resource_user_create(resource_quota, name); +} ThreadManager::~ThreadManager() { { @@ -61,6 +65,7 @@ ThreadManager::~ThreadManager() { GPR_ASSERT(num_threads_ == 0); } + grpc_resource_user_unref(resource_user_); CleanupCompletedThreads(); } @@ -113,9 +118,27 @@ void ThreadManager::Initialize() { } for (int i = 0; i < min_pollers_; i++) { - // Create a new thread (which ends up calling the MainWorkLoop() function - new WorkerThread(this); + if (!CreateNewThread(this)) { + gpr_log(GPR_ERROR, + "No quota available to create additional threads. Created %d (of " + "%d) threads", + i, min_pollers_); + break; + } + } +} + +bool ThreadManager::CreateNewThread(ThreadManager* thd_mgr) { + if (!grpc_resource_user_alloc_threads(thd_mgr->resource_user_, 1)) { + return false; } + // Create a new thread (which ends up calling the MainWorkLoop() function + new WorkerThread(thd_mgr); + return true; +} + +void ThreadManager::ReleaseThread(ThreadManager* thd_mgr) { + grpc_resource_user_free_threads(thd_mgr->resource_user_, 1); } void ThreadManager::MainWorkLoop() { @@ -146,7 +169,7 @@ void ThreadManager::MainWorkLoop() { num_threads_++; // Drop lock before spawning thread to avoid contention lock.unlock(); - new WorkerThread(this); + CreateNewThread(this); } else { // Drop lock for consistency with above branch lock.unlock(); @@ -196,7 +219,10 @@ void ThreadManager::MainWorkLoop() { } }; + // This thread is exiting. Do some cleanup work (i.e delete already completed + // worker threads and also release 1 thread back to the resource quota) CleanupCompletedThreads(); + ReleaseThread(this); // If we are here, either ThreadManager is shutting down or it already has // enough threads. diff --git a/src/cpp/thread_manager/thread_manager.h b/src/cpp/thread_manager/thread_manager.h index 5a40f2de477..23bd38ee4f1 100644 --- a/src/cpp/thread_manager/thread_manager.h +++ b/src/cpp/thread_manager/thread_manager.h @@ -27,12 +27,14 @@ #include #include "src/core/lib/gprpp/thd.h" +#include "src/core/lib/iomgr/resource_quota.h" namespace grpc { class ThreadManager { public: - explicit ThreadManager(int min_pollers, int max_pollers); + explicit ThreadManager(const char* name, grpc_resource_quota* resource_quota, + int min_pollers, int max_pollers); virtual ~ThreadManager(); // Initializes and Starts the Rpc Manager threads @@ -111,6 +113,13 @@ class ThreadManager { void MarkAsCompleted(WorkerThread* thd); void CleanupCompletedThreads(); + // Checks the resource quota and if available, creates a thread and returns + // true. If quota is not available, returns false (and thread is not created) + static bool CreateNewThread(ThreadManager* thd_mgr); + + // Give back a thread to the resource quota + static void ReleaseThread(ThreadManager* thd_mgr); + // Protects shutdown_, num_pollers_ and num_threads_ // TODO: sreek - Change num_pollers and num_threads_ to atomics std::mutex mu_; @@ -118,6 +127,14 @@ class ThreadManager { bool shutdown_; std::condition_variable shutdown_cv_; + // The resource user object to use when requesting quota to create threads + // + // Note: The user of this ThreadManager object must create grpc_resource_quota + // object (that contains the actual max thread quota) and a grpc_resource_user + // object through which quota is requested whenver new threads need to be + // created + grpc_resource_user* resource_user_; + // Number of threads doing polling int num_pollers_; diff --git a/test/cpp/thread_manager/thread_manager_test.cc b/test/cpp/thread_manager/thread_manager_test.cc index 7a95a9f17da..cf2cf770e6b 100644 --- a/test/cpp/thread_manager/thread_manager_test.cc +++ b/test/cpp/thread_manager/thread_manager_test.cc @@ -32,8 +32,8 @@ namespace grpc { class ThreadManagerTest final : public grpc::ThreadManager { public: - ThreadManagerTest() - : ThreadManager(kMinPollers, kMaxPollers), + ThreadManagerTest(const char* name, grpc_resource_quota* rq) + : ThreadManager(name, rq, kMinPollers, kMaxPollers), num_do_work_(0), num_poll_for_work_(0), num_work_found_(0) {} @@ -115,7 +115,11 @@ int main(int argc, char** argv) { std::srand(std::time(nullptr)); grpc::testing::InitTest(&argc, &argv, true); - grpc::ThreadManagerTest test_rpc_manager; + + grpc_resource_quota* rq = grpc_resource_quota_create("Test"); + grpc::ThreadManagerTest test_rpc_manager("TestThreadManager", rq); + grpc_resource_quota_unref(rq); + test_rpc_manager.PerformTest(); return 0;