diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 0a0891013af..d6947d00e84 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -45,6 +45,7 @@ #include "src/core/lib/gpr/spinlock.h" #include "src/core/lib/gpr/tls.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/inlined_vector.h" #include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/gprpp/mutex_lock.h" #include "src/core/lib/iomgr/block_annotate.h" @@ -78,18 +79,6 @@ typedef enum { PO_MULTI, PO_FD, PO_EMPTY } pollable_type; typedef struct pollable pollable; -typedef struct cached_fd { - // Set to the grpc_fd's salt value. See 'salt' variable' in grpc_fd for more - // details - intptr_t salt; - - // The underlying fd - int fd; - - // A recency time counter that helps to determine the LRU fd in the cache - uint64_t last_used; -} cached_fd; - /// A pollable is something that can be polled: it has an epoll set to poll on, /// and a wakeup fd for kicks /// There are three broad types: @@ -120,33 +109,6 @@ struct pollable { int event_cursor; int event_count; struct epoll_event events[MAX_EPOLL_EVENTS]; - - // We may be calling pollable_add_fd() on the same (pollable, fd) multiple - // times. To prevent pollable_add_fd() from making multiple sys calls to - // epoll_ctl() to add the fd, we maintain a cache of what fds are already - // present in the underlying epoll-set. - // - // Since this is not a correctness issue, we do not need to maintain all the - // fds in the cache. Hence we just use an LRU cache of size 'MAX_FDS_IN_CACHE' - // - // NOTE: An ideal implementation of this should do the following: - // 1) Add fds to the cache in pollable_add_fd() function (i.e whenever the fd - // is added to the pollable's epoll set) - // 2) Remove the fd from the cache whenever the fd is removed from the - // underlying epoll set (i.e whenever fd_orphan() is called). 
- // - // Implementing (2) above (i.e removing fds from cache on fd_orphan) adds a - // lot of complexity since an fd can be present in multiple pollables. So our - // implementation ONLY DOES (1) and NOT (2). - // - // The cache_fd.salt variable helps here to maintain correctness (it serves as - // an epoch that differentiates one grpc_fd from the other even though both of - // them may have the same fd number) - // - // The following implements LRU-eviction cache of fds in this pollable - cached_fd fd_cache[MAX_FDS_IN_CACHE]; - int fd_cache_size; - uint64_t fd_cache_counter; // Recency timer tick counter }; static const char* pollable_type_string(pollable_type t) { @@ -189,37 +151,86 @@ static void pollable_unref(pollable* p, int line, const char* reason); * Fd Declarations */ -// Monotonically increasing Epoch counter that is assinged to each grpc_fd. See -// the description of 'salt' variable in 'grpc_fd' for more details -// TODO: (sreek/kpayson) gpr_atm is intptr_t which may not be wide-enough on -// 32-bit systems. Change this to int_64 - atleast on 32-bit systems -static gpr_atm g_fd_salt; - struct grpc_fd { - int fd; + grpc_fd(int fd, const char* name, bool track_err) + : fd(fd), track_err(track_err) { + gpr_mu_init(&orphan_mu); + gpr_mu_init(&pollable_mu); + read_closure.InitEvent(); + write_closure.InitEvent(); + error_closure.InitEvent(); + + char* fd_name; + gpr_asprintf(&fd_name, "%s fd=%d", name, fd); + grpc_iomgr_register_object(&iomgr_object, fd_name); +#ifndef NDEBUG + if (grpc_trace_fd_refcount.enabled()) { + gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, this, fd_name); + } +#endif + gpr_free(fd_name); + } + + // This is really the dtor, but the poller threads waking up from + // epoll_wait() may access the (read|write|error)_closure after destruction. + // Since the object will be added to the free pool, this behavior is + // not going to cause issues, except spurious events if the FD is reused + // while the race happens. 
+ void destroy() { + grpc_iomgr_unregister_object(&iomgr_object); + + POLLABLE_UNREF(pollable_obj, "fd_pollable"); + pollsets.clear(); + gpr_mu_destroy(&pollable_mu); + gpr_mu_destroy(&orphan_mu); + + read_closure.DestroyEvent(); + write_closure.DestroyEvent(); + error_closure.DestroyEvent(); + + invalidate(); + } - // Since fd numbers can be reused (after old fds are closed), this serves as - // an epoch that uniquely identifies this fd (i.e the pair (salt, fd) is - // unique (until the salt counter (i.e g_fd_salt) overflows) - intptr_t salt; +#ifndef NDEBUG + /* Since an fd is never really destroyed (i.e gpr_free() is not called), it is + * hard-to-debug cases where fd fields are accessed even after calling + * fd_destroy(). The following invalidates fd fields to make catching such + * errors easier */ + void invalidate() { + fd = -1; + gpr_atm_no_barrier_store(&refst, -1); + memset(&orphan_mu, -1, sizeof(orphan_mu)); + memset(&pollable_mu, -1, sizeof(pollable_mu)); + pollable_obj = nullptr; + on_done_closure = nullptr; + memset(&iomgr_object, -1, sizeof(iomgr_object)); + track_err = false; + } +#else + void invalidate() {} +#endif + + int fd; // refst format: // bit 0 : 1=Active / 0=Orphaned // bits 1-n : refcount // Ref/Unref by two to avoid altering the orphaned bit - gpr_atm refst; + gpr_atm refst = 1; gpr_mu orphan_mu; + // Protects pollable_obj and pollsets. gpr_mu pollable_mu; - pollable* pollable_obj; + grpc_core::InlinedVector pollsets; // Used in PO_MULTI. + pollable* pollable_obj = nullptr; // Used in PO_FD. 
- grpc_core::ManualConstructor read_closure; - grpc_core::ManualConstructor write_closure; - grpc_core::ManualConstructor error_closure; + grpc_core::LockfreeEvent read_closure; + grpc_core::LockfreeEvent write_closure; + grpc_core::LockfreeEvent error_closure; - struct grpc_fd* freelist_next; - grpc_closure* on_done_closure; + struct grpc_fd* freelist_next = nullptr; + grpc_closure* on_done_closure = nullptr; grpc_iomgr_object iomgr_object; @@ -258,6 +269,7 @@ struct grpc_pollset_worker { struct grpc_pollset { gpr_mu mu; gpr_atm worker_count; + gpr_atm active_pollable_type; pollable* active_pollable; bool kicked_without_poller; grpc_closure* shutdown_closure; @@ -337,39 +349,10 @@ static void ref_by(grpc_fd* fd, int n) { GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0); } -#ifndef NDEBUG -#define INVALIDATE_FD(fd) invalidate_fd(fd) -/* Since an fd is never really destroyed (i.e gpr_free() is not called), it is - * hard to cases where fd fields are accessed even after calling fd_destroy(). 
- * The following invalidates fd fields to make catching such errors easier */ -static void invalidate_fd(grpc_fd* fd) { - fd->fd = -1; - fd->salt = -1; - gpr_atm_no_barrier_store(&fd->refst, -1); - memset(&fd->orphan_mu, -1, sizeof(fd->orphan_mu)); - memset(&fd->pollable_mu, -1, sizeof(fd->pollable_mu)); - fd->pollable_obj = nullptr; - fd->on_done_closure = nullptr; - memset(&fd->iomgr_object, -1, sizeof(fd->iomgr_object)); - fd->track_err = false; -} -#else -#define INVALIDATE_FD(fd) -#endif - /* Uninitialize and add to the freelist */ static void fd_destroy(void* arg, grpc_error* error) { grpc_fd* fd = static_cast(arg); - grpc_iomgr_unregister_object(&fd->iomgr_object); - POLLABLE_UNREF(fd->pollable_obj, "fd_pollable"); - gpr_mu_destroy(&fd->pollable_mu); - gpr_mu_destroy(&fd->orphan_mu); - - fd->read_closure->DestroyEvent(); - fd->write_closure->DestroyEvent(); - fd->error_closure->DestroyEvent(); - - INVALIDATE_FD(fd); + fd->destroy(); /* Add the fd to the freelist */ gpr_mu_lock(&fd_freelist_mu); @@ -429,35 +412,9 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) { if (new_fd == nullptr) { new_fd = static_cast(gpr_malloc(sizeof(grpc_fd))); - new_fd->read_closure.Init(); - new_fd->write_closure.Init(); - new_fd->error_closure.Init(); - } - - new_fd->fd = fd; - new_fd->salt = gpr_atm_no_barrier_fetch_add(&g_fd_salt, 1); - gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); - gpr_mu_init(&new_fd->orphan_mu); - gpr_mu_init(&new_fd->pollable_mu); - new_fd->pollable_obj = nullptr; - new_fd->read_closure->InitEvent(); - new_fd->write_closure->InitEvent(); - new_fd->error_closure->InitEvent(); - new_fd->freelist_next = nullptr; - new_fd->on_done_closure = nullptr; - - char* fd_name; - gpr_asprintf(&fd_name, "%s fd=%d", name, fd); - grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name); -#ifndef NDEBUG - if (grpc_trace_fd_refcount.enabled()) { - gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name); } -#endif - gpr_free(fd_name); - 
new_fd->track_err = track_err; - return new_fd; + return new (new_fd) grpc_fd(fd, name, track_err); } static int fd_wrapped_fd(grpc_fd* fd) { @@ -465,6 +422,7 @@ static int fd_wrapped_fd(grpc_fd* fd) { return (gpr_atm_acq_load(&fd->refst) & 1) ? ret_fd : -1; } +static int pollset_epoll_fd_locked(grpc_pollset* pollset); static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, const char* reason) { bool is_fd_closed = false; @@ -475,7 +433,6 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, // true so that the pollable will no longer access its owner_fd field. gpr_mu_lock(&fd->pollable_mu); pollable* pollable_obj = fd->pollable_obj; - gpr_mu_unlock(&fd->pollable_mu); if (pollable_obj) { gpr_mu_lock(&pollable_obj->owner_orphan_mu); @@ -487,6 +444,20 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, /* If release_fd is not NULL, we should be relinquishing control of the file descriptor fd->fd (but we still own the grpc_fd structure). */ if (release_fd != nullptr) { + // Remove the FD from all epolls sets, before releasing it. + // Otherwise, we will receive epoll events after we release the FD. + epoll_event ev_fd; + memset(&ev_fd, 0, sizeof(ev_fd)); + if (release_fd != nullptr) { + if (pollable_obj != nullptr) { // For PO_FD. + epoll_ctl(pollable_obj->epfd, EPOLL_CTL_DEL, fd->fd, &ev_fd); + } + for (size_t i = 0; i < fd->pollsets.size(); ++i) { // For PO_MULTI. 
+ grpc_pollset* pollset = fd->pollsets[i]; + const int epfd = pollset_epoll_fd_locked(pollset); + epoll_ctl(epfd, EPOLL_CTL_DEL, fd->fd, &ev_fd); + } + } *release_fd = fd->fd; } else { close(fd->fd); @@ -508,40 +479,56 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, gpr_mu_unlock(&pollable_obj->owner_orphan_mu); } + gpr_mu_unlock(&fd->pollable_mu); gpr_mu_unlock(&fd->orphan_mu); UNREF_BY(fd, 2, reason); /* Drop the reference */ } static bool fd_is_shutdown(grpc_fd* fd) { - return fd->read_closure->IsShutdown(); + return fd->read_closure.IsShutdown(); } /* Might be called multiple times */ static void fd_shutdown(grpc_fd* fd, grpc_error* why) { - if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) { + if (fd->read_closure.SetShutdown(GRPC_ERROR_REF(why))) { if (shutdown(fd->fd, SHUT_RDWR)) { if (errno != ENOTCONN) { gpr_log(GPR_ERROR, "Error shutting down fd %d. errno: %d", grpc_fd_wrapped_fd(fd), errno); } } - fd->write_closure->SetShutdown(GRPC_ERROR_REF(why)); - fd->error_closure->SetShutdown(GRPC_ERROR_REF(why)); + fd->write_closure.SetShutdown(GRPC_ERROR_REF(why)); + fd->error_closure.SetShutdown(GRPC_ERROR_REF(why)); } GRPC_ERROR_UNREF(why); } static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) { - fd->read_closure->NotifyOn(closure); + fd->read_closure.NotifyOn(closure); } static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) { - fd->write_closure->NotifyOn(closure); + fd->write_closure.NotifyOn(closure); } static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { - fd->error_closure->NotifyOn(closure); + fd->error_closure.NotifyOn(closure); +} + +static bool fd_has_pollset(grpc_fd* fd, grpc_pollset* pollset) { + grpc_core::MutexLock lock(&fd->pollable_mu); + for (size_t i = 0; i < fd->pollsets.size(); ++i) { + if (fd->pollsets[i] == pollset) { + return true; + } + } + return false; +} + +static void fd_add_pollset(grpc_fd* fd, grpc_pollset* pollset) { + grpc_core::MutexLock 
lock(&fd->pollable_mu); + fd->pollsets.push_back(pollset); } /******************************************************************************* @@ -594,8 +581,6 @@ static grpc_error* pollable_create(pollable_type type, pollable** p) { (*p)->root_worker = nullptr; (*p)->event_cursor = 0; (*p)->event_count = 0; - (*p)->fd_cache_size = 0; - (*p)->fd_cache_counter = 0; return GRPC_ERROR_NONE; } @@ -637,39 +622,6 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) { grpc_error* error = GRPC_ERROR_NONE; static const char* err_desc = "pollable_add_fd"; const int epfd = p->epfd; - gpr_mu_lock(&p->mu); - p->fd_cache_counter++; - - // Handle the case of overflow for our cache counter by - // reseting the recency-counter on all cache objects - if (p->fd_cache_counter == 0) { - for (int i = 0; i < p->fd_cache_size; i++) { - p->fd_cache[i].last_used = 0; - } - } - - int lru_idx = 0; - for (int i = 0; i < p->fd_cache_size; i++) { - if (p->fd_cache[i].fd == fd->fd && p->fd_cache[i].salt == fd->salt) { - GRPC_STATS_INC_POLLSET_FD_CACHE_HITS(); - p->fd_cache[i].last_used = p->fd_cache_counter; - gpr_mu_unlock(&p->mu); - return GRPC_ERROR_NONE; - } else if (p->fd_cache[i].last_used < p->fd_cache[lru_idx].last_used) { - lru_idx = i; - } - } - - // Add to cache - if (p->fd_cache_size < MAX_FDS_IN_CACHE) { - lru_idx = p->fd_cache_size; - p->fd_cache_size++; - } - p->fd_cache[lru_idx].fd = fd->fd; - p->fd_cache[lru_idx].salt = fd->salt; - p->fd_cache[lru_idx].last_used = p->fd_cache_counter; - gpr_mu_unlock(&p->mu); - if (grpc_polling_trace.enabled()) { gpr_log(GPR_INFO, "add fd %p (%d) to pollable %p", fd, fd->fd, p); } @@ -849,6 +801,7 @@ static grpc_error* pollset_kick_all(grpc_pollset* pollset) { static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) { gpr_mu_init(&pollset->mu); gpr_atm_no_barrier_store(&pollset->worker_count, 0); + gpr_atm_no_barrier_store(&pollset->active_pollable_type, PO_EMPTY); pollset->active_pollable = POLLABLE_REF(g_empty_pollable, 
"pollset"); pollset->kicked_without_poller = false; pollset->shutdown_closure = nullptr; @@ -869,11 +822,11 @@ static int poll_deadline_to_millis_timeout(grpc_millis millis) { return static_cast(delta); } -static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); } +static void fd_become_readable(grpc_fd* fd) { fd->read_closure.SetReady(); } -static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); } +static void fd_become_writable(grpc_fd* fd) { fd->write_closure.SetReady(); } -static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); } +static void fd_has_errors(grpc_fd* fd) { fd->error_closure.SetReady(); } /* Get the pollable_obj attached to this fd. If none is attached, create a new * pollable object (of type PO_FD), attach it to the fd and return it @@ -1283,6 +1236,8 @@ static grpc_error* pollset_add_fd_locked(grpc_pollset* pollset, grpc_fd* fd) { POLLABLE_UNREF(pollset->active_pollable, "pollset"); pollset->active_pollable = po_at_start; } else { + gpr_atm_rel_store(&pollset->active_pollable_type, + pollset->active_pollable->type); POLLABLE_UNREF(po_at_start, "pollset_add_fd"); } return error; @@ -1329,17 +1284,38 @@ static grpc_error* pollset_as_multipollable_locked(grpc_pollset* pollset, pollset->active_pollable = po_at_start; *pollable_obj = nullptr; } else { + gpr_atm_rel_store(&pollset->active_pollable_type, + pollset->active_pollable->type); *pollable_obj = POLLABLE_REF(pollset->active_pollable, "pollset_set"); POLLABLE_UNREF(po_at_start, "pollset_as_multipollable"); } return error; } +// Caller must hold the lock for `pollset->mu`. 
+static int pollset_epoll_fd_locked(grpc_pollset* pollset) { +  return pollset->active_pollable->epfd; +} + static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {   GPR_TIMER_SCOPE("pollset_add_fd", 0); -  gpr_mu_lock(&pollset->mu); + +  // We never transition from PO_MULTI to other modes (i.e., PO_FD or PO_EMPTY) +  // and, thus, it is safe to simply store and check whether the FD has already +  // been added to the active pollable previously. +  if (gpr_atm_acq_load(&pollset->active_pollable_type) == PO_MULTI && +      fd_has_pollset(fd, pollset)) { +    return; +  } + +  grpc_core::MutexLock lock(&pollset->mu); grpc_error* error = pollset_add_fd_locked(pollset, fd); -  gpr_mu_unlock(&pollset->mu); + +  // If we are in PO_MULTI mode, we should update the pollsets of the FD. +  if (gpr_atm_no_barrier_load(&pollset->active_pollable_type) == PO_MULTI) { +    fd_add_pollset(fd, pollset); +  } +   GRPC_LOG_IF_ERROR("pollset_add_fd", error); }