From e24b24d3c646c40248ea4581f3c5b03597797544 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 6 Apr 2017 16:05:45 -0700 Subject: [PATCH] Implement pollset for epollex --- src/core/lib/iomgr/ev_epoll_linux.c | 2 + src/core/lib/iomgr/ev_epollex_linux.c | 279 +++++++++++++++++++++++--- src/core/lib/iomgr/ev_poll_posix.c | 2 + src/core/lib/iomgr/pollset.h | 5 +- src/core/lib/iomgr/pollset_windows.c | 2 + 5 files changed, 254 insertions(+), 36 deletions(-) diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 6db8e1a77cb..d41c164d71f 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -63,6 +63,8 @@ #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" +#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) + /* TODO: sreek - Move this to init.c and initialize this like other tracers. */ static int grpc_polling_trace = 0; /* Disabled by default */ #define GRPC_POLLING_TRACE(fmt, ...) \ diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index 71e8b7e4fb7..0985755a43b 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -62,8 +62,9 @@ #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" -/* Uncomment the following to enable extra checks on poll_object operations */ -/* #define PO_DEBUG */ +#ifndef EPOLLEXCLUSIVE +#define EPOLLEXCLUSIVE (1u << 28) +#endif /* TODO: sreek: Right now, this wakes up all pollers. In future we should make * sure to wake up one polling thread (which can wake up other threads if @@ -85,6 +86,8 @@ struct grpc_fd { Ref/Unref by two to avoid altering the orphaned bit */ gpr_atm refst; + grpc_wakeup_fd workqueue_wakeup_fd; + /* The fd is either closed or we relinquished control of it. 
     In either cases, this indicates that the 'fd' on this structure is no longer valid */
@@ -131,16 +134,22 @@ static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
  * Pollset Declarations
  */
 struct grpc_pollset_worker {
-  /* Thread id of this worker */
-  pthread_t pt_id;
-
-  /* Used to prevent a worker from getting kicked multiple times */
-  gpr_atm is_kicked;
-  struct grpc_pollset_worker *next;
-  struct grpc_pollset_worker *prev;
+  bool kicked;
+  bool initialized_cv;
+  gpr_cv cv;
+  grpc_pollset_worker *next;
+  grpc_pollset_worker *prev;
 };
 
-struct grpc_pollset {};
+struct grpc_pollset {
+  gpr_mu mu;
+  int epfd;
+  int num_pollers;
+  gpr_atm shutdown_atm;
+  grpc_closure *shutdown_closure;
+  grpc_wakeup_fd pollset_wakeup;
+  grpc_pollset_worker *root_worker;
+};
 
 /*******************************************************************************
  * Pollset-set Declarations
@@ -151,6 +160,16 @@ struct grpc_pollset_set {};
  * Common helpers
  */
 
+static bool append_error(grpc_error **composite, grpc_error *error,
+                         const char *desc) {
+  if (error == GRPC_ERROR_NONE) return true;
+  if (*composite == GRPC_ERROR_NONE) {
+    *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
+  }
+  *composite = grpc_error_add_child(*composite, error);
+  return false;
+}
+
 #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
 static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
                                      const char *file, int line,
@@ -400,13 +419,10 @@ static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { abort(); }
 GPR_TLS_DECL(g_current_thread_pollset);
 GPR_TLS_DECL(g_current_thread_worker);
 
-static void poller_kick_init() {}
-
 /* Global state management */
 static grpc_error *pollset_global_init(void) {
   gpr_tls_init(&g_current_thread_pollset);
   gpr_tls_init(&g_current_thread_worker);
-  poller_kick_init();
   return grpc_wakeup_fd_init(&global_wakeup_fd);
 }
 
@@ -419,12 +435,41 @@ static void pollset_global_shutdown(void) {
 /* p->mu must be held before calling this function */
 static grpc_error *pollset_kick(grpc_pollset *p,
                                 grpc_pollset_worker *specific_worker) {
-  abort();
+  if (specific_worker == NULL) {
+    if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
+      return grpc_wakeup_fd_wakeup(&p->pollset_wakeup);
+    } else {
+      return GRPC_ERROR_NONE;
+    }
+  } else if (gpr_tls_get(&g_current_thread_worker) ==
+             (intptr_t)specific_worker) {
+    return GRPC_ERROR_NONE;
+  } else if (specific_worker == p->root_worker) {
+    return grpc_wakeup_fd_wakeup(&p->pollset_wakeup);
+  } else {
+    /* mark the worker kicked so begin_worker's cv loop can observe it */
+    specific_worker->kicked = true;
+    gpr_cv_signal(&specific_worker->cv);
+    return GRPC_ERROR_NONE;
+  }
 }
 
-static grpc_error *kick_poller(void) { abort(); }
+static grpc_error *kick_poller(void) {
+  return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
+}
 
-static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {}
+static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
+  gpr_mu_init(&pollset->mu);
+  pollset->epfd = epoll_create1(EPOLL_CLOEXEC);
+  if (pollset->epfd < 0) {
+    GRPC_LOG_IF_ERROR("pollset_init", GRPC_OS_ERROR(errno, "epoll_create1"));
+  }
+  pollset->num_pollers = 0;
+  gpr_atm_no_barrier_store(&pollset->shutdown_atm, 0);
+  pollset->shutdown_closure = NULL;
+  GRPC_LOG_IF_ERROR("pollset_init",
+                    grpc_wakeup_fd_init(&pollset->pollset_wakeup));
+  pollset->root_worker = NULL;
+  *mu = &pollset->mu;
+}
 
 /* Convert a timespec to milliseconds:
    - Very small or negative poll times are clamped to zero to do a non-blocking
@@ -469,33 +514,186 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
   grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
 }
 
-static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
-                                   grpc_pollset *pollset) {
-  abort();
-}
-
 /* pollset->po.mu lock must be held by the caller before calling this */
 static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
-                             grpc_closure *closure) {}
+                             grpc_closure *closure) {
+  GPR_ASSERT(pollset->shutdown_closure == NULL);
+  pollset->shutdown_closure = closure;
+  if (pollset->num_pollers > 0) {
+    struct epoll_event ev = {.events = EPOLLIN,
+                             .data.ptr = &pollset->pollset_wakeup};
+    epoll_ctl(pollset->epfd, EPOLL_CTL_MOD, pollset->pollset_wakeup.read_fd,
+              &ev);
+    GRPC_LOG_IF_ERROR("pollset_shutdown",
+                      grpc_wakeup_fd_wakeup(&pollset->pollset_wakeup));
+  } else {
+    grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
+  }
+}
+
+/* pollset_shutdown is guaranteed to be called before pollset_destroy. */
+static void pollset_destroy(grpc_pollset *pollset) {
+  gpr_mu_destroy(&pollset->mu);
+  if (pollset->epfd >= 0) close(pollset->epfd);
+  grpc_wakeup_fd_destroy(&pollset->pollset_wakeup);
+}
 
-/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
- * than destroying the mutexes, there is nothing special that needs to be done
- * here */
-static void pollset_destroy(grpc_pollset *pollset) {}
+#define MAX_EPOLL_EVENTS 100
 
-/* pollset->po.mu lock must be held by the caller before calling this.
-   The function pollset_work() may temporarily release the lock (pollset->po.mu)
+static grpc_error *pollset_poll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+                                gpr_timespec now, gpr_timespec deadline) {
+  struct epoll_event events[MAX_EPOLL_EVENTS];
+  static const char *err_desc = "pollset_poll";
+
+  if (pollset->epfd < 0) {
+    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "epoll fd failed to initialize");
+  }
+
+  int r = epoll_wait(pollset->epfd, events, MAX_EPOLL_EVENTS,
+                     poll_deadline_to_millis_timeout(deadline, now));
+  if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
+
+  grpc_error *error = GRPC_ERROR_NONE;
+  for (int i = 0; i < r; i++) {
+    void *data_ptr = events[i].data.ptr;
+    if (data_ptr == &global_wakeup_fd) {
+      grpc_timer_consume_kick();
+      append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
+                   err_desc);
+    } else if (data_ptr == &pollset->pollset_wakeup) {
+      /* once we start shutting down we stop consuming the wakeup:
+         the fd is level triggered and non-exclusive, which should result in all
+         pollers waking */
+      if (gpr_atm_no_barrier_load(&pollset->shutdown_atm) == 0) {
+        append_error(&error,
+                     grpc_wakeup_fd_consume_wakeup(&pollset->pollset_wakeup),
+                     err_desc);
+      }
+    } else {
+      grpc_fd *fd = (grpc_fd *)(((intptr_t)data_ptr) & ~(intptr_t)1);
+      bool is_workqueue = (((intptr_t)data_ptr) & 1) != 0;
+      bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0;
+      bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0;
+      bool write_ev = (events[i].events & EPOLLOUT) != 0;
+      if (is_workqueue) {
+        append_error(&error,
+                     grpc_wakeup_fd_consume_wakeup(&fd->workqueue_wakeup_fd),
+                     err_desc);
+        fd_invoke_workqueue(exec_ctx, fd);
+      } else {
+        if (read_ev || cancel) {
+          fd_become_readable(exec_ctx, fd, pollset);
+        }
+        if (write_ev || cancel) {
+          fd_become_writable(exec_ctx, fd);
+        }
+      }
+    }
+  }
+
+  return error;
+}
+
+/* Return true if this thread should poll */
+static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
+                         grpc_pollset_worker **worker_hdl,
+                         gpr_timespec deadline) {
+  if (worker_hdl != NULL) {
+    *worker_hdl = worker;
+    worker->kicked = false;
+    if (pollset->root_worker == NULL) {
+      pollset->root_worker = worker;
+      worker->next = worker->prev = worker;
+      worker->initialized_cv = false;
+    } else {
+      worker->next = pollset->root_worker;
+      worker->prev = worker->next->prev;
+      worker->next->prev = worker->prev->next = worker;
+      worker->initialized_cv = true;
+      gpr_cv_init(&worker->cv);
+      while (pollset->root_worker != worker) {
+        if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline)) return false;
+        if (worker->kicked) return false;
+      }
+    }
+  }
+  return pollset->shutdown_closure == NULL;
+}
+
+static void end_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
+                       grpc_pollset_worker **worker_hdl) {
+  if (worker_hdl != NULL) {
+    if (worker == pollset->root_worker) {
+      if (worker == worker->next) {
+        pollset->root_worker = NULL;
+      } else {
+        pollset->root_worker = worker->next;
+        worker->prev->next = worker->next;
+        worker->next->prev = worker->prev;
+      }
+    } else {
+      worker->prev->next = worker->next;
+      worker->next->prev = worker->prev;
+    }
+    if (worker->initialized_cv) {
+      gpr_cv_destroy(&worker->cv);
+    }
+  }
+}
+
+/* pollset->mu lock must be held by the caller before calling this.
+   The function pollset_work() may temporarily release the lock (pollset->mu)
    during the course of its execution but it will always re-acquire the lock and
    ensure that it is held by the time the function returns */
 static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                 grpc_pollset_worker **worker_hdl,
                                 gpr_timespec now, gpr_timespec deadline) {
-  abort();
+  grpc_pollset_worker worker;
+  grpc_error *error = GRPC_ERROR_NONE;
+  if (begin_worker(pollset, &worker, worker_hdl, deadline)) {
+    GPR_ASSERT(!pollset->shutdown_closure);
+    pollset->num_pollers++;
+    gpr_mu_unlock(&pollset->mu);
+    error = pollset_poll(exec_ctx, pollset, now, deadline);
+    grpc_exec_ctx_flush(exec_ctx);
+    gpr_mu_lock(&pollset->mu);
+    pollset->num_pollers--;
+    if (pollset->num_pollers == 0 && pollset->shutdown_closure != NULL) {
+      grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
+    }
+  }
+  end_worker(pollset, &worker, worker_hdl);
+  return error;
 }
 
 static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                            grpc_fd *fd) {
-  abort();
+  grpc_error *error = GRPC_ERROR_NONE;
+  static const char *err_desc = "pollset_add_fd";
+  struct epoll_event ev_fd = {
+      .events = EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE, .data.ptr = fd};
+  if (epoll_ctl(pollset->epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) {
+    switch (errno) {
+      case EEXIST: /* if this fd is already in the epoll set, the workqueue fd
                      must also be - just return */
+        return;
+      default:
+        append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), err_desc);
+    }
+  }
+  struct epoll_event ev_wq = {.events = EPOLLET | EPOLLIN | EPOLLEXCLUSIVE,
+                              /* low bit set: marks a workqueue wakeup event */
+                              .data.ptr = (void *)(1 | (intptr_t)fd)};
+  if (epoll_ctl(pollset->epfd, EPOLL_CTL_ADD, fd->workqueue_wakeup_fd.read_fd,
+                &ev_wq) != 0) {
+    switch (errno) {
+      case EEXIST: /* if the workqueue fd is already in the epoll set we're ok -
                      no need to do anything special */
+        break;
+      default:
+        append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), err_desc);
+    }
+  }
+  GRPC_LOG_IF_ERROR("pollset_add_fd", error);
 }
 
 /*******************************************************************************
@@ -593,15 +791,32 @@ static const grpc_event_engine_vtable vtable = {
 
 /* It is possible that GLIBC has epoll but the underlying kernel doesn't.
* Create a dummy epoll_fd to make sure epoll support is available */ -static bool is_epollex_available() { +static bool is_epollex_available(void) { int fd = epoll_create1(EPOLL_CLOEXEC); if (fd < 0) { gpr_log( GPR_ERROR, - "epoll_create1 failed with error: %d. Not using epoll polling engine", + "epoll_create1 failed with error: %d. Not using epollex polling engine", fd); return false; } + grpc_wakeup_fd wakeup; + if (!GRPC_LOG_IF_ERROR("check_wakeupfd_for_epollex", + grpc_wakeup_fd_init(&wakeup))) { + return false; + } + struct epoll_event ev = {.events = EPOLLET | EPOLLIN | EPOLLEXCLUSIVE, + .data.ptr = NULL}; + if (epoll_ctl(fd, EPOLL_CTL_ADD, wakeup.read_fd, &ev) != 0) { + gpr_log(GPR_ERROR, + "epoll_ctl with EPOLLEXCLUSIVE failed with error: %d. Not using " + "epollex polling engine", + fd); + close(fd); + grpc_wakeup_fd_destroy(&wakeup); + return false; + } + grpc_wakeup_fd_destroy(&wakeup); close(fd); return true; } diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 9834cdd1979..b13ec2cbc1e 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -58,6 +58,8 @@ #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" +#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) + /******************************************************************************* * FD declarations */ diff --git a/src/core/lib/iomgr/pollset.h b/src/core/lib/iomgr/pollset.h index 9bf3cdac89e..6f3a51e7175 100644 --- a/src/core/lib/iomgr/pollset.h +++ b/src/core/lib/iomgr/pollset.h @@ -40,8 +40,6 @@ #include "src/core/lib/iomgr/exec_ctx.h" -#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) - /* A grpc_pollset is a set of file descriptors that a higher level item is interested in. For example: - a server will typically keep a pollset containing all connected channels, @@ -88,8 +86,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_timespec deadline) GRPC_MUST_USE_RESULT; /* Break one polling thread out of polling work for this pollset. - If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers. - Otherwise, if specific_worker is non-NULL, then kick that worker. */ + If specific_worker is non-NULL, then kick that worker. */ grpc_error *grpc_pollset_kick(grpc_pollset *pollset, grpc_pollset_worker *specific_worker) GRPC_MUST_USE_RESULT; diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c index 04c6b717475..6dca37b481c 100644 --- a/src/core/lib/iomgr/pollset_windows.c +++ b/src/core/lib/iomgr/pollset_windows.c @@ -43,6 +43,8 @@ #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset_windows.h" +#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) + gpr_mu grpc_polling_mu; static grpc_pollset_worker *g_active_poller; static grpc_pollset_worker g_global_root_worker;
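
Note for reviewers (illustrative only, not part of the patch): the engine selection above depends on probing EPOLLEXCLUSIVE at runtime, because glibc may define the flag even when the running kernel (pre-4.5) rejects it with EINVAL. A minimal standalone sketch of that probe pattern follows, with a plain eventfd standing in for grpc_wakeup_fd; the file name and identifiers here are hypothetical.

/* probe_epollexclusive.c -- standalone sketch of the EPOLLEXCLUSIVE probe
   performed by is_epollex_available(). Assumes a Linux target with epoll
   and eventfd. Build with: cc probe_epollexclusive.c */
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>

#ifndef EPOLLEXCLUSIVE
#define EPOLLEXCLUSIVE (1u << 28)
#endif

int main(void) {
  int epfd = epoll_create1(EPOLL_CLOEXEC);
  if (epfd < 0) {
    fprintf(stderr, "epoll_create1: %s\n", strerror(errno));
    return 1;
  }
  /* An eventfd plays the role of the wakeup fd used in the real check. */
  int evfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
  if (evfd < 0) {
    fprintf(stderr, "eventfd: %s\n", strerror(errno));
    close(epfd);
    return 1;
  }
  /* Kernels older than 4.5 reject EPOLLEXCLUSIVE with EINVAL; that failure
     is what forces a fall back to another polling engine. */
  struct epoll_event ev;
  memset(&ev, 0, sizeof(ev));
  ev.events = EPOLLET | EPOLLIN | EPOLLEXCLUSIVE;
  ev.data.ptr = NULL;
  if (epoll_ctl(epfd, EPOLL_CTL_ADD, evfd, &ev) != 0) {
    fprintf(stderr, "EPOLLEXCLUSIVE unsupported: %s\n", strerror(errno));
  } else {
    printf("EPOLLEXCLUSIVE supported\n");
  }
  close(evfd);
  close(epfd);
  return 0;
}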