@ -46,12 +46,33 @@
int evthread_use_threads ( void ) ;
static void grpc_em_fd_impl_destroy ( struct grpc_em_fd_impl * impl ) ;
# define ALARM_TRIGGER_INIT ((gpr_atm)0)
# define ALARM_TRIGGER_INCREMENT ((gpr_atm)1)
# define DONE_SHUTDOWN ((void *)1)
# define POLLER_ID_INVALID ((gpr_atm)-1)
typedef struct grpc_em_fd_impl {
grpc_em_task task ; /* Base class, callbacks, queues, etc */
int fd ; /* File descriptor */
/* Note that the shutdown event is only needed as a workaround for libevent
not properly handling event_active on an in flight event . */
struct event * shutdown_ev ; /* activated to trigger shutdown */
/* protect shutdown_started|read_state|write_state and ensure barriers
between notify_on_ [ read | write ] and read | write callbacks */
gpr_mu mu ;
int shutdown_started ; /* 0 -> shutdown not started, 1 -> started */
grpc_em_fd_state read_state ;
grpc_em_fd_state write_state ;
/* descriptor delete list. These are destroyed during polling. */
struct grpc_em_fd_impl * next ;
} grpc_em_fd_impl ;
/* ================== grpc_em implementation ===================== */
/* If anything is in the work queue, process one item and return 1.
@ -83,7 +104,18 @@ static void timer_callback(int fd, short events, void *context) {
event_base_loopbreak ( ( struct event_base * ) context ) ;
}
/* Spend some time polling if no other thread is.
static void free_fd_list ( grpc_em_fd_impl * impl ) {
while ( impl ! = NULL ) {
grpc_em_fd_impl * current = impl ;
impl = impl - > next ;
grpc_em_fd_impl_destroy ( current ) ;
gpr_free ( current ) ;
}
}
/* Spend some time doing polling and libevent maintenance work if no other
thread is . This includes both polling for events and destroying / closing file
descriptor objects .
Returns 1 if polling was performed , 0 otherwise .
Requires em - > mu locked , may unlock and relock during the call . */
static int maybe_do_polling_work ( grpc_em * em , struct timeval delay ) {
@ -92,6 +124,10 @@ static int maybe_do_polling_work(grpc_em *em, struct timeval delay) {
if ( em - > num_pollers ) return 0 ;
em - > num_pollers = 1 ;
free_fd_list ( em - > fds_to_free ) ;
em - > fds_to_free = NULL ;
gpr_mu_unlock ( & em - > mu ) ;
event_add ( em - > timeout_ev , & delay ) ;
@ -102,6 +138,11 @@ static int maybe_do_polling_work(grpc_em *em, struct timeval delay) {
event_del ( em - > timeout_ev ) ;
gpr_mu_lock ( & em - > mu ) ;
if ( em - > fds_to_free ) {
free_fd_list ( em - > fds_to_free ) ;
em - > fds_to_free = NULL ;
}
em - > num_pollers = 0 ;
gpr_cv_broadcast ( & em - > cv ) ;
return 1 ;
@ -191,6 +232,7 @@ grpc_em_error grpc_em_init(grpc_em *em) {
em - > num_fds = 0 ;
em - > last_poll_completed = gpr_now ( ) ;
em - > shutdown_backup_poller = 0 ;
em - > fds_to_free = NULL ;
gpr_event_init ( & em - > backup_poller_done ) ;
@ -247,6 +289,8 @@ grpc_em_error grpc_em_destroy(grpc_em *em) {
;
gpr_mu_unlock ( & em - > mu ) ;
free_fd_list ( em - > fds_to_free ) ;
/* complete shutdown */
gpr_mu_destroy ( & em - > mu ) ;
gpr_cv_destroy ( & em - > cv ) ;
@ -284,6 +328,7 @@ static void add_task(grpc_em *em, grpc_em_activation_data *adata) {
static void alarm_ev_destroy ( grpc_em_alarm * alarm ) {
grpc_em_activation_data * adata = & alarm - > task . activation [ GRPC_EM_TA_ONLY ] ;
if ( adata - > ev ! = NULL ) {
/* TODO(klempner): Is this safe to do when we're cancelling? */
event_free ( adata - > ev ) ;
adata - > ev = NULL ;
}
@ -368,16 +413,14 @@ grpc_em_error grpc_em_alarm_cancel(grpc_em_alarm *alarm) {
/* ==================== grpc_em_fd implementation =================== */
/* Proxy callback to call a gRPC read/write callback */
static void em_fd_cb ( int fd , short what , void * arg /*=em_fd*/ ) {
grpc_em_fd * em_fd = arg ;
static void em_fd_cb ( int fd , short what , void * arg /*=em_fd_impl */ ) {
grpc_em_fd_impl * em_fd = arg ;
grpc_em_cb_status status = GRPC_CALLBACK_SUCCESS ;
int run_read_cb = 0 ;
int run_write_cb = 0 ;
grpc_em_activation_data * rdata , * wdata ;
gpr_mu_lock ( & em_fd - > mu ) ;
/* TODO(klempner): We need to delete the event here too so we avoid spurious
shutdowns . */
if ( em_fd - > shutdown_started ) {
status = GRPC_CALLBACK_CANCELLED ;
} else if ( status = = GRPC_CALLBACK_SUCCESS & & ( what & EV_TIMEOUT ) ) {
@ -428,28 +471,32 @@ static void em_fd_shutdown_cb(int fd, short what, void *arg /*=em_fd*/) {
that libevent ' s handling of event_active ( ) on an event which is already in
flight on a different thread is racy and easily triggers TSAN .
*/
grpc_em_fd * em_fd = arg ;
gpr_mu_lock ( & em_fd - > mu ) ;
em_fd - > shutdown_started = 1 ;
if ( em_fd - > read_state = = GRPC_EM_FD_WAITING ) {
event_active ( em_fd - > task . activation [ GRPC_EM_TA_READ ] . ev , EV_READ , 1 ) ;
grpc_em_fd_impl * impl = arg ;
gpr_mu_lock ( & impl - > mu ) ;
impl - > shutdown_started = 1 ;
if ( impl - > read_state = = GRPC_EM_FD_WAITING ) {
event_active ( impl - > task . activation [ GRPC_EM_TA_READ ] . ev , EV_READ , 1 ) ;
}
if ( em_fd - > write_state = = GRPC_EM_FD_WAITING ) {
event_active ( em_fd - > task . activation [ GRPC_EM_TA_WRITE ] . ev , EV_WRITE , 1 ) ;
if ( impl - > write_state = = GRPC_EM_FD_WAITING ) {
event_active ( impl - > task . activation [ GRPC_EM_TA_WRITE ] . ev , EV_WRITE , 1 ) ;
}
gpr_mu_unlock ( & em_fd - > mu ) ;
gpr_mu_unlock ( & impl - > mu ) ;
}
grpc_em_error grpc_em_fd_init ( grpc_em_fd * em_fd , grpc_em * em , int fd ) {
int flags ;
grpc_em_activation_data * rdata , * wdata ;
grpc_em_fd_impl * impl = gpr_malloc ( sizeof ( grpc_em_fd_impl ) ) ;
gpr_mu_lock ( & em - > mu ) ;
em - > num_fds + + ;
gpr_mu_unlock ( & em - > mu ) ;
em_fd - > shutdown_ev = NULL ;
gpr_mu_init ( & em_fd - > mu ) ;
em_fd - > impl = impl ;
impl - > shutdown_ev = NULL ;
gpr_mu_init ( & impl - > mu ) ;
flags = fcntl ( fd , F_GETFL , 0 ) ;
if ( ( flags & O_NONBLOCK ) = = 0 ) {
@ -457,11 +504,11 @@ grpc_em_error grpc_em_fd_init(grpc_em_fd *em_fd, grpc_em *em, int fd) {
return GRPC_EM_INVALID_ARGUMENTS ;
}
em_fd - > task . type = GRPC_EM_TASK_FD ;
em_fd - > task . em = em ;
em_fd - > fd = fd ;
impl - > task . type = GRPC_EM_TASK_FD ;
impl - > task . em = em ;
impl - > fd = fd ;
rdata = & ( em_fd - > task . activation [ GRPC_EM_TA_READ ] ) ;
rdata = & ( impl - > task . activation [ GRPC_EM_TA_READ ] ) ;
rdata - > ev = NULL ;
rdata - > cb = NULL ;
rdata - > arg = NULL ;
@ -469,7 +516,7 @@ grpc_em_error grpc_em_fd_init(grpc_em_fd *em_fd, grpc_em *em, int fd) {
rdata - > prev = NULL ;
rdata - > next = NULL ;
wdata = & ( em_fd - > task . activation [ GRPC_EM_TA_WRITE ] ) ;
wdata = & ( impl - > task . activation [ GRPC_EM_TA_WRITE ] ) ;
wdata - > ev = NULL ;
wdata - > cb = NULL ;
wdata - > arg = NULL ;
@ -477,49 +524,45 @@ grpc_em_error grpc_em_fd_init(grpc_em_fd *em_fd, grpc_em *em, int fd) {
wdata - > prev = NULL ;
wdata - > next = NULL ;
em_fd - > read_state = GRPC_EM_FD_IDLE ;
em_fd - > write_state = GRPC_EM_FD_IDLE ;
impl - > read_state = GRPC_EM_FD_IDLE ;
impl - > write_state = GRPC_EM_FD_IDLE ;
impl - > shutdown_started = 0 ;
impl - > next = NULL ;
/* TODO(chenw): detect platforms where only level trigger is supported,
and set the event to non - persist . */
rdata - > ev = event_new ( em - > event_base , em_fd - > fd , EV_ET | EV_PERSIST | EV_READ ,
em_fd_cb , em_fd ) ;
rdata - > ev = event_new ( em - > event_base , impl - > fd , EV_ET | EV_PERSIST | EV_READ ,
em_fd_cb , impl ) ;
if ( ! rdata - > ev ) {
gpr_log ( GPR_ERROR , " Failed to create read event " ) ;
return GRPC_EM_ERROR ;
}
wdata - > ev = event_new ( em - > event_base , em_fd - > fd ,
EV_ET | EV_PERSIST | EV_WRITE , em_fd_cb , em_fd ) ;
wdata - > ev = event_new ( em - > event_base , impl - > fd , EV_ET | EV_PERSIST | EV_WRITE ,
em_fd_cb , impl ) ;
if ( ! wdata - > ev ) {
gpr_log ( GPR_ERROR , " Failed to create write event " ) ;
return GRPC_EM_ERROR ;
}
em_fd - > shutdown_ev =
event_new ( em - > event_base , - 1 , EV_READ , em_fd_shutdown_cb , em_fd ) ;
impl - > shutdown_ev =
event_new ( em - > event_base , - 1 , EV_READ , em_fd_shutdown_cb , impl ) ;
if ( ! em_fd - > shutdown_ev ) {
if ( ! impl - > shutdown_ev ) {
gpr_log ( GPR_ERROR , " Failed to create shutdown event " ) ;
return GRPC_EM_ERROR ;
}
em_fd - > shutdown_started = 0 ;
return GRPC_EM_OK ;
}
void grpc_em_fd_destroy ( grpc_em_fd * em_fd ) {
static void grpc_em_fd_impl _destroy ( grpc_em_fd_impl * impl ) {
grpc_em_task_activity_type type ;
grpc_em_activation_data * adata ;
grpc_em * em = em_fd - > task . em ;
/* ensure anyone holding the lock has left - it's the callers responsibility
to ensure that no new users enter */
gpr_mu_lock ( & em_fd - > mu ) ;
gpr_mu_unlock ( & em_fd - > mu ) ;
for ( type = GRPC_EM_TA_READ ; type < GRPC_EM_TA_COUNT ; type + + ) {
adata = & ( em_fd - > task . activation [ type ] ) ;
adata = & ( impl - > task . activation [ type ] ) ;
GPR_ASSERT ( adata - > next = = NULL ) ;
if ( adata - > ev ! = NULL ) {
event_free ( adata - > ev ) ;
@ -527,24 +570,43 @@ void grpc_em_fd_destroy(grpc_em_fd *em_fd) {
}
}
if ( em_fd - > shutdown_ev ! = NULL ) {
event_free ( em_fd - > shutdown_ev ) ;
em_fd - > shutdown_ev = NULL ;
if ( impl - > shutdown_ev ! = NULL ) {
event_free ( impl - > shutdown_ev ) ;
impl - > shutdown_ev = NULL ;
}
gpr_mu_destroy ( & em_fd - > mu ) ;
gpr_mu_destroy ( & impl - > mu ) ;
close ( impl - > fd ) ;
}
void grpc_em_fd_destroy ( grpc_em_fd * em_fd ) {
grpc_em_fd_impl * impl = em_fd - > impl ;
grpc_em * em = impl - > task . em ;
gpr_mu_lock ( & em - > mu ) ;
if ( em - > num_pollers = = 0 ) {
/* it is safe to simply free it */
grpc_em_fd_impl_destroy ( impl ) ;
gpr_free ( impl ) ;
} else {
/* Put the impl on the list to be destroyed by the poller. */
impl - > next = em - > fds_to_free ;
em - > fds_to_free = impl ;
/* Kick the poller so it closes the fd promptly.
* TODO ( klempner ) : maybe this should be a different event .
*/
event_active ( em_fd - > impl - > shutdown_ev , EV_READ , 1 ) ;
}
em - > num_fds - - ;
gpr_cv_broadcast ( & em - > cv ) ;
gpr_mu_unlock ( & em - > mu ) ;
close ( em_fd - > fd ) ;
}
int grpc_em_fd_get ( struct grpc_em_fd * em_fd ) { return em_fd - > fd ; }
int grpc_em_fd_get ( struct grpc_em_fd * em_fd ) { return em_fd - > impl - > fd ; }
/* Returns the event manager associated with *em_fd. */
grpc_em * grpc_em_fd_get_em ( grpc_em_fd * em_fd ) { return em_fd - > task . em ; }
grpc_em * grpc_em_fd_get_em ( grpc_em_fd * em_fd ) { return em_fd - > impl - > task . em ; }
/* TODO(chenw): should we enforce the contract that notify_on_read cannot be
called when the previously registered callback has not been called yet . */
@ -552,6 +614,7 @@ grpc_em_error grpc_em_fd_notify_on_read(grpc_em_fd *em_fd,
grpc_em_cb_func read_cb ,
void * read_cb_arg ,
gpr_timespec deadline ) {
grpc_em_fd_impl * impl = em_fd - > impl ;
int force_event = 0 ;
grpc_em_activation_data * rdata ;
grpc_em_error result = GRPC_EM_OK ;
@ -560,22 +623,22 @@ grpc_em_error grpc_em_fd_notify_on_read(grpc_em_fd *em_fd,
struct timeval * delayp =
gpr_time_cmp ( deadline , gpr_inf_future ) ? & delay : NULL ;
rdata = & em_fd - > task . activation [ GRPC_EM_TA_READ ] ;
rdata = & impl - > task . activation [ GRPC_EM_TA_READ ] ;
gpr_mu_lock ( & em_fd - > mu ) ;
gpr_mu_lock ( & impl - > mu ) ;
rdata - > cb = read_cb ;
rdata - > arg = read_cb_arg ;
force_event =
( em_fd - > shutdown_started | | em_fd - > read_state = = GRPC_EM_FD_CACHED ) ;
em_fd - > read_state = GRPC_EM_FD_WAITING ;
( impl - > shutdown_started | | impl - > read_state = = GRPC_EM_FD_CACHED ) ;
impl - > read_state = GRPC_EM_FD_WAITING ;
if ( force_event ) {
event_active ( rdata - > ev , EV_READ , 1 ) ;
} else if ( event_add ( rdata - > ev , delayp ) = = - 1 ) {
result = GRPC_EM_ERROR ;
}
gpr_mu_unlock ( & em_fd - > mu ) ;
gpr_mu_unlock ( & impl - > mu ) ;
return result ;
}
@ -583,6 +646,7 @@ grpc_em_error grpc_em_fd_notify_on_write(grpc_em_fd *em_fd,
grpc_em_cb_func write_cb ,
void * write_cb_arg ,
gpr_timespec deadline ) {
grpc_em_fd_impl * impl = em_fd - > impl ;
int force_event = 0 ;
grpc_em_activation_data * wdata ;
grpc_em_error result = GRPC_EM_OK ;
@ -591,27 +655,27 @@ grpc_em_error grpc_em_fd_notify_on_write(grpc_em_fd *em_fd,
struct timeval * delayp =
gpr_time_cmp ( deadline , gpr_inf_future ) ? & delay : NULL ;
wdata = & em_fd - > task . activation [ GRPC_EM_TA_WRITE ] ;
wdata = & impl - > task . activation [ GRPC_EM_TA_WRITE ] ;
gpr_mu_lock ( & em_fd - > mu ) ;
gpr_mu_lock ( & impl - > mu ) ;
wdata - > cb = write_cb ;
wdata - > arg = write_cb_arg ;
force_event =
( em_fd - > shutdown_started | | em_fd - > write_state = = GRPC_EM_FD_CACHED ) ;
em_fd - > write_state = GRPC_EM_FD_WAITING ;
( impl - > shutdown_started | | impl - > write_state = = GRPC_EM_FD_CACHED ) ;
impl - > write_state = GRPC_EM_FD_WAITING ;
if ( force_event ) {
event_active ( wdata - > ev , EV_WRITE , 1 ) ;
} else if ( event_add ( wdata - > ev , delayp ) = = - 1 ) {
result = GRPC_EM_ERROR ;
}
gpr_mu_unlock ( & em_fd - > mu ) ;
gpr_mu_unlock ( & impl - > mu ) ;
return result ;
}
void grpc_em_fd_shutdown ( grpc_em_fd * em_fd ) {
event_active ( em_fd - > shutdown_ev , EV_READ , 1 ) ;
event_active ( em_fd - > impl - > shutdown_ev , EV_READ , 1 ) ;
}
/*====================== Other callback functions ======================*/