From 9f012514738c560df9ccd72971390a5b097256c0 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 13 Apr 2017 15:37:14 -0700 Subject: [PATCH] Working towards a single-fd optimized data structure --- src/core/lib/iomgr/ev_epollex_linux.c | 161 +++++++++++++++++--------- 1 file changed, 109 insertions(+), 52 deletions(-) diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index 0b3e2b13fc8..6e6e4460c02 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -170,24 +170,27 @@ static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = { struct grpc_pollset_worker { bool kicked; bool initialized_cv; + bool inserted; gpr_cv cv; grpc_pollset_worker *next; grpc_pollset_worker *prev; + grpc_pollset *pollset; }; -struct grpc_pollset { +struct pollable { polling_obj po; - /* Pollable set - possible values: - 0 - nothing is pollable - pointer | 1 - a single pollable file descriptor - (fd << 1) | 0 - an epoll fd */ - gpr_atm pollable_set_atm; + int epfd; + grpc_wakeup_fd wakeup; + grpc_pollset_worker *root_worker; int num_pollers; - bool kicked_without_poller; gpr_atm shutdown_atm; +}; + +struct grpc_pollset { + pollable pollable; + pollable *current_pollable; + bool kicked_without_poller; grpc_closure *shutdown_closure; - grpc_wakeup_fd pollset_wakeup; - grpc_pollset_worker *root_worker; }; /******************************************************************************* @@ -573,32 +576,53 @@ static grpc_error *kick_poller(void) { static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { po_init(&pollset->po, PO_POLLSET); pollset->kicked_without_poller = false; - pollset->epfd = epoll_create1(EPOLL_CLOEXEC); - if (pollset->epfd < 0) { - GRPC_LOG_IF_ERROR("pollset_init", GRPC_OS_ERROR(errno, "epoll_create1")); + gpr_atm_no_barrier_store(&pollset->pollable_set_atm, 0); + pollset->num_pollers = 0; + gpr_atm_no_barrier_store(&pollset->shutdown_atm, 0); + pollset->shutdown_closure = NULL; + pollset->root_worker = NULL; + *mu = &pollset->po.mu; +} + +static grpc_error *multipoller_create(multipoller **out) { + multipoller *p = gpr_malloc(sizeof(*p)); + p->epfd = epoll_create1(EPOLL_CLOEXEC); + if (p->epfd < 0) { + grpc_error *err = GRPC_OS_ERROR(errno, "epoll_create1"); + gpr_free(p); + return err; } else { struct epoll_event ev = {.events = EPOLLIN | EPOLLET | EPOLLEXCLUSIVE, .data.ptr = &global_wakeup_fd}; - if (epoll_ctl(pollset->epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, - &ev) != 0) { - GRPC_LOG_IF_ERROR("pollset_init", GRPC_OS_ERROR(errno, "epoll_ctl")); + if (epoll_ctl(p->epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, &ev) != 0) { + grpc_error *err = GRPC_OS_ERROR(errno, "epoll_ctl"); + close(p->epfd); + gpr_free(p); + return err; } } - pollset->num_pollers = 0; - gpr_atm_no_barrier_store(&pollset->shutdown_atm, 0); - pollset->shutdown_closure = NULL; - if (GRPC_LOG_IF_ERROR("pollset_init", - grpc_wakeup_fd_init(&pollset->pollset_wakeup)) && - pollset->epfd >= 0) { - struct epoll_event ev = {.events = EPOLLIN | EPOLLET, - .data.ptr = &pollset->pollset_wakeup}; - if (epoll_ctl(pollset->epfd, EPOLL_CTL_ADD, pollset->pollset_wakeup.read_fd, - &ev) != 0) { - GRPC_LOG_IF_ERROR("pollset_init", GRPC_OS_ERROR(errno, "epoll_ctl")); - } + grpc_error *err = grpc_wakeup_fd_init(&p->wakeup); + if (err != GRPC_ERROR_NONE) { + close(p->epfd); + gpr_free(p); + return err; } - pollset->root_worker = NULL; - *mu = &pollset->po.mu; + struct epoll_event ev = {.events = EPOLLIN | EPOLLET, .data.ptr = &p->wakeup}; + if (epoll_ctl(pollset->epfd, EPOLL_CTL_ADD, p->wakeup.read_fd, &ev) != 0) { + err = GRPC_OS_ERROR(errno, "epoll_ctl"); + close(p->epfd); + grpc_wakeup_fd_destroy(&p->wakeup); + gpr_free(p); + return err; + } + *out = p; + return GRPC_ERROR_NONE; +} + +static void multipoller_destroy(multipoller *p) { + close(p->epfd); + grpc_wakeup_fd_destroy(&p->wakeup); + gpr_free(p); } /* Convert a timespec to milliseconds: @@ -678,31 +702,40 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, /* pollset_shutdown is guaranteed to be called before pollset_destroy. */ static void pollset_destroy(grpc_pollset *pollset) { po_destroy(&pollset->po); - if (pollset->epfd >= 0) close(pollset->epfd); - grpc_wakeup_fd_destroy(&pollset->pollset_wakeup); + switch (pollset->occupancy) { + case POLLSET_EMPTY: + break; + case POLLSET_UNARY_FD: + UNREF_BY(pollset->pollable.unary_fd, 2); + break; + case POLLSET_MULTIPOLLER: + multipoller_destroy(pollset->pollable.multipoller); + break; + } } #define MAX_EPOLL_EVENTS 100 -static grpc_error *pollset_poll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - gpr_timespec now, gpr_timespec deadline) { +static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + gpr_timespec now, gpr_timespec deadline) { struct epoll_event events[MAX_EPOLL_EVENTS]; static const char *err_desc = "pollset_poll"; - if (pollset->epfd < 0) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "epoll fd failed to initialize"); - } - - GRPC_SCHEDULING_START_BLOCKING_REGION; int timeout = poll_deadline_to_millis_timeout(deadline, now); if (grpc_polling_trace) { gpr_log(GPR_DEBUG, "PS:%p poll for %dms", pollset, timeout); } - int r = epoll_wait(pollset->epfd, events, MAX_EPOLL_EVENTS, timeout); - GRPC_SCHEDULING_END_BLOCKING_REGION; + if (timeout != 0) { + GRPC_SCHEDULING_START_BLOCKING_REGION; + } + int r = epoll_wait(pollset->pollable.multipoller->epfd, events, + MAX_EPOLL_EVENTS, timeout); + if (timeout != 0) { + GRPC_SCHEDULING_END_BLOCKING_REGION; + } + if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait"); if (grpc_polling_trace) { @@ -720,7 +753,7 @@ static grpc_error *pollset_poll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_timer_consume_kick(); append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd), err_desc); - } else if (data_ptr == &pollset->pollset_wakeup) { + } else if (data_ptr == &pollset->pollable.multipoller->pollset_wakeup) { if (grpc_polling_trace) { gpr_log(GPR_DEBUG, "PS:%p poll got pollset_wakeup", pollset); } @@ -728,7 +761,9 @@ static grpc_error *pollset_poll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, the fd is level triggered and non-exclusive, which should result in all pollers waking */ if (gpr_atm_no_barrier_load(&pollset->shutdown_atm) == 0) { - append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd), + append_error(&error, + grpc_wakeup_fd_consume_wakeup( + &pollset->pollable.multipoller->pollset_wakeup), err_desc); } } else { @@ -767,20 +802,29 @@ static grpc_error *pollset_poll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, grpc_pollset_worker **worker_hdl, gpr_timespec deadline) { - if (worker_hdl != NULL) { - *worker_hdl = worker; + worker->initialized_cv = false; + worker->inserted = false; + if (worker_hdl != NULL || pollset->occupancy != POLLSET_MULTIPOLLER) { + if (worker_hdl != NULL) *worker_hdl = worker; worker->kicked = false; + worker->inserted = true; if (pollset->root_worker == NULL) { pollset->root_worker = worker; worker->next = worker->prev = worker; - worker->initialized_cv = false; + if (pollset->occupancy == POLLSET_EMPTY) { + worker->initialized_cv = true; + } } else { worker->next = pollset->root_worker; worker->prev = worker->next->prev; worker->next->prev = worker->prev->next = worker; worker->initialized_cv = true; + } + if (worker->initialized_cv) { + GPR_ASSERT(worker->inserted); gpr_cv_init(&worker->cv); - while (pollset->root_worker != worker) { + while (pollset->root_worker != worker || + pollset->occupancy == POLLSET_EMPTY) { if (gpr_cv_wait(&worker->cv, &pollset->po.mu, deadline)) return false; if (worker->kicked) return false; } @@ -791,7 +835,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, static void end_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, grpc_pollset_worker **worker_hdl) { - if (worker_hdl != NULL) { + if (worker->inserted) { if (worker == pollset->root_worker) { if (worker == worker->next) { pollset->root_worker = NULL; @@ -819,6 +863,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { grpc_pollset_worker worker; + grpc_pollset_worker *fake_worker_hdl; if (grpc_polling_trace) { gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRId64 ".%09d deadline=%" PRId64 ".%09d kwp=%d root_worker=%p", @@ -836,10 +881,22 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); GPR_ASSERT(!pollset->shutdown_closure); pollset->num_pollers++; - gpr_mu_unlock(&pollset->po.mu); - error = pollset_poll(exec_ctx, pollset, now, deadline); - grpc_exec_ctx_flush(exec_ctx); - gpr_mu_lock(&pollset->po.mu); + switch (pollset->occupancy) { + case POLLSET_EMPTY: + GPR_UNREACHABLE_CODE(break); + case POLLSET_UNARY_FD: + gpr_mu_unlock(&pollset->po.mu); + error = pollset_poll(exec_ctx, pollset, now, deadline); + grpc_exec_ctx_flush(exec_ctx); + gpr_mu_lock(&pollset->po.mu); + break; + case POLLSET_MULTIPOLLER: + gpr_mu_unlock(&pollset->po.mu); + error = pollset_epoll(exec_ctx, pollset, now, deadline); + grpc_exec_ctx_flush(exec_ctx); + gpr_mu_lock(&pollset->po.mu); + break; + } gpr_tls_set(&g_current_thread_pollset, 0); gpr_tls_set(&g_current_thread_worker, 0); pollset->num_pollers--;