|
|
|
@ -169,12 +169,20 @@ struct grpc_pollset_worker { |
|
|
|
|
pollable *pollable; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
#define MAX_EPOLL_EVENTS 100 |
|
|
|
|
#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5 |
|
|
|
|
|
|
|
|
|
struct grpc_pollset { |
|
|
|
|
pollable pollable; |
|
|
|
|
pollable *current_pollable; |
|
|
|
|
int kick_alls_pending; |
|
|
|
|
bool kicked_without_poller; |
|
|
|
|
grpc_closure *shutdown_closure; |
|
|
|
|
grpc_pollset_worker *root_worker; |
|
|
|
|
|
|
|
|
|
int event_cursor; |
|
|
|
|
int event_count; |
|
|
|
|
struct epoll_event events[MAX_EPOLL_EVENTS]; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
/*******************************************************************************
|
|
|
|
@ -437,7 +445,7 @@ static grpc_error *pollable_materialize(pollable *p) { |
|
|
|
|
return err; |
|
|
|
|
} |
|
|
|
|
struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET), |
|
|
|
|
.data.ptr = &p->wakeup}; |
|
|
|
|
.data.ptr = (void *)(1 | (intptr_t)&p->wakeup)}; |
|
|
|
|
if (epoll_ctl(new_epfd, EPOLL_CTL_ADD, p->wakeup.read_fd, &ev) != 0) { |
|
|
|
|
err = GRPC_OS_ERROR(errno, "epoll_ctl"); |
|
|
|
|
close(new_epfd); |
|
|
|
@ -503,8 +511,20 @@ static void pollset_global_shutdown(void) { |
|
|
|
|
gpr_tls_destroy(&g_current_thread_worker); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_error *pollset_kick_all(grpc_pollset *pollset) { |
|
|
|
|
static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_pollset *pollset) { |
|
|
|
|
if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL && |
|
|
|
|
pollset->kick_alls_pending == 0) { |
|
|
|
|
grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE); |
|
|
|
|
pollset->shutdown_closure = NULL; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
grpc_error *error_unused) { |
|
|
|
|
grpc_error *error = GRPC_ERROR_NONE; |
|
|
|
|
grpc_pollset *pollset = arg; |
|
|
|
|
gpr_mu_lock(&pollset->pollable.po.mu); |
|
|
|
|
if (pollset->root_worker != NULL) { |
|
|
|
|
grpc_pollset_worker *worker = pollset->root_worker; |
|
|
|
|
do { |
|
|
|
@ -525,7 +545,17 @@ static grpc_error *pollset_kick_all(grpc_pollset *pollset) { |
|
|
|
|
worker = worker->links[PWL_POLLSET].next; |
|
|
|
|
} while (worker != pollset->root_worker); |
|
|
|
|
} |
|
|
|
|
return error; |
|
|
|
|
pollset->kick_alls_pending--; |
|
|
|
|
pollset_maybe_finish_shutdown(exec_ctx, pollset); |
|
|
|
|
gpr_mu_unlock(&pollset->pollable.po.mu); |
|
|
|
|
GRPC_LOG_IF_ERROR("kick_all", error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void pollset_kick_all(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { |
|
|
|
|
pollset->kick_alls_pending++; |
|
|
|
|
grpc_closure_sched(exec_ctx, grpc_closure_create(do_kick_all, pollset, |
|
|
|
|
grpc_schedule_on_exec_ctx), |
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, |
|
|
|
@ -664,20 +694,12 @@ static grpc_error *fd_become_pollable_locked(grpc_fd *fd) { |
|
|
|
|
return error; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_pollset *pollset) { |
|
|
|
|
if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL) { |
|
|
|
|
GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE); |
|
|
|
|
pollset->shutdown_closure = NULL; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* pollset->po.mu lock must be held by the caller before calling this */ |
|
|
|
|
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
grpc_closure *closure) { |
|
|
|
|
GPR_ASSERT(pollset->shutdown_closure == NULL); |
|
|
|
|
pollset->shutdown_closure = closure; |
|
|
|
|
GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset)); |
|
|
|
|
pollset_kick_all(exec_ctx, pollset); |
|
|
|
|
pollset_maybe_finish_shutdown(exec_ctx, pollset); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -685,6 +707,46 @@ static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable *p) { |
|
|
|
|
return p != &g_empty_pollable && p != &pollset->pollable; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_pollset *pollset, bool drain) { |
|
|
|
|
static const char *err_desc = "pollset_process_events"; |
|
|
|
|
grpc_error *error = GRPC_ERROR_NONE; |
|
|
|
|
for (int i = 0; (drain || i < MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) && |
|
|
|
|
pollset->event_cursor != pollset->event_count; |
|
|
|
|
i++) { |
|
|
|
|
int n = pollset->event_cursor++; |
|
|
|
|
struct epoll_event *ev = &pollset->events[n]; |
|
|
|
|
void *data_ptr = ev->data.ptr; |
|
|
|
|
if (1 & (intptr_t)data_ptr) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_polling_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "PS:%p got pollset_wakeup %p", pollset, data_ptr); |
|
|
|
|
} |
|
|
|
|
append_error(&error, grpc_wakeup_fd_consume_wakeup( |
|
|
|
|
(void *)((~(intptr_t)1) & (intptr_t)data_ptr)), |
|
|
|
|
err_desc); |
|
|
|
|
} else { |
|
|
|
|
grpc_fd *fd = (grpc_fd *)data_ptr; |
|
|
|
|
bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0; |
|
|
|
|
bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0; |
|
|
|
|
bool write_ev = (ev->events & EPOLLOUT) != 0; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_polling_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"PS:%p got fd %p: cancel=%d read=%d " |
|
|
|
|
"write=%d", |
|
|
|
|
pollset, fd, cancel, read_ev, write_ev); |
|
|
|
|
} |
|
|
|
|
if (read_ev || cancel) { |
|
|
|
|
fd_become_readable(exec_ctx, fd, pollset); |
|
|
|
|
} |
|
|
|
|
if (write_ev || cancel) { |
|
|
|
|
fd_become_writable(exec_ctx, fd); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return error; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* pollset_shutdown is guaranteed to be called before pollset_destroy. */ |
|
|
|
|
static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { |
|
|
|
|
pollable_destroy(&pollset->pollable); |
|
|
|
@ -692,16 +754,13 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { |
|
|
|
|
UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable, 2, |
|
|
|
|
"pollset_pollable"); |
|
|
|
|
} |
|
|
|
|
GRPC_LOG_IF_ERROR("pollset_process_events", |
|
|
|
|
pollset_process_events(exec_ctx, pollset, true)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#define MAX_EPOLL_EVENTS 100 |
|
|
|
|
|
|
|
|
|
static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
pollable *p, gpr_timespec now, |
|
|
|
|
gpr_timespec deadline) { |
|
|
|
|
struct epoll_event events[MAX_EPOLL_EVENTS]; |
|
|
|
|
static const char *err_desc = "pollset_poll"; |
|
|
|
|
|
|
|
|
|
int timeout = poll_deadline_to_millis_timeout(deadline, now); |
|
|
|
|
|
|
|
|
|
if (GRPC_TRACER_ON(grpc_polling_trace)) { |
|
|
|
@ -713,7 +772,7 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
} |
|
|
|
|
int r; |
|
|
|
|
do { |
|
|
|
|
r = epoll_wait(p->epfd, events, MAX_EPOLL_EVENTS, timeout); |
|
|
|
|
r = epoll_wait(p->epfd, pollset->events, MAX_EPOLL_EVENTS, timeout); |
|
|
|
|
} while (r < 0 && errno == EINTR); |
|
|
|
|
if (timeout != 0) { |
|
|
|
|
GRPC_SCHEDULING_END_BLOCKING_REGION; |
|
|
|
@ -725,35 +784,10 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
gpr_log(GPR_DEBUG, "PS:%p poll %p got %d events", pollset, p, r); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_error *error = GRPC_ERROR_NONE; |
|
|
|
|
for (int i = 0; i < r; i++) { |
|
|
|
|
void *data_ptr = events[i].data.ptr; |
|
|
|
|
if (data_ptr == &p->wakeup) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_polling_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "PS:%p poll %p got pollset_wakeup", pollset, p); |
|
|
|
|
} |
|
|
|
|
append_error(&error, grpc_wakeup_fd_consume_wakeup(&p->wakeup), err_desc); |
|
|
|
|
} else { |
|
|
|
|
grpc_fd *fd = (grpc_fd *)data_ptr; |
|
|
|
|
bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0; |
|
|
|
|
bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0; |
|
|
|
|
bool write_ev = (events[i].events & EPOLLOUT) != 0; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_polling_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"PS:%p poll %p got fd %p: cancel=%d read=%d " |
|
|
|
|
"write=%d", |
|
|
|
|
pollset, p, fd, cancel, read_ev, write_ev); |
|
|
|
|
} |
|
|
|
|
if (read_ev || cancel) { |
|
|
|
|
fd_become_readable(exec_ctx, fd, pollset); |
|
|
|
|
} |
|
|
|
|
if (write_ev || cancel) { |
|
|
|
|
fd_become_writable(exec_ctx, fd); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
pollset->event_cursor = 0; |
|
|
|
|
pollset->event_count = r; |
|
|
|
|
|
|
|
|
|
return error; |
|
|
|
|
return GRPC_ERROR_NONE; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Return true if first in list */ |
|
|
|
@ -905,10 +939,13 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
gpr_mu_unlock(&worker.pollable->po.mu); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&pollset->pollable.po.mu); |
|
|
|
|
append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable, now, |
|
|
|
|
deadline), |
|
|
|
|
if (pollset->event_cursor == pollset->event_count) { |
|
|
|
|
append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable, |
|
|
|
|
now, deadline), |
|
|
|
|
err_desc); |
|
|
|
|
} |
|
|
|
|
append_error(&error, pollset_process_events(exec_ctx, pollset, false), |
|
|
|
|
err_desc); |
|
|
|
|
grpc_exec_ctx_flush(exec_ctx); |
|
|
|
|
gpr_mu_lock(&pollset->pollable.po.mu); |
|
|
|
|
if (worker.pollable != &pollset->pollable) { |
|
|
|
|
gpr_mu_lock(&worker.pollable->po.mu); |
|
|
|
@ -921,6 +958,11 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
if (worker.pollable != &pollset->pollable) { |
|
|
|
|
gpr_mu_unlock(&worker.pollable->po.mu); |
|
|
|
|
} |
|
|
|
|
if (grpc_exec_ctx_has_work(exec_ctx)) { |
|
|
|
|
gpr_mu_unlock(&pollset->pollable.po.mu); |
|
|
|
|
grpc_exec_ctx_flush(exec_ctx); |
|
|
|
|
gpr_mu_lock(&pollset->pollable.po.mu); |
|
|
|
|
} |
|
|
|
|
return error; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -942,7 +984,7 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
"PS:%p add fd %p; transition pollable from empty to fd", pollset, |
|
|
|
|
fd); |
|
|
|
|
/* empty pollable --> single fd pollable */ |
|
|
|
|
append_error(&error, pollset_kick_all(pollset), err_desc); |
|
|
|
|
pollset_kick_all(exec_ctx, pollset); |
|
|
|
|
pollset->current_pollable = &fd->pollable; |
|
|
|
|
if (!fd_locked) gpr_mu_lock(&fd->pollable.po.mu); |
|
|
|
|
append_error(&error, fd_become_pollable_locked(fd), err_desc); |
|
|
|
@ -959,7 +1001,7 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"PS:%p add fd %p; transition pollable from fd %p to multipoller", |
|
|
|
|
pollset, fd, had_fd); |
|
|
|
|
append_error(&error, pollset_kick_all(pollset), err_desc); |
|
|
|
|
pollset_kick_all(exec_ctx, pollset); |
|
|
|
|
pollset->current_pollable = &pollset->pollable; |
|
|
|
|
if (append_error(&error, pollable_materialize(&pollset->pollable), |
|
|
|
|
err_desc)) { |
|
|
|
@ -1323,8 +1365,6 @@ static const grpc_event_engine_vtable vtable = { |
|
|
|
|
|
|
|
|
|
const grpc_event_engine_vtable *grpc_init_epollex_linux( |
|
|
|
|
bool explicitly_requested) { |
|
|
|
|
if (!explicitly_requested) return NULL; |
|
|
|
|
|
|
|
|
|
if (!grpc_has_wakeup_fd()) { |
|
|
|
|
return NULL; |
|
|
|
|
} |
|
|
|
|