@@ -45,6 +45,7 @@
#include "src/core/lib/gpr/spinlock.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/iomgr/block_annotate.h"
@@ -78,18 +79,6 @@ typedef enum { PO_MULTI, PO_FD, PO_EMPTY } pollable_type;
typedef struct pollable pollable;
typedef struct cached_fd {
  // Set to the grpc_fd's salt value. See the 'salt' variable in grpc_fd for
  // more details
  intptr_t salt;
  // The underlying fd
  int fd;
  // A recency time counter that helps to determine the LRU fd in the cache
  uint64_t last_used;
} cached_fd;
/// A pollable is something that can be polled: it has an epoll set to poll on,
/// and a wakeup fd for kicks
/// There are three broad types:
@@ -120,33 +109,6 @@ struct pollable {
  int event_cursor;
  int event_count;
  struct epoll_event events[MAX_EPOLL_EVENTS];
  // We may be calling pollable_add_fd() on the same (pollable, fd) multiple
  // times. To prevent pollable_add_fd() from making multiple sys calls to
  // epoll_ctl() to add the fd, we maintain a cache of what fds are already
  // present in the underlying epoll-set.
  //
  // Since this is not a correctness issue, we do not need to maintain all the
  // fds in the cache. Hence we just use an LRU cache of size 'MAX_FDS_IN_CACHE'
  //
  // NOTE: An ideal implementation of this should do the following:
  // 1) Add fds to the cache in the pollable_add_fd() function (i.e whenever
  //    the fd is added to the pollable's epoll set)
  // 2) Remove the fd from the cache whenever the fd is removed from the
  //    underlying epoll set (i.e whenever fd_orphan() is called).
  //
  // Implementing (2) above (i.e removing fds from cache on fd_orphan) adds a
  // lot of complexity since an fd can be present in multiple pollables. So our
  // implementation ONLY DOES (1) and NOT (2).
  //
  // The cached_fd.salt variable helps here to maintain correctness (it serves
  // as an epoch that differentiates one grpc_fd from another even though both
  // of them may have the same fd number)
  //
  // The following implements an LRU-eviction cache of fds in this pollable.
  cached_fd fd_cache[MAX_FDS_IN_CACHE];
  int fd_cache_size;
  uint64_t fd_cache_counter;  // Recency timer tick counter
};
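// Illustration of the reuse hazard the salt guards against (scenario inferred
// from the comments above): suppose fd number 7 sits in a pollable's cache,
// its owning grpc_fd is orphaned and close()d, and the kernel then hands out
// fd number 7 to a brand-new grpc_fd. Without the salt, pollable_add_fd()
// would see a cache hit and skip the epoll_ctl() the new fd actually needs;
// because the new grpc_fd carries a fresh salt, the (fd, salt) comparison
// misses and the fd is correctly re-added to the epoll set.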
static const char* pollable_type_string(pollable_type t) {
@@ -189,37 +151,86 @@ static void pollable_unref(pollable* p, int line, const char* reason);
* Fd Declarations
*/
// Monotonically increasing Epoch counter that is assigned to each grpc_fd. See
// the description of the 'salt' variable in 'grpc_fd' for more details
// TODO: (sreek/kpayson) gpr_atm is intptr_t which may not be wide enough on
// 32-bit systems. Change this to int64_t - at least on 32-bit systems
static gpr_atm g_fd_salt;
struct grpc_fd {
  int fd;
  grpc_fd(int fd, const char* name, bool track_err)
      : fd(fd), track_err(track_err) {
    gpr_mu_init(&orphan_mu);
    gpr_mu_init(&pollable_mu);
    read_closure.InitEvent();
    write_closure.InitEvent();
    error_closure.InitEvent();
    char* fd_name;
    gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
    grpc_iomgr_register_object(&iomgr_object, fd_name);
#ifndef NDEBUG
    if (grpc_trace_fd_refcount.enabled()) {
      gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, this, fd_name);
    }
#endif
    gpr_free(fd_name);
  }
  // This is really the dtor, but the poller threads waking up from
  // epoll_wait() may access the (read|write|error)_closure after destruction.
  // Since the object will be added to the free pool, this behavior is
  // not going to cause issues, except spurious events if the FD is reused
  // while the race happens.
  void destroy() {
    grpc_iomgr_unregister_object(&iomgr_object);
  // Since fd numbers can be reused (after old fds are closed), this serves as
  // an epoch that uniquely identifies this fd (i.e the pair (salt, fd) is
  // unique until the salt counter (i.e g_fd_salt) overflows).
  intptr_t salt;
    POLLABLE_UNREF(pollable_obj, "fd_pollable");
    pollset_fds.clear();
    gpr_mu_destroy(&pollable_mu);
    gpr_mu_destroy(&orphan_mu);
    read_closure.DestroyEvent();
    write_closure.DestroyEvent();
    error_closure.DestroyEvent();
    invalidate();
  }
#ifndef NDEBUG
  /* Since an fd is never really destroyed (i.e gpr_free() is not called), it
   * is hard to catch cases where fd fields are accessed even after calling
   * fd_destroy(). The following invalidates fd fields to make catching such
   * errors easier */
  void invalidate() {
    fd = -1;
    gpr_atm_no_barrier_store(&refst, -1);
    memset(&orphan_mu, -1, sizeof(orphan_mu));
    memset(&pollable_mu, -1, sizeof(pollable_mu));
    pollable_obj = nullptr;
    on_done_closure = nullptr;
    memset(&iomgr_object, -1, sizeof(iomgr_object));
    track_err = false;
  }
#else
  void invalidate() {}
#endif
  int fd;
  // refst format:
  //   bit 0    : 1=Active / 0=Orphaned
  //   bits 1-n : refcount
  // Ref/Unref by two to avoid altering the orphaned bit
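  // Example encoding: refst == 5 (binary 101) is an active fd holding two
  // refs; adding or subtracting 2 changes only the refcount bits and never
  // touches the Active/Orphaned bit.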
  gpr_atm refst;
  gpr_atm refst = 1;
  gpr_mu orphan_mu;
  // Protects pollable_obj and pollset_fds.
  gpr_mu pollable_mu;
  pollable* pollable_obj;
  grpc_core::InlinedVector<int, 1> pollset_fds;  // Used in PO_MULTI.
  pollable* pollable_obj = nullptr;              // Used in PO_FD.
  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;
  grpc_core::LockfreeEvent read_closure;
  grpc_core::LockfreeEvent write_closure;
  grpc_core::LockfreeEvent error_closure;
  struct grpc_fd* freelist_next;
  grpc_closure* on_done_closure;
  struct grpc_fd* freelist_next = nullptr;
  grpc_closure* on_done_closure = nullptr;
  grpc_iomgr_object iomgr_object;
@@ -258,6 +269,7 @@ struct grpc_pollset_worker {
struct grpc_pollset {
  gpr_mu mu;
  gpr_atm worker_count;
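  // Cached copy of active_pollable->type. Written with release semantics when
  // the active pollable changes so that pollset_add_fd() can check it without
  // taking mu.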
  gpr_atm active_pollable_type;
  pollable* active_pollable;
  bool kicked_without_poller;
  grpc_closure* shutdown_closure;
@@ -337,39 +349,10 @@ static void ref_by(grpc_fd* fd, int n) {
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}
#ifndef NDEBUG
#define INVALIDATE_FD(fd) invalidate_fd(fd)
/* Since an fd is never really destroyed (i.e gpr_free() is not called), it is
 * hard to catch cases where fd fields are accessed even after calling
 * fd_destroy(). The following invalidates fd fields to make catching such
 * errors easier */
static void invalidate_fd(grpc_fd* fd) {
  fd->fd = -1;
  fd->salt = -1;
  gpr_atm_no_barrier_store(&fd->refst, -1);
  memset(&fd->orphan_mu, -1, sizeof(fd->orphan_mu));
  memset(&fd->pollable_mu, -1, sizeof(fd->pollable_mu));
  fd->pollable_obj = nullptr;
  fd->on_done_closure = nullptr;
  memset(&fd->iomgr_object, -1, sizeof(fd->iomgr_object));
  fd->track_err = false;
}
#else
#define INVALIDATE_FD(fd)
#endif
/* Uninitialize and add to the freelist */
static void fd_destroy(void* arg, grpc_error* error) {
  grpc_fd* fd = static_cast<grpc_fd*>(arg);
  grpc_iomgr_unregister_object(&fd->iomgr_object);
  POLLABLE_UNREF(fd->pollable_obj, "fd_pollable");
  gpr_mu_destroy(&fd->pollable_mu);
  gpr_mu_destroy(&fd->orphan_mu);
  fd->read_closure->DestroyEvent();
  fd->write_closure->DestroyEvent();
  fd->error_closure->DestroyEvent();
  INVALIDATE_FD(fd);
  fd->destroy();
  /* Add the fd to the freelist */
  gpr_mu_lock(&fd_freelist_mu);
@@ -429,35 +412,9 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
  if (new_fd == nullptr) {
    new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd)));
    new_fd->read_closure.Init();
    new_fd->write_closure.Init();
    new_fd->error_closure.Init();
  }
  new_fd->fd = fd;
  new_fd->salt = gpr_atm_no_barrier_fetch_add(&g_fd_salt, 1);
  gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
  gpr_mu_init(&new_fd->orphan_mu);
  gpr_mu_init(&new_fd->pollable_mu);
  new_fd->pollable_obj = nullptr;
  new_fd->read_closure->InitEvent();
  new_fd->write_closure->InitEvent();
  new_fd->error_closure->InitEvent();
  new_fd->freelist_next = nullptr;
  new_fd->on_done_closure = nullptr;
  char* fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifndef NDEBUG
  if (grpc_trace_fd_refcount.enabled()) {
    gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name);
  }
#endif
  gpr_free(fd_name);
  new_fd->track_err = track_err;
  return new_fd;
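  // Construct in place: the storage may be recycled from the freelist, so the
  // grpc_fd constructor is run via placement new over the existing allocation
  // rather than through a fresh gpr_malloc().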
  return new (new_fd) grpc_fd(fd, name, track_err);
}
static int fd_wrapped_fd(grpc_fd* fd) {
@@ -475,7 +432,6 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
  // true so that the pollable will no longer access its owner_fd field.
  gpr_mu_lock(&fd->pollable_mu);
  pollable* pollable_obj = fd->pollable_obj;
  gpr_mu_unlock(&fd->pollable_mu);
  if (pollable_obj) {
    gpr_mu_lock(&pollable_obj->owner_orphan_mu);
@@ -487,6 +443,19 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != nullptr) {
    // Remove the FD from all epoll sets, before releasing it.
    // Otherwise, we will receive epoll events after we release the FD.
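    // Note: a zeroed, non-null epoll_event is passed to EPOLL_CTL_DEL below;
    // Linux kernels before 2.6.9 required a non-null event pointer even
    // though the argument is ignored for EPOLL_CTL_DEL.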
    epoll_event ev_fd;
    memset(&ev_fd, 0, sizeof(ev_fd));
    if (release_fd != nullptr) {
      if (pollable_obj != nullptr) {  // For PO_FD.
        epoll_ctl(pollable_obj->epfd, EPOLL_CTL_DEL, fd->fd, &ev_fd);
      }
      for (size_t i = 0; i < fd->pollset_fds.size(); ++i) {  // For PO_MULTI.
        const int epfd = fd->pollset_fds[i];
        epoll_ctl(epfd, EPOLL_CTL_DEL, fd->fd, &ev_fd);
      }
    }
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
@@ -508,40 +477,58 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
    gpr_mu_unlock(&pollable_obj->owner_orphan_mu);
  }
  gpr_mu_unlock(&fd->pollable_mu);
  gpr_mu_unlock(&fd->orphan_mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
}
static bool fd_is_shutdown(grpc_fd* fd) {
  return fd->read_closure->IsShutdown();
  return fd->read_closure.IsShutdown();
}
/* Might be called multiple times */
static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
  if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) {
  if (fd->read_closure.SetShutdown(GRPC_ERROR_REF(why))) {
    if (shutdown(fd->fd, SHUT_RDWR)) {
      if (errno != ENOTCONN) {
        gpr_log(GPR_ERROR, "Error shutting down fd %d. errno: %d",
                grpc_fd_wrapped_fd(fd), errno);
      }
    }
    fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
    fd->error_closure->SetShutdown(GRPC_ERROR_REF(why));
    fd->write_closure.SetShutdown(GRPC_ERROR_REF(why));
    fd->error_closure.SetShutdown(GRPC_ERROR_REF(why));
  }
  GRPC_ERROR_UNREF(why);
}
static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
  fd->read_closure->NotifyOn(closure);
  fd->read_closure.NotifyOn(closure);
}
static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
  fd->write_closure->NotifyOn(closure);
  fd->write_closure.NotifyOn(closure);
}
static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
  fd->error_closure->NotifyOn(closure);
  fd->error_closure.NotifyOn(closure);
}
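// Returns true if the epoll fd of the pollset's current active pollable is
// already recorded in fd->pollset_fds, i.e. this fd was previously added to
// that pollset while it was in PO_MULTI mode.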
static bool fd_has_pollset(grpc_fd* fd, grpc_pollset* pollset) {
  const int epfd = pollset->active_pollable->epfd;
  grpc_core::MutexLock lock(&fd->pollable_mu);
  for (size_t i = 0; i < fd->pollset_fds.size(); ++i) {
    if (fd->pollset_fds[i] == epfd) {
      return true;
    }
  }
  return false;
}
static void fd_add_pollset(grpc_fd* fd, grpc_pollset* pollset) {
  const int epfd = pollset->active_pollable->epfd;
  grpc_core::MutexLock lock(&fd->pollable_mu);
  fd->pollset_fds.push_back(epfd);
}
/*******************************************************************************
@@ -594,8 +581,6 @@ static grpc_error* pollable_create(pollable_type type, pollable** p) {
  (*p)->root_worker = nullptr;
  (*p)->event_cursor = 0;
  (*p)->event_count = 0;
  (*p)->fd_cache_size = 0;
  (*p)->fd_cache_counter = 0;
  return GRPC_ERROR_NONE;
}
@@ -637,39 +622,6 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) {
  grpc_error* error = GRPC_ERROR_NONE;
  static const char* err_desc = "pollable_add_fd";
  const int epfd = p->epfd;
  gpr_mu_lock(&p->mu);
  p->fd_cache_counter++;
  // Handle the case of overflow for our cache counter by
  // resetting the recency-counter on all cache objects
  if (p->fd_cache_counter == 0) {
    for (int i = 0; i < p->fd_cache_size; i++) {
      p->fd_cache[i].last_used = 0;
    }
  }
  int lru_idx = 0;
  for (int i = 0; i < p->fd_cache_size; i++) {
    if (p->fd_cache[i].fd == fd->fd && p->fd_cache[i].salt == fd->salt) {
      GRPC_STATS_INC_POLLSET_FD_CACHE_HITS();
      p->fd_cache[i].last_used = p->fd_cache_counter;
      gpr_mu_unlock(&p->mu);
      return GRPC_ERROR_NONE;
    } else if (p->fd_cache[i].last_used < p->fd_cache[lru_idx].last_used) {
      lru_idx = i;
    }
  }
  // Add to cache
  if (p->fd_cache_size < MAX_FDS_IN_CACHE) {
    lru_idx = p->fd_cache_size;
    p->fd_cache_size++;
  }
  p->fd_cache[lru_idx].fd = fd->fd;
  p->fd_cache[lru_idx].salt = fd->salt;
  p->fd_cache[lru_idx].last_used = p->fd_cache_counter;
  gpr_mu_unlock(&p->mu);
  if (grpc_polling_trace.enabled()) {
    gpr_log(GPR_INFO, "add fd %p (%d) to pollable %p", fd, fd->fd, p);
  }
@@ -849,6 +801,7 @@ static grpc_error* pollset_kick_all(grpc_pollset* pollset) {
static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
  gpr_mu_init(&pollset->mu);
  gpr_atm_no_barrier_store(&pollset->worker_count, 0);
  gpr_atm_no_barrier_store(&pollset->active_pollable_type, PO_EMPTY);
  pollset->active_pollable = POLLABLE_REF(g_empty_pollable, "pollset");
  pollset->kicked_without_poller = false;
  pollset->shutdown_closure = nullptr;
@@ -869,11 +822,11 @@ static int poll_deadline_to_millis_timeout(grpc_millis millis) {
  return static_cast<int>(delta);
}
static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }
static void fd_become_readable(grpc_fd* fd) { fd->read_closure.SetReady(); }
static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
static void fd_become_writable(grpc_fd* fd) { fd->write_closure.SetReady(); }
static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
static void fd_has_errors(grpc_fd* fd) { fd->error_closure.SetReady(); }
/* Get the pollable_obj attached to this fd. If none is attached, create a new
 * pollable object (of type PO_FD), attach it to the fd and return it
@@ -1283,6 +1236,8 @@ static grpc_error* pollset_add_fd_locked(grpc_pollset* pollset, grpc_fd* fd) {
    POLLABLE_UNREF(pollset->active_pollable, "pollset");
    pollset->active_pollable = po_at_start;
  } else {
    gpr_atm_rel_store(&pollset->active_pollable_type,
                      pollset->active_pollable->type);
    POLLABLE_UNREF(po_at_start, "pollset_add_fd");
  }
  return error;
@@ -1329,6 +1284,8 @@ static grpc_error* pollset_as_multipollable_locked(grpc_pollset* pollset,
    pollset->active_pollable = po_at_start;
    *pollable_obj = nullptr;
  } else {
    gpr_atm_rel_store(&pollset->active_pollable_type,
                      pollset->active_pollable->type);
    *pollable_obj = POLLABLE_REF(pollset->active_pollable, "pollset_set");
    POLLABLE_UNREF(po_at_start, "pollset_as_multipollable");
  }
@@ -1337,9 +1294,23 @@
static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {
  GPR_TIMER_SCOPE("pollset_add_fd", 0);
  gpr_mu_lock(&pollset->mu);
  // We never transition from PO_MULTI to other modes (i.e., PO_FD or PO_EMPTY)
  // and, thus, it is safe to simply store and check whether the FD has already
  // been added to the active pollable previously.
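  // The acquire load below pairs with the release stores of
  // active_pollable_type in pollset_add_fd_locked() and
  // pollset_as_multipollable_locked(), so observing PO_MULTI here implies the
  // pollable transition is fully visible to this thread.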
  if (gpr_atm_acq_load(&pollset->active_pollable_type) == PO_MULTI &&
      fd_has_pollset(fd, pollset)) {
    return;
  }
  grpc_core::MutexLock lock(&pollset->mu);
  grpc_error* error = pollset_add_fd_locked(pollset, fd);
  gpr_mu_unlock(&pollset->mu);
  // If we are in PO_MULTI mode, we should update the pollsets of the FD.
  if (gpr_atm_no_barrier_load(&pollset->active_pollable_type) == PO_MULTI) {
    fd_add_pollset(fd, pollset);
  }
  GRPC_LOG_IF_ERROR("pollset_add_fd", error);
}