From 75dec4d0f22eb0dbd51bb584ca624f861744d218 Mon Sep 17 00:00:00 2001 From: Soheil Hassas Yeganeh Date: Thu, 7 Feb 2019 11:15:07 -0500 Subject: [PATCH] Track the pollsets of an FD in PO_MULTI mode for pollex. Each pollset in pollex has a lock, grabbed upon adding an FD to the pollset. Since this is called on a per-call basis, there is a flat array caching the FDs of the pollset, to avoid unnecessarily calling epoll_ctl multiple times for the same FD. This has two problems: 1) When multiple threads add FDs to the same pollset, we will have contention on the pollset lock. 2) When we have many FDs we simply run out of cache storage, and call epoll_ctl(). This commit changes the caching strategy by simply storing the epfd of pollsets of an FD inside that FD, when we are in PO_MULTI mode. This results in address in both (1) and (2). Moreover, this commit fixes another performance bug. When we have a release FD callback, we do not call close(). That FD will remain in our epollset, until the new owner of the FD actually call close(). This results in a lot of spurious wake ups when we simply hand off gRPC FDs to other FDs. Note that this is a revision on the reverted commit e83e463b5a14cf0de5d8c9e1197d06f925160111 (PR #17823). The main change is to track the epfd of the pollset instead of the pollset pointer, so that if the pollset is deleted we can still access the FD. It also halves the size of the cache vector for 64-bit machines. --- src/core/lib/iomgr/ev_epollex_linux.cc | 295 +++++++++++-------------- 1 file changed, 133 insertions(+), 162 deletions(-) diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 0a0891013af..b6d13b44d12 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -45,6 +45,7 @@ #include "src/core/lib/gpr/spinlock.h" #include "src/core/lib/gpr/tls.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/inlined_vector.h" #include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/gprpp/mutex_lock.h" #include "src/core/lib/iomgr/block_annotate.h" @@ -78,18 +79,6 @@ typedef enum { PO_MULTI, PO_FD, PO_EMPTY } pollable_type; typedef struct pollable pollable; -typedef struct cached_fd { - // Set to the grpc_fd's salt value. See 'salt' variable' in grpc_fd for more - // details - intptr_t salt; - - // The underlying fd - int fd; - - // A recency time counter that helps to determine the LRU fd in the cache - uint64_t last_used; -} cached_fd; - /// A pollable is something that can be polled: it has an epoll set to poll on, /// and a wakeup fd for kicks /// There are three broad types: @@ -120,33 +109,6 @@ struct pollable { int event_cursor; int event_count; struct epoll_event events[MAX_EPOLL_EVENTS]; - - // We may be calling pollable_add_fd() on the same (pollable, fd) multiple - // times. To prevent pollable_add_fd() from making multiple sys calls to - // epoll_ctl() to add the fd, we maintain a cache of what fds are already - // present in the underlying epoll-set. - // - // Since this is not a correctness issue, we do not need to maintain all the - // fds in the cache. Hence we just use an LRU cache of size 'MAX_FDS_IN_CACHE' - // - // NOTE: An ideal implementation of this should do the following: - // 1) Add fds to the cache in pollable_add_fd() function (i.e whenever the fd - // is added to the pollable's epoll set) - // 2) Remove the fd from the cache whenever the fd is removed from the - // underlying epoll set (i.e whenever fd_orphan() is called). - // - // Implementing (2) above (i.e removing fds from cache on fd_orphan) adds a - // lot of complexity since an fd can be present in multiple pollables. So our - // implementation ONLY DOES (1) and NOT (2). - // - // The cache_fd.salt variable helps here to maintain correctness (it serves as - // an epoch that differentiates one grpc_fd from the other even though both of - // them may have the same fd number) - // - // The following implements LRU-eviction cache of fds in this pollable - cached_fd fd_cache[MAX_FDS_IN_CACHE]; - int fd_cache_size; - uint64_t fd_cache_counter; // Recency timer tick counter }; static const char* pollable_type_string(pollable_type t) { @@ -189,37 +151,86 @@ static void pollable_unref(pollable* p, int line, const char* reason); * Fd Declarations */ -// Monotonically increasing Epoch counter that is assinged to each grpc_fd. See -// the description of 'salt' variable in 'grpc_fd' for more details -// TODO: (sreek/kpayson) gpr_atm is intptr_t which may not be wide-enough on -// 32-bit systems. Change this to int_64 - atleast on 32-bit systems -static gpr_atm g_fd_salt; - struct grpc_fd { - int fd; + grpc_fd(int fd, const char* name, bool track_err) + : fd(fd), track_err(track_err) { + gpr_mu_init(&orphan_mu); + gpr_mu_init(&pollable_mu); + read_closure.InitEvent(); + write_closure.InitEvent(); + error_closure.InitEvent(); + + char* fd_name; + gpr_asprintf(&fd_name, "%s fd=%d", name, fd); + grpc_iomgr_register_object(&iomgr_object, fd_name); +#ifndef NDEBUG + if (grpc_trace_fd_refcount.enabled()) { + gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, this, fd_name); + } +#endif + gpr_free(fd_name); + } + + // This is really the dtor, but the poller threads waking up from + // epoll_wait() may access the (read|write|error)_closure after destruction. + // Since the object will be added to the free pool, this behavior is + // not going to cause issues, except spurious events if the FD is reused + // while the race happens. + void destroy() { + grpc_iomgr_unregister_object(&iomgr_object); - // Since fd numbers can be reused (after old fds are closed), this serves as - // an epoch that uniquely identifies this fd (i.e the pair (salt, fd) is - // unique (until the salt counter (i.e g_fd_salt) overflows) - intptr_t salt; + POLLABLE_UNREF(pollable_obj, "fd_pollable"); + pollset_fds.clear(); + gpr_mu_destroy(&pollable_mu); + gpr_mu_destroy(&orphan_mu); + + read_closure.DestroyEvent(); + write_closure.DestroyEvent(); + error_closure.DestroyEvent(); + + invalidate(); + } + +#ifndef NDEBUG + /* Since an fd is never really destroyed (i.e gpr_free() is not called), it is + * hard-to-debug cases where fd fields are accessed even after calling + * fd_destroy(). The following invalidates fd fields to make catching such + * errors easier */ + void invalidate() { + fd = -1; + gpr_atm_no_barrier_store(&refst, -1); + memset(&orphan_mu, -1, sizeof(orphan_mu)); + memset(&pollable_mu, -1, sizeof(pollable_mu)); + pollable_obj = nullptr; + on_done_closure = nullptr; + memset(&iomgr_object, -1, sizeof(iomgr_object)); + track_err = false; + } +#else + void invalidate() {} +#endif + + int fd; // refst format: // bit 0 : 1=Active / 0=Orphaned // bits 1-n : refcount // Ref/Unref by two to avoid altering the orphaned bit - gpr_atm refst; + gpr_atm refst = 1; gpr_mu orphan_mu; + // Protects pollable_obj and pollset_fds. gpr_mu pollable_mu; - pollable* pollable_obj; + grpc_core::InlinedVector pollset_fds; // Used in PO_MULTI. + pollable* pollable_obj = nullptr; // Used in PO_FD. - grpc_core::ManualConstructor read_closure; - grpc_core::ManualConstructor write_closure; - grpc_core::ManualConstructor error_closure; + grpc_core::LockfreeEvent read_closure; + grpc_core::LockfreeEvent write_closure; + grpc_core::LockfreeEvent error_closure; - struct grpc_fd* freelist_next; - grpc_closure* on_done_closure; + struct grpc_fd* freelist_next = nullptr; + grpc_closure* on_done_closure = nullptr; grpc_iomgr_object iomgr_object; @@ -258,6 +269,7 @@ struct grpc_pollset_worker { struct grpc_pollset { gpr_mu mu; gpr_atm worker_count; + gpr_atm active_pollable_type; pollable* active_pollable; bool kicked_without_poller; grpc_closure* shutdown_closure; @@ -337,39 +349,10 @@ static void ref_by(grpc_fd* fd, int n) { GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0); } -#ifndef NDEBUG -#define INVALIDATE_FD(fd) invalidate_fd(fd) -/* Since an fd is never really destroyed (i.e gpr_free() is not called), it is - * hard to cases where fd fields are accessed even after calling fd_destroy(). - * The following invalidates fd fields to make catching such errors easier */ -static void invalidate_fd(grpc_fd* fd) { - fd->fd = -1; - fd->salt = -1; - gpr_atm_no_barrier_store(&fd->refst, -1); - memset(&fd->orphan_mu, -1, sizeof(fd->orphan_mu)); - memset(&fd->pollable_mu, -1, sizeof(fd->pollable_mu)); - fd->pollable_obj = nullptr; - fd->on_done_closure = nullptr; - memset(&fd->iomgr_object, -1, sizeof(fd->iomgr_object)); - fd->track_err = false; -} -#else -#define INVALIDATE_FD(fd) -#endif - /* Uninitialize and add to the freelist */ static void fd_destroy(void* arg, grpc_error* error) { grpc_fd* fd = static_cast(arg); - grpc_iomgr_unregister_object(&fd->iomgr_object); - POLLABLE_UNREF(fd->pollable_obj, "fd_pollable"); - gpr_mu_destroy(&fd->pollable_mu); - gpr_mu_destroy(&fd->orphan_mu); - - fd->read_closure->DestroyEvent(); - fd->write_closure->DestroyEvent(); - fd->error_closure->DestroyEvent(); - - INVALIDATE_FD(fd); + fd->destroy(); /* Add the fd to the freelist */ gpr_mu_lock(&fd_freelist_mu); @@ -429,35 +412,9 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) { if (new_fd == nullptr) { new_fd = static_cast(gpr_malloc(sizeof(grpc_fd))); - new_fd->read_closure.Init(); - new_fd->write_closure.Init(); - new_fd->error_closure.Init(); - } - - new_fd->fd = fd; - new_fd->salt = gpr_atm_no_barrier_fetch_add(&g_fd_salt, 1); - gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); - gpr_mu_init(&new_fd->orphan_mu); - gpr_mu_init(&new_fd->pollable_mu); - new_fd->pollable_obj = nullptr; - new_fd->read_closure->InitEvent(); - new_fd->write_closure->InitEvent(); - new_fd->error_closure->InitEvent(); - new_fd->freelist_next = nullptr; - new_fd->on_done_closure = nullptr; - - char* fd_name; - gpr_asprintf(&fd_name, "%s fd=%d", name, fd); - grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name); -#ifndef NDEBUG - if (grpc_trace_fd_refcount.enabled()) { - gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name); } -#endif - gpr_free(fd_name); - new_fd->track_err = track_err; - return new_fd; + return new (new_fd) grpc_fd(fd, name, track_err); } static int fd_wrapped_fd(grpc_fd* fd) { @@ -475,7 +432,6 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, // true so that the pollable will no longer access its owner_fd field. gpr_mu_lock(&fd->pollable_mu); pollable* pollable_obj = fd->pollable_obj; - gpr_mu_unlock(&fd->pollable_mu); if (pollable_obj) { gpr_mu_lock(&pollable_obj->owner_orphan_mu); @@ -487,6 +443,19 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, /* If release_fd is not NULL, we should be relinquishing control of the file descriptor fd->fd (but we still own the grpc_fd structure). */ if (release_fd != nullptr) { + // Remove the FD from all epolls sets, before releasing it. + // Otherwise, we will receive epoll events after we release the FD. + epoll_event ev_fd; + memset(&ev_fd, 0, sizeof(ev_fd)); + if (release_fd != nullptr) { + if (pollable_obj != nullptr) { // For PO_FD. + epoll_ctl(pollable_obj->epfd, EPOLL_CTL_DEL, fd->fd, &ev_fd); + } + for (size_t i = 0; i < fd->pollset_fds.size(); ++i) { // For PO_MULTI. + const int epfd = fd->pollset_fds[i]; + epoll_ctl(epfd, EPOLL_CTL_DEL, fd->fd, &ev_fd); + } + } *release_fd = fd->fd; } else { close(fd->fd); @@ -508,40 +477,58 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, gpr_mu_unlock(&pollable_obj->owner_orphan_mu); } + gpr_mu_unlock(&fd->pollable_mu); gpr_mu_unlock(&fd->orphan_mu); UNREF_BY(fd, 2, reason); /* Drop the reference */ } static bool fd_is_shutdown(grpc_fd* fd) { - return fd->read_closure->IsShutdown(); + return fd->read_closure.IsShutdown(); } /* Might be called multiple times */ static void fd_shutdown(grpc_fd* fd, grpc_error* why) { - if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) { + if (fd->read_closure.SetShutdown(GRPC_ERROR_REF(why))) { if (shutdown(fd->fd, SHUT_RDWR)) { if (errno != ENOTCONN) { gpr_log(GPR_ERROR, "Error shutting down fd %d. errno: %d", grpc_fd_wrapped_fd(fd), errno); } } - fd->write_closure->SetShutdown(GRPC_ERROR_REF(why)); - fd->error_closure->SetShutdown(GRPC_ERROR_REF(why)); + fd->write_closure.SetShutdown(GRPC_ERROR_REF(why)); + fd->error_closure.SetShutdown(GRPC_ERROR_REF(why)); } GRPC_ERROR_UNREF(why); } static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) { - fd->read_closure->NotifyOn(closure); + fd->read_closure.NotifyOn(closure); } static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) { - fd->write_closure->NotifyOn(closure); + fd->write_closure.NotifyOn(closure); } static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { - fd->error_closure->NotifyOn(closure); + fd->error_closure.NotifyOn(closure); +} + +static bool fd_has_pollset(grpc_fd* fd, grpc_pollset* pollset) { + const int epfd = pollset->active_pollable->epfd; + grpc_core::MutexLock lock(&fd->pollable_mu); + for (size_t i = 0; i < fd->pollset_fds.size(); ++i) { + if (fd->pollset_fds[i] == epfd) { + return true; + } + } + return false; +} + +static void fd_add_pollset(grpc_fd* fd, grpc_pollset* pollset) { + const int epfd = pollset->active_pollable->epfd; + grpc_core::MutexLock lock(&fd->pollable_mu); + fd->pollset_fds.push_back(epfd); } /******************************************************************************* @@ -594,8 +581,6 @@ static grpc_error* pollable_create(pollable_type type, pollable** p) { (*p)->root_worker = nullptr; (*p)->event_cursor = 0; (*p)->event_count = 0; - (*p)->fd_cache_size = 0; - (*p)->fd_cache_counter = 0; return GRPC_ERROR_NONE; } @@ -637,39 +622,6 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) { grpc_error* error = GRPC_ERROR_NONE; static const char* err_desc = "pollable_add_fd"; const int epfd = p->epfd; - gpr_mu_lock(&p->mu); - p->fd_cache_counter++; - - // Handle the case of overflow for our cache counter by - // reseting the recency-counter on all cache objects - if (p->fd_cache_counter == 0) { - for (int i = 0; i < p->fd_cache_size; i++) { - p->fd_cache[i].last_used = 0; - } - } - - int lru_idx = 0; - for (int i = 0; i < p->fd_cache_size; i++) { - if (p->fd_cache[i].fd == fd->fd && p->fd_cache[i].salt == fd->salt) { - GRPC_STATS_INC_POLLSET_FD_CACHE_HITS(); - p->fd_cache[i].last_used = p->fd_cache_counter; - gpr_mu_unlock(&p->mu); - return GRPC_ERROR_NONE; - } else if (p->fd_cache[i].last_used < p->fd_cache[lru_idx].last_used) { - lru_idx = i; - } - } - - // Add to cache - if (p->fd_cache_size < MAX_FDS_IN_CACHE) { - lru_idx = p->fd_cache_size; - p->fd_cache_size++; - } - p->fd_cache[lru_idx].fd = fd->fd; - p->fd_cache[lru_idx].salt = fd->salt; - p->fd_cache[lru_idx].last_used = p->fd_cache_counter; - gpr_mu_unlock(&p->mu); - if (grpc_polling_trace.enabled()) { gpr_log(GPR_INFO, "add fd %p (%d) to pollable %p", fd, fd->fd, p); } @@ -849,6 +801,7 @@ static grpc_error* pollset_kick_all(grpc_pollset* pollset) { static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) { gpr_mu_init(&pollset->mu); gpr_atm_no_barrier_store(&pollset->worker_count, 0); + gpr_atm_no_barrier_store(&pollset->active_pollable_type, PO_EMPTY); pollset->active_pollable = POLLABLE_REF(g_empty_pollable, "pollset"); pollset->kicked_without_poller = false; pollset->shutdown_closure = nullptr; @@ -869,11 +822,11 @@ static int poll_deadline_to_millis_timeout(grpc_millis millis) { return static_cast(delta); } -static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); } +static void fd_become_readable(grpc_fd* fd) { fd->read_closure.SetReady(); } -static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); } +static void fd_become_writable(grpc_fd* fd) { fd->write_closure.SetReady(); } -static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); } +static void fd_has_errors(grpc_fd* fd) { fd->error_closure.SetReady(); } /* Get the pollable_obj attached to this fd. If none is attached, create a new * pollable object (of type PO_FD), attach it to the fd and return it @@ -1283,6 +1236,8 @@ static grpc_error* pollset_add_fd_locked(grpc_pollset* pollset, grpc_fd* fd) { POLLABLE_UNREF(pollset->active_pollable, "pollset"); pollset->active_pollable = po_at_start; } else { + gpr_atm_rel_store(&pollset->active_pollable_type, + pollset->active_pollable->type); POLLABLE_UNREF(po_at_start, "pollset_add_fd"); } return error; @@ -1329,6 +1284,8 @@ static grpc_error* pollset_as_multipollable_locked(grpc_pollset* pollset, pollset->active_pollable = po_at_start; *pollable_obj = nullptr; } else { + gpr_atm_rel_store(&pollset->active_pollable_type, + pollset->active_pollable->type); *pollable_obj = POLLABLE_REF(pollset->active_pollable, "pollset_set"); POLLABLE_UNREF(po_at_start, "pollset_as_multipollable"); } @@ -1337,9 +1294,23 @@ static grpc_error* pollset_as_multipollable_locked(grpc_pollset* pollset, static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) { GPR_TIMER_SCOPE("pollset_add_fd", 0); - gpr_mu_lock(&pollset->mu); + + // We never transition from PO_MULTI to other modes (i.e., PO_FD or PO_EMOPTY) + // and, thus, it is safe to simply store and check whether the FD has already + // been added to the active pollable previously. + if (gpr_atm_acq_load(&pollset->active_pollable_type) == PO_MULTI && + fd_has_pollset(fd, pollset)) { + return; + } + + grpc_core::MutexLock lock(&pollset->mu); grpc_error* error = pollset_add_fd_locked(pollset, fd); - gpr_mu_unlock(&pollset->mu); + + // If we are in PO_MULTI mode, we should update the pollsets of the FD. + if (gpr_atm_no_barrier_load(&pollset->active_pollable_type) == PO_MULTI) { + fd_add_pollset(fd, pollset); + } + GRPC_LOG_IF_ERROR("pollset_add_fd", error); }