|
|
@ -64,11 +64,6 @@ |
|
|
|
#include "src/core/lib/support/block_annotate.h" |
|
|
|
#include "src/core/lib/support/block_annotate.h" |
|
|
|
#include "src/core/lib/support/spinlock.h" |
|
|
|
#include "src/core/lib/support/spinlock.h" |
|
|
|
|
|
|
|
|
|
|
|
/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
|
|
|
|
|
|
|
|
* sure to wake up one polling thread (which can wake up other threads if |
|
|
|
|
|
|
|
* needed) */ |
|
|
|
|
|
|
|
static grpc_wakeup_fd global_wakeup_fd; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/*******************************************************************************
|
|
|
|
/*******************************************************************************
|
|
|
|
* Pollset-set sibling link |
|
|
|
* Pollset-set sibling link |
|
|
|
*/ |
|
|
|
*/ |
|
|
@ -560,16 +555,6 @@ static grpc_error *pollable_materialize(pollable *p) { |
|
|
|
int new_epfd = epoll_create1(EPOLL_CLOEXEC); |
|
|
|
int new_epfd = epoll_create1(EPOLL_CLOEXEC); |
|
|
|
if (new_epfd < 0) { |
|
|
|
if (new_epfd < 0) { |
|
|
|
return GRPC_OS_ERROR(errno, "epoll_create1"); |
|
|
|
return GRPC_OS_ERROR(errno, "epoll_create1"); |
|
|
|
} else { |
|
|
|
|
|
|
|
struct epoll_event ev = { |
|
|
|
|
|
|
|
.events = (uint32_t)(EPOLLIN | EPOLLET | EPOLLEXCLUSIVE), |
|
|
|
|
|
|
|
.data.ptr = &global_wakeup_fd}; |
|
|
|
|
|
|
|
if (epoll_ctl(new_epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, &ev) != |
|
|
|
|
|
|
|
0) { |
|
|
|
|
|
|
|
grpc_error *err = GRPC_OS_ERROR(errno, "epoll_ctl"); |
|
|
|
|
|
|
|
close(new_epfd); |
|
|
|
|
|
|
|
return err; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
grpc_error *err = grpc_wakeup_fd_init(&p->wakeup); |
|
|
|
grpc_error *err = grpc_wakeup_fd_init(&p->wakeup); |
|
|
|
if (err != GRPC_ERROR_NONE) { |
|
|
|
if (err != GRPC_ERROR_NONE) { |
|
|
@ -639,22 +624,16 @@ static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) { |
|
|
|
|
|
|
|
|
|
|
|
GPR_TLS_DECL(g_current_thread_pollset); |
|
|
|
GPR_TLS_DECL(g_current_thread_pollset); |
|
|
|
GPR_TLS_DECL(g_current_thread_worker); |
|
|
|
GPR_TLS_DECL(g_current_thread_worker); |
|
|
|
static bool global_wakeup_fd_initialized = false; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* Global state management */ |
|
|
|
/* Global state management */ |
|
|
|
static grpc_error *pollset_global_init(void) { |
|
|
|
static grpc_error *pollset_global_init(void) { |
|
|
|
gpr_tls_init(&g_current_thread_pollset); |
|
|
|
gpr_tls_init(&g_current_thread_pollset); |
|
|
|
gpr_tls_init(&g_current_thread_worker); |
|
|
|
gpr_tls_init(&g_current_thread_worker); |
|
|
|
grpc_error *error = GRPC_ERROR_NONE; |
|
|
|
|
|
|
|
static const char *err_desc = "pollset_global_init"; |
|
|
|
|
|
|
|
global_wakeup_fd_initialized = |
|
|
|
|
|
|
|
append_error(&error, grpc_wakeup_fd_init(&global_wakeup_fd), err_desc); |
|
|
|
|
|
|
|
pollable_init(&g_empty_pollable, PO_EMPTY_POLLABLE); |
|
|
|
pollable_init(&g_empty_pollable, PO_EMPTY_POLLABLE); |
|
|
|
return error; |
|
|
|
return GRPC_ERROR_NONE; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static void pollset_global_shutdown(void) { |
|
|
|
static void pollset_global_shutdown(void) { |
|
|
|
if (global_wakeup_fd_initialized) grpc_wakeup_fd_destroy(&global_wakeup_fd); |
|
|
|
|
|
|
|
pollable_destroy(&g_empty_pollable); |
|
|
|
pollable_destroy(&g_empty_pollable); |
|
|
|
gpr_tls_destroy(&g_current_thread_pollset); |
|
|
|
gpr_tls_destroy(&g_current_thread_pollset); |
|
|
|
gpr_tls_destroy(&g_current_thread_worker); |
|
|
|
gpr_tls_destroy(&g_current_thread_worker); |
|
|
@ -687,7 +666,7 @@ static grpc_error *pollset_kick_all(grpc_pollset *pollset) { |
|
|
|
|
|
|
|
|
|
|
|
static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, |
|
|
|
static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, |
|
|
|
grpc_pollset_worker *specific_worker) { |
|
|
|
grpc_pollset_worker *specific_worker) { |
|
|
|
if (grpc_polling_trace) { |
|
|
|
if (GRPC_TRACER_ON(grpc_polling_trace)) { |
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
"PS:%p kick %p tls_pollset=%p tls_worker=%p " |
|
|
|
"PS:%p kick %p tls_pollset=%p tls_worker=%p " |
|
|
|
"root_worker=(pollset:%p pollable:%p)", |
|
|
|
"root_worker=(pollset:%p pollable:%p)", |
|
|
@ -698,13 +677,13 @@ static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, |
|
|
|
if (specific_worker == NULL) { |
|
|
|
if (specific_worker == NULL) { |
|
|
|
if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) { |
|
|
|
if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) { |
|
|
|
if (pollset->root_worker == NULL) { |
|
|
|
if (pollset->root_worker == NULL) { |
|
|
|
if (grpc_polling_trace) { |
|
|
|
if (GRPC_TRACER_ON(grpc_polling_trace)) { |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p kicked_any_without_poller", p); |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p kicked_any_without_poller", p); |
|
|
|
} |
|
|
|
} |
|
|
|
pollset->kicked_without_poller = true; |
|
|
|
pollset->kicked_without_poller = true; |
|
|
|
return GRPC_ERROR_NONE; |
|
|
|
return GRPC_ERROR_NONE; |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
if (grpc_polling_trace) { |
|
|
|
if (GRPC_TRACER_ON(grpc_polling_trace)) { |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p kicked_any_via_wakeup_fd", p); |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p kicked_any_via_wakeup_fd", p); |
|
|
|
} |
|
|
|
} |
|
|
|
grpc_error *err = pollable_materialize(p); |
|
|
|
grpc_error *err = pollable_materialize(p); |
|
|
@ -712,25 +691,25 @@ static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, |
|
|
|
return grpc_wakeup_fd_wakeup(&p->wakeup); |
|
|
|
return grpc_wakeup_fd_wakeup(&p->wakeup); |
|
|
|
} |
|
|
|
} |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
if (grpc_polling_trace) { |
|
|
|
if (GRPC_TRACER_ON(grpc_polling_trace)) { |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p kicked_any_but_awake", p); |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p kicked_any_but_awake", p); |
|
|
|
} |
|
|
|
} |
|
|
|
return GRPC_ERROR_NONE; |
|
|
|
return GRPC_ERROR_NONE; |
|
|
|
} |
|
|
|
} |
|
|
|
} else if (specific_worker->kicked) { |
|
|
|
} else if (specific_worker->kicked) { |
|
|
|
if (grpc_polling_trace) { |
|
|
|
if (GRPC_TRACER_ON(grpc_polling_trace)) { |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_already_kicked", p); |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_already_kicked", p); |
|
|
|
} |
|
|
|
} |
|
|
|
return GRPC_ERROR_NONE; |
|
|
|
return GRPC_ERROR_NONE; |
|
|
|
} else if (gpr_tls_get(&g_current_thread_worker) == |
|
|
|
} else if (gpr_tls_get(&g_current_thread_worker) == |
|
|
|
(intptr_t)specific_worker) { |
|
|
|
(intptr_t)specific_worker) { |
|
|
|
if (grpc_polling_trace) { |
|
|
|
if (GRPC_TRACER_ON(grpc_polling_trace)) { |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_awake", p); |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_awake", p); |
|
|
|
} |
|
|
|
} |
|
|
|
specific_worker->kicked = true; |
|
|
|
specific_worker->kicked = true; |
|
|
|
return GRPC_ERROR_NONE; |
|
|
|
return GRPC_ERROR_NONE; |
|
|
|
} else if (specific_worker == p->root_worker) { |
|
|
|
} else if (specific_worker == p->root_worker) { |
|
|
|
if (grpc_polling_trace) { |
|
|
|
if (GRPC_TRACER_ON(grpc_polling_trace)) { |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_wakeup_fd", p); |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_wakeup_fd", p); |
|
|
|
} |
|
|
|
} |
|
|
|
grpc_error *err = pollable_materialize(p); |
|
|
|
grpc_error *err = pollable_materialize(p); |
|
|
@ -738,7 +717,7 @@ static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, |
|
|
|
specific_worker->kicked = true; |
|
|
|
specific_worker->kicked = true; |
|
|
|
return grpc_wakeup_fd_wakeup(&p->wakeup); |
|
|
|
return grpc_wakeup_fd_wakeup(&p->wakeup); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
if (grpc_polling_trace) { |
|
|
|
if (GRPC_TRACER_ON(grpc_polling_trace)) { |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_cv", p); |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_cv", p); |
|
|
|
} |
|
|
|
} |
|
|
|
specific_worker->kicked = true; |
|
|
|
specific_worker->kicked = true; |
|
|
@ -761,10 +740,6 @@ static grpc_error *pollset_kick(grpc_pollset *pollset, |
|
|
|
return error; |
|
|
|
return error; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static grpc_error *kick_poller(void) { |
|
|
|
|
|
|
|
return grpc_wakeup_fd_wakeup(&global_wakeup_fd); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { |
|
|
|
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { |
|
|
|
pollable_init(&pollset->pollable, PO_POLLSET); |
|
|
|
pollable_init(&pollset->pollable, PO_POLLSET); |
|
|
|
pollset->current_pollable = &g_empty_pollable; |
|
|
|
pollset->current_pollable = &g_empty_pollable; |
|
|
@ -865,7 +840,7 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
|
|
|
|
|
|
|
int timeout = poll_deadline_to_millis_timeout(deadline, now); |
|
|
|
int timeout = poll_deadline_to_millis_timeout(deadline, now); |
|
|
|
|
|
|
|
|
|
|
|
if (grpc_polling_trace) { |
|
|
|
if (GRPC_TRACER_ON(grpc_polling_trace)) { |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p poll %p for %dms", pollset, p, timeout); |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p poll %p for %dms", pollset, p, timeout); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -882,23 +857,15 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
|
|
|
|
|
|
|
if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait"); |
|
|
|
if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait"); |
|
|
|
|
|
|
|
|
|
|
|
if (grpc_polling_trace) { |
|
|
|
if (GRPC_TRACER_ON(grpc_polling_trace)) { |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p poll %p got %d events", pollset, p, r); |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p poll %p got %d events", pollset, p, r); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
grpc_error *error = GRPC_ERROR_NONE; |
|
|
|
grpc_error *error = GRPC_ERROR_NONE; |
|
|
|
for (int i = 0; i < r; i++) { |
|
|
|
for (int i = 0; i < r; i++) { |
|
|
|
void *data_ptr = events[i].data.ptr; |
|
|
|
void *data_ptr = events[i].data.ptr; |
|
|
|
if (data_ptr == &global_wakeup_fd) { |
|
|
|
if (data_ptr == &p->wakeup) { |
|
|
|
if (grpc_polling_trace) { |
|
|
|
if (GRPC_TRACER_ON(grpc_polling_trace)) { |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p poll %p got global_wakeup_fd", pollset, p); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
grpc_timer_consume_kick(); |
|
|
|
|
|
|
|
append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd), |
|
|
|
|
|
|
|
err_desc); |
|
|
|
|
|
|
|
} else if (data_ptr == &p->wakeup) { |
|
|
|
|
|
|
|
if (grpc_polling_trace) { |
|
|
|
|
|
|
|
gpr_log(GPR_DEBUG, "PS:%p poll %p got pollset_wakeup", pollset, p); |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p poll %p got pollset_wakeup", pollset, p); |
|
|
|
} |
|
|
|
} |
|
|
|
append_error(&error, grpc_wakeup_fd_consume_wakeup(&p->wakeup), err_desc); |
|
|
|
append_error(&error, grpc_wakeup_fd_consume_wakeup(&p->wakeup), err_desc); |
|
|
@ -908,7 +875,7 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0; |
|
|
|
bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0; |
|
|
|
bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0; |
|
|
|
bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0; |
|
|
|
bool write_ev = (events[i].events & EPOLLOUT) != 0; |
|
|
|
bool write_ev = (events[i].events & EPOLLOUT) != 0; |
|
|
|
if (grpc_polling_trace) { |
|
|
|
if (GRPC_TRACER_ON(grpc_polling_trace)) { |
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
"PS:%p poll %p got fd %p: is_wq=%d cancel=%d read=%d " |
|
|
|
"PS:%p poll %p got fd %p: is_wq=%d cancel=%d read=%d " |
|
|
|
"write=%d", |
|
|
|
"write=%d", |
|
|
@ -994,25 +961,25 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, |
|
|
|
if (worker->pollable != &pollset->pollable) { |
|
|
|
if (worker->pollable != &pollset->pollable) { |
|
|
|
gpr_mu_unlock(&pollset->pollable.po.mu); |
|
|
|
gpr_mu_unlock(&pollset->pollable.po.mu); |
|
|
|
} |
|
|
|
} |
|
|
|
if (grpc_polling_trace && worker->pollable->root_worker != worker) { |
|
|
|
if (GRPC_TRACER_ON(grpc_polling_trace) && worker->pollable->root_worker != worker) { |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset, |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset, |
|
|
|
worker->pollable, worker, |
|
|
|
worker->pollable, worker, |
|
|
|
poll_deadline_to_millis_timeout(deadline, *now)); |
|
|
|
poll_deadline_to_millis_timeout(deadline, *now)); |
|
|
|
} |
|
|
|
} |
|
|
|
while (do_poll && worker->pollable->root_worker != worker) { |
|
|
|
while (do_poll && worker->pollable->root_worker != worker) { |
|
|
|
if (gpr_cv_wait(&worker->cv, &worker->pollable->po.mu, deadline)) { |
|
|
|
if (gpr_cv_wait(&worker->cv, &worker->pollable->po.mu, deadline)) { |
|
|
|
if (grpc_polling_trace) { |
|
|
|
if (GRPC_TRACER_ON(grpc_polling_trace)) { |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset, |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset, |
|
|
|
worker->pollable, worker); |
|
|
|
worker->pollable, worker); |
|
|
|
} |
|
|
|
} |
|
|
|
do_poll = false; |
|
|
|
do_poll = false; |
|
|
|
} else if (worker->kicked) { |
|
|
|
} else if (worker->kicked) { |
|
|
|
if (grpc_polling_trace) { |
|
|
|
if (GRPC_TRACER_ON(grpc_polling_trace)) { |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p wakeup %p w=%p", pollset, worker->pollable, |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p wakeup %p w=%p", pollset, worker->pollable, |
|
|
|
worker); |
|
|
|
worker); |
|
|
|
} |
|
|
|
} |
|
|
|
do_poll = false; |
|
|
|
do_poll = false; |
|
|
|
} else if (grpc_polling_trace && |
|
|
|
} else if (GRPC_TRACER_ON(grpc_polling_trace) && |
|
|
|
worker->pollable->root_worker != worker) { |
|
|
|
worker->pollable->root_worker != worker) { |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p spurious_wakeup %p w=%p", pollset, |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p spurious_wakeup %p w=%p", pollset, |
|
|
|
worker->pollable, worker); |
|
|
|
worker->pollable, worker); |
|
|
@ -1056,7 +1023,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
grpc_pollset_worker **worker_hdl, |
|
|
|
grpc_pollset_worker **worker_hdl, |
|
|
|
gpr_timespec now, gpr_timespec deadline) { |
|
|
|
gpr_timespec now, gpr_timespec deadline) { |
|
|
|
grpc_pollset_worker worker; |
|
|
|
grpc_pollset_worker worker; |
|
|
|
if (0 && grpc_polling_trace) { |
|
|
|
if (0 && GRPC_TRACER_ON(grpc_polling_trace)) { |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRId64 |
|
|
|
gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRId64 |
|
|
|
".%09d deadline=%" PRId64 ".%09d kwp=%d root_worker=%p", |
|
|
|
".%09d deadline=%" PRId64 ".%09d kwp=%d root_worker=%p", |
|
|
|
pollset, worker_hdl, &worker, now.tv_sec, now.tv_nsec, |
|
|
|
pollset, worker_hdl, &worker, now.tv_sec, now.tv_nsec, |
|
|
@ -1484,8 +1451,6 @@ static const grpc_event_engine_vtable vtable = { |
|
|
|
.pollset_set_add_fd = pollset_set_add_fd, |
|
|
|
.pollset_set_add_fd = pollset_set_add_fd, |
|
|
|
.pollset_set_del_fd = pollset_set_del_fd, |
|
|
|
.pollset_set_del_fd = pollset_set_del_fd, |
|
|
|
|
|
|
|
|
|
|
|
.kick_poller = kick_poller, |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
.workqueue_ref = workqueue_ref, |
|
|
|
.workqueue_ref = workqueue_ref, |
|
|
|
.workqueue_unref = workqueue_unref, |
|
|
|
.workqueue_unref = workqueue_unref, |
|
|
|
.workqueue_scheduler = workqueue_scheduler, |
|
|
|
.workqueue_scheduler = workqueue_scheduler, |
|
|
|