diff --git a/BUILD b/BUILD index 363d9524a97..bd6af7bffdc 100644 --- a/BUILD +++ b/BUILD @@ -2515,9 +2515,11 @@ grpc_cc_library( ], deps = [ "event_engine_base_hdrs", + "event_engine_thread_pool", "forkable", "gpr", "grpc_trace", + "notification", "posix_event_engine_timer", "time", ], diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.cc b/src/core/lib/event_engine/posix_engine/posix_engine.cc index 96a51feb20e..480c9654aa5 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine.cc +++ b/src/core/lib/event_engine/posix_engine/posix_engine.cc @@ -17,6 +17,7 @@ #include #include +#include #include #include @@ -91,12 +92,12 @@ PosixEnginePollerManager::~PosixEnginePollerManager() { } PosixEventEngine::PosixEventEngine(PosixEventPoller* poller) - : executor_(std::make_shared()) { + : executor_(std::make_shared()), timer_manager_(executor_) { poller_manager_ = std::make_shared(poller); } PosixEventEngine::PosixEventEngine() - : executor_(std::make_shared()) { + : executor_(std::make_shared()), timer_manager_(executor_) { if (grpc_core::IsPosixEventEngineEnablePollingEnabled()) { poller_manager_ = std::make_shared(executor_); if (poller_manager_->Poller() != nullptr) { @@ -174,6 +175,7 @@ PosixEventEngine::~PosixEventEngine() { } GPR_ASSERT(GPR_LIKELY(known_handles_.empty())); } + timer_manager_.Shutdown(); #ifdef GRPC_POSIX_SOCKET_TCP if (poller_manager_ != nullptr) { poller_manager_->TriggerShutdown(); diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.h b/src/core/lib/event_engine/posix_engine/posix_engine.h index 8e648179a61..e7c333ebd5b 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine.h +++ b/src/core/lib/event_engine/posix_engine/posix_engine.h @@ -106,7 +106,7 @@ class PosixEventEngine final : public EventEngine, grpc_event_engine::posix_engine::PosixEventPoller* poller); PosixEventEngine(); #else // GRPC_POSIX_SOCKET_TCP - PosixEventEngine() = default; + PosixEventEngine(); #endif // GRPC_POSIX_SOCKET_TCP 
~PosixEventEngine() override; @@ -160,8 +160,8 @@ class PosixEventEngine final : public EventEngine, grpc_core::Mutex mu_; TaskHandleSet known_handles_ ABSL_GUARDED_BY(mu_); std::atomic aba_token_{0}; - posix_engine::TimerManager timer_manager_; std::shared_ptr executor_; + posix_engine::TimerManager timer_manager_; #ifdef GRPC_POSIX_SOCKET_TCP std::shared_ptr poller_manager_; #endif // GRPC_POSIX_SOCKET_TCP diff --git a/src/core/lib/event_engine/posix_engine/timer_manager.cc b/src/core/lib/event_engine/posix_engine/timer_manager.cc index ceb834c7701..6ab020635aa 100644 --- a/src/core/lib/event_engine/posix_engine/timer_manager.cc +++ b/src/core/lib/event_engine/posix_engine/timer_manager.cc @@ -40,47 +40,10 @@ namespace posix_engine { grpc_core::DebugOnlyTraceFlag grpc_event_engine_timer_trace(false, "timer"); -void TimerManager::StartThread() { - ++waiter_count_; - ++thread_count_; - auto* thread = new RunThreadArgs(); - thread->self = this; - thread->thread = grpc_core::Thread( - "timer_manager", &TimerManager::RunThread, thread, nullptr, - grpc_core::Thread::Options().set_tracked(false).set_joinable(false)); - thread->thread.Start(); -} - void TimerManager::RunSomeTimers( std::vector timers) { - // if there's something to execute... - { - grpc_core::MutexLock lock(&mu_); - if (shutdown_ || forking_) return; - // remove a waiter from the pool, and start another thread if necessary - --waiter_count_; - if (waiter_count_ == 0) { - // The number of timer threads is always increasing until all the threads - // are stopped, with the exception that all threads are shut down on fork - // events. In rare cases, if a large number of timers fire simultaneously, - // we may end up using a large number of threads. - // TODO(ctiller): We could avoid this by exiting threads in WaitUntil(). 
- StartThread(); - } else { - // if there's no thread waiting with a timeout, kick an existing untimed - // waiter so that the next deadline is not missed - if (!has_timed_waiter_) { - cv_wait_.Signal(); - } - } - } for (auto* timer : timers) { - timer->Run(); - } - { - grpc_core::MutexLock lock(&mu_); - // get ready to wait again - ++waiter_count_; + thread_pool_->Run(timer); } } @@ -89,64 +52,18 @@ void TimerManager::RunSomeTimers( // shutdown) bool TimerManager::WaitUntil(grpc_core::Timestamp next) { grpc_core::MutexLock lock(&mu_); if (shutdown_) return false; - if (forking_) return false; - - // TODO(ctiller): if there are too many waiting threads, this would be a good - // place to exit the current thread. - // If kicked_ is true at this point, it means there was a kick from the timer // system that the timer-manager threads here missed. We cannot trust 'next' // here any longer (since there might be an earlier deadline). So if kicked_ // is true at this point, we should quickly exit this and get the next // deadline from the timer system - if (!kicked_) { - // if there's no timed waiter, we should become one: that waiter waits - // only until the next timer should expire. All other timers wait forever - // - // 'timed_waiter_generation_' is a global generation counter. The idea here - // is that the thread becoming a timed-waiter increments and stores this - // global counter locally in 'my_timed_waiter_generation' before going to - // sleep. After waking up, if my_timed_waiter_generation == - // timed_waiter_generation_, it can be sure that it was the timed_waiter - // thread (and that no other thread took over while this was asleep) - // - // Initialize my_timed_waiter_generation to some value that is NOT equal to - // timed_waiter_generation_ - uint64_t my_timed_waiter_generation = timed_waiter_generation_ - 1; - - /* If there's no timed waiter, we should become one: that waiter waits only - until the next timer should expire. 
All other timer threads wait forever - unless their 'next' is earlier than the current timed-waiter's deadline - (in which case the thread with earlier 'next' takes over as the new timed - waiter) */ - if (next != grpc_core::Timestamp::InfFuture()) { - if (!has_timed_waiter_ || (next < timed_waiter_deadline_)) { - my_timed_waiter_generation = ++timed_waiter_generation_; - has_timed_waiter_ = true; - timed_waiter_deadline_ = next; - } else { // timed_waiter_ == true && next >= timed_waiter_deadline_ - next = grpc_core::Timestamp::InfFuture(); - } - } - cv_wait_.WaitWithTimeout(&mu_, absl::Milliseconds((next - host_.Now()).millis())); - - // if this was the timed waiter, then we need to check timers, and flag - // that there's now no timed waiter... we'll look for a replacement if - // there's work to do after checking timers (code above) - if (my_timed_waiter_generation == timed_waiter_generation_) { - ++wakeups_; - has_timed_waiter_ = false; - timed_waiter_deadline_ = grpc_core::Timestamp::InfFuture(); - } + ++wakeups_; } - kicked_ = false; - return true; } @@ -155,54 +72,37 @@ void TimerManager::MainLoop() { grpc_core::Timestamp next = grpc_core::Timestamp::InfFuture(); absl::optional> check_result = timer_list_->TimerCheck(&next); - if (check_result.has_value()) { - if (!check_result->empty()) { - RunSomeTimers(std::move(*check_result)); - continue; - } - } else { - /* This case only happens under contention, meaning more than one timer - manager thread checked timers concurrently. - - If that happens, we're guaranteed that some other thread has just - checked timers, and this will avalanche into some other thread seeing - empty timers and doing a timed sleep. - - Consequently, we can just sleep forever here and be happy at some - saved wakeup cycles. 
*/ - next = grpc_core::Timestamp::InfFuture(); + GPR_ASSERT(check_result.has_value() && + "ERROR: More than one MainLoop is running."); + if (!check_result->empty()) { + RunSomeTimers(std::move(*check_result)); + continue; } - if (!WaitUntil(next)) return; + if (!WaitUntil(next)) break; } + main_loop_exit_signal_->Notify(); } -void TimerManager::RunThread(void* arg) { - g_timer_thread = true; - std::unique_ptr thread(static_cast(arg)); - if (grpc_event_engine_timer_trace.enabled()) { - gpr_log(GPR_DEBUG, "TimerManager::%p starting thread::%p", thread->self, - &thread->thread); - } - thread->self->Run(); - if (grpc_event_engine_timer_trace.enabled()) { - gpr_log(GPR_DEBUG, "TimerManager::%p thread::%p finished", thread->self, - &thread->thread); - } -} +bool TimerManager::IsTimerManagerThread() { return g_timer_thread; } -void TimerManager::Run() { - MainLoop(); - grpc_core::MutexLock lock(&mu_); - thread_count_--; - if (thread_count_ == 0) cv_threadcount_.Signal(); +void TimerManager::StartMainLoopThread() { + main_thread_ = grpc_core::Thread( + "timer_manager", + [](void* arg) { + auto self = static_cast(arg); + self->MainLoop(); + }, + this, nullptr, + grpc_core::Thread::Options().set_tracked(false).set_joinable(false)); + main_thread_.Start(); } -bool TimerManager::IsTimerManagerThread() { return g_timer_thread; } - -TimerManager::TimerManager() : host_(this) { +TimerManager::TimerManager( + std::shared_ptr thread_pool) + : host_(this), thread_pool_(std::move(thread_pool)) { timer_list_ = std::make_unique(&host_); - grpc_core::MutexLock lock(&mu_); - StartThread(); + main_loop_exit_signal_.emplace(); + StartMainLoopThread(); } grpc_core::Timestamp TimerManager::Host::Now() { @@ -212,6 +112,15 @@ grpc_core::Timestamp TimerManager::Host::Now() { void TimerManager::TimerInit(Timer* timer, grpc_core::Timestamp deadline, experimental::EventEngine::Closure* closure) { + if (grpc_event_engine_timer_trace.enabled()) { + grpc_core::MutexLock lock(&mu_); + if (shutdown_) 
{ + gpr_log(GPR_ERROR, + "WARNING: TimerManager::%p: scheduling Closure::%p after " + "TimerManager has been shut down.", + this, closure); + } + } timer_list_->TimerInit(timer, deadline, closure); } @@ -219,63 +128,47 @@ bool TimerManager::TimerCancel(Timer* timer) { return timer_list_->TimerCancel(timer); } -TimerManager::~TimerManager() { - if (grpc_event_engine_timer_trace.enabled()) { - gpr_log(GPR_DEBUG, "TimerManager::%p shutting down", this); - } - grpc_core::MutexLock lock(&mu_); - shutdown_ = true; - cv_wait_.SignalAll(); - while (thread_count_ > 0) { +void TimerManager::Shutdown() { + { + grpc_core::MutexLock lock(&mu_); + if (shutdown_) return; if (grpc_event_engine_timer_trace.enabled()) { - gpr_log(GPR_DEBUG, "TimerManager::%p waiting for %zu threads to finish", - this, thread_count_); + gpr_log(GPR_DEBUG, "TimerManager::%p shutting down", this); } - cv_threadcount_.Wait(&mu_); + shutdown_ = true; + // Wait on the main loop to exit. + cv_wait_.Signal(); } + main_loop_exit_signal_->WaitForNotification(); if (grpc_event_engine_timer_trace.enabled()) { gpr_log(GPR_DEBUG, "TimerManager::%p shutdown complete", this); } } +TimerManager::~TimerManager() { Shutdown(); } + void TimerManager::Host::Kick() { timer_manager_->Kick(); } void TimerManager::Kick() { grpc_core::MutexLock lock(&mu_); - has_timed_waiter_ = false; - timed_waiter_deadline_ = grpc_core::Timestamp::InfFuture(); - ++timed_waiter_generation_; kicked_ = true; cv_wait_.Signal(); } -void TimerManager::PrepareFork() { +void TimerManager::RestartPostFork() { grpc_core::MutexLock lock(&mu_); - forking_ = true; - prefork_thread_count_ = thread_count_; - cv_wait_.SignalAll(); - while (thread_count_ > 0) { - cv_threadcount_.Wait(&mu_); - } -} - -void TimerManager::PostforkParent() { - grpc_core::MutexLock lock(&mu_); - for (int i = 0; i < prefork_thread_count_; i++) { - StartThread(); + GPR_ASSERT(GPR_LIKELY(shutdown_)); + if (grpc_event_engine_timer_trace.enabled()) { + gpr_log(GPR_DEBUG, 
"TimerManager::%p restarting after shutdown", this); } - prefork_thread_count_ = 0; - forking_ = false; + shutdown_ = false; + main_loop_exit_signal_.emplace(); + StartMainLoopThread(); } -void TimerManager::PostforkChild() { - grpc_core::MutexLock lock(&mu_); - for (int i = 0; i < prefork_thread_count_; i++) { - StartThread(); - } - prefork_thread_count_ = 0; - forking_ = false; -} +void TimerManager::PrepareFork() { Shutdown(); } +void TimerManager::PostforkParent() { RestartPostFork(); } +void TimerManager::PostforkChild() { RestartPostFork(); } } // namespace posix_engine } // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/posix_engine/timer_manager.h b/src/core/lib/event_engine/posix_engine/timer_manager.h index 601fb8562c4..4de6eaa5d49 100644 --- a/src/core/lib/event_engine/posix_engine/timer_manager.h +++ b/src/core/lib/event_engine/posix_engine/timer_manager.h @@ -21,18 +21,20 @@ #include -#include #include #include #include #include "absl/base/thread_annotations.h" +#include "absl/types/optional.h" #include #include "src/core/lib/event_engine/forkable.h" #include "src/core/lib/event_engine/posix_engine/timer.h" +#include "src/core/lib/event_engine/thread_pool.h" +#include "src/core/lib/gprpp/notification.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gprpp/time.h" @@ -46,7 +48,8 @@ namespace posix_engine { // thread_pool.{h,cc}. class TimerManager final : public grpc_event_engine::experimental::Forkable { public: - TimerManager(); + explicit TimerManager( + std::shared_ptr thread_pool); ~TimerManager() override; grpc_core::Timestamp Now() { return host_.Now(); } @@ -55,19 +58,16 @@ class TimerManager final : public grpc_event_engine::experimental::Forkable { experimental::EventEngine::Closure* closure); bool TimerCancel(Timer* timer); - // Forkable + static bool IsTimerManagerThread(); + + // Called on destruction, prefork, and manually when needed. 
+ void Shutdown(); + void PrepareFork() override; void PostforkParent() override; void PostforkChild() override; - static bool IsTimerManagerThread(); - private: - struct RunThreadArgs { - TimerManager* self; - grpc_core::Thread thread; - }; - class Host final : public TimerListHost { public: explicit Host(TimerManager* timer_manager) @@ -80,58 +80,32 @@ class TimerManager final : public grpc_event_engine::experimental::Forkable { TimerManager* const timer_manager_; }; - void StartThread() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); - static void RunThread(void* arg); - void Run(); + void StartMainLoopThread(); + void RestartPostFork(); void MainLoop(); void RunSomeTimers(std::vector timers); bool WaitUntil(grpc_core::Timestamp next); void Kick(); grpc_core::Mutex mu_; - // Condvar associated with decrementing the thread count. - // Threads will signal this when thread count reaches zero, and the forking - // code *or* the destructor will wait upon it. - grpc_core::CondVar cv_threadcount_; - // Condvar associated with threads waiting to wakeup and work. - // Threads wait on this until either a timeout is reached or another thread is - // needed to wait for a timeout. - // On shutdown we SignalAll against this to wake up all threads and have them - // finish. - // On kick we Signal against this to wake up at least one thread (but not - // all)! Similarly when we note that no thread is watching timers. - // - // This is a different condvar than cv_threadcount_! - // If this were the same: - // - thread exits would require a SignalAll to ensure that the specific thread - // we want to wake is woken up. - // - kicks would need to signal all threads to avoid having the kick absorbed - // by a shutdown thread and cause a deadlock, leading to thundering herd - // problems in the common case. + // Condvar associated with the main thread waiting to wakeup and work. + // Threads wait on this until either a timeout is reached or the timer manager + // is kicked. 
On shutdown we Signal against this to wake up the main thread + // and have it finish. On kick we Signal against this to wake up the main + // thread. grpc_core::CondVar cv_wait_; Host host_; - // number of threads in the system - size_t thread_count_ ABSL_GUARDED_BY(mu_) = 0; - // number of threads sitting around waiting - size_t waiter_count_ ABSL_GUARDED_BY(mu_) = 0; - // is there a thread waiting until the next timer should fire? - bool has_timed_waiter_ ABSL_GUARDED_BY(mu_) = false; // are we shutting down? bool shutdown_ ABSL_GUARDED_BY(mu_) = false; - // are we forking? - bool forking_ ABSL_GUARDED_BY(mu_) = false; // are we shutting down? bool kicked_ ABSL_GUARDED_BY(mu_) = false; - // the deadline of the current timed waiter thread (only relevant if - // has_timed_waiter_ is true) - grpc_core::Timestamp timed_waiter_deadline_ ABSL_GUARDED_BY(mu_); - // generation counter to track which thread is waiting for the next timer - uint64_t timed_waiter_generation_ ABSL_GUARDED_BY(mu_) = 0; // number of timer wakeups uint64_t wakeups_ ABSL_GUARDED_BY(mu_) = 0; // actual timer implementation std::unique_ptr timer_list_; - int prefork_thread_count_ = 0; + grpc_core::Thread main_thread_; + std::shared_ptr thread_pool_; + absl::optional main_loop_exit_signal_; }; } // namespace posix_engine diff --git a/src/core/lib/event_engine/windows/windows_engine.cc b/src/core/lib/event_engine/windows/windows_engine.cc index 7ec77267774..f622345def9 100644 --- a/src/core/lib/event_engine/windows/windows_engine.cc +++ b/src/core/lib/event_engine/windows/windows_engine.cc @@ -60,7 +60,10 @@ struct WindowsEventEngine::Closure final : public EventEngine::Closure { } }; -WindowsEventEngine::WindowsEventEngine() : iocp_(&executor_) { +WindowsEventEngine::WindowsEventEngine() + : executor_(std::make_shared()), + iocp_(executor_.get()), + timer_manager_(executor_) { WSADATA wsaData; int status = WSAStartup(MAKEWORD(2, 0), &wsaData); 
GPR_ASSERT(status == 0); @@ -77,7 +80,8 @@ WindowsEventEngine::~WindowsEventEngine() { } GPR_ASSERT(GPR_LIKELY(known_handles_.empty())); GPR_ASSERT(WSACleanup() == 0); - executor_.Quiesce(); + timer_manager_.Shutdown(); + executor_->Quiesce(); } bool WindowsEventEngine::Cancel(EventEngine::TaskHandle handle) { @@ -101,11 +105,11 @@ EventEngine::TaskHandle WindowsEventEngine::RunAfter( } void WindowsEventEngine::Run(absl::AnyInvocable closure) { - executor_.Run(std::move(closure)); + executor_->Run(std::move(closure)); } void WindowsEventEngine::Run(EventEngine::Closure* closure) { - executor_.Run(closure); + executor_->Run(closure); } EventEngine::TaskHandle WindowsEventEngine::RunAfterInternal( diff --git a/src/core/lib/event_engine/windows/windows_engine.h b/src/core/lib/event_engine/windows/windows_engine.h index 8a6aa4d6102..99748561419 100644 --- a/src/core/lib/event_engine/windows/windows_engine.h +++ b/src/core/lib/event_engine/windows/windows_engine.h @@ -111,9 +111,9 @@ class WindowsEventEngine : public EventEngine, TaskHandleSet known_handles_ ABSL_GUARDED_BY(mu_); std::atomic aba_token_{0}; - posix_engine::TimerManager timer_manager_; - ThreadPool executor_; + std::shared_ptr executor_; IOCP iocp_; + posix_engine::TimerManager timer_manager_; }; } // namespace experimental diff --git a/test/core/event_engine/posix/timer_manager_test.cc b/test/core/event_engine/posix/timer_manager_test.cc index ca9e367e735..e742f2f68e1 100644 --- a/test/core/event_engine/posix/timer_manager_test.cc +++ b/test/core/event_engine/posix/timer_manager_test.cc @@ -16,6 +16,7 @@ #include #include +#include #include #include "absl/functional/any_invocable.h" @@ -45,8 +46,9 @@ TEST(TimerManagerTest, StressTest) { std::random_device rd; std::mt19937 gen(rd()); std::uniform_real_distribution<> dis_millis(100, 3000); + auto pool = std::make_shared(); { - TimerManager manager; + TimerManager manager(pool); for (auto& timer : timers) { exec_ctx.InvalidateNow(); manager.TimerInit( @@ 
-69,6 +71,7 @@ TEST(TimerManagerTest, StressTest) { absl::SleepFor(absl::Milliseconds(333)); } } + pool->Quiesce(); } TEST(TimerManagerTest, ShutDownBeforeAllCallbacksAreExecuted) { @@ -79,13 +82,15 @@ TEST(TimerManagerTest, ShutDownBeforeAllCallbacksAreExecuted) { timers.resize(kTimerCount); std::atomic_int called{0}; experimental::AnyInvocableClosure closure([&called] { ++called; }); + auto pool = std::make_shared(); { - TimerManager manager; + TimerManager manager(pool); for (auto& timer : timers) { manager.TimerInit(&timer, grpc_core::Timestamp::InfFuture(), &closure); } } ASSERT_EQ(called.load(), 0); + pool->Quiesce(); } } // namespace posix_engine