From ba550da853b9e036ff29c5260c8f17ec4c03fc02 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 1 May 2017 14:26:31 +0000 Subject: [PATCH] Fix several races --- src/core/lib/iomgr/ev_epoll1_linux.c | 87 +++++++++++++++++----------- 1 file changed, 52 insertions(+), 35 deletions(-) diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c index 1284891ded2..fccccccd093 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.c +++ b/src/core/lib/iomgr/ev_epoll1_linux.c @@ -106,6 +106,8 @@ struct grpc_pollset_worker { gpr_cv cv; }; +#define MAX_NEIGHBOURHOODS 1024 + typedef struct pollset_neighbourhood { gpr_mu mu; grpc_pollset *active_root; @@ -121,6 +123,7 @@ struct grpc_pollset { bool shutting_down; /* Is the pollset shutting down ? */ bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */ grpc_closure *shutdown_closure; /* Called after after shutdown is complete */ + int begin_refs; grpc_pollset *next; grpc_pollset *prev; @@ -312,7 +315,6 @@ GPR_TLS_DECL(g_current_thread_pollset); GPR_TLS_DECL(g_current_thread_worker); static gpr_atm g_active_poller; static pollset_neighbourhood *g_neighbourhoods; -static bool *g_neighbour_scan_state; static size_t g_num_neighbourhoods; /* Return true if first in list */ @@ -352,6 +354,10 @@ static worker_remove_result worker_remove(grpc_pollset *pollset, } } +static size_t choose_neighbourhood(void) { + return (size_t)gpr_cpu_current_cpu() % g_num_neighbourhoods; +} + static grpc_error *pollset_global_init(void) { gpr_tls_init(&g_current_thread_pollset); gpr_tls_init(&g_current_thread_worker); @@ -364,11 +370,9 @@ static grpc_error *pollset_global_init(void) { if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, &ev) != 0) { return GRPC_OS_ERROR(errno, "epoll_ctl"); } - g_num_neighbourhoods = GPR_MAX(1, gpr_cpu_num_cores()); + g_num_neighbourhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBOURHOODS); g_neighbourhoods = gpr_zalloc(sizeof(*g_neighbourhoods) * g_num_neighbourhoods); - g_neighbour_scan_state = - gpr_malloc(sizeof(*g_neighbour_scan_state) * g_num_neighbourhoods); for (size_t i = 0; i < g_num_neighbourhoods; i++) { gpr_mu_init(&g_neighbourhoods[i].mu); } @@ -383,26 +387,27 @@ static void pollset_global_shutdown(void) { gpr_mu_destroy(&g_neighbourhoods[i].mu); } gpr_free(g_neighbourhoods); - gpr_free(g_neighbour_scan_state); } static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { gpr_mu_init(&pollset->mu); *mu = &pollset->mu; - pollset->neighbourhood = &g_neighbourhoods[gpr_cpu_current_cpu()]; + pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()]; pollset->seen_inactive = true; pollset->next = pollset->prev = pollset; } static void pollset_destroy(grpc_pollset *pollset) { - gpr_mu_lock(&pollset->neighbourhood->mu); - pollset->prev->next = pollset->next; - pollset->next->prev = pollset->prev; - if (pollset == pollset->neighbourhood->active_root) { - pollset->neighbourhood->active_root = - pollset->next == pollset ? NULL : pollset->next; - } - gpr_mu_unlock(&pollset->neighbourhood->mu); + if (!pollset->seen_inactive) { + gpr_mu_lock(&pollset->neighbourhood->mu); + pollset->prev->next = pollset->next; + pollset->next->prev = pollset->prev; + if (pollset == pollset->neighbourhood->active_root) { + pollset->neighbourhood->active_root = + pollset->next == pollset ? NULL : pollset->next; + } + gpr_mu_unlock(&pollset->neighbourhood->mu); + } gpr_mu_destroy(&pollset->mu); } @@ -428,7 +433,8 @@ static grpc_error *pollset_kick_all(grpc_pollset *pollset) { static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { - if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL) { + if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL && + pollset->begin_refs == 0) { grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE); pollset->shutdown_closure = NULL; } @@ -532,14 +538,16 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, if (worker_hdl != NULL) *worker_hdl = worker; worker->initialized_cv = false; worker->kick_state = UNKICKED; + pollset->begin_refs++; if (pollset->seen_inactive) { // pollset has been observed to be inactive, we need to move back to the // active list - pollset_neighbourhood *neighbourhood = pollset->neighbourhood = &g_neighbourhoods[gpr_cpu_current_cpu()]; + pollset_neighbourhood *neighbourhood = pollset->neighbourhood = + &g_neighbourhoods[choose_neighbourhood()]; gpr_mu_unlock(&pollset->mu); - // pollset unlocked: state may change (even worker->kick_state) -retry_lock_neighbourhood: + // pollset unlocked: state may change (even worker->kick_state) + retry_lock_neighbourhood: gpr_mu_lock(&neighbourhood->mu); gpr_mu_lock(&pollset->mu); if (pollset->seen_inactive) { @@ -564,16 +572,18 @@ retry_lock_neighbourhood: gpr_mu_unlock(&neighbourhood->mu); } worker_insert(pollset, worker); + pollset->begin_refs--; if (worker->kick_state == UNKICKED) { GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker); worker->initialized_cv = true; gpr_cv_init(&worker->cv); - do { + while (worker->kick_state == UNKICKED && + pollset->shutdown_closure == NULL) { if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) && worker->kick_state == UNKICKED) { worker->kick_state = KICKED; } - } while (worker->kick_state == UNKICKED); + } *now = gpr_now(now->clock_type); } @@ -594,17 +604,24 @@ static bool check_neighbourhood_for_available_poller( grpc_pollset_worker *inspect_worker = inspect->root_worker; if (inspect_worker != NULL) { do { - if (inspect_worker->kick_state == UNKICKED) { - if (gpr_atm_no_barrier_cas(&g_active_poller, 0, - (gpr_atm)inspect_worker)) { - inspect_worker->kick_state = DESIGNATED_POLLER; - if (inspect_worker->initialized_cv) { - gpr_cv_signal(&inspect_worker->cv); + switch (inspect_worker->kick_state) { + case UNKICKED: + if (gpr_atm_no_barrier_cas(&g_active_poller, 0, + (gpr_atm)inspect_worker)) { + inspect_worker->kick_state = DESIGNATED_POLLER; + if (inspect_worker->initialized_cv) { + gpr_cv_signal(&inspect_worker->cv); + } } - } - // even if we didn't win the cas, there's a worker, we can stop - found_worker = true; - break; + // even if we didn't win the cas, there's a worker, we can stop + found_worker = true; + break; + case KICKED: + break; + case DESIGNATED_POLLER: + found_worker = true; // ok, so someone else found the worker, but + // we'll accept that + break; } inspect_worker = inspect_worker->next; } while (inspect_worker != inspect->root_worker); @@ -649,6 +666,7 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, size_t poller_neighbourhood_idx = (size_t)(pollset->neighbourhood - g_neighbourhoods); bool found_worker = false; + bool scan_state[MAX_NEIGHBOURHOODS]; for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) { pollset_neighbourhood *neighbourhood = &g_neighbourhoods[(poller_neighbourhood_idx + i) % @@ -657,19 +675,18 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, found_worker = check_neighbourhood_for_available_poller(neighbourhood); gpr_mu_unlock(&neighbourhood->mu); - g_neighbour_scan_state[i] = true; + scan_state[i] = true; } else { - g_neighbour_scan_state[i] = false; + scan_state[i] = false; } } for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) { - if (g_neighbour_scan_state[i]) continue; + if (scan_state[i]) continue; pollset_neighbourhood *neighbourhood = &g_neighbourhoods[(poller_neighbourhood_idx + i) % g_num_neighbourhoods]; gpr_mu_lock(&neighbourhood->mu); - found_worker = - check_neighbourhood_for_available_poller(neighbourhood); + found_worker = check_neighbourhood_for_available_poller(neighbourhood); gpr_mu_unlock(&neighbourhood->mu); } grpc_exec_ctx_flush(exec_ctx);