[EventEngine] Eliminate busy loop in the work stealing lifeguard's shutdown (#33386)

Co-authored-by: drfloob <drfloob@users.noreply.github.com>
pull/33805/head
AJ Heller 2 years ago committed by GitHub
parent 19414da96d
commit 112421760a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      src/core/BUILD
  2. 85
      src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
  3. 11
      src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h
  4. 1
      test/core/event_engine/BUILD
  5. 37
      test/core/event_engine/thread_pool_test.cc

@ -1510,6 +1510,7 @@ grpc_cc_library(
"event_engine_work_queue",
"experiments",
"forkable",
"notification",
"time",
"useful",
"//:backoff",

@ -23,7 +23,6 @@
#include <inttypes.h>
#include <atomic>
#include <cmath>
#include <memory>
#include <utility>
@ -39,10 +38,50 @@
#include "src/core/lib/event_engine/trace.h"
#include "src/core/lib/event_engine/work_queue/basic_work_queue.h"
#include "src/core/lib/event_engine/work_queue/work_queue.h"
#include "src/core/lib/gpr/time_precise.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gprpp/time.h"
// ## Thread Pool Fork-handling
//
// Thread-safety needs special attention with regard to fork() calls. The
// Forkable system employs a pre- and post- fork callback system that does not
// guarantee any ordering of execution. On fork() events, the thread pool does
// the following:
//
// On pre-fork:
// * the WorkStealingThreadPool triggers all threads to exit,
// * all queued work is saved, and
// * all threads are shut down, including the Lifeguard thread.
//
// On post-fork:
// * all threads are restarted, including the Lifeguard thread, and
// * all previously-saved work is enqueued for execution.
//
// However, the queue may get into trouble if one thread is attempting to
// restart the thread pool while another thread is shutting it down. For that
// reason, Quiesce and Start must be thread-safe, and Quiesce must wait for the
// pool to be in a fully started state before it is allowed to continue.
// Consider this potential ordering of events between Start and Quiesce:
//
//      ┌──────────┐
//      │ Thread 1 │
//      └────┬─────┘            ┌──────────┐
//           │                  │ Thread 2 │
//           ▼                  └────┬─────┘
//        Start()                    │
//           │                       ▼
//           │                    Quiesce()
//           │                    Wait for worker threads to exit
//           │                    Wait for the lifeguard thread to exit
//           ▼
//        Start the Lifeguard thread
//        Start the worker threads
//
// Thread 2 will find no worker threads, and it will then want to wait on a
// non-existent Lifeguard thread to finish. Trying a simple
// `lifeguard_thread_.Join()` leads to memory access errors. This implementation
// uses Notifications to coordinate startup and shutdown states.
namespace grpc_event_engine {
namespace experimental {
@ -169,16 +208,13 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Quiesce() {
// Note that if this is a threadpool thread then we won't exit this thread
// until all other threads have exited, so we need to wait for just one thread
// running instead of zero.
gpr_cycle_counter start_time = gpr_get_cycle_counter();
bool is_threadpool_thread = g_local_queue != nullptr;
thread_count()->BlockUntilThreadCount(CounterType::kLivingThreadCount,
is_threadpool_thread ? 1 : 0,
"shutting down", work_signal());
GPR_ASSERT(queue_.Empty());
quiesced_.store(true, std::memory_order_relaxed);
lifeguard_.BlockUntilShutdown();
GRPC_EVENT_ENGINE_TRACE("%ld cycles spent quiescing the pool",
std::lround(gpr_get_cycle_counter() - start_time));
lifeguard_.BlockUntilShutdownAndReset();
}
bool WorkStealingThreadPool::WorkStealingThreadPoolImpl::SetThrottled(
@ -215,7 +251,7 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::PrepareFork() {
SetForking(true);
thread_count()->BlockUntilThreadCount(CounterType::kLivingThreadCount, 0,
"forking", &work_signal_);
lifeguard_.BlockUntilShutdown();
lifeguard_.BlockUntilShutdownAndReset();
}
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Postfork() {
@ -231,13 +267,14 @@ WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::Lifeguard(
backoff_(grpc_core::BackOff::Options()
.set_initial_backoff(kLifeguardMinSleepBetweenChecks)
.set_max_backoff(kLifeguardMaxSleepBetweenChecks)
.set_multiplier(1.3)) {}
.set_multiplier(1.3)),
lifeguard_should_shut_down_(std::make_unique<grpc_core::Notification>()),
lifeguard_is_shut_down_(std::make_unique<grpc_core::Notification>()) {}
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::Start() {
// lifeguard_running_ is set early to avoid a quiesce race while the
// lifeguard is still starting up.
grpc_core::MutexLock lock(&lifeguard_shutdown_mu_);
lifeguard_running_ = true;
lifeguard_running_.store(true);
grpc_core::Thread(
"lifeguard",
[](void* arg) {
@ -251,7 +288,6 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::Start() {
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::
LifeguardMain() {
grpc_core::MutexLock lock(&lifeguard_shutdown_mu_);
while (true) {
if (pool_->IsForking()) break;
// If the pool is shut down, loop quickly until quiesced. Otherwise,
@ -259,29 +295,33 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::
if (pool_->IsShutdown()) {
if (pool_->IsQuiesced()) break;
} else {
lifeguard_shutdown_cv_.WaitWithTimeout(
&lifeguard_shutdown_mu_,
lifeguard_should_shut_down_->WaitForNotificationWithTimeout(
absl::Milliseconds(
(backoff_.NextAttemptTime() - grpc_core::Timestamp::Now())
.millis()));
}
MaybeStartNewThread();
}
lifeguard_running_ = false;
lifeguard_shutdown_cv_.Signal();
lifeguard_running_.store(false, std::memory_order_relaxed);
lifeguard_is_shut_down_->Notify();
}
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::
BlockUntilShutdown() {
grpc_core::MutexLock lock(&lifeguard_shutdown_mu_);
while (lifeguard_running_) {
lifeguard_shutdown_cv_.Signal();
lifeguard_shutdown_cv_.WaitWithTimeout(
&lifeguard_shutdown_mu_, absl::Seconds(kBlockingQuiesceLogRateSeconds));
BlockUntilShutdownAndReset() {
lifeguard_should_shut_down_->Notify();
while (lifeguard_running_.load(std::memory_order_relaxed)) {
GRPC_LOG_EVERY_N_SEC_DELAYED(kBlockingQuiesceLogRateSeconds, GPR_DEBUG,
"%s",
"Waiting for lifeguard thread to shut down");
lifeguard_is_shut_down_->WaitForNotification();
}
// Do an additional wait in case this method races with LifeguardMain's
// shutdown. This should return immediately if the lifeguard is already shut
// down.
lifeguard_is_shut_down_->WaitForNotification();
backoff_.Reset();
lifeguard_should_shut_down_ = std::make_unique<grpc_core::Notification>();
lifeguard_is_shut_down_ = std::make_unique<grpc_core::Notification>();
}
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::
@ -406,6 +446,7 @@ bool WorkStealingThreadPool::ThreadState::Step() {
if (pool_->IsShutdown()) break;
bool timed_out = pool_->work_signal()->WaitWithTimeout(
backoff_.NextAttemptTime() - grpc_core::Timestamp::Now());
if (pool_->IsForking() || pool_->IsShutdown()) break;
// Quit a thread if the pool has more than it requires, and this thread
// has been idle long enough.
if (timed_out &&
@ -472,6 +513,7 @@ void WorkStealingThreadPool::ThreadCount::BlockUntilThreadCount(
CounterType counter_type, size_t desired_threads, const char* why,
WorkSignal* work_signal) {
// Wait for all threads to exit.
work_signal->SignalAll();
while (true) {
auto curr_threads = WaitForCountChange(
counter_type, desired_threads,
@ -482,7 +524,6 @@ void WorkStealingThreadPool::ThreadCount::BlockUntilThreadCount(
"Waiting for thread pool to idle before %s. (%" PRIdPTR " to %" PRIdPTR
")",
why, curr_threads, desired_threads);
work_signal->SignalAll();
}
}

@ -36,6 +36,7 @@
#include "src/core/lib/event_engine/thread_pool/thread_pool.h"
#include "src/core/lib/event_engine/work_queue/basic_work_queue.h"
#include "src/core/lib/event_engine/work_queue/work_queue.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
@ -202,7 +203,8 @@ class WorkStealingThreadPool final : public ThreadPool {
// Start the lifeguard thread.
void Start();
// Block until the lifeguard thread is shut down.
void BlockUntilShutdown();
// Afterwards, reset the lifeguard state so it can start again cleanly.
void BlockUntilShutdownAndReset();
private:
// The main body of the lifeguard thread.
@ -213,10 +215,9 @@ class WorkStealingThreadPool final : public ThreadPool {
WorkStealingThreadPoolImpl* pool_;
grpc_core::BackOff backoff_;
// Used for signaling that the lifeguard thread has stopped running.
grpc_core::Mutex lifeguard_shutdown_mu_;
bool lifeguard_running_ ABSL_GUARDED_BY(lifeguard_shutdown_mu_) = false;
grpc_core::CondVar lifeguard_shutdown_cv_
ABSL_GUARDED_BY(lifeguard_shutdown_mu_);
std::unique_ptr<grpc_core::Notification> lifeguard_should_shut_down_;
std::unique_ptr<grpc_core::Notification> lifeguard_is_shut_down_;
std::atomic<bool> lifeguard_running_{false};
};
const size_t reserve_threads_;

@ -58,6 +58,7 @@ grpc_cc_test(
"gtest",
],
deps = [
"//:gpr",
"//:grpc",
"//src/core:event_engine_thread_pool",
"//src/core:notification",

@ -29,6 +29,7 @@
#include "src/core/lib/event_engine/thread_pool/original_thread_pool.h"
#include "src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/thd.h"
#include "test/core/util/test_config.h"
namespace grpc_event_engine {
@ -132,6 +133,42 @@ TYPED_TEST(ThreadPoolTest, ForkStressTest) {
pool.Quiesce();
}
// Stress test: on every iteration a fresh pool is put through PrepareFork(),
// and then two threads call Quiesce() and PostforkParent() on it, one each.
TYPED_TEST(ThreadPoolTest, StartQuiesceRaceStressTest) {
// Repeatedly race Start and Quiesce against each other to ensure thread
// safety.
constexpr int iter_count = 500;
// State handed to both thread bodies through their void* argument.
struct ThdState {
std::unique_ptr<TypeParam> pool;
// Iteration index; its parity decides which thread calls which method.
int i;
};
for (int i = 0; i < iter_count; i++) {
// NOTE(review): 8 is presumably the pool's thread count - confirm against
// the TypeParam constructors.
ThdState state{std::make_unique<TypeParam>(8), i};
state.pool->PrepareFork();
// t1 calls Quiesce() on even iterations and PostforkParent() on odd ones.
grpc_core::Thread t1(
"t1",
[](void* arg) {
ThdState* state = static_cast<ThdState*>(arg);
state->i % 2 == 0 ? state->pool->Quiesce()
: state->pool->PostforkParent();
},
&state, nullptr,
grpc_core::Thread::Options().set_tracked(false).set_joinable(true));
// t2 makes the opposite choice from t1, so each iteration issues exactly one
// Quiesce() and one PostforkParent(), from different threads.
grpc_core::Thread t2(
"t2",
[](void* arg) {
ThdState* state = static_cast<ThdState*>(arg);
state->i % 2 == 1 ? state->pool->Quiesce()
: state->pool->PostforkParent();
},
&state, nullptr,
grpc_core::Thread::Options().set_tracked(false).set_joinable(true));
// Start both threads back to back, then join both before `state` (and the
// pool it owns) goes out of scope at the end of the iteration.
t1.Start();
t2.Start();
t1.Join();
t2.Join();
}
}
// Hands `p->Run` a closure that calls ScheduleSelf(p) again, so every time the
// closure is executed it passes another one to the same pool.
// NOTE(review): there is no stop condition here; presumably the chain ends
// only when the pool stops running closures - confirm against the caller.
void ScheduleSelf(ThreadPool* p) {
p->Run([p] { ScheduleSelf(p); });
}

Loading…
Cancel
Save