|
|
|
@ -45,7 +45,6 @@ |
|
|
|
|
#include "src/core/lib/gpr/spinlock.h" |
|
|
|
|
#include "src/core/lib/gpr/tls.h" |
|
|
|
|
#include "src/core/lib/gpr/useful.h" |
|
|
|
|
#include "src/core/lib/gprpp/inlined_vector.h" |
|
|
|
|
#include "src/core/lib/gprpp/manual_constructor.h" |
|
|
|
|
#include "src/core/lib/gprpp/mutex_lock.h" |
|
|
|
|
#include "src/core/lib/iomgr/block_annotate.h" |
|
|
|
@ -79,6 +78,18 @@ typedef enum { PO_MULTI, PO_FD, PO_EMPTY } pollable_type; |
|
|
|
|
|
|
|
|
|
typedef struct pollable pollable; |
|
|
|
|
|
|
|
|
|
typedef struct cached_fd { |
|
|
|
|
// Set to the grpc_fd's salt value. See 'salt' variable' in grpc_fd for more
|
|
|
|
|
// details
|
|
|
|
|
intptr_t salt; |
|
|
|
|
|
|
|
|
|
// The underlying fd
|
|
|
|
|
int fd; |
|
|
|
|
|
|
|
|
|
// A recency time counter that helps to determine the LRU fd in the cache
|
|
|
|
|
uint64_t last_used; |
|
|
|
|
} cached_fd; |
|
|
|
|
|
|
|
|
|
/// A pollable is something that can be polled: it has an epoll set to poll on,
|
|
|
|
|
/// and a wakeup fd for kicks
|
|
|
|
|
/// There are three broad types:
|
|
|
|
@ -109,6 +120,33 @@ struct pollable { |
|
|
|
|
int event_cursor; |
|
|
|
|
int event_count; |
|
|
|
|
struct epoll_event events[MAX_EPOLL_EVENTS]; |
|
|
|
|
|
|
|
|
|
// We may be calling pollable_add_fd() on the same (pollable, fd) multiple
|
|
|
|
|
// times. To prevent pollable_add_fd() from making multiple sys calls to
|
|
|
|
|
// epoll_ctl() to add the fd, we maintain a cache of what fds are already
|
|
|
|
|
// present in the underlying epoll-set.
|
|
|
|
|
//
|
|
|
|
|
// Since this is not a correctness issue, we do not need to maintain all the
|
|
|
|
|
// fds in the cache. Hence we just use an LRU cache of size 'MAX_FDS_IN_CACHE'
|
|
|
|
|
//
|
|
|
|
|
// NOTE: An ideal implementation of this should do the following:
|
|
|
|
|
// 1) Add fds to the cache in pollable_add_fd() function (i.e whenever the fd
|
|
|
|
|
// is added to the pollable's epoll set)
|
|
|
|
|
// 2) Remove the fd from the cache whenever the fd is removed from the
|
|
|
|
|
// underlying epoll set (i.e whenever fd_orphan() is called).
|
|
|
|
|
//
|
|
|
|
|
// Implementing (2) above (i.e removing fds from cache on fd_orphan) adds a
|
|
|
|
|
// lot of complexity since an fd can be present in multiple pollables. So our
|
|
|
|
|
// implementation ONLY DOES (1) and NOT (2).
|
|
|
|
|
//
|
|
|
|
|
// The cache_fd.salt variable helps here to maintain correctness (it serves as
|
|
|
|
|
// an epoch that differentiates one grpc_fd from the other even though both of
|
|
|
|
|
// them may have the same fd number)
|
|
|
|
|
//
|
|
|
|
|
// The following implements LRU-eviction cache of fds in this pollable
|
|
|
|
|
cached_fd fd_cache[MAX_FDS_IN_CACHE]; |
|
|
|
|
int fd_cache_size; |
|
|
|
|
uint64_t fd_cache_counter; // Recency timer tick counter
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
static const char* pollable_type_string(pollable_type t) { |
|
|
|
@ -151,86 +189,37 @@ static void pollable_unref(pollable* p, int line, const char* reason); |
|
|
|
|
* Fd Declarations |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
struct grpc_fd { |
|
|
|
|
grpc_fd(int fd, const char* name, bool track_err) |
|
|
|
|
: fd(fd), track_err(track_err) { |
|
|
|
|
gpr_mu_init(&orphan_mu); |
|
|
|
|
gpr_mu_init(&pollable_mu); |
|
|
|
|
read_closure.InitEvent(); |
|
|
|
|
write_closure.InitEvent(); |
|
|
|
|
error_closure.InitEvent(); |
|
|
|
|
|
|
|
|
|
char* fd_name; |
|
|
|
|
gpr_asprintf(&fd_name, "%s fd=%d", name, fd); |
|
|
|
|
grpc_iomgr_register_object(&iomgr_object, fd_name); |
|
|
|
|
#ifndef NDEBUG |
|
|
|
|
if (grpc_trace_fd_refcount.enabled()) { |
|
|
|
|
gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, this, fd_name); |
|
|
|
|
} |
|
|
|
|
#endif |
|
|
|
|
gpr_free(fd_name); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// This is really the dtor, but the poller threads waking up from
|
|
|
|
|
// epoll_wait() may access the (read|write|error)_closure after destruction.
|
|
|
|
|
// Since the object will be added to the free pool, this behavior is
|
|
|
|
|
// not going to cause issues, except spurious events if the FD is reused
|
|
|
|
|
// while the race happens.
|
|
|
|
|
void destroy() { |
|
|
|
|
grpc_iomgr_unregister_object(&iomgr_object); |
|
|
|
|
|
|
|
|
|
POLLABLE_UNREF(pollable_obj, "fd_pollable"); |
|
|
|
|
pollsets.clear(); |
|
|
|
|
gpr_mu_destroy(&pollable_mu); |
|
|
|
|
gpr_mu_destroy(&orphan_mu); |
|
|
|
|
|
|
|
|
|
read_closure.DestroyEvent(); |
|
|
|
|
write_closure.DestroyEvent(); |
|
|
|
|
error_closure.DestroyEvent(); |
|
|
|
|
|
|
|
|
|
invalidate(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#ifndef NDEBUG |
|
|
|
|
/* Since an fd is never really destroyed (i.e gpr_free() is not called), it is
|
|
|
|
|
* hard-to-debug cases where fd fields are accessed even after calling |
|
|
|
|
* fd_destroy(). The following invalidates fd fields to make catching such |
|
|
|
|
* errors easier */ |
|
|
|
|
void invalidate() { |
|
|
|
|
fd = -1; |
|
|
|
|
gpr_atm_no_barrier_store(&refst, -1); |
|
|
|
|
memset(&orphan_mu, -1, sizeof(orphan_mu)); |
|
|
|
|
memset(&pollable_mu, -1, sizeof(pollable_mu)); |
|
|
|
|
pollable_obj = nullptr; |
|
|
|
|
on_done_closure = nullptr; |
|
|
|
|
memset(&iomgr_object, -1, sizeof(iomgr_object)); |
|
|
|
|
track_err = false; |
|
|
|
|
} |
|
|
|
|
#else |
|
|
|
|
void invalidate() {} |
|
|
|
|
#endif |
|
|
|
|
// Monotonically increasing Epoch counter that is assinged to each grpc_fd. See
|
|
|
|
|
// the description of 'salt' variable in 'grpc_fd' for more details
|
|
|
|
|
// TODO: (sreek/kpayson) gpr_atm is intptr_t which may not be wide-enough on
|
|
|
|
|
// 32-bit systems. Change this to int_64 - atleast on 32-bit systems
|
|
|
|
|
static gpr_atm g_fd_salt; |
|
|
|
|
|
|
|
|
|
struct grpc_fd { |
|
|
|
|
int fd; |
|
|
|
|
|
|
|
|
|
// Since fd numbers can be reused (after old fds are closed), this serves as
|
|
|
|
|
// an epoch that uniquely identifies this fd (i.e the pair (salt, fd) is
|
|
|
|
|
// unique (until the salt counter (i.e g_fd_salt) overflows)
|
|
|
|
|
intptr_t salt; |
|
|
|
|
|
|
|
|
|
// refst format:
|
|
|
|
|
// bit 0 : 1=Active / 0=Orphaned
|
|
|
|
|
// bits 1-n : refcount
|
|
|
|
|
// Ref/Unref by two to avoid altering the orphaned bit
|
|
|
|
|
gpr_atm refst = 1; |
|
|
|
|
gpr_atm refst; |
|
|
|
|
|
|
|
|
|
gpr_mu orphan_mu; |
|
|
|
|
|
|
|
|
|
// Protects pollable_obj and pollsets.
|
|
|
|
|
gpr_mu pollable_mu; |
|
|
|
|
grpc_core::InlinedVector<grpc_pollset*, 1> pollsets; // Used in PO_MULTI.
|
|
|
|
|
pollable* pollable_obj = nullptr; // Used in PO_FD.
|
|
|
|
|
pollable* pollable_obj; |
|
|
|
|
|
|
|
|
|
grpc_core::LockfreeEvent read_closure; |
|
|
|
|
grpc_core::LockfreeEvent write_closure; |
|
|
|
|
grpc_core::LockfreeEvent error_closure; |
|
|
|
|
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure; |
|
|
|
|
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure; |
|
|
|
|
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure; |
|
|
|
|
|
|
|
|
|
struct grpc_fd* freelist_next = nullptr; |
|
|
|
|
grpc_closure* on_done_closure = nullptr; |
|
|
|
|
struct grpc_fd* freelist_next; |
|
|
|
|
grpc_closure* on_done_closure; |
|
|
|
|
|
|
|
|
|
grpc_iomgr_object iomgr_object; |
|
|
|
|
|
|
|
|
@ -269,7 +258,6 @@ struct grpc_pollset_worker { |
|
|
|
|
struct grpc_pollset { |
|
|
|
|
gpr_mu mu; |
|
|
|
|
gpr_atm worker_count; |
|
|
|
|
gpr_atm active_pollable_type; |
|
|
|
|
pollable* active_pollable; |
|
|
|
|
bool kicked_without_poller; |
|
|
|
|
grpc_closure* shutdown_closure; |
|
|
|
@ -349,10 +337,39 @@ static void ref_by(grpc_fd* fd, int n) { |
|
|
|
|
GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#ifndef NDEBUG |
|
|
|
|
#define INVALIDATE_FD(fd) invalidate_fd(fd) |
|
|
|
|
/* Since an fd is never really destroyed (i.e gpr_free() is not called), it is
|
|
|
|
|
* hard to cases where fd fields are accessed even after calling fd_destroy(). |
|
|
|
|
* The following invalidates fd fields to make catching such errors easier */ |
|
|
|
|
static void invalidate_fd(grpc_fd* fd) { |
|
|
|
|
fd->fd = -1; |
|
|
|
|
fd->salt = -1; |
|
|
|
|
gpr_atm_no_barrier_store(&fd->refst, -1); |
|
|
|
|
memset(&fd->orphan_mu, -1, sizeof(fd->orphan_mu)); |
|
|
|
|
memset(&fd->pollable_mu, -1, sizeof(fd->pollable_mu)); |
|
|
|
|
fd->pollable_obj = nullptr; |
|
|
|
|
fd->on_done_closure = nullptr; |
|
|
|
|
memset(&fd->iomgr_object, -1, sizeof(fd->iomgr_object)); |
|
|
|
|
fd->track_err = false; |
|
|
|
|
} |
|
|
|
|
#else |
|
|
|
|
#define INVALIDATE_FD(fd) |
|
|
|
|
#endif |
|
|
|
|
|
|
|
|
|
/* Uninitialize and add to the freelist */ |
|
|
|
|
static void fd_destroy(void* arg, grpc_error* error) { |
|
|
|
|
grpc_fd* fd = static_cast<grpc_fd*>(arg); |
|
|
|
|
fd->destroy(); |
|
|
|
|
grpc_iomgr_unregister_object(&fd->iomgr_object); |
|
|
|
|
POLLABLE_UNREF(fd->pollable_obj, "fd_pollable"); |
|
|
|
|
gpr_mu_destroy(&fd->pollable_mu); |
|
|
|
|
gpr_mu_destroy(&fd->orphan_mu); |
|
|
|
|
|
|
|
|
|
fd->read_closure->DestroyEvent(); |
|
|
|
|
fd->write_closure->DestroyEvent(); |
|
|
|
|
fd->error_closure->DestroyEvent(); |
|
|
|
|
|
|
|
|
|
INVALIDATE_FD(fd); |
|
|
|
|
|
|
|
|
|
/* Add the fd to the freelist */ |
|
|
|
|
gpr_mu_lock(&fd_freelist_mu); |
|
|
|
@ -412,9 +429,35 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) { |
|
|
|
|
|
|
|
|
|
if (new_fd == nullptr) { |
|
|
|
|
new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd))); |
|
|
|
|
new_fd->read_closure.Init(); |
|
|
|
|
new_fd->write_closure.Init(); |
|
|
|
|
new_fd->error_closure.Init(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
new_fd->fd = fd; |
|
|
|
|
new_fd->salt = gpr_atm_no_barrier_fetch_add(&g_fd_salt, 1); |
|
|
|
|
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); |
|
|
|
|
gpr_mu_init(&new_fd->orphan_mu); |
|
|
|
|
gpr_mu_init(&new_fd->pollable_mu); |
|
|
|
|
new_fd->pollable_obj = nullptr; |
|
|
|
|
new_fd->read_closure->InitEvent(); |
|
|
|
|
new_fd->write_closure->InitEvent(); |
|
|
|
|
new_fd->error_closure->InitEvent(); |
|
|
|
|
new_fd->freelist_next = nullptr; |
|
|
|
|
new_fd->on_done_closure = nullptr; |
|
|
|
|
|
|
|
|
|
char* fd_name; |
|
|
|
|
gpr_asprintf(&fd_name, "%s fd=%d", name, fd); |
|
|
|
|
grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name); |
|
|
|
|
#ifndef NDEBUG |
|
|
|
|
if (grpc_trace_fd_refcount.enabled()) { |
|
|
|
|
gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name); |
|
|
|
|
} |
|
|
|
|
#endif |
|
|
|
|
gpr_free(fd_name); |
|
|
|
|
|
|
|
|
|
return new (new_fd) grpc_fd(fd, name, track_err); |
|
|
|
|
new_fd->track_err = track_err; |
|
|
|
|
return new_fd; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static int fd_wrapped_fd(grpc_fd* fd) { |
|
|
|
@ -422,7 +465,6 @@ static int fd_wrapped_fd(grpc_fd* fd) { |
|
|
|
|
return (gpr_atm_acq_load(&fd->refst) & 1) ? ret_fd : -1; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static int pollset_epoll_fd_locked(grpc_pollset* pollset); |
|
|
|
|
static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, |
|
|
|
|
const char* reason) { |
|
|
|
|
bool is_fd_closed = false; |
|
|
|
@ -433,6 +475,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, |
|
|
|
|
// true so that the pollable will no longer access its owner_fd field.
|
|
|
|
|
gpr_mu_lock(&fd->pollable_mu); |
|
|
|
|
pollable* pollable_obj = fd->pollable_obj; |
|
|
|
|
gpr_mu_unlock(&fd->pollable_mu); |
|
|
|
|
|
|
|
|
|
if (pollable_obj) { |
|
|
|
|
gpr_mu_lock(&pollable_obj->owner_orphan_mu); |
|
|
|
@ -444,20 +487,6 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, |
|
|
|
|
/* If release_fd is not NULL, we should be relinquishing control of the file
|
|
|
|
|
descriptor fd->fd (but we still own the grpc_fd structure). */ |
|
|
|
|
if (release_fd != nullptr) { |
|
|
|
|
// Remove the FD from all epolls sets, before releasing it.
|
|
|
|
|
// Otherwise, we will receive epoll events after we release the FD.
|
|
|
|
|
epoll_event ev_fd; |
|
|
|
|
memset(&ev_fd, 0, sizeof(ev_fd)); |
|
|
|
|
if (release_fd != nullptr) { |
|
|
|
|
if (pollable_obj != nullptr) { // For PO_FD.
|
|
|
|
|
epoll_ctl(pollable_obj->epfd, EPOLL_CTL_DEL, fd->fd, &ev_fd); |
|
|
|
|
} |
|
|
|
|
for (size_t i = 0; i < fd->pollsets.size(); ++i) { // For PO_MULTI.
|
|
|
|
|
grpc_pollset* pollset = fd->pollsets[i]; |
|
|
|
|
const int epfd = pollset_epoll_fd_locked(pollset); |
|
|
|
|
epoll_ctl(epfd, EPOLL_CTL_DEL, fd->fd, &ev_fd); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
*release_fd = fd->fd; |
|
|
|
|
} else { |
|
|
|
|
close(fd->fd); |
|
|
|
@ -479,56 +508,40 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, |
|
|
|
|
gpr_mu_unlock(&pollable_obj->owner_orphan_mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
gpr_mu_unlock(&fd->pollable_mu); |
|
|
|
|
gpr_mu_unlock(&fd->orphan_mu); |
|
|
|
|
|
|
|
|
|
UNREF_BY(fd, 2, reason); /* Drop the reference */ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static bool fd_is_shutdown(grpc_fd* fd) { |
|
|
|
|
return fd->read_closure.IsShutdown(); |
|
|
|
|
return fd->read_closure->IsShutdown(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Might be called multiple times */ |
|
|
|
|
static void fd_shutdown(grpc_fd* fd, grpc_error* why) { |
|
|
|
|
if (fd->read_closure.SetShutdown(GRPC_ERROR_REF(why))) { |
|
|
|
|
if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) { |
|
|
|
|
if (shutdown(fd->fd, SHUT_RDWR)) { |
|
|
|
|
if (errno != ENOTCONN) { |
|
|
|
|
gpr_log(GPR_ERROR, "Error shutting down fd %d. errno: %d", |
|
|
|
|
grpc_fd_wrapped_fd(fd), errno); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
fd->write_closure.SetShutdown(GRPC_ERROR_REF(why)); |
|
|
|
|
fd->error_closure.SetShutdown(GRPC_ERROR_REF(why)); |
|
|
|
|
fd->write_closure->SetShutdown(GRPC_ERROR_REF(why)); |
|
|
|
|
fd->error_closure->SetShutdown(GRPC_ERROR_REF(why)); |
|
|
|
|
} |
|
|
|
|
GRPC_ERROR_UNREF(why); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) { |
|
|
|
|
fd->read_closure.NotifyOn(closure); |
|
|
|
|
fd->read_closure->NotifyOn(closure); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) { |
|
|
|
|
fd->write_closure.NotifyOn(closure); |
|
|
|
|
fd->write_closure->NotifyOn(closure); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { |
|
|
|
|
fd->error_closure.NotifyOn(closure); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static bool fd_has_pollset(grpc_fd* fd, grpc_pollset* pollset) { |
|
|
|
|
grpc_core::MutexLock lock(&fd->pollable_mu); |
|
|
|
|
for (size_t i = 0; i < fd->pollsets.size(); ++i) { |
|
|
|
|
if (fd->pollsets[i] == pollset) { |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void fd_add_pollset(grpc_fd* fd, grpc_pollset* pollset) { |
|
|
|
|
grpc_core::MutexLock lock(&fd->pollable_mu); |
|
|
|
|
fd->pollsets.push_back(pollset); |
|
|
|
|
fd->error_closure->NotifyOn(closure); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*******************************************************************************
|
|
|
|
@ -581,6 +594,8 @@ static grpc_error* pollable_create(pollable_type type, pollable** p) { |
|
|
|
|
(*p)->root_worker = nullptr; |
|
|
|
|
(*p)->event_cursor = 0; |
|
|
|
|
(*p)->event_count = 0; |
|
|
|
|
(*p)->fd_cache_size = 0; |
|
|
|
|
(*p)->fd_cache_counter = 0; |
|
|
|
|
return GRPC_ERROR_NONE; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -622,6 +637,39 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) { |
|
|
|
|
grpc_error* error = GRPC_ERROR_NONE; |
|
|
|
|
static const char* err_desc = "pollable_add_fd"; |
|
|
|
|
const int epfd = p->epfd; |
|
|
|
|
gpr_mu_lock(&p->mu); |
|
|
|
|
p->fd_cache_counter++; |
|
|
|
|
|
|
|
|
|
// Handle the case of overflow for our cache counter by
|
|
|
|
|
// reseting the recency-counter on all cache objects
|
|
|
|
|
if (p->fd_cache_counter == 0) { |
|
|
|
|
for (int i = 0; i < p->fd_cache_size; i++) { |
|
|
|
|
p->fd_cache[i].last_used = 0; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int lru_idx = 0; |
|
|
|
|
for (int i = 0; i < p->fd_cache_size; i++) { |
|
|
|
|
if (p->fd_cache[i].fd == fd->fd && p->fd_cache[i].salt == fd->salt) { |
|
|
|
|
GRPC_STATS_INC_POLLSET_FD_CACHE_HITS(); |
|
|
|
|
p->fd_cache[i].last_used = p->fd_cache_counter; |
|
|
|
|
gpr_mu_unlock(&p->mu); |
|
|
|
|
return GRPC_ERROR_NONE; |
|
|
|
|
} else if (p->fd_cache[i].last_used < p->fd_cache[lru_idx].last_used) { |
|
|
|
|
lru_idx = i; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Add to cache
|
|
|
|
|
if (p->fd_cache_size < MAX_FDS_IN_CACHE) { |
|
|
|
|
lru_idx = p->fd_cache_size; |
|
|
|
|
p->fd_cache_size++; |
|
|
|
|
} |
|
|
|
|
p->fd_cache[lru_idx].fd = fd->fd; |
|
|
|
|
p->fd_cache[lru_idx].salt = fd->salt; |
|
|
|
|
p->fd_cache[lru_idx].last_used = p->fd_cache_counter; |
|
|
|
|
gpr_mu_unlock(&p->mu); |
|
|
|
|
|
|
|
|
|
if (grpc_polling_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, "add fd %p (%d) to pollable %p", fd, fd->fd, p); |
|
|
|
|
} |
|
|
|
@ -801,7 +849,6 @@ static grpc_error* pollset_kick_all(grpc_pollset* pollset) { |
|
|
|
|
static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) { |
|
|
|
|
gpr_mu_init(&pollset->mu); |
|
|
|
|
gpr_atm_no_barrier_store(&pollset->worker_count, 0); |
|
|
|
|
gpr_atm_no_barrier_store(&pollset->active_pollable_type, PO_EMPTY); |
|
|
|
|
pollset->active_pollable = POLLABLE_REF(g_empty_pollable, "pollset"); |
|
|
|
|
pollset->kicked_without_poller = false; |
|
|
|
|
pollset->shutdown_closure = nullptr; |
|
|
|
@ -822,11 +869,11 @@ static int poll_deadline_to_millis_timeout(grpc_millis millis) { |
|
|
|
|
return static_cast<int>(delta); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void fd_become_readable(grpc_fd* fd) { fd->read_closure.SetReady(); } |
|
|
|
|
static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); } |
|
|
|
|
|
|
|
|
|
static void fd_become_writable(grpc_fd* fd) { fd->write_closure.SetReady(); } |
|
|
|
|
static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); } |
|
|
|
|
|
|
|
|
|
static void fd_has_errors(grpc_fd* fd) { fd->error_closure.SetReady(); } |
|
|
|
|
static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); } |
|
|
|
|
|
|
|
|
|
/* Get the pollable_obj attached to this fd. If none is attached, create a new
|
|
|
|
|
* pollable object (of type PO_FD), attach it to the fd and return it |
|
|
|
@ -1236,8 +1283,6 @@ static grpc_error* pollset_add_fd_locked(grpc_pollset* pollset, grpc_fd* fd) { |
|
|
|
|
POLLABLE_UNREF(pollset->active_pollable, "pollset"); |
|
|
|
|
pollset->active_pollable = po_at_start; |
|
|
|
|
} else { |
|
|
|
|
gpr_atm_rel_store(&pollset->active_pollable_type, |
|
|
|
|
pollset->active_pollable->type); |
|
|
|
|
POLLABLE_UNREF(po_at_start, "pollset_add_fd"); |
|
|
|
|
} |
|
|
|
|
return error; |
|
|
|
@ -1284,38 +1329,17 @@ static grpc_error* pollset_as_multipollable_locked(grpc_pollset* pollset, |
|
|
|
|
pollset->active_pollable = po_at_start; |
|
|
|
|
*pollable_obj = nullptr; |
|
|
|
|
} else { |
|
|
|
|
gpr_atm_rel_store(&pollset->active_pollable_type, |
|
|
|
|
pollset->active_pollable->type); |
|
|
|
|
*pollable_obj = POLLABLE_REF(pollset->active_pollable, "pollset_set"); |
|
|
|
|
POLLABLE_UNREF(po_at_start, "pollset_as_multipollable"); |
|
|
|
|
} |
|
|
|
|
return error; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Caller must hold the lock for `pollset->mu`.
|
|
|
|
|
static int pollset_epoll_fd_locked(grpc_pollset* pollset) { |
|
|
|
|
return pollset->active_pollable->epfd; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) { |
|
|
|
|
GPR_TIMER_SCOPE("pollset_add_fd", 0); |
|
|
|
|
|
|
|
|
|
// We never transition from PO_MULTI to other modes (i.e., PO_FD or PO_EMOPTY)
|
|
|
|
|
// and, thus, it is safe to simply store and check whether the FD has already
|
|
|
|
|
// been added to the active pollable previously.
|
|
|
|
|
if (gpr_atm_acq_load(&pollset->active_pollable_type) == PO_MULTI && |
|
|
|
|
fd_has_pollset(fd, pollset)) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_core::MutexLock lock(&pollset->mu); |
|
|
|
|
gpr_mu_lock(&pollset->mu); |
|
|
|
|
grpc_error* error = pollset_add_fd_locked(pollset, fd); |
|
|
|
|
|
|
|
|
|
// If we are in PO_MULTI mode, we should update the pollsets of the FD.
|
|
|
|
|
if (gpr_atm_no_barrier_load(&pollset->active_pollable_type) == PO_MULTI) { |
|
|
|
|
fd_add_pollset(fd, pollset); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
gpr_mu_unlock(&pollset->mu); |
|
|
|
|
GRPC_LOG_IF_ERROR("pollset_add_fd", error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|