From 112421760a7417d219c89deb97df09e0bdea6e29 Mon Sep 17 00:00:00 2001 From: AJ Heller Date: Fri, 21 Jul 2023 09:25:47 -0700 Subject: [PATCH] [EventEngine] Eliminate busy loop in the work stealing lifeguard's shutdown (#33386) Co-authored-by: drfloob --- src/core/BUILD | 1 + .../thread_pool/work_stealing_thread_pool.cc | 85 ++++++++++++++----- .../thread_pool/work_stealing_thread_pool.h | 11 +-- test/core/event_engine/BUILD | 1 + test/core/event_engine/thread_pool_test.cc | 37 ++++++++ 5 files changed, 108 insertions(+), 27 deletions(-) diff --git a/src/core/BUILD b/src/core/BUILD index f4f04311966..34736a7a53f 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -1510,6 +1510,7 @@ grpc_cc_library( "event_engine_work_queue", "experiments", "forkable", + "notification", "time", "useful", "//:backoff", diff --git a/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc b/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc index 89b34d919cd..29eefb2facf 100644 --- a/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc +++ b/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc @@ -23,7 +23,6 @@ #include #include -#include #include #include @@ -39,10 +38,50 @@ #include "src/core/lib/event_engine/trace.h" #include "src/core/lib/event_engine/work_queue/basic_work_queue.h" #include "src/core/lib/event_engine/work_queue/work_queue.h" -#include "src/core/lib/gpr/time_precise.h" #include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gprpp/time.h" +// ## Thread Pool Fork-handling +// +// Thread-safety needs special attention with regard to fork() calls. The +// Forkable system employs a pre- and post- fork callback system that does not +// guarantee any ordering of execution. On fork() events, the thread pool does +// the following: +// +// On pre-fork: +// * the WorkStealingThreadPool triggers all threads to exit, +// * all queued work is saved, and +// * all threads will are down, including the Lifeguard thread. +// +// On post-fork: +// * all threads are restarted, including the Lifeguard thread, and +// * all previously-saved work is enqueued for execution. +// +// However, the queue may may get into trouble if one thread is attempting to +// restart the thread pool while another thread is shutting it down. For that +// reason, Quiesce and Start must be thread-safe, and Quiesce must wait for the +// pool to be in a fully started state before it is allowed to continue. +// Consider this potential ordering of events between Start and Quiesce: +// +// ┌──────────┐ +// │ Thread 1 │ +// └────┬─────┘ ┌──────────┐ +// │ │ Thread 2 │ +// ▼ └────┬─────┘ +// Start() │ +// │ ▼ +// │ Quiesce() +// │ Wait for worker threads to exit +// │ Wait for the lifeguard thread to exit +// ▼ +// Start the Lifeguard thread +// Start the worker threads +// +// Thread 2 will find no worker threads, and it will then want to wait on a +// non-existent Lifeguard thread to finish. Trying a simple +// `lifeguard_thread_.Join()` leads to memory access errors. This implementation +// uses Notifications to coordinate startup and shutdown states. + namespace grpc_event_engine { namespace experimental { @@ -169,16 +208,13 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Quiesce() { // Note that if this is a threadpool thread then we won't exit this thread // until all other threads have exited, so we need to wait for just one thread // running instead of zero. - gpr_cycle_counter start_time = gpr_get_cycle_counter(); bool is_threadpool_thread = g_local_queue != nullptr; thread_count()->BlockUntilThreadCount(CounterType::kLivingThreadCount, is_threadpool_thread ? 1 : 0, "shutting down", work_signal()); GPR_ASSERT(queue_.Empty()); quiesced_.store(true, std::memory_order_relaxed); - lifeguard_.BlockUntilShutdown(); - GRPC_EVENT_ENGINE_TRACE("%ld cycles spent quiescing the pool", - std::lround(gpr_get_cycle_counter() - start_time)); + lifeguard_.BlockUntilShutdownAndReset(); } bool WorkStealingThreadPool::WorkStealingThreadPoolImpl::SetThrottled( @@ -215,7 +251,7 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::PrepareFork() { SetForking(true); thread_count()->BlockUntilThreadCount(CounterType::kLivingThreadCount, 0, "forking", &work_signal_); - lifeguard_.BlockUntilShutdown(); + lifeguard_.BlockUntilShutdownAndReset(); } void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Postfork() { @@ -231,13 +267,14 @@ WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::Lifeguard( backoff_(grpc_core::BackOff::Options() .set_initial_backoff(kLifeguardMinSleepBetweenChecks) .set_max_backoff(kLifeguardMaxSleepBetweenChecks) - .set_multiplier(1.3)) {} + .set_multiplier(1.3)), + lifeguard_should_shut_down_(std::make_unique()), + lifeguard_is_shut_down_(std::make_unique()) {} void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::Start() { // lifeguard_running_ is set early to avoid a quiesce race while the // lifeguard is still starting up. - grpc_core::MutexLock lock(&lifeguard_shutdown_mu_); - lifeguard_running_ = true; + lifeguard_running_.store(true); grpc_core::Thread( "lifeguard", [](void* arg) { @@ -251,7 +288,6 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::Start() { void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard:: LifeguardMain() { - grpc_core::MutexLock lock(&lifeguard_shutdown_mu_); while (true) { if (pool_->IsForking()) break; // If the pool is shut down, loop quickly until quiesced. Otherwise, @@ -259,29 +295,33 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard:: if (pool_->IsShutdown()) { if (pool_->IsQuiesced()) break; } else { - lifeguard_shutdown_cv_.WaitWithTimeout( - &lifeguard_shutdown_mu_, + lifeguard_should_shut_down_->WaitForNotificationWithTimeout( absl::Milliseconds( (backoff_.NextAttemptTime() - grpc_core::Timestamp::Now()) .millis())); } MaybeStartNewThread(); } - lifeguard_running_ = false; - lifeguard_shutdown_cv_.Signal(); + lifeguard_running_.store(false, std::memory_order_relaxed); + lifeguard_is_shut_down_->Notify(); } void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard:: - BlockUntilShutdown() { - grpc_core::MutexLock lock(&lifeguard_shutdown_mu_); - while (lifeguard_running_) { - lifeguard_shutdown_cv_.Signal(); - lifeguard_shutdown_cv_.WaitWithTimeout( - &lifeguard_shutdown_mu_, absl::Seconds(kBlockingQuiesceLogRateSeconds)); + BlockUntilShutdownAndReset() { + lifeguard_should_shut_down_->Notify(); + while (lifeguard_running_.load(std::memory_order_relaxed)) { GRPC_LOG_EVERY_N_SEC_DELAYED(kBlockingQuiesceLogRateSeconds, GPR_DEBUG, "%s", "Waiting for lifeguard thread to shut down"); + lifeguard_is_shut_down_->WaitForNotification(); } + // Do an additional wait in case this method races with LifeguardMain's + // shutdown. This should return immediately if the lifeguard is already shut + // down. + lifeguard_is_shut_down_->WaitForNotification(); + backoff_.Reset(); + lifeguard_should_shut_down_ = std::make_unique(); + lifeguard_is_shut_down_ = std::make_unique(); } void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard:: @@ -406,6 +446,7 @@ bool WorkStealingThreadPool::ThreadState::Step() { if (pool_->IsShutdown()) break; bool timed_out = pool_->work_signal()->WaitWithTimeout( backoff_.NextAttemptTime() - grpc_core::Timestamp::Now()); + if (pool_->IsForking() || pool_->IsShutdown()) break; // Quit a thread if the pool has more than it requires, and this thread // has been idle long enough. if (timed_out && @@ -472,6 +513,7 @@ void WorkStealingThreadPool::ThreadCount::BlockUntilThreadCount( CounterType counter_type, size_t desired_threads, const char* why, WorkSignal* work_signal) { // Wait for all threads to exit. + work_signal->SignalAll(); while (true) { auto curr_threads = WaitForCountChange( counter_type, desired_threads, @@ -482,7 +524,6 @@ void WorkStealingThreadPool::ThreadCount::BlockUntilThreadCount( "Waiting for thread pool to idle before %s. (%" PRIdPTR " to %" PRIdPTR ")", why, curr_threads, desired_threads); - work_signal->SignalAll(); } } diff --git a/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h b/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h index 9f933eb563b..96afc8ab540 100644 --- a/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h +++ b/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h @@ -36,6 +36,7 @@ #include "src/core/lib/event_engine/thread_pool/thread_pool.h" #include "src/core/lib/event_engine/work_queue/basic_work_queue.h" #include "src/core/lib/event_engine/work_queue/work_queue.h" +#include "src/core/lib/gprpp/notification.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/time.h" @@ -202,7 +203,8 @@ class WorkStealingThreadPool final : public ThreadPool { // Start the lifeguard thread. void Start(); // Block until the lifeguard thread is shut down. - void BlockUntilShutdown(); + // Afterwards, reset the lifeguard state so it can start again cleanly. + void BlockUntilShutdownAndReset(); private: // The main body of the lifeguard thread. @@ -213,10 +215,9 @@ class WorkStealingThreadPool final : public ThreadPool { WorkStealingThreadPoolImpl* pool_; grpc_core::BackOff backoff_; // Used for signaling that the lifeguard thread has stopped running. - grpc_core::Mutex lifeguard_shutdown_mu_; - bool lifeguard_running_ ABSL_GUARDED_BY(lifeguard_shutdown_mu_) = false; - grpc_core::CondVar lifeguard_shutdown_cv_ - ABSL_GUARDED_BY(lifeguard_shutdown_mu_); + std::unique_ptr lifeguard_should_shut_down_; + std::unique_ptr lifeguard_is_shut_down_; + std::atomic lifeguard_running_{false}; }; const size_t reserve_threads_; diff --git a/test/core/event_engine/BUILD b/test/core/event_engine/BUILD index 8131aa5d7a2..e2a3d992626 100644 --- a/test/core/event_engine/BUILD +++ b/test/core/event_engine/BUILD @@ -58,6 +58,7 @@ grpc_cc_test( "gtest", ], deps = [ + "//:gpr", "//:grpc", "//src/core:event_engine_thread_pool", "//src/core:notification", diff --git a/test/core/event_engine/thread_pool_test.cc b/test/core/event_engine/thread_pool_test.cc index 91879b8bf07..8aa86c14a8f 100644 --- a/test/core/event_engine/thread_pool_test.cc +++ b/test/core/event_engine/thread_pool_test.cc @@ -29,6 +29,7 @@ #include "src/core/lib/event_engine/thread_pool/original_thread_pool.h" #include "src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h" #include "src/core/lib/gprpp/notification.h" +#include "src/core/lib/gprpp/thd.h" #include "test/core/util/test_config.h" namespace grpc_event_engine { @@ -132,6 +133,42 @@ TYPED_TEST(ThreadPoolTest, ForkStressTest) { pool.Quiesce(); } +TYPED_TEST(ThreadPoolTest, StartQuiesceRaceStressTest) { + // Repeatedly race Start and Quiesce against each other to ensure thread + // safety. + constexpr int iter_count = 500; + struct ThdState { + std::unique_ptr pool; + int i; + }; + for (int i = 0; i < iter_count; i++) { + ThdState state{std::make_unique(8), i}; + state.pool->PrepareFork(); + grpc_core::Thread t1( + "t1", + [](void* arg) { + ThdState* state = static_cast(arg); + state->i % 2 == 0 ? state->pool->Quiesce() + : state->pool->PostforkParent(); + }, + &state, nullptr, + grpc_core::Thread::Options().set_tracked(false).set_joinable(true)); + grpc_core::Thread t2( + "t2", + [](void* arg) { + ThdState* state = static_cast(arg); + state->i % 2 == 1 ? state->pool->Quiesce() + : state->pool->PostforkParent(); + }, + &state, nullptr, + grpc_core::Thread::Options().set_tracked(false).set_joinable(true)); + t1.Start(); + t2.Start(); + t1.Join(); + t2.Join(); + } +} + void ScheduleSelf(ThreadPool* p) { p->Run([p] { ScheduleSelf(p); }); }