pull/10892/head
Craig Tiller 8 years ago
parent c67cc999e3
commit 4509c47614
  1. 467
      src/core/lib/iomgr/ev_epoll1_linux.c
  2. 2
      src/core/lib/iomgr/ev_epollsig_linux.c

@ -36,7 +36,7 @@
/* This polling engine is only relevant on linux kernels supporting epoll() */
#ifdef GRPC_LINUX_EPOLL
#include "src/core/lib/iomgr/ev_epoll_linux.h"
#include "src/core/lib/iomgr/ev_epoll1_linux.h"
#include <assert.h>
#include <errno.h>
@ -75,16 +75,10 @@ static int g_epfd;
struct grpc_fd {
int fd;
/* The fd is either closed or we relinquished control of it. In either
cases, this indicates that the 'fd' on this structure is no longer
valid */
bool orphaned;
gpr_atm read_closure;
gpr_atm write_closure;
struct grpc_fd *freelist_next;
grpc_closure *on_done_closure;
/* The pollset that last noticed that the fd is readable. The actual type
* stored in this is (grpc_pollset *) */
@ -119,12 +113,12 @@ struct grpc_pollset_worker {
};
struct grpc_pollset {
grpc_pollset_worker root_worker;
bool kicked_without_pollers;
grpc_pollset_worker *root_worker;
bool kicked_without_poller;
bool shutting_down; /* Is the pollset shutting down ? */
bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
grpc_closure *shutdown_done; /* Called after after shutdown is complete */
grpc_closure *shutdown_closure; /* Called after after shutdown is complete */
};
/*******************************************************************************
@ -171,66 +165,6 @@ static bool append_error(grpc_error **composite, grpc_error *error,
static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;
#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
int line) {
gpr_log(GPR_DEBUG, "FD %d %p ref %d %ld -> %ld [%s; %s:%d]", fd->fd,
(void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
int line) {
gpr_atm old;
gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd,
(void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
gpr_atm old;
#endif
old = gpr_atm_full_fetch_add(&fd->refst, -n);
if (old == n) {
/* Add the fd to the freelist */
gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist;
fd_freelist = fd;
grpc_iomgr_unregister_object(&fd->iomgr_object);
grpc_lfev_destroy(&fd->read_closure);
grpc_lfev_destroy(&fd->write_closure);
gpr_mu_unlock(&fd_freelist_mu);
} else {
GPR_ASSERT(old > n);
}
}
/* Increment refcount by two to avoid changing the orphan bit */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
int line) {
ref_by(fd, 2, reason, file, line);
}
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
int line) {
unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
static void fd_global_shutdown(void) {
@ -239,7 +173,6 @@ static void fd_global_shutdown(void) {
while (fd_freelist != NULL) {
grpc_fd *fd = fd_freelist;
fd_freelist = fd_freelist->freelist_next;
gpr_mu_destroy(&fd->po.mu);
gpr_free(fd);
}
gpr_mu_destroy(&fd_freelist_mu);
@ -257,29 +190,20 @@ static grpc_fd *fd_create(int fd, const char *name) {
if (new_fd == NULL) {
new_fd = gpr_malloc(sizeof(grpc_fd));
gpr_mu_init(&new_fd->po.mu);
}
/* Note: It is not really needed to get the new_fd->po.mu lock here. If this
* is a newly created fd (or an fd we got from the freelist), no one else
* would be holding a lock to it anyway. */
gpr_mu_lock(&new_fd->po.mu);
new_fd->po.pi = NULL;
#ifdef PO_DEBUG
new_fd->po.obj_type = POLL_OBJ_FD;
#endif
struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET),
.data.ptr = new_fd};
if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
}
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd;
new_fd->orphaned = false;
grpc_lfev_init(&new_fd->read_closure);
grpc_lfev_init(&new_fd->write_closure);
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = NULL;
new_fd->on_done_closure = NULL;
gpr_mu_unlock(&new_fd->po.mu);
char *fd_name;
gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
@ -291,26 +215,12 @@ static grpc_fd *fd_create(int fd, const char *name) {
return new_fd;
}
static int fd_wrapped_fd(grpc_fd *fd) {
int ret_fd = -1;
gpr_mu_lock(&fd->po.mu);
if (!fd->orphaned) {
ret_fd = fd->fd;
}
gpr_mu_unlock(&fd->po.mu);
return ret_fd;
}
static int fd_wrapped_fd(grpc_fd *fd) { return fd->fd; }
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
const char *reason) {
bool is_fd_closed = false;
grpc_error *error = GRPC_ERROR_NONE;
polling_island *unref_pi = NULL;
gpr_mu_lock(&fd->po.mu);
fd->on_done_closure = on_done;
/* If release_fd is not NULL, we should be relinquishing control of the file
descriptor fd->fd (but we still own the grpc_fd structure). */
@ -318,45 +228,18 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
*release_fd = fd->fd;
} else {
close(fd->fd);
is_fd_closed = true;
}
fd->orphaned = true;
/* Remove the active status but keep referenced. We want this grpc_fd struct
to be alive (and not added to freelist) until the end of this function */
REF_BY(fd, 1, reason);
/* Remove the fd from the polling island:
- Get a lock on the latest polling island (i.e the last island in the
linked list pointed by fd->po.pi). This is the island that
would actually contain the fd
- Remove the fd from the latest polling island
- Unlock the latest polling island
- Set fd->po.pi to NULL (but remove the ref on the polling island
before doing this.) */
if (fd->po.pi != NULL) {
polling_island *pi_latest = polling_island_lock(fd->po.pi);
polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
gpr_mu_unlock(&pi_latest->mu);
unref_pi = fd->po.pi;
fd->po.pi = NULL;
}
grpc_closure_sched(exec_ctx, on_done, GRPC_ERROR_REF(error));
grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
grpc_iomgr_unregister_object(&fd->iomgr_object);
grpc_lfev_destroy(&fd->read_closure);
grpc_lfev_destroy(&fd->write_closure);
gpr_mu_unlock(&fd->po.mu);
UNREF_BY(fd, 2, reason); /* Drop the reference */
if (unref_pi != NULL) {
/* Unref stale polling island here, outside the fd lock above.
The polling island owns a workqueue which owns an fd, and unreffing
inside the lock can cause an eventual lock loop that makes TSAN very
unhappy. */
PI_UNREF(exec_ctx, unref_pi, "fd_orphan");
}
GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
GRPC_ERROR_UNREF(error);
gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist;
fd_freelist = fd;
gpr_mu_unlock(&fd_freelist_mu);
}
static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
@ -390,11 +273,24 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
}
static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
gpr_mu_lock(&fd->po.mu);
grpc_workqueue *workqueue =
GRPC_WORKQUEUE_REF((grpc_workqueue *)fd->po.pi, "fd_get_workqueue");
gpr_mu_unlock(&fd->po.mu);
return workqueue;
return NULL; /* TODO(ctiller): add a global workqueue */
}
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) {
grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
/* Note, it is possible that fd_become_readable might be called twice with
different 'notifier's when an fd becomes readable and it is in two epoll
sets (This can happen briefly during polling island merges). In such cases
it does not really matter which notifer is set as the read_notifier_pollset
(They would both point to the same polling island anyway) */
/* Use release store to match with acquire load in fd_get_read_notifier */
gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
}
/*******************************************************************************
@ -442,6 +338,263 @@ static worker_remove_result worker_remove(grpc_pollset_worker **root,
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
static gpr_mu g_pollset_mu;
static grpc_pollset_worker *g_root_worker;
static grpc_error *pollset_global_init(void) {
gpr_mu_init(&g_pollset_mu);
gpr_tls_init(&g_current_thread_pollset);
gpr_tls_init(&g_current_thread_worker);
struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
.data.ptr = &global_wakeup_fd};
if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, &ev) != 0) {
return GRPC_OS_ERROR(errno, "epoll_ctl");
}
return GRPC_ERROR_NONE;
}
static void pollset_global_shutdown(void) {
gpr_mu_destroy(&g_pollset_mu);
gpr_tls_destroy(&g_current_thread_pollset);
gpr_tls_destroy(&g_current_thread_worker);
}
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
*mu = &g_pollset_mu;
}
static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
grpc_error *error = GRPC_ERROR_NONE;
if (pollset->root_worker != NULL) {
grpc_pollset_worker *worker = pollset->root_worker;
do {
if (worker->initialized_cv) {
worker->kicked = true;
gpr_cv_signal(&worker->cv);
} else {
append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
"pollset_shutdown");
}
worker = worker->links[PWL_POLLSET].next;
} while (worker != pollset->root_worker);
}
return error;
}
static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset) {
if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL) {
grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
pollset->shutdown_closure = NULL;
}
}
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure) {
GPR_ASSERT(pollset->shutdown_closure == NULL);
pollset->shutdown_closure = closure;
GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
static void pollset_destroy(grpc_pollset *pollset) {}
#define MAX_EPOLL_EVENTS 100
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
gpr_timespec now) {
gpr_timespec timeout;
if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
return -1;
}
if (gpr_time_cmp(deadline, now) <= 0) {
return 0;
}
static const gpr_timespec round_up = {
.clock_type = GPR_TIMESPAN, .tv_sec = 0, .tv_nsec = GPR_NS_PER_MS - 1};
timeout = gpr_time_sub(deadline, now);
int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up));
return millis >= 1 ? millis : 1;
}
static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
gpr_timespec now, gpr_timespec deadline) {
struct epoll_event events[MAX_EPOLL_EVENTS];
static const char *err_desc = "pollset_poll";
int timeout = poll_deadline_to_millis_timeout(deadline, now);
if (timeout != 0) {
GRPC_SCHEDULING_START_BLOCKING_REGION;
}
int r;
do {
r = epoll_wait(g_epfd, events, MAX_EPOLL_EVENTS, timeout);
} while (r < 0 && errno == EINTR);
if (timeout != 0) {
GRPC_SCHEDULING_END_BLOCKING_REGION;
}
if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
grpc_error *error = GRPC_ERROR_NONE;
for (int i = 0; i < r; i++) {
void *data_ptr = events[i].data.ptr;
if (data_ptr == &global_wakeup_fd) {
grpc_timer_consume_kick();
append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
err_desc);
} else {
grpc_fd *fd = (grpc_fd *)(data_ptr);
bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0;
bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0;
bool write_ev = (events[i].events & EPOLLOUT) != 0;
if (read_ev || cancel) {
fd_become_readable(exec_ctx, fd, pollset);
}
if (write_ev || cancel) {
fd_become_writable(exec_ctx, fd);
}
}
}
return error;
}
static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl, gpr_timespec *now,
gpr_timespec deadline) {
bool do_poll = true;
if (worker_hdl != NULL) *worker_hdl = worker;
worker->initialized_cv = false;
worker->kicked = false;
worker_insert(&pollset->root_worker, PWL_POLLSET, worker);
if (!worker_insert(&g_root_worker, PWL_POLLABLE, worker)) {
worker->initialized_cv = true;
gpr_cv_init(&worker->cv);
while (do_poll && g_root_worker != worker) {
if (gpr_cv_wait(&worker->cv, &g_pollset_mu, deadline)) {
do_poll = false;
} else if (worker->kicked) {
do_poll = false;
}
}
*now = gpr_now(now->clock_type);
}
return do_poll && pollset->shutdown_closure == NULL;
}
static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl) {
if (NEW_ROOT == worker_remove(&g_root_worker, PWL_POLLABLE, worker)) {
gpr_cv_signal(&g_root_worker->cv);
}
if (worker->initialized_cv) {
gpr_cv_destroy(&worker->cv);
}
if (EMPTIED == worker_remove(&pollset->root_worker, PWL_POLLSET, worker)) {
pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
}
/* pollset->po.mu lock must be held by the caller before calling this.
The function pollset_work() may temporarily release the lock (pollset->po.mu)
during the course of its execution but it will always re-acquire the lock and
ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) {
grpc_pollset_worker worker;
grpc_error *error = GRPC_ERROR_NONE;
static const char *err_desc = "pollset_work";
if (pollset->kicked_without_poller) {
pollset->kicked_without_poller = false;
return GRPC_ERROR_NONE;
}
if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) {
gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
GPR_ASSERT(!pollset->shutdown_closure);
gpr_mu_unlock(&g_pollset_mu);
append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline),
err_desc);
grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&g_pollset_mu);
gpr_tls_set(&g_current_thread_pollset, 0);
gpr_tls_set(&g_current_thread_worker, 0);
pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
end_worker(exec_ctx, pollset, &worker, worker_hdl);
return error;
}
static grpc_error *pollset_kick(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
if (specific_worker == NULL) {
if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
if (pollset->root_worker == NULL) {
pollset->kicked_without_poller = true;
return GRPC_ERROR_NONE;
} else {
return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
}
} else {
return GRPC_ERROR_NONE;
}
} else if (specific_worker->kicked) {
return GRPC_ERROR_NONE;
} else if (gpr_tls_get(&g_current_thread_worker) ==
(intptr_t)specific_worker) {
specific_worker->kicked = true;
return GRPC_ERROR_NONE;
} else if (specific_worker == g_root_worker) {
specific_worker->kicked = true;
return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
} else {
specific_worker->kicked = true;
gpr_cv_signal(&specific_worker->cv);
return GRPC_ERROR_NONE;
}
}
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_fd *fd) {}
static grpc_error *kick_poller(void) {
return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
}
/*******************************************************************************
* Workqueue Definitions
*/
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
const char *file, int line,
const char *reason) {
return workqueue;
}
static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
const char *file, int line, const char *reason) {}
#else
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
return workqueue;
}
static void workqueue_unref(grpc_exec_ctx *exec_ctx,
grpc_workqueue *workqueue) {}
#endif
static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
return grpc_schedule_on_exec_ctx;
}
/*******************************************************************************
* Pollset-set Definitions
@ -481,7 +634,6 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
static void shutdown_engine(void) {
fd_global_shutdown();
pollset_global_shutdown();
polling_island_global_shutdown();
}
static const grpc_event_engine_vtable vtable = {
@ -524,45 +676,22 @@ static const grpc_event_engine_vtable vtable = {
/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
* Create a dummy epoll_fd to make sure epoll support is available */
static bool is_epoll_available() {
int fd = epoll_create1(EPOLL_CLOEXEC);
if (fd < 0) {
gpr_log(
GPR_ERROR,
"epoll_create1 failed with error: %d. Not using epoll polling engine",
fd);
return false;
}
close(fd);
return true;
}
const grpc_event_engine_vtable *grpc_init_epoll1_linux(void) {
/* If use of signals is disabled, we cannot use epoll engine*/
if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
return NULL;
}
if (!grpc_has_wakeup_fd()) {
return NULL;
}
if (!is_epoll_available()) {
g_epfd = epoll_create1(EPOLL_CLOEXEC);
if (g_epfd < 0) {
gpr_log(GPR_ERROR, "epoll unavailable");
return NULL;
}
if (!is_grpc_wakeup_signal_initialized) {
grpc_use_signal(SIGRTMIN + 6);
}
fd_global_init();
if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
return NULL;
}
if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
polling_island_global_init())) {
close(g_epfd);
fd_global_shutdown();
return NULL;
}

@ -36,7 +36,7 @@
/* This polling engine is only relevant on linux kernels supporting epoll() */
#ifdef GRPC_LINUX_EPOLL
#include "src/core/lib/iomgr/ev_epoll_linux.h"
#include "src/core/lib/iomgr/ev_epollsig_linux.h"
#include <assert.h>
#include <errno.h>

Loading…
Cancel
Save