|
|
|
@ -40,10 +40,47 @@ namespace posix_engine { |
|
|
|
|
|
|
|
|
|
// Debug-only trace flag ("timer"): gates GPR_DEBUG logging of timer-manager
// lifecycle events (thread start/finish, shutdown, restart) via
// grpc_event_engine_timer_trace.enabled() checks below.
grpc_core::DebugOnlyTraceFlag grpc_event_engine_timer_trace(false, "timer");
|
|
|
|
|
|
|
|
|
// Spawns one additional detached timer-manager worker thread.
// NOTE(review): the counters mutated here appear to be guarded by mu_, and the
// visible callers (RunSomeTimers) hold the lock — confirm callers always do.
void TimerManager::StartThread() {
  // A freshly started thread begins life as a waiter.
  ++waiter_count_;
  ++thread_count_;
  // Ownership of the args struct transfers to RunThread, which wraps it in a
  // unique_ptr and frees it when the thread exits.
  auto* thread = new RunThreadArgs();
  thread->self = this;
  thread->thread = grpc_core::Thread(
      "timer_manager", &TimerManager::RunThread, thread, nullptr,
      grpc_core::Thread::Options().set_tracked(false).set_joinable(false));
  thread->thread.Start();
}
|
|
|
|
|
|
|
|
|
void TimerManager::RunSomeTimers( |
|
|
|
|
std::vector<experimental::EventEngine::Closure*> timers) { |
|
|
|
|
// if there's something to execute...
|
|
|
|
|
{ |
|
|
|
|
grpc_core::MutexLock lock(&mu_); |
|
|
|
|
if (shutdown_ || forking_) return; |
|
|
|
|
// remove a waiter from the pool, and start another thread if necessary
|
|
|
|
|
--waiter_count_; |
|
|
|
|
if (waiter_count_ == 0) { |
|
|
|
|
// The number of timer threads is always increasing until all the threads
|
|
|
|
|
// are stopped, with the exception that all threads are shut down on fork
|
|
|
|
|
// events. In rare cases, if a large number of timers fire simultaneously,
|
|
|
|
|
// we may end up using a large number of threads.
|
|
|
|
|
// TODO(ctiller): We could avoid this by exiting threads in WaitUntil().
|
|
|
|
|
StartThread(); |
|
|
|
|
} else { |
|
|
|
|
// if there's no thread waiting with a timeout, kick an existing untimed
|
|
|
|
|
// waiter so that the next deadline is not missed
|
|
|
|
|
if (!has_timed_waiter_) { |
|
|
|
|
cv_wait_.Signal(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
for (auto* timer : timers) { |
|
|
|
|
thread_pool_->Run(timer); |
|
|
|
|
timer->Run(); |
|
|
|
|
} |
|
|
|
|
{ |
|
|
|
|
grpc_core::MutexLock lock(&mu_); |
|
|
|
|
// get ready to wait again
|
|
|
|
|
++waiter_count_; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -52,18 +89,64 @@ void TimerManager::RunSomeTimers( |
|
|
|
|
// shutdown)
|
|
|
|
|
// Blocks the calling thread until roughly `next` (the earliest known timer
// deadline), a kick, a shutdown, or a fork event.
// Returns false when the caller's loop should exit (shutdown or fork in
// progress), true when the caller should re-check the timer list.
bool TimerManager::WaitUntil(grpc_core::Timestamp next) {
  grpc_core::MutexLock lock(&mu_);

  if (shutdown_) return false;
  if (forking_) return false;

  // TODO(ctiller): if there are too many waiting threads, this would be a good
  // place to exit the current thread.

  // If kicked_ is true at this point, it means there was a kick from the timer
  // system that the timer-manager threads here missed. We cannot trust 'next'
  // here any longer (since there might be an earlier deadline). So if kicked_
  // is true at this point, we should quickly exit this and get the next
  // deadline from the timer system.
  if (!kicked_) {
    // if there's no timed waiter, we should become one: that waiter waits
    // only until the next timer should expire. All other timers wait forever
    //
    // 'timed_waiter_generation_' is a global generation counter. The idea here
    // is that the thread becoming a timed-waiter increments and stores this
    // global counter locally in 'my_timed_waiter_generation' before going to
    // sleep. After waking up, if my_timed_waiter_generation ==
    // timed_waiter_generation_, it can be sure that it was the timed_waiter
    // thread (and that no other thread took over while this was asleep)
    //
    // Initialize my_timed_waiter_generation to some value that is NOT equal to
    // timed_waiter_generation_
    uint64_t my_timed_waiter_generation = timed_waiter_generation_ - 1;

    // A thread whose 'next' is earlier than the current timed waiter's
    // deadline takes over as the new timed waiter; every other thread sleeps
    // with no deadline of its own.
    if (next != grpc_core::Timestamp::InfFuture()) {
      if (!has_timed_waiter_ || (next < timed_waiter_deadline_)) {
        my_timed_waiter_generation = ++timed_waiter_generation_;
        has_timed_waiter_ = true;
        timed_waiter_deadline_ = next;
      } else {  // timed_waiter_ == true && next >= timed_waiter_deadline_
        next = grpc_core::Timestamp::InfFuture();
      }
    }

    cv_wait_.WaitWithTimeout(&mu_,
                             absl::Milliseconds((next - host_.Now()).millis()));
    // NOTE(review): the pre-edit text incremented wakeups_ a second time
    // inside the generation-match branch below (diff residue), double-counting
    // timed-waiter wakeups. Count each wakeup exactly once, here.
    ++wakeups_;

    // if this was the timed waiter, then we need to check timers, and flag
    // that there's now no timed waiter... we'll look for a replacement if
    // there's work to do after checking timers (code above)
    if (my_timed_waiter_generation == timed_waiter_generation_) {
      has_timed_waiter_ = false;
      timed_waiter_deadline_ = grpc_core::Timestamp::InfFuture();
    }
  }

  kicked_ = false;

  return true;
}
|
|
|
|
|
|
|
|
@ -72,37 +155,54 @@ void TimerManager::MainLoop() { |
|
|
|
|
grpc_core::Timestamp next = grpc_core::Timestamp::InfFuture(); |
|
|
|
|
absl::optional<std::vector<experimental::EventEngine::Closure*>> |
|
|
|
|
check_result = timer_list_->TimerCheck(&next); |
|
|
|
|
GPR_ASSERT(check_result.has_value() && |
|
|
|
|
"ERROR: More than one MainLoop is running."); |
|
|
|
|
if (!check_result->empty()) { |
|
|
|
|
RunSomeTimers(std::move(*check_result)); |
|
|
|
|
continue; |
|
|
|
|
if (check_result.has_value()) { |
|
|
|
|
if (!check_result->empty()) { |
|
|
|
|
RunSomeTimers(std::move(*check_result)); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
/* This case only happens under contention, meaning more than one timer
|
|
|
|
|
manager thread checked timers concurrently. |
|
|
|
|
|
|
|
|
|
If that happens, we're guaranteed that some other thread has just |
|
|
|
|
checked timers, and this will avalanche into some other thread seeing |
|
|
|
|
empty timers and doing a timed sleep. |
|
|
|
|
|
|
|
|
|
Consequently, we can just sleep forever here and be happy at some |
|
|
|
|
saved wakeup cycles. */ |
|
|
|
|
next = grpc_core::Timestamp::InfFuture(); |
|
|
|
|
} |
|
|
|
|
if (!WaitUntil(next)) break; |
|
|
|
|
if (!WaitUntil(next)) return; |
|
|
|
|
} |
|
|
|
|
main_loop_exit_signal_->Notify(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// True iff the calling thread is a timer-manager thread (the thread-local
// g_timer_thread flag is set in RunThread).
bool TimerManager::IsTimerManagerThread() { return g_timer_thread; }
|
|
|
|
// Entry point for worker threads created by StartThread(). Takes ownership of
// the heap-allocated RunThreadArgs (freed when this function returns) and
// runs the manager's worker loop.
void TimerManager::RunThread(void* arg) {
  // Mark this thread so IsTimerManagerThread() can identify it.
  g_timer_thread = true;
  std::unique_ptr<RunThreadArgs> thread(static_cast<RunThreadArgs*>(arg));
  if (grpc_event_engine_timer_trace.enabled()) {
    gpr_log(GPR_DEBUG, "TimerManager::%p starting thread::%p", thread->self,
            &thread->thread);
  }
  thread->self->Run();
  if (grpc_event_engine_timer_trace.enabled()) {
    gpr_log(GPR_DEBUG, "TimerManager::%p thread::%p finished", thread->self,
            &thread->thread);
  }
}
|
|
|
|
|
|
|
|
|
void TimerManager::StartMainLoopThread() { |
|
|
|
|
main_thread_ = grpc_core::Thread( |
|
|
|
|
"timer_manager", |
|
|
|
|
[](void* arg) { |
|
|
|
|
auto self = static_cast<TimerManager*>(arg); |
|
|
|
|
self->MainLoop(); |
|
|
|
|
}, |
|
|
|
|
this, nullptr, |
|
|
|
|
grpc_core::Thread::Options().set_tracked(false).set_joinable(false)); |
|
|
|
|
main_thread_.Start(); |
|
|
|
|
void TimerManager::Run() { |
|
|
|
|
MainLoop(); |
|
|
|
|
grpc_core::MutexLock lock(&mu_); |
|
|
|
|
thread_count_--; |
|
|
|
|
if (thread_count_ == 0) cv_threadcount_.Signal(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Constructs the timer manager: builds the timer list, arms the main-loop
// exit signal, and launches the main-loop thread.
// NOTE(review): the pre-edit text interleaved two constructor definitions
// (a default one and this thread-pool one) plus a duplicate definition of
// IsTimerManagerThread() — merge/diff residue causing redefinition errors.
// The thread-pool constructor is kept, matching the thread_pool_ member used
// by RunSomeTimers(); IsTimerManagerThread() remains defined once, earlier in
// this file.
TimerManager::TimerManager(
    std::shared_ptr<grpc_event_engine::experimental::ThreadPool> thread_pool)
    : host_(this), thread_pool_(std::move(thread_pool)) {
  timer_list_ = std::make_unique<TimerList>(&host_);
  main_loop_exit_signal_.emplace();
  StartMainLoopThread();
}
|
|
|
|
|
|
|
|
|
grpc_core::Timestamp TimerManager::Host::Now() { |
|
|
|
@ -112,15 +212,6 @@ grpc_core::Timestamp TimerManager::Host::Now() { |
|
|
|
|
|
|
|
|
|
// Schedules `closure` to run when `deadline` is reached, by registering it
// with the underlying timer list. When debug tracing is enabled, logs a
// warning if a timer is scheduled after shutdown — the timer is still handed
// to the timer list either way.
void TimerManager::TimerInit(Timer* timer, grpc_core::Timestamp deadline,
                             experimental::EventEngine::Closure* closure) {
  if (grpc_event_engine_timer_trace.enabled()) {
    // Lock only taken on the trace path, to read shutdown_ safely.
    grpc_core::MutexLock lock(&mu_);
    if (shutdown_) {
      gpr_log(GPR_ERROR,
              "WARNING: TimerManager::%p: scheduling Closure::%p after "
              "TimerManager has been shut down.",
              this, closure);
    }
  }
  timer_list_->TimerInit(timer, deadline, closure);
}
|
|
|
|
|
|
|
|
@ -128,47 +219,63 @@ bool TimerManager::TimerCancel(Timer* timer) { |
|
|
|
|
return timer_list_->TimerCancel(timer); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void TimerManager::Shutdown() { |
|
|
|
|
{ |
|
|
|
|
grpc_core::MutexLock lock(&mu_); |
|
|
|
|
if (shutdown_) return; |
|
|
|
|
TimerManager::~TimerManager() { |
|
|
|
|
if (grpc_event_engine_timer_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_DEBUG, "TimerManager::%p shutting down", this); |
|
|
|
|
} |
|
|
|
|
grpc_core::MutexLock lock(&mu_); |
|
|
|
|
shutdown_ = true; |
|
|
|
|
cv_wait_.SignalAll(); |
|
|
|
|
while (thread_count_ > 0) { |
|
|
|
|
if (grpc_event_engine_timer_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_DEBUG, "TimerManager::%p shutting down", this); |
|
|
|
|
gpr_log(GPR_DEBUG, "TimerManager::%p waiting for %zu threads to finish", |
|
|
|
|
this, thread_count_); |
|
|
|
|
} |
|
|
|
|
shutdown_ = true; |
|
|
|
|
// Wait on the main loop to exit.
|
|
|
|
|
cv_wait_.Signal(); |
|
|
|
|
cv_threadcount_.Wait(&mu_); |
|
|
|
|
} |
|
|
|
|
main_loop_exit_signal_->WaitForNotification(); |
|
|
|
|
if (grpc_event_engine_timer_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_DEBUG, "TimerManager::%p shutdown complete", this); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Destruction delegates to Shutdown(), which blocks until the main loop exits.
TimerManager::~TimerManager() { Shutdown(); }
|
|
|
|
|
|
|
|
|
// Forwards kicks from the timer list to the owning TimerManager.
void TimerManager::Host::Kick() { timer_manager_->Kick(); }
|
|
|
|
|
|
|
|
|
// Wakes the timer-manager wait loop because the pending-timer set changed
// (an earlier deadline may now exist).
void TimerManager::Kick() {
  grpc_core::MutexLock lock(&mu_);
  // Invalidate the current timed waiter: its cached deadline can no longer
  // be trusted.
  has_timed_waiter_ = false;
  timed_waiter_deadline_ = grpc_core::Timestamp::InfFuture();
  // Bump the generation so the sleeping timed waiter sees, on wakeup, that it
  // no longer holds the timed-waiter role (see WaitUntil).
  ++timed_waiter_generation_;
  // Force WaitUntil to return promptly and re-query the timer list.
  kicked_ = true;
  cv_wait_.Signal();
}
|
|
|
|
|
|
|
|
|
void TimerManager::RestartPostFork() { |
|
|
|
|
void TimerManager::PrepareFork() { |
|
|
|
|
grpc_core::MutexLock lock(&mu_); |
|
|
|
|
GPR_ASSERT(GPR_LIKELY(shutdown_)); |
|
|
|
|
if (grpc_event_engine_timer_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_DEBUG, "TimerManager::%p restarting after shutdown", this); |
|
|
|
|
forking_ = true; |
|
|
|
|
prefork_thread_count_ = thread_count_; |
|
|
|
|
cv_wait_.SignalAll(); |
|
|
|
|
while (thread_count_ > 0) { |
|
|
|
|
cv_threadcount_.Wait(&mu_); |
|
|
|
|
} |
|
|
|
|
shutdown_ = false; |
|
|
|
|
main_loop_exit_signal_.emplace(); |
|
|
|
|
StartMainLoopThread(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Fork support: fully shut down before the fork, then restart the main loop
// in both the parent and the child afterwards.
void TimerManager::PrepareFork() { Shutdown(); }
void TimerManager::PostforkParent() { RestartPostFork(); }
void TimerManager::PostforkChild() { RestartPostFork(); }
|
|
|
|
// NOTE(review): duplicate definition — PostforkParent is also defined as a
// one-liner earlier in this file; this span appears to be merge/diff residue
// from an older worker-thread-based implementation, and one of the two
// definitions must be removed before this translation unit can compile.
void TimerManager::PostforkParent() {
  grpc_core::MutexLock lock(&mu_);
  // Restore the pre-fork worker-thread pool, one thread at a time.
  for (int i = 0; i < prefork_thread_count_; i++) {
    StartThread();
  }
  prefork_thread_count_ = 0;
  forking_ = false;
}
|
|
|
|
|
|
|
|
|
// NOTE(review): duplicate definition — PostforkChild is also defined as a
// one-liner earlier in this file; this span appears to be merge/diff residue
// from an older worker-thread-based implementation, and one of the two
// definitions must be removed before this translation unit can compile.
void TimerManager::PostforkChild() {
  grpc_core::MutexLock lock(&mu_);
  // Restore the pre-fork worker-thread pool, one thread at a time.
  for (int i = 0; i < prefork_thread_count_; i++) {
    StartThread();
  }
  prefork_thread_count_ = 0;
  forking_ = false;
}
|
|
|
|
|
|
|
|
|
} // namespace posix_engine
|
|
|
|
|
} // namespace grpc_event_engine
|
|
|
|
|