@ -39,13 +39,6 @@
# define MAX_DEPTH 2
# define EXECUTOR_TRACE(format, ...) \
do { \
if ( GRPC_TRACE_FLAG_ENABLED ( executor ) ) { \
LOG ( INFO ) < < " EXECUTOR " < < absl : : StrFormat ( format , __VA_ARGS__ ) ; \
} \
} while ( 0 )
namespace grpc_core {
namespace {
@ -110,11 +103,13 @@ size_t Executor::RunClosures(const char* executor_name,
while ( c ! = nullptr ) {
grpc_closure * next = c - > next_data . next ;
# ifndef NDEBUG
EXECUTOR_TRACE ( " (%s) run %p [created by %s:%d] " , executor_name , c ,
c - > file_created , c - > line_created ) ;
GRPC_TRACE_LOG ( executor , INFO )
< < " EXECUTOR ( " < < executor_name < < " ) run " < < c < < " [created by "
< < c - > file_created < < " : " < < c - > line_created < < " ] " ;
c - > scheduled = false ;
# else
EXECUTOR_TRACE ( " (%s) run %p " , executor_name , c ) ;
GRPC_TRACE_LOG ( executor , INFO )
< < " EXECUTOR ( " < < executor_name < < " ) run " < < c ;
# endif
grpc_error_handle error =
internal : : StatusMoveFromHeapPtr ( c - > error_data . error ) ;
@ -134,11 +129,14 @@ bool Executor::IsThreaded() const {
void Executor : : SetThreading ( bool threading ) {
gpr_atm curr_num_threads = gpr_atm_acq_load ( & num_threads_ ) ;
EXECUTOR_TRACE ( " (%s) SetThreading(%d) begin " , name_ , threading ) ;
GRPC_TRACE_LOG ( executor , INFO )
< < " EXECUTOR ( " < < name_ < < " ) SetThreading( " < < threading < < " ) begin " ;
if ( threading ) {
if ( curr_num_threads > 0 ) {
EXECUTOR_TRACE ( " (%s) SetThreading(true). curr_num_threads > 0 " , name_ ) ;
GRPC_TRACE_LOG ( executor , INFO )
< < " EXECUTOR ( " < < name_
< < " ) SetThreading(true). curr_num_threads > 0 " ;
return ;
}
@ -160,7 +158,9 @@ void Executor::SetThreading(bool threading) {
thd_state_ [ 0 ] . thd . Start ( ) ;
} else { // !threading
if ( curr_num_threads = = 0 ) {
EXECUTOR_TRACE ( " (%s) SetThreading(false). curr_num_threads == 0 " , name_ ) ;
GRPC_TRACE_LOG ( executor , INFO )
< < " EXECUTOR ( " < < name_
< < " ) SetThreading(false). curr_num_threads == 0 " ;
return ;
}
@ -179,8 +179,9 @@ void Executor::SetThreading(bool threading) {
curr_num_threads = gpr_atm_no_barrier_load ( & num_threads_ ) ;
for ( gpr_atm i = 0 ; i < curr_num_threads ; i + + ) {
thd_state_ [ i ] . thd . Join ( ) ;
EXECUTOR_TRACE ( " (%s) Thread % " PRIdPTR " of % " PRIdPTR " joined " , name_ ,
i + 1 , curr_num_threads ) ;
GRPC_TRACE_LOG ( executor , INFO )
< < " EXECUTOR ( " < < name_ < < " ) Thread " < < i + 1 < < " of "
< < curr_num_threads < < " joined " ;
}
gpr_atm_rel_store ( & num_threads_ , 0 ) ;
@ -201,7 +202,8 @@ void Executor::SetThreading(bool threading) {
grpc_iomgr_platform_shutdown_background_closure ( ) ;
}
EXECUTOR_TRACE ( " (%s) SetThreading(%d) done " , name_ , threading ) ;
GRPC_TRACE_LOG ( executor , INFO )
< < " EXECUTOR ( " < < name_ < < " ) SetThreading( " < < threading < < " ) done " ;
}
void Executor : : Shutdown ( ) { SetThreading ( false ) ; }
@ -214,8 +216,9 @@ void Executor::ThreadMain(void* arg) {
size_t subtract_depth = 0 ;
for ( ; ; ) {
EXECUTOR_TRACE ( " (%s) [% " PRIdPTR " ]: step (sub_depth=% " PRIdPTR " ) " ,
ts - > name , ts - > id , subtract_depth ) ;
GRPC_TRACE_LOG ( executor , INFO )
< < " EXECUTOR ( " < < ts - > name < < " ) [ " < < ts - > id
< < " ]: step (sub_depth= " < < subtract_depth < < " ) " ;
gpr_mu_lock ( & ts - > mu ) ;
ts - > depth - = subtract_depth ;
@ -226,7 +229,8 @@ void Executor::ThreadMain(void* arg) {
}
if ( ts - > shutdown ) {
EXECUTOR_TRACE ( " (%s) [% " PRIdPTR " ]: shutdown " , ts - > name , ts - > id ) ;
GRPC_TRACE_LOG ( executor , INFO )
< < " EXECUTOR ( " < < ts - > name < < " ) [ " < < ts - > id < < " ]: shutdown " ;
gpr_mu_unlock ( & ts - > mu ) ;
break ;
}
@ -235,7 +239,8 @@ void Executor::ThreadMain(void* arg) {
ts - > elems = GRPC_CLOSURE_LIST_INIT ;
gpr_mu_unlock ( & ts - > mu ) ;
EXECUTOR_TRACE ( " (%s) [% " PRIdPTR " ]: execute " , ts - > name , ts - > id ) ;
GRPC_TRACE_LOG ( executor , INFO )
< < " EXECUTOR ( " < < ts - > name < < " ) [ " < < ts - > id < < " ]: execute " ;
ExecCtx : : Get ( ) - > InvalidateNow ( ) ;
subtract_depth = RunClosures ( ts - > name , closures ) ;
@ -257,10 +262,13 @@ void Executor::Enqueue(grpc_closure* closure, grpc_error_handle error,
// or already shutdown), then queue the closure on the exec context itself
if ( cur_thread_count = = 0 ) {
# ifndef NDEBUG
EXECUTOR_TRACE ( " (%s) schedule %p (created %s:%d) inline " , name_ , closure ,
closure - > file_created , closure - > line_created ) ;
GRPC_TRACE_LOG ( executor , INFO )
< < " EXECUTOR ( " < < name_ < < " ) schedule " < < closure < < " (created "
< < closure - > file_created < < " : " < < closure - > line_created
< < " ) inline " ;
# else
EXECUTOR_TRACE ( " (%s) schedule %p inline " , name_ , closure ) ;
GRPC_TRACE_LOG ( executor , INFO )
< < " EXECUTOR ( " < < name_ < < " ) schedule " < < closure < < " inline " ;
# endif
grpc_closure_list_append ( ExecCtx : : Get ( ) - > closure_list ( ) , closure , error ) ;
return ;
@ -280,14 +288,15 @@ void Executor::Enqueue(grpc_closure* closure, grpc_error_handle error,
for ( ; ; ) {
# ifndef NDEBUG
EXECUTOR_TRACE (
" (%s) try to schedule %p (%s) (created %s:%d) to thread "
" % " PRIdPTR ,
name_ , closure , is_short ? " short " : " long " , closure - > fi le_created,
closure - > line_created , ts - > id ) ;
GRPC_TRACE_LOG ( executor , INFO )
< < " EXECUTOR ( " < < name_ < < " ) try to schedule " < < closure < < " ( "
< < ( is_short ? " short " : " long " ) < < " ) (created "
< < closure - > file_created < < " : " < < closure - > lin e_created
< < " ) to thread " < < ts - > id ;
# else
EXECUTOR_TRACE ( " (%s) try to schedule %p (%s) to thread % " PRIdPTR , name_ ,
closure , is_short ? " short " : " long " , ts - > id ) ;
GRPC_TRACE_LOG ( executor , INFO )
< < " EXECUTOR ( " < < name_ < < " ) try to schedule " < < closure < < " ( "
< < ( is_short ? " short " : " long " ) < < " ) to thread " < < ts - > id ;
# endif
gpr_mu_lock ( & ts - > mu ) ;
@ -429,7 +438,8 @@ bool Executor::IsThreadedDefault() {
}
void Executor : : SetThreadingAll ( bool enable ) {
EXECUTOR_TRACE ( " Executor::SetThreadingAll(%d) called " , enable ) ;
GRPC_TRACE_LOG ( executor , INFO )
< < " EXECUTOR Executor::SetThreadingAll( " < < enable < < " ) called " ;
for ( size_t i = 0 ; i < static_cast < size_t > ( ExecutorType : : NUM_EXECUTORS ) ;
i + + ) {
executors [ i ] - > SetThreading ( enable ) ;
@ -437,7 +447,8 @@ void Executor::SetThreadingAll(bool enable) {
}
void Executor : : SetThreadingDefault ( bool enable ) {
EXECUTOR_TRACE ( " Executor::SetThreadingDefault(%d) called " , enable ) ;
GRPC_TRACE_LOG ( executor , INFO )
< < " EXECUTOR Executor::SetThreadingDefault( " < < enable < < " ) called " ;
executors [ static_cast < size_t > ( ExecutorType : : DEFAULT ) ] - > SetThreading ( enable ) ;
}