@ -727,9 +727,8 @@ static grpc_error_handle do_epoll_wait(grpc_pollset* ps,
if ( r < 0 ) return GRPC_OS_ERROR ( errno , " epoll_wait " ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( polling ) ) {
LOG ( INFO ) < < " ps: " < < ps < < " poll got " < < r < < " events " ;
}
GRPC_TRACE_LOG ( polling , INFO )
< < " ps: " < < ps < < " poll got " < < r < < " events " ;
gpr_atm_rel_store ( & g_epoll_set . num_events , r ) ;
gpr_atm_rel_store ( & g_epoll_set . cursor , 0 ) ;
@ -746,9 +745,8 @@ static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
worker - > schedule_on_end_work = ( grpc_closure_list ) GRPC_CLOSURE_LIST_INIT ;
pollset - > begin_refs + + ;
if ( GRPC_TRACE_FLAG_ENABLED ( polling ) ) {
LOG ( INFO ) < < " PS: " < < pollset < < " BEGIN_STARTS: " < < worker ;
}
GRPC_TRACE_LOG ( polling , INFO )
< < " PS: " < < pollset < < " BEGIN_STARTS: " < < worker ;
if ( pollset - > seen_inactive ) {
// pollset has been observed to be inactive, we need to move back to the
@ -765,11 +763,10 @@ static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
retry_lock_neighborhood :
gpr_mu_lock ( & neighborhood - > mu ) ;
gpr_mu_lock ( & pollset - > mu ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( polling ) ) {
LOG ( INFO ) < < " PS: " < < pollset < < " BEGIN_REORG: " < < worker
< < " kick_state= " < < kick_state_string ( worker - > state )
< < " is_reassigning= " < < is_reassigning ;
}
GRPC_TRACE_LOG ( polling , INFO )
< < " PS: " < < pollset < < " BEGIN_REORG: " < < worker
< < " kick_state= " < < kick_state_string ( worker - > state )
< < " is_reassigning= " < < is_reassigning ;
if ( pollset - > seen_inactive ) {
if ( neighborhood ! = pollset - > neighborhood ) {
gpr_mu_unlock ( & neighborhood - > mu ) ;
@ -818,11 +815,10 @@ static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
worker - > initialized_cv = true ;
gpr_cv_init ( & worker - > cv ) ;
while ( worker - > state = = UNKICKED & & ! pollset - > shutting_down ) {
if ( GRPC_TRACE_FLAG_ENABLED ( polling ) ) {
LOG ( INFO ) < < " PS: " < < pollset < < " BEGIN_WAIT: " < < worker
< < " kick_state= " < < kick_state_string ( worker - > state )
< < " shutdown= " < < pollset - > shutting_down ;
}
GRPC_TRACE_LOG ( polling , INFO )
< < " PS: " < < pollset < < " BEGIN_WAIT: " < < worker
< < " kick_state= " < < kick_state_string ( worker - > state )
< < " shutdown= " < < pollset - > shutting_down ;
if ( gpr_cv_wait ( & worker - > cv , & pollset - > mu ,
deadline . as_timespec ( GPR_CLOCK_MONOTONIC ) ) & &
@ -877,17 +873,15 @@ static bool check_neighborhood_for_available_poller(
if ( gpr_atm_no_barrier_cas (
& g_active_poller , 0 ,
reinterpret_cast < gpr_atm > ( inspect_worker ) ) ) {
if ( GRPC_TRACE_FLAG_ENABLED ( polling ) ) {
LOG ( INFO ) < < " .. choose next poller to be " < < inspect_worker ;
}
GRPC_TRACE_LOG ( polling , INFO )
< < " .. choose next poller to be " < < inspect_worker ;
SET_KICK_STATE ( inspect_worker , DESIGNATED_POLLER ) ;
if ( inspect_worker - > initialized_cv ) {
gpr_cv_signal ( & inspect_worker - > cv ) ;
}
} else {
if ( GRPC_TRACE_FLAG_ENABLED ( polling ) ) {
LOG ( INFO ) < < " .. beaten to choose next poller " ;
}
GRPC_TRACE_LOG ( polling , INFO )
< < " .. beaten to choose next poller " ;
}
// even if we didn't win the cas, there's a worker, we can stop
found_worker = true ;
@ -903,9 +897,8 @@ static bool check_neighborhood_for_available_poller(
} while ( ! found_worker & & inspect_worker ! = inspect - > root_worker ) ;
}
if ( ! found_worker ) {
if ( GRPC_TRACE_FLAG_ENABLED ( polling ) ) {
LOG ( INFO ) < < " .. mark pollset " < < inspect < < " inactive " ;
}
GRPC_TRACE_LOG ( polling , INFO )
< < " .. mark pollset " < < inspect < < " inactive " ;
inspect - > seen_inactive = true ;
if ( inspect = = neighborhood - > active_root ) {
neighborhood - > active_root =
@ -922,9 +915,7 @@ static bool check_neighborhood_for_available_poller(
static void end_worker ( grpc_pollset * pollset , grpc_pollset_worker * worker ,
grpc_pollset_worker * * worker_hdl ) {
if ( GRPC_TRACE_FLAG_ENABLED ( polling ) ) {
LOG ( INFO ) < < " PS: " < < pollset < < " END_WORKER: " < < worker ;
}
GRPC_TRACE_LOG ( polling , INFO ) < < " PS: " < < pollset < < " END_WORKER: " < < worker ;
if ( worker_hdl ! = nullptr ) * worker_hdl = nullptr ;
// Make sure we appear kicked
SET_KICK_STATE ( worker , KICKED ) ;
@ -933,9 +924,8 @@ static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
if ( gpr_atm_no_barrier_load ( & g_active_poller ) = =
reinterpret_cast < gpr_atm > ( worker ) ) {
if ( worker - > next ! = worker & & worker - > next - > state = = UNKICKED ) {
if ( GRPC_TRACE_FLAG_ENABLED ( polling ) ) {
LOG ( INFO ) < < " .. choose next poller to be peer " < < worker ;
}
GRPC_TRACE_LOG ( polling , INFO )
< < " .. choose next poller to be peer " < < worker ;
CHECK ( worker - > next - > initialized_cv ) ;
gpr_atm_no_barrier_store ( & g_active_poller , ( gpr_atm ) worker - > next ) ;
SET_KICK_STATE ( worker - > next , DESIGNATED_POLLER ) ;
@ -984,9 +974,7 @@ static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
if ( worker - > initialized_cv ) {
gpr_cv_destroy ( & worker - > cv ) ;
}
if ( GRPC_TRACE_FLAG_ENABLED ( polling ) ) {
LOG ( INFO ) < < " .. remove worker " ;
}
GRPC_TRACE_LOG ( polling , INFO ) < < " .. remove worker " ;
if ( EMPTIED = = worker_remove ( pollset , worker ) ) {
pollset_maybe_finish_shutdown ( pollset ) ;
}
@ -1075,22 +1063,16 @@ static grpc_error_handle pollset_kick(grpc_pollset* pollset,
grpc_pollset_worker * root_worker = pollset - > root_worker ;
if ( root_worker = = nullptr ) {
pollset - > kicked_without_poller = true ;
if ( GRPC_TRACE_FLAG_ENABLED ( polling ) ) {
LOG ( INFO ) < < " .. kicked_without_poller " ;
}
GRPC_TRACE_LOG ( polling , INFO ) < < " .. kicked_without_poller " ;
goto done ;
}
grpc_pollset_worker * next_worker = root_worker - > next ;
if ( root_worker - > state = = KICKED ) {
if ( GRPC_TRACE_FLAG_ENABLED ( polling ) ) {
LOG ( INFO ) < < " .. already kicked " < < root_worker ;
}
GRPC_TRACE_LOG ( polling , INFO ) < < " .. already kicked " < < root_worker ;
SET_KICK_STATE ( root_worker , KICKED ) ;
goto done ;
} else if ( next_worker - > state = = KICKED ) {
if ( GRPC_TRACE_FLAG_ENABLED ( polling ) ) {
LOG ( INFO ) < < " .. already kicked " < < next_worker ;
}
GRPC_TRACE_LOG ( polling , INFO ) < < " .. already kicked " < < next_worker ;
SET_KICK_STATE ( next_worker , KICKED ) ;
goto done ;
} else if ( root_worker = = next_worker & & // only try and wake up a poller
@ -1098,27 +1080,22 @@ static grpc_error_handle pollset_kick(grpc_pollset* pollset,
root_worker = =
reinterpret_cast < grpc_pollset_worker * > (
gpr_atm_no_barrier_load ( & g_active_poller ) ) ) {
if ( GRPC_TRACE_FLAG_ENABLED ( polling ) ) {
LOG ( INFO ) < < " .. kicked " < < root_worker ;
}
GRPC_TRACE_LOG ( polling , INFO ) < < " .. kicked " < < root_worker ;
SET_KICK_STATE ( root_worker , KICKED ) ;
ret_err = grpc_wakeup_fd_wakeup ( & global_wakeup_fd ) ;
goto done ;
} else if ( next_worker - > state = = UNKICKED ) {
if ( GRPC_TRACE_FLAG_ENABLED ( polling ) ) {
LOG ( INFO ) < < " .. kicked " < < next_worker ;
}
GRPC_TRACE_LOG ( polling , INFO ) < < " .. kicked " < < next_worker ;
CHECK ( next_worker - > initialized_cv ) ;
SET_KICK_STATE ( next_worker , KICKED ) ;
gpr_cv_signal ( & next_worker - > cv ) ;
goto done ;
} else if ( next_worker - > state = = DESIGNATED_POLLER ) {
if ( root_worker - > state ! = DESIGNATED_POLLER ) {
if ( GRPC_TRACE_FLAG_ENABLED ( polling ) ) {
LOG ( INFO ) < < " .. kicked root non-poller " < < root_worker
< < " (initialized_cv= " < < root_worker - > initialized_cv
< < " ) (poller= " < < next_worker < < " ) " ;
}
GRPC_TRACE_LOG ( polling , INFO )
< < " .. kicked root non-poller " < < root_worker
< < " (initialized_cv= " < < root_worker - > initialized_cv
< < " ) (poller= " < < next_worker < < " ) " ;
SET_KICK_STATE ( root_worker , KICKED ) ;
if ( root_worker - > initialized_cv ) {
gpr_cv_signal ( & root_worker - > cv ) ;
@ -1137,9 +1114,7 @@ static grpc_error_handle pollset_kick(grpc_pollset* pollset,
goto done ;
}
} else {
if ( GRPC_TRACE_FLAG_ENABLED ( polling ) ) {
LOG ( INFO ) < < " .. kicked while waking up " ;
}
GRPC_TRACE_LOG ( polling , INFO ) < < " .. kicked while waking up " ;
goto done ;
}
@ -1147,36 +1122,27 @@ static grpc_error_handle pollset_kick(grpc_pollset* pollset,
}
if ( specific_worker - > state = = KICKED ) {
if ( GRPC_TRACE_FLAG_ENABLED ( polling ) ) {
LOG ( INFO ) < < " .. specific worker already kicked " ;
}
GRPC_TRACE_LOG ( polling , INFO ) < < " .. specific worker already kicked " ;
goto done ;
} else if ( g_current_thread_worker = = specific_worker ) {
if ( GRPC_TRACE_FLAG_ENABLED ( polling ) ) {
LOG ( INFO ) < < " .. mark " < < specific_worker < < " kicked " ;
}
GRPC_TRACE_LOG ( polling , INFO )
< < " .. mark " < < specific_worker < < " kicked " ;
SET_KICK_STATE ( specific_worker , KICKED ) ;
goto done ;
} else if ( specific_worker = =
reinterpret_cast < grpc_pollset_worker * > (
gpr_atm_no_barrier_load ( & g_active_poller ) ) ) {
if ( GRPC_TRACE_FLAG_ENABLED ( polling ) ) {
LOG ( INFO ) < < " .. kick active poller " ;
}
GRPC_TRACE_LOG ( polling , INFO ) < < " .. kick active poller " ;
SET_KICK_STATE ( specific_worker , KICKED ) ;
ret_err = grpc_wakeup_fd_wakeup ( & global_wakeup_fd ) ;
goto done ;
} else if ( specific_worker - > initialized_cv ) {
if ( GRPC_TRACE_FLAG_ENABLED ( polling ) ) {
LOG ( INFO ) < < " .. kick waiting worker " ;
}
GRPC_TRACE_LOG ( polling , INFO ) < < " .. kick waiting worker " ;
SET_KICK_STATE ( specific_worker , KICKED ) ;
gpr_cv_signal ( & specific_worker - > cv ) ;
goto done ;
} else {
if ( GRPC_TRACE_FLAG_ENABLED ( polling ) ) {
LOG ( INFO ) < < " .. kick non-waiting worker " ;
}
GRPC_TRACE_LOG ( polling , INFO ) < < " .. kick non-waiting worker " ;
SET_KICK_STATE ( specific_worker , KICKED ) ;
goto done ;
}