From 4420d11ee0b07ea20be972f43c3a8bb5588a3c83 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 20 Sep 2022 19:45:25 -0700 Subject: [PATCH] [event-engine] Throttle thread starting in thread pool (#31075) * [event_engine] Thread pool thread start throttling * add test * review feedback * fix --- src/core/lib/event_engine/thread_pool.cc | 24 ++++++++++++++++------ src/core/lib/event_engine/thread_pool.h | 10 ++++++++- test/core/event_engine/thread_pool_test.cc | 15 ++++++++++++++ 3 files changed, 42 insertions(+), 7 deletions(-) diff --git a/src/core/lib/event_engine/thread_pool.cc b/src/core/lib/event_engine/thread_pool.cc index 15bff113112..b1d4257cd3d 100644 --- a/src/core/lib/event_engine/thread_pool.cc +++ b/src/core/lib/event_engine/thread_pool.cc @@ -40,15 +40,24 @@ namespace { GPR_THREAD_LOCAL(bool) g_threadpool_thread; } // namespace -void ThreadPool::StartThread(StatePtr state) { +void ThreadPool::StartThread(StatePtr state, bool throttled) { state->thread_count.Add(); + struct ThreadArg { + StatePtr state; + bool throttled; + }; grpc_core::Thread( "event_engine", [](void* arg) { + std::unique_ptr a(static_cast(arg)); g_threadpool_thread = true; - ThreadFunc(*std::unique_ptr(static_cast(arg))); + if (a->throttled) { + GPR_ASSERT(a->state->currently_starting_one_thread.exchange( + false, std::memory_order_relaxed)); + } + ThreadFunc(a->state); }, - new StatePtr(state), nullptr, + new ThreadArg{state, throttled}, nullptr, grpc_core::Thread::Options().set_tracked(false).set_joinable(false)) .Start(); } @@ -89,7 +98,7 @@ bool ThreadPool::Queue::Step() { ThreadPool::ThreadPool(int reserve_threads) : reserve_threads_(reserve_threads) { for (int i = 0; i < reserve_threads; i++) { - StartThread(state_); + StartThread(state_, /*throttled=*/false); } } @@ -105,7 +114,10 @@ ThreadPool::~ThreadPool() { void ThreadPool::Add(absl::AnyInvocable callback) { if (state_->queue.Add(std::move(callback))) { - StartThread(state_); + if (!state_->currently_starting_one_thread.exchange( + true, std::memory_order_relaxed)) { + StartThread(state_, /*throttled=*/true); + } } } @@ -175,7 +187,7 @@ void ThreadPool::PostforkChild() { Postfork(); } void ThreadPool::Postfork() { state_->queue.Reset(); for (int i = 0; i < reserve_threads_; i++) { - StartThread(state_); + StartThread(state_, /*throttled=*/false); } } diff --git a/src/core/lib/event_engine/thread_pool.h b/src/core/lib/event_engine/thread_pool.h index 41c15a38c5a..f94f7ecffa3 100644 --- a/src/core/lib/event_engine/thread_pool.h +++ b/src/core/lib/event_engine/thread_pool.h @@ -21,6 +21,7 @@ #include +#include #include #include @@ -88,12 +89,19 @@ class ThreadPool final : public grpc_event_engine::experimental::Forkable { explicit State(int reserve_threads) : queue(reserve_threads) {} Queue queue; ThreadCount thread_count; + // After pool creation we use this to rate limit creation of threads to one + // at a time. + std::atomic currently_starting_one_thread{false}; }; using StatePtr = std::shared_ptr; static void ThreadFunc(StatePtr state); - static void StartThread(StatePtr state); + // Start a new thread; throttled indicates whether the State::starting_thread + // variable is being used to throttle this threads creation against others or + // not: at thread pool startup we start several threads concurrently, but + // after that we only start one at a time. + static void StartThread(StatePtr state, bool throttled); void Postfork(); const int reserve_threads_; diff --git a/test/core/event_engine/thread_pool_test.cc b/test/core/event_engine/thread_pool_test.cc index 550892ff9fd..08b490480c3 100644 --- a/test/core/event_engine/thread_pool_test.cc +++ b/test/core/event_engine/thread_pool_test.cc @@ -99,6 +99,21 @@ TEST(ThreadPoolDeathTest, CanDetectStucknessAtFork) { "Waiting for thread pool to idle before forking"); } +void ScheduleTwiceUntilZero(ThreadPool* p, int n) { + if (n == 0) return; + p->Add([p, n] { + ScheduleTwiceUntilZero(p, n - 1); + ScheduleTwiceUntilZero(p, n - 1); + }); +} + +TEST(ThreadPoolTest, CanStartLotsOfClosures) { + ThreadPool p(1); + // Our first thread pool implementation tried to create ~1M threads for this + // test. + ScheduleTwiceUntilZero(&p, 20); +} + } // namespace experimental } // namespace grpc_event_engine