[EventEngine] Make the thread pool quiesce 10x faster, and add a small stress test (#33223)

Co-authored-by: drfloob <drfloob@users.noreply.github.com>
pull/33313/head
AJ Heller 2 years ago committed by GitHub
parent 173d225150
commit fbc0b38675
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 126
      src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
  2. 44
      src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h
  3. 12
      src/core/lib/gprpp/time.h
  4. 16
      test/core/event_engine/thread_pool_test.cc

@ -20,6 +20,8 @@
#include "src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h"
#include <inttypes.h>
#include <atomic>
#include <memory>
#include <utility>
@ -36,6 +38,7 @@
#include "src/core/lib/event_engine/trace.h"
#include "src/core/lib/event_engine/work_queue/basic_work_queue.h"
#include "src/core/lib/event_engine/work_queue/work_queue.h"
#include "src/core/lib/gpr/time_precise.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gprpp/time.h"
@ -43,19 +46,29 @@ namespace grpc_event_engine {
namespace experimental {
namespace {
// Maximum amount of time an extra thread is allowed to idle before being
// reclaimed.
constexpr grpc_core::Duration kIdleThreadLimit =
grpc_core::Duration::Seconds(20);
// Rate at which "Waiting for ..." logs should be printed while quiescing.
constexpr size_t kBlockingQuiesceLogRateSeconds = 3;
// Minumum time between thread creations.
constexpr grpc_core::Duration kTimeBetweenThrottledThreadStarts =
grpc_core::Duration::Seconds(1);
// Minimum time a worker thread should sleep between checking for new work. Used
// in backoff calculations to reduce vigilance when the pool is calm.
constexpr grpc_core::Duration kWorkerThreadMinSleepBetweenChecks{
grpc_core::Duration::Milliseconds(15)};
// Maximum time a worker thread should sleep between checking for new work.
constexpr grpc_core::Duration kWorkerThreadMaxSleepBetweenChecks{
grpc_core::Duration::Seconds(3)};
// Minimum time the lifeguard thread should sleep between checks. Used in
// backoff calculations to reduce vigilance when the pool is calm.
constexpr grpc_core::Duration kLifeguardMinSleepBetweenChecks{
grpc_core::Duration::Milliseconds(15)};
// Maximum time the lifeguard thread should sleep between checking for new work.
constexpr grpc_core::Duration kLifeguardMaxSleepBetweenChecks{
grpc_core::Duration::Seconds(1)};
constexpr absl::Duration kSleepBetweenQuiesceCheck{absl::Milliseconds(10)};
} // namespace
thread_local WorkQueue* g_local_queue = nullptr;
@ -113,13 +126,13 @@ void WorkStealingThreadPool::PostforkChild() { pool_->Postfork(); }
WorkStealingThreadPool::WorkStealingThreadPoolImpl::WorkStealingThreadPoolImpl(
size_t reserve_threads)
: reserve_threads_(reserve_threads), lifeguard_() {}
: reserve_threads_(reserve_threads), lifeguard_(this) {}
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Start() {
lifeguard_.Start(shared_from_this());
for (size_t i = 0; i < reserve_threads_; i++) {
StartThread();
}
lifeguard_.Start();
}
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Run(
@ -155,6 +168,7 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Quiesce() {
// Note that if this is a threadpool thread then we won't exit this thread
// until all other threads have exited, so we need to wait for just one thread
// running instead of zero.
gpr_cycle_counter start_time = gpr_get_cycle_counter();
bool is_threadpool_thread = g_local_queue != nullptr;
thread_count()->BlockUntilThreadCount(CounterType::kLivingThreadCount,
is_threadpool_thread ? 1 : 0,
@ -162,6 +176,8 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Quiesce() {
GPR_ASSERT(queue_.Empty());
quiesced_.store(true, std::memory_order_relaxed);
lifeguard_.BlockUntilShutdown();
GRPC_EVENT_ENGINE_TRACE("%f cycles spent quiescing the pool",
gpr_get_cycle_counter() - start_time);
}
bool WorkStealingThreadPool::WorkStealingThreadPoolImpl::SetThrottled(
@ -206,21 +222,21 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Postfork() {
Start();
}
// -------- WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard
// --------
// -------- WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard -----
WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::Lifeguard()
: backoff_(grpc_core::BackOff::Options()
WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::Lifeguard(
WorkStealingThreadPoolImpl* pool)
: pool_(pool),
backoff_(grpc_core::BackOff::Options()
.set_initial_backoff(kLifeguardMinSleepBetweenChecks)
.set_max_backoff(kLifeguardMaxSleepBetweenChecks)
.set_multiplier(1.3)) {}
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::Start(
std::shared_ptr<WorkStealingThreadPoolImpl> pool) {
// thread_running_ is set early to avoid a quiesce race while the lifeguard is
// still starting up.
thread_running_.store(true);
pool_ = std::move(pool);
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::Start() {
// lifeguard_running_ is set early to avoid a quiesce race while the
// lifeguard is still starting up.
grpc_core::MutexLock lock(&lifeguard_shutdown_mu_);
lifeguard_running_ = true;
grpc_core::Thread(
"lifeguard",
[](void* arg) {
@ -234,21 +250,36 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::Start(
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::
LifeguardMain() {
grpc_core::MutexLock lock(&lifeguard_shutdown_mu_);
while (true) {
absl::SleepFor(absl::Milliseconds(
(backoff_.NextAttemptTime() - grpc_core::Timestamp::Now()).millis()));
if (pool_->IsForking()) break;
if (pool_->IsShutdown() && pool_->IsQuiesced()) break;
// If the pool is shut down, loop quickly until quiesced. Otherwise,
// reduce the check rate if the pool is idle.
if (pool_->IsShutdown()) {
if (pool_->IsQuiesced()) break;
} else {
lifeguard_shutdown_cv_.WaitWithTimeout(
&lifeguard_shutdown_mu_,
absl::Milliseconds(
(backoff_.NextAttemptTime() - grpc_core::Timestamp::Now())
.millis()));
}
MaybeStartNewThread();
}
pool_.reset();
thread_running_.store(false);
lifeguard_running_ = false;
lifeguard_shutdown_cv_.Signal();
}
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::
BlockUntilShutdown() {
while (thread_running_.load()) {
absl::SleepFor(kSleepBetweenQuiesceCheck);
grpc_core::MutexLock lock(&lifeguard_shutdown_mu_);
while (lifeguard_running_) {
lifeguard_shutdown_cv_.Signal();
lifeguard_shutdown_cv_.WaitWithTimeout(
&lifeguard_shutdown_mu_, absl::Seconds(kBlockingQuiesceLogRateSeconds));
GRPC_LOG_EVERY_N_SEC_DELAYED(kBlockingQuiesceLogRateSeconds, GPR_DEBUG,
"%s",
"Waiting for lifeguard thread to shut down");
}
}
@ -425,36 +456,57 @@ void WorkStealingThreadPool::ThreadState::FinishDraining() {
// -------- WorkStealingThreadPool::ThreadCount --------
void WorkStealingThreadPool::ThreadCount::Add(CounterType counter_type) {
thread_counts_[counter_type].fetch_add(1, std::memory_order_relaxed);
grpc_core::MutexLock lock(&wait_mu_[counter_type]);
++thread_counts_[counter_type];
wait_cv_[counter_type].SignalAll();
}
void WorkStealingThreadPool::ThreadCount::Remove(CounterType counter_type) {
thread_counts_[counter_type].fetch_sub(1, std::memory_order_relaxed);
grpc_core::MutexLock lock(&wait_mu_[counter_type]);
--thread_counts_[counter_type];
wait_cv_[counter_type].SignalAll();
}
void WorkStealingThreadPool::ThreadCount::BlockUntilThreadCount(
CounterType counter_type, int desired_threads, const char* why,
CounterType counter_type, size_t desired_threads, const char* why,
WorkSignal* work_signal) {
auto& counter = thread_counts_[counter_type];
int curr_threads = counter.load(std::memory_order_relaxed);
// Wait for all threads to exit.
auto last_log_time = grpc_core::Timestamp::Now();
while (curr_threads > desired_threads) {
absl::SleepFor(kSleepBetweenQuiesceCheck);
while (true) {
auto curr_threads = WaitForCountChange(
counter_type, desired_threads,
grpc_core::Duration::Seconds(kBlockingQuiesceLogRateSeconds));
if (curr_threads == desired_threads) break;
GRPC_LOG_EVERY_N_SEC_DELAYED(
kBlockingQuiesceLogRateSeconds, GPR_DEBUG,
"Waiting for thread pool to idle before %s. (%" PRIdPTR " to %" PRIdPTR
")",
why, curr_threads, desired_threads);
work_signal->SignalAll();
if (grpc_core::Timestamp::Now() - last_log_time >
grpc_core::Duration::Seconds(3)) {
gpr_log(GPR_DEBUG,
"Waiting for thread pool to idle before %s. (%d to %d)", why,
curr_threads, desired_threads);
last_log_time = grpc_core::Timestamp::Now();
}
curr_threads = counter.load(std::memory_order_relaxed);
}
}
size_t WorkStealingThreadPool::ThreadCount::WaitForCountChange(
CounterType counter_type, size_t desired_threads,
grpc_core::Duration timeout) {
size_t count;
auto deadline = absl::Now() + absl::Milliseconds(timeout.millis());
do {
grpc_core::MutexLock lock(&wait_mu_[counter_type]);
count = GetCountLocked(counter_type);
if (count == desired_threads) break;
wait_cv_[counter_type].WaitWithDeadline(&wait_mu_[counter_type], deadline);
} while (absl::Now() < deadline);
return count;
}
size_t WorkStealingThreadPool::ThreadCount::GetCount(CounterType counter_type) {
return thread_counts_[counter_type].load(std::memory_order_relaxed);
grpc_core::MutexLock lock(&wait_mu_[counter_type]);
return GetCountLocked(counter_type);
}
size_t WorkStealingThreadPool::ThreadCount::GetCountLocked(
CounterType counter_type) {
return thread_counts_[counter_type];
}
WorkStealingThreadPool::ThreadCount::AutoThreadCount::AutoThreadCount(

@ -86,14 +86,21 @@ class WorkStealingThreadPool final : public ThreadPool {
class ThreadCount {
public:
// Adds 1 to the thread count for that counter type.
void Add(CounterType counter_type);
void Add(CounterType counter_type)
ABSL_LOCKS_EXCLUDED(wait_mu_[counter_type]);
// Subtracts 1 from the thread count for that counter type.
void Remove(CounterType counter_type);
void Remove(CounterType counter_type)
ABSL_LOCKS_EXCLUDED(wait_mu_[counter_type]);
// Blocks until the thread count for that type reaches `desired_threads`.
void BlockUntilThreadCount(CounterType counter_type, int desired_threads,
const char* why, WorkSignal* work_signal);
void BlockUntilThreadCount(CounterType counter_type, size_t desired_threads,
const char* why, WorkSignal* work_signal)
ABSL_LOCKS_EXCLUDED(wait_mu_[counter_type]);
// Returns the current thread count for the tracked type.
size_t GetCount(CounterType counter_type);
size_t GetCount(CounterType counter_type)
ABSL_LOCKS_EXCLUDED(wait_mu_[counter_type]);
// Returns the current thread count for the tracked type.
size_t GetCountLocked(CounterType counter_type)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(wait_mu_[counter_type]);
// Adds and removes thread counts on construction and destruction
class AutoThreadCount {
@ -107,7 +114,15 @@ class WorkStealingThreadPool final : public ThreadPool {
};
private:
std::atomic<size_t> thread_counts_[2]{{0}, {0}};
// Wait for the desired count to be reached.
// Returns the current thread count either when the desired count is
// reached, or when the deadline has passed, whichever happens first.
size_t WaitForCountChange(CounterType counter_type, size_t desired_threads,
grpc_core::Duration timeout);
grpc_core::Mutex wait_mu_[2];
grpc_core::CondVar wait_cv_[2];
size_t thread_counts_[2]{0, 0};
};
// A pool of WorkQueues that participate in work stealing.
@ -152,8 +167,8 @@ class WorkStealingThreadPool final : public ThreadPool {
// This method is safe to call from within a ThreadPool thread.
void Quiesce();
// Sets a throttled state.
// After the initial pool has been created, if the pool is backlogged when a
// new thread has started, it is rate limited.
// After the initial pool has been created, if the pool is backlogged when
// a new thread has started, it is rate limited.
// Returns the previous throttling state.
bool SetThrottled(bool throttle);
// Set the shutdown flag.
@ -183,9 +198,9 @@ class WorkStealingThreadPool final : public ThreadPool {
// and there are threads that can accept work.
class Lifeguard {
public:
Lifeguard();
explicit Lifeguard(WorkStealingThreadPoolImpl* pool);
// Start the lifeguard thread.
void Start(std::shared_ptr<WorkStealingThreadPoolImpl> pool);
void Start();
// Block until the lifeguard thread is shut down.
void BlockUntilShutdown();
@ -194,9 +209,14 @@ class WorkStealingThreadPool final : public ThreadPool {
void LifeguardMain();
// Starts a new thread if the pool is backlogged
void MaybeStartNewThread();
std::shared_ptr<WorkStealingThreadPoolImpl> pool_;
WorkStealingThreadPoolImpl* pool_;
grpc_core::BackOff backoff_;
std::atomic<bool> thread_running_{false};
// Used for signaling that the lifeguard thread has stopped running.
grpc_core::Mutex lifeguard_shutdown_mu_;
bool lifeguard_running_ ABSL_GUARDED_BY(lifeguard_shutdown_mu_) = false;
grpc_core::CondVar lifeguard_shutdown_cv_
ABSL_GUARDED_BY(lifeguard_shutdown_mu_);
};
const size_t reserve_threads_;

@ -43,6 +43,18 @@
} \
} while (0)
#define GRPC_LOG_EVERY_N_SEC_DELAYED(n, severity, format, ...) \
do { \
static std::atomic<uint64_t> prev{0}; \
uint64_t now = grpc_core::Timestamp::FromTimespecRoundDown( \
gpr_now(GPR_CLOCK_MONOTONIC)) \
.milliseconds_after_process_epoch(); \
uint64_t prev_tsamp = prev.exchange(now); \
if (now - prev_tsamp > (n)*1000) { \
gpr_log(severity, format, __VA_ARGS__); \
} \
} while (0)
namespace grpc_core {
namespace time_detail {

@ -221,6 +221,22 @@ TEST_F(WorkStealingThreadPoolTest,
p.Quiesce();
}
// TODO(hork): This is currently a pathological case for the original thread
// pool, it takes around 50s to run. When that is fixed, or the implementation
// is deleted, make this a typed test again.
TEST_F(WorkStealingThreadPoolTest, QuiesceRaceStressTest) {
int cycle_count = 333;
int thread_count = 8;
int run_count = thread_count * 2;
for (int i = 0; i < cycle_count; i++) {
WorkStealingThreadPool p(thread_count);
for (int j = 0; j < run_count; j++) {
p.Run([]() {});
}
p.Quiesce();
}
}
} // namespace experimental
} // namespace grpc_event_engine

Loading…
Cancel
Save