diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c
index fdd6384c867..3355c8344ec 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -68,7 +68,7 @@
  * needed) */
 static grpc_wakeup_fd global_wakeup_fd;
 static int g_epfd;
-static bool g_timer_kick = false;
+static gpr_atm g_timer_kick;
 
 /*******************************************************************************
  * Fd Declarations
@@ -96,21 +96,13 @@ static void fd_global_shutdown(void);
  * Pollset Declarations
  */
 
-typedef struct pollset_worker_link {
-  grpc_pollset_worker *next;
-  grpc_pollset_worker *prev;
-} pollset_worker_link;
-
-typedef enum {
-  PWL_POLLSET,
-  PWL_POLLABLE,
-  POLLSET_WORKER_LINK_COUNT
-} pollset_worker_links;
+typedef enum { UNKICKED, KICKED, KICKED_FOR_POLL } kick_state;
 
 struct grpc_pollset_worker {
-  bool kicked;
+  kick_state kick_state;
   bool initialized_cv;
-  pollset_worker_link links[POLLSET_WORKER_LINK_COUNT];
+  grpc_pollset_worker *next;
+  grpc_pollset_worker *prev;
   gpr_cv cv;
 };
 
@@ -322,19 +314,19 @@ GPR_TLS_DECL(g_current_thread_pollset);
 GPR_TLS_DECL(g_current_thread_worker);
 static gpr_atm g_active_poller;
 static pollset_neighbourhood *g_neighbourhoods;
+static size_t g_num_neighbourhoods;
 
 /* Return true if first in list */
-static bool worker_insert(grpc_pollset_worker **root, pollset_worker_links link,
-                          grpc_pollset_worker *worker) {
-  if (*root == NULL) {
-    *root = worker;
-    worker->links[link].next = worker->links[link].prev = worker;
+static bool worker_insert(grpc_pollset *pollset, grpc_pollset_worker *worker) {
+  if (pollset->root_worker == NULL) {
+    pollset->root_worker = worker;
+    worker->next = worker->prev = worker;
     return true;
   } else {
-    worker->links[link].next = *root;
-    worker->links[link].prev = worker->links[link].next->links[link].prev;
-    worker->links[link].next->links[link].prev = worker;
-    worker->links[link].prev->links[link].next = worker;
+    worker->next = pollset->root_worker;
+    worker->prev = worker->next->prev;
+    worker->next->prev = worker;
+    worker->prev->next = worker;
     return false;
   }
 }
@@ -342,22 +334,21 @@ static bool worker_insert(grpc_pollset_worker **root, pollset_worker_links link,
 /* Return true if last in list */
 typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;
 
-static worker_remove_result worker_remove(grpc_pollset_worker **root,
-                                          pollset_worker_links link,
+static worker_remove_result worker_remove(grpc_pollset *pollset,
                                           grpc_pollset_worker *worker) {
-  if (worker == *root) {
-    if (worker == worker->links[link].next) {
-      *root = NULL;
+  if (worker == pollset->root_worker) {
+    if (worker == worker->next) {
+      pollset->root_worker = NULL;
       return EMPTIED;
     } else {
-      *root = worker->links[link].next;
-      worker->links[link].prev->links[link].next = worker->links[link].next;
-      worker->links[link].next->links[link].prev = worker->links[link].prev;
+      pollset->root_worker = worker->next;
+      worker->prev->next = worker->next;
+      worker->next->prev = worker->prev;
       return NEW_ROOT;
     }
   } else {
-    worker->links[link].prev->links[link].next = worker->links[link].next;
-    worker->links[link].next->links[link].prev = worker->links[link].prev;
+    worker->prev->next = worker->next;
+    worker->next->prev = worker->prev;
     return REMOVED;
   }
 }
@@ -374,6 +365,13 @@ static grpc_error *pollset_global_init(void) {
   if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, &ev) != 0) {
     return GRPC_OS_ERROR(errno, "epoll_ctl");
   }
+  g_num_neighbourhoods = GPR_MAX(1, gpr_cpu_num_cores());
+  g_neighbourhoods =
+      gpr_zalloc(sizeof(*g_neighbourhoods) * g_num_neighbourhoods);
+  for (size_t i = 0; i < g_num_neighbourhoods; i++) {
+    gpr_mu_init(&g_neighbourhoods[i].mu);
+    g_neighbourhoods[i].seen_inactive = true;
+  }
   return GRPC_ERROR_NONE;
 }
 
@@ -381,6 +379,10 @@ static void pollset_global_shutdown(void) {
   gpr_tls_destroy(&g_current_thread_pollset);
   gpr_tls_destroy(&g_current_thread_worker);
   if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
+  for (size_t i = 0; i < g_num_neighbourhoods; i++) {
+    gpr_mu_destroy(&g_neighbourhoods[i].mu);
+  }
+  gpr_free(g_neighbourhoods);
 }
 
 static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
@@ -392,7 +394,6 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
 }
 
 static void pollset_destroy(grpc_pollset *pollset) {
-  gpr_mu_destroy(&pollset->mu);
   gpr_mu_lock(&pollset->neighbourhood->mu);
   pollset->prev->next = pollset->next;
   pollset->next->prev = pollset->prev;
@@ -404,6 +405,7 @@ static void pollset_destroy(grpc_pollset *pollset) {
         pollset->next == pollset ? NULL : pollset->next;
   }
   gpr_mu_unlock(&pollset->neighbourhood->mu);
+  gpr_mu_destroy(&pollset->mu);
 }
 
 static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
@@ -412,14 +414,15 @@ static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
     grpc_pollset_worker *worker = pollset->root_worker;
     do {
       if (worker->initialized_cv) {
-        worker->kicked = true;
+        worker->kick_state = KICKED;
         gpr_cv_signal(&worker->cv);
       } else {
+        worker->kick_state = KICKED;
         append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
                      "pollset_shutdown");
       }
 
-      worker = worker->links[PWL_POLLSET].next;
+      worker = worker->next;
     } while (worker != pollset->root_worker);
   }
   return error;
@@ -485,8 +488,7 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
   for (int i = 0; i < r; i++) {
     void *data_ptr = events[i].data.ptr;
     if (data_ptr == &global_wakeup_fd) {
-      if (g_timer_kick) {
-        g_timer_kick = false;
+      if (gpr_atm_no_barrier_cas(&g_timer_kick, 1, 0)) {
         grpc_timer_consume_kick();
       }
       append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
@@ -508,41 +510,155 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
   return error;
 }
 
+#if 0
+static void verify_all_entries_in_neighbourhood_list(
+    grpc_pollset *root, bool should_be_seen_inactive) {
+  if (root == NULL) return;
+  grpc_pollset *p = root;
+  do {
+    GPR_ASSERT(p->seen_inactive == should_be_seen_inactive);
+    p = p->next;
+  } while (p != root);
+}
+
+static void verify_neighbourhood_lists(pollset_neighbourhood *neighbourhood) {
+  // assumes neighbourhood->mu locked
+  verify_all_entries_in_neighbourhood_list(neighbourhood->active_root, false);
+  verify_all_entries_in_neighbourhood_list(neighbourhood->inactive_root, true);
+}
+#endif
+
+/* Unlink pollset from the circular list rooted at *from_root and append it to
+ * the circular list rooted at *to_root, updating either root as needed. */
+static void move_pollset_to_neighbourhood_list(grpc_pollset *pollset,
+                                               grpc_pollset **from_root,
+                                               grpc_pollset **to_root) {
+  // remove from old list
+  pollset->prev->next = pollset->next;
+  pollset->next->prev = pollset->prev;
+  if (*from_root == pollset) {
+    *from_root = pollset->next == pollset ? NULL : pollset->next;
+  }
+  // add to new list
+  if (*to_root == NULL) {
+    *to_root = pollset->next = pollset->prev = pollset;
+  } else {
+    pollset->next = *to_root;
+    pollset->prev = pollset->next->prev;
+    pollset->next->prev = pollset->prev->next = pollset;
+  }
+}
+
 static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
                          grpc_pollset_worker **worker_hdl, gpr_timespec *now,
                          gpr_timespec deadline) {
-  bool do_poll = true;
   if (worker_hdl != NULL) *worker_hdl = worker;
   worker->initialized_cv = false;
-  worker->kicked = false;
-
-  worker_insert(&pollset->root_worker, PWL_POLLSET, worker);
-  if (!worker_insert(&g_root_worker, PWL_POLLABLE, worker)) {
+  worker->kick_state = UNKICKED;
+
+  if (pollset->seen_inactive) {
+    // pollset has been observed to be inactive, we need to move back to the
+    // active list
+    pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
+    gpr_mu_unlock(&pollset->mu);
+    gpr_mu_lock(&neighbourhood->mu);
+    gpr_mu_lock(&pollset->mu);
+    if (pollset->seen_inactive) {
+      pollset->seen_inactive = false;
+      move_pollset_to_neighbourhood_list(pollset, &neighbourhood->inactive_root,
+                                         &neighbourhood->active_root);
+      if (neighbourhood->seen_inactive) {
+        neighbourhood->seen_inactive = false;
+        if (gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
+          worker->kick_state = KICKED_FOR_POLL;
+        }
+      }
+    }
+    gpr_mu_unlock(&neighbourhood->mu);
+  }
+  worker_insert(pollset, worker);
+  if (worker->kick_state == UNKICKED) {
     worker->initialized_cv = true;
     gpr_cv_init(&worker->cv);
-    while (do_poll && g_root_worker != worker) {
-      if (gpr_cv_wait(&worker->cv, &g_pollset_mu, deadline)) {
-        do_poll = false;
-      } else if (worker->kicked) {
-        do_poll = false;
+    do {
+      if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) &&
+          worker->kick_state == UNKICKED) {
+        worker->kick_state = KICKED;
       }
-    }
+    } while (worker->kick_state == UNKICKED);
     *now = gpr_now(now->clock_type);
   }
 
-  return do_poll && pollset->shutdown_closure == NULL;
+  return worker->kick_state == KICKED_FOR_POLL &&
+         pollset->shutdown_closure == NULL;
 }
 
 static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                        grpc_pollset_worker *worker,
                        grpc_pollset_worker **worker_hdl) {
-  if (NEW_ROOT == worker_remove(&g_root_worker, PWL_POLLABLE, worker)) {
-    gpr_cv_signal(&g_root_worker->cv);
+  if (worker->kick_state == KICKED_FOR_POLL) {
+    GPR_ASSERT(!pollset->seen_inactive);
+    GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker);
+    if (worker->next != worker) {
+      GPR_ASSERT(worker->next->initialized_cv);
+      gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
+      worker->next->kick_state = KICKED_FOR_POLL;
+      gpr_cv_signal(&worker->next->cv);
+    } else {
+      // no sibling worker on this pollset: scan the neighbourhoods, starting
+      // with our own, for another worker to hand polling to
+      gpr_atm_no_barrier_store(&g_active_poller, 0);
+      pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
+      gpr_mu_unlock(&pollset->mu);
+      bool found_worker = false;
+      do {
+        gpr_mu_lock(&neighbourhood->mu);
+        do {
+          grpc_pollset *inspect = neighbourhood->active_root;
+          if (inspect == NULL) {
+            break;
+          }
+          gpr_mu_lock(&inspect->mu);
+          GPR_ASSERT(!inspect->seen_inactive);
+          grpc_pollset_worker *inspect_worker = inspect->root_worker;
+          if (inspect_worker == worker) inspect_worker = worker->next;
+          if (inspect_worker == worker) inspect_worker = NULL;
+          if (inspect_worker != NULL) {
+            if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
+                                       (gpr_atm)inspect_worker)) {
+              GPR_ASSERT(inspect_worker->initialized_cv);
+              inspect_worker->kick_state = KICKED_FOR_POLL;
+              gpr_cv_signal(&inspect_worker->cv);
+            }
+            // even if we didn't win the cas, there's a worker, we can stop
+            found_worker = true;
+          } else {
+            inspect->seen_inactive = true;
+            move_pollset_to_neighbourhood_list(inspect,
+                                               &neighbourhood->active_root,
+                                               &neighbourhood->inactive_root);
+          }
+          gpr_mu_unlock(&inspect->mu);
+        } while (!found_worker);
+        if (!found_worker) {
+          neighbourhood->seen_inactive = true;
+        }
+        gpr_mu_unlock(&neighbourhood->mu);
+        ssize_t cur_neighbourhood_idx = neighbourhood - g_neighbourhoods;
+        GPR_ASSERT(cur_neighbourhood_idx >= 0);
+        GPR_ASSERT(g_num_neighbourhoods < INTPTR_MAX);
+        GPR_ASSERT(cur_neighbourhood_idx < (ssize_t)g_num_neighbourhoods);
+        size_t new_neighbourhood_idx =
+            ((size_t)cur_neighbourhood_idx + 1) % g_num_neighbourhoods;
+        neighbourhood = &g_neighbourhoods[new_neighbourhood_idx];
+      } while (!found_worker && neighbourhood != pollset->neighbourhood);
+      gpr_mu_lock(&pollset->mu);
+    }
   }
   if (worker->initialized_cv) {
     gpr_cv_destroy(&worker->cv);
   }
-  if (EMPTIED == worker_remove(&pollset->root_worker, PWL_POLLSET, worker)) {
+  if (EMPTIED == worker_remove(pollset, worker)) {
     pollset_maybe_finish_shutdown(exec_ctx, pollset);
   }
 }
@@ -565,11 +681,11 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
     gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
     gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
     GPR_ASSERT(!pollset->shutdown_closure);
-    gpr_mu_unlock(&g_pollset_mu);
+    gpr_mu_unlock(&pollset->mu);
     append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline),
                  err_desc);
     grpc_exec_ctx_flush(exec_ctx);
-    gpr_mu_lock(&g_pollset_mu);
+    gpr_mu_lock(&pollset->mu);
     gpr_tls_set(&g_current_thread_pollset, 0);
     gpr_tls_set(&g_current_thread_worker, 0);
     pollset_maybe_finish_shutdown(exec_ctx, pollset);
@@ -587,29 +703,32 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
         pollset->kicked_without_poller = true;
         return GRPC_ERROR_NONE;
       }
-      grpc_pollset_worker *next_worker = root_worker->links[PWL_POLLSET].next;
-      if (root_worker == next_worker && root_worker == g_root_worker) {
-        root_worker->kicked = true;
+      grpc_pollset_worker *next_worker = root_worker->next;
+      if (root_worker == next_worker &&
+          root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(
+                             &g_active_poller)) {
+        root_worker->kick_state = KICKED;
         return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
       } else {
-        next_worker->kicked = true;
+        next_worker->kick_state = KICKED;
         gpr_cv_signal(&next_worker->cv);
         return GRPC_ERROR_NONE;
       }
     } else {
       return GRPC_ERROR_NONE;
     }
-  } else if (specific_worker->kicked) {
+  } else if (specific_worker->kick_state == KICKED) {
     return GRPC_ERROR_NONE;
   } else if (gpr_tls_get(&g_current_thread_worker) ==
              (intptr_t)specific_worker) {
-    specific_worker->kicked = true;
+    specific_worker->kick_state = KICKED;
     return GRPC_ERROR_NONE;
-  } else if (specific_worker == g_root_worker) {
-    specific_worker->kicked = true;
+  } else if (specific_worker ==
+             (grpc_pollset_worker *)gpr_atm_no_barrier_load(&g_active_poller)) {
+    specific_worker->kick_state = KICKED;
     return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
   } else {
-    specific_worker->kicked = true;
+    specific_worker->kick_state = KICKED;
     gpr_cv_signal(&specific_worker->cv);
     return GRPC_ERROR_NONE;
   }
@@ -619,9 +738,7 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                            grpc_fd *fd) {}
 
 static grpc_error *kick_poller(void) {
-  gpr_mu_lock(&g_pollset_mu);
-  g_timer_kick = true;
-  gpr_mu_unlock(&g_pollset_mu);
+  gpr_atm_no_barrier_store(&g_timer_kick, 1);
   return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
 }
 