Do not close epoll_fd while there are any pollers and add the ability to

wake up all pollers when an island is merged
pull/6803/head
Sree Kuchibhotla 9 years ago
parent 5855c478c6
commit 24b1062f42
  1. 113
      src/core/lib/iomgr/ev_epoll_linux.c

@ -190,9 +190,18 @@ struct grpc_pollset_set {
};
/*******************************************************************************
* Polling-island Definitions
* Polling island Definitions
*/
/* The wakeup fd that is used to wake up all threads in a Polling island. This
is useful in the polling island merge operation where we need to wakeup all
the threads currently polling the smaller polling island (so that they can
start polling the new/merged polling island)
NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;
/* Polling island freelist */
static gpr_mu g_pi_freelist_mu;
static polling_island *g_pi_freelist = NULL;
@ -232,6 +241,25 @@ static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
}
}
/* The caller is expected to hold pi->mu before calling this */
static void polling_island_add_wakeup_fd_locked(polling_island *pi,
grpc_wakeup_fd *wakeup_fd) {
struct epoll_event ev;
int err;
ev.events = (uint32_t)(EPOLLIN | EPOLLET);
ev.data.ptr = wakeup_fd;
err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
if (err < 0) {
gpr_log(GPR_ERROR,
"Failed to add grpc_wake_up_fd (%d) to the epoll set (epoll_fd: %d)"
". Error: %s",
GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), pi->epoll_fd,
strerror(errno));
}
}
/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_all_fds_locked(polling_island *pi,
bool remove_fd_refs) {
@ -283,8 +311,6 @@ static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
static polling_island *polling_island_create(grpc_fd *initial_fd,
int initial_ref_cnt) {
polling_island *pi = NULL;
struct epoll_event ev;
int err;
/* Try to get one from the polling island freelist */
gpr_mu_lock(&g_pi_freelist_mu);
@ -311,17 +337,7 @@ static polling_island *polling_island_create(grpc_fd *initial_fd,
}
GPR_ASSERT(pi->epoll_fd >= 0);
ev.events = (uint32_t)(EPOLLIN | EPOLLET);
ev.data.ptr = NULL;
err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev);
if (err < 0) {
gpr_log(GPR_ERROR,
"Failed to add grpc_global_wake_up_fd (%d) to the epoll set "
"(epoll_fd: %d) with error: %s",
GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), pi->epoll_fd,
strerror(errno));
}
polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd);
pi->ref_cnt = initial_ref_cnt;
pi->merged_to = NULL;
@ -496,13 +512,15 @@ polling_island *polling_island_merge(polling_island *p, polling_island *q) {
GPR_SWAP(polling_island *, p, q);
}
/* "Merge" p with q i.e move all the fds from p (the polling_island with fewer
fds) to q.
Note: Not altering the ref counts on the affected fds here because they
would effectively remain unchanged */
/* "Merge" p with q i.e move all the fds from p (The one with fewer fds) to q
)Note that the refcounts on the fds being moved will not change here. This
is why the last parameter in the following two functions is 'false') */
polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false);
polling_island_remove_all_fds_locked(p, false);
/* Wakeup all the pollers (if any) on p so that they can pickup this change */
polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd);
/* The merged polling island inherits all the ref counts of the island merging
with it */
q->ref_cnt += p->ref_cnt;
@ -516,6 +534,8 @@ polling_island *polling_island_merge(polling_island *p, polling_island *q) {
static void polling_island_global_init() {
gpr_mu_init(&g_pi_freelist_mu);
g_pi_freelist = NULL;
grpc_wakeup_fd_init(&polling_island_wakeup_fd);
grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
}
static void polling_island_global_shutdown() {
@ -529,8 +549,9 @@ static void polling_island_global_shutdown() {
gpr_free(g_pi_freelist);
g_pi_freelist = next;
}
gpr_mu_destroy(&g_pi_freelist_mu);
grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}
/*******************************************************************************
@ -973,6 +994,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
int epoll_fd = -1;
int ep_rv;
polling_island *pi = NULL;
GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
/* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
@ -983,13 +1005,19 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
- pollset->polling_island->mu */
gpr_mu_lock(&pollset->pi_mu);
if (pollset->polling_island == NULL) {
pollset->polling_island = polling_island_create(NULL, 1);
pi = pollset->polling_island;
if (pi == NULL) {
pi = polling_island_create(NULL, 1);
}
pollset->polling_island =
polling_island_update_and_lock(pollset->polling_island, 1, 0);
epoll_fd = pollset->polling_island->epoll_fd;
/* In addition to locking the polling island, add a ref so that the island
does not get destroyed (which means the epoll_fd won't be closed) while
we are are doing an epoll_wait() on the epoll_fd */
pi = polling_island_update_and_lock(pi, 1, 1);
epoll_fd = pi->epoll_fd;
/* Update the pollset->polling_island */
pollset->polling_island = pi;
#ifdef GRPC_EPOLL_DEBUG
if (pollset->polling_island->fd_cnt == 0) {
@ -1013,25 +1041,29 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
sig_mask);
if (ep_rv < 0) {
if (errno != EINTR) {
/* TODO (sreek) - Do not log an error in case of bad file descriptor
* (A bad file descriptor here would just mean that the epoll set was
* merged with another epoll set and that the current epoll_fd is
* closed) */
gpr_log(GPR_ERROR, "epoll_pwait() failed: %s", strerror(errno));
} else {
/* We were interrupted. Save an interation by doing a zero timeout
epoll_wait to see if there are any other events of interest */
ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
}
}
int i;
for (i = 0; i < ep_rv; ++i) {
grpc_fd *fd = ep_ev[i].data.ptr;
int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
int write_ev = ep_ev[i].events & EPOLLOUT;
if (fd == NULL) {
void *data_ptr = ep_ev[i].data.ptr;
if (data_ptr == &grpc_global_wakeup_fd) {
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
} else if (data_ptr == &polling_island_wakeup_fd) {
/* This means that our polling island is merged with a different
island. We do not have to do anything here since the subsequent call
to the function pollset_work_and_unlock() will pick up the correct
epoll_fd */
} else {
grpc_fd *fd = data_ptr;
int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
int write_ev = ep_ev[i].events & EPOLLOUT;
if (read_ev || cancel) {
fd_become_readable(exec_ctx, fd, pollset);
}
@ -1041,6 +1073,21 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
}
}
} while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
GPR_ASSERT(pi != NULL);
/* Before leaving, release the extra ref we added to the polling island */
/* It is important to note that at this point 'pi' may not be the same as
* pollset->polling_island. This is because pollset->polling_island pointer
* gets updated whenever the underlying polling island is merged with another
* island and while we are doing epoll_wait() above, the polling island may
* have been merged */
/* TODO (sreek) - Change the ref count on polling island to gpr_atm so that
* we do not have to do this here */
gpr_mu_lock(&pi->mu);
polling_island_unref_and_unlock(pi, 1);
GPR_TIMER_END("pollset_work_and_unlock", 0);
}

Loading…
Cancel
Save