From fbc0b386757f8edeea94fd8dbc38e5f7a5648121 Mon Sep 17 00:00:00 2001 From: AJ Heller Date: Thu, 1 Jun 2023 10:35:07 -0700 Subject: [PATCH] [EventEngine] Make the thread pool quiesce 10x faster, and add a small stress test (#33223) Co-authored-by: drfloob --- .../thread_pool/work_stealing_thread_pool.cc | 126 +++++++++++++----- .../thread_pool/work_stealing_thread_pool.h | 44 ++++-- src/core/lib/gprpp/time.h | 12 ++ test/core/event_engine/thread_pool_test.cc | 16 +++ 4 files changed, 149 insertions(+), 49 deletions(-) diff --git a/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc b/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc index 19c9eda0c4b..fc62c4b076f 100644 --- a/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc +++ b/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc @@ -20,6 +20,8 @@ #include "src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h" +#include + #include #include #include @@ -36,6 +38,7 @@ #include "src/core/lib/event_engine/trace.h" #include "src/core/lib/event_engine/work_queue/basic_work_queue.h" #include "src/core/lib/event_engine/work_queue/work_queue.h" +#include "src/core/lib/gpr/time_precise.h" #include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gprpp/time.h" @@ -43,19 +46,29 @@ namespace grpc_event_engine { namespace experimental { namespace { +// Maximum amount of time an extra thread is allowed to idle before being +// reclaimed. constexpr grpc_core::Duration kIdleThreadLimit = grpc_core::Duration::Seconds(20); +// Rate at which "Waiting for ..." logs should be printed while quiescing. +constexpr size_t kBlockingQuiesceLogRateSeconds = 3; +// Minimum time between thread creations. constexpr grpc_core::Duration kTimeBetweenThrottledThreadStarts = grpc_core::Duration::Seconds(1); +// Minimum time a worker thread should sleep between checking for new work. Used +// in backoff calculations to reduce vigilance when the pool is calm. 
constexpr grpc_core::Duration kWorkerThreadMinSleepBetweenChecks{ grpc_core::Duration::Milliseconds(15)}; +// Maximum time a worker thread should sleep between checking for new work. constexpr grpc_core::Duration kWorkerThreadMaxSleepBetweenChecks{ grpc_core::Duration::Seconds(3)}; +// Minimum time the lifeguard thread should sleep between checks. Used in +// backoff calculations to reduce vigilance when the pool is calm. constexpr grpc_core::Duration kLifeguardMinSleepBetweenChecks{ grpc_core::Duration::Milliseconds(15)}; +// Maximum time the lifeguard thread should sleep between checking for new work. constexpr grpc_core::Duration kLifeguardMaxSleepBetweenChecks{ grpc_core::Duration::Seconds(1)}; -constexpr absl::Duration kSleepBetweenQuiesceCheck{absl::Milliseconds(10)}; } // namespace thread_local WorkQueue* g_local_queue = nullptr; @@ -113,13 +126,13 @@ void WorkStealingThreadPool::PostforkChild() { pool_->Postfork(); } WorkStealingThreadPool::WorkStealingThreadPoolImpl::WorkStealingThreadPoolImpl( size_t reserve_threads) - : reserve_threads_(reserve_threads), lifeguard_() {} + : reserve_threads_(reserve_threads), lifeguard_(this) {} void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Start() { - lifeguard_.Start(shared_from_this()); for (size_t i = 0; i < reserve_threads_; i++) { StartThread(); } + lifeguard_.Start(); } void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Run( @@ -155,6 +168,7 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Quiesce() { // Note that if this is a threadpool thread then we won't exit this thread // until all other threads have exited, so we need to wait for just one thread // running instead of zero. + gpr_cycle_counter start_time = gpr_get_cycle_counter(); bool is_threadpool_thread = g_local_queue != nullptr; thread_count()->BlockUntilThreadCount(CounterType::kLivingThreadCount, is_threadpool_thread ? 
1 : 0, @@ -162,6 +176,8 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Quiesce() { GPR_ASSERT(queue_.Empty()); quiesced_.store(true, std::memory_order_relaxed); lifeguard_.BlockUntilShutdown(); + GRPC_EVENT_ENGINE_TRACE("%f cycles spent quiescing the pool", + gpr_get_cycle_counter() - start_time); } bool WorkStealingThreadPool::WorkStealingThreadPoolImpl::SetThrottled( @@ -206,21 +222,21 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Postfork() { Start(); } -// -------- WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard -// -------- +// -------- WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard ----- -WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::Lifeguard() - : backoff_(grpc_core::BackOff::Options() +WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::Lifeguard( + WorkStealingThreadPoolImpl* pool) + : pool_(pool), + backoff_(grpc_core::BackOff::Options() .set_initial_backoff(kLifeguardMinSleepBetweenChecks) .set_max_backoff(kLifeguardMaxSleepBetweenChecks) .set_multiplier(1.3)) {} -void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::Start( - std::shared_ptr pool) { - // thread_running_ is set early to avoid a quiesce race while the lifeguard is - // still starting up. - thread_running_.store(true); - pool_ = std::move(pool); +void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::Start() { + // lifeguard_running_ is set early to avoid a quiesce race while the + // lifeguard is still starting up. 
+ grpc_core::MutexLock lock(&lifeguard_shutdown_mu_); + lifeguard_running_ = true; grpc_core::Thread( "lifeguard", [](void* arg) { @@ -234,21 +250,36 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::Start( void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard:: LifeguardMain() { + grpc_core::MutexLock lock(&lifeguard_shutdown_mu_); while (true) { - absl::SleepFor(absl::Milliseconds( - (backoff_.NextAttemptTime() - grpc_core::Timestamp::Now()).millis())); if (pool_->IsForking()) break; - if (pool_->IsShutdown() && pool_->IsQuiesced()) break; + // If the pool is shut down, loop quickly until quiesced. Otherwise, + // reduce the check rate if the pool is idle. + if (pool_->IsShutdown()) { + if (pool_->IsQuiesced()) break; + } else { + lifeguard_shutdown_cv_.WaitWithTimeout( + &lifeguard_shutdown_mu_, + absl::Milliseconds( + (backoff_.NextAttemptTime() - grpc_core::Timestamp::Now()) + .millis())); + } MaybeStartNewThread(); } - pool_.reset(); - thread_running_.store(false); + lifeguard_running_ = false; + lifeguard_shutdown_cv_.Signal(); } void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard:: BlockUntilShutdown() { - while (thread_running_.load()) { - absl::SleepFor(kSleepBetweenQuiesceCheck); + grpc_core::MutexLock lock(&lifeguard_shutdown_mu_); + while (lifeguard_running_) { + lifeguard_shutdown_cv_.Signal(); + lifeguard_shutdown_cv_.WaitWithTimeout( + &lifeguard_shutdown_mu_, absl::Seconds(kBlockingQuiesceLogRateSeconds)); + GRPC_LOG_EVERY_N_SEC_DELAYED(kBlockingQuiesceLogRateSeconds, GPR_DEBUG, + "%s", + "Waiting for lifeguard thread to shut down"); } } @@ -425,36 +456,57 @@ void WorkStealingThreadPool::ThreadState::FinishDraining() { // -------- WorkStealingThreadPool::ThreadCount -------- void WorkStealingThreadPool::ThreadCount::Add(CounterType counter_type) { - thread_counts_[counter_type].fetch_add(1, std::memory_order_relaxed); + grpc_core::MutexLock lock(&wait_mu_[counter_type]); + 
++thread_counts_[counter_type]; + wait_cv_[counter_type].SignalAll(); } void WorkStealingThreadPool::ThreadCount::Remove(CounterType counter_type) { - thread_counts_[counter_type].fetch_sub(1, std::memory_order_relaxed); + grpc_core::MutexLock lock(&wait_mu_[counter_type]); + --thread_counts_[counter_type]; + wait_cv_[counter_type].SignalAll(); } void WorkStealingThreadPool::ThreadCount::BlockUntilThreadCount( - CounterType counter_type, int desired_threads, const char* why, + CounterType counter_type, size_t desired_threads, const char* why, WorkSignal* work_signal) { - auto& counter = thread_counts_[counter_type]; - int curr_threads = counter.load(std::memory_order_relaxed); // Wait for all threads to exit. - auto last_log_time = grpc_core::Timestamp::Now(); - while (curr_threads > desired_threads) { - absl::SleepFor(kSleepBetweenQuiesceCheck); + while (true) { + auto curr_threads = WaitForCountChange( + counter_type, desired_threads, + grpc_core::Duration::Seconds(kBlockingQuiesceLogRateSeconds)); + if (curr_threads == desired_threads) break; + GRPC_LOG_EVERY_N_SEC_DELAYED( + kBlockingQuiesceLogRateSeconds, GPR_DEBUG, + "Waiting for thread pool to idle before %s. (%" PRIdPTR " to %" PRIdPTR + ")", + why, curr_threads, desired_threads); work_signal->SignalAll(); - if (grpc_core::Timestamp::Now() - last_log_time > - grpc_core::Duration::Seconds(3)) { - gpr_log(GPR_DEBUG, - "Waiting for thread pool to idle before %s. 
(%d to %d)", why, - curr_threads, desired_threads); - last_log_time = grpc_core::Timestamp::Now(); - } - curr_threads = counter.load(std::memory_order_relaxed); } } +size_t WorkStealingThreadPool::ThreadCount::WaitForCountChange( + CounterType counter_type, size_t desired_threads, + grpc_core::Duration timeout) { + size_t count; + auto deadline = absl::Now() + absl::Milliseconds(timeout.millis()); + do { + grpc_core::MutexLock lock(&wait_mu_[counter_type]); + count = GetCountLocked(counter_type); + if (count == desired_threads) break; + wait_cv_[counter_type].WaitWithDeadline(&wait_mu_[counter_type], deadline); + } while (absl::Now() < deadline); + return count; +} + size_t WorkStealingThreadPool::ThreadCount::GetCount(CounterType counter_type) { - return thread_counts_[counter_type].load(std::memory_order_relaxed); + grpc_core::MutexLock lock(&wait_mu_[counter_type]); + return GetCountLocked(counter_type); +} + +size_t WorkStealingThreadPool::ThreadCount::GetCountLocked( + CounterType counter_type) { + return thread_counts_[counter_type]; } WorkStealingThreadPool::ThreadCount::AutoThreadCount::AutoThreadCount( diff --git a/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h b/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h index f5cbba8debd..9f933eb563b 100644 --- a/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h +++ b/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h @@ -86,14 +86,21 @@ class WorkStealingThreadPool final : public ThreadPool { class ThreadCount { public: // Adds 1 to the thread count for that counter type. - void Add(CounterType counter_type); + void Add(CounterType counter_type) + ABSL_LOCKS_EXCLUDED(wait_mu_[counter_type]); // Subtracts 1 from the thread count for that counter type. - void Remove(CounterType counter_type); + void Remove(CounterType counter_type) + ABSL_LOCKS_EXCLUDED(wait_mu_[counter_type]); // Blocks until the thread count for that type reaches `desired_threads`. 
- void BlockUntilThreadCount(CounterType counter_type, int desired_threads, - const char* why, WorkSignal* work_signal); + void BlockUntilThreadCount(CounterType counter_type, size_t desired_threads, + const char* why, WorkSignal* work_signal) + ABSL_LOCKS_EXCLUDED(wait_mu_[counter_type]); // Returns the current thread count for the tracked type. - size_t GetCount(CounterType counter_type); + size_t GetCount(CounterType counter_type) + ABSL_LOCKS_EXCLUDED(wait_mu_[counter_type]); + // Like GetCount, but requires wait_mu_[counter_type] to be held already. + size_t GetCountLocked(CounterType counter_type) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(wait_mu_[counter_type]); // Adds and removes thread counts on construction and destruction class AutoThreadCount { @@ -107,7 +114,15 @@ class WorkStealingThreadPool final : public ThreadPool { }; private: - std::atomic thread_counts_[2]{{0}, {0}}; + // Wait for the desired count to be reached. + // Returns the current thread count either when the desired count is + // reached, or when the deadline has passed, whichever happens first. + size_t WaitForCountChange(CounterType counter_type, size_t desired_threads, + grpc_core::Duration timeout); + + grpc_core::Mutex wait_mu_[2]; + grpc_core::CondVar wait_cv_[2]; + size_t thread_counts_[2]{0, 0}; }; // A pool of WorkQueues that participate in work stealing. @@ -152,8 +167,8 @@ class WorkStealingThreadPool final : public ThreadPool { // This method is safe to call from within a ThreadPool thread. void Quiesce(); // Sets a throttled state. - // After the initial pool has been created, if the pool is backlogged when a - // new thread has started, it is rate limited. + // After the initial pool has been created, if the pool is backlogged when + // a new thread has started, it is rate limited. // Returns the previous throttling state. bool SetThrottled(bool throttle); // Set the shutdown flag. 
@@ -183,9 +198,9 @@ class WorkStealingThreadPool final : public ThreadPool { // and there are threads that can accept work. class Lifeguard { public: - Lifeguard(); + explicit Lifeguard(WorkStealingThreadPoolImpl* pool); // Start the lifeguard thread. - void Start(std::shared_ptr pool); + void Start(); // Block until the lifeguard thread is shut down. void BlockUntilShutdown(); @@ -194,9 +209,14 @@ class WorkStealingThreadPool final : public ThreadPool { void LifeguardMain(); // Starts a new thread if the pool is backlogged void MaybeStartNewThread(); - std::shared_ptr pool_; + + WorkStealingThreadPoolImpl* pool_; grpc_core::BackOff backoff_; - std::atomic thread_running_{false}; + // Used for signaling that the lifeguard thread has stopped running. + grpc_core::Mutex lifeguard_shutdown_mu_; + bool lifeguard_running_ ABSL_GUARDED_BY(lifeguard_shutdown_mu_) = false; + grpc_core::CondVar lifeguard_shutdown_cv_ + ABSL_GUARDED_BY(lifeguard_shutdown_mu_); }; const size_t reserve_threads_; diff --git a/src/core/lib/gprpp/time.h b/src/core/lib/gprpp/time.h index 557139eb46a..3c39916c3b0 100644 --- a/src/core/lib/gprpp/time.h +++ b/src/core/lib/gprpp/time.h @@ -43,6 +43,18 @@ } \ } while (0) +#define GRPC_LOG_EVERY_N_SEC_DELAYED(n, severity, format, ...) 
\ + do { \ + static std::atomic prev{0}; \ + uint64_t now = grpc_core::Timestamp::FromTimespecRoundDown( \ + gpr_now(GPR_CLOCK_MONOTONIC)) \ + .milliseconds_after_process_epoch(); \ + uint64_t prev_tsamp = prev.exchange(now); \ + if (now - prev_tsamp > (n)*1000) { \ + gpr_log(severity, format, __VA_ARGS__); \ + } \ + } while (0) + namespace grpc_core { namespace time_detail { diff --git a/test/core/event_engine/thread_pool_test.cc b/test/core/event_engine/thread_pool_test.cc index 352972880cb..91879b8bf07 100644 --- a/test/core/event_engine/thread_pool_test.cc +++ b/test/core/event_engine/thread_pool_test.cc @@ -221,6 +221,22 @@ TEST_F(WorkStealingThreadPoolTest, p.Quiesce(); } +// TODO(hork): This is currently a pathological case for the original thread +// pool; it takes around 50s to run. When that is fixed, or the implementation +// is deleted, make this a typed test again. +TEST_F(WorkStealingThreadPoolTest, QuiesceRaceStressTest) { + int cycle_count = 333; + int thread_count = 8; + int run_count = thread_count * 2; + for (int i = 0; i < cycle_count; i++) { + WorkStealingThreadPool p(thread_count); + for (int j = 0; j < run_count; j++) { + p.Run([]() {}); + } + p.Quiesce(); + } +} + } // namespace experimental } // namespace grpc_event_engine