Rewrite all the pollset and fd functions in ev_epoll_linux.c

pull/6803/head
Sree Kuchibhotla 9 years ago
parent 8c6c9067bc
commit 5098f91159
1 changed file: src/core/lib/iomgr/ev_epoll_linux.c

@@ -67,10 +67,10 @@ struct polling_island;
 struct grpc_fd {
   int fd;
   /* refst format:
-     bit0: 1=active/0=orphaned
-     bit1-n: refcount
-     meaning that mostly we ref by two to avoid altering the orphaned bit,
-     and just unref by 1 when we're ready to flag the object as orphaned */
+     bit 0    : 1=Active / 0=Orphaned
+     bits 1-n : refcount
+     - ref/unref by two to avoid altering the orphaned bit
+     - To orphan, unref by 1 */
   gpr_atm refst;
   gpr_mu mu;
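Note: the reworded comment above describes a packed refcount where bit 0 is the Active/Orphaned flag and bits 1..n hold the count, so a normal ref/unref moves the value by two and only the final orphaning step touches bit 0. A minimal sketch of that scheme using C11 atomics follows; the names are made up for illustration, and the real code manipulates fd->refst through gpr_atm helpers.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Packed refcount: bit 0 is the Active/Orphaned flag, bits 1..n hold the
   reference count. Illustrative names only. */
typedef struct {
  atomic_int_fast64_t refst;
} fd_refcount;

static void refcount_init(fd_refcount *r) {
  atomic_init(&r->refst, 1 + 2); /* Active bit set, one reference held */
}

static void refcount_ref(fd_refcount *r) {
  atomic_fetch_add(&r->refst, 2); /* bumps the count, never touches bit 0 */
}

/* Unref by 2 for a normal release, or by 1 to clear the Active bit (orphan).
   Returns true when the value reaches zero, i.e. no references remain and
   the fd has been orphaned, so it is safe to destroy. */
static bool refcount_unref_by(fd_refcount *r, int_fast64_t n) {
  return atomic_fetch_sub(&r->refst, n) == n;
}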
@@ -84,12 +84,11 @@ struct grpc_fd {
   /* Mutex protecting the 'polling_island' field */
   gpr_mu pi_mu;
-  /* The polling island to which this fd belongs to. An fd belongs to exactly
-     one polling island */
+  /* The polling island to which this fd belongs to.
+   * An fd belongs to exactly one polling island */
   struct polling_island *polling_island;
   struct grpc_fd *freelist_next;
   grpc_closure *on_done_closure;
   grpc_iomgr_object iomgr_object;
@@ -141,7 +140,6 @@ typedef struct polling_island {
   /* Polling islands that are no longer needed are kept in a freelist so that
      they can be reused. This field points to the next polling island in the
-     free list. Note that this is only used if the polling island is in the
      free list */
   struct polling_island *next_free;
 } polling_island;
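For context on next_free: retired polling islands are pushed onto a freelist and reused rather than freed and reallocated. A stripped-down sketch of that pattern, with hypothetical names and the real code's global freelist mutex omitted for brevity:

#include <stdlib.h>

/* "island" is a hypothetical stand-in for polling_island; only the freelist
   link matters here. The real freelist is protected by a global mutex. */
typedef struct island {
  struct island *next_free;
  /* ... epoll fd, fd array, ref count ... */
} island;

static island *g_island_freelist = NULL;

/* Retiring an island pushes it onto the freelist instead of freeing it. */
static void island_retire(island *pi) {
  pi->next_free = g_island_freelist;
  g_island_freelist = pi;
}

/* Creating an island pops from the freelist when possible. */
static island *island_get(void) {
  island *pi = g_island_freelist;
  if (pi != NULL) {
    g_island_freelist = pi->next_free;
  } else {
    pi = calloc(1, sizeof(*pi));
  }
  return pi;
}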
@@ -185,7 +183,7 @@ static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
  * TODO: sreek - Might have to unref the fds (assuming whether we add a ref to
  * the fd when adding it to the epollset) */
 /* The caller is expected to hold pi->mu lock before calling this function */
-static void polling_island_clear_fds_locked(polling_island *pi) {
+static void polling_island_remove_all_fds_locked(polling_island *pi) {
   int err;
   size_t i;
@@ -392,7 +390,7 @@ polling_island *polling_island_merge(polling_island *p, polling_island *q) {
   // Move all the fds from polling_island p to polling_island q
   polling_island_add_fds_locked(q, p->fds, p->fd_cnt);
-  polling_island_clear_fds_locked(p);
+  polling_island_remove_all_fds_locked(p);
   q->ref_cnt += p->ref_cnt;
@@ -411,14 +409,7 @@ static void polling_island_global_init() {
  * pollset declarations
  */
-typedef struct grpc_cached_wakeup_fd {
-  grpc_wakeup_fd fd;
-  struct grpc_cached_wakeup_fd *next;
-} grpc_cached_wakeup_fd;
 struct grpc_pollset_worker {
-  grpc_cached_wakeup_fd *wakeup_fd;
-  int reevaluate_polling_on_wakeup;
   int kicked_specifically;
   pthread_t pt_id;
   struct grpc_pollset_worker *next;
@@ -441,9 +432,6 @@ struct grpc_pollset {
   /* The polling island to which this fd belongs to. An fd belongs to exactly
      one polling island */
   struct polling_island *polling_island;
-  /* Local cache of eventfds for workers */
-  grpc_cached_wakeup_fd *local_wakeup_cache;
 };
 /* Add an fd to a pollset */
@@ -465,8 +453,6 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
 /* Allow kick to wakeup the currently polling worker */
 #define GRPC_POLLSET_CAN_KICK_SELF 1
-/* Force the wakee to repoll when awoken */
-#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
 /* As per pollset_kick, with an extended set of flags (defined above)
    -- mostly for fd_posix's use. */
 static void pollset_kick_ext(grpc_pollset *p,
@@ -815,34 +801,25 @@ static void pollset_kick_ext(grpc_pollset *p,
   if (specific_worker != NULL) {
     if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
       GPR_TIMER_BEGIN("pollset_kick_ext.broadcast", 0);
-      GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
       for (specific_worker = p->root_worker.next;
            specific_worker != &p->root_worker;
            specific_worker = specific_worker->next) {
-        grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
+        pthread_kill(specific_worker->pt_id, SIGUSR1);
       }
       p->kicked_without_pollers = 1;
       GPR_TIMER_END("pollset_kick_ext.broadcast", 0);
     } else if (gpr_tls_get(&g_current_thread_worker) !=
                (intptr_t)specific_worker) {
       GPR_TIMER_MARK("different_thread_worker", 0);
-      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
-        specific_worker->reevaluate_polling_on_wakeup = 1;
-      }
       specific_worker->kicked_specifically = 1;
-      grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
       /* TODO (sreek): Refactor this into a separate file*/
       pthread_kill(specific_worker->pt_id, SIGUSR1);
     } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
       GPR_TIMER_MARK("kick_yoself", 0);
-      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
-        specific_worker->reevaluate_polling_on_wakeup = 1;
-      }
       specific_worker->kicked_specifically = 1;
-      grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
+      pthread_kill(specific_worker->pt_id, SIGUSR1);
     }
   } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) {
-    GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
     GPR_TIMER_MARK("kick_anonymous", 0);
     specific_worker = pop_front_worker(p);
     if (specific_worker != NULL) {
@@ -860,7 +837,7 @@ static void pollset_kick_ext(grpc_pollset *p,
     if (specific_worker != NULL) {
       GPR_TIMER_MARK("finally_kick", 0);
       push_back_worker(p, specific_worker);
-      grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
+      pthread_kill(specific_worker->pt_id, SIGUSR1);
     }
   } else {
     GPR_TIMER_MARK("kicked_no_pollers", 0);
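The common thread in the two kick hunks above is that a worker is now woken with pthread_kill(pt_id, SIGUSR1) rather than by writing to a per-worker wakeup fd, which is why grpc_pollset_worker keeps a pthread_t. The handler setup is not part of this diff, but for the kick to work some (no-op) SIGUSR1 handler must be installed so the signal is not fatal and so delivery interrupts a blocked epoll_wait with EINTR. A sketch of that setup under those assumptions, with hypothetical helper names:

#include <pthread.h>
#include <signal.h>
#include <string.h>

/* A handler that does nothing: its only job is to make a blocking
   epoll_wait() in the target thread return -1 with errno set to EINTR. */
static void poller_kick_handler(int sig) { (void)sig; }

/* One-time setup (hypothetical helper). Without a handler installed,
   SIGUSR1's default action would terminate the process instead of merely
   interrupting the wait. */
static void install_kick_handler(void) {
  struct sigaction act;
  memset(&act, 0, sizeof(act));
  act.sa_handler = poller_kick_handler;
  sigemptyset(&act.sa_mask);
  act.sa_flags = 0;
  sigaction(SIGUSR1, &act, NULL);
}

/* Kicking a specific worker then reduces to one call: */
static void kick_worker(pthread_t pt_id) { pthread_kill(pt_id, SIGUSR1); }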
@@ -911,8 +888,6 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
   pollset->shutting_down = 0;
   pollset->called_shutdown = 0;
   pollset->kicked_without_pollers = 0;
-  pollset->local_wakeup_cache = NULL;
-  pollset->kicked_without_pollers = 0;
   multipoll_with_epoll_pollset_create_efd(pollset);
 }
@@ -926,12 +901,6 @@ static void pollset_destroy(grpc_pollset *pollset) {
   multipoll_with_epoll_pollset_destroy(pollset);
-  while (pollset->local_wakeup_cache) {
-    grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next;
-    grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
-    gpr_free(pollset->local_wakeup_cache);
-    pollset->local_wakeup_cache = next;
-  }
   gpr_mu_destroy(&pollset->pi_mu);
   gpr_mu_destroy(&pollset->mu);
 }
@@ -974,14 +943,6 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
   GPR_TIMER_BEGIN("pollset_work", 0);
   /* this must happen before we (potentially) drop pollset->mu */
   worker.next = worker.prev = NULL;
-  worker.reevaluate_polling_on_wakeup = 0;
-  if (pollset->local_wakeup_cache != NULL) {
-    worker.wakeup_fd = pollset->local_wakeup_cache;
-    pollset->local_wakeup_cache = worker.wakeup_fd->next;
-  } else {
-    worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd));
-    grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
-  }
   worker.kicked_specifically = 0;
   /* TODO(sreek): Abstract this thread id stuff out into a separate file */
@@ -1026,27 +987,12 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
       gpr_mu_lock(&pollset->mu);
       locked = 1;
     }
-    /* If we're forced to re-evaluate polling (via pollset_kick with
-       GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
-       a loop */
-    if (worker.reevaluate_polling_on_wakeup) {
-      worker.reevaluate_polling_on_wakeup = 0;
-      pollset->kicked_without_pollers = 0;
-      if (queued_work || worker.kicked_specifically) {
-        /* If there's queued work on the list, then set the deadline to be
-           immediate so we get back out of the polling loop quickly */
-        deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
-      }
-      keep_polling = 1;
-    }
   }
   if (added_worker) {
     remove_worker(pollset, &worker);
     gpr_tls_set(&g_current_thread_worker, 0);
   }
-  /* release wakeup fd to the local pool */
-  worker.wakeup_fd->next = pollset->local_wakeup_cache;
-  pollset->local_wakeup_cache = worker.wakeup_fd;
   /* check shutdown conditions */
   if (pollset->shutting_down) {
     if (pollset_has_workers(pollset)) {
@@ -1135,10 +1081,9 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
     }
   } else if (fd->polling_island == NULL) {
     pi_new = polling_island_update_and_lock(pollset->polling_island, 1, 1);
   } else if (pollset->polling_island == NULL) {
     pi_new = polling_island_update_and_lock(fd->polling_island, 1, 1);
-  } else { // Non null and different
+  } else {
     pi_new = polling_island_merge(fd->polling_island, pollset->polling_island);
   }
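The branch being trimmed here chooses a common polling island for the fd and the pollset: if only one side already has an island, the other adopts it; if both have different islands, they are merged. The sketch below restates that decision as a standalone function. The first and last cases are not visible in this hunk and are assumptions, and every name other than polling_island is hypothetical.

#include <stddef.h>

typedef struct polling_island polling_island;

typedef enum {
  ISLAND_CREATE_NEW,    /* assumed: neither fd nor pollset has an island yet */
  ISLAND_USE_POLLSETS,  /* fd has none: adopt the pollset's island           */
  ISLAND_USE_FDS,       /* pollset has none: adopt the fd's island           */
  ISLAND_MERGE,         /* both set and different: merge the two islands     */
  ISLAND_ALREADY_SHARED /* assumed: both already point at the same island    */
} island_action;

static island_action pick_island_action(const polling_island *fd_pi,
                                        const polling_island *pollset_pi) {
  if (fd_pi == NULL && pollset_pi == NULL) return ISLAND_CREATE_NEW;
  if (fd_pi == NULL) return ISLAND_USE_POLLSETS;
  if (pollset_pi == NULL) return ISLAND_USE_FDS;
  if (fd_pi != pollset_pi) return ISLAND_MERGE;
  return ISLAND_ALREADY_SHARED;
}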
@@ -1182,9 +1127,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
   struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
   int epoll_fd = pollset->epoll_fd;
   int ep_rv;
-  int poll_rv;
   int timeout_ms;
-  struct pollfd pfds[2];
   /* If you want to ignore epoll's ability to sanely handle parallel pollers,
    * for a more apples-to-apples performance comparison with poll, add a
@@ -1196,35 +1139,9 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
   timeout_ms = poll_deadline_to_millis_timeout(deadline, now);
-  pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd);
-  pfds[0].events = POLLIN;
-  pfds[0].revents = 0;
-  pfds[1].fd = epoll_fd;
-  pfds[1].events = POLLIN;
-  pfds[1].revents = 0;
-  /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
-     even going into the blocking annotation if possible */
-  GPR_TIMER_BEGIN("poll", 0);
-  GRPC_SCHEDULING_START_BLOCKING_REGION;
-  poll_rv = grpc_poll_function(pfds, 2, timeout_ms);
-  GRPC_SCHEDULING_END_BLOCKING_REGION;
-  GPR_TIMER_END("poll", 0);
-  if (poll_rv < 0) {
-    if (errno != EINTR) {
-      gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
-    }
-  } else if (poll_rv == 0) {
-    /* do nothing */
-  } else {
-    if (pfds[0].revents) {
-      grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd);
-    }
-    if (pfds[1].revents) {
   do {
-    /* The following epoll_wait never blocks; it has a timeout of 0 */
-    ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
+    ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms);
     if (ep_rv < 0) {
       if (errno != EINTR) {
         gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno));
@@ -1251,8 +1168,6 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
       }
     }
   } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
-    }
-  }
 }
 
 static void multipoll_with_epoll_pollset_finish_shutdown(
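Taken together, the last two hunks replace the old two-stage wait (poll() on the worker's wakeup fd plus the epoll fd, followed by a zero-timeout epoll_wait) with a single epoll_wait that blocks for the deadline-derived timeout and is cut short by the SIGUSR1 kick via EINTR. A self-contained sketch of that pattern, with event dispatch elided and hypothetical names:

#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/epoll.h>

#define MAX_EVENTS 100 /* stands in for GRPC_EPOLL_MAX_EVENTS */

/* Block in epoll_wait for up to timeout_ms; a signal-based kick shows up as
   -1/EINTR and ends the wait early. Looping while the buffer came back full
   drains any remaining ready events. */
static void poll_once(int epoll_fd, int timeout_ms) {
  struct epoll_event ep_ev[MAX_EVENTS];
  int ep_rv;
  do {
    ep_rv = epoll_wait(epoll_fd, ep_ev, MAX_EVENTS, timeout_ms);
    if (ep_rv < 0) {
      if (errno != EINTR) {
        fprintf(stderr, "epoll_wait() failed: %s\n", strerror(errno));
      }
      break; /* kicked (EINTR) or failed: stop waiting */
    }
    for (int i = 0; i < ep_rv; i++) {
      /* ... hand ep_ev[i].data back to the owning fd/pollset ... */
    }
  } while (ep_rv == MAX_EVENTS);
}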
