diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c index 9fb640af6ba..5c158baa773 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.c +++ b/src/core/lib/iomgr/ev_epoll1_linux.c @@ -581,6 +581,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, } worker_insert(pollset, worker); if (worker->kick_state == UNKICKED) { + GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker); worker->initialized_cv = true; gpr_cv_init(&worker->cv); do { @@ -597,7 +598,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, } static bool check_neighbourhood_for_available_poller( - pollset_neighbourhood *neighbourhood, grpc_pollset_worker *avoid_worker) { + pollset_neighbourhood *neighbourhood) { bool found_worker = false; do { grpc_pollset *inspect = neighbourhood->active_root; @@ -607,19 +608,24 @@ static bool check_neighbourhood_for_available_poller( gpr_mu_lock(&inspect->mu); GPR_ASSERT(!inspect->seen_inactive); grpc_pollset_worker *inspect_worker = inspect->root_worker; - if (inspect_worker == avoid_worker) inspect_worker = inspect_worker->next; - if (inspect_worker == avoid_worker) inspect_worker = NULL; if (inspect_worker != NULL) { - if (gpr_atm_no_barrier_cas(&g_active_poller, 0, - (gpr_atm)inspect_worker)) { - inspect_worker->kick_state = DESIGNATED_POLLER; - if (inspect_worker->initialized_cv) { - gpr_cv_signal(&inspect_worker->cv); + do { + if (inspect_worker->kick_state == UNKICKED) { + if (gpr_atm_no_barrier_cas(&g_active_poller, 0, + (gpr_atm)inspect_worker)) { + inspect_worker->kick_state = DESIGNATED_POLLER; + if (inspect_worker->initialized_cv) { + gpr_cv_signal(&inspect_worker->cv); + } + } + // even if we didn't win the cas, there's a worker, we can stop + found_worker = true; + break; } - } - // even if we didn't win the cas, there's a worker, we can stop - found_worker = true; - } else { + inspect_worker = inspect_worker->next; + } while (inspect_worker != inspect->root_worker); + } + if (!found_worker) { inspect->seen_inactive = true; move_pollset_to_neighbourhood_list(inspect, &neighbourhood->active_root, &neighbourhood->inactive_root); @@ -636,10 +642,10 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, grpc_pollset_worker **worker_hdl) { if (worker_hdl != NULL) *worker_hdl = NULL; + worker->kick_state = KICKED; if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) { GPR_ASSERT(!pollset->seen_inactive); - GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker); - if (worker->next != worker) { + if (worker->next != worker && worker->next->kick_state == UNKICKED) { assert(worker->next->initialized_cv); gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next); worker->next->kick_state = DESIGNATED_POLLER; @@ -661,7 +667,7 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, g_num_neighbourhoods]; if (gpr_mu_trylock(&neighbourhood->mu)) { found_worker = - check_neighbourhood_for_available_poller(neighbourhood, worker); + check_neighbourhood_for_available_poller(neighbourhood); gpr_mu_unlock(&neighbourhood->mu); g_neighbour_scan_state[i] = true; } else { @@ -676,7 +682,7 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, g_num_neighbourhoods]; gpr_mu_lock(&neighbourhood->mu); found_worker = - check_neighbourhood_for_available_poller(neighbourhood, worker); + check_neighbourhood_for_available_poller(neighbourhood); gpr_mu_unlock(&neighbourhood->mu); } } @@ -690,6 +696,7 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (EMPTIED == worker_remove(pollset, worker)) { pollset_maybe_finish_shutdown(exec_ctx, pollset); } + GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker); } /* pollset->po.mu lock must be held by the caller before calling this.