Fix, restore draining

reviewable/pr10712/r1
Craig Tiller 8 years ago
parent 991c101de8
commit a3e87894f2
  1. 12
      src/cpp/server/server_cc.cc
  2. 88
      src/cpp/thread_manager/thread_manager.cc
  3. 6
      src/cpp/thread_manager/thread_manager.h

@ -330,7 +330,17 @@ class Server::SyncRequestThreadManager : public ThreadManager {
void Shutdown() override {
server_cq_->Shutdown();
ThreadManager::Shutdown();
ThreadManager::Shutdown();
}
void Wait() override {
ThreadManager::Wait();
// Drain any pending items from the queue
void* tag;
bool ok;
while (server_cq_->Next(&tag, &ok)) {
// Do nothing
}
}
void Start() {

@ -100,6 +100,8 @@ void ThreadManager::MarkAsCompleted(WorkerThread* thd) {
void ThreadManager::CleanupCompletedThreads() {
std::list<WorkerThread*> completed_threads;
{
// swap out the completed threads list: allows other threads to clean up
// more quickly
std::unique_lock<std::mutex> lock(list_mu_);
completed_threads.swap(completed_threads_);
}
@ -112,20 +114,6 @@ void ThreadManager::Initialize() {
}
}
// If the number of pollers (i.e threads currently blocked in PollForWork()) is
// less than max threshold (i.e max_pollers_) and the total number of threads is
// below the maximum threshold, we can let the current thread continue as poller
bool ThreadManager::MaybeContinueAsPoller(bool work_found) {
std::unique_lock<std::mutex> lock(mu_);
gpr_log(GPR_DEBUG, "s=%d wf=%d np=%d mp=%d", shutdown_, work_found, num_pollers_, max_pollers_);
if (shutdown_ || (!work_found && num_pollers_ > max_pollers_)) {
return false;
}
num_pollers_++;
return true;
}
// Create a new poller if the current number of pollers i.e num_pollers_ (i.e
// threads currently blocked in PollForWork()) is below the threshold (i.e
// min_pollers_) and the total number of threads is below the maximum threshold
@ -143,48 +131,48 @@ void ThreadManager::MaybeCreatePoller() {
}
void ThreadManager::MainWorkLoop() {
void* tag;
bool ok;
/*
1. Poll for work (i.e PollForWork())
2. After returning from PollForWork, reduce the number of pollers by 1. If
PollForWork() returned a TIMEOUT, then it may indicate that we have more
polling threads than needed. Check if the number of pollers is greater
than min_pollers and if so, terminate the thread.
3. Since we are short of one poller now, see if a new poller has to be
created (i.e see MaybeCreatePoller() for more details)
4. Do the actual work (DoWork())
5. After doing the work, see it this thread can resume polling work (i.e
see MaybeContinueAsPoller() for more details) */
WorkStatus work_status;
while (true) {
bool done = false;
work_status = PollForWork(&tag, &ok);
void* tag;
bool ok;
WorkStatus work_status = PollForWork(&tag, &ok);
std::unique_lock<std::mutex> lock(mu_);
// Reduce the number of pollers by 1 and check what happened with the poll
num_pollers_--;
gpr_log(GPR_DEBUG, "%p: work_status:%d num_pollers:%d min_pollers:%d max_pollers:%d num_threads:%d shutdown:%d", this, work_status, num_pollers_, min_pollers_, max_pollers_, num_threads_, shutdown_);
bool done = false;
switch (work_status) {
case TIMEOUT:
if (shutdown_ || num_pollers_ >= max_pollers_) done = true;
break;
case SHUTDOWN: done = true; break;
case WORK_FOUND:
if (!shutdown_ && num_pollers_ < min_pollers_) {
num_pollers_++;
num_threads_++;
lock.unlock();
new WorkerThread(this);
} else {
lock.unlock();
}
DoWork(tag, ok);
lock.lock();
if (shutdown_) done = true;
break;
case TIMEOUT:
// If we timed out and we have more pollers than we need (or we are
// shutdown), finish this thread
if (shutdown_ || num_pollers_ > max_pollers_) done = true;
break;
case SHUTDOWN:
// If the thread manager is shutdown, finish this thread
done = true;
break;
case WORK_FOUND:
// If we got work and there are now insufficient pollers, start a new
// one
if (!shutdown_ && num_pollers_ < min_pollers_) {
num_pollers_++;
num_threads_++;
lock.unlock();
new WorkerThread(this);
} else {
lock.unlock();
}
DoWork(tag, ok);
lock.lock();
// If we're shutdown, we should finish at this point.
// If not, there's a chance that we'll exceed the max poller count: that
// is explicitly ok - we'll decrease after one poll timeout, and prevent
// some thrashing starting up and shutting down threads
if (shutdown_) done = true;
break;
}
if (done) break;
// If we decided to finish the thread, break out of the while loop
if (done) break;
// ... otherwise increase poller count and continue
num_pollers_++;
};

@ -96,7 +96,7 @@ class ThreadManager {
// A blocking call that returns only after the ThreadManager has shutdown and
// all the threads have drained all the outstanding work
void Wait();
virtual void Wait();
private:
// Helper wrapper class around std::thread. This takes a ThreadManager object
@ -126,10 +126,6 @@ class ThreadManager {
// minimum number of pollers needed (i.e min_pollers).
void MaybeCreatePoller();
// Returns true if the current thread can resume as a poller. i.e if the
// current number of pollers is less than the max_pollers.
bool MaybeContinueAsPoller(bool work_found);
void MarkAsCompleted(WorkerThread* thd);
void CleanupCompletedThreads();

Loading…
Cancel
Save