[EventEngine] Rely on RAII for lifeguard (#36555)

This makes the lifeguard easier to manage and removes one case of a hang in the fork test.

Closes #36555

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36555 from eugeneo:lifeguard-lifetime d6f245cf69
PiperOrigin-RevId: 631917491
pull/36342/head
Eugene Ostroukhov 9 months ago committed by Copybara-Service
parent abda964f52
commit 28f2315c9f
  1. 20
      src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
  2. 9
      src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h

@ -227,18 +227,19 @@ void WorkStealingThreadPool::PostforkChild() { pool_->Postfork(); }
WorkStealingThreadPool::WorkStealingThreadPoolImpl::WorkStealingThreadPoolImpl( WorkStealingThreadPool::WorkStealingThreadPoolImpl::WorkStealingThreadPoolImpl(
size_t reserve_threads) size_t reserve_threads)
: reserve_threads_(reserve_threads), queue_(this), lifeguard_(this) {} : reserve_threads_(reserve_threads), queue_(this) {}
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Start() { void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Start() {
for (size_t i = 0; i < reserve_threads_; i++) { for (size_t i = 0; i < reserve_threads_; i++) {
StartThread(); StartThread();
} }
lifeguard_.Start(); grpc_core::MutexLock lock(&lifeguard_ptr_mu_);
lifeguard_ = std::make_unique<Lifeguard>(this);
} }
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Run( void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Run(
EventEngine::Closure* closure) { EventEngine::Closure* closure) {
DCHECK(quiesced_.load(std::memory_order_relaxed) == false); CHECK(!IsQuiesced());
if (g_local_queue != nullptr && g_local_queue->owner() == this) { if (g_local_queue != nullptr && g_local_queue->owner() == this) {
g_local_queue->Add(closure); g_local_queue->Add(closure);
} else { } else {
@ -283,7 +284,8 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Quiesce() {
} }
CHECK(queue_.Empty()); CHECK(queue_.Empty());
quiesced_.store(true, std::memory_order_relaxed); quiesced_.store(true, std::memory_order_relaxed);
lifeguard_.BlockUntilShutdownAndReset(); grpc_core::MutexLock lock(&lifeguard_ptr_mu_);
lifeguard_.reset();
} }
bool WorkStealingThreadPool::WorkStealingThreadPoolImpl::SetThrottled( bool WorkStealingThreadPool::WorkStealingThreadPoolImpl::SetThrottled(
@ -325,7 +327,8 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::PrepareFork() {
if (!threads_were_shut_down.ok() && g_log_verbose_failures) { if (!threads_were_shut_down.ok() && g_log_verbose_failures) {
DumpStacksAndCrash(); DumpStacksAndCrash();
} }
lifeguard_.BlockUntilShutdownAndReset(); grpc_core::MutexLock lock(&lifeguard_ptr_mu_);
lifeguard_.reset();
} }
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Postfork() { void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Postfork() {
@ -374,9 +377,7 @@ WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::Lifeguard(
.set_max_backoff(kLifeguardMaxSleepBetweenChecks) .set_max_backoff(kLifeguardMaxSleepBetweenChecks)
.set_multiplier(1.3)), .set_multiplier(1.3)),
lifeguard_should_shut_down_(std::make_unique<grpc_core::Notification>()), lifeguard_should_shut_down_(std::make_unique<grpc_core::Notification>()),
lifeguard_is_shut_down_(std::make_unique<grpc_core::Notification>()) {} lifeguard_is_shut_down_(std::make_unique<grpc_core::Notification>()) {
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::Start() {
// lifeguard_running_ is set early to avoid a quiesce race while the // lifeguard_running_ is set early to avoid a quiesce race while the
// lifeguard is still starting up. // lifeguard is still starting up.
lifeguard_running_.store(true); lifeguard_running_.store(true);
@ -411,8 +412,7 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::
lifeguard_is_shut_down_->Notify(); lifeguard_is_shut_down_->Notify();
} }
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard:: WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::~Lifeguard() {
BlockUntilShutdownAndReset() {
lifeguard_should_shut_down_->Notify(); lifeguard_should_shut_down_->Notify();
while (lifeguard_running_.load(std::memory_order_relaxed)) { while (lifeguard_running_.load(std::memory_order_relaxed)) {
GRPC_LOG_EVERY_N_SEC_DELAYED(kBlockingQuiesceLogRateSeconds, GPR_DEBUG, GRPC_LOG_EVERY_N_SEC_DELAYED(kBlockingQuiesceLogRateSeconds, GPR_DEBUG,

@ -155,11 +155,7 @@ class WorkStealingThreadPool final : public ThreadPool {
class Lifeguard { class Lifeguard {
public: public:
explicit Lifeguard(WorkStealingThreadPoolImpl* pool); explicit Lifeguard(WorkStealingThreadPoolImpl* pool);
// Start the lifeguard thread. ~Lifeguard();
void Start();
// Block until the lifeguard thread is shut down.
// Afterwards, reset the lifeguard state so it can start again cleanly.
void BlockUntilShutdownAndReset();
private: private:
// The main body of the lifeguard thread. // The main body of the lifeguard thread.
@ -194,7 +190,8 @@ class WorkStealingThreadPool final : public ThreadPool {
// at a time. // at a time.
std::atomic<bool> throttled_{false}; std::atomic<bool> throttled_{false};
WorkSignal work_signal_; WorkSignal work_signal_;
Lifeguard lifeguard_; grpc_core::Mutex lifeguard_ptr_mu_;
std::unique_ptr<Lifeguard> lifeguard_ ABSL_GUARDED_BY(lifeguard_ptr_mu_);
// Set of threads for verbose failure debugging // Set of threads for verbose failure debugging
grpc_core::Mutex thd_set_mu_; grpc_core::Mutex thd_set_mu_;
absl::flat_hash_set<gpr_thd_id> thds_ ABSL_GUARDED_BY(thd_set_mu_); absl::flat_hash_set<gpr_thd_id> thds_ ABSL_GUARDED_BY(thd_set_mu_);

Loading…
Cancel
Save