|
|
|
@ -63,7 +63,7 @@ |
|
|
|
|
// a keepalive ping timeout issue. We may want to revert https://github
|
|
|
|
|
// .com/grpc/grpc/pull/14943 once we figure out the root cause.
|
|
|
|
|
#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 16 |
|
|
|
|
#define MAX_PROBE_EPOLL_FDS 32 |
|
|
|
|
#define MAX_FDS_IN_CACHE 32 |
|
|
|
|
|
|
|
|
|
grpc_core::DebugOnlyTraceFlag grpc_trace_pollable_refcount(false, |
|
|
|
|
"pollable_refcount"); |
|
|
|
@ -77,8 +77,14 @@ typedef enum { PO_MULTI, PO_FD, PO_EMPTY } pollable_type; |
|
|
|
|
typedef struct pollable pollable; |
|
|
|
|
|
|
|
|
|
typedef struct cached_fd { |
|
|
|
|
// Set to the grpc_fd's salt value. See 'salt' variable' in grpc_fd for more
|
|
|
|
|
// details
|
|
|
|
|
intptr_t salt; |
|
|
|
|
|
|
|
|
|
// The underlying fd
|
|
|
|
|
int fd; |
|
|
|
|
|
|
|
|
|
// A recency time counter that helps to determine the LRU fd in the cache
|
|
|
|
|
uint64_t last_used; |
|
|
|
|
} cached_fd; |
|
|
|
|
|
|
|
|
@ -111,10 +117,32 @@ struct pollable { |
|
|
|
|
int event_count; |
|
|
|
|
struct epoll_event events[MAX_EPOLL_EVENTS]; |
|
|
|
|
|
|
|
|
|
// Maintain a LRU-eviction cache of fds in this pollable
|
|
|
|
|
cached_fd fd_cache[MAX_PROBE_EPOLL_FDS]; |
|
|
|
|
// We may be calling pollable_add_fd() on the same (pollable, fd) multiple
|
|
|
|
|
// times. To prevent pollable_add_fd() from making multiple sys calls to
|
|
|
|
|
// epoll_ctl() to add the fd, we maintain a cache of what fds are already
|
|
|
|
|
// present in the underlying epoll-set.
|
|
|
|
|
//
|
|
|
|
|
// Since this is not a correctness issue, we do not need to maintain all the
|
|
|
|
|
// fds in the cache. Hence we just use an LRU cache of size 'MAX_FDS_IN_CACHE'
|
|
|
|
|
//
|
|
|
|
|
// NOTE: An ideal implementation of this should do the following:
|
|
|
|
|
// 1) Add fds to the cache in pollable_add_fd() function (i.e whenever the fd
|
|
|
|
|
// is added to the pollable's epoll set)
|
|
|
|
|
// 2) Remove the fd from the cache whenever the fd is removed from the
|
|
|
|
|
// underlying epoll set (i.e whenever fd_orphan() is called).
|
|
|
|
|
//
|
|
|
|
|
// Implementing (2) above (i.e removing fds from cache on fd_orphan) adds a
|
|
|
|
|
// lot of complexity since an fd can be present in multiple pollalbles. So our
|
|
|
|
|
// implementation ONLY DOES (1) and NOT (2).
|
|
|
|
|
//
|
|
|
|
|
// The cache_fd.salt variable helps here to maintain correctness (it serves as
|
|
|
|
|
// an epoch that differentiates one grpc_fd from the other even though both of
|
|
|
|
|
// them may have the same fd number)
|
|
|
|
|
//
|
|
|
|
|
// The following implements LRU-eviction cache of fds in this pollable
|
|
|
|
|
cached_fd fd_cache[MAX_FDS_IN_CACHE]; |
|
|
|
|
int fd_cache_size; |
|
|
|
|
uint64_t fd_cache_counter; |
|
|
|
|
uint64_t fd_cache_counter; // Recency timer tick counter
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
static const char* pollable_type_string(pollable_type t) { |
|
|
|
@ -157,15 +185,24 @@ static void pollable_unref(pollable* p, int line, const char* reason); |
|
|
|
|
* Fd Declarations |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
// Monotonically increasing Epoch counter that is assinged to each grpc_fd. See
|
|
|
|
|
// the description of 'salt' variable in 'grpc_fd' for more details
|
|
|
|
|
// TODO: (sreek/kpayson) gpr_atm is intptr_t which may not be wide-enough on
|
|
|
|
|
// 32-bit systems. Change this to int_64 - atleast on 32-bit systems
|
|
|
|
|
static gpr_atm g_fd_salt; |
|
|
|
|
|
|
|
|
|
struct grpc_fd { |
|
|
|
|
int fd; |
|
|
|
|
|
|
|
|
|
// Since fd numbers can be reused (after old fds are closed), this serves as
|
|
|
|
|
// an epoch that uniquely identifies this fd (i.e the pair (salt, fd) is
|
|
|
|
|
// unique (until the salt counter (i.e g_fd_salt) overflows)
|
|
|
|
|
intptr_t salt; |
|
|
|
|
/* refst format:
|
|
|
|
|
bit 0 : 1=Active / 0=Orphaned |
|
|
|
|
bits 1-n : refcount |
|
|
|
|
Ref/Unref by two to avoid altering the orphaned bit */ |
|
|
|
|
|
|
|
|
|
// refst format:
|
|
|
|
|
// bit 0 : 1=Active / 0=Orphaned
|
|
|
|
|
// bits 1-n : refcount
|
|
|
|
|
// Ref/Unref by two to avoid altering the orphaned bit
|
|
|
|
|
gpr_atm refst; |
|
|
|
|
|
|
|
|
|
gpr_mu orphan_mu; |
|
|
|
@ -180,13 +217,13 @@ struct grpc_fd { |
|
|
|
|
struct grpc_fd* freelist_next; |
|
|
|
|
grpc_closure* on_done_closure; |
|
|
|
|
|
|
|
|
|
/* The pollset that last noticed that the fd is readable. The actual type
|
|
|
|
|
* stored in this is (grpc_pollset *) */ |
|
|
|
|
// The pollset that last noticed that the fd is readable. The actual type
|
|
|
|
|
// stored in this is (grpc_pollset *)
|
|
|
|
|
gpr_atm read_notifier_pollset; |
|
|
|
|
|
|
|
|
|
grpc_iomgr_object iomgr_object; |
|
|
|
|
|
|
|
|
|
/* Do we need to track EPOLLERR events separately? */ |
|
|
|
|
// Do we need to track EPOLLERR events separately?
|
|
|
|
|
bool track_err; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
@ -562,6 +599,7 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) { |
|
|
|
|
const int epfd = p->epfd; |
|
|
|
|
gpr_mu_lock(&p->mu); |
|
|
|
|
p->fd_cache_counter++; |
|
|
|
|
|
|
|
|
|
// Handle the case of overflow for our cache counter by
|
|
|
|
|
// reseting the recency-counter on all cache objects
|
|
|
|
|
if (p->fd_cache_counter == 0) { |
|
|
|
@ -581,8 +619,9 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) { |
|
|
|
|
lru_idx = i; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Add to cache
|
|
|
|
|
if (p->fd_cache_size < MAX_PROBE_EPOLL_FDS) { |
|
|
|
|
if (p->fd_cache_size < MAX_FDS_IN_CACHE) { |
|
|
|
|
lru_idx = p->fd_cache_size; |
|
|
|
|
p->fd_cache_size++; |
|
|
|
|
} |
|
|
|
@ -590,6 +629,7 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) { |
|
|
|
|
p->fd_cache[lru_idx].salt = fd->salt; |
|
|
|
|
p->fd_cache[lru_idx].last_used = p->fd_cache_counter; |
|
|
|
|
gpr_mu_unlock(&p->mu); |
|
|
|
|
|
|
|
|
|
if (grpc_polling_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, "add fd %p (%d) to pollable %p", fd, fd->fd, p); |
|
|
|
|
} |
|
|
|
|