[EventEngine] Potential fix for non-empty queues at thread pool shutdown (#33219)

Sometimes the queues will not be empty after the WorkStealingThreadPool
is quiesced, leading to an assertion failure. The flake rate is
somewhere between 1/20,000 and 1/50,000 runs depending on the platform,
so this fix is speculative, and we should know if this works within 2
days or so.

Example failure:
https://source.cloud.google.com/results/invocations/625c28ee-b811-46ab-87c6-aa2be0c5f5cd/targets/%2F%2Ftest%2Fcore%2Fend2end:core_end2end_tests@poller%3Dpoll@experiment%3Dwork_stealing;shard=1/log
pull/33227/head
AJ Heller 2 years ago committed by GitHub
parent dbcb09d3d4
commit 0d3ecd3e63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 24
      src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
  2. 3
      src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h

@ -321,6 +321,8 @@ void WorkStealingThreadPool::ThreadState::ThreadBody() {
pool_->queue()->Add(closure);
}
}
} else if (pool_->IsShutdown()) {
FinishDraining();
}
GPR_ASSERT(g_local_queue->Empty());
pool_->theft_registry()->Unenroll(g_local_queue);
@ -395,6 +397,28 @@ bool WorkStealingThreadPool::ThreadState::Step() {
return should_run_again;
}
void WorkStealingThreadPool::ThreadState::FinishDraining() {
// If a fork occurs at any point during shutdown, quit draining. The post-fork
// threads will finish draining the global queue.
while (!pool_->IsForking()) {
if (!g_local_queue->Empty()) {
auto* closure = g_local_queue->PopMostRecent();
if (closure != nullptr) {
closure->Run();
}
continue;
}
if (!pool_->queue()->Empty()) {
auto* closure = pool_->queue()->PopMostRecent();
if (closure != nullptr) {
closure->Run();
}
continue;
}
break;
}
}
// -------- WorkStealingThreadPool::ThreadCount --------
void WorkStealingThreadPool::ThreadCount::Add(CounterType counter_type) {

@ -224,6 +224,9 @@ class WorkStealingThreadPool final : public ThreadPool {
void ThreadBody();
void SleepIfRunning();
bool Step();
// After the pool is shut down, ensure all local and global callbacks are
// executed before quitting the thread.
void FinishDraining();
private:
// pool_ must be the first member so that it is alive when the thread count

Loading…
Cancel
Save