|
|
|
@ -701,12 +701,14 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, |
|
|
|
|
- Decrement the ref count on the polling island and det fd->polling_island |
|
|
|
|
to NULL */ |
|
|
|
|
gpr_mu_lock(&fd->pi_mu); |
|
|
|
|
|
|
|
|
|
fd->polling_island = polling_island_update_and_lock(fd->polling_island, 1, 0); |
|
|
|
|
polling_island_remove_fd_locked(fd->polling_island, fd, !fd->released, true); |
|
|
|
|
polling_island_unref_and_unlock(fd->polling_island, 1); |
|
|
|
|
fd->polling_island = NULL; |
|
|
|
|
|
|
|
|
|
if (fd->polling_island != NULL) { |
|
|
|
|
fd->polling_island = |
|
|
|
|
polling_island_update_and_lock(fd->polling_island, 1, 0); |
|
|
|
|
polling_island_remove_fd_locked(fd->polling_island, fd, !fd->released, |
|
|
|
|
true); |
|
|
|
|
polling_island_unref_and_unlock(fd->polling_island, 1); |
|
|
|
|
fd->polling_island = NULL; |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&fd->pi_mu); |
|
|
|
|
|
|
|
|
|
grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL); |
|
|
|
@ -926,13 +928,12 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { |
|
|
|
|
set_ready(exec_ctx, fd, &fd->write_closure); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* TODO(klempner): We probably want to turn this down a bit */ |
|
|
|
|
#define GRPC_EPOLL_MAX_EVENTS 1000 |
|
|
|
|
static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_pollset *pollset, int timeout_ms, |
|
|
|
|
sigset_t *sig_mask) { |
|
|
|
|
struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS]; |
|
|
|
|
int epoll_fd; |
|
|
|
|
int epoll_fd = -1; |
|
|
|
|
int ep_rv; |
|
|
|
|
GPR_TIMER_BEGIN("pollset_work_and_unlock", 0); |
|
|
|
|
|
|
|
|
@ -943,45 +944,49 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, |
|
|
|
|
- pollset->pi_mu |
|
|
|
|
- pollset->polling_island->mu */ |
|
|
|
|
gpr_mu_lock(&pollset->pi_mu); |
|
|
|
|
pollset->polling_island = |
|
|
|
|
polling_island_update_and_lock(pollset->polling_island, 1, 0); |
|
|
|
|
|
|
|
|
|
epoll_fd = pollset->polling_island->epoll_fd; |
|
|
|
|
if (pollset->polling_island != NULL) { |
|
|
|
|
pollset->polling_island = |
|
|
|
|
polling_island_update_and_lock(pollset->polling_island, 1, 0); |
|
|
|
|
epoll_fd = pollset->polling_island->epoll_fd; |
|
|
|
|
gpr_mu_unlock(&pollset->polling_island->mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Release the locks */ |
|
|
|
|
polling_island_unref_and_unlock(pollset->polling_island, 0); /* Keep the ref*/ |
|
|
|
|
gpr_mu_unlock(&pollset->pi_mu); |
|
|
|
|
gpr_mu_unlock(&pollset->mu); |
|
|
|
|
|
|
|
|
|
do { |
|
|
|
|
ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, |
|
|
|
|
sig_mask); |
|
|
|
|
/* If epoll_fd == -1, this is a blank pollset and does not have any fds yet */ |
|
|
|
|
if (epoll_fd != -1) { |
|
|
|
|
do { |
|
|
|
|
ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, |
|
|
|
|
sig_mask); |
|
|
|
|
|
|
|
|
|
if (ep_rv < 0) { |
|
|
|
|
if (errno != EINTR) { |
|
|
|
|
/* TODO (sreek) - Check for bad file descriptor error */ |
|
|
|
|
gpr_log(GPR_ERROR, "epoll_pwait() failed: %s", strerror(errno)); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
int i; |
|
|
|
|
for (i = 0; i < ep_rv; ++i) { |
|
|
|
|
grpc_fd *fd = ep_ev[i].data.ptr; |
|
|
|
|
int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); |
|
|
|
|
int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); |
|
|
|
|
int write_ev = ep_ev[i].events & EPOLLOUT; |
|
|
|
|
if (fd == NULL) { |
|
|
|
|
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); |
|
|
|
|
} else { |
|
|
|
|
if (read_ev || cancel) { |
|
|
|
|
fd_become_readable(exec_ctx, fd); |
|
|
|
|
} |
|
|
|
|
if (write_ev || cancel) { |
|
|
|
|
fd_become_writable(exec_ctx, fd); |
|
|
|
|
if (ep_rv < 0) { |
|
|
|
|
if (errno != EINTR) { |
|
|
|
|
/* TODO (sreek) - Check for bad file descriptor error */ |
|
|
|
|
gpr_log(GPR_ERROR, "epoll_pwait() failed: %s", strerror(errno)); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
int i; |
|
|
|
|
for (i = 0; i < ep_rv; ++i) { |
|
|
|
|
grpc_fd *fd = ep_ev[i].data.ptr; |
|
|
|
|
int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); |
|
|
|
|
int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); |
|
|
|
|
int write_ev = ep_ev[i].events & EPOLLOUT; |
|
|
|
|
if (fd == NULL) { |
|
|
|
|
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); |
|
|
|
|
} else { |
|
|
|
|
if (read_ev || cancel) { |
|
|
|
|
fd_become_readable(exec_ctx, fd); |
|
|
|
|
} |
|
|
|
|
if (write_ev || cancel) { |
|
|
|
|
fd_become_writable(exec_ctx, fd); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} while (ep_rv == GRPC_EPOLL_MAX_EVENTS); |
|
|
|
|
} while (ep_rv == GRPC_EPOLL_MAX_EVENTS); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
GPR_TIMER_END("pollset_work_and_unlock", 0); |
|
|
|
|
} |
|
|
|
@ -1141,8 +1146,10 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
} |
|
|
|
|
} else if (fd->polling_island == NULL) { |
|
|
|
|
pi_new = polling_island_update_and_lock(pollset->polling_island, 1, 1); |
|
|
|
|
gpr_mu_unlock(&pi_new->mu); |
|
|
|
|
} else if (pollset->polling_island == NULL) { |
|
|
|
|
pi_new = polling_island_update_and_lock(fd->polling_island, 1, 1); |
|
|
|
|
gpr_mu_unlock(&pi_new->mu); |
|
|
|
|
} else { |
|
|
|
|
pi_new = polling_island_merge(fd->polling_island, pollset->polling_island); |
|
|
|
|
} |
|
|
|
|