Try to avoid contention

pull/10932/head
Craig Tiller 8 years ago
parent 1168e81478
commit bbf4c7a00a
  1. 106
      src/core/lib/iomgr/ev_epoll1_linux.c

@ -314,6 +314,7 @@ GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
static gpr_atm g_active_poller;
static pollset_neighbourhood *g_neighbourhoods;
static bool *g_neighbour_scan_state;
static size_t g_num_neighbourhoods;
/* Return true if first in list */
@ -368,6 +369,8 @@ static grpc_error *pollset_global_init(void) {
g_num_neighbourhoods = GPR_MAX(1, gpr_cpu_num_cores());
g_neighbourhoods =
gpr_zalloc(sizeof(*g_neighbourhoods) * g_num_neighbourhoods);
g_neighbour_scan_state =
gpr_malloc(sizeof(*g_neighbour_scan_state) * g_num_neighbourhoods);
for (size_t i = 0; i < g_num_neighbourhoods; i++) {
gpr_mu_init(&g_neighbourhoods[i].mu);
g_neighbourhoods[i].seen_inactive = true;
@ -383,6 +386,7 @@ static void pollset_global_shutdown(void) {
gpr_mu_destroy(&g_neighbourhoods[i].mu);
}
gpr_free(g_neighbourhoods);
gpr_free(g_neighbour_scan_state);
}
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
@ -591,6 +595,42 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
pollset->shutdown_closure == NULL;
}
static bool check_neighbourhood_for_available_poller(
pollset_neighbourhood *neighbourhood, grpc_pollset_worker *avoid_worker) {
bool found_worker = false;
do {
grpc_pollset *inspect = neighbourhood->active_root;
if (inspect == NULL) {
break;
}
gpr_mu_lock(&inspect->mu);
GPR_ASSERT(!inspect->seen_inactive);
grpc_pollset_worker *inspect_worker = inspect->root_worker;
if (inspect_worker == avoid_worker) inspect_worker = inspect_worker->next;
if (inspect_worker == avoid_worker) inspect_worker = NULL;
if (inspect_worker != NULL) {
if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
(gpr_atm)inspect_worker)) {
inspect_worker->kick_state = KICKED_FOR_POLL;
if (inspect_worker->initialized_cv) {
gpr_cv_signal(&inspect_worker->cv);
}
}
// even if we didn't win the cas, there's a worker, we can stop
found_worker = true;
} else {
inspect->seen_inactive = true;
move_pollset_to_neighbourhood_list(inspect, &neighbourhood->active_root,
&neighbourhood->inactive_root);
}
gpr_mu_unlock(&inspect->mu);
} while (!found_worker);
if (!found_worker) {
neighbourhood->seen_inactive = true;
}
return found_worker;
}
static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl) {
@ -610,49 +650,35 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
} else {
gpr_atm_no_barrier_store(&g_active_poller, 0);
pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
gpr_mu_unlock(&pollset->mu);
size_t poller_neighbourhood_idx =
(size_t)(pollset->neighbourhood - g_neighbourhoods);
bool found_worker = false;
do {
gpr_mu_lock(&neighbourhood->mu);
do {
grpc_pollset *inspect = neighbourhood->active_root;
if (inspect == NULL) {
break;
}
gpr_mu_lock(&inspect->mu);
GPR_ASSERT(!inspect->seen_inactive);
grpc_pollset_worker *inspect_worker = inspect->root_worker;
if (inspect_worker == worker) inspect_worker = worker->next;
if (inspect_worker == worker) inspect_worker = NULL;
if (inspect_worker != NULL) {
if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
(gpr_atm)inspect_worker)) {
inspect_worker->kick_state = KICKED_FOR_POLL;
if (inspect_worker->initialized_cv) gpr_cv_signal(&inspect_worker->cv);
}
// even if we didn't win the cas, there's a worker, we can stop
found_worker = true;
} else {
inspect->seen_inactive = true;
move_pollset_to_neighbourhood_list(inspect,
&neighbourhood->active_root,
&neighbourhood->inactive_root);
}
gpr_mu_unlock(&inspect->mu);
} while (!found_worker);
if (!found_worker) {
neighbourhood->seen_inactive = true;
for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
pollset_neighbourhood *neighbourhood =
&g_neighbourhoods[(poller_neighbourhood_idx + i) %
g_num_neighbourhoods];
if (gpr_mu_trylock(&neighbourhood->mu)) {
found_worker =
check_neighbourhood_for_available_poller(neighbourhood, worker);
gpr_mu_unlock(&neighbourhood->mu);
g_neighbour_scan_state[i] = true;
} else {
g_neighbour_scan_state[i] = false;
}
gpr_mu_unlock(&neighbourhood->mu);
ssize_t cur_neighbourhood_idx = neighbourhood - g_neighbourhoods;
GPR_ASSERT(cur_neighbourhood_idx >= 0);
GPR_ASSERT(g_num_neighbourhoods < INTPTR_MAX);
GPR_ASSERT(cur_neighbourhood_idx < (ssize_t)g_neighbourhoods);
size_t new_neighbourhood_idx =
((size_t)cur_neighbourhood_idx + 1) % g_num_neighbourhoods;
neighbourhood = &g_neighbourhoods[new_neighbourhood_idx];
} while (!found_worker && neighbourhood != pollset->neighbourhood);
}
if (!found_worker) {
for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
if (g_neighbour_scan_state[i]) continue;
pollset_neighbourhood *neighbourhood =
&g_neighbourhoods[(poller_neighbourhood_idx + i) %
g_num_neighbourhoods];
gpr_mu_lock(&neighbourhood->mu);
found_worker =
check_neighbourhood_for_available_poller(neighbourhood, worker);
gpr_mu_unlock(&neighbourhood->mu);
}
}
grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&pollset->mu);
}

Loading…
Cancel
Save