@ -69,6 +69,9 @@ static int grpc_polling_trace = 0; /* Disabled by default */
gpr_log ( GPR_INFO , ( fmt ) , __VA_ARGS__ ) ; \
}
/* Uncomment the following enable extra checks on poll_object operations */
/* #define PO_DEBUG */
static int grpc_wakeup_signal = - 1 ;
static bool is_grpc_wakeup_signal_initialized = false ;
@ -95,10 +98,42 @@ void grpc_use_signal(int signum) {
struct polling_island ;
typedef enum {
POLL_OBJ_FD ,
POLL_OBJ_POLLSET ,
POLL_OBJ_POLLSET_SET
} poll_obj_type ;
typedef struct poll_obj {
# ifdef PO_DEBUG
poll_obj_type obj_type ;
# endif
gpr_mu mu ;
struct polling_island * pi ;
} poll_obj ;
const char * poll_obj_string ( poll_obj_type po_type ) {
switch ( po_type ) {
case POLL_OBJ_FD :
return " fd " ;
case POLL_OBJ_POLLSET :
return " pollset " ;
case POLL_OBJ_POLLSET_SET :
return " pollset_set " ;
}
GPR_UNREACHABLE_CODE ( return " UNKNOWN " ) ;
}
/*******************************************************************************
* Fd Declarations
*/
# define FD_FROM_PO(po) ((grpc_fd *)(po))
struct grpc_fd {
poll_obj po ;
int fd ;
/* refst format:
bit 0 : 1 = Active / 0 = Orphaned
@ -106,8 +141,6 @@ struct grpc_fd {
Ref / Unref by two to avoid altering the orphaned bit */
gpr_atm refst ;
gpr_mu mu ;
/* Indicates that the fd is shutdown and that any pending read/write closures
should fail */
bool shutdown ;
@ -120,9 +153,6 @@ struct grpc_fd {
grpc_closure * read_closure ;
grpc_closure * write_closure ;
/* The polling island to which this fd belongs to (protected by mu) */
struct polling_island * polling_island ;
struct grpc_fd * freelist_next ;
grpc_closure * on_done_closure ;
@ -225,41 +255,21 @@ struct grpc_pollset_worker {
} ;
struct grpc_pollset {
gpr_mu mu ;
poll_obj po ;
grpc_pollset_worker root_worker ;
bool kicked_without_pollers ;
bool shutting_down ; /* Is the pollset shutting down ? */
bool finish_shutdown_called ; /* Is the 'finish_shutdown_locked()' called ? */
grpc_closure * shutdown_done ; /* Called after after shutdown is complete */
/* The polling island to which this pollset belongs to */
struct polling_island * polling_island ;
} ;
/*******************************************************************************
* Pollset - set Declarations
*/
/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
* directly points to a polling_island ( and adding an fd / pollset / pollset_set to
* the current pollset_set would result in polling island merges . This would
* remove the need to maintain fd_count here . This will also significantly
* simplify the grpc_fd structure since we would no longer need to explicitly
* maintain the orphaned state */
struct grpc_pollset_set {
gpr_mu mu ;
size_t pollset_count ;
size_t pollset_capacity ;
grpc_pollset * * pollsets ;
size_t pollset_set_count ;
size_t pollset_set_capacity ;
struct grpc_pollset_set * * pollset_sets ;
size_t fd_count ;
size_t fd_capacity ;
grpc_fd * * fds ;
poll_obj po ;
} ;
/*******************************************************************************
@ -915,7 +925,7 @@ static void fd_global_shutdown(void) {
while ( fd_freelist ! = NULL ) {
grpc_fd * fd = fd_freelist ;
fd_freelist = fd_freelist - > freelist_next ;
gpr_mu_destroy ( & fd - > mu ) ;
gpr_mu_destroy ( & fd - > po . mu ) ;
gpr_free ( fd ) ;
}
gpr_mu_destroy ( & fd_freelist_mu ) ;
@ -933,13 +943,17 @@ static grpc_fd *fd_create(int fd, const char *name) {
if ( new_fd = = NULL ) {
new_fd = gpr_malloc ( sizeof ( grpc_fd ) ) ;
gpr_mu_init ( & new_fd - > mu ) ;
gpr_mu_init ( & new_fd - > po . mu ) ;
}
/* Note: It is not really needed to get the new_fd->mu lock here. If this is a
newly created fd ( or an fd we got from the freelist ) , no one else would be
holding a lock to it anyway . */
gpr_mu_lock ( & new_fd - > mu ) ;
/* Note: It is not really needed to get the new_fd->po.mu lock here. If this
* is a newly created fd ( or an fd we got from the freelist ) , no one else
* would be holding a lock to it anyway . */
gpr_mu_lock ( & new_fd - > po . mu ) ;
new_fd - > po . pi = NULL ;
# ifdef PO_DEBUG
new_fd - > po . obj_type = POLL_OBJ_FD ;
# endif
gpr_atm_rel_store ( & new_fd - > refst , ( gpr_atm ) 1 ) ;
new_fd - > fd = fd ;
@ -947,12 +961,11 @@ static grpc_fd *fd_create(int fd, const char *name) {
new_fd - > orphaned = false ;
new_fd - > read_closure = CLOSURE_NOT_READY ;
new_fd - > write_closure = CLOSURE_NOT_READY ;
new_fd - > polling_island = NULL ;
new_fd - > freelist_next = NULL ;
new_fd - > on_done_closure = NULL ;
new_fd - > read_notifier_pollset = NULL ;
gpr_mu_unlock ( & new_fd - > mu ) ;
gpr_mu_unlock ( & new_fd - > po . mu ) ;
char * fd_name ;
gpr_asprintf ( & fd_name , " %s fd=%d " , name , fd ) ;
@ -964,17 +977,13 @@ static grpc_fd *fd_create(int fd, const char *name) {
return new_fd ;
}
static bool fd_is_orphaned ( grpc_fd * fd ) {
return ( gpr_atm_acq_load ( & fd - > refst ) & 1 ) = = 0 ;
}
static int fd_wrapped_fd ( grpc_fd * fd ) {
int ret_fd = - 1 ;
gpr_mu_lock ( & fd - > mu ) ;
gpr_mu_lock ( & fd - > po . mu ) ;
if ( ! fd - > orphaned ) {
ret_fd = fd - > fd ;
}
gpr_mu_unlock ( & fd - > mu ) ;
gpr_mu_unlock ( & fd - > po . mu ) ;
return ret_fd ;
}
@ -986,7 +995,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_error * error = GRPC_ERROR_NONE ;
polling_island * unref_pi = NULL ;
gpr_mu_lock ( & fd - > mu ) ;
gpr_mu_lock ( & fd - > po . mu ) ;
fd - > on_done_closure = on_done ;
/* If release_fd is not NULL, we should be relinquishing control of the file
@ -1006,25 +1015,25 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
/* Remove the fd from the polling island:
- Get a lock on the latest polling island ( i . e the last island in the
linked list pointed by fd - > polling_island ) . This is the island that
linked list pointed by fd - > po . pi ) . This is the island that
would actually contain the fd
- Remove the fd from the latest polling island
- Unlock the latest polling island
- Set fd - > polling_island to NULL ( but remove the ref on the polling island
- Set fd - > po . pi to NULL ( but remove the ref on the polling island
before doing this . ) */
if ( fd - > polling_island ! = NULL ) {
polling_island * pi_latest = polling_island_lock ( fd - > polling_island ) ;
if ( fd - > po . pi ! = NULL ) {
polling_island * pi_latest = polling_island_lock ( fd - > po . pi ) ;
polling_island_remove_fd_locked ( pi_latest , fd , is_fd_closed , & error ) ;
gpr_mu_unlock ( & pi_latest - > mu ) ;
unref_pi = fd - > polling_island ;
fd - > polling_island = NULL ;
unref_pi = fd - > po . pi ;
fd - > po . pi = NULL ;
}
grpc_exec_ctx_sched ( exec_ctx , fd - > on_done_closure , GRPC_ERROR_REF ( error ) ,
NULL ) ;
gpr_mu_unlock ( & fd - > mu ) ;
gpr_mu_unlock ( & fd - > po . mu ) ;
UNREF_BY ( fd , 2 , reason ) ; /* Drop the reference */
if ( unref_pi ! = NULL ) {
/* Unref stale polling island here, outside the fd lock above.
@ -1089,23 +1098,23 @@ static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
grpc_fd * fd ) {
grpc_pollset * notifier = NULL ;
gpr_mu_lock ( & fd - > mu ) ;
gpr_mu_lock ( & fd - > po . mu ) ;
notifier = fd - > read_notifier_pollset ;
gpr_mu_unlock ( & fd - > mu ) ;
gpr_mu_unlock ( & fd - > po . mu ) ;
return notifier ;
}
static bool fd_is_shutdown ( grpc_fd * fd ) {
gpr_mu_lock ( & fd - > mu ) ;
gpr_mu_lock ( & fd - > po . mu ) ;
const bool r = fd - > shutdown ;
gpr_mu_unlock ( & fd - > mu ) ;
gpr_mu_unlock ( & fd - > po . mu ) ;
return r ;
}
/* Might be called multiple times */
static void fd_shutdown ( grpc_exec_ctx * exec_ctx , grpc_fd * fd ) {
gpr_mu_lock ( & fd - > mu ) ;
gpr_mu_lock ( & fd - > po . mu ) ;
/* Do the actual shutdown only once */
if ( ! fd - > shutdown ) {
fd - > shutdown = true ;
@ -1116,28 +1125,28 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
set_ready_locked ( exec_ctx , fd , & fd - > read_closure ) ;
set_ready_locked ( exec_ctx , fd , & fd - > write_closure ) ;
}
gpr_mu_unlock ( & fd - > mu ) ;
gpr_mu_unlock ( & fd - > po . mu ) ;
}
static void fd_notify_on_read ( grpc_exec_ctx * exec_ctx , grpc_fd * fd ,
grpc_closure * closure ) {
gpr_mu_lock ( & fd - > mu ) ;
gpr_mu_lock ( & fd - > po . mu ) ;
notify_on_locked ( exec_ctx , fd , & fd - > read_closure , closure ) ;
gpr_mu_unlock ( & fd - > mu ) ;
gpr_mu_unlock ( & fd - > po . mu ) ;
}
static void fd_notify_on_write ( grpc_exec_ctx * exec_ctx , grpc_fd * fd ,
grpc_closure * closure ) {
gpr_mu_lock ( & fd - > mu ) ;
gpr_mu_lock ( & fd - > po . mu ) ;
notify_on_locked ( exec_ctx , fd , & fd - > write_closure , closure ) ;
gpr_mu_unlock ( & fd - > mu ) ;
gpr_mu_unlock ( & fd - > po . mu ) ;
}
static grpc_workqueue * fd_get_workqueue ( grpc_fd * fd ) {
gpr_mu_lock ( & fd - > mu ) ;
grpc_workqueue * workqueue = GRPC_WORKQUEUE_REF (
( grpc_workqueue * ) fd - > polling_island , " fd_get_workqueue " ) ;
gpr_mu_unlock ( & fd - > mu ) ;
gpr_mu_lock ( & fd - > po . mu ) ;
grpc_workqueue * workqueue =
GRPC_WORKQUEUE_REF ( ( grpc_workqueue * ) fd - > po . pi , " fd_get_workqueue " ) ;
gpr_mu_unlock ( & fd - > po . mu ) ;
return workqueue ;
}
@ -1277,8 +1286,12 @@ static grpc_error *kick_poller(void) {
}
static void pollset_init ( grpc_pollset * pollset , gpr_mu * * mu ) {
gpr_mu_init ( & pollset - > mu ) ;
* mu = & pollset - > mu ;
gpr_mu_init ( & pollset - > po . mu ) ;
* mu = & pollset - > po . mu ;
pollset - > po . pi = NULL ;
# ifdef PO_DEBUG
pollset - > po . obj_type = POLL_OBJ_POLLSET ;
# endif
pollset - > root_worker . next = pollset - > root_worker . prev = & pollset - > root_worker ;
pollset - > kicked_without_pollers = false ;
@ -1286,8 +1299,6 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
pollset - > shutting_down = false ;
pollset - > finish_shutdown_called = false ;
pollset - > shutdown_done = NULL ;
pollset - > polling_island = NULL ;
}
/* Convert a timespec to milliseconds:
@ -1317,26 +1328,26 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
static void fd_become_readable ( grpc_exec_ctx * exec_ctx , grpc_fd * fd ,
grpc_pollset * notifier ) {
/* Need the fd->mu since we might be racing with fd_notify_on_read */
gpr_mu_lock ( & fd - > mu ) ;
/* Need the fd->po. mu since we might be racing with fd_notify_on_read */
gpr_mu_lock ( & fd - > po . mu ) ;
set_ready_locked ( exec_ctx , fd , & fd - > read_closure ) ;
fd - > read_notifier_pollset = notifier ;
gpr_mu_unlock ( & fd - > mu ) ;
gpr_mu_unlock ( & fd - > po . mu ) ;
}
static void fd_become_writable ( grpc_exec_ctx * exec_ctx , grpc_fd * fd ) {
/* Need the fd->mu since we might be racing with fd_notify_on_write */
gpr_mu_lock ( & fd - > mu ) ;
/* Need the fd->po. mu since we might be racing with fd_notify_on_write */
gpr_mu_lock ( & fd - > po . mu ) ;
set_ready_locked ( exec_ctx , fd , & fd - > write_closure ) ;
gpr_mu_unlock ( & fd - > mu ) ;
gpr_mu_unlock ( & fd - > po . mu ) ;
}
static void pollset_release_polling_island ( grpc_exec_ctx * exec_ctx ,
grpc_pollset * ps , char * reason ) {
if ( ps - > polling_island ! = NULL ) {
PI_UNREF ( exec_ctx , ps - > polling_island , reason ) ;
if ( ps - > po . pi ! = NULL ) {
PI_UNREF ( exec_ctx , ps - > po . pi , reason ) ;
}
ps - > polling_island = NULL ;
ps - > po . pi = NULL ;
}
static void finish_shutdown_locked ( grpc_exec_ctx * exec_ctx ,
@ -1346,12 +1357,12 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
pollset - > finish_shutdown_called = true ;
/* Release the ref and set pollset->polling_island to NULL */
/* Release the ref and set pollset->po.pi to NULL */
pollset_release_polling_island ( exec_ctx , pollset , " ps_shutdown " ) ;
grpc_exec_ctx_sched ( exec_ctx , pollset - > shutdown_done , GRPC_ERROR_NONE , NULL ) ;
}
/* pollset->mu lock must be held by the caller before calling this */
/* pollset->po. mu lock must be held by the caller before calling this */
static void pollset_shutdown ( grpc_exec_ctx * exec_ctx , grpc_pollset * pollset ,
grpc_closure * closure ) {
GPR_TIMER_BEGIN ( " pollset_shutdown " , 0 ) ;
@ -1376,7 +1387,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
* here */
static void pollset_destroy ( grpc_pollset * pollset ) {
GPR_ASSERT ( ! pollset_has_workers ( pollset ) ) ;
gpr_mu_destroy ( & pollset - > mu ) ;
gpr_mu_destroy ( & pollset - > po . mu ) ;
}
static void pollset_reset ( grpc_pollset * pollset ) {
@ -1386,7 +1397,7 @@ static void pollset_reset(grpc_pollset *pollset) {
pollset - > finish_shutdown_called = false ;
pollset - > kicked_without_pollers = false ;
pollset - > shutdown_done = NULL ;
GPR_ASSERT ( pollset - > polling_island = = NULL ) ;
GPR_ASSERT ( pollset - > po . pi = = NULL ) ;
}
static bool maybe_do_workqueue_work ( grpc_exec_ctx * exec_ctx ,
@ -1426,7 +1437,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
GPR_TIMER_BEGIN ( " pollset_work_and_unlock " , 0 ) ;
/* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
latest polling island pointed by pollset - > polling_island .
latest polling island pointed by pollset - > po . pi
Since epoll_fd is immutable , we can read it without obtaining the polling
island lock . There is however a possibility that the polling island ( from
@ -1435,36 +1446,36 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
right - away from epoll_wait ( ) and pick up the latest polling_island the next
this function ( i . e pollset_work_and_unlock ( ) ) is called */
if ( pollset - > polling_island = = NULL ) {
pollset - > polling_island = polling_island_create ( exec_ctx , NULL , error ) ;
if ( pollset - > polling_island = = NULL ) {
if ( pollset - > po . pi = = NULL ) {
pollset - > po . pi = polling_island_create ( exec_ctx , NULL , error ) ;
if ( pollset - > po . pi = = NULL ) {
GPR_TIMER_END ( " pollset_work_and_unlock " , 0 ) ;
return ; /* Fatal error. We cannot continue */
}
PI_ADD_REF ( pollset - > polling_island , " ps " ) ;
PI_ADD_REF ( pollset - > po . pi , " ps " ) ;
GRPC_POLLING_TRACE ( " pollset_work: pollset: %p created new pi: %p " ,
( void * ) pollset , ( void * ) pollset - > polling_island ) ;
( void * ) pollset , ( void * ) pollset - > po . pi ) ;
}
pi = polling_island_maybe_get_latest ( pollset - > polling_island ) ;
pi = polling_island_maybe_get_latest ( pollset - > po . pi ) ;
epoll_fd = pi - > epoll_fd ;
/* Update the pollset->polling_island since the island being pointed by
pollset - > polling_island maybe older than the one pointed by pi ) */
if ( pollset - > polling_island ! = pi ) {
/* Update the pollset->po.pi since the island being pointed by
pollset - > po . pi maybe older than the one pointed by pi ) */
if ( pollset - > po . pi ! = pi ) {
/* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
polling island to be deleted */
PI_ADD_REF ( pi , " ps " ) ;
PI_UNREF ( exec_ctx , pollset - > polling_island , " ps " ) ;
pollset - > polling_island = pi ;
PI_UNREF ( exec_ctx , pollset - > po . pi , " ps " ) ;
pollset - > po . pi = pi ;
}
/* Add an extra ref so that the island does not get destroyed (which means
the epoll_fd won ' t be closed ) while we are are doing an epoll_wait ( ) on the
epoll_fd */
PI_ADD_REF ( pi , " ps_work " ) ;
gpr_mu_unlock ( & pollset - > mu ) ;
gpr_mu_unlock ( & pollset - > po . mu ) ;
/* If we get some workqueue work to do, it might end up completing an item on
the completion queue , so there ' s no need to poll . . . so we skip that and
@ -1537,17 +1548,17 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
GPR_ASSERT ( pi ! = NULL ) ;
/* Before leaving, release the extra ref we added to the polling island. It
is important to use " pi " here ( i . e our old copy of pollset - > polling_island
is important to use " pi " here ( i . e our old copy of pollset - > po . pi
that we got before releasing the polling island lock ) . This is because
pollset - > polling_island pointer might get udpated in other parts of the
pollset - > po . pi pointer might get udpated in other parts of the
code when there is an island merge while we are doing epoll_wait ( ) above */
PI_UNREF ( exec_ctx , pi , " ps_work " ) ;
GPR_TIMER_END ( " pollset_work_and_unlock " , 0 ) ;
}
/* pollset->mu lock must be held by the caller before calling this.
The function pollset_work ( ) may temporarily release the lock ( pollset - > mu )
/* pollset->po. mu lock must be held by the caller before calling this.
The function pollset_work ( ) may temporarily release the lock ( pollset - > po . mu )
during the course of its execution but it will always re - acquire the lock and
ensure that it is held by the time the function returns */
static grpc_error * pollset_work ( grpc_exec_ctx * exec_ctx , grpc_pollset * pollset ,
@ -1617,7 +1628,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
& g_orig_sigmask , & error ) ;
grpc_exec_ctx_flush ( exec_ctx ) ;
gpr_mu_lock ( & pollset - > mu ) ;
gpr_mu_lock ( & pollset - > po . mu ) ;
/* Note: There is no need to reset worker.is_kicked to 0 since we are no
longer going to use this worker */
@ -1637,9 +1648,9 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
GPR_TIMER_MARK ( " pollset_work.finish_shutdown_locked " , 0 ) ;
finish_shutdown_locked ( exec_ctx , pollset ) ;
gpr_mu_unlock ( & pollset - > mu ) ;
gpr_mu_unlock ( & pollset - > po . mu ) ;
grpc_exec_ctx_flush ( exec_ctx ) ;
gpr_mu_lock ( & pollset - > mu ) ;
gpr_mu_lock ( & pollset - > po . mu ) ;
}
* worker_hdl = NULL ;
@ -1653,130 +1664,160 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
return error ;
}
static void pollset_add_fd ( grpc_exec_ctx * exec_ctx , grpc_pollset * pollset ,
grpc_fd * fd ) {
GPR_TIMER_BEGIN ( " pollset_add_fd " , 0 ) ;
grpc_error * error = GRPC_ERROR_NONE ;
static void add_poll_object ( grpc_exec_ctx * exec_ctx , poll_obj * bag ,
poll_obj_type bag_type , poll_obj * item ,
poll_obj_type item_type ) {
GPR_TIMER_BEGIN ( " add_poll_object " , 0 ) ;
gpr_mu_lock ( & pollset - > mu ) ;
gpr_mu_lock ( & fd - > mu ) ;
# ifdef PO_DEBUG
GPR_ASSERT ( item - > obj_type = = item_type ) ;
GPR_ASSERT ( bag - > obj_type = = bag_type ) ;
# endif
grpc_error * error = GRPC_ERROR_NONE ;
polling_island * pi_new = NULL ;
gpr_mu_lock ( & bag - > mu ) ;
gpr_mu_lock ( & item - > mu ) ;
retry :
/* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
* equal , do nothing .
* 2 ) If fd - > polling_island and pollset - > polling_island are both NULL , create
* a new polling island ( with a refcount of 2 ) and make the polling_island
* fields in both fd and pollset to point to the new island
* 3 ) If one of fd - > polling_island or pollset - > polling_island is NULL , update
* the NULL polling_island field to point to the non - NULL polling_island
* field ( ensure that the refcount on the polling island is incremented by
* 1 to account for the newly added reference )
* 4 ) Finally , if fd - > polling_island and pollset - > polling_island are non - NULL
* and different , merge both the polling islands and update the
* polling_island fields in both fd and pollset to point to the merged
* polling island .
/*
* 1 ) If item - > pi and bag - > pi are both non - NULL and equal , do nothing
* 2 ) If item - > pi and bag - > pi are both NULL , create a new polling island ( with
* a refcount of 2 ) and point item - > pi and bag - > pi to the new island
* 3 ) If exactly one of item - > pi or bag - > pi is NULL , update it to point to
* the other ' s non - NULL pi
* 4 ) Finally if item - > pi and bag - pi are non - NULL and not - equal , merge the
* polling islands and update item - > pi and bag - > pi to point to the new
* island
*/
if ( fd - > orphaned ) {
gpr_mu_unlock ( & fd - > mu ) ;
gpr_mu_unlock ( & pollset - > mu ) ;
/* early out */
/* Early out if we are trying to add an 'fd' to a 'bag' but the fd is already
* orphaned */
if ( item_type = = POLL_OBJ_FD & & ( FD_FROM_PO ( item ) ) - > orphaned ) {
gpr_mu_unlock ( & item - > mu ) ;
gpr_mu_unlock ( & bag - > mu ) ;
return ;
}
if ( fd - > poll ing_island = = pollset - > poll ing_island ) {
pi_new = fd - > poll ing_island ;
if ( item - > pi = = bag - > pi ) {
pi_new = item - > pi ;
if ( pi_new = = NULL ) {
/* Unlock before creating a new polling island: the polling island will
create a workqueue which creates a file descriptor , and holding an fd
lock here can eventually cause a loop to appear to TSAN ( making it
unhappy ) . We don ' t think it ' s a real loop ( there ' s an epoch point where
that loop possibility disappears ) , but the advantages of keeping TSAN
happy outweigh any performance advantage we might have by keeping the
lock held . */
gpr_mu_unlock ( & fd - > mu ) ;
pi_new = polling_island_create ( exec_ctx , fd , & error ) ;
gpr_mu_lock ( & fd - > mu ) ;
/* Need to reverify any assumptions made between the initial lock and
getting to this branch : if they ' ve changed , we need to throw away our
work and figure things out again . */
if ( fd - > polling_island ! = NULL ) {
GRPC_POLLING_TRACE (
" pollset_add_fd: Raced creating new polling island. pi_new: %p "
" (fd: %d, pollset: %p) " ,
( void * ) pi_new , fd - > fd , ( void * ) pollset ) ;
/* No need to lock 'pi_new' here since this is a new polling island and
* no one has a reference to it yet */
polling_island_remove_all_fds_locked ( pi_new , true , & error ) ;
/* Ref and unref so that the polling island gets deleted during unref */
PI_ADD_REF ( pi_new , " dance_of_destruction " ) ;
PI_UNREF ( exec_ctx , pi_new , " dance_of_destruction " ) ;
goto retry ;
/* GPR_ASSERT(item->pi == bag->pi == NULL) */
/* If we are adding an fd to a bag (i.e pollset or pollset_set), then
* we need to do some extra work to make TSAN happy */
if ( item_type = = POLL_OBJ_FD ) {
/* Unlock before creating a new polling island: the polling island will
create a workqueue which creates a file descriptor , and holding an fd
lock here can eventually cause a loop to appear to TSAN ( making it
unhappy ) . We don ' t think it ' s a real loop ( there ' s an epoch point
where that loop possibility disappears ) , but the advantages of
keeping TSAN happy outweigh any performance advantage we might have
by keeping the lock held . */
gpr_mu_unlock ( & item - > mu ) ;
pi_new = polling_island_create ( exec_ctx , FD_FROM_PO ( item ) , & error ) ;
gpr_mu_lock ( & item - > mu ) ;
/* Need to reverify any assumptions made between the initial lock and
getting to this branch : if they ' ve changed , we need to throw away our
work and figure things out again . */
if ( item - > pi ! = NULL ) {
GRPC_POLLING_TRACE (
" add_poll_object: Raced creating new polling island. pi_new: %p "
" (fd: %d, %s: %p) " ,
( void * ) pi_new , FD_FROM_PO ( item ) - > fd , poll_obj_string ( bag_type ) ,
( void * ) bag ) ;
/* No need to lock 'pi_new' here since this is a new polling island
* and no one has a reference to it yet */
polling_island_remove_all_fds_locked ( pi_new , true , & error ) ;
/* Ref and unref so that the polling island gets deleted during unref
*/
PI_ADD_REF ( pi_new , " dance_of_destruction " ) ;
PI_UNREF ( exec_ctx , pi_new , " dance_of_destruction " ) ;
goto retry ;
}
} else {
GRPC_POLLING_TRACE (
" pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, "
" pollset: %p) " ,
( void * ) pi_new , fd - > fd , ( void * ) pollset ) ;
pi_new = polling_island_create ( exec_ctx , NULL , & error ) ;
}
GRPC_POLLING_TRACE (
" add_poll_object: Created new polling island. pi_new: %p (%s: %p, "
" %s: %p) " ,
( void * ) pi_new , poll_obj_string ( item_type ) , ( void * ) item ,
poll_obj_string ( bag_type ) , ( void * ) bag ) ;
} else {
GRPC_POLLING_TRACE (
" add_poll_object: Same polling island. pi: %p (%s, %s) " ,
( void * ) pi_new , poll_obj_string ( item_type ) ,
poll_obj_string ( bag_type ) ) ;
}
} else if ( item - > pi = = NULL ) {
/* GPR_ASSERT(bag->pi != NULL) */
/* Make pi_new point to latest pi*/
pi_new = polling_island_lock ( bag - > pi ) ;
if ( item_type = = POLL_OBJ_FD ) {
grpc_fd * fd = FD_FROM_PO ( item ) ;
polling_island_add_fds_locked ( pi_new , & fd , 1 , true , & error ) ;
}
} else if ( fd - > polling_island = = NULL ) {
pi_new = polling_island_lock ( pollset - > polling_island ) ;
polling_island_add_fds_locked ( pi_new , & fd , 1 , true , & error ) ;
gpr_mu_unlock ( & pi_new - > mu ) ;
gpr_mu_unlock ( & pi_new - > mu ) ;
GRPC_POLLING_TRACE (
" pollset_add_fd: fd->pi was NULL. pi_new: %p (fd: %d, pollset: %p, "
" pollset->pi: %p) " ,
( void * ) pi_new , fd - > fd , ( void * ) pollset ,
( void * ) pollset - > polling_island ) ;
} else if ( pollset - > polling_island = = NULL ) {
pi_new = polling_island_lock ( fd - > polling_island ) ;
" add_poll_obj: item->pi was NULL. pi_new: %p (item(%s): %p, "
" bag(%s): %p) " ,
( void * ) pi_new , poll_obj_string ( item_type ) , ( void * ) item ,
poll_obj_string ( bag_type ) , ( void * ) bag ) ;
} else if ( bag - > pi = = NULL ) {
/* GPR_ASSERT(item->pi != NULL) */
/* Make pi_new to point to latest pi */
pi_new = polling_island_lock ( item - > pi ) ;
gpr_mu_unlock ( & pi_new - > mu ) ;
GRPC_POLLING_TRACE (
" pollset_add_fd: pollset->pi was NULL. pi_new: %p (fd: %d, pollset: "
" %p, fd->pi: %p " ,
( void * ) pi_new , fd - > fd , ( void * ) pollset , ( void * ) fd - > polling_island ) ;
" add_poll_obj: bag->pi was NULL. pi_new: %p (item(%s): %p, "
" bag(%s): %p) " ,
( void * ) pi_new , poll_obj_string ( item_type ) , ( void * ) item ,
poll_obj_string ( bag_type ) , ( void * ) bag ) ;
} else {
pi_new = polling_island_merge ( fd - > polling_island , pollset - > polling_island ,
& error ) ;
pi_new = polling_island_merge ( item - > pi , bag - > pi , & error ) ;
GRPC_POLLING_TRACE (
" pollset_add_fd: polling islands merged. pi_new: %p (fd: %d, pollset: "
" %p, fd->pi: %p, pollset->pi : %p)" ,
( void * ) pi_new , fd - > fd , ( void * ) pollset , ( void * ) fd - > polling_island ,
( void * ) pollset - > polling_island ) ;
" add_poll_obj: polling islands merged. pi_new: %p (item(%s): %p, "
" bag(%s) : %p)" ,
( void * ) pi_new , poll_obj_string ( item_type ) , ( void * ) item ,
poll_obj_string ( bag_type ) , ( void * ) bag ) ;
}
/* At this point, pi_new is the polling island that both fd->polling_island
and pollset - > polling_island must be pointing to */
/* At this point, pi_new is the polling island that both item->pi and bag->pi
MUST be pointing to */
if ( fd - > poll ing_island ! = pi_new ) {
PI_ADD_REF ( pi_new , " fd " ) ;
if ( fd - > poll ing_island ! = NULL ) {
PI_UNREF ( exec_ctx , fd - > polling_island , " fd " ) ;
if ( item - > pi ! = pi_new ) {
PI_ADD_REF ( pi_new , poll_obj_string ( item_type ) ) ;
if ( item - > pi ! = NULL ) {
PI_UNREF ( exec_ctx , item - > pi , poll_obj_string ( item_type ) ) ;
}
fd - > poll ing_island = pi_new ;
item - > pi = pi_new ;
}
if ( pollset - > poll ing_island ! = pi_new ) {
PI_ADD_REF ( pi_new , " ps " ) ;
if ( pollset - > poll ing_island ! = NULL ) {
PI_UNREF ( exec_ctx , pollset - > polling_island , " ps " ) ;
if ( bag - > pi ! = pi_new ) {
PI_ADD_REF ( pi_new , poll_obj_string ( bag_type ) ) ;
if ( bag - > pi ! = NULL ) {
PI_UNREF ( exec_ctx , bag - > pi , poll_obj_string ( bag_type ) ) ;
}
pollset - > poll ing_island = pi_new ;
bag - > pi = pi_new ;
}
gpr_mu_unlock ( & fd - > mu ) ;
gpr_mu_unlock ( & pollset - > mu ) ;
gpr_mu_unlock ( & item - > mu ) ;
gpr_mu_unlock ( & bag - > mu ) ;
GRPC_LOG_IF_ERROR ( " pollset_add_fd " , error ) ;
GRPC_LOG_IF_ERROR ( " add_poll_object " , error ) ;
GPR_TIMER_END ( " add_poll_object " , 0 ) ;
}
GPR_TIMER_END ( " pollset_add_fd " , 0 ) ;
static void pollset_add_fd ( grpc_exec_ctx * exec_ctx , grpc_pollset * pollset ,
grpc_fd * fd ) {
add_poll_object ( exec_ctx , & pollset - > po , POLL_OBJ_POLLSET , & fd - > po ,
POLL_OBJ_FD ) ;
}
/*******************************************************************************
@ -1784,142 +1825,60 @@ retry:
*/
static grpc_pollset_set * pollset_set_create ( void ) {
grpc_pollset_set * pollset_set = gpr_malloc ( sizeof ( * pollset_set ) ) ;
memset ( pollset_set , 0 , sizeof ( * pollset_set ) ) ;
gpr_mu_init ( & pollset_set - > mu ) ;
return pollset_set ;
grpc_pollset_set * pss = gpr_malloc ( sizeof ( * pss ) ) ;
gpr_mu_init ( & pss - > po . mu ) ;
pss - > po . pi = NULL ;
# ifdef PO_DEBUG
pss - > po . obj_type = POLL_OBJ_POLLSET_SET ;
# endif
return pss ;
}
static void pollset_set_destroy ( grpc_pollset_set * pollset_set ) {
size_t i ;
gpr_mu_destroy ( & pollset_set - > mu ) ;
for ( i = 0 ; i < pollset_set - > fd_count ; i + + ) {
GRPC_FD_UNREF ( pollset_set - > fds [ i ] , " pollset_set " ) ;
static void pollset_set_destroy ( grpc_pollset_set * pss ) {
gpr_mu_destroy ( & pss - > po . mu ) ;
if ( pss - > po . pi ! = NULL ) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT ;
PI_UNREF ( & exec_ctx , pss - > po . pi , " pss_destroy " ) ;
grpc_exec_ctx_finish ( & exec_ctx ) ;
}
gpr_free ( pollset_set - > pollsets ) ;
gpr_free ( pollset_set - > pollset_sets ) ;
gpr_free ( pollset_set - > fds ) ;
gpr_free ( pollset_set ) ;
gpr_free ( pss ) ;
}
static void pollset_set_add_fd ( grpc_exec_ctx * exec_ctx ,
grpc_pollset_set * pollset_set , grpc_fd * fd ) {
size_t i ;
gpr_mu_lock ( & pollset_set - > mu ) ;
if ( pollset_set - > fd_count = = pollset_set - > fd_capacity ) {
pollset_set - > fd_capacity = GPR_MAX ( 8 , 2 * pollset_set - > fd_capacity ) ;
pollset_set - > fds = gpr_realloc (
pollset_set - > fds , pollset_set - > fd_capacity * sizeof ( * pollset_set - > fds ) ) ;
}
GRPC_FD_REF ( fd , " pollset_set " ) ;
pollset_set - > fds [ pollset_set - > fd_count + + ] = fd ;
for ( i = 0 ; i < pollset_set - > pollset_count ; i + + ) {
pollset_add_fd ( exec_ctx , pollset_set - > pollsets [ i ] , fd ) ;
}
for ( i = 0 ; i < pollset_set - > pollset_set_count ; i + + ) {
pollset_set_add_fd ( exec_ctx , pollset_set - > pollset_sets [ i ] , fd ) ;
}
gpr_mu_unlock ( & pollset_set - > mu ) ;
static void pollset_set_add_fd ( grpc_exec_ctx * exec_ctx , grpc_pollset_set * pss ,
grpc_fd * fd ) {
add_poll_object ( exec_ctx , & pss - > po , POLL_OBJ_POLLSET_SET , & fd - > po ,
POLL_OBJ_FD ) ;
}
static void pollset_set_del_fd ( grpc_exec_ctx * exec_ctx ,
grpc_pollset_set * pollset_set , grpc_fd * fd ) {
size_t i ;
gpr_mu_lock ( & pollset_set - > mu ) ;
for ( i = 0 ; i < pollset_set - > fd_count ; i + + ) {
if ( pollset_set - > fds [ i ] = = fd ) {
pollset_set - > fd_count - - ;
GPR_SWAP ( grpc_fd * , pollset_set - > fds [ i ] ,
pollset_set - > fds [ pollset_set - > fd_count ] ) ;
GRPC_FD_UNREF ( fd , " pollset_set " ) ;
break ;
}
}
for ( i = 0 ; i < pollset_set - > pollset_set_count ; i + + ) {
pollset_set_del_fd ( exec_ctx , pollset_set - > pollset_sets [ i ] , fd ) ;
}
gpr_mu_unlock ( & pollset_set - > mu ) ;
static void pollset_set_del_fd ( grpc_exec_ctx * exec_ctx , grpc_pollset_set * pss ,
grpc_fd * fd ) {
/* Nothing to do */
}
static void pollset_set_add_pollset ( grpc_exec_ctx * exec_ctx ,
grpc_pollset_set * pollset_set ,
grpc_pollset * pollset ) {
size_t i , j ;
gpr_mu_lock ( & pollset_set - > mu ) ;
if ( pollset_set - > pollset_count = = pollset_set - > pollset_capacity ) {
pollset_set - > pollset_capacity =
GPR_MAX ( 8 , 2 * pollset_set - > pollset_capacity ) ;
pollset_set - > pollsets =
gpr_realloc ( pollset_set - > pollsets , pollset_set - > pollset_capacity *
sizeof ( * pollset_set - > pollsets ) ) ;
}
pollset_set - > pollsets [ pollset_set - > pollset_count + + ] = pollset ;
for ( i = 0 , j = 0 ; i < pollset_set - > fd_count ; i + + ) {
if ( fd_is_orphaned ( pollset_set - > fds [ i ] ) ) {
GRPC_FD_UNREF ( pollset_set - > fds [ i ] , " pollset_set " ) ;
} else {
pollset_add_fd ( exec_ctx , pollset , pollset_set - > fds [ i ] ) ;
pollset_set - > fds [ j + + ] = pollset_set - > fds [ i ] ;
}
}
pollset_set - > fd_count = j ;
gpr_mu_unlock ( & pollset_set - > mu ) ;
grpc_pollset_set * pss , grpc_pollset * ps ) {
add_poll_object ( exec_ctx , & pss - > po , POLL_OBJ_POLLSET_SET , & ps - > po ,
POLL_OBJ_POLLSET ) ;
}
static void pollset_set_del_pollset ( grpc_exec_ctx * exec_ctx ,
grpc_pollset_set * pollset_set ,
grpc_pollset * pollset ) {
size_t i ;
gpr_mu_lock ( & pollset_set - > mu ) ;
for ( i = 0 ; i < pollset_set - > pollset_count ; i + + ) {
if ( pollset_set - > pollsets [ i ] = = pollset ) {
pollset_set - > pollset_count - - ;
GPR_SWAP ( grpc_pollset * , pollset_set - > pollsets [ i ] ,
pollset_set - > pollsets [ pollset_set - > pollset_count ] ) ;
break ;
}
}
gpr_mu_unlock ( & pollset_set - > mu ) ;
grpc_pollset_set * pss , grpc_pollset * ps ) {
/* Nothing to do */
}
static void pollset_set_add_pollset_set ( grpc_exec_ctx * exec_ctx ,
grpc_pollset_set * bag ,
grpc_pollset_set * item ) {
size_t i , j ;
gpr_mu_lock ( & bag - > mu ) ;
if ( bag - > pollset_set_count = = bag - > pollset_set_capacity ) {
bag - > pollset_set_capacity = GPR_MAX ( 8 , 2 * bag - > pollset_set_capacity ) ;
bag - > pollset_sets =
gpr_realloc ( bag - > pollset_sets ,
bag - > pollset_set_capacity * sizeof ( * bag - > pollset_sets ) ) ;
}
bag - > pollset_sets [ bag - > pollset_set_count + + ] = item ;
for ( i = 0 , j = 0 ; i < bag - > fd_count ; i + + ) {
if ( fd_is_orphaned ( bag - > fds [ i ] ) ) {
GRPC_FD_UNREF ( bag - > fds [ i ] , " pollset_set " ) ;
} else {
pollset_set_add_fd ( exec_ctx , item , bag - > fds [ i ] ) ;
bag - > fds [ j + + ] = bag - > fds [ i ] ;
}
}
bag - > fd_count = j ;
gpr_mu_unlock ( & bag - > mu ) ;
add_poll_object ( exec_ctx , & bag - > po , POLL_OBJ_POLLSET_SET , & item - > po ,
POLL_OBJ_POLLSET_SET ) ;
}
static void pollset_set_del_pollset_set ( grpc_exec_ctx * exec_ctx ,
grpc_pollset_set * bag ,
grpc_pollset_set * item ) {
size_t i ;
gpr_mu_lock ( & bag - > mu ) ;
for ( i = 0 ; i < bag - > pollset_set_count ; i + + ) {
if ( bag - > pollset_sets [ i ] = = item ) {
bag - > pollset_set_count - - ;
GPR_SWAP ( grpc_pollset_set * , bag - > pollset_sets [ i ] ,
bag - > pollset_sets [ bag - > pollset_set_count ] ) ;
break ;
}
}
gpr_mu_unlock ( & bag - > mu ) ;
/* Nothing to do */
}
/* Test helper functions
@ -1927,9 +1886,9 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
void * grpc_fd_get_polling_island ( grpc_fd * fd ) {
polling_island * pi ;
gpr_mu_lock ( & fd - > mu ) ;
pi = fd - > polling_island ;
gpr_mu_unlock ( & fd - > mu ) ;
gpr_mu_lock ( & fd - > po . mu ) ;
pi = fd - > po . pi ;
gpr_mu_unlock ( & fd - > po . mu ) ;
return pi ;
}
@ -1937,9 +1896,9 @@ void *grpc_fd_get_polling_island(grpc_fd *fd) {
void * grpc_pollset_get_polling_island ( grpc_pollset * ps ) {
polling_island * pi ;
gpr_mu_lock ( & ps - > mu ) ;
pi = ps - > polling_island ;
gpr_mu_unlock ( & ps - > mu ) ;
gpr_mu_lock ( & ps - > po . mu ) ;
pi = ps - > po . pi ;
gpr_mu_unlock ( & ps - > po . mu ) ;
return pi ;
}