@ -42,6 +42,7 @@
# include "src/core/lib/iomgr/wakeup_fd_posix.h"
# include "src/core/lib/profiling/timers.h"
# include "src/core/lib/support/block_annotate.h"
# include "src/core/lib/support/murmur_hash.h"
# define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1)
@ -239,22 +240,43 @@ struct grpc_pollset_set {
* condition variable polling definitions
*/
# define POLLCV_THREAD_GRACE_MS 1000
# define CV_POLL_PERIOD_MS 1000
# define CV_DEFAULT_TABLE_SIZE 16
typedef enum poll_status_t { INPROGRESS , COMPLETED , CANCELLED } poll_status_t ;
typedef struct poll_args {
typedef struct poll_result {
gpr_refcount refcount ;
gpr_cv * cv ;
cv_node * watchers ;
int watchcount ;
struct pollfd * fds ;
nfds_t nfds ;
int timeout ;
int retval ;
int err ;
gpr_atm status ;
int completed ;
} poll_result ;
typedef struct poll_args {
gpr_cv trigger ;
int trigger_set ;
struct pollfd * fds ;
nfds_t nfds ;
poll_result * result ;
struct poll_args * next ;
struct poll_args * prev ;
} poll_args ;
// This is a 2-tiered cache, we mantain a hash table
// of active poll calls, so we can wait on the result
// of that call. We also maintain a freelist of inactive
// poll threads.
typedef struct poll_hash_table {
poll_args * free_pollers ;
poll_args * * active_pollers ;
unsigned int size ;
unsigned int count ;
} poll_hash_table ;
poll_hash_table poll_cache ;
cv_fd_table g_cvfds ;
/*******************************************************************************
@ -1277,43 +1299,205 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
* Condition Variable polling extensions
*/
static void decref_poll_args ( poll_args * args ) {
if ( gpr_unref ( & args - > refcount ) ) {
gpr_free ( args - > fds ) ;
gpr_cv_destroy ( args - > cv ) ;
gpr_free ( args - > cv ) ;
gpr_free ( args ) ;
static void run_poll(void *args);
static void cache_poller_locked(poll_args *args);

/* Inserts |args| at the head of its hash bucket in poll_cache.
   Requires g_cvfds.mu to be held. */
static void cache_insert_locked(poll_args *args) {
  uint32_t key = gpr_murmur_hash3(args->fds, args->nfds * sizeof(struct pollfd),
                                  0xDEADBEEF);
  key = key % poll_cache.size;
  if (poll_cache.active_pollers[key]) {
    poll_cache.active_pollers[key]->prev = args;
  }
  args->next = poll_cache.active_pollers[key];
  args->prev = NULL;
  poll_cache.active_pollers[key] = args;
  poll_cache.count++;
}
// Poll in a background thread
static void run_poll ( void * arg ) {
int timeout , retval ;
poll_args * pargs = ( poll_args * ) arg ;
while ( gpr_atm_no_barrier_load ( & pargs - > status ) = = INPROGRESS ) {
if ( pargs - > timeout < 0 ) {
timeout = CV_POLL_PERIOD_MS ;
} else {
timeout = GPR_MIN ( CV_POLL_PERIOD_MS , pargs - > timeout ) ;
pargs - > timeout - = timeout ;
/* Allocates a fresh poll_result for |pargs|, snapshotting its fd set.
   The single initial reference belongs to the poller thread.
   Requires g_cvfds.mu to be held. */
static void init_result(poll_args *pargs) {
  pargs->result = gpr_malloc(sizeof(poll_result));
  gpr_ref_init(&pargs->result->refcount, 1);
  pargs->result->watchers = NULL;
  pargs->result->watchcount = 0;
  // poll() mutates revents, so the result keeps its own copy of the fds.
  pargs->result->fds = gpr_malloc(sizeof(struct pollfd) * pargs->nfds);
  memcpy(pargs->result->fds, pargs->fds, sizeof(struct pollfd) * pargs->nfds);
  pargs->result->nfds = pargs->nfds;
  pargs->result->retval = 0;
  pargs->result->err = 0;
  pargs->result->completed = 0;
}
// Creates a poll_args object for a given arguments to poll().
// This object may return a poll_args in the cache.
static poll_args * get_poller_locked ( struct pollfd * fds , nfds_t count ) {
uint32_t key =
gpr_murmur_hash3 ( fds , count * sizeof ( struct pollfd ) , 0xDEADBEEF ) ;
key = key % poll_cache . size ;
poll_args * curr = poll_cache . active_pollers [ key ] ;
while ( curr ) {
if ( curr - > nfds = = count & &
memcmp ( curr - > fds , fds , count * sizeof ( struct pollfd ) ) = = 0 ) {
gpr_free ( fds ) ;
return curr ;
}
retval = g_cvfds . poll ( pargs - > fds , pargs - > nfds , timeout ) ;
if ( retval ! = 0 | | pargs - > timeout = = 0 ) {
pargs - > retval = retval ;
pargs - > err = errno ;
break ;
curr = curr - > next ;
}
if ( poll_cache . free_pollers ) {
poll_args * pargs = poll_cache . free_pollers ;
poll_cache . free_pollers = pargs - > next ;
if ( poll_cache . free_pollers ) {
poll_cache . free_pollers - > prev = NULL ;
}
pargs - > fds = fds ;
pargs - > nfds = count ;
pargs - > next = NULL ;
pargs - > prev = NULL ;
init_result ( pargs ) ;
cache_poller_locked ( pargs ) ;
return pargs ;
}
poll_args * pargs = gpr_malloc ( sizeof ( struct poll_args ) ) ;
gpr_cv_init ( & pargs - > trigger ) ;
pargs - > fds = fds ;
pargs - > nfds = count ;
pargs - > next = NULL ;
pargs - > prev = NULL ;
pargs - > trigger_set = 0 ;
init_result ( pargs ) ;
cache_poller_locked ( pargs ) ;
gpr_thd_id t_id ;
gpr_thd_options opt = gpr_thd_options_default ( ) ;
gpr_ref ( & g_cvfds . pollcount ) ;
gpr_thd_options_set_detached ( & opt ) ;
GPR_ASSERT ( gpr_thd_new ( & t_id , & run_poll , pargs , & opt ) ) ;
return pargs ;
}
/* Unlinks |args| from its hash bucket and moves it onto the freelist of idle
   pollers, releasing its (now stale) fd set.  Requires g_cvfds.mu held. */
static void cache_delete_locked(poll_args *args) {
  if (!args->prev) {
    // Head of its bucket: recompute the bucket to unlink from the table.
    uint32_t key = gpr_murmur_hash3(
        args->fds, args->nfds * sizeof(struct pollfd), 0xDEADBEEF);
    key = key % poll_cache.size;
    GPR_ASSERT(poll_cache.active_pollers[key] == args);
    poll_cache.active_pollers[key] = args->next;
  } else {
    args->prev->next = args->next;
  }

  if (args->next) {
    args->next->prev = args->prev;
  }

  poll_cache.count--;
  // Push onto the freelist head for later reuse.
  if (poll_cache.free_pollers) {
    poll_cache.free_pollers->prev = args;
  }
  args->prev = NULL;
  args->next = poll_cache.free_pollers;
  gpr_free(args->fds);
  poll_cache.free_pollers = args;
}
/* Inserts |args| into the cache, first doubling and rehashing the bucket
   array if the table would exceed half capacity.  Requires g_cvfds.mu held. */
static void cache_poller_locked(poll_args *args) {
  if (poll_cache.count + 1 > poll_cache.size / 2) {
    poll_args **old_active_pollers = poll_cache.active_pollers;
    poll_cache.size = poll_cache.size * 2;
    poll_cache.count = 0;  // cache_insert_locked re-counts each entry
    poll_cache.active_pollers = gpr_malloc(sizeof(void *) * poll_cache.size);
    for (unsigned int i = 0; i < poll_cache.size; i++) {
      poll_cache.active_pollers[i] = NULL;
    }
    // Rehash every existing entry into the enlarged table.
    for (unsigned int i = 0; i < poll_cache.size / 2; i++) {
      poll_args *curr = old_active_pollers[i];
      poll_args *next = NULL;
      while (curr) {
        next = curr->next;
        cache_insert_locked(curr);
        curr = next;
      }
    }
    gpr_free(old_active_pollers);
  }

  cache_insert_locked(args);
}
/* Removes |args| from the freelist and frees it; used when an idle poller
   thread's grace period expires.  Requires g_cvfds.mu to be held.
   (The fd set was already freed by cache_delete_locked.) */
static void cache_destroy_locked(poll_args *args) {
  if (args->next) {
    args->next->prev = args->prev;
  }

  if (args->prev) {
    args->prev->next = args->next;
  } else {
    // No predecessor: |args| was the freelist head.
    poll_cache.free_pollers = args->next;
  }

  gpr_free(args);
}
/* Drops one reference to |res|; frees it (and its fd copy) on the last one.
   A result must have no remaining watchers when it is destroyed. */
static void decref_poll_result(poll_result *res) {
  if (gpr_unref(&res->refcount)) {
    GPR_ASSERT(!res->watchers);
    gpr_free(res->fds);
    gpr_free(res);
  }
}
/* Unlinks |target| from the doubly-linked list rooted at |*head|,
   updating the head pointer when |target| is the first node. */
void remove_cvn(cv_node **head, cv_node *target) {
  if (target->next) {
    target->next->prev = target->prev;
  }

  if (target->prev) {
    target->prev->next = target->next;
  } else {
    // |target| has no predecessor, so it must be the list head.
    *head = target->next;
  }
}
gpr_timespec thread_grace ;
// Poll in a background thread
static void run_poll ( void * args ) {
poll_args * pargs = ( poll_args * ) args ;
while ( 1 ) {
poll_result * result = pargs - > result ;
int retval = g_cvfds . poll ( result - > fds , result - > nfds , CV_POLL_PERIOD_MS ) ;
gpr_mu_lock ( & g_cvfds . mu ) ;
if ( retval ! = 0 ) {
result - > completed = 1 ;
result - > retval = retval ;
result - > err = errno ;
cv_node * watcher = result - > watchers ;
while ( watcher ) {
gpr_cv_signal ( watcher - > cv ) ;
watcher = watcher - > next ;
}
}
if ( result - > watchcount = = 0 | | result - > completed ) {
cache_delete_locked ( pargs ) ;
decref_poll_result ( result ) ;
// Leave this polling thread alive for a grace period to do another poll()
// op
gpr_timespec deadline = gpr_now ( GPR_CLOCK_REALTIME ) ;
deadline = gpr_time_add ( deadline , thread_grace ) ;
pargs - > trigger_set = 0 ;
gpr_cv_wait ( & pargs - > trigger , & g_cvfds . mu , deadline ) ;
if ( ! pargs - > trigger_set ) {
cache_destroy_locked ( pargs ) ;
break ;
}
}
gpr_mu_unlock ( & g_cvfds . mu ) ;
}
// We still have the lock here
if ( gpr_unref ( & g_cvfds . pollcount ) ) {
gpr_cv_signal ( & g_cvfds . shutdown_cv ) ;
}
gpr_mu_unlock ( & g_cvfds . mu ) ;
}
@ -1322,24 +1506,29 @@ static void run_poll(void *arg) {
static int cvfd_poll ( struct pollfd * fds , nfds_t nfds , int timeout ) {
unsigned int i ;
int res , idx ;
gpr_cv * pollcv ;
cv_node * cvn , * prev ;
cv_node * pollcv ;
int skip_poll = 0 ;
nfds_t nsockfds = 0 ;
gpr_thd_id t_id ;
gpr_thd_options opt ;
poll_args * pargs = NULL ;
poll_result * result = NULL ;
gpr_mu_lock ( & g_cvfds . mu ) ;
pollcv = gpr_malloc ( sizeof ( gpr_cv ) ) ;
gpr_cv_init ( pollcv ) ;
pollcv = gpr_malloc ( sizeof ( cv_node ) ) ;
pollcv - > next = NULL ;
gpr_cv pollcv_cv ;
gpr_cv_init ( & pollcv_cv ) ;
pollcv - > cv = & pollcv_cv ;
cv_node * fd_cvs = gpr_malloc ( nfds * sizeof ( cv_node ) ) ;
for ( i = 0 ; i < nfds ; i + + ) {
fds [ i ] . revents = 0 ;
if ( fds [ i ] . fd < 0 & & ( fds [ i ] . events & POLLIN ) ) {
idx = FD_TO_IDX ( fds [ i ] . fd ) ;
cvn = gpr_malloc ( sizeof ( cv_node ) ) ;
cvn - > cv = pollcv ;
cvn - > next = g_cvfds . cvfds [ idx ] . cvs ;
g_cvfds . cvfds [ idx ] . cvs = cvn ;
fd_cvs [ i ] . cv = & pollcv_cv ;
fd_cvs [ i ] . prev = NULL ;
fd_cvs [ i ] . next = g_cvfds . cvfds [ idx ] . cvs ;
if ( g_cvfds . cvfds [ idx ] . cvs ) {
g_cvfds . cvfds [ idx ] . cvs - > prev = & ( fd_cvs [ i ] ) ;
}
g_cvfds . cvfds [ idx ] . cvs = & ( fd_cvs [ i ] ) ;
// Don't bother polling if a wakeup fd is ready
if ( g_cvfds . cvfds [ idx ] . is_set ) {
skip_poll = 1 ;
@ -1349,81 +1538,68 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
}
}
gpr_timespec deadline = gpr_now ( GPR_CLOCK_REALTIME ) ;
if ( timeout < 0 ) {
deadline = gpr_inf_future ( GPR_CLOCK_REALTIME ) ;
} else {
deadline =
gpr_time_add ( deadline , gpr_time_from_millis ( timeout , GPR_TIMESPAN ) ) ;
}
res = 0 ;
if ( ! skip_poll & & nsockfds > 0 ) {
pargs = gpr_malloc ( sizeof ( struct poll_args ) ) ;
// Both the main thread and calling thread get a reference
gpr_ref_init ( & pargs - > refcount , 2 ) ;
pargs - > cv = pollcv ;
pargs - > fds = gpr_malloc ( sizeof ( struct pollfd ) * nsockfds ) ;
pargs - > nfds = nsockfds ;
pargs - > timeout = timeout ;
pargs - > retval = 0 ;
pargs - > err = 0 ;
gpr_atm_no_barrier_store ( & pargs - > status , INPROGRESS ) ;
struct pollfd * pollfds = gpr_malloc ( sizeof ( struct pollfd ) * nsockfds ) ;
idx = 0 ;
for ( i = 0 ; i < nfds ; i + + ) {
if ( fds [ i ] . fd > = 0 ) {
pargs - > fds [ idx ] . fd = fds [ i ] . fd ;
pargs - > fds [ idx ] . events = fds [ i ] . events ;
pargs - > fds [ idx ] . revents = 0 ;
pollfds [ idx ] . fd = fds [ i ] . fd ;
pollfds [ idx ] . events = fds [ i ] . events ;
pollfds [ idx ] . revents = 0 ;
idx + + ;
}
}
g_cvfds . pollcount + + ;
opt = gpr_thd_options_default ( ) ;
gpr_thd_options_set_detached ( & opt ) ;
GPR_ASSERT ( gpr_thd_new ( & t_id , & run_poll , pargs , & opt ) ) ;
// We want the poll() thread to trigger the deadline, so wait forever here
gpr_cv_wait ( pollcv , & g_cvfds . mu , gpr_inf_future ( GPR_CLOCK_MONOTONIC ) ) ;
if ( gpr_atm_no_barrier_load ( & pargs - > status ) = = COMPLETED ) {
res = pargs - > retval ;
errno = pargs - > err ;
} else {
errno = 0 ;
gpr_atm_no_barrier_store ( & pargs - > status , CANCELLED ) ;
poll_args * pargs = get_poller_locked ( pollfds , nsockfds ) ;
result = pargs - > result ;
pollcv - > next = result - > watchers ;
pollcv - > prev = NULL ;
if ( result - > watchers ) {
result - > watchers - > prev = pollcv ;
}
result - > watchers = pollcv ;
result - > watchcount + + ;
gpr_ref ( & result - > refcount ) ;
pargs - > trigger_set = 1 ;
gpr_cv_signal ( & pargs - > trigger ) ;
gpr_cv_wait ( & pollcv_cv , & g_cvfds . mu , deadline ) ;
res = result - > retval ;
errno = result - > err ;
result - > watchcount - - ;
remove_cvn ( & result - > watchers , pollcv ) ;
} else if ( ! skip_poll ) {
gpr_timespec deadline = gpr_now ( GPR_CLOCK_REALTIME ) ;
deadline =
gpr_time_add ( deadline , gpr_time_from_millis ( timeout , GPR_TIMESPAN ) ) ;
gpr_cv_wait ( pollcv , & g_cvfds . mu , deadline ) ;
gpr_cv_wait ( & pollcv_cv , & g_cvfds . mu , deadline ) ;
}
idx = 0 ;
for ( i = 0 ; i < nfds ; i + + ) {
if ( fds [ i ] . fd < 0 & & ( fds [ i ] . events & POLLIN ) ) {
cvn = g_cvfds . cvfds [ FD_TO_IDX ( fds [ i ] . fd ) ] . cvs ;
prev = NULL ;
while ( cvn - > cv ! = pollcv ) {
prev = cvn ;
cvn = cvn - > next ;
GPR_ASSERT ( cvn ) ;
}
if ( ! prev ) {
g_cvfds . cvfds [ FD_TO_IDX ( fds [ i ] . fd ) ] . cvs = cvn - > next ;
} else {
prev - > next = cvn - > next ;
}
gpr_free ( cvn ) ;
remove_cvn ( & g_cvfds . cvfds [ FD_TO_IDX ( fds [ i ] . fd ) ] . cvs , & ( fd_cvs [ i ] ) ) ;
if ( g_cvfds . cvfds [ FD_TO_IDX ( fds [ i ] . fd ) ] . is_set ) {
fds [ i ] . revents = POLLIN ;
if ( res > = 0 ) res + + ;
}
} else if ( ! skip_poll & & fds [ i ] . fd > = 0 & &
gpr_atm_no_barrier_load ( & pargs - > status ) = = COMPLETED ) {
fds [ i ] . revents = pargs - > fds [ idx ] . revents ;
} else if ( ! skip_poll & & fds [ i ] . fd > = 0 & & result - > completed ) {
fds [ i ] . revents = result - > fds [ idx ] . revents ;
idx + + ;
}
}
if ( pargs ) {
decref_poll_args ( pargs ) ;
} else {
gpr_cv_destroy ( pollcv ) ;
gpr_free ( pollcv ) ;
gpr_free ( fd_cvs ) ;
gpr_free ( pollcv ) ;
if ( result ) {
decref_poll_result ( result ) ;
}
gpr_mu_unlock ( & g_cvfds . mu ) ;
return res ;
@ -1432,12 +1608,12 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
static void global_cv_fd_table_init ( ) {
gpr_mu_init ( & g_cvfds . mu ) ;
gpr_mu_lock ( & g_cvfds . mu ) ;
gpr_cv_init ( & g_cvfds . shutdown_complete ) ;
g_cvfds . shutdown = 0 ;
g_cvfds . pollcount = 0 ;
gpr_cv_init ( & g_cvfds . shutdown_cv ) ;
gpr_ref_init ( & g_cvfds . pollcount , 1 ) ;
g_cvfds . size = CV_DEFAULT_TABLE_SIZE ;
g_cvfds . cvfds = gpr_malloc ( sizeof ( fd_node ) * CV_DEFAULT_TABLE_SIZE ) ;
g_cvfds . free_fds = NULL ;
thread_grace = gpr_time_from_millis ( POLLCV_THREAD_GRACE_MS , GPR_TIMESPAN ) ;
for ( int i = 0 ; i < CV_DEFAULT_TABLE_SIZE ; i + + ) {
g_cvfds . cvfds [ i ] . is_set = 0 ;
g_cvfds . cvfds [ i ] . cvs = NULL ;
@ -1447,23 +1623,35 @@ static void global_cv_fd_table_init() {
// Override the poll function with one that supports cvfds
g_cvfds . poll = grpc_poll_function ;
grpc_poll_function = & cvfd_poll ;
// Initialize the cache
poll_cache . size = 32 ;
poll_cache . count = 0 ;
poll_cache . free_pollers = NULL ;
poll_cache . active_pollers = gpr_malloc ( sizeof ( void * ) * 32 ) ;
for ( unsigned int i = 0 ; i < poll_cache . size ; i + + ) {
poll_cache . active_pollers [ i ] = NULL ;
}
gpr_mu_unlock ( & g_cvfds . mu ) ;
}
static void global_cv_fd_table_shutdown ( ) {
gpr_mu_lock ( & g_cvfds . mu ) ;
g_cvfds . shutdown = 1 ;
// Attempt to wait for all abandoned poll() threads to terminate
// Not doing so will result in reported memory leaks
if ( g_cvfds . pollcount > 0 ) {
int res = gpr_cv_wait ( & g_cvfds . shutdown_complete , & g_cvfds . mu ,
if ( ! gpr_unref ( & g_cvfds . pollcount ) ) {
int res = gpr_cv_wait ( & g_cvfds . shutdown_cv , & g_cvfds . mu ,
gpr_time_add ( gpr_now ( GPR_CLOCK_REALTIME ) ,
gpr_time_from_seconds ( 3 , GPR_TIMESPAN ) ) ) ;
GPR_ASSERT ( res = = 0 ) ;
}
gpr_cv_destroy ( & g_cvfds . shutdown_complete ) ;
gpr_cv_destroy ( & g_cvfds . shutdown_cv ) ;
grpc_poll_function = g_cvfds . poll ;
gpr_free ( g_cvfds . cvfds ) ;
gpr_free ( poll_cache . active_pollers ) ;
gpr_mu_unlock ( & g_cvfds . mu ) ;
gpr_mu_destroy ( & g_cvfds . mu ) ;
}