From 5098f91159f2a5c0494688b8cfaff4debef5686f Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Tue, 31 May 2016 10:58:17 -0700 Subject: [PATCH] Rewrite all the pollset and fd functions in ev_epoll_linux.c --- src/core/lib/iomgr/ev_epoll_linux.c | 161 +++++++--------------------- 1 file changed, 38 insertions(+), 123 deletions(-) diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 7793a952016..1201c10a7e0 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -67,10 +67,10 @@ struct polling_island; struct grpc_fd { int fd; /* refst format: - bit0: 1=active/0=orphaned - bit1-n: refcount - meaning that mostly we ref by two to avoid altering the orphaned bit, - and just unref by 1 when we're ready to flag the object as orphaned */ + bit 0 : 1=Active / 0=Orphaned + bits 1-n : refcount + - ref/unref by two to avoid altering the orphaned bit + - To orphan, unref by 1 */ gpr_atm refst; gpr_mu mu; @@ -84,12 +84,11 @@ struct grpc_fd { /* Mutex protecting the 'polling_island' field */ gpr_mu pi_mu; - /* The polling island to which this fd belongs to. An fd belongs to exactly - one polling island */ + /* The polling island to which this fd belongs to. + * An fd belongs to exactly one polling island */ struct polling_island *polling_island; struct grpc_fd *freelist_next; - grpc_closure *on_done_closure; grpc_iomgr_object iomgr_object; @@ -141,7 +140,6 @@ typedef struct polling_island { /* Polling islands that are no longer needed are kept in a freelist so that they can be reused. This field points to the next polling island in the - free list. Note that this is only used if the polling island is in the free list */ struct polling_island *next_free; } polling_island; @@ -185,7 +183,7 @@ static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds, * TODO: sreek - Might have to unref the fds (assuming whether we add a ref to * the fd when adding it to the epollset) */ /* The caller is expected to hold pi->mu lock before calling this function */ -static void polling_island_clear_fds_locked(polling_island *pi) { +static void polling_island_remove_all_fds_locked(polling_island *pi) { int err; size_t i; @@ -392,7 +390,7 @@ polling_island *polling_island_merge(polling_island *p, polling_island *q) { // Move all the fds from polling_island p to polling_island q polling_island_add_fds_locked(q, p->fds, p->fd_cnt); - polling_island_clear_fds_locked(p); + polling_island_remove_all_fds_locked(p); q->ref_cnt += p->ref_cnt; @@ -411,14 +409,7 @@ static void polling_island_global_init() { * pollset declarations */ -typedef struct grpc_cached_wakeup_fd { - grpc_wakeup_fd fd; - struct grpc_cached_wakeup_fd *next; -} grpc_cached_wakeup_fd; - struct grpc_pollset_worker { - grpc_cached_wakeup_fd *wakeup_fd; - int reevaluate_polling_on_wakeup; int kicked_specifically; pthread_t pt_id; struct grpc_pollset_worker *next; @@ -441,9 +432,6 @@ struct grpc_pollset { /* The polling island to which this fd belongs to. An fd belongs to exactly one polling island */ struct polling_island *polling_island; - - /* Local cache of eventfds for workers */ - grpc_cached_wakeup_fd *local_wakeup_cache; }; /* Add an fd to a pollset */ @@ -465,8 +453,6 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, /* Allow kick to wakeup the currently polling worker */ #define GRPC_POLLSET_CAN_KICK_SELF 1 -/* Force the wakee to repoll when awoken */ -#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2 /* As per pollset_kick, with an extended set of flags (defined above) -- mostly for fd_posix's use. */ static void pollset_kick_ext(grpc_pollset *p, @@ -815,34 +801,25 @@ static void pollset_kick_ext(grpc_pollset *p, if (specific_worker != NULL) { if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { GPR_TIMER_BEGIN("pollset_kick_ext.broadcast", 0); - GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); for (specific_worker = p->root_worker.next; specific_worker != &p->root_worker; specific_worker = specific_worker->next) { - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + pthread_kill(specific_worker->pt_id, SIGUSR1); } p->kicked_without_pollers = 1; GPR_TIMER_END("pollset_kick_ext.broadcast", 0); } else if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)specific_worker) { GPR_TIMER_MARK("different_thread_worker", 0); - if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { - specific_worker->reevaluate_polling_on_wakeup = 1; - } specific_worker->kicked_specifically = 1; - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); /* TODO (sreek): Refactor this into a separate file*/ pthread_kill(specific_worker->pt_id, SIGUSR1); } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) { GPR_TIMER_MARK("kick_yoself", 0); - if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { - specific_worker->reevaluate_polling_on_wakeup = 1; - } specific_worker->kicked_specifically = 1; - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + pthread_kill(specific_worker->pt_id, SIGUSR1); } } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) { - GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); GPR_TIMER_MARK("kick_anonymous", 0); specific_worker = pop_front_worker(p); if (specific_worker != NULL) { @@ -860,7 +837,7 @@ static void pollset_kick_ext(grpc_pollset *p, if (specific_worker != NULL) { GPR_TIMER_MARK("finally_kick", 0); push_back_worker(p, specific_worker); - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + pthread_kill(specific_worker->pt_id, SIGUSR1); } } else { GPR_TIMER_MARK("kicked_no_pollers", 0); @@ -911,8 +888,6 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { pollset->shutting_down = 0; pollset->called_shutdown = 0; pollset->kicked_without_pollers = 0; - pollset->local_wakeup_cache = NULL; - pollset->kicked_without_pollers = 0; multipoll_with_epoll_pollset_create_efd(pollset); } @@ -926,12 +901,6 @@ static void pollset_destroy(grpc_pollset *pollset) { multipoll_with_epoll_pollset_destroy(pollset); - while (pollset->local_wakeup_cache) { - grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next; - grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd); - gpr_free(pollset->local_wakeup_cache); - pollset->local_wakeup_cache = next; - } gpr_mu_destroy(&pollset->pi_mu); gpr_mu_destroy(&pollset->mu); } @@ -974,14 +943,6 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, GPR_TIMER_BEGIN("pollset_work", 0); /* this must happen before we (potentially) drop pollset->mu */ worker.next = worker.prev = NULL; - worker.reevaluate_polling_on_wakeup = 0; - if (pollset->local_wakeup_cache != NULL) { - worker.wakeup_fd = pollset->local_wakeup_cache; - pollset->local_wakeup_cache = worker.wakeup_fd->next; - } else { - worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd)); - grpc_wakeup_fd_init(&worker.wakeup_fd->fd); - } worker.kicked_specifically = 0; /* TODO(sreek): Abstract this thread id stuff out into a separate file */ @@ -1026,27 +987,12 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_mu_lock(&pollset->mu); locked = 1; } - /* If we're forced to re-evaluate polling (via pollset_kick with - GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force - a loop */ - if (worker.reevaluate_polling_on_wakeup) { - worker.reevaluate_polling_on_wakeup = 0; - pollset->kicked_without_pollers = 0; - if (queued_work || worker.kicked_specifically) { - /* If there's queued work on the list, then set the deadline to be - immediate so we get back out of the polling loop quickly */ - deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); - } - keep_polling = 1; - } } if (added_worker) { remove_worker(pollset, &worker); gpr_tls_set(&g_current_thread_worker, 0); } - /* release wakeup fd to the local pool */ - worker.wakeup_fd->next = pollset->local_wakeup_cache; - pollset->local_wakeup_cache = worker.wakeup_fd; + /* check shutdown conditions */ if (pollset->shutting_down) { if (pollset_has_workers(pollset)) { @@ -1135,10 +1081,9 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } else if (fd->polling_island == NULL) { pi_new = polling_island_update_and_lock(pollset->polling_island, 1, 1); - } else if (pollset->polling_island == NULL) { pi_new = polling_island_update_and_lock(fd->polling_island, 1, 1); - } else { // Non null and different + } else { pi_new = polling_island_merge(fd->polling_island, pollset->polling_island); } @@ -1182,9 +1127,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS]; int epoll_fd = pollset->epoll_fd; int ep_rv; - int poll_rv; int timeout_ms; - struct pollfd pfds[2]; /* If you want to ignore epoll's ability to sanely handle parallel pollers, * for a more apples-to-apples performance comparison with poll, add a @@ -1196,63 +1139,35 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( timeout_ms = poll_deadline_to_millis_timeout(deadline, now); - pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd); - pfds[0].events = POLLIN; - pfds[0].revents = 0; - pfds[1].fd = epoll_fd; - pfds[1].events = POLLIN; - pfds[1].revents = 0; - - /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid - even going into the blocking annotation if possible */ - GPR_TIMER_BEGIN("poll", 0); - GRPC_SCHEDULING_START_BLOCKING_REGION; - poll_rv = grpc_poll_function(pfds, 2, timeout_ms); - GRPC_SCHEDULING_END_BLOCKING_REGION; - GPR_TIMER_END("poll", 0); - - if (poll_rv < 0) { - if (errno != EINTR) { - gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); - } - } else if (poll_rv == 0) { - /* do nothing */ - } else { - if (pfds[0].revents) { - grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); - } - if (pfds[1].revents) { - do { - /* The following epoll_wait never blocks; it has a timeout of 0 */ - ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0); - if (ep_rv < 0) { - if (errno != EINTR) { - gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno)); - } + do { + /* The following epoll_wait never blocks; it has a timeout of 0 */ + ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms); + if (ep_rv < 0) { + if (errno != EINTR) { + gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno)); + } + } else { + int i; + for (i = 0; i < ep_rv; ++i) { + grpc_fd *fd = ep_ev[i].data.ptr; + /* TODO(klempner): We might want to consider making err and pri + * separate events */ + int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); + int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); + int write_ev = ep_ev[i].events & EPOLLOUT; + if (fd == NULL) { + grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); } else { - int i; - for (i = 0; i < ep_rv; ++i) { - grpc_fd *fd = ep_ev[i].data.ptr; - /* TODO(klempner): We might want to consider making err and pri - * separate events */ - int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); - int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); - int write_ev = ep_ev[i].events & EPOLLOUT; - if (fd == NULL) { - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); - } else { - if (read_ev || cancel) { - fd_become_readable(exec_ctx, fd); - } - if (write_ev || cancel) { - fd_become_writable(exec_ctx, fd); - } - } + if (read_ev || cancel) { + fd_become_readable(exec_ctx, fd); + } + if (write_ev || cancel) { + fd_become_writable(exec_ctx, fd); } } - } while (ep_rv == GRPC_EPOLL_MAX_EVENTS); + } } - } + } while (ep_rv == GRPC_EPOLL_MAX_EVENTS); } static void multipoll_with_epoll_pollset_finish_shutdown(