Merge pull request #20808 from vjpai/thread_mgr_race

Properly handle thread creation race: roll-forward of #20376
pull/20826/head
Vijay Pai 5 years ago committed by GitHub
commit a6c7b66f75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 20
      src/cpp/thread_manager/thread_manager.cc
  2. 4
      src/cpp/thread_manager/thread_manager.h

@ -34,8 +34,10 @@ ThreadManager::WorkerThread::WorkerThread(ThreadManager* thd_mgr)
thd_ = grpc_core::Thread( thd_ = grpc_core::Thread(
"grpcpp_sync_server", "grpcpp_sync_server",
[](void* th) { static_cast<ThreadManager::WorkerThread*>(th)->Run(); }, [](void* th) { static_cast<ThreadManager::WorkerThread*>(th)->Run(); },
this); this, &created_);
thd_.Start(); if (!created_) {
gpr_log(GPR_ERROR, "Could not create grpc_sync_server worker-thread");
}
} }
void ThreadManager::WorkerThread::Run() { void ThreadManager::WorkerThread::Run() {
@ -139,7 +141,9 @@ void ThreadManager::Initialize() {
} }
for (int i = 0; i < min_pollers_; i++) { for (int i = 0; i < min_pollers_; i++) {
new WorkerThread(this); WorkerThread* worker = new WorkerThread(this);
GPR_ASSERT(worker->created()); // Must be able to create the minimum
worker->Start();
} }
} }
@ -177,7 +181,15 @@ void ThreadManager::MainWorkLoop() {
} }
// Drop lock before spawning thread to avoid contention // Drop lock before spawning thread to avoid contention
lock.Unlock(); lock.Unlock();
new WorkerThread(this); WorkerThread* worker = new WorkerThread(this);
if (worker->created()) {
worker->Start();
} else {
num_pollers_--;
num_threads_--;
resource_exhausted = true;
delete worker;
}
} else if (num_pollers_ > 0) { } else if (num_pollers_ > 0) {
// There is still at least some thread polling, so we can go on // There is still at least some thread polling, so we can go on
// even though we are below the number of pollers that we would // even though we are below the number of pollers that we would

@ -124,6 +124,9 @@ class ThreadManager {
WorkerThread(ThreadManager* thd_mgr); WorkerThread(ThreadManager* thd_mgr);
~WorkerThread(); ~WorkerThread();
bool created() const { return created_; }
void Start() { thd_.Start(); }
private: private:
// Calls thd_mgr_->MainWorkLoop() and once that completes, calls // Calls thd_mgr_->MainWorkLoop() and once that completes, calls
// thd_mgr_>MarkAsCompleted(this) to mark the thread as completed // thd_mgr_>MarkAsCompleted(this) to mark the thread as completed
@ -131,6 +134,7 @@ class ThreadManager {
ThreadManager* const thd_mgr_; ThreadManager* const thd_mgr_;
grpc_core::Thread thd_; grpc_core::Thread thd_;
bool created_;
}; };
// The main function in ThreadManager // The main function in ThreadManager

Loading…
Cancel
Save