From 7dfca52ed6ff3b2d2d519b6f608ec11a0a358de3 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 26 Aug 2022 15:09:40 -0700 Subject: [PATCH] [fixit] Fix wakeup conditions on TimerManager (#30752) * [fixit] Fix wakeup conditions on timer_manager * explain * clean up loops a little --- .../posix_engine/timer_manager.cc | 62 +++++++++---------- .../event_engine/posix_engine/timer_manager.h | 22 ++++++- 2 files changed, 49 insertions(+), 35 deletions(-) diff --git a/src/core/lib/event_engine/posix_engine/timer_manager.cc b/src/core/lib/event_engine/posix_engine/timer_manager.cc index 65659b3ce17..70d91df681f 100644 --- a/src/core/lib/event_engine/posix_engine/timer_manager.cc +++ b/src/core/lib/event_engine/posix_engine/timer_manager.cc @@ -88,7 +88,7 @@ void TimerManager::RunSomeTimers( // if there's no thread waiting with a timeout, kick an existing untimed // waiter so that the next deadline is not missed if (!has_timed_waiter_) { - cv_.Signal(); + cv_wait_.Signal(); } } } @@ -151,8 +151,8 @@ bool TimerManager::WaitUntil(grpc_core::Timestamp next) { } } - cv_.WaitWithTimeout(&mu_, - absl::Milliseconds((next - host_.Now()).millis())); + cv_wait_.WaitWithTimeout(&mu_, + absl::Milliseconds((next - host_.Now()).millis())); // if this was the timed waiter, then we need to check timers, and flag // that there's now no timed waiter... 
we'll look for a replacement if @@ -197,13 +197,15 @@ void TimerManager::MainLoop() { void TimerManager::RunThread(void* arg) { std::unique_ptr thread(static_cast(arg)); - thread->self->MainLoop(); - { - grpc_core::MutexLock lock(&thread->self->mu_); - thread->self->thread_count_--; - thread->self->completed_threads_.push_back(std::move(thread->thread)); - } - thread->self->cv_.Signal(); + thread->self->Run(std::move(thread->thread)); +} + +void TimerManager::Run(grpc_core::Thread thread) { + MainLoop(); + grpc_core::MutexLock lock(&mu_); + completed_threads_.push_back(std::move(thread)); + thread_count_--; + if (thread_count_ == 0) cv_threadcount_.Signal(); } TimerManager::TimerManager() : host_(this) { @@ -227,18 +229,14 @@ bool TimerManager::TimerCancel(Timer* timer) { } TimerManager::~TimerManager() { - { - grpc_core::MutexLock lock(&mu_); - shutdown_ = true; - cv_.SignalAll(); - } - while (true) { - ThreadCollector collector; - grpc_core::MutexLock lock(&mu_); - collector.Collect(std::move(completed_threads_)); - if (thread_count_ == 0) break; - cv_.Wait(&mu_); + ThreadCollector collector; + grpc_core::MutexLock lock(&mu_); + shutdown_ = true; + cv_wait_.SignalAll(); + while (thread_count_ > 0) { + cv_threadcount_.Wait(&mu_); } + collector.Collect(std::move(completed_threads_)); } void TimerManager::Host::Kick() { timer_manager_->Kick(); } @@ -249,23 +247,19 @@ void TimerManager::Kick() { timed_waiter_deadline_ = grpc_core::Timestamp::InfFuture(); ++timed_waiter_generation_; kicked_ = true; - cv_.Signal(); + cv_wait_.Signal(); } void TimerManager::PrepareFork() { - { - grpc_core::MutexLock lock(&mu_); - forking_ = true; - prefork_thread_count_ = thread_count_; - cv_.SignalAll(); - } - while (true) { - grpc_core::MutexLock lock(&mu_); - ThreadCollector collector; - collector.Collect(std::move(completed_threads_)); - if (thread_count_ == 0) break; - cv_.Wait(&mu_); + ThreadCollector collector; + grpc_core::MutexLock lock(&mu_); + forking_ = true; + 
prefork_thread_count_ = thread_count_; + cv_wait_.SignalAll(); + while (thread_count_ > 0) { + cv_threadcount_.Wait(&mu_); } + collector.Collect(std::move(completed_threads_)); } void TimerManager::PostforkParent() { diff --git a/src/core/lib/event_engine/posix_engine/timer_manager.h b/src/core/lib/event_engine/posix_engine/timer_manager.h index e3a429e687d..22048bb8805 100644 --- a/src/core/lib/event_engine/posix_engine/timer_manager.h +++ b/src/core/lib/event_engine/posix_engine/timer_manager.h @@ -80,13 +80,33 @@ class TimerManager final : public grpc_event_engine::experimental::Forkable { void StartThread() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); static void RunThread(void* arg); + void Run(grpc_core::Thread thread); void MainLoop(); void RunSomeTimers(std::vector timers); bool WaitUntil(grpc_core::Timestamp next); void Kick(); grpc_core::Mutex mu_; - grpc_core::CondVar cv_; + // Condvar associated with decrementing the thread count. + // Threads will signal this when thread count reaches zero, and the forking + // code *or* the destructor will wait upon it. + grpc_core::CondVar cv_threadcount_; + // Condvar associated with threads waiting to wake up and work. + // Threads wait on this until either a timeout is reached or another thread is + // needed to wait for a timeout. + // On shutdown we SignalAll against this to wake up all threads and have them + // finish. + // On kick we Signal against this to wake up at least one thread (but not + // all)! Similarly when we note that no thread is watching timers. + // + // This is a different condvar than cv_threadcount_! + // If this were the same: + // - thread exits would require a SignalAll to ensure that the specific thread + // we want to wake is woken up. + // - kicks would need to signal all threads to avoid having the kick absorbed + // by a shutdown thread and causing a deadlock, leading to thundering herd + // problems in the common case. 
+ grpc_core::CondVar cv_wait_; Host host_; // number of threads in the system size_t thread_count_ ABSL_GUARDED_BY(mu_) = 0;