@ -20,8 +20,6 @@
# include "src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h"
# include <inttypes.h>
# include <atomic>
# include <memory>
# include <utility>
@ -38,7 +36,6 @@
# include "src/core/lib/event_engine/trace.h"
# include "src/core/lib/event_engine/work_queue/basic_work_queue.h"
# include "src/core/lib/event_engine/work_queue/work_queue.h"
# include "src/core/lib/gpr/time_precise.h"
# include "src/core/lib/gprpp/thd.h"
# include "src/core/lib/gprpp/time.h"
@ -46,29 +43,19 @@ namespace grpc_event_engine {
namespace experimental {
namespace {
// File-local tuning constants for the work-stealing thread pool.
// Maximum amount of time an extra thread is allowed to idle before being
// reclaimed.
constexpr grpc_core : : Duration kIdleThreadLimit =
grpc_core : : Duration : : Seconds ( 20 ) ;
// Rate at which "Waiting for ..." logs should be printed while quiescing.
constexpr size_t kBlockingQuiesceLogRateSeconds = 3 ;
// Minimum time between thread creations.
constexpr grpc_core : : Duration kTimeBetweenThrottledThreadStarts =
grpc_core : : Duration : : Seconds ( 1 ) ;
// Minimum time a worker thread should sleep between checking for new work. Used
// in backoff calculations to reduce vigilance when the pool is calm.
constexpr grpc_core : : Duration kWorkerThreadMinSleepBetweenChecks {
grpc_core : : Duration : : Milliseconds ( 15 ) } ;
// Maximum time a worker thread should sleep between checking for new work.
constexpr grpc_core : : Duration kWorkerThreadMaxSleepBetweenChecks {
grpc_core : : Duration : : Seconds ( 3 ) } ;
// Minimum time the lifeguard thread should sleep between checks. Used in
// backoff calculations to reduce vigilance when the pool is calm.
constexpr grpc_core : : Duration kLifeguardMinSleepBetweenChecks {
grpc_core : : Duration : : Milliseconds ( 15 ) } ;
// Maximum time the lifeguard thread should sleep between checking for new work.
constexpr grpc_core : : Duration kLifeguardMaxSleepBetweenChecks {
grpc_core : : Duration : : Seconds ( 1 ) } ;
// Sleep interval used by the polling wait loops in
// Lifeguard::BlockUntilShutdown and ThreadCount::BlockUntilThreadCount.
constexpr absl : : Duration kSleepBetweenQuiesceCheck { absl : : Milliseconds ( 10 ) } ;
} // namespace
// Per-thread WorkQueue pointer, null by default. Quiesce() treats a non-null
// value as meaning the calling thread is itself a thread pool thread.
thread_local WorkQueue * g_local_queue = nullptr ;
@ -126,13 +113,13 @@ void WorkStealingThreadPool::PostforkChild() { pool_->Postfork(); }
// Constructs the pool implementation, storing the requested number of reserve
// threads and initializing the lifeguard member.
//
// NOTE(review): two member-initializer lists follow, one constructing
// lifeguard_ with `this` and one default-constructing it. They look like two
// revisions of the same line (diff residue) -- confirm which is intended and
// keep only that one.
WorkStealingThreadPool : : WorkStealingThreadPoolImpl : : WorkStealingThreadPoolImpl (
size_t reserve_threads )
: reserve_threads_ ( reserve_threads ) , lifeguard_ ( this ) { }
: reserve_threads_ ( reserve_threads ) , lifeguard_ ( ) { }
// Starts the pool: launches the lifeguard and starts reserve_threads_ worker
// threads via StartThread().
//
// NOTE(review): lifeguard_.Start is called twice with different signatures
// (once before the loop with shared_from_this(), once after it with no
// arguments). These look like two revisions of the same call -- confirm which
// one is intended.
void WorkStealingThreadPool : : WorkStealingThreadPoolImpl : : Start ( ) {
lifeguard_ . Start ( shared_from_this ( ) ) ;
// Bring up the reserve worker threads.
for ( size_t i = 0 ; i < reserve_threads_ ; i + + ) {
StartThread ( ) ;
}
lifeguard_ . Start ( ) ;
}
void WorkStealingThreadPool : : WorkStealingThreadPoolImpl : : Run (
@ -168,7 +155,6 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Quiesce() {
// Note that if this is a threadpool thread then we won't exit this thread
// until all other threads have exited, so we need to wait for just one thread
// running instead of zero.
gpr_cycle_counter start_time = gpr_get_cycle_counter ( ) ;
bool is_threadpool_thread = g_local_queue ! = nullptr ;
thread_count ( ) - > BlockUntilThreadCount ( CounterType : : kLivingThreadCount ,
is_threadpool_thread ? 1 : 0 ,
@ -176,8 +162,6 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Quiesce() {
GPR_ASSERT ( queue_ . Empty ( ) ) ;
quiesced_ . store ( true , std : : memory_order_relaxed ) ;
lifeguard_ . BlockUntilShutdown ( ) ;
GRPC_EVENT_ENGINE_TRACE ( " %f cycles spent quiescing the pool " ,
gpr_get_cycle_counter ( ) - start_time ) ;
}
bool WorkStealingThreadPool : : WorkStealingThreadPoolImpl : : SetThrottled (
@ -222,21 +206,21 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Postfork() {
Start ( ) ;
}
// -------- WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard -----
// -------- WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard
// --------
// Constructs the lifeguard and configures its backoff: initial delay
// kLifeguardMinSleepBetweenChecks, maximum delay
// kLifeguardMaxSleepBetweenChecks, growth multiplier 1.3.
//
// NOTE(review): two constructor signatures are interleaved here, one taking a
// raw WorkStealingThreadPoolImpl* (and initializing pool_) and one taking no
// arguments. They look like two revisions of the same definition -- confirm
// which is intended.
WorkStealingThreadPool : : WorkStealingThreadPoolImpl : : Lifeguard : : Lifeguard (
WorkStealingThreadPoolImpl * pool )
: pool_ ( pool ) ,
backoff_ ( grpc_core : : BackOff : : Options ( )
WorkStealingThreadPool : : WorkStealingThreadPoolImpl : : Lifeguard : : Lifeguard ( )
: backoff_ ( grpc_core : : BackOff : : Options ( )
. set_initial_backoff ( kLifeguardMinSleepBetweenChecks )
. set_max_backoff ( kLifeguardMaxSleepBetweenChecks )
. set_multiplier ( 1.3 ) ) { }
void WorkStealingThreadPool : : WorkStealingThreadPoolImpl : : Lifeguard : : Start ( ) {
// lifeguard_running_ is set early to avoid a quiesce race while the
// lifeguard is still starting up.
grpc_core : : MutexLock lock ( & lifeguard_shutdown_mu_ ) ;
lifeguard_running_ = true ;
void WorkStealingThreadPool : : WorkStealingThreadPoolImpl : : Lifeguard : : Start (
std : : shared_ptr < WorkStealingThreadPoolImpl > pool ) {
// thread_running_ is set early to avoid a quiesce race while the lifeguard is
// still starting up.
thread_running_ . store ( true ) ;
pool_ = std : : move ( pool ) ;
grpc_core : : Thread (
" lifeguard " ,
[ ] ( void * arg ) {
@ -250,36 +234,21 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::Start() {
// Main loop of the lifeguard thread. Each iteration waits for the time
// remaining until the next backoff attempt, exits if the pool is forking or is
// both shut down and quiesced, and otherwise calls MaybeStartNewThread().
// On exit it clears the "running" state observed by BlockUntilShutdown().
//
// NOTE(review): this body mixes two waiting/signalling schemes -- a
// mutex + condition variable (lifeguard_shutdown_mu_, lifeguard_shutdown_cv_,
// lifeguard_running_) and a plain sleep with an atomic flag (absl::SleepFor,
// thread_running_, pool_.reset()). They look like two revisions of the same
// function -- confirm which set of lines is intended.
void WorkStealingThreadPool : : WorkStealingThreadPoolImpl : : Lifeguard : :
LifeguardMain ( ) {
grpc_core : : MutexLock lock ( & lifeguard_shutdown_mu_ ) ;
while ( true ) {
// Sleep for the time remaining until the next backoff attempt.
absl : : SleepFor ( absl : : Milliseconds (
( backoff_ . NextAttemptTime ( ) - grpc_core : : Timestamp : : Now ( ) ) . millis ( ) ) ) ;
if ( pool_ - > IsForking ( ) ) break ;
// If the pool is shut down, loop quickly until quiesced. Otherwise,
// reduce the check rate if the pool is idle.
if ( pool_ - > IsShutdown ( ) ) {
if ( pool_ - > IsQuiesced ( ) ) break ;
} else {
// Timed wait on the condition variable for the same backoff interval.
lifeguard_shutdown_cv_ . WaitWithTimeout (
& lifeguard_shutdown_mu_ ,
absl : : Milliseconds (
( backoff_ . NextAttemptTime ( ) - grpc_core : : Timestamp : : Now ( ) )
. millis ( ) ) ) ;
}
if ( pool_ - > IsShutdown ( ) & & pool_ - > IsQuiesced ( ) ) break ;
MaybeStartNewThread ( ) ;
}
// Mark the lifeguard as stopped and signal the shutdown condition variable.
lifeguard_running_ = false ;
lifeguard_shutdown_cv_ . Signal ( ) ;
// Release the pool reference, then clear the atomic running flag.
pool_ . reset ( ) ;
thread_running_ . store ( false ) ;
}
// Blocks the caller until the lifeguard thread reports that it is no longer
// running.
//
// NOTE(review): two wait loops are interleaved here -- one waiting on
// lifeguard_shutdown_cv_ while lifeguard_running_ is set (logging at most every
// kBlockingQuiesceLogRateSeconds), and one polling thread_running_ every
// kSleepBetweenQuiesceCheck. They look like two revisions of the same
// function -- confirm which is intended.
void WorkStealingThreadPool : : WorkStealingThreadPoolImpl : : Lifeguard : :
BlockUntilShutdown ( ) {
grpc_core : : MutexLock lock ( & lifeguard_shutdown_mu_ ) ;
while ( lifeguard_running_ ) {
// Signal the condition variable the lifeguard waits on, then wait (with a
// timeout) for it to signal back.
lifeguard_shutdown_cv_ . Signal ( ) ;
lifeguard_shutdown_cv_ . WaitWithTimeout (
& lifeguard_shutdown_mu_ , absl : : Seconds ( kBlockingQuiesceLogRateSeconds ) ) ;
GRPC_LOG_EVERY_N_SEC_DELAYED ( kBlockingQuiesceLogRateSeconds , GPR_DEBUG ,
" %s " ,
" Waiting for lifeguard thread to shut down " ) ;
// Polling variant: sleep until the atomic running flag is cleared.
while ( thread_running_ . load ( ) ) {
absl : : SleepFor ( kSleepBetweenQuiesceCheck ) ;
}
}
@ -456,57 +425,36 @@ void WorkStealingThreadPool::ThreadState::FinishDraining() {
// -------- WorkStealingThreadPool::ThreadCount --------
// Increments the counter selected by counter_type.
//
// NOTE(review): two implementations are interleaved -- a mutex-guarded
// increment followed by wait_cv_ SignalAll(), and a relaxed atomic fetch_add.
// As written the counter would be incremented twice; these look like two
// revisions of the same body -- confirm which is intended.
void WorkStealingThreadPool : : ThreadCount : : Add ( CounterType counter_type ) {
grpc_core : : MutexLock lock ( & wait_mu_ [ counter_type ] ) ;
+ + thread_counts_ [ counter_type ] ;
wait_cv_ [ counter_type ] . SignalAll ( ) ;
thread_counts_ [ counter_type ] . fetch_add ( 1 , std : : memory_order_relaxed ) ;
}
// Decrements the counter selected by counter_type.
//
// NOTE(review): two implementations are interleaved -- a mutex-guarded
// decrement followed by wait_cv_ SignalAll(), and a relaxed atomic fetch_sub.
// As written the counter would be decremented twice; these look like two
// revisions of the same body -- confirm which is intended.
void WorkStealingThreadPool : : ThreadCount : : Remove ( CounterType counter_type ) {
grpc_core : : MutexLock lock ( & wait_mu_ [ counter_type ] ) ;
- - thread_counts_ [ counter_type ] ;
wait_cv_ [ counter_type ] . SignalAll ( ) ;
thread_counts_ [ counter_type ] . fetch_sub ( 1 , std : : memory_order_relaxed ) ;
}
// Blocks until the counter selected by counter_type reaches desired_threads,
// periodically logging a "Waiting for thread pool to idle" message that
// includes `why` and the current/desired counts.
//
// NOTE(review): two implementations are interleaved -- one looping on
// WaitForCountChange() with a size_t target and rate-limited logging, and one
// polling a relaxed atomic load every kSleepBetweenQuiesceCheck with an int
// target, calling work_signal->SignalAll() on each pass. The duplicated
// parameter line (and the split `size_ t` / `in t` tokens) look like diff and
// extraction residue -- confirm which revision is intended.
void WorkStealingThreadPool : : ThreadCount : : BlockUntilThreadCount (
CounterType counter_type , size_ t desired_threads , const char * why ,
CounterType counter_type , in t desired_threads , const char * why ,
WorkSignal * work_signal ) {
auto & counter = thread_counts_ [ counter_type ] ;
int curr_threads = counter . load ( std : : memory_order_relaxed ) ;
// Wait for all threads to exit.
while ( true ) {
// Condition-variable variant: wait up to the log interval for the count to
// change, and stop once it matches the target.
auto curr_threads = WaitForCountChange (
counter_type , desired_threads ,
grpc_core : : Duration : : Seconds ( kBlockingQuiesceLogRateSeconds ) ) ;
if ( curr_threads = = desired_threads ) break ;
GRPC_LOG_EVERY_N_SEC_DELAYED (
kBlockingQuiesceLogRateSeconds , GPR_DEBUG ,
" Waiting for thread pool to idle before %s. (% " PRIdPTR " to % " PRIdPTR
" ) " ,
why , curr_threads , desired_threads ) ;
// Polling variant: sleep, signal via work_signal, and re-read the counter
// until it is no longer above the target.
auto last_log_time = grpc_core : : Timestamp : : Now ( ) ;
while ( curr_threads > desired_threads ) {
absl : : SleepFor ( kSleepBetweenQuiesceCheck ) ;
work_signal - > SignalAll ( ) ;
// Log only when more than 3 seconds have passed since the last log line.
if ( grpc_core : : Timestamp : : Now ( ) - last_log_time >
grpc_core : : Duration : : Seconds ( 3 ) ) {
gpr_log ( GPR_DEBUG ,
" Waiting for thread pool to idle before %s. (%d to %d) " , why ,
curr_threads , desired_threads ) ;
last_log_time = grpc_core : : Timestamp : : Now ( ) ;
}
curr_threads = counter . load ( std : : memory_order_relaxed ) ;
}
}
// Waits until the counter selected by counter_type equals desired_threads or
// until `timeout` has elapsed, whichever comes first, and returns the last
// count observed.
size_t WorkStealingThreadPool : : ThreadCount : : WaitForCountChange (
CounterType counter_type , size_t desired_threads ,
grpc_core : : Duration timeout ) {
size_t count ;
// Convert the relative timeout into an absolute deadline shared by all waits.
auto deadline = absl : : Now ( ) + absl : : Milliseconds ( timeout . millis ( ) ) ;
do {
// The lock is scoped to a single iteration of the loop body.
grpc_core : : MutexLock lock ( & wait_mu_ [ counter_type ] ) ;
count = GetCountLocked ( counter_type ) ;
if ( count = = desired_threads ) break ;
wait_cv_ [ counter_type ] . WaitWithDeadline ( & wait_mu_ [ counter_type ] , deadline ) ;
} while ( absl : : Now ( ) < deadline ) ;
return count ;
}
// Returns the current value of the counter selected by counter_type, reading
// it via GetCountLocked() while holding that counter's wait_mu_ entry.
// NOTE(review): nearby lines appear to mix two revisions of the counter
// accessors -- confirm this locked variant is the intended one.
size_t WorkStealingThreadPool : : ThreadCount : : GetCount ( CounterType counter_type ) {
grpc_core : : MutexLock lock ( & wait_mu_ [ counter_type ] ) ;
return GetCountLocked ( counter_type ) ;
}
// Returns the current value of the counter selected by counter_type without
// taking a lock itself; GetCount() and WaitForCountChange() call it while
// holding the matching wait_mu_ entry.
//
// NOTE(review): two return statements follow (a plain read and a relaxed
// atomic load); the second is unreachable as written. They look like two
// revisions of the same line -- confirm which is intended.
size_t WorkStealingThreadPool : : ThreadCount : : GetCountLocked (
CounterType counter_type ) {
return thread_counts_ [ counter_type ] ;
return thread_counts_ [ counter_type ] . load ( std : : memory_order_relaxed ) ;
}
WorkStealingThreadPool : : ThreadCount : : AutoThreadCount : : AutoThreadCount (