Merge pull request #12221 from sreecha/epoll1-work-dist

Epoll1 Work Distribution: Parallelize processing epoll events across multiple threads
Sree Kuchibhotla committed 8 years ago (via GitHub)
commit 2d060b6bd5
Changed files:
  1. src/core/lib/iomgr/ev_epoll1_linux.c (226 changes)
  2. src/core/lib/iomgr/ev_epollex_linux.c (2 changes)

src/core/lib/iomgr/ev_epoll1_linux.c
@@ -48,7 +48,60 @@
#include "src/core/lib/support/string.h"
static grpc_wakeup_fd global_wakeup_fd;
static int g_epfd;
/*******************************************************************************
* Singleton epoll set related fields
*/
#define MAX_EPOLL_EVENTS 100
#define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1
/* NOTE ON SYNCHRONIZATION:
* - Fields in this struct are only modified by the designated poller. Hence
* there is no need for any locks to protect the struct.
* - num_events and cursor fields have to be of atomic type to provide memory
* visibility guarantees only, i.e. with multiple pollers, the designated
* polling thread keeps changing; the thread that wrote these values may be
* different from the thread reading the values.
*/
typedef struct epoll_set {
int epfd;
/* The epoll_events after the last call to epoll_wait() */
struct epoll_event events[MAX_EPOLL_EVENTS];
/* The number of epoll_events after the last call to epoll_wait() */
gpr_atm num_events;
/* Index of the first event in epoll_events that has to be processed. This
* field is only valid if num_events > 0 */
gpr_atm cursor;
} epoll_set;
/* The global singleton epoll set */
static epoll_set g_epoll_set;
/* Must be called *only* once */
static bool epoll_set_init() {
g_epoll_set.epfd = epoll_create1(EPOLL_CLOEXEC);
if (g_epoll_set.epfd < 0) {
gpr_log(GPR_ERROR, "epoll unavailable");
return false;
}
gpr_log(GPR_INFO, "grpc epoll fd: %d", g_epoll_set.epfd);
gpr_atm_no_barrier_store(&g_epoll_set.num_events, 0);
gpr_atm_no_barrier_store(&g_epoll_set.cursor, 0);
return true;
}
/* epoll_set_init() MUST be called before calling this. */
static void epoll_set_shutdown() {
if (g_epoll_set.epfd >= 0) {
close(g_epoll_set.epfd);
g_epoll_set.epfd = -1;
}
}
/*******************************************************************************
* Fd Declarations
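The NOTE ON SYNCHRONIZATION above is the heart of the lock-free handoff: only one thread is the designated poller at any time, so the atomics on num_events and cursor exist purely to make the previous poller's writes visible to whichever thread polls next. A minimal, self-contained sketch of the same release/acquire handoff, written with C11 atomics rather than gRPC's gpr_atm API (all names below are illustrative, not from the patch):

```c
#include <stdatomic.h>
#include <stdio.h>

#define MAX_EVENTS 100

/* Stand-ins for g_epoll_set; all names here are illustrative. */
static int events[MAX_EVENTS];   /* event buffer filled by the poller   */
static atomic_long num_events;   /* written by the thread that polled   */
static atomic_long cursor;       /* index of the next unprocessed event */

/* Run by whichever thread is currently the designated poller. */
static void publish_events(int n) {
  for (int i = 0; i < n; i++) events[i] = i;
  /* Release stores: the event buffer must be visible to whoever becomes
     the poller next and observes the updated counters. */
  atomic_store_explicit(&num_events, n, memory_order_release);
  atomic_store_explicit(&cursor, 0, memory_order_release);
}

/* Possibly a different thread is the designated poller now; it drains a
   small batch, mirroring what process_epoll_events() does. */
static void process_batch(int batch) {
  long n = atomic_load_explicit(&num_events, memory_order_acquire);
  long c = atomic_load_explicit(&cursor, memory_order_acquire);
  for (int i = 0; i < batch && c != n; i++, c++) {
    printf("handling event %d\n", events[c]);
  }
  atomic_store_explicit(&cursor, c, memory_order_release);
}

int main(void) {
  publish_events(3);
  process_batch(1); /* one event per turn, like the patch's default */
  process_batch(1);
  process_batch(1);
  return 0;
}
```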
@@ -122,7 +175,7 @@ struct grpc_pollset {
bool kicked_without_poller;
/* Set to true if the pollset is observed to have no workers available to
* poll */
poll */
bool seen_inactive;
bool shutting_down; /* Is the pollset shutting down ? */
grpc_closure *shutdown_closure; /* Called after shutdown is complete */
@@ -228,7 +281,7 @@ static grpc_fd *fd_create(int fd, const char *name) {
struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET),
.data.ptr = new_fd};
if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
}
@@ -326,7 +379,10 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
/* The designated poller */
static gpr_atm g_active_poller;
static pollset_neighbourhood *g_neighbourhoods;
static size_t g_num_neighbourhoods;
@@ -380,7 +436,8 @@ static grpc_error *pollset_global_init(void) {
if (err != GRPC_ERROR_NONE) return err;
struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
.data.ptr = &global_wakeup_fd};
if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, &ev) != 0) {
if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
&ev) != 0) {
return GRPC_OS_ERROR(errno, "epoll_ctl");
}
g_num_neighbourhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBOURHOODS);
@@ -497,8 +554,6 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
GPR_TIMER_END("pollset_shutdown", 0);
}
#define MAX_EPOLL_EVENTS 100
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
gpr_timespec now) {
gpr_timespec timeout;
@@ -517,56 +572,89 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
return millis >= 1 ? millis : 1;
}
static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
gpr_timespec now, gpr_timespec deadline) {
struct epoll_event events[MAX_EPOLL_EVENTS];
static const char *err_desc = "pollset_poll";
GPR_TIMER_BEGIN("pollset_epoll", 0);
int timeout = poll_deadline_to_millis_timeout(deadline, now);
if (timeout != 0) {
GRPC_SCHEDULING_START_BLOCKING_REGION;
}
int r;
do {
GPR_TIMER_BEGIN("epoll_wait", 0);
r = epoll_wait(g_epfd, events, MAX_EPOLL_EVENTS, timeout);
GPR_TIMER_END("epoll_wait", 0);
} while (r < 0 && errno == EINTR);
if (timeout != 0) {
GRPC_SCHEDULING_END_BLOCKING_REGION;
}
/* Process the epoll events found by the do_epoll_wait() function.
- g_epoll_set.cursor points to the index of the first event to be processed
- This function then processes up to MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION
events and updates g_epoll_set.cursor
NOTE ON SYNCHRONIZATION: Similar to do_epoll_wait(), this function is only
called by the g_active_poller thread. So there is no need for synchronization
when accessing fields in g_epoll_set */
static grpc_error *process_epoll_events(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset) {
static const char *err_desc = "process_events";
grpc_error *error = GRPC_ERROR_NONE;
if (r < 0) {
GPR_TIMER_END("pollset_epoll", 0);
return GRPC_OS_ERROR(errno, "epoll_wait");
}
GPR_TIMER_BEGIN("process_epoll_events", 0);
long num_events = gpr_atm_acq_load(&g_epoll_set.num_events);
long cursor = gpr_atm_acq_load(&g_epoll_set.cursor);
for (int idx = 0;
(idx < MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION) && cursor != num_events;
idx++) {
long c = cursor++;
struct epoll_event *ev = &g_epoll_set.events[c];
void *data_ptr = ev->data.ptr;
grpc_error *error = GRPC_ERROR_NONE;
for (int i = 0; i < r; i++) {
void *data_ptr = events[i].data.ptr;
if (data_ptr == &global_wakeup_fd) {
append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
err_desc);
} else {
grpc_fd *fd = (grpc_fd *)(data_ptr);
bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0;
bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0;
bool write_ev = (events[i].events & EPOLLOUT) != 0;
bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0;
bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
bool write_ev = (ev->events & EPOLLOUT) != 0;
if (read_ev || cancel) {
fd_become_readable(exec_ctx, fd, pollset);
}
if (write_ev || cancel) {
fd_become_writable(exec_ctx, fd);
}
}
}
GPR_TIMER_END("pollset_epoll", 0);
gpr_atm_rel_store(&g_epoll_set.cursor, cursor);
GPR_TIMER_END("process_epoll_events", 0);
return error;
}
/* Do epoll_wait and store the events in the g_epoll_set.events field. This
does not "process" any of the events yet; that is done in
process_epoll_events(). See process_epoll_events() for more details.
NOTE ON SYNCHRONIZATION: At any point in time, only the g_active_poller
(i.e. the designated poller thread) will be calling this function. So there is
no need for any synchronization when accessing fields in g_epoll_set */
static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
gpr_timespec now, gpr_timespec deadline) {
GPR_TIMER_BEGIN("do_epoll_wait", 0);
int r;
int timeout = poll_deadline_to_millis_timeout(deadline, now);
if (timeout != 0) {
GRPC_SCHEDULING_START_BLOCKING_REGION;
}
do {
r = epoll_wait(g_epoll_set.epfd, g_epoll_set.events, MAX_EPOLL_EVENTS,
timeout);
} while (r < 0 && errno == EINTR);
if (timeout != 0) {
GRPC_SCHEDULING_END_BLOCKING_REGION;
}
if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "ps: %p poll got %d events", ps, r);
}
gpr_atm_rel_store(&g_epoll_set.num_events, r);
gpr_atm_rel_store(&g_epoll_set.cursor, 0);
GPR_TIMER_END("do_epoll_wait", 0);
return GRPC_ERROR_NONE;
}
static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl, gpr_timespec *now,
gpr_timespec deadline) {
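do_epoll_wait() and process_epoll_events() split one epoll pass into "fetch a batch" and "drain a little at a time", so the next designated poller can pick up where the previous one left off. The following standalone sketch (plain epoll, single-threaded, hypothetical names) mirrors that shape: fill the buffer once, then drain it one event per call, as the patch does with MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION set to 1:

```c
#include <errno.h>
#include <stdio.h>
#include <sys/epoll.h>
#include <unistd.h>

#define MAX_EVENTS 100
#define EVENTS_PER_ITERATION 1

/* Hypothetical globals mirroring g_epoll_set. */
static struct epoll_event g_events[MAX_EVENTS];
static int g_num_events = 0;
static int g_cursor = 0;

/* Mirrors do_epoll_wait(): fill the buffer once, reset the cursor. */
static int wait_for_events(int epfd, int timeout_ms) {
  int r;
  do {
    r = epoll_wait(epfd, g_events, MAX_EVENTS, timeout_ms);
  } while (r < 0 && errno == EINTR);
  if (r < 0) return -1;
  g_num_events = r;
  g_cursor = 0;
  return r;
}

/* Mirrors process_epoll_events(): handle a small batch, advance the cursor. */
static void process_some_events(void) {
  for (int i = 0; i < EVENTS_PER_ITERATION && g_cursor < g_num_events; i++) {
    struct epoll_event *ev = &g_events[g_cursor++];
    printf("event: readable=%d writable=%d\n", (ev->events & EPOLLIN) != 0,
           (ev->events & EPOLLOUT) != 0);
  }
}

int main(void) {
  int pipefd[2];
  int epfd = epoll_create1(EPOLL_CLOEXEC);
  if (epfd < 0 || pipe(pipefd) != 0) return 1;
  struct epoll_event ev = {.events = EPOLLIN, .data.ptr = NULL};
  if (epoll_ctl(epfd, EPOLL_CTL_ADD, pipefd[0], &ev) != 0) return 1;
  if (write(pipefd[1], "x", 1) != 1) return 1; /* make the read end ready */
  if (wait_for_events(epfd, 1000) < 0) return 1;
  while (g_cursor < g_num_events) process_some_events(); /* drain in batches */
  close(pipefd[0]);
  close(pipefd[1]);
  close(epfd);
  return 0;
}
```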
@@ -827,32 +915,55 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
The function pollset_work() may temporarily release the lock (pollset->po.mu)
during the course of its execution but it will always re-acquire the lock and
ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) {
grpc_pollset_worker worker;
grpc_error *error = GRPC_ERROR_NONE;
static const char *err_desc = "pollset_work";
GPR_TIMER_BEGIN("pollset_work", 0);
if (pollset->kicked_without_poller) {
pollset->kicked_without_poller = false;
if (ps->kicked_without_poller) {
ps->kicked_without_poller = false;
GPR_TIMER_END("pollset_work", 0);
return GRPC_ERROR_NONE;
}
if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) {
gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
if (begin_worker(ps, &worker, worker_hdl, &now, deadline)) {
gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
GPR_ASSERT(!pollset->shutting_down);
GPR_ASSERT(!pollset->seen_inactive);
gpr_mu_unlock(&pollset->mu);
append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline),
err_desc);
gpr_mu_lock(&pollset->mu);
GPR_ASSERT(!ps->shutting_down);
GPR_ASSERT(!ps->seen_inactive);
gpr_mu_unlock(&ps->mu); /* unlock */
/* This is the designated polling thread at this point and should ideally do
polling. However, if there are unprocessed events left from a previous
call to do_epoll_wait(), skip calling epoll_wait() in this iteration and
process the pending epoll events.
The reason for decoupling do_epoll_wait and process_epoll_events is to
better distribute the work (i.e. handling epoll events) across multiple
threads.
process_epoll_events() returns very quickly: it just queues the work on
exec_ctx but does not execute it (the actual execution, or more
accurately grpc_exec_ctx_flush(), happens in end_worker() AFTER selecting
a designated poller). So we are not left without a designated poller for
long periods */
if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
gpr_atm_acq_load(&g_epoll_set.num_events)) {
append_error(&error, do_epoll_wait(exec_ctx, ps, now, deadline),
err_desc);
}
append_error(&error, process_epoll_events(exec_ctx, ps), err_desc);
gpr_mu_lock(&ps->mu); /* lock */
gpr_tls_set(&g_current_thread_worker, 0);
} else {
gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
}
end_worker(exec_ctx, pollset, &worker, worker_hdl);
end_worker(exec_ctx, ps, &worker, worker_hdl);
gpr_tls_set(&g_current_thread_pollset, 0);
GPR_TIMER_END("pollset_work", 0);
return error;
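The key guard in pollset_work() is that epoll_wait() is re-entered only once cursor has caught up with num_events; otherwise the worker just drains what is already buffered and hands pollership to someone else in end_worker(). A toy, single-threaded rendering of that control flow, with hypothetical stand-ins for the two helpers:

```c
#include <stdio.h>

/* Hypothetical stand-ins: fake_epoll_wait() plays do_epoll_wait(),
   process_one_event() plays process_epoll_events(). */
static int g_num_events = 0;
static int g_cursor = 0;

static void fake_epoll_wait(void) {
  g_num_events = 3; /* pretend epoll_wait() returned three events */
  g_cursor = 0;
}

static void process_one_event(void) {
  if (g_cursor < g_num_events) printf("processed event %d\n", g_cursor++);
}

/* One pollset_work() turn: re-poll only if the previous batch is fully
   drained, then do a small amount of processing and return so another
   thread can take over as the designated poller. */
static void pollset_work_turn(void) {
  if (g_cursor == g_num_events) {
    fake_epoll_wait();
  }
  process_one_event();
}

int main(void) {
  for (int turn = 0; turn < 4; turn++) pollset_work_turn();
  return 0;
}
```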
@@ -1043,7 +1154,7 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
static void shutdown_engine(void) {
fd_global_shutdown();
pollset_global_shutdown();
close(g_epfd);
epoll_set_shutdown();
}
static const grpc_event_engine_vtable vtable = {
@@ -1078,7 +1189,8 @@ static const grpc_event_engine_vtable vtable = {
};
/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
* Create a dummy epoll_fd to make sure epoll support is available */
* Create epoll_fd (epoll_set_init() takes care of that) to make sure epoll
* support is available */
const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
if (!explicit_request) {
return NULL;
@@ -1088,22 +1200,18 @@ const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
return NULL;
}
g_epfd = epoll_create1(EPOLL_CLOEXEC);
if (g_epfd < 0) {
gpr_log(GPR_ERROR, "epoll unavailable");
if (!epoll_set_init()) {
return NULL;
}
fd_global_init();
if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
close(g_epfd);
fd_global_shutdown();
epoll_set_shutdown();
return NULL;
}
gpr_log(GPR_ERROR, "grpc epoll fd: %d", g_epfd);
return &vtable;
}
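epoll_set_init() also serves as the runtime probe described in the comment above: glibc can expose the epoll wrappers even when the running kernel rejects them, so actually creating the fd is the only reliable check. A standalone version of that probe (illustrative, not gRPC code) might look like:

```c
#include <stdbool.h>
#include <stdio.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Returns true only if the running kernel actually accepts epoll calls. */
static bool epoll_available(void) {
  int fd = epoll_create1(EPOLL_CLOEXEC);
  if (fd < 0) return false; /* e.g. the syscall is missing or disabled */
  close(fd);
  return true;
}

int main(void) {
  printf("epoll available: %s\n", epoll_available() ? "yes" : "no");
  return 0;
}
```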

src/core/lib/iomgr/ev_epollex_linux.c
@@ -49,7 +49,7 @@
#include "src/core/lib/support/spinlock.h"
/*******************************************************************************
* Pollset-set sibling link
* Polling object
*/
typedef enum {
