From 0d3ecd3e63cf3520d0d65b1f797cf54c1e97ac03 Mon Sep 17 00:00:00 2001 From: AJ Heller Date: Tue, 23 May 2023 13:22:14 -0700 Subject: [PATCH] [EventEngine] Potential fix for non-empty queues at thread pool shutdown (#33219) Sometimes the queues will not be empty after the WorkStealingThreadPool is quiesced, leading to an assertion failure. The flake rate is somewhere between 1/20,000 and 1/50,000 runs depending on the platform, so this fix is speculative, and we should know if this works within 2 days or so. Example failure: https://source.cloud.google.com/results/invocations/625c28ee-b811-46ab-87c6-aa2be0c5f5cd/targets/%2F%2Ftest%2Fcore%2Fend2end:core_end2end_tests@poller%3Dpoll@experiment%3Dwork_stealing;shard=1/log --- .../thread_pool/work_stealing_thread_pool.cc | 24 +++++++++++++++++++ .../thread_pool/work_stealing_thread_pool.h | 3 +++ 2 files changed, 27 insertions(+) diff --git a/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc b/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc index 55e8ec7bcf6..fd60eec3ce7 100644 --- a/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc +++ b/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc @@ -321,6 +321,8 @@ void WorkStealingThreadPool::ThreadState::ThreadBody() { pool_->queue()->Add(closure); } } + } else if (pool_->IsShutdown()) { + FinishDraining(); } GPR_ASSERT(g_local_queue->Empty()); pool_->theft_registry()->Unenroll(g_local_queue); @@ -395,6 +397,28 @@ bool WorkStealingThreadPool::ThreadState::Step() { return should_run_again; } +void WorkStealingThreadPool::ThreadState::FinishDraining() { + // If a fork occurs at any point during shutdown, quit draining. The post-fork + // threads will finish draining the global queue. + while (!pool_->IsForking()) { + if (!g_local_queue->Empty()) { + auto* closure = g_local_queue->PopMostRecent(); + if (closure != nullptr) { + closure->Run(); + } + continue; + } + if (!pool_->queue()->Empty()) { + auto* closure = pool_->queue()->PopMostRecent(); + if (closure != nullptr) { + closure->Run(); + } + continue; + } + break; + } +} + // -------- WorkStealingThreadPool::ThreadCount -------- void WorkStealingThreadPool::ThreadCount::Add(CounterType counter_type) { diff --git a/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h b/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h index 14c366e4d1a..f5cbba8debd 100644 --- a/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h +++ b/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h @@ -224,6 +224,9 @@ class WorkStealingThreadPool final : public ThreadPool { void ThreadBody(); void SleepIfRunning(); bool Step(); + // After the pool is shut down, ensure all local and global callbacks are + // executed before quitting the thread. + void FinishDraining(); private: // pool_ must be the first member so that it is alive when the thread count