Remove from all epoll sets when releasing an fd

pull/4642/head
yang-g 9 years ago
parent c75e7de6cd
commit 8fefe37693
  1. 23
      src/core/iomgr/fd_posix.c
  2. 65
      src/core/iomgr/pollset_multipoller_with_epoll.c
  3. 2
      src/core/iomgr/pollset_posix.h

@ -101,6 +101,7 @@ static grpc_fd *alloc_fd(int fd) {
r->read_watcher = r->write_watcher = NULL;
r->on_done_closure = NULL;
r->closed = 0;
r->released = 0;
return r;
}
@ -210,6 +211,16 @@ static int has_watchers(grpc_fd *fd) {
fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
}
static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
fd->closed = 1;
if (!fd->released) {
close(fd->fd);
} else {
grpc_remove_fd_from_all_epoll_sets(fd->fd);
}
grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, 1);
}
void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
int *release_fd, const char *reason) {
fd->on_done_closure = on_done;
@ -222,11 +233,7 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
gpr_mu_lock(&fd->mu);
REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
if (!has_watchers(fd)) {
fd->closed = 1;
if (!fd->released) {
close(fd->fd);
}
grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, 1);
close_fd_locked(exec_ctx, fd);
} else {
wake_all_watchers_locked(fd);
}
@ -416,11 +423,7 @@ void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
maybe_wake_one_watcher_locked(fd);
}
if (grpc_fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
fd->closed = 1;
if (!fd->released) {
close(fd->fd);
}
grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, 1);
close_fd_locked(exec_ctx, fd);
}
gpr_mu_unlock(&fd->mu);

@ -43,9 +43,66 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
#include "src/core/iomgr/fd_posix.h"
#include "src/core/support/block_annotate.h"
#include "src/core/profiling/timers.h"
#include "src/core/support/block_annotate.h"
struct epoll_fd_list {
int *epoll_fds;
size_t count;
size_t capacity;
};
static struct epoll_fd_list epoll_fd_global_list;
static gpr_once init_epoll_fd_list_mu = GPR_ONCE_INIT;
static gpr_mu epoll_fd_list_mu;
static void init_mu(void) { gpr_mu_init(&epoll_fd_list_mu); }
static void add_epoll_fd_to_global_list(int epoll_fd) {
gpr_once_init(&init_epoll_fd_list_mu, init_mu);
gpr_mu_lock(&epoll_fd_list_mu);
if (epoll_fd_global_list.count == epoll_fd_global_list.capacity) {
epoll_fd_global_list.capacity =
GPR_MAX((size_t)8, epoll_fd_global_list.capacity * 2);
epoll_fd_global_list.epoll_fds =
gpr_realloc(epoll_fd_global_list.epoll_fds,
epoll_fd_global_list.capacity * sizeof(int));
}
epoll_fd_global_list.epoll_fds[epoll_fd_global_list.count++] = epoll_fd;
gpr_mu_unlock(&epoll_fd_list_mu);
}
static void remove_epoll_fd_from_global_list(int epoll_fd) {
gpr_mu_lock(&epoll_fd_list_mu);
GPR_ASSERT(epoll_fd_global_list.count > 0);
for (size_t i = 0; i < epoll_fd_global_list.count; i++) {
if (epoll_fd == epoll_fd_global_list.epoll_fds[i]) {
epoll_fd_global_list.epoll_fds[i] =
epoll_fd_global_list.epoll_fds[--(epoll_fd_global_list.count)];
break;
}
}
gpr_mu_unlock(&epoll_fd_list_mu);
}
void grpc_remove_fd_from_all_epoll_sets(int fd) {
int err;
gpr_mu_lock(&epoll_fd_list_mu);
if (epoll_fd_global_list.count == 0) {
return;
}
for (size_t i = 0; i < epoll_fd_global_list.count; i++) {
err = epoll_ctl(epoll_fd_global_list.epoll_fds[i], EPOLL_CTL_DEL, fd, NULL);
if (err < 0 && errno != ENOENT) {
gpr_log(GPR_ERROR, "epoll_ctl del for %d failed: %s", fd,
strerror(errno));
}
}
gpr_mu_unlock(&epoll_fd_list_mu);
}
typedef struct {
grpc_pollset *pollset;
@ -211,6 +268,7 @@ static void multipoll_with_epoll_pollset_finish_shutdown(
static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) {
pollset_hdr *h = pollset->data.ptr;
close(h->epoll_fd);
remove_epoll_fd_from_global_list(h->epoll_fd);
gpr_free(h);
}
@ -236,6 +294,7 @@ static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno));
abort();
}
add_epoll_fd_to_global_list(h->epoll_fd);
ev.events = (uint32_t)(EPOLLIN | EPOLLET);
ev.data.ptr = NULL;
@ -255,4 +314,8 @@ static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx,
grpc_platform_become_multipoller_type grpc_platform_become_multipoller =
epoll_become_multipoller;
#else /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */
void grpc_remove_fd_from_all_epoll_sets(int fd) {}
#endif /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */

@ -139,6 +139,8 @@ void grpc_poll_become_multipoller(grpc_exec_ctx *exec_ctx,
* be locked) */
int grpc_pollset_has_workers(grpc_pollset *pollset);
void grpc_remove_fd_from_all_epoll_sets(int fd);
/* override to allow tests to hook poll() usage */
typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
extern grpc_poll_function_type grpc_poll_function;

Loading…
Cancel
Save