[EventEngine] Refactor TimerManager to leverage a shared ThreadPool (#31392)

* Refactor TimerManager to leverage a shared ThreadPool

This is a step towards a work-stealing thread pool implementation: unifying
the thread pools so that work stealing applies to timer callbacks as well.

A subsequent step will expose the timeout logic so that the timer wakeup
and check can be triggered externally (by pollers, in the common case).
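
In short: instead of TimerManager owning a fleet of timer threads, a single
dedicated thread now runs the timer-check loop, and expired closures are
handed to the shared ThreadPool. The following is a minimal, standalone
sketch of that shape; the class names, the trivial pool, and the 100 ms
poll interval are illustrative stand-ins, not the gRPC classes, and the
actual timer-check logic is elided.

// Hypothetical sketch only -- names and types are illustrative, not the gRPC API.
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Stand-in for the shared ThreadPool: runs each closure on its own detached thread.
class SketchPool {
 public:
  void Run(std::function<void()> fn) { std::thread(std::move(fn)).detach(); }
};

class SketchTimerManager {
 public:
  explicit SketchTimerManager(std::shared_ptr<SketchPool> pool)
      : pool_(std::move(pool)), main_thread_([this] { MainLoop(); }) {}
  ~SketchTimerManager() { Shutdown(); }

  void Shutdown() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      if (shutdown_) return;
      shutdown_ = true;
    }
    cv_.notify_one();     // wake the single main-loop thread
    main_thread_.join();  // analogous to waiting on main_loop_exit_signal_
  }

 private:
  void MainLoop() {
    std::unique_lock<std::mutex> lock(mu_);
    while (!shutdown_) {
      // Timer check elided; expired closures go to the shared pool instead of
      // being run on dedicated timer threads.
      for (auto& closure : TakeExpiredLocked()) pool_->Run(std::move(closure));
      // Sleep until the next deadline, a kick, or shutdown.
      cv_.wait_for(lock, std::chrono::milliseconds(100));
    }
  }
  std::vector<std::function<void()>> TakeExpiredLocked() { return {}; }

  std::mutex mu_;
  std::condition_variable cv_;
  bool shutdown_ = false;
  std::shared_ptr<SketchPool> pool_;
  std::thread main_thread_;
};

The diff's RunSomeTimers() reduces to the same idea: each expired
EventEngine::Closure is passed to thread_pool_->Run() rather than executed
on a timer-manager thread.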

* fix atomic uint64_t type missing on some platforms

* sanitize + platform fixes

* ->quiesce

* shut down the timer manager to release the main thread

* roll back atomics

* use a dedicated thread for timer_manager to prevent local execution (work stealing)

* drain the pools after timer manager tests; sanitize

* iwyu

* reintroduce fork handling

* sanitize

* fix
Authored by AJ Heller, committed via GitHub
commit 66df2c646a (parent 987b50a258)
8 changed files (lines changed per file):

1. BUILD (2)
2. src/core/lib/event_engine/posix_engine/posix_engine.cc (6)
3. src/core/lib/event_engine/posix_engine/posix_engine.h (4)
4. src/core/lib/event_engine/posix_engine/timer_manager.cc (217)
5. src/core/lib/event_engine/posix_engine/timer_manager.h (68)
6. src/core/lib/event_engine/windows/windows_engine.cc (12)
7. src/core/lib/event_engine/windows/windows_engine.h (4)
8. test/core/event_engine/posix/timer_manager_test.cc (9)

@@ -2515,9 +2515,11 @@ grpc_cc_library(
],
deps = [
"event_engine_base_hdrs",
"event_engine_thread_pool",
"forkable",
"gpr",
"grpc_trace",
"notification",
"posix_event_engine_timer",
"time",
],

@@ -17,6 +17,7 @@
#include <atomic>
#include <chrono>
#include <memory>
#include <string>
#include <utility>
@@ -91,12 +92,12 @@ PosixEnginePollerManager::~PosixEnginePollerManager() {
}
PosixEventEngine::PosixEventEngine(PosixEventPoller* poller)
: executor_(std::make_shared<ThreadPool>()) {
: executor_(std::make_shared<ThreadPool>()), timer_manager_(executor_) {
poller_manager_ = std::make_shared<PosixEnginePollerManager>(poller);
}
PosixEventEngine::PosixEventEngine()
: executor_(std::make_shared<ThreadPool>()) {
: executor_(std::make_shared<ThreadPool>()), timer_manager_(executor_) {
if (grpc_core::IsPosixEventEngineEnablePollingEnabled()) {
poller_manager_ = std::make_shared<PosixEnginePollerManager>(executor_);
if (poller_manager_->Poller() != nullptr) {
@@ -174,6 +175,7 @@ PosixEventEngine::~PosixEventEngine() {
}
GPR_ASSERT(GPR_LIKELY(known_handles_.empty()));
}
timer_manager_.Shutdown();
#ifdef GRPC_POSIX_SOCKET_TCP
if (poller_manager_ != nullptr) {
poller_manager_->TriggerShutdown();

@@ -106,7 +106,7 @@ class PosixEventEngine final : public EventEngine,
grpc_event_engine::posix_engine::PosixEventPoller* poller);
PosixEventEngine();
#else // GRPC_POSIX_SOCKET_TCP
PosixEventEngine() = default;
PosixEventEngine();
#endif // GRPC_POSIX_SOCKET_TCP
~PosixEventEngine() override;
@@ -160,8 +160,8 @@ class PosixEventEngine final : public EventEngine,
grpc_core::Mutex mu_;
TaskHandleSet known_handles_ ABSL_GUARDED_BY(mu_);
std::atomic<intptr_t> aba_token_{0};
posix_engine::TimerManager timer_manager_;
std::shared_ptr<ThreadPool> executor_;
posix_engine::TimerManager timer_manager_;
#ifdef GRPC_POSIX_SOCKET_TCP
std::shared_ptr<PosixEnginePollerManager> poller_manager_;
#endif // GRPC_POSIX_SOCKET_TCP
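
One detail worth noting in the hunk above: timer_manager_ moves below
executor_ because data members are constructed in declaration order, and
timer_manager_ is now initialized from executor_ in the constructor's
initializer list. A tiny illustration with hypothetical names:

#include <memory>

struct Pool {};
struct Timers {
  explicit Timers(std::shared_ptr<Pool> p) : pool(std::move(p)) {}
  std::shared_ptr<Pool> pool;
};

struct Engine {
  Engine() : pool(std::make_shared<Pool>()), timers(pool) {}
  std::shared_ptr<Pool> pool;  // declared first, so constructed first
  Timers timers;               // receives a live shared_ptr
  // If the declarations were swapped, timers would be constructed before
  // pool and would capture an empty shared_ptr, regardless of the order
  // written in the initializer list.
};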

@@ -40,47 +40,10 @@ namespace posix_engine {
grpc_core::DebugOnlyTraceFlag grpc_event_engine_timer_trace(false, "timer");
void TimerManager::StartThread() {
++waiter_count_;
++thread_count_;
auto* thread = new RunThreadArgs();
thread->self = this;
thread->thread = grpc_core::Thread(
"timer_manager", &TimerManager::RunThread, thread, nullptr,
grpc_core::Thread::Options().set_tracked(false).set_joinable(false));
thread->thread.Start();
}
void TimerManager::RunSomeTimers(
std::vector<experimental::EventEngine::Closure*> timers) {
// if there's something to execute...
{
grpc_core::MutexLock lock(&mu_);
if (shutdown_ || forking_) return;
// remove a waiter from the pool, and start another thread if necessary
--waiter_count_;
if (waiter_count_ == 0) {
// The number of timer threads is always increasing until all the threads
// are stopped, with the exception that all threads are shut down on fork
// events. In rare cases, if a large number of timers fire simultaneously,
// we may end up using a large number of threads.
// TODO(ctiller): We could avoid this by exiting threads in WaitUntil().
StartThread();
} else {
// if there's no thread waiting with a timeout, kick an existing untimed
// waiter so that the next deadline is not missed
if (!has_timed_waiter_) {
cv_wait_.Signal();
}
}
}
for (auto* timer : timers) {
timer->Run();
}
{
grpc_core::MutexLock lock(&mu_);
// get ready to wait again
++waiter_count_;
thread_pool_->Run(timer);
}
}
@@ -89,64 +52,18 @@ void TimerManager::RunSomeTimers(
// shutdown)
bool TimerManager::WaitUntil(grpc_core::Timestamp next) {
grpc_core::MutexLock lock(&mu_);
if (shutdown_) return false;
if (forking_) return false;
// TODO(ctiller): if there are too many waiting threads, this would be a good
// place to exit the current thread.
// If kicked_ is true at this point, it means there was a kick from the timer
// system that the timer-manager threads here missed. We cannot trust 'next'
// here any longer (since there might be an earlier deadline). So if kicked_
// is true at this point, we should quickly exit this and get the next
// deadline from the timer system
if (!kicked_) {
// if there's no timed waiter, we should become one: that waiter waits
// only until the next timer should expire. All other timers wait forever
//
// 'timed_waiter_generation_' is a global generation counter. The idea here
// is that the thread becoming a timed-waiter increments and stores this
// global counter locally in 'my_timed_waiter_generation' before going to
// sleep. After waking up, if my_timed_waiter_generation ==
// timed_waiter_generation_, it can be sure that it was the timed_waiter
// thread (and that no other thread took over while this was asleep)
//
// Initialize my_timed_waiter_generation to some value that is NOT equal to
// timed_waiter_generation_
uint64_t my_timed_waiter_generation = timed_waiter_generation_ - 1;
/* If there's no timed waiter, we should become one: that waiter waits only
until the next timer should expire. All other timer threads wait forever
unless their 'next' is earlier than the current timed-waiter's deadline
(in which case the thread with earlier 'next' takes over as the new timed
waiter) */
if (next != grpc_core::Timestamp::InfFuture()) {
if (!has_timed_waiter_ || (next < timed_waiter_deadline_)) {
my_timed_waiter_generation = ++timed_waiter_generation_;
has_timed_waiter_ = true;
timed_waiter_deadline_ = next;
} else { // timed_waiter_ == true && next >= timed_waiter_deadline_
next = grpc_core::Timestamp::InfFuture();
}
}
cv_wait_.WaitWithTimeout(&mu_,
absl::Milliseconds((next - host_.Now()).millis()));
// if this was the timed waiter, then we need to check timers, and flag
// that there's now no timed waiter... we'll look for a replacement if
// there's work to do after checking timers (code above)
if (my_timed_waiter_generation == timed_waiter_generation_) {
++wakeups_;
has_timed_waiter_ = false;
timed_waiter_deadline_ = grpc_core::Timestamp::InfFuture();
}
++wakeups_;
}
kicked_ = false;
return true;
}
@@ -155,54 +72,37 @@ void TimerManager::MainLoop() {
grpc_core::Timestamp next = grpc_core::Timestamp::InfFuture();
absl::optional<std::vector<experimental::EventEngine::Closure*>>
check_result = timer_list_->TimerCheck(&next);
if (check_result.has_value()) {
if (!check_result->empty()) {
RunSomeTimers(std::move(*check_result));
continue;
}
} else {
/* This case only happens under contention, meaning more than one timer
manager thread checked timers concurrently.
If that happens, we're guaranteed that some other thread has just
checked timers, and this will avalanche into some other thread seeing
empty timers and doing a timed sleep.
Consequently, we can just sleep forever here and be happy at some
saved wakeup cycles. */
next = grpc_core::Timestamp::InfFuture();
GPR_ASSERT(check_result.has_value() &&
"ERROR: More than one MainLoop is running.");
if (!check_result->empty()) {
RunSomeTimers(std::move(*check_result));
continue;
}
if (!WaitUntil(next)) return;
if (!WaitUntil(next)) break;
}
main_loop_exit_signal_->Notify();
}
void TimerManager::RunThread(void* arg) {
g_timer_thread = true;
std::unique_ptr<RunThreadArgs> thread(static_cast<RunThreadArgs*>(arg));
if (grpc_event_engine_timer_trace.enabled()) {
gpr_log(GPR_DEBUG, "TimerManager::%p starting thread::%p", thread->self,
&thread->thread);
}
thread->self->Run();
if (grpc_event_engine_timer_trace.enabled()) {
gpr_log(GPR_DEBUG, "TimerManager::%p thread::%p finished", thread->self,
&thread->thread);
}
}
bool TimerManager::IsTimerManagerThread() { return g_timer_thread; }
void TimerManager::Run() {
MainLoop();
grpc_core::MutexLock lock(&mu_);
thread_count_--;
if (thread_count_ == 0) cv_threadcount_.Signal();
void TimerManager::StartMainLoopThread() {
main_thread_ = grpc_core::Thread(
"timer_manager",
[](void* arg) {
auto self = static_cast<TimerManager*>(arg);
self->MainLoop();
},
this, nullptr,
grpc_core::Thread::Options().set_tracked(false).set_joinable(false));
main_thread_.Start();
}
bool TimerManager::IsTimerManagerThread() { return g_timer_thread; }
TimerManager::TimerManager() : host_(this) {
TimerManager::TimerManager(
std::shared_ptr<grpc_event_engine::experimental::ThreadPool> thread_pool)
: host_(this), thread_pool_(std::move(thread_pool)) {
timer_list_ = std::make_unique<TimerList>(&host_);
grpc_core::MutexLock lock(&mu_);
StartThread();
main_loop_exit_signal_.emplace();
StartMainLoopThread();
}
grpc_core::Timestamp TimerManager::Host::Now() {
@@ -212,6 +112,15 @@ grpc_core::Timestamp TimerManager::Host::Now() {
void TimerManager::TimerInit(Timer* timer, grpc_core::Timestamp deadline,
experimental::EventEngine::Closure* closure) {
if (grpc_event_engine_timer_trace.enabled()) {
grpc_core::MutexLock lock(&mu_);
if (shutdown_) {
gpr_log(GPR_ERROR,
"WARNING: TimerManager::%p: scheduling Closure::%p after "
"TimerManager has been shut down.",
this, closure);
}
}
timer_list_->TimerInit(timer, deadline, closure);
}
@@ -219,63 +128,47 @@ bool TimerManager::TimerCancel(Timer* timer) {
return timer_list_->TimerCancel(timer);
}
TimerManager::~TimerManager() {
if (grpc_event_engine_timer_trace.enabled()) {
gpr_log(GPR_DEBUG, "TimerManager::%p shutting down", this);
}
grpc_core::MutexLock lock(&mu_);
shutdown_ = true;
cv_wait_.SignalAll();
while (thread_count_ > 0) {
void TimerManager::Shutdown() {
{
grpc_core::MutexLock lock(&mu_);
if (shutdown_) return;
if (grpc_event_engine_timer_trace.enabled()) {
gpr_log(GPR_DEBUG, "TimerManager::%p waiting for %zu threads to finish",
this, thread_count_);
gpr_log(GPR_DEBUG, "TimerManager::%p shutting down", this);
}
cv_threadcount_.Wait(&mu_);
shutdown_ = true;
// Wait on the main loop to exit.
cv_wait_.Signal();
}
main_loop_exit_signal_->WaitForNotification();
if (grpc_event_engine_timer_trace.enabled()) {
gpr_log(GPR_DEBUG, "TimerManager::%p shutdown complete", this);
}
}
TimerManager::~TimerManager() { Shutdown(); }
void TimerManager::Host::Kick() { timer_manager_->Kick(); }
void TimerManager::Kick() {
grpc_core::MutexLock lock(&mu_);
has_timed_waiter_ = false;
timed_waiter_deadline_ = grpc_core::Timestamp::InfFuture();
++timed_waiter_generation_;
kicked_ = true;
cv_wait_.Signal();
}
void TimerManager::PrepareFork() {
void TimerManager::RestartPostFork() {
grpc_core::MutexLock lock(&mu_);
forking_ = true;
prefork_thread_count_ = thread_count_;
cv_wait_.SignalAll();
while (thread_count_ > 0) {
cv_threadcount_.Wait(&mu_);
}
}
void TimerManager::PostforkParent() {
grpc_core::MutexLock lock(&mu_);
for (int i = 0; i < prefork_thread_count_; i++) {
StartThread();
GPR_ASSERT(GPR_LIKELY(shutdown_));
if (grpc_event_engine_timer_trace.enabled()) {
gpr_log(GPR_DEBUG, "TimerManager::%p restarting after shutdown", this);
}
prefork_thread_count_ = 0;
forking_ = false;
shutdown_ = false;
main_loop_exit_signal_.emplace();
StartMainLoopThread();
}
void TimerManager::PostforkChild() {
grpc_core::MutexLock lock(&mu_);
for (int i = 0; i < prefork_thread_count_; i++) {
StartThread();
}
prefork_thread_count_ = 0;
forking_ = false;
}
void TimerManager::PrepareFork() { Shutdown(); }
void TimerManager::PostforkParent() { RestartPostFork(); }
void TimerManager::PostforkChild() { RestartPostFork(); }
} // namespace posix_engine
} // namespace grpc_event_engine
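
Fork handling in this file now reduces to quiescing and restarting the
single main-loop thread: PrepareFork() maps to Shutdown(), and both
post-fork hooks map to RestartPostFork(). Below is a standalone sketch of
that pattern with a hypothetical class; it is not the gRPC Forkable
plumbing, and the busy loop stands in for the real condvar-based wait.

#include <atomic>
#include <thread>

class SketchLoopOwner {
 public:
  SketchLoopOwner() { Start(); }
  ~SketchLoopOwner() { Stop(); }

  // Mirrors the diff: pre-fork quiesces the loop thread, post-fork restarts it.
  void PrepareFork() { Stop(); }
  void PostforkParent() { Start(); }
  void PostforkChild() { Start(); }

 private:
  void Start() {
    stop_.store(false);
    loop_ = std::thread([this] {
      while (!stop_.load()) {
        // Timer checking and deadline waiting are elided in this sketch.
        std::this_thread::yield();
      }
    });
  }
  void Stop() {
    if (!loop_.joinable()) return;  // already stopped (or never started)
    stop_.store(true);
    loop_.join();
  }

  std::atomic<bool> stop_{false};
  std::thread loop_;
};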

@@ -21,18 +21,20 @@
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <stdint.h>
#include <memory>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/event_engine/forkable.h"
#include "src/core/lib/event_engine/posix_engine/timer.h"
#include "src/core/lib/event_engine/thread_pool.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gprpp/time.h"
@@ -46,7 +48,8 @@ namespace posix_engine {
// thread_pool.{h,cc}.
class TimerManager final : public grpc_event_engine::experimental::Forkable {
public:
TimerManager();
explicit TimerManager(
std::shared_ptr<grpc_event_engine::experimental::ThreadPool> thread_pool);
~TimerManager() override;
grpc_core::Timestamp Now() { return host_.Now(); }
@@ -55,19 +58,16 @@ class TimerManager final : public grpc_event_engine::experimental::Forkable {
experimental::EventEngine::Closure* closure);
bool TimerCancel(Timer* timer);
// Forkable
static bool IsTimerManagerThread();
// Called on destruction, prefork, and manually when needed.
void Shutdown();
void PrepareFork() override;
void PostforkParent() override;
void PostforkChild() override;
static bool IsTimerManagerThread();
private:
struct RunThreadArgs {
TimerManager* self;
grpc_core::Thread thread;
};
class Host final : public TimerListHost {
public:
explicit Host(TimerManager* timer_manager)
@@ -80,58 +80,32 @@ class TimerManager final : public grpc_event_engine::experimental::Forkable {
TimerManager* const timer_manager_;
};
void StartThread() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
static void RunThread(void* arg);
void Run();
void StartMainLoopThread();
void RestartPostFork();
void MainLoop();
void RunSomeTimers(std::vector<experimental::EventEngine::Closure*> timers);
bool WaitUntil(grpc_core::Timestamp next);
void Kick();
grpc_core::Mutex mu_;
// Condvar associated with decrementing the thread count.
// Threads will signal this when thread count reaches zero, and the forking
// code *or* the destructor will wait upon it.
grpc_core::CondVar cv_threadcount_;
// Condvar associated with threads waiting to wakeup and work.
// Threads wait on this until either a timeout is reached or another thread is
// needed to wait for a timeout.
// On shutdown we SignalAll against this to wake up all threads and have them
// finish.
// On kick we Signal against this to wake up at least one thread (but not
// all)! Similarly when we note that no thread is watching timers.
//
// This is a different condvar than cv_threadcount_!
// If this were the same:
// - thread exits would require a SignalAll to ensure that the specific thread
// we want to wake is woken up.
// - kicks would need to signal all threads to avoid having the kick absorbed
// by a shutdown thread and cause a deadlock, leading to thundering herd
// problems in the common case.
// Condvar associated with the main thread waiting to wakeup and work.
// Threads wait on this until either a timeout is reached or the timer manager
// is kicked. On shutdown we Signal against this to wake up all threads and
// have them finish. On kick we Signal against this to wake up the main
// thread.
grpc_core::CondVar cv_wait_;
Host host_;
// number of threads in the system
size_t thread_count_ ABSL_GUARDED_BY(mu_) = 0;
// number of threads sitting around waiting
size_t waiter_count_ ABSL_GUARDED_BY(mu_) = 0;
// is there a thread waiting until the next timer should fire?
bool has_timed_waiter_ ABSL_GUARDED_BY(mu_) = false;
// are we shutting down?
bool shutdown_ ABSL_GUARDED_BY(mu_) = false;
// are we forking?
bool forking_ ABSL_GUARDED_BY(mu_) = false;
// has the timer manager been kicked?
bool kicked_ ABSL_GUARDED_BY(mu_) = false;
// the deadline of the current timed waiter thread (only relevant if
// has_timed_waiter_ is true)
grpc_core::Timestamp timed_waiter_deadline_ ABSL_GUARDED_BY(mu_);
// generation counter to track which thread is waiting for the next timer
uint64_t timed_waiter_generation_ ABSL_GUARDED_BY(mu_) = 0;
// number of timer wakeups
uint64_t wakeups_ ABSL_GUARDED_BY(mu_) = 0;
uint64_t wakeups_ ABSL_GUARDED_BY(mu_) = false;
// actual timer implementation
std::unique_ptr<TimerList> timer_list_;
int prefork_thread_count_ = 0;
grpc_core::Thread main_thread_;
std::shared_ptr<grpc_event_engine::experimental::ThreadPool> thread_pool_;
absl::optional<grpc_core::Notification> main_loop_exit_signal_;
};
} // namespace posix_engine

@@ -60,7 +60,10 @@ struct WindowsEventEngine::Closure final : public EventEngine::Closure {
}
};
WindowsEventEngine::WindowsEventEngine() : iocp_(&executor_) {
WindowsEventEngine::WindowsEventEngine()
: executor_(std::make_shared<ThreadPool>()),
iocp_(executor_.get()),
timer_manager_(executor_) {
WSADATA wsaData;
int status = WSAStartup(MAKEWORD(2, 0), &wsaData);
GPR_ASSERT(status == 0);
@@ -77,7 +80,8 @@ WindowsEventEngine::~WindowsEventEngine() {
}
GPR_ASSERT(GPR_LIKELY(known_handles_.empty()));
GPR_ASSERT(WSACleanup() == 0);
executor_.Quiesce();
timer_manager_.Shutdown();
executor_->Quiesce();
}
bool WindowsEventEngine::Cancel(EventEngine::TaskHandle handle) {
@@ -101,11 +105,11 @@ EventEngine::TaskHandle WindowsEventEngine::RunAfter(
}
void WindowsEventEngine::Run(absl::AnyInvocable<void()> closure) {
executor_.Run(std::move(closure));
executor_->Run(std::move(closure));
}
void WindowsEventEngine::Run(EventEngine::Closure* closure) {
executor_.Run(closure);
executor_->Run(closure);
}
EventEngine::TaskHandle WindowsEventEngine::RunAfterInternal(

@@ -111,9 +111,9 @@ class WindowsEventEngine : public EventEngine,
TaskHandleSet known_handles_ ABSL_GUARDED_BY(mu_);
std::atomic<intptr_t> aba_token_{0};
posix_engine::TimerManager timer_manager_;
ThreadPool executor_;
std::shared_ptr<ThreadPool> executor_;
IOCP iocp_;
posix_engine::TimerManager timer_manager_;
};
} // namespace experimental

@@ -16,6 +16,7 @@
#include <algorithm>
#include <atomic>
#include <memory>
#include <random>
#include "absl/functional/any_invocable.h"
@@ -45,8 +46,9 @@ TEST(TimerManagerTest, StressTest) {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_real_distribution<> dis_millis(100, 3000);
auto pool = std::make_shared<grpc_event_engine::experimental::ThreadPool>();
{
TimerManager manager;
TimerManager manager(pool);
for (auto& timer : timers) {
exec_ctx.InvalidateNow();
manager.TimerInit(
@@ -69,6 +71,7 @@ TEST(TimerManagerTest, StressTest) {
absl::SleepFor(absl::Milliseconds(333));
}
}
pool->Quiesce();
}
TEST(TimerManagerTest, ShutDownBeforeAllCallbacksAreExecuted) {
@@ -79,13 +82,15 @@ TEST(TimerManagerTest, ShutDownBeforeAllCallbacksAreExecuted) {
timers.resize(kTimerCount);
std::atomic_int called{0};
experimental::AnyInvocableClosure closure([&called] { ++called; });
auto pool = std::make_shared<grpc_event_engine::experimental::ThreadPool>();
{
TimerManager manager;
TimerManager manager(pool);
for (auto& timer : timers) {
manager.TimerInit(&timer, grpc_core::Timestamp::InfFuture(), &closure);
}
}
ASSERT_EQ(called.load(), 0);
pool->Quiesce();
}
} // namespace posix_engine
