[EventEngine] Use EventEngine::Run for TimerManager's MainLoop (#34705)

Closes #34705

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/34705 from drfloob:timer-mgr-on-ee-alt 070118e3f1
PiperOrigin-RevId: 582778797
pull/34965/head
AJ Heller 1 year ago committed by Copybara-Service
parent bb7be1967f
commit 59408f1285
  1. 44
      src/core/lib/event_engine/posix_engine/timer_manager.cc
  2. 3
      src/core/lib/event_engine/posix_engine/timer_manager.h

@ -30,7 +30,6 @@
#include <grpc/support/time.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/thd.h"
static thread_local bool g_timer_thread;
@ -67,41 +66,32 @@ bool TimerManager::WaitUntil(grpc_core::Timestamp next) {
}
void TimerManager::MainLoop() {
for (;;) {
grpc_core::Timestamp next = grpc_core::Timestamp::InfFuture();
absl::optional<std::vector<experimental::EventEngine::Closure*>>
check_result = timer_list_->TimerCheck(&next);
GPR_ASSERT(check_result.has_value() &&
"ERROR: More than one MainLoop is running.");
if (!check_result->empty()) {
RunSomeTimers(std::move(*check_result));
continue;
}
if (!WaitUntil(next)) break;
grpc_core::Timestamp next = grpc_core::Timestamp::InfFuture();
absl::optional<std::vector<experimental::EventEngine::Closure*>>
check_result = timer_list_->TimerCheck(&next);
GPR_ASSERT(check_result.has_value() &&
"ERROR: More than one MainLoop is running.");
bool timers_found = !check_result->empty();
if (timers_found) {
RunSomeTimers(std::move(*check_result));
}
main_loop_exit_signal_->Notify();
thread_pool_->Run([this, next, timers_found]() {
if (!timers_found && !WaitUntil(next)) {
main_loop_exit_signal_->Notify();
return;
}
MainLoop();
});
}
bool TimerManager::IsTimerManagerThread() { return g_timer_thread; }
void TimerManager::StartMainLoopThread() {
main_thread_ = grpc_core::Thread(
"timer_manager",
[](void* arg) {
auto self = static_cast<TimerManager*>(arg);
self->MainLoop();
},
this, nullptr,
grpc_core::Thread::Options().set_tracked(false).set_joinable(false));
main_thread_.Start();
}
TimerManager::TimerManager(
std::shared_ptr<grpc_event_engine::experimental::ThreadPool> thread_pool)
: host_(this), thread_pool_(std::move(thread_pool)) {
timer_list_ = std::make_unique<TimerList>(&host_);
main_loop_exit_signal_.emplace();
StartMainLoopThread();
thread_pool_->Run([this]() { MainLoop(); });
}
grpc_core::Timestamp TimerManager::Host::Now() {
@ -162,7 +152,7 @@ void TimerManager::RestartPostFork() {
}
shutdown_ = false;
main_loop_exit_signal_.emplace();
StartMainLoopThread();
thread_pool_->Run([this]() { MainLoop(); });
}
void TimerManager::PrepareFork() { Shutdown(); }

@ -36,7 +36,6 @@
#include "src/core/lib/event_engine/thread_pool/thread_pool.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gprpp/time.h"
namespace grpc_event_engine {
@ -80,7 +79,6 @@ class TimerManager final : public grpc_event_engine::experimental::Forkable {
TimerManager* const timer_manager_;
};
void StartMainLoopThread();
void RestartPostFork();
void MainLoop();
void RunSomeTimers(std::vector<experimental::EventEngine::Closure*> timers);
@ -103,7 +101,6 @@ class TimerManager final : public grpc_event_engine::experimental::Forkable {
uint64_t wakeups_ ABSL_GUARDED_BY(mu_) = false;
// actual timer implementation
std::unique_ptr<TimerList> timer_list_;
grpc_core::Thread main_thread_;
std::shared_ptr<grpc_event_engine::experimental::ThreadPool> thread_pool_;
absl::optional<grpc_core::Notification> main_loop_exit_signal_;
};

Loading…
Cancel
Save