@@ -185,11 +185,6 @@ struct grpc_pollset_worker {
};

struct grpc_pollset {
  /* pollsets under posix can mutate representation as fds are added and
     removed.
     For example, we may choose a poll() based implementation on linux for
     few fds, and an epoll() based implementation for many fds */
  const grpc_pollset_vtable *vtable;
  gpr_mu mu;
  grpc_pollset_worker root_worker;
  int in_flight_cbs;
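/* Editor's note (illustrative sketch, not part of the change): the vtable
 * field above is the hook that lets a pollset switch representation at
 * runtime. A pollset starts out "basic" and is promoted once it outgrows a
 * single fd, roughly:
 *
 *   become_basic_pollset(pollset, NULL);                 // 0 or 1 fd, poll()
 *   ...
 *   grpc_fd *fds[2] = {existing_fd, new_fd};
 *   platform_become_multipoller(exec_ctx, pollset, fds,  // many fds, epoll()
 *                               GPR_ARRAY_SIZE(fds));
 */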
@@ -248,8 +243,6 @@ typedef void (*platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx,
                                                 grpc_pollset *pollset,
                                                 struct grpc_fd **fds,
                                                 size_t fd_count);
static platform_become_multipoller_type platform_become_multipoller;

/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked) */
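/* Editor's note: platform_become_multipoller is assigned exactly once, at
 * event-engine initialization (see grpc_init_epoll_posix at the end of this
 * diff):
 *
 *   platform_become_multipoller = epoll_become_multipoller;
 *
 * which keeps the basic-pollset code independent of the concrete multipoller
 * implementation. */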
@@ -796,8 +789,6 @@ static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
/* main interface */

static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null);

static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;
@@ -809,14 +800,20 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL;
  pollset->local_wakeup_cache = NULL;
  pollset->kicked_without_pollers = 0;
  become_basic_pollset(pollset, NULL);
  pollset->data.ptr = NULL;
}

/* TODO(sreek): Maybe merge multipoll_*_destroy() with pollset_destroy()
 * function */
static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset);

static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(pollset->in_flight_cbs == 0);
  GPR_ASSERT(!pollset_has_workers(pollset));
  GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
  pollset->vtable->destroy(pollset);
  multipoll_with_epoll_pollset_destroy(pollset);
  while (pollset->local_wakeup_cache) {
    grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next;
    grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
@@ -831,17 +828,24 @@ static void pollset_reset(grpc_pollset *pollset) {
  GPR_ASSERT(pollset->in_flight_cbs == 0);
  GPR_ASSERT(!pollset_has_workers(pollset));
  GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
  pollset->vtable->destroy(pollset);
  multipoll_with_epoll_pollset_destroy(pollset);
  pollset->shutting_down = 0;
  pollset->called_shutdown = 0;
  pollset->kicked_without_pollers = 0;
  become_basic_pollset(pollset, NULL);
}

/* TODO (sreek): Remove multipoll_with_epoll_add_fd declaration*/
static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx,
                                                grpc_pollset *pollset,
                                                grpc_fd *fd,
                                                int and_unlock_pollset);

static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {
  gpr_mu_lock(&pollset->mu);
  pollset->vtable->add_fd(exec_ctx, pollset, fd, 1);
  multipoll_with_epoll_pollset_add_fd(exec_ctx, pollset, fd, 1);
/* the following (enabled only in debug) will reacquire and then release
   our lock - meaning that if the unlocking flag passed to add_fd above is
   not respected, the code will deadlock (in a way that we have a chance of
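/* Editor's note: the debug-only block the comment above describes falls
 * outside this hunk; in the upstream ev_poll_and_epoll_posix.c it is simply a
 * lock/unlock pair, e.g.:
 *
 *   #ifndef NDEBUG
 *     gpr_mu_lock(&pollset->mu);
 *     gpr_mu_unlock(&pollset->mu);
 *   #endif
 */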
@@ -852,12 +856,21 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
#endif
}

/* TODO (sreek): Remove multipoll_with_epoll_finish_shutdown() declaration */
static void multipoll_with_epoll_pollset_finish_shutdown(grpc_pollset *pollset);

static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
  GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs));
  pollset->vtable->finish_shutdown(pollset);
  multipoll_with_epoll_pollset_finish_shutdown(pollset);
  grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
}

/* TODO(sreek): Remove multipoll_with_epoll_*_maybe_work_and_unlock declaration
 */
static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
    grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
    gpr_timespec deadline, gpr_timespec now);

static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                         grpc_pollset_worker **worker_hdl, gpr_timespec now,
                         gpr_timespec deadline) {
@@ -915,8 +928,10 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
    }
    gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
    GPR_TIMER_BEGIN("maybe_work_and_unlock", 0);
    pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker,
                                           deadline, now);
    multipoll_with_epoll_pollset_maybe_work_and_unlock(
        exec_ctx, pollset, &worker, deadline, now);
    GPR_TIMER_END("maybe_work_and_unlock", 0);
    locked = 0;
    gpr_tls_set(&g_current_thread_poller, 0);
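/* Editor's note (assumption about intent): g_current_thread_poller is set for
 * the duration of the potentially blocking maybe_work_and_unlock call and
 * cleared afterwards, which is what lets pollset_kick recognize that the
 * kicking thread is itself the one currently polling this pollset and skip a
 * needless wakeup. */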
@@ -1013,233 +1028,6 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
      timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}

/*
 * basic_pollset - a vtable that provides polling for zero or one file
 * descriptor via poll()
 */

typedef struct grpc_unary_promote_args {
  const grpc_pollset_vtable *original_vtable;
  grpc_pollset *pollset;
  grpc_fd *fd;
  grpc_closure promotion_closure;
} grpc_unary_promote_args;

static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args,
                             bool success) {
  grpc_unary_promote_args *up_args = args;
  const grpc_pollset_vtable *original_vtable = up_args->original_vtable;
  grpc_pollset *pollset = up_args->pollset;
  grpc_fd *fd = up_args->fd;

  /*
   * This is quite tricky. There are a number of cases to keep in mind here:
   * 1. fd may have been orphaned
   * 2. The pollset may no longer be a unary poller (and we can't let case #1
   * leak to other pollset types!)
   * 3. pollset's fd (which may have changed) may have been orphaned
   * 4. The pollset may be shutting down.
   */

  gpr_mu_lock(&pollset->mu);
  /* First we need to ensure that nobody is polling concurrently */
  GPR_ASSERT(!pollset_has_workers(pollset));

  gpr_free(up_args);
  /* At this point the pollset may no longer be a unary poller. In that case
   * we should just call the right add function and be done. */
  /* TODO(klempner): If we're not careful this could cause infinite recursion.
   * That's not a problem for now because empty_pollset has a trivial poller
   * and we don't have any mechanism to unbecome multipoller. */
  pollset->in_flight_cbs--;
  if (pollset->shutting_down) {
    /* We don't care about this pollset anymore. */
    if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) {
      pollset->called_shutdown = 1;
      finish_shutdown(exec_ctx, pollset);
    }
  } else if (fd_is_orphaned(fd)) {
    /* Don't try to add it to anything, we'll drop our ref on it below */
  } else if (pollset->vtable != original_vtable) {
    pollset->vtable->add_fd(exec_ctx, pollset, fd, 0);
  } else if (fd != pollset->data.ptr) {
    grpc_fd *fds[2];
    fds[0] = pollset->data.ptr;
    fds[1] = fd;

    if (fds[0] && !fd_is_orphaned(fds[0])) {
      platform_become_multipoller(exec_ctx, pollset, fds, GPR_ARRAY_SIZE(fds));
      GRPC_FD_UNREF(fds[0], "basicpoll");
    } else {
      /* old fd is orphaned and we haven't cleaned it up until now, so remain a
       * unary poller */
      /* Note that it is possible that fds[1] is also orphaned at this point.
       * That's okay, we'll correct it at the next add or poll. */
      if (fds[0]) GRPC_FD_UNREF(fds[0], "basicpoll");
      pollset->data.ptr = fd;
      GRPC_FD_REF(fd, "basicpoll");
    }
  }

  gpr_mu_unlock(&pollset->mu);

  /* Matching ref in basic_pollset_add_fd */
  GRPC_FD_UNREF(fd, "basicpoll_add");
}
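/* Editor's note mapping the four cases listed at the top of basic_do_promote
 * to the branches that handle them (illustrative summary only):
 *   case 4 (pollset shutting down)     -> the pollset->shutting_down branch
 *   case 1 (fd orphaned)               -> the fd_is_orphaned(fd) branch
 *   case 2 (no longer a unary poller)  -> the vtable != original_vtable branch
 *   case 3 (pollset's own fd orphaned) -> the fds[0] orphan check in the final
 *                                         else-if branch */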
static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                 grpc_fd *fd, int and_unlock_pollset) {
  grpc_unary_promote_args *up_args;
  GPR_ASSERT(fd);
  if (fd == pollset->data.ptr) goto exit;

  if (!pollset_has_workers(pollset)) {
    /* Fast path -- no in flight cbs */
    /* TODO(klempner): Comment this out and fix any test failures or establish
     * they are due to timing issues */
    grpc_fd *fds[2];
    fds[0] = pollset->data.ptr;
    fds[1] = fd;

    if (fds[0] == NULL) {
      pollset->data.ptr = fd;
      GRPC_FD_REF(fd, "basicpoll");
    } else if (!fd_is_orphaned(fds[0])) {
      platform_become_multipoller(exec_ctx, pollset, fds, GPR_ARRAY_SIZE(fds));
      GRPC_FD_UNREF(fds[0], "basicpoll");
    } else {
      /* old fd is orphaned and we haven't cleaned it up until now, so remain a
       * unary poller */
      GRPC_FD_UNREF(fds[0], "basicpoll");
      pollset->data.ptr = fd;
      GRPC_FD_REF(fd, "basicpoll");
    }
    goto exit;
  }

  /* Now we need to promote. This needs to happen when we're not polling. Since
   * this may be called from poll, the wait needs to happen asynchronously. */
  GRPC_FD_REF(fd, "basicpoll_add");
  pollset->in_flight_cbs++;
  up_args = gpr_malloc(sizeof(*up_args));
  up_args->fd = fd;
  up_args->original_vtable = pollset->vtable;
  up_args->pollset = pollset;
  up_args->promotion_closure.cb = basic_do_promote;
  up_args->promotion_closure.cb_arg = up_args;

  grpc_closure_list_add(&pollset->idle_jobs, &up_args->promotion_closure, 1);
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

exit:
  if (and_unlock_pollset) {
    gpr_mu_unlock(&pollset->mu);
  }
}
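/* Editor's sketch of the slow (promotion) path above, pieced together from
 * this file; step 2 is an assumption about where idle_jobs is drained:
 *   1. basic_pollset_add_fd refs the fd ("basicpoll_add"), bumps
 *      in_flight_cbs, queues promotion_closure on idle_jobs, and kicks every
 *      worker out of poll().
 *   2. A worker (presumably in pollset_work) runs the idle jobs while the
 *      pollset is not being polled.
 *   3. basic_do_promote drops in_flight_cbs, promotes to a multipoller or
 *      re-adds the fd as appropriate, then unrefs "basicpoll_add". */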
static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
                                                grpc_pollset *pollset,
                                                grpc_pollset_worker *worker,
                                                gpr_timespec deadline,
                                                gpr_timespec now) {
#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)

  struct pollfd pfd[3];
  grpc_fd *fd;
  grpc_fd_watcher fd_watcher;
  int timeout;
  int r;
  nfds_t nfds;

  fd = pollset->data.ptr;
  if (fd && fd_is_orphaned(fd)) {
    GRPC_FD_UNREF(fd, "basicpoll");
    fd = pollset->data.ptr = NULL;
  }
  timeout = poll_deadline_to_millis_timeout(deadline, now);
  pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
  pfd[0].events = POLLIN;
  pfd[0].revents = 0;
  pfd[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd);
  pfd[1].events = POLLIN;
  pfd[1].revents = 0;
  nfds = 2;
  if (fd) {
    pfd[2].fd = fd->fd;
    pfd[2].revents = 0;
    GRPC_FD_REF(fd, "basicpoll_begin");
    gpr_mu_unlock(&pollset->mu);
    pfd[2].events =
        (short)fd_begin_poll(fd, pollset, worker, POLLIN, POLLOUT, &fd_watcher);
    if (pfd[2].events != 0) {
      nfds++;
    }
  } else {
    gpr_mu_unlock(&pollset->mu);
  }

  /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
     even going into the blocking annotation if possible */
  /* poll fd count (argument 2) is shortened by one if we have no events
     to poll on - such that it only includes the kicker */
  GPR_TIMER_BEGIN("poll", 0);
  GRPC_SCHEDULING_START_BLOCKING_REGION;
  r = grpc_poll_function(pfd, nfds, timeout);
  GRPC_SCHEDULING_END_BLOCKING_REGION;
  GPR_TIMER_END("poll", 0);

  if (r < 0) {
    if (errno != EINTR) {
      gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
    }
    if (fd) {
      fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
    }
  } else if (r == 0) {
    if (fd) {
      fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
    }
  } else {
    if (pfd[0].revents & POLLIN_CHECK) {
      grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
    }
    if (pfd[1].revents & POLLIN_CHECK) {
      grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd);
    }
    if (nfds > 2) {
      fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK,
                  pfd[2].revents & POLLOUT_CHECK);
    } else if (fd) {
      fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
    }
  }

  if (fd) {
    GRPC_FD_UNREF(fd, "basicpoll_begin");
  }
}
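/* Editor's summary of the pollfd layout used by the basic poller above:
 *   pfd[0] - the global wakeup fd (woken by kick_poller)
 *   pfd[1] - this worker's own wakeup fd (woken by a targeted pollset_kick)
 *   pfd[2] - the single fd owned by the basic pollset, if any
 * nfds is 2 or 3 accordingly, so poll() is never handed an unused slot. */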
static void basic_pollset_destroy(grpc_pollset *pollset) {
  if (pollset->data.ptr != NULL) {
    GRPC_FD_UNREF(pollset->data.ptr, "basicpoll");
    pollset->data.ptr = NULL;
  }
}

static const grpc_pollset_vtable basic_pollset = {
    basic_pollset_add_fd, basic_pollset_maybe_work_and_unlock,
    basic_pollset_destroy, basic_pollset_destroy};

static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) {
  pollset->vtable = &basic_pollset;
  pollset->data.ptr = fd_or_null;
  if (fd_or_null != NULL) {
    GRPC_FD_REF(fd_or_null, "basicpoll");
  }
}

/*******************************************************************************
 * pollset_multipoller_with_epoll_posix.c
 */
@@ -1274,6 +1062,7 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  set_ready(exec_ctx, fd, &fd->write_closure);
}

/* TODO (sreek): Maybe this global list is not required. Double check*/
struct epoll_fd_list {
  int *epoll_fds;
  size_t count;
@@ -1390,10 +1179,48 @@ static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg,
  gpr_free(da);
}

/* Creates an epoll fd and initializes the pollset */
static void multipoll_with_epoll_pollset_create_efd(grpc_exec_ctx *exec_ctx,
                                                    grpc_pollset *pollset) {
  epoll_hdr *h = gpr_malloc(sizeof(epoll_hdr));
  struct epoll_event ev;
  int err;

  /* Ensuring that the pollset is in fact empty (with no epoll fd either) */
  GPR_ASSERT(pollset->data.ptr == NULL);

  pollset->data.ptr = h;
  h->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
  if (h->epoll_fd < 0) {
    gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno));
    abort();
  }
  add_epoll_fd_to_global_list(h->epoll_fd);

  ev.events = (uint32_t)(EPOLLIN | EPOLLET);
  ev.data.ptr = NULL;

  /* TODO (sreek): Double-check the use of grpc_global_wakeup_fd here (right now
   * I do not know why this is used; I just copied this code from the
   * epoll_become_multipoller() function in ev_poll_and_epoll_posix.c) */
  err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD,
                  GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev);
  if (err < 0) {
    gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s",
            GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd),
            strerror(errno));
  }
}

static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx,
                                                grpc_pollset *pollset,
                                                grpc_fd *fd,
                                                int and_unlock_pollset) {
  /* If there is no epoll fd on the pollset, create one */
  if (pollset->data.ptr == NULL) {
    multipoll_with_epoll_pollset_create_efd(exec_ctx, pollset);
  }

  if (and_unlock_pollset) {
    gpr_mu_unlock(&pollset->mu);
    finally_add_fd(exec_ctx, pollset, fd);
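/* Editor's note: with this change the epoll fd is created lazily. pollset_init
 * now leaves pollset->data.ptr NULL, and the NULL check above is the
 * "no epoll fd yet" test that triggers multipoll_with_epoll_pollset_create_efd
 * on the first fd added to the pollset. */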
@@ -1500,45 +1327,6 @@ static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) {
  gpr_free(h);
}

static const grpc_pollset_vtable multipoll_with_epoll_pollset = {
    multipoll_with_epoll_pollset_add_fd,
    multipoll_with_epoll_pollset_maybe_work_and_unlock,
    multipoll_with_epoll_pollset_finish_shutdown,
    multipoll_with_epoll_pollset_destroy};

static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx,
                                     grpc_pollset *pollset, grpc_fd **fds,
                                     size_t nfds) {
  size_t i;
  epoll_hdr *h = gpr_malloc(sizeof(epoll_hdr));
  struct epoll_event ev;
  int err;

  pollset->vtable = &multipoll_with_epoll_pollset;
  pollset->data.ptr = h;
  h->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
  if (h->epoll_fd < 0) {
    /* TODO(klempner): Fall back to poll here, especially on ENOSYS */
    gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno));
    abort();
  }
  add_epoll_fd_to_global_list(h->epoll_fd);

  ev.events = (uint32_t)(EPOLLIN | EPOLLET);
  ev.data.ptr = NULL;
  err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD,
                  GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev);
  if (err < 0) {
    gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s",
            GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd),
            strerror(errno));
  }

  for (i = 0; i < nfds; i++) {
    multipoll_with_epoll_pollset_add_fd(exec_ctx, pollset, fds[i], 0);
  }
}

/*******************************************************************************
 * pollset_set_posix.c
 */
@@ -1724,7 +1512,6 @@ static const grpc_event_engine_vtable vtable = {
};

const grpc_event_engine_vtable *grpc_init_epoll_posix(void) {
  platform_become_multipoller = epoll_become_multipoller;
  fd_global_init();
  pollset_global_init();
  return &vtable;