@ -72,9 +72,14 @@ struct grpc_fd {
gpr_atm refst ;
gpr_mu mu ;
/* Indicates that the fd is shutdown and that any pending read/write closures
should fail */
bool shutdown ;
int closed ;
bool released ;
/* The fd is either closed or we relinquished control of it. In either cases,
this indicates that the ' fd ' on this structure is no longer valid */
bool orphaned ;
grpc_closure * read_closure ;
grpc_closure * write_closure ;
@ -251,16 +256,13 @@ static void polling_island_remove_all_fds_locked(polling_island *pi,
/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked ( polling_island * pi , grpc_fd * fd ,
bool close_fd , bool remove_fd_ref ) {
bool is_fd_closed ) {
int err ;
size_t i ;
/* Calling close() on the fd will automatically remove it from the epoll set.
If not calling close ( ) , the fd must be explicitly removed from the epoll
set */
if ( close_fd ) {
close ( fd - > fd ) ;
} else {
/* If fd is already closed, then it would have been automatically been removed
from the epoll set */
if ( ! is_fd_closed ) {
err = epoll_ctl ( pi - > epoll_fd , EPOLL_CTL_DEL , fd - > fd , NULL ) ;
if ( err < 0 & & errno ! = ENOENT ) {
gpr_log ( GPR_ERROR , " epoll_ctl delete for fd: %d failed with error; %s " ,
@ -271,9 +273,7 @@ static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
for ( i = 0 ; i < pi - > fd_cnt ; i + + ) {
if ( pi - > fds [ i ] = = fd ) {
pi - > fds [ i ] = pi - > fds [ - - pi - > fd_cnt ] ;
if ( remove_fd_ref ) {
GRPC_FD_UNREF ( fd , " polling_island " ) ;
}
GRPC_FD_UNREF ( fd , " polling_island " ) ;
break ;
}
}
@ -644,8 +644,7 @@ static grpc_fd *fd_create(int fd, const char *name) {
new_fd - > polling_island = NULL ;
new_fd - > freelist_next = NULL ;
new_fd - > on_done_closure = NULL ;
new_fd - > closed = 0 ;
new_fd - > released = false ;
new_fd - > orphaned = false ;
gpr_mu_unlock ( & new_fd - > mu ) ;
@ -666,7 +665,7 @@ static bool fd_is_orphaned(grpc_fd *fd) {
static int fd_wrapped_fd ( grpc_fd * fd ) {
int ret_fd = - 1 ;
gpr_mu_lock ( & fd - > mu ) ;
if ( ! fd - > released & & ! fd - > clos ed) {
if ( ! fd - > orphan ed) {
ret_fd = fd - > fd ;
}
gpr_mu_unlock ( & fd - > mu ) ;
@ -678,34 +677,35 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure * on_done , int * release_fd ,
const char * reason ) {
/* TODO(sreek) In ev_poll_posix.c,the lock is acquired a little later. Why? */
bool is_fd_closed = false ;
gpr_mu_lock ( & fd - > mu ) ;
fd - > on_done_closure = on_done ;
/* If release_fd is not NULL, we should be relinquishing control of the file
descriptor fd - > fd ( but we still own the grpc_fd structure ) . */
fd - > released = release_fd ! = NULL ;
if ( ! fd - > released ) {
shutdown ( fd - > fd , SHUT_RDWR ) ;
} else {
if ( release_fd ! = NULL ) {
* release_fd = fd - > fd ;
} else {
close ( fd - > fd ) ;
is_fd_closed = true ;
}
REF_BY ( fd , 1 , reason ) ; /* Remove active status, but keep referenced */
fd - > closed = 1 ;
fd - > orphaned = true ;
/* Remove the active status but keep referenced. We want this grpc_fd struct
to be alive ( and not added to freelist ) until the end of this function */
REF_BY ( fd , 1 , reason ) ;
/* Remove the fd from the polling island:
- Update the fd - > polling_island to point to the latest polling island
- Remove the fd from the polling island . Also , call close ( ) on the file
descriptor fd - > fd ONLY if we haven ' t relinquised control ( i . e
fd - > released is ' false ' )
- Decrement the ref count on the polling island and det fd - > polling_island
to NULL */
- Remove the fd from the polling island .
- Remove a ref to the polling island and set fd - > polling_island to NULL */
gpr_mu_lock ( & fd - > pi_mu ) ;
if ( fd - > polling_island ! = NULL ) {
fd - > polling_island =
polling_island_update_and_lock ( fd - > polling_island , 1 , 0 ) ;
polling_island_remove_fd_locked ( fd - > polling_island , fd , ! fd - > released ,
true ) ;
polling_island_remove_fd_locked ( fd - > polling_island , fd , is_fd_closed ) ;
polling_island_unref_and_unlock ( fd - > polling_island , 1 ) ;
fd - > polling_island = NULL ;
}
@ -839,17 +839,20 @@ static void pollset_kick(grpc_pollset *p,
grpc_pollset_worker * worker = specific_worker ;
if ( worker ! = NULL ) {
if ( worker = = GRPC_POLLSET_KICK_BROADCAST ) {
GPR_TIMER_BEGIN ( " pollset_kick.broadcast " , 0 ) ;
gpr_log ( GPR_DEBUG , " pollset_kick: broadcast! " ) ;
if ( pollset_has_workers ( p ) ) {
GPR_TIMER_BEGIN ( " pollset_kick.broadcast " , 0 ) ;
for ( worker = p - > root_worker . next ; worker ! = & p - > root_worker ;
worker = worker - > next ) {
pthread_kill ( worker - > pt_id , SIGUSR1 ) ;
}
} else {
gpr_log ( GPR_DEBUG , " pollset_kick: (broadcast) Kicked without pollers " ) ;
p - > kicked_without_pollers = true ;
}
GPR_TIMER_END ( " pollset_kick.broadcast " , 0 ) ;
} else {
gpr_log ( GPR_DEBUG , " pollset_kick: kicked kicked_specifically " ) ;
GPR_TIMER_MARK ( " kicked_specifically " , 0 ) ;
worker - > kicked_specifically = true ;
pthread_kill ( worker - > pt_id , SIGUSR1 ) ;
@ -860,9 +863,11 @@ static void pollset_kick(grpc_pollset *p,
if ( worker ! = NULL ) {
GPR_TIMER_MARK ( " finally_kick " , 0 ) ;
push_back_worker ( p , worker ) ;
gpr_log ( GPR_DEBUG , " pollset_kick: anonymous kick " ) ;
pthread_kill ( worker - > pt_id , SIGUSR1 ) ;
} else {
GPR_TIMER_MARK ( " kicked_no_pollers " , 0 ) ;
gpr_log ( GPR_DEBUG , " pollset_kick: kicked without pollers " ) ;
p - > kicked_without_pollers = true ;
}
}
@ -935,6 +940,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
struct epoll_event ep_ev [ GRPC_EPOLL_MAX_EVENTS ] ;
int epoll_fd = - 1 ;
int ep_rv ;
gpr_log ( GPR_DEBUG , " pollset_work_and_unlock: Entering.. " ) ;
GPR_TIMER_BEGIN ( " pollset_work_and_unlock " , 0 ) ;
/* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
@ -949,6 +955,16 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
pollset - > polling_island =
polling_island_update_and_lock ( pollset - > polling_island , 1 , 0 ) ;
epoll_fd = pollset - > polling_island - > epoll_fd ;
if ( pollset - > polling_island - > fd_cnt = = 0 ) {
gpr_log ( GPR_DEBUG , " pollset_work_and_unlock: epoll_fd: %d, No other fds " ,
epoll_fd ) ;
}
for ( size_t i = 0 ; i < pollset - > polling_island - > fd_cnt ; i + + ) {
gpr_log ( GPR_DEBUG ,
" pollset_work_and_unlock: epoll_fd: %d, fd_count: %d, fd[%d]: %d " ,
epoll_fd , pollset - > polling_island - > fd_cnt , i ,
pollset - > polling_island - > fds [ i ] - > fd ) ;
}
gpr_mu_unlock ( & pollset - > polling_island - > mu ) ;
}
@ -958,36 +974,47 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
/* If epoll_fd == -1, this is a blank pollset and does not have any fds yet */
if ( epoll_fd ! = - 1 ) {
do {
gpr_timespec before_epoll = gpr_now ( GPR_CLOCK_PRECISE ) ;
gpr_log ( GPR_DEBUG , " pollset_work_and_unlock: epoll_wait().... " ) ;
ep_rv = epoll_pwait ( epoll_fd , ep_ev , GRPC_EPOLL_MAX_EVENTS , timeout_ms ,
sig_mask ) ;
gpr_timespec after_epoll = gpr_now ( GPR_CLOCK_PRECISE ) ;
int dur = gpr_time_to_millis ( gpr_time_sub ( after_epoll , before_epoll ) ) ;
gpr_log ( GPR_DEBUG ,
" pollset_work_and_unlock: DONE epoll_wait() : %d ms, ep_rv: %d " ,
dur , ep_rv ) ;
if ( ep_rv < 0 ) {
if ( errno ! = EINTR ) {
/* TODO (sreek) - Check for bad file descriptor error */
gpr_log ( GPR_ERROR , " epoll_pwait() failed: %s " , strerror ( errno ) ) ;
} else {
gpr_log ( GPR_DEBUG , " pollset_work_and_unlock: 0-timeout epoll_wait() " ) ;
ep_rv = epoll_wait ( epoll_fd , ep_ev , GRPC_EPOLL_MAX_EVENTS , 0 ) ;
gpr_log ( GPR_DEBUG , " pollset_work_and_unlock: ep_rv: %d " , ep_rv ) ;
}
} else {
int i ;
for ( i = 0 ; i < ep_rv ; + + i ) {
grpc_fd * fd = ep_ev [ i ] . data . ptr ;
int cancel = ep_ev [ i ] . events & ( EPOLLERR | EPOLLHUP ) ;
int read_ev = ep_ev [ i ] . events & ( EPOLLIN | EPOLLPRI ) ;
int w rit e_ev = ep_ev [ i ] . events & EPOLLOUT ;
if ( fd = = NULL ) {
grpc_wakeup_fd_consume_wakeup ( & grpc_global_wakeup_fd ) ;
} else {
if ( read_ev | | cancel ) {
fd_become_readable ( exec_ctx , fd ) ;
}
if ( write_ev | | cancel ) {
fd_become_writable ( exec_ctx , fd ) ;
}
}
int i ;
for ( i = 0 ; i < ep_rv ; + + i ) {
grpc_fd * fd = ep_ev [ i ] . data . ptr ;
int cancel = ep_ev [ i ] . events & ( EPOLLERR | EPOLLHU P ) ;
int read _ev = ep_ev [ i ] . events & ( EPOLLIN | EPOLLPRI ) ;
int write_ev = ep_ev [ i ] . events & EPOLLOUT ;
if ( fd = = NULL ) {
grpc_wakeup_fd_consume_wakeup ( & grpc_global_wakeup_fd ) ;
} else {
if ( read_ev | | cancel ) {
fd_become_readable ( exec_ctx , fd ) ;
}
if ( write_ev | | cancel ) {
fd_become_writable ( exec_ctx , fd ) ;
}
}
}
} while ( ep_rv = = GRPC_EPOLL_MAX_EVENTS ) ;
}
gpr_log ( GPR_DEBUG , " pollset_work_and_unlock: Leaving.. " ) ;
GPR_TIMER_END ( " pollset_work_and_unlock " , 0 ) ;
}
@ -1060,7 +1087,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker * * worker_hdl , gpr_timespec now ,
gpr_timespec deadline ) {
GPR_TIMER_BEGIN ( " pollset_work " , 0 ) ;
gpr_log ( GPR_DEBUG , " pollset_work: enter " ) ;
int timeout_ms = poll_deadline_to_millis_timeout ( deadline , now ) ;
sigset_t new_mask ;
@ -1079,6 +1106,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
work that needs attention like an event on the completion queue or an
alarm */
GPR_TIMER_MARK ( " pollset_work.kicked_without_pollers " , 0 ) ;
gpr_log ( GPR_INFO , " pollset_work: kicked without pollers.. " ) ;
pollset - > kicked_without_pollers = 0 ;
} else if ( ! pollset - > shutting_down ) {
sigemptyset ( & new_mask ) ;
@ -1113,12 +1141,14 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
gpr_mu_lock ( & pollset - > mu ) ;
}
gpr_log ( GPR_DEBUG , " pollset_work(): leaving " ) ;
* worker_hdl = NULL ;
GPR_TIMER_END ( " pollset_work " , 0 ) ;
}
static void pollset_add_fd ( grpc_exec_ctx * exec_ctx , grpc_pollset * pollset ,
grpc_fd * fd ) {
gpr_log ( GPR_DEBUG , " pollset_add_fd: pollset: %p, fd: %d " , pollset , fd - > fd ) ;
/* TODO sreek - Check if we need to get a pollset->mu lock here */
gpr_mu_lock ( & pollset - > pi_mu ) ;
gpr_mu_lock ( & fd - > pi_mu ) ;
@ -1146,6 +1176,7 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
} else if ( fd - > polling_island = = NULL ) {
pi_new = polling_island_update_and_lock ( pollset - > polling_island , 1 , 1 ) ;
polling_island_add_fds_locked ( pollset - > polling_island , & fd , 1 , true ) ;
gpr_mu_unlock ( & pi_new - > mu ) ;
} else if ( pollset - > polling_island = = NULL ) {
pi_new = polling_island_update_and_lock ( fd - > polling_island , 1 , 1 ) ;