@ -46,6 +46,7 @@
# include "src/core/lib/iomgr/lockfree_event.h"
# include "src/core/lib/iomgr/lockfree_event.h"
# include "src/core/lib/iomgr/wakeup_fd_posix.h"
# include "src/core/lib/iomgr/wakeup_fd_posix.h"
# include "src/core/lib/profiling/timers.h"
# include "src/core/lib/profiling/timers.h"
# include "src/core/lib/support/string.h"
static grpc_wakeup_fd global_wakeup_fd ;
static grpc_wakeup_fd global_wakeup_fd ;
static int g_epfd ;
static int g_epfd ;
@ -78,8 +79,21 @@ static void fd_global_shutdown(void);
typedef enum { UNKICKED , KICKED , DESIGNATED_POLLER } kick_state ;
typedef enum { UNKICKED , KICKED , DESIGNATED_POLLER } kick_state ;
static const char * kick_state_string ( kick_state st ) {
switch ( st ) {
case UNKICKED :
return " UNKICKED " ;
case KICKED :
return " KICKED " ;
case DESIGNATED_POLLER :
return " DESIGNATED_POLLER " ;
}
GPR_UNREACHABLE_CODE ( return " UNKNOWN " ) ;
}
struct grpc_pollset_worker {
struct grpc_pollset_worker {
kick_state kick_state ;
kick_state kick_state ;
int kick_state_mutator ; // which line of code last changed kick state
bool initialized_cv ;
bool initialized_cv ;
grpc_pollset_worker * next ;
grpc_pollset_worker * next ;
grpc_pollset_worker * prev ;
grpc_pollset_worker * prev ;
@ -87,6 +101,12 @@ struct grpc_pollset_worker {
grpc_closure_list schedule_on_end_work ;
grpc_closure_list schedule_on_end_work ;
} ;
} ;
# define SET_KICK_STATE(worker, state) \
do { \
( worker ) - > kick_state = ( state ) ; \
( worker ) - > kick_state_mutator = __LINE__ ; \
} while ( false )
# define MAX_NEIGHBOURHOODS 1024
# define MAX_NEIGHBOURHOODS 1024
typedef struct pollset_neighbourhood {
typedef struct pollset_neighbourhood {
@ -101,10 +121,15 @@ struct grpc_pollset {
bool reassigning_neighbourhood ;
bool reassigning_neighbourhood ;
grpc_pollset_worker * root_worker ;
grpc_pollset_worker * root_worker ;
bool kicked_without_poller ;
bool kicked_without_poller ;
/* Set to true if the pollset is observed to have no workers available to
* poll */
bool seen_inactive ;
bool seen_inactive ;
bool shutting_down ; /* Is the pollset shutting down ? */
bool shutting_down ; /* Is the pollset shutting down ? */
bool finish_shutdown_called ; /* Is the 'finish_shutdown_locked()' called ? */
grpc_closure * shutdown_closure ; /* Called after after shutdown is complete */
grpc_closure * shutdown_closure ; /* Called after after shutdown is complete */
/* Number of workers who are *about-to* attach themselves to the pollset
* worker list */
int begin_refs ;
int begin_refs ;
grpc_pollset * next ;
grpc_pollset * next ;
@ -264,29 +289,23 @@ static bool fd_is_shutdown(grpc_fd *fd) {
static void fd_notify_on_read ( grpc_exec_ctx * exec_ctx , grpc_fd * fd ,
static void fd_notify_on_read ( grpc_exec_ctx * exec_ctx , grpc_fd * fd ,
grpc_closure * closure ) {
grpc_closure * closure ) {
grpc_lfev_notify_on ( exec_ctx , & fd - > read_closure , closure ) ;
grpc_lfev_notify_on ( exec_ctx , & fd - > read_closure , closure , " read " ) ;
}
}
static void fd_notify_on_write ( grpc_exec_ctx * exec_ctx , grpc_fd * fd ,
static void fd_notify_on_write ( grpc_exec_ctx * exec_ctx , grpc_fd * fd ,
grpc_closure * closure ) {
grpc_closure * closure ) {
grpc_lfev_notify_on ( exec_ctx , & fd - > write_closure , closure ) ;
grpc_lfev_notify_on ( exec_ctx , & fd - > write_closure , closure , " write " ) ;
}
}
static void fd_become_readable ( grpc_exec_ctx * exec_ctx , grpc_fd * fd ,
static void fd_become_readable ( grpc_exec_ctx * exec_ctx , grpc_fd * fd ,
grpc_pollset * notifier ) {
grpc_pollset * notifier ) {
grpc_lfev_set_ready ( exec_ctx , & fd - > read_closure ) ;
grpc_lfev_set_ready ( exec_ctx , & fd - > read_closure , " read " ) ;
/* Note, it is possible that fd_become_readable might be called twice with
different ' notifier ' s when an fd becomes readable and it is in two epoll
sets ( This can happen briefly during polling island merges ) . In such cases
it does not really matter which notifer is set as the read_notifier_pollset
( They would both point to the same polling island anyway ) */
/* Use release store to match with acquire load in fd_get_read_notifier */
/* Use release store to match with acquire load in fd_get_read_notifier */
gpr_atm_rel_store ( & fd - > read_notifier_pollset , ( gpr_atm ) notifier ) ;
gpr_atm_rel_store ( & fd - > read_notifier_pollset , ( gpr_atm ) notifier ) ;
}
}
static void fd_become_writable ( grpc_exec_ctx * exec_ctx , grpc_fd * fd ) {
static void fd_become_writable ( grpc_exec_ctx * exec_ctx , grpc_fd * fd ) {
grpc_lfev_set_ready ( exec_ctx , & fd - > write_closure ) ;
grpc_lfev_set_ready ( exec_ctx , & fd - > write_closure , " write " ) ;
}
}
/*******************************************************************************
/*******************************************************************************
@ -411,18 +430,28 @@ static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
if ( pollset - > root_worker ! = NULL ) {
if ( pollset - > root_worker ! = NULL ) {
grpc_pollset_worker * worker = pollset - > root_worker ;
grpc_pollset_worker * worker = pollset - > root_worker ;
do {
do {
switch ( worker - > kick_state ) {
case KICKED :
break ;
case UNKICKED :
SET_KICK_STATE ( worker , KICKED ) ;
if ( worker - > initialized_cv ) {
if ( worker - > initialized_cv ) {
worker - > kick_state = KICKED ;
gpr_cv_signal ( & worker - > cv ) ;
gpr_cv_signal ( & worker - > cv ) ;
} else {
}
worker - > kick_state = KICKED ;
break ;
case DESIGNATED_POLLER :
SET_KICK_STATE ( worker , KICKED ) ;
append_error ( & error , grpc_wakeup_fd_wakeup ( & global_wakeup_fd ) ,
append_error ( & error , grpc_wakeup_fd_wakeup ( & global_wakeup_fd ) ,
" pollset_shutdown " ) ;
" pollset_kick_all " ) ;
break ;
}
}
worker = worker - > next ;
worker = worker - > next ;
} while ( worker ! = pollset - > root_worker ) ;
} while ( worker ! = pollset - > root_worker ) ;
}
}
// TODO: sreek. Check if we need to set 'kicked_without_poller' to true here
// in the else case
return error ;
return error ;
}
}
@ -438,7 +467,9 @@ static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
static void pollset_shutdown ( grpc_exec_ctx * exec_ctx , grpc_pollset * pollset ,
static void pollset_shutdown ( grpc_exec_ctx * exec_ctx , grpc_pollset * pollset ,
grpc_closure * closure ) {
grpc_closure * closure ) {
GPR_ASSERT ( pollset - > shutdown_closure = = NULL ) ;
GPR_ASSERT ( pollset - > shutdown_closure = = NULL ) ;
GPR_ASSERT ( ! pollset - > shutting_down ) ;
pollset - > shutdown_closure = closure ;
pollset - > shutdown_closure = closure ;
pollset - > shutting_down = true ;
GRPC_LOG_IF_ERROR ( " pollset_shutdown " , pollset_kick_all ( pollset ) ) ;
GRPC_LOG_IF_ERROR ( " pollset_shutdown " , pollset_kick_all ( pollset ) ) ;
pollset_maybe_finish_shutdown ( exec_ctx , pollset ) ;
pollset_maybe_finish_shutdown ( exec_ctx , pollset ) ;
}
}
@ -506,10 +537,14 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_millis deadline ) {
grpc_millis deadline ) {
if ( worker_hdl ! = NULL ) * worker_hdl = worker ;
if ( worker_hdl ! = NULL ) * worker_hdl = worker ;
worker - > initialized_cv = false ;
worker - > initialized_cv = false ;
worker - > kick_state = UNKICKED ;
SET_KICK_STATE ( worker , UNKICKED ) ;
worker - > schedule_on_end_work = ( grpc_closure_list ) GRPC_CLOSURE_LIST_INIT ;
worker - > schedule_on_end_work = ( grpc_closure_list ) GRPC_CLOSURE_LIST_INIT ;
pollset - > begin_refs + + ;
pollset - > begin_refs + + ;
if ( GRPC_TRACER_ON ( grpc_polling_trace ) ) {
gpr_log ( GPR_ERROR , " PS:%p BEGIN_STARTS:%p " , pollset , worker ) ;
}
if ( pollset - > seen_inactive ) {
if ( pollset - > seen_inactive ) {
// pollset has been observed to be inactive, we need to move back to the
// pollset has been observed to be inactive, we need to move back to the
// active list
// active list
@ -525,6 +560,11 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
retry_lock_neighbourhood :
retry_lock_neighbourhood :
gpr_mu_lock ( & neighbourhood - > mu ) ;
gpr_mu_lock ( & neighbourhood - > mu ) ;
gpr_mu_lock ( & pollset - > mu ) ;
gpr_mu_lock ( & pollset - > mu ) ;
if ( GRPC_TRACER_ON ( grpc_polling_trace ) ) {
gpr_log ( GPR_ERROR , " PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d " ,
pollset , worker , kick_state_string ( worker - > kick_state ) ,
is_reassigning ) ;
}
if ( pollset - > seen_inactive ) {
if ( pollset - > seen_inactive ) {
if ( neighbourhood ! = pollset - > neighbourhood ) {
if ( neighbourhood ! = pollset - > neighbourhood ) {
gpr_mu_unlock ( & neighbourhood - > mu ) ;
gpr_mu_unlock ( & neighbourhood - > mu ) ;
@ -535,8 +575,14 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset - > seen_inactive = false ;
pollset - > seen_inactive = false ;
if ( neighbourhood - > active_root = = NULL ) {
if ( neighbourhood - > active_root = = NULL ) {
neighbourhood - > active_root = pollset - > next = pollset - > prev = pollset ;
neighbourhood - > active_root = pollset - > next = pollset - > prev = pollset ;
if ( gpr_atm_no_barrier_cas ( & g_active_poller , 0 , ( gpr_atm ) worker ) ) {
/* TODO: sreek. Why would this worker state be other than UNKICKED
worker - > kick_state = DESIGNATED_POLLER ;
* here ? ( since the worker isn ' t added to the pollset yet , there is no
* way it can be " found " by other threads to get kicked ) . */
/* If there is no designated poller, make this the designated poller */
if ( worker - > kick_state = = UNKICKED & &
gpr_atm_no_barrier_cas ( & g_active_poller , 0 , ( gpr_atm ) worker ) ) {
SET_KICK_STATE ( worker , DESIGNATED_POLLER ) ;
}
}
} else {
} else {
pollset - > next = neighbourhood - > active_root ;
pollset - > next = neighbourhood - > active_root ;
@ -550,25 +596,54 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
}
gpr_mu_unlock ( & neighbourhood - > mu ) ;
gpr_mu_unlock ( & neighbourhood - > mu ) ;
}
}
worker_insert ( pollset , worker ) ;
worker_insert ( pollset , worker ) ;
pollset - > begin_refs - - ;
pollset - > begin_refs - - ;
if ( worker - > kick_state = = UNKICKED ) {
if ( worker - > kick_state = = UNKICKED & & ! pollset - > kicked_without_poller ) {
GPR_ASSERT ( gpr_atm_no_barrier_load ( & g_active_poller ) ! = ( gpr_atm ) worker ) ;
GPR_ASSERT ( gpr_atm_no_barrier_load ( & g_active_poller ) ! = ( gpr_atm ) worker ) ;
worker - > initialized_cv = true ;
worker - > initialized_cv = true ;
gpr_cv_init ( & worker - > cv ) ;
gpr_cv_init ( & worker - > cv ) ;
while ( worker - > kick_state = = UNKICKED & &
while ( worker - > kick_state = = UNKICKED & & ! pollset - > shutting_down ) {
pollset - > shutdown_closure = = NULL ) {
if ( GRPC_TRACER_ON ( grpc_polling_trace ) ) {
gpr_log ( GPR_ERROR , " PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d " ,
pollset , worker , kick_state_string ( worker - > kick_state ) ,
pollset - > shutting_down ) ;
}
if ( gpr_cv_wait ( & worker - > cv , & pollset - > mu ,
if ( gpr_cv_wait ( & worker - > cv , & pollset - > mu ,
grpc_millis_to_timespec ( deadline , GPR_CLOCK_REALTIME ) ) & &
grpc_millis_to_timespec ( deadline , GPR_CLOCK_REALTIME ) ) & &
worker - > kick_state = = UNKICKED ) {
worker - > kick_state = = UNKICKED ) {
worker - > kick_state = KICKED ;
/* If gpr_cv_wait returns true (i.e a timeout), pretend that the worker
received a kick */
SET_KICK_STATE ( worker , KICKED ) ;
}
}
}
}
grpc_exec_ctx_invalidate_now ( exec_ctx ) ;
grpc_exec_ctx_invalidate_now ( exec_ctx ) ;
}
}
return worker - > kick_state = = DESIGNATED_POLLER & &
if ( GRPC_TRACER_ON ( grpc_polling_trace ) ) {
pollset - > shutdown_closure = = NULL ;
gpr_log ( GPR_ERROR ,
" PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d "
" kicked_without_poller: %d " ,
pollset , worker , kick_state_string ( worker - > kick_state ) ,
pollset - > shutting_down , pollset - > kicked_without_poller ) ;
}
/* We release pollset lock in this function at a couple of places:
* 1. Briefly when assigning pollset to a neighbourhood
* 2. When doing gpr_cv_wait ( )
* It is possible that ' kicked_without_poller ' was set to true during ( 1 ) and
* ' shutting_down ' is set to true during ( 1 ) or ( 2 ) . If either of them is
* true , this worker cannot do polling */
/* TODO(sreek): Perhaps there is a better way to handle kicked_without_poller
* case ; especially when the worker is the DESIGNATED_POLLER */
if ( pollset - > kicked_without_poller ) {
pollset - > kicked_without_poller = false ;
return false ;
}
return worker - > kick_state = = DESIGNATED_POLLER & & ! pollset - > shutting_down ;
}
}
static bool check_neighbourhood_for_available_poller (
static bool check_neighbourhood_for_available_poller (
@ -588,10 +663,18 @@ static bool check_neighbourhood_for_available_poller(
case UNKICKED :
case UNKICKED :
if ( gpr_atm_no_barrier_cas ( & g_active_poller , 0 ,
if ( gpr_atm_no_barrier_cas ( & g_active_poller , 0 ,
( gpr_atm ) inspect_worker ) ) {
( gpr_atm ) inspect_worker ) ) {
inspect_worker - > kick_state = DESIGNATED_POLLER ;
if ( GRPC_TRACER_ON ( grpc_polling_trace ) ) {
gpr_log ( GPR_DEBUG , " .. choose next poller to be %p " ,
inspect_worker ) ;
}
SET_KICK_STATE ( inspect_worker , DESIGNATED_POLLER ) ;
if ( inspect_worker - > initialized_cv ) {
if ( inspect_worker - > initialized_cv ) {
gpr_cv_signal ( & inspect_worker - > cv ) ;
gpr_cv_signal ( & inspect_worker - > cv ) ;
}
}
} else {
if ( GRPC_TRACER_ON ( grpc_polling_trace ) ) {
gpr_log ( GPR_DEBUG , " .. beaten to choose next poller " ) ;
}
}
}
// even if we didn't win the cas, there's a worker, we can stop
// even if we didn't win the cas, there's a worker, we can stop
found_worker = true ;
found_worker = true ;
@ -604,9 +687,12 @@ static bool check_neighbourhood_for_available_poller(
break ;
break ;
}
}
inspect_worker = inspect_worker - > next ;
inspect_worker = inspect_worker - > next ;
} while ( inspect_worker ! = inspect - > root_worker ) ;
} while ( ! found_worker & & inspect_worker ! = inspect - > root_worker ) ;
}
}
if ( ! found_worker ) {
if ( ! found_worker ) {
if ( GRPC_TRACER_ON ( grpc_polling_trace ) ) {
gpr_log ( GPR_DEBUG , " .. mark pollset %p inactive " , inspect ) ;
}
inspect - > seen_inactive = true ;
inspect - > seen_inactive = true ;
if ( inspect = = neighbourhood - > active_root ) {
if ( inspect = = neighbourhood - > active_root ) {
neighbourhood - > active_root =
neighbourhood - > active_root =
@ -624,15 +710,22 @@ static bool check_neighbourhood_for_available_poller(
static void end_worker ( grpc_exec_ctx * exec_ctx , grpc_pollset * pollset ,
static void end_worker ( grpc_exec_ctx * exec_ctx , grpc_pollset * pollset ,
grpc_pollset_worker * worker ,
grpc_pollset_worker * worker ,
grpc_pollset_worker * * worker_hdl ) {
grpc_pollset_worker * * worker_hdl ) {
if ( GRPC_TRACER_ON ( grpc_polling_trace ) ) {
gpr_log ( GPR_DEBUG , " PS:%p END_WORKER:%p " , pollset , worker ) ;
}
if ( worker_hdl ! = NULL ) * worker_hdl = NULL ;
if ( worker_hdl ! = NULL ) * worker_hdl = NULL ;
worker - > kick_state = KICKED ;
/* Make sure we appear kicked */
SET_KICK_STATE ( worker , KICKED ) ;
grpc_closure_list_move ( & worker - > schedule_on_end_work ,
grpc_closure_list_move ( & worker - > schedule_on_end_work ,
& exec_ctx - > closure_list ) ;
& exec_ctx - > closure_list ) ;
if ( gpr_atm_no_barrier_load ( & g_active_poller ) = = ( gpr_atm ) worker ) {
if ( gpr_atm_no_barrier_load ( & g_active_poller ) = = ( gpr_atm ) worker ) {
if ( worker - > next ! = worker & & worker - > next - > kick_state = = UNKICKED ) {
if ( worker - > next ! = worker & & worker - > next - > kick_state = = UNKICKED ) {
if ( GRPC_TRACER_ON ( grpc_polling_trace ) ) {
gpr_log ( GPR_DEBUG , " .. choose next poller to be peer %p " , worker ) ;
}
GPR_ASSERT ( worker - > next - > initialized_cv ) ;
GPR_ASSERT ( worker - > next - > initialized_cv ) ;
gpr_atm_no_barrier_store ( & g_active_poller , ( gpr_atm ) worker - > next ) ;
gpr_atm_no_barrier_store ( & g_active_poller , ( gpr_atm ) worker - > next ) ;
worker - > next - > kick_state = DESIGNATED_POLLER ;
SET_KICK_STATE ( worker - > next , DESIGNATED_POLLER ) ;
gpr_cv_signal ( & worker - > next - > cv ) ;
gpr_cv_signal ( & worker - > next - > cv ) ;
if ( grpc_exec_ctx_has_work ( exec_ctx ) ) {
if ( grpc_exec_ctx_has_work ( exec_ctx ) ) {
gpr_mu_unlock ( & pollset - > mu ) ;
gpr_mu_unlock ( & pollset - > mu ) ;
@ -641,9 +734,9 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
}
} else {
} else {
gpr_atm_no_barrier_store ( & g_active_poller , 0 ) ;
gpr_atm_no_barrier_store ( & g_active_poller , 0 ) ;
gpr_mu_unlock ( & pollset - > mu ) ;
size_t poller_neighbourhood_idx =
size_t poller_neighbourhood_idx =
( size_t ) ( pollset - > neighbourhood - g_neighbourhoods ) ;
( size_t ) ( pollset - > neighbourhood - g_neighbourhoods ) ;
gpr_mu_unlock ( & pollset - > mu ) ;
bool found_worker = false ;
bool found_worker = false ;
bool scan_state [ MAX_NEIGHBOURHOODS ] ;
bool scan_state [ MAX_NEIGHBOURHOODS ] ;
for ( size_t i = 0 ; ! found_worker & & i < g_num_neighbourhoods ; i + + ) {
for ( size_t i = 0 ; ! found_worker & & i < g_num_neighbourhoods ; i + + ) {
@ -679,6 +772,9 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if ( worker - > initialized_cv ) {
if ( worker - > initialized_cv ) {
gpr_cv_destroy ( & worker - > cv ) ;
gpr_cv_destroy ( & worker - > cv ) ;
}
}
if ( GRPC_TRACER_ON ( grpc_polling_trace ) ) {
gpr_log ( GPR_DEBUG , " .. remove worker " ) ;
}
if ( EMPTIED = = worker_remove ( pollset , worker ) ) {
if ( EMPTIED = = worker_remove ( pollset , worker ) ) {
pollset_maybe_finish_shutdown ( exec_ctx , pollset ) ;
pollset_maybe_finish_shutdown ( exec_ctx , pollset ) ;
}
}
@ -699,15 +795,17 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset - > kicked_without_poller = false ;
pollset - > kicked_without_poller = false ;
return GRPC_ERROR_NONE ;
return GRPC_ERROR_NONE ;
}
}
gpr_tls_set ( & g_current_thread_pollset , ( intptr_t ) pollset ) ;
if ( begin_worker ( exec_ctx , pollset , & worker , worker_hdl , deadline ) ) {
if ( begin_worker ( exec_ctx , pollset , & worker , worker_hdl , deadline ) ) {
gpr_tls_set ( & g_current_thread_pollset , ( intptr_t ) pollset ) ;
gpr_tls_set ( & g_current_thread_worker , ( intptr_t ) & worker ) ;
gpr_tls_set ( & g_current_thread_worker , ( intptr_t ) & worker ) ;
GPR_ASSERT ( ! pollset - > shutdown_closure ) ;
GPR_ASSERT ( ! pollset - > shutting_ down ) ;
GPR_ASSERT ( ! pollset - > seen_inactive ) ;
GPR_ASSERT ( ! pollset - > seen_inactive ) ;
gpr_mu_unlock ( & pollset - > mu ) ;
gpr_mu_unlock ( & pollset - > mu ) ;
append_error ( & error , pollset_epoll ( exec_ctx , pollset , deadline ) , err_desc ) ;
append_error ( & error , pollset_epoll ( exec_ctx , pollset , deadline ) , err_desc ) ;
gpr_mu_lock ( & pollset - > mu ) ;
gpr_mu_lock ( & pollset - > mu ) ;
gpr_tls_set ( & g_current_thread_worker , 0 ) ;
gpr_tls_set ( & g_current_thread_worker , 0 ) ;
} else {
gpr_tls_set ( & g_current_thread_pollset , ( intptr_t ) pollset ) ;
}
}
end_worker ( exec_ctx , pollset , & worker , worker_hdl ) ;
end_worker ( exec_ctx , pollset , & worker , worker_hdl ) ;
gpr_tls_set ( & g_current_thread_pollset , 0 ) ;
gpr_tls_set ( & g_current_thread_pollset , 0 ) ;
@ -716,46 +814,136 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
static grpc_error * pollset_kick ( grpc_pollset * pollset ,
static grpc_error * pollset_kick ( grpc_pollset * pollset ,
grpc_pollset_worker * specific_worker ) {
grpc_pollset_worker * specific_worker ) {
if ( GRPC_TRACER_ON ( grpc_polling_trace ) ) {
gpr_strvec log ;
gpr_strvec_init ( & log ) ;
char * tmp ;
gpr_asprintf (
& tmp , " PS:%p KICK:%p curps=%p curworker=%p root=%p " , pollset ,
specific_worker , ( void * ) gpr_tls_get ( & g_current_thread_pollset ) ,
( void * ) gpr_tls_get ( & g_current_thread_worker ) , pollset - > root_worker ) ;
gpr_strvec_add ( & log , tmp ) ;
if ( pollset - > root_worker ! = NULL ) {
gpr_asprintf ( & tmp , " {kick_state=%s next=%p {kick_state=%s}} " ,
kick_state_string ( pollset - > root_worker - > kick_state ) ,
pollset - > root_worker - > next ,
kick_state_string ( pollset - > root_worker - > next - > kick_state ) ) ;
gpr_strvec_add ( & log , tmp ) ;
}
if ( specific_worker ! = NULL ) {
gpr_asprintf ( & tmp , " worker_kick_state=%s " ,
kick_state_string ( specific_worker - > kick_state ) ) ;
gpr_strvec_add ( & log , tmp ) ;
}
tmp = gpr_strvec_flatten ( & log , NULL ) ;
gpr_strvec_destroy ( & log ) ;
gpr_log ( GPR_ERROR , " %s " , tmp ) ;
gpr_free ( tmp ) ;
}
if ( specific_worker = = NULL ) {
if ( specific_worker = = NULL ) {
if ( gpr_tls_get ( & g_current_thread_pollset ) ! = ( intptr_t ) pollset ) {
if ( gpr_tls_get ( & g_current_thread_pollset ) ! = ( intptr_t ) pollset ) {
grpc_pollset_worker * root_worker = pollset - > root_worker ;
grpc_pollset_worker * root_worker = pollset - > root_worker ;
if ( root_worker = = NULL ) {
if ( root_worker = = NULL ) {
pollset - > kicked_without_poller = true ;
pollset - > kicked_without_poller = true ;
if ( GRPC_TRACER_ON ( grpc_polling_trace ) ) {
gpr_log ( GPR_ERROR , " .. kicked_without_poller " ) ;
}
return GRPC_ERROR_NONE ;
return GRPC_ERROR_NONE ;
}
}
grpc_pollset_worker * next_worker = root_worker - > next ;
grpc_pollset_worker * next_worker = root_worker - > next ;
if ( root_worker = = next_worker & &
if ( root_worker - > kick_state = = KICKED ) {
if ( GRPC_TRACER_ON ( grpc_polling_trace ) ) {
gpr_log ( GPR_ERROR , " .. already kicked %p " , root_worker ) ;
}
SET_KICK_STATE ( root_worker , KICKED ) ;
return GRPC_ERROR_NONE ;
} else if ( next_worker - > kick_state = = KICKED ) {
if ( GRPC_TRACER_ON ( grpc_polling_trace ) ) {
gpr_log ( GPR_ERROR , " .. already kicked %p " , next_worker ) ;
}
SET_KICK_STATE ( next_worker , KICKED ) ;
return GRPC_ERROR_NONE ;
} else if ( root_worker = =
next_worker & & // only try and wake up a poller if
// there is no next worker
root_worker = = ( grpc_pollset_worker * ) gpr_atm_no_barrier_load (
root_worker = = ( grpc_pollset_worker * ) gpr_atm_no_barrier_load (
& g_active_poller ) ) {
& g_active_poller ) ) {
root_worker - > kick_state = KICKED ;
if ( GRPC_TRACER_ON ( grpc_polling_trace ) ) {
gpr_log ( GPR_ERROR , " .. kicked %p " , root_worker ) ;
}
SET_KICK_STATE ( root_worker , KICKED ) ;
return grpc_wakeup_fd_wakeup ( & global_wakeup_fd ) ;
return grpc_wakeup_fd_wakeup ( & global_wakeup_fd ) ;
} else if ( next_worker - > kick_state = = UNKICKED ) {
} else if ( next_worker - > kick_state = = UNKICKED ) {
if ( GRPC_TRACER_ON ( grpc_polling_trace ) ) {
gpr_log ( GPR_ERROR , " .. kicked %p " , next_worker ) ;
}
GPR_ASSERT ( next_worker - > initialized_cv ) ;
GPR_ASSERT ( next_worker - > initialized_cv ) ;
next_worker - > kick_state = KICKED ;
SET_KICK_STATE ( next_worker , KICKED ) ;
gpr_cv_signal ( & next_worker - > cv ) ;
gpr_cv_signal ( & next_worker - > cv ) ;
return GRPC_ERROR_NONE ;
return GRPC_ERROR_NONE ;
} else if ( next_worker - > kick_state = = DESIGNATED_POLLER ) {
if ( root_worker - > kick_state ! = DESIGNATED_POLLER ) {
if ( GRPC_TRACER_ON ( grpc_polling_trace ) ) {
gpr_log (
GPR_ERROR ,
" .. kicked root non-poller %p (initialized_cv=%d) (poller=%p) " ,
root_worker , root_worker - > initialized_cv , next_worker ) ;
}
SET_KICK_STATE ( root_worker , KICKED ) ;
if ( root_worker - > initialized_cv ) {
gpr_cv_signal ( & root_worker - > cv ) ;
}
return GRPC_ERROR_NONE ;
} else {
if ( GRPC_TRACER_ON ( grpc_polling_trace ) ) {
gpr_log ( GPR_ERROR , " .. non-root poller %p (root=%p) " , next_worker ,
root_worker ) ;
}
SET_KICK_STATE ( next_worker , KICKED ) ;
return grpc_wakeup_fd_wakeup ( & global_wakeup_fd ) ;
}
} else {
} else {
GPR_ASSERT ( next_worker - > kick_state = = KICKED ) ;
SET_KICK_STATE ( next_worker , KICKED ) ;
return GRPC_ERROR_NONE ;
return GRPC_ERROR_NONE ;
}
}
} else {
} else {
if ( GRPC_TRACER_ON ( grpc_polling_trace ) ) {
gpr_log ( GPR_ERROR , " .. kicked while waking up " ) ;
}
return GRPC_ERROR_NONE ;
return GRPC_ERROR_NONE ;
}
}
} else if ( specific_worker - > kick_state = = KICKED ) {
} else if ( specific_worker - > kick_state = = KICKED ) {
if ( GRPC_TRACER_ON ( grpc_polling_trace ) ) {
gpr_log ( GPR_ERROR , " .. specific worker already kicked " ) ;
}
return GRPC_ERROR_NONE ;
return GRPC_ERROR_NONE ;
} else if ( gpr_tls_get ( & g_current_thread_worker ) = =
} else if ( gpr_tls_get ( & g_current_thread_worker ) = =
( intptr_t ) specific_worker ) {
( intptr_t ) specific_worker ) {
specific_worker - > kick_state = KICKED ;
if ( GRPC_TRACER_ON ( grpc_polling_trace ) ) {
gpr_log ( GPR_ERROR , " .. mark %p kicked " , specific_worker ) ;
}
SET_KICK_STATE ( specific_worker , KICKED ) ;
return GRPC_ERROR_NONE ;
return GRPC_ERROR_NONE ;
} else if ( specific_worker = =
} else if ( specific_worker = =
( grpc_pollset_worker * ) gpr_atm_no_barrier_load ( & g_active_poller ) ) {
( grpc_pollset_worker * ) gpr_atm_no_barrier_load ( & g_active_poller ) ) {
specific_worker - > kick_state = KICKED ;
if ( GRPC_TRACER_ON ( grpc_polling_trace ) ) {
gpr_log ( GPR_ERROR , " .. kick active poller " ) ;
}
SET_KICK_STATE ( specific_worker , KICKED ) ;
return grpc_wakeup_fd_wakeup ( & global_wakeup_fd ) ;
return grpc_wakeup_fd_wakeup ( & global_wakeup_fd ) ;
} else if ( specific_worker - > initialized_cv ) {
} else if ( specific_worker - > initialized_cv ) {
specific_worker - > kick_state = KICKED ;
if ( GRPC_TRACER_ON ( grpc_polling_trace ) ) {
gpr_log ( GPR_ERROR , " .. kick waiting worker " ) ;
}
SET_KICK_STATE ( specific_worker , KICKED ) ;
gpr_cv_signal ( & specific_worker - > cv ) ;
gpr_cv_signal ( & specific_worker - > cv ) ;
return GRPC_ERROR_NONE ;
return GRPC_ERROR_NONE ;
} else {
} else {
specific_worker - > kick_state = KICKED ;
if ( GRPC_TRACER_ON ( grpc_polling_trace ) ) {
gpr_log ( GPR_ERROR , " .. kick non-waiting worker " ) ;
}
SET_KICK_STATE ( specific_worker , KICKED ) ;
return GRPC_ERROR_NONE ;
return GRPC_ERROR_NONE ;
}
}
}
}
@ -801,6 +989,7 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
static void shutdown_engine ( void ) {
static void shutdown_engine ( void ) {
fd_global_shutdown ( ) ;
fd_global_shutdown ( ) ;
pollset_global_shutdown ( ) ;
pollset_global_shutdown ( ) ;
close ( g_epfd ) ;
}
}
static const grpc_event_engine_vtable vtable = {
static const grpc_event_engine_vtable vtable = {
@ -837,9 +1026,6 @@ static const grpc_event_engine_vtable vtable = {
/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
* Create a dummy epoll_fd to make sure epoll support is available */
* Create a dummy epoll_fd to make sure epoll support is available */
const grpc_event_engine_vtable * grpc_init_epoll1_linux ( bool explicit_request ) {
const grpc_event_engine_vtable * grpc_init_epoll1_linux ( bool explicit_request ) {
/* TODO(ctiller): temporary, until this stabilizes */
if ( ! explicit_request ) return NULL ;
if ( ! grpc_has_wakeup_fd ( ) ) {
if ( ! grpc_has_wakeup_fd ( ) ) {
return NULL ;
return NULL ;
}
}
@ -858,6 +1044,8 @@ const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
return NULL ;
return NULL ;
}
}
gpr_log ( GPR_ERROR , " grpc epoll fd: %d " , g_epfd ) ;
return & vtable ;
return & vtable ;
}
}