[fixit] Fix wakeup conditions on TimerManager (#30752)

* [fixit] Fix wakeup conditions on timer_manager

* explain

* clean up loops a little
pull/30767/head
Craig Tiller 2 years ago committed by GitHub
parent 6b365de65d
commit 7dfca52ed6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 62
      src/core/lib/event_engine/posix_engine/timer_manager.cc
  2. 22
      src/core/lib/event_engine/posix_engine/timer_manager.h

@ -88,7 +88,7 @@ void TimerManager::RunSomeTimers(
// if there's no thread waiting with a timeout, kick an existing untimed
// waiter so that the next deadline is not missed
if (!has_timed_waiter_) {
cv_.Signal();
cv_wait_.Signal();
}
}
}
@ -151,8 +151,8 @@ bool TimerManager::WaitUntil(grpc_core::Timestamp next) {
}
}
cv_.WaitWithTimeout(&mu_,
absl::Milliseconds((next - host_.Now()).millis()));
cv_wait_.WaitWithTimeout(&mu_,
absl::Milliseconds((next - host_.Now()).millis()));
// if this was the timed waiter, then we need to check timers, and flag
// that there's now no timed waiter... we'll look for a replacement if
@ -197,13 +197,15 @@ void TimerManager::MainLoop() {
void TimerManager::RunThread(void* arg) {
std::unique_ptr<RunThreadArgs> thread(static_cast<RunThreadArgs*>(arg));
thread->self->MainLoop();
{
grpc_core::MutexLock lock(&thread->self->mu_);
thread->self->thread_count_--;
thread->self->completed_threads_.push_back(std::move(thread->thread));
}
thread->self->cv_.Signal();
thread->self->Run(std::move(thread->thread));
}
void TimerManager::Run(grpc_core::Thread thread) {
MainLoop();
grpc_core::MutexLock lock(&mu_);
completed_threads_.push_back(std::move(thread));
thread_count_--;
if (thread_count_ == 0) cv_threadcount_.Signal();
}
TimerManager::TimerManager() : host_(this) {
@ -227,18 +229,14 @@ bool TimerManager::TimerCancel(Timer* timer) {
}
TimerManager::~TimerManager() {
{
grpc_core::MutexLock lock(&mu_);
shutdown_ = true;
cv_.SignalAll();
}
while (true) {
ThreadCollector collector;
grpc_core::MutexLock lock(&mu_);
collector.Collect(std::move(completed_threads_));
if (thread_count_ == 0) break;
cv_.Wait(&mu_);
ThreadCollector collector;
grpc_core::MutexLock lock(&mu_);
shutdown_ = true;
cv_wait_.SignalAll();
while (thread_count_ > 0) {
cv_threadcount_.Wait(&mu_);
}
collector.Collect(std::move(completed_threads_));
}
void TimerManager::Host::Kick() { timer_manager_->Kick(); }
@ -249,23 +247,19 @@ void TimerManager::Kick() {
timed_waiter_deadline_ = grpc_core::Timestamp::InfFuture();
++timed_waiter_generation_;
kicked_ = true;
cv_.Signal();
cv_wait_.Signal();
}
void TimerManager::PrepareFork() {
{
grpc_core::MutexLock lock(&mu_);
forking_ = true;
prefork_thread_count_ = thread_count_;
cv_.SignalAll();
}
while (true) {
grpc_core::MutexLock lock(&mu_);
ThreadCollector collector;
collector.Collect(std::move(completed_threads_));
if (thread_count_ == 0) break;
cv_.Wait(&mu_);
ThreadCollector collector;
grpc_core::MutexLock lock(&mu_);
forking_ = true;
prefork_thread_count_ = thread_count_;
cv_wait_.SignalAll();
while (thread_count_ > 0) {
cv_threadcount_.Wait(&mu_);
}
collector.Collect(std::move(completed_threads_));
}
void TimerManager::PostforkParent() {

@ -80,13 +80,33 @@ class TimerManager final : public grpc_event_engine::experimental::Forkable {
void StartThread() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
static void RunThread(void* arg);
void Run(grpc_core::Thread thread);
void MainLoop();
void RunSomeTimers(std::vector<experimental::EventEngine::Closure*> timers);
bool WaitUntil(grpc_core::Timestamp next);
void Kick();
grpc_core::Mutex mu_;
grpc_core::CondVar cv_;
// Condvar associated with decrementing the thread count.
// Threads will signal this when thread count reaches zero, and the forking
// code *or* the destructor will wait upon it.
grpc_core::CondVar cv_threadcount_;
// Condvar associated with threads waiting to wakeup and work.
// Threads wait on this until either a timeout is reached or another thread is
// needed to wait for a timeout.
// On shutdown we SignalAll against this to wake up all threads and have them
// finish.
// On kick we Signal against this to wake up at least one thread (but not
// all)! Similarly when we note that no thread is watching timers.
//
// This is a different condvar than cv_threadcount_!
// If this were the same:
// - thread exits would require a SignalAll to ensure that the specific thread
// we want to wake is woken up.
// - kicks would need to signal all threads to avoid having the kick absorbed
// by a shutdown thread and cause a deadlock, leading to thundering herd
// problems in the common case.
grpc_core::CondVar cv_wait_;
Host host_;
// number of threads in the system
size_t thread_count_ ABSL_GUARDED_BY(mu_) = 0;

Loading…
Cancel
Save