[EventEngine] ThreadPool: manage fork and shutdown bits separately (#32329)

The previous implementation assumed that shutdown and fork events could
not occur at the same time, but that's not the case. This change adds
separate tracking for fork and shutdown bits.
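
A minimal, self-contained sketch (not gRPC code; the Bits type below is hypothetical) of the state a single tri-state enum cannot represent but two independent bits can: shutdown and fork pending at once, with the shutdown intent surviving the fork round-trip.

#include <cassert>

// Hypothetical stand-in for the pool's new bookkeeping: two independent
// bits instead of one enum { kRunning, kShutdown, kForking } value.
struct Bits {
  bool shutdown = false;
  bool forking = false;
};

int main() {
  Bits b;
  b.shutdown = true;                // Quiesce() begins shutting the pool down.
  b.forking = true;                 // A fork event arrives mid-shutdown.
  assert(b.shutdown && b.forking);  // Both facts held; one enum value cannot.
  b.forking = false;                // The fork completes...
  assert(b.shutdown);               // ...and the shutdown intent was not lost.
  return 0;
}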

cc @gnossen
Branch: pull/32339/head
Author: AJ Heller (committed by GitHub)
Parent: 219f6506b7
Commit: 51f276e7be
Changed files:
  src/core/lib/event_engine/thread_pool.cc (89 lines changed)
  src/core/lib/event_engine/thread_pool.h (29 lines changed)

--- a/src/core/lib/event_engine/thread_pool.cc
+++ b/src/core/lib/event_engine/thread_pool.cc
@@ -102,33 +102,26 @@ void ThreadPool::ThreadFunc(StatePtr state) {
 }
 
 bool ThreadPool::Queue::Step() {
-  grpc_core::ReleasableMutexLock lock(&mu_);
+  grpc_core::ReleasableMutexLock lock(&queue_mu_);
   // Wait until work is available or we are shutting down.
-  while (state_ == State::kRunning && callbacks_.empty()) {
+  while (!shutdown_ && !forking_ && callbacks_.empty()) {
     // If there are too many threads waiting, then quit this thread.
     // TODO(ctiller): wait some time in this case to be sure.
     if (threads_waiting_ >= reserve_threads_) {
       threads_waiting_++;
-      bool timeout = cv_.WaitWithTimeout(&mu_, absl::Seconds(30));
+      bool timeout = cv_.WaitWithTimeout(&queue_mu_, absl::Seconds(30));
       threads_waiting_--;
       if (timeout && threads_waiting_ >= reserve_threads_) {
         return false;
       }
     } else {
       threads_waiting_++;
-      cv_.Wait(&mu_);
+      cv_.Wait(&queue_mu_);
       threads_waiting_--;
     }
   }
-  switch (state_) {
-    case State::kRunning:
-      break;
-    case State::kForking:
-      return false;
-    case State::kShutdown:
-      if (!callbacks_.empty()) break;
-      return false;
-  }
+  if (forking_) return false;
+  if (shutdown_ && callbacks_.empty()) return false;
   GPR_ASSERT(!callbacks_.empty());
   auto callback = std::move(callbacks_.front());
   callbacks_.pop();
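
The two early returns above encode a priority between the bits: forking preempts everything, even queued work (the callbacks survive the fork intact), while shutdown only lets a worker exit once the queue has drained. A hedged restatement of that policy as a standalone helper; the names here are illustrative, not from the patch.

#include <cassert>
#include <cstddef>

// Illustrative restatement of Step()'s exit conditions after the wait loop.
bool WorkerShouldExit(bool forking, bool shutdown, std::size_t queued) {
  if (forking) return true;                  // exit now; callbacks stay queued
  if (shutdown && queued == 0) return true;  // drained, safe to exit
  return false;                              // keep running callbacks
}

int main() {
  assert(WorkerShouldExit(/*forking=*/true, /*shutdown=*/false, 3));   // drop out at once
  assert(!WorkerShouldExit(/*forking=*/false, /*shutdown=*/true, 3));  // keep draining
  assert(WorkerShouldExit(/*forking=*/false, /*shutdown=*/true, 0));   // drained, exit
  return 0;
}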
@@ -148,7 +141,7 @@ bool ThreadPool::IsThreadPoolThread() {
 }
 
 void ThreadPool::Quiesce() {
-  state_->queue.SetShutdown();
+  state_->queue.SetShutdown(true);
   // Wait until all threads are exited.
   // Note that if this is a threadpool thread then we won't exit this thread
   // until the callstack unwinds a little, so we need to wait for just one
@@ -174,81 +167,65 @@ void ThreadPool::Run(EventEngine::Closure* closure) {
 }
 
 bool ThreadPool::Queue::Add(absl::AnyInvocable<void()> callback) {
-  grpc_core::MutexLock lock(&mu_);
+  grpc_core::MutexLock lock(&queue_mu_);
   // Add works to the callbacks list
   callbacks_.push(std::move(callback));
   cv_.Signal();
-  switch (state_) {
-    case State::kRunning:
-    case State::kShutdown:
-      return callbacks_.size() > threads_waiting_;
-    case State::kForking:
-      return false;
-  }
-  GPR_UNREACHABLE_CODE(return false);
+  if (forking_) return false;
+  return callbacks_.size() > threads_waiting_;
 }
 
 bool ThreadPool::Queue::IsBacklogged() {
-  grpc_core::MutexLock lock(&mu_);
-  switch (state_) {
-    case State::kRunning:
-    case State::kShutdown:
-      return callbacks_.size() > 1;
-    case State::kForking:
-      return false;
-  }
-  GPR_UNREACHABLE_CODE(return false);
+  grpc_core::MutexLock lock(&queue_mu_);
+  if (forking_) return false;
+  return callbacks_.size() > 1;
 }
 
 void ThreadPool::Queue::SleepIfRunning() {
-  grpc_core::MutexLock lock(&mu_);
+  grpc_core::MutexLock lock(&queue_mu_);
   auto end = grpc_core::Duration::Seconds(1) + grpc_core::Timestamp::Now();
   while (true) {
     grpc_core::Timestamp now = grpc_core::Timestamp::Now();
-    if (now >= end) return;
-    switch (state_) {
-      case State::kRunning:
-      case State::kShutdown:
-        cv_.WaitWithTimeout(&mu_, absl::Milliseconds((end - now).millis()));
-        break;
-      case State::kForking:
-        return;
-    }
+    if (now >= end || forking_) return;
+    cv_.WaitWithTimeout(&queue_mu_, absl::Milliseconds((end - now).millis()));
   }
 }
 
-void ThreadPool::Queue::SetState(State state) {
-  grpc_core::MutexLock lock(&mu_);
-  if (state == State::kRunning) {
-    GPR_ASSERT(state_ != State::kRunning);
-  } else {
-    GPR_ASSERT(state_ == State::kRunning);
-  }
-  state_ = state;
+void ThreadPool::Queue::SetShutdown(bool is_shutdown) {
+  grpc_core::MutexLock lock(&queue_mu_);
+  auto was_shutdown = std::exchange(shutdown_, is_shutdown);
+  GPR_ASSERT(is_shutdown != was_shutdown);
+  cv_.SignalAll();
+}
+
+void ThreadPool::Queue::SetForking(bool is_forking) {
+  grpc_core::MutexLock lock(&queue_mu_);
+  auto was_forking = std::exchange(forking_, is_forking);
+  GPR_ASSERT(is_forking != was_forking);
   cv_.SignalAll();
 }
 
 void ThreadPool::ThreadCount::Add() {
-  grpc_core::MutexLock lock(&mu_);
+  grpc_core::MutexLock lock(&thread_count_mu_);
   ++threads_;
 }
 
 void ThreadPool::ThreadCount::Remove() {
-  grpc_core::MutexLock lock(&mu_);
+  grpc_core::MutexLock lock(&thread_count_mu_);
   --threads_;
   cv_.Signal();
 }
 
 void ThreadPool::ThreadCount::BlockUntilThreadCount(int threads,
                                                     const char* why) {
-  grpc_core::MutexLock lock(&mu_);
+  grpc_core::MutexLock lock(&thread_count_mu_);
   auto last_log = absl::Now();
   while (threads_ > threads) {
     // Wait for all threads to exit.
     // At least once every three seconds (but no faster than once per second in
     // the event of spurious wakeups) log a message indicating we're waiting to
     // fork.
-    cv_.WaitWithTimeout(&mu_, absl::Seconds(3));
+    cv_.WaitWithTimeout(&thread_count_mu_, absl::Seconds(3));
     if (threads_ > threads && absl::Now() - last_log > absl::Seconds(1)) {
       gpr_log(GPR_ERROR, "Waiting for thread pool to idle before %s", why);
       last_log = absl::Now();
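
The new setters share a small idiom worth noting: std::exchange writes the new value and hands back the old one, so a single GPR_ASSERT catches redundant transitions. A hedged, standalone sketch, with standard assert in place of gRPC's macro and a hypothetical Flag type:

#include <cassert>
#include <utility>

// Toggle-with-assert: the flag must actually change on every call, so a
// double SetForking(true) (or a stray SetShutdown(false)) trips the check.
struct Flag {
  bool value = false;
  void Set(bool v) {
    bool was = std::exchange(value, v);  // write the new value, capture the old
    assert(v != was);                    // transition must be a real flip
  }
};

int main() {
  Flag forking;
  forking.Set(true);   // e.g. PrepareFork()
  forking.Set(false);  // e.g. Postfork()
  // forking.Set(false);  // would assert: no state change
  return 0;
}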
@@ -257,7 +234,7 @@ void ThreadPool::ThreadCount::BlockUntilThreadCount(int threads,
 }
 
 void ThreadPool::PrepareFork() {
-  state_->queue.SetForking();
+  state_->queue.SetForking(true);
   state_->thread_count.BlockUntilThreadCount(0, "forking");
 }
@@ -266,7 +243,7 @@ void ThreadPool::PostforkParent() { Postfork(); }
 void ThreadPool::PostforkChild() { Postfork(); }
 
 void ThreadPool::Postfork() {
-  state_->queue.Reset();
+  state_->queue.SetForking(false);
   for (unsigned i = 0; i < reserve_threads_; i++) {
     StartThread(state_, StartThreadReason::kInitialPool);
   }
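
For context on the call sites above: PrepareFork()/PostforkParent()/PostforkChild() follow the usual prepare/parent/child fork-handler shape. A hedged sketch of that shape with hypothetical handlers; gRPC routes these through its own Forkable registry rather than a direct pthread_atfork call per object.

#include <pthread.h>
#include <unistd.h>
#include <cstdio>

// Hypothetical handlers mirroring the ThreadPool hooks above.
static void Prepare() { std::puts("SetForking(true); block until threads == 0"); }
static void InParent() { std::puts("parent: SetForking(false); restart reserve threads"); }
static void InChild() { std::puts("child: SetForking(false); restart reserve threads"); }

int main() {
  pthread_atfork(Prepare, InParent, InChild);
  pid_t pid = fork();       // Prepare runs before; InParent/InChild run after
  if (pid == 0) _exit(0);   // child: exit immediately in this demo
  return 0;
}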

--- a/src/core/lib/event_engine/thread_pool.h
+++ b/src/core/lib/event_engine/thread_pool.h
@@ -68,26 +68,27 @@ class ThreadPool final : public Forkable, public Executor {
     explicit Queue(unsigned reserve_threads)
         : reserve_threads_(reserve_threads) {}
     bool Step();
-    void SetShutdown() { SetState(State::kShutdown); }
-    void SetForking() { SetState(State::kForking); }
     // Add a callback to the queue.
     // Return true if we should also spin up a new thread.
     bool Add(absl::AnyInvocable<void()> callback);
-    void Reset() { SetState(State::kRunning); }
+    void SetShutdown(bool is_shutdown);
+    void SetForking(bool is_forking);
     bool IsBacklogged();
     void SleepIfRunning();
 
    private:
-    enum class State { kRunning, kShutdown, kForking };
-    void SetState(State state);
-    grpc_core::Mutex mu_;
-    grpc_core::CondVar cv_;
-    std::queue<absl::AnyInvocable<void()>> callbacks_ ABSL_GUARDED_BY(mu_);
-    unsigned threads_waiting_ ABSL_GUARDED_BY(mu_) = 0;
     const unsigned reserve_threads_;
-    State state_ ABSL_GUARDED_BY(mu_) = State::kRunning;
+    grpc_core::Mutex queue_mu_;
+    grpc_core::CondVar cv_;
+    std::queue<absl::AnyInvocable<void()>> callbacks_
+        ABSL_GUARDED_BY(queue_mu_);
+    unsigned threads_waiting_ ABSL_GUARDED_BY(queue_mu_) = 0;
+    // Track shutdown and fork bits separately.
+    // It's possible for a ThreadPool to initiate shut down while fork handlers
+    // are running, and similarly possible for a fork event to occur during
+    // shutdown.
+    bool shutdown_ ABSL_GUARDED_BY(queue_mu_) = false;
+    bool forking_ ABSL_GUARDED_BY(queue_mu_) = false;
   };
 
   class ThreadCount {
@@ -97,9 +98,9 @@ class ThreadPool final : public Forkable, public Executor {
     void BlockUntilThreadCount(int threads, const char* why);
 
    private:
-    grpc_core::Mutex mu_;
+    grpc_core::Mutex thread_count_mu_;
     grpc_core::CondVar cv_;
-    int threads_ ABSL_GUARDED_BY(mu_) = 0;
+    int threads_ ABSL_GUARDED_BY(thread_count_mu_) = 0;
   };
 
   struct State {
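
One detail the header makes visible: a single cv_ guards both the work queue and the two bits, which is why the setters broadcast while Add() wakes a single worker. A hedged sketch with standard primitives standing in for grpc_core::Mutex/CondVar and a single hypothetical bit:

#include <condition_variable>
#include <mutex>
#include <queue>

std::mutex mu;
std::condition_variable cv;
bool shutdown_bit = false;  // stand-in for shutdown_/forking_
std::queue<int> work;

// Workers park here; every one of them must re-check the state on wakeup.
void WorkerWait() {
  std::unique_lock<std::mutex> lock(mu);
  cv.wait(lock, [] { return shutdown_bit || !work.empty(); });
}

void AddWork(int w) {
  { std::lock_guard<std::mutex> lock(mu); work.push(w); }
  cv.notify_one();  // one item needs one thread (cv_.Signal())
}

void SetShutdown() {
  { std::lock_guard<std::mutex> lock(mu); shutdown_bit = true; }
  cv.notify_all();  // state change: wake every waiter (cv_.SignalAll())
}

int main() {
  AddWork(1);
  WorkerWait();   // returns immediately: work is queued
  SetShutdown();
  WorkerWait();   // returns immediately: shutdown bit is set
  return 0;
}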
