[event-engine] Throttle thread starting in thread pool (#31075)

* [event_engine] Thread pool thread start throttling

* add test

* review feedback

* fix
pull/31021/head^2
Craig Tiller 2 years ago committed by GitHub
parent da08fe1d6f
commit 4420d11ee0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 24
      src/core/lib/event_engine/thread_pool.cc
  2. 10
      src/core/lib/event_engine/thread_pool.h
  3. 15
      test/core/event_engine/thread_pool_test.cc

@ -40,15 +40,24 @@ namespace {
GPR_THREAD_LOCAL(bool) g_threadpool_thread;
} // namespace
void ThreadPool::StartThread(StatePtr state) {
void ThreadPool::StartThread(StatePtr state, bool throttled) {
state->thread_count.Add();
struct ThreadArg {
StatePtr state;
bool throttled;
};
grpc_core::Thread(
"event_engine",
[](void* arg) {
std::unique_ptr<ThreadArg> a(static_cast<ThreadArg*>(arg));
g_threadpool_thread = true;
ThreadFunc(*std::unique_ptr<StatePtr>(static_cast<StatePtr*>(arg)));
if (a->throttled) {
GPR_ASSERT(a->state->currently_starting_one_thread.exchange(
false, std::memory_order_relaxed));
}
ThreadFunc(a->state);
},
new StatePtr(state), nullptr,
new ThreadArg{state, throttled}, nullptr,
grpc_core::Thread::Options().set_tracked(false).set_joinable(false))
.Start();
}
@ -89,7 +98,7 @@ bool ThreadPool::Queue::Step() {
ThreadPool::ThreadPool(int reserve_threads)
: reserve_threads_(reserve_threads) {
for (int i = 0; i < reserve_threads; i++) {
StartThread(state_);
StartThread(state_, /*throttled=*/false);
}
}
@ -105,7 +114,10 @@ ThreadPool::~ThreadPool() {
void ThreadPool::Add(absl::AnyInvocable<void()> callback) {
if (state_->queue.Add(std::move(callback))) {
StartThread(state_);
if (!state_->currently_starting_one_thread.exchange(
true, std::memory_order_relaxed)) {
StartThread(state_, /*throttled=*/true);
}
}
}
@ -175,7 +187,7 @@ void ThreadPool::PostforkChild() { Postfork(); }
void ThreadPool::Postfork() {
state_->queue.Reset();
for (int i = 0; i < reserve_threads_; i++) {
StartThread(state_);
StartThread(state_, /*throttled=*/false);
}
}

@ -21,6 +21,7 @@
#include <grpc/support/port_platform.h>
#include <atomic>
#include <memory>
#include <queue>
@ -88,12 +89,19 @@ class ThreadPool final : public grpc_event_engine::experimental::Forkable {
explicit State(int reserve_threads) : queue(reserve_threads) {}
Queue queue;
ThreadCount thread_count;
// After pool creation we use this to rate limit creation of threads to one
// at a time.
std::atomic<bool> currently_starting_one_thread{false};
};
using StatePtr = std::shared_ptr<State>;
static void ThreadFunc(StatePtr state);
static void StartThread(StatePtr state);
// Start a new thread; throttled indicates whether the State::starting_thread
// variable is being used to throttle this threads creation against others or
// not: at thread pool startup we start several threads concurrently, but
// after that we only start one at a time.
static void StartThread(StatePtr state, bool throttled);
void Postfork();
const int reserve_threads_;

@ -99,6 +99,21 @@ TEST(ThreadPoolDeathTest, CanDetectStucknessAtFork) {
"Waiting for thread pool to idle before forking");
}
void ScheduleTwiceUntilZero(ThreadPool* p, int n) {
if (n == 0) return;
p->Add([p, n] {
ScheduleTwiceUntilZero(p, n - 1);
ScheduleTwiceUntilZero(p, n - 1);
});
}
TEST(ThreadPoolTest, CanStartLotsOfClosures) {
ThreadPool p(1);
// Our first thread pool implementation tried to create ~1M threads for this
// test.
ScheduleTwiceUntilZero(&p, 20);
}
} // namespace experimental
} // namespace grpc_event_engine

Loading…
Cancel
Save