Fix loss of poller bug

pull/10932/head
Craig Tiller 8 years ago
parent c1d7acbee8
commit a4b8eb003e
  1. 39
      src/core/lib/iomgr/ev_epoll1_linux.c

@ -581,6 +581,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
} }
worker_insert(pollset, worker); worker_insert(pollset, worker);
if (worker->kick_state == UNKICKED) { if (worker->kick_state == UNKICKED) {
GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
worker->initialized_cv = true; worker->initialized_cv = true;
gpr_cv_init(&worker->cv); gpr_cv_init(&worker->cv);
do { do {
@ -597,7 +598,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
} }
static bool check_neighbourhood_for_available_poller( static bool check_neighbourhood_for_available_poller(
pollset_neighbourhood *neighbourhood, grpc_pollset_worker *avoid_worker) { pollset_neighbourhood *neighbourhood) {
bool found_worker = false; bool found_worker = false;
do { do {
grpc_pollset *inspect = neighbourhood->active_root; grpc_pollset *inspect = neighbourhood->active_root;
@ -607,19 +608,24 @@ static bool check_neighbourhood_for_available_poller(
gpr_mu_lock(&inspect->mu); gpr_mu_lock(&inspect->mu);
GPR_ASSERT(!inspect->seen_inactive); GPR_ASSERT(!inspect->seen_inactive);
grpc_pollset_worker *inspect_worker = inspect->root_worker; grpc_pollset_worker *inspect_worker = inspect->root_worker;
if (inspect_worker == avoid_worker) inspect_worker = inspect_worker->next;
if (inspect_worker == avoid_worker) inspect_worker = NULL;
if (inspect_worker != NULL) { if (inspect_worker != NULL) {
if (gpr_atm_no_barrier_cas(&g_active_poller, 0, do {
(gpr_atm)inspect_worker)) { if (inspect_worker->kick_state == UNKICKED) {
inspect_worker->kick_state = DESIGNATED_POLLER; if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
if (inspect_worker->initialized_cv) { (gpr_atm)inspect_worker)) {
gpr_cv_signal(&inspect_worker->cv); inspect_worker->kick_state = DESIGNATED_POLLER;
if (inspect_worker->initialized_cv) {
gpr_cv_signal(&inspect_worker->cv);
}
}
// even if we didn't win the cas, there's a worker, we can stop
found_worker = true;
break;
} }
} inspect_worker = inspect_worker->next;
// even if we didn't win the cas, there's a worker, we can stop } while (inspect_worker != inspect->root_worker);
found_worker = true; }
} else { if (!found_worker) {
inspect->seen_inactive = true; inspect->seen_inactive = true;
move_pollset_to_neighbourhood_list(inspect, &neighbourhood->active_root, move_pollset_to_neighbourhood_list(inspect, &neighbourhood->active_root,
&neighbourhood->inactive_root); &neighbourhood->inactive_root);
@ -636,10 +642,10 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker, grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl) { grpc_pollset_worker **worker_hdl) {
if (worker_hdl != NULL) *worker_hdl = NULL; if (worker_hdl != NULL) *worker_hdl = NULL;
worker->kick_state = KICKED;
if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) { if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
GPR_ASSERT(!pollset->seen_inactive); GPR_ASSERT(!pollset->seen_inactive);
GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker); if (worker->next != worker && worker->next->kick_state == UNKICKED) {
if (worker->next != worker) {
assert(worker->next->initialized_cv); assert(worker->next->initialized_cv);
gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next); gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
worker->next->kick_state = DESIGNATED_POLLER; worker->next->kick_state = DESIGNATED_POLLER;
@ -661,7 +667,7 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
g_num_neighbourhoods]; g_num_neighbourhoods];
if (gpr_mu_trylock(&neighbourhood->mu)) { if (gpr_mu_trylock(&neighbourhood->mu)) {
found_worker = found_worker =
check_neighbourhood_for_available_poller(neighbourhood, worker); check_neighbourhood_for_available_poller(neighbourhood);
gpr_mu_unlock(&neighbourhood->mu); gpr_mu_unlock(&neighbourhood->mu);
g_neighbour_scan_state[i] = true; g_neighbour_scan_state[i] = true;
} else { } else {
@ -676,7 +682,7 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
g_num_neighbourhoods]; g_num_neighbourhoods];
gpr_mu_lock(&neighbourhood->mu); gpr_mu_lock(&neighbourhood->mu);
found_worker = found_worker =
check_neighbourhood_for_available_poller(neighbourhood, worker); check_neighbourhood_for_available_poller(neighbourhood);
gpr_mu_unlock(&neighbourhood->mu); gpr_mu_unlock(&neighbourhood->mu);
} }
} }
@ -690,6 +696,7 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (EMPTIED == worker_remove(pollset, worker)) { if (EMPTIED == worker_remove(pollset, worker)) {
pollset_maybe_finish_shutdown(exec_ctx, pollset); pollset_maybe_finish_shutdown(exec_ctx, pollset);
} }
GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
} }
/* pollset->po.mu lock must be held by the caller before calling this. /* pollset->po.mu lock must be held by the caller before calling this.

Loading…
Cancel
Save