|
|
|
@ -160,18 +160,18 @@ struct grpc_pollset_worker { |
|
|
|
|
(worker)->kick_state_mutator = __LINE__; \
|
|
|
|
|
} while (false) |
|
|
|
|
|
|
|
|
|
#define MAX_NEIGHBOURHOODS 1024 |
|
|
|
|
#define MAX_NEIGHBORHOODS 1024 |
|
|
|
|
|
|
|
|
|
typedef struct pollset_neighbourhood { |
|
|
|
|
typedef struct pollset_neighborhood { |
|
|
|
|
gpr_mu mu; |
|
|
|
|
grpc_pollset *active_root; |
|
|
|
|
char pad[GPR_CACHELINE_SIZE]; |
|
|
|
|
} pollset_neighbourhood; |
|
|
|
|
} pollset_neighborhood; |
|
|
|
|
|
|
|
|
|
struct grpc_pollset { |
|
|
|
|
gpr_mu mu; |
|
|
|
|
pollset_neighbourhood *neighbourhood; |
|
|
|
|
bool reassigning_neighbourhood; |
|
|
|
|
pollset_neighborhood *neighborhood; |
|
|
|
|
bool reassigning_neighborhood; |
|
|
|
|
grpc_pollset_worker *root_worker; |
|
|
|
|
bool kicked_without_poller; |
|
|
|
|
|
|
|
|
@ -384,8 +384,8 @@ GPR_TLS_DECL(g_current_thread_worker); |
|
|
|
|
/* The designated poller */ |
|
|
|
|
static gpr_atm g_active_poller; |
|
|
|
|
|
|
|
|
|
static pollset_neighbourhood *g_neighbourhoods; |
|
|
|
|
static size_t g_num_neighbourhoods; |
|
|
|
|
static pollset_neighborhood *g_neighborhoods; |
|
|
|
|
static size_t g_num_neighborhoods; |
|
|
|
|
|
|
|
|
|
/* Return true if first in list */ |
|
|
|
|
static bool worker_insert(grpc_pollset *pollset, grpc_pollset_worker *worker) { |
|
|
|
@ -424,8 +424,8 @@ static worker_remove_result worker_remove(grpc_pollset *pollset, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static size_t choose_neighbourhood(void) { |
|
|
|
|
return (size_t)gpr_cpu_current_cpu() % g_num_neighbourhoods; |
|
|
|
|
static size_t choose_neighborhood(void) { |
|
|
|
|
return (size_t)gpr_cpu_current_cpu() % g_num_neighborhoods; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_error *pollset_global_init(void) { |
|
|
|
@ -441,11 +441,11 @@ static grpc_error *pollset_global_init(void) { |
|
|
|
|
&ev) != 0) { |
|
|
|
|
return GRPC_OS_ERROR(errno, "epoll_ctl"); |
|
|
|
|
} |
|
|
|
|
g_num_neighbourhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBOURHOODS); |
|
|
|
|
g_neighbourhoods = (pollset_neighbourhood *)gpr_zalloc( |
|
|
|
|
sizeof(*g_neighbourhoods) * g_num_neighbourhoods); |
|
|
|
|
for (size_t i = 0; i < g_num_neighbourhoods; i++) { |
|
|
|
|
gpr_mu_init(&g_neighbourhoods[i].mu); |
|
|
|
|
g_num_neighborhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBORHOODS); |
|
|
|
|
g_neighborhoods = (pollset_neighborhood *)gpr_zalloc( |
|
|
|
|
sizeof(*g_neighborhoods) * g_num_neighborhoods); |
|
|
|
|
for (size_t i = 0; i < g_num_neighborhoods; i++) { |
|
|
|
|
gpr_mu_init(&g_neighborhoods[i].mu); |
|
|
|
|
} |
|
|
|
|
return GRPC_ERROR_NONE; |
|
|
|
|
} |
|
|
|
@ -454,17 +454,17 @@ static void pollset_global_shutdown(void) { |
|
|
|
|
gpr_tls_destroy(&g_current_thread_pollset); |
|
|
|
|
gpr_tls_destroy(&g_current_thread_worker); |
|
|
|
|
if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd); |
|
|
|
|
for (size_t i = 0; i < g_num_neighbourhoods; i++) { |
|
|
|
|
gpr_mu_destroy(&g_neighbourhoods[i].mu); |
|
|
|
|
for (size_t i = 0; i < g_num_neighborhoods; i++) { |
|
|
|
|
gpr_mu_destroy(&g_neighborhoods[i].mu); |
|
|
|
|
} |
|
|
|
|
gpr_free(g_neighbourhoods); |
|
|
|
|
gpr_free(g_neighborhoods); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { |
|
|
|
|
gpr_mu_init(&pollset->mu); |
|
|
|
|
*mu = &pollset->mu; |
|
|
|
|
pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()]; |
|
|
|
|
pollset->reassigning_neighbourhood = false; |
|
|
|
|
pollset->neighborhood = &g_neighborhoods[choose_neighborhood()]; |
|
|
|
|
pollset->reassigning_neighborhood = false; |
|
|
|
|
pollset->root_worker = NULL; |
|
|
|
|
pollset->kicked_without_poller = false; |
|
|
|
|
pollset->seen_inactive = true; |
|
|
|
@ -477,26 +477,26 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { |
|
|
|
|
static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { |
|
|
|
|
gpr_mu_lock(&pollset->mu); |
|
|
|
|
if (!pollset->seen_inactive) { |
|
|
|
|
pollset_neighbourhood *neighbourhood = pollset->neighbourhood; |
|
|
|
|
pollset_neighborhood *neighborhood = pollset->neighborhood; |
|
|
|
|
gpr_mu_unlock(&pollset->mu); |
|
|
|
|
retry_lock_neighbourhood: |
|
|
|
|
gpr_mu_lock(&neighbourhood->mu); |
|
|
|
|
retry_lock_neighborhood: |
|
|
|
|
gpr_mu_lock(&neighborhood->mu); |
|
|
|
|
gpr_mu_lock(&pollset->mu); |
|
|
|
|
if (!pollset->seen_inactive) { |
|
|
|
|
if (pollset->neighbourhood != neighbourhood) { |
|
|
|
|
gpr_mu_unlock(&neighbourhood->mu); |
|
|
|
|
neighbourhood = pollset->neighbourhood; |
|
|
|
|
if (pollset->neighborhood != neighborhood) { |
|
|
|
|
gpr_mu_unlock(&neighborhood->mu); |
|
|
|
|
neighborhood = pollset->neighborhood; |
|
|
|
|
gpr_mu_unlock(&pollset->mu); |
|
|
|
|
goto retry_lock_neighbourhood; |
|
|
|
|
goto retry_lock_neighborhood; |
|
|
|
|
} |
|
|
|
|
pollset->prev->next = pollset->next; |
|
|
|
|
pollset->next->prev = pollset->prev; |
|
|
|
|
if (pollset == pollset->neighbourhood->active_root) { |
|
|
|
|
pollset->neighbourhood->active_root = |
|
|
|
|
if (pollset == pollset->neighborhood->active_root) { |
|
|
|
|
pollset->neighborhood->active_root = |
|
|
|
|
pollset->next == pollset ? NULL : pollset->next; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&pollset->neighbourhood->mu); |
|
|
|
|
gpr_mu_unlock(&pollset->neighborhood->mu); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&pollset->mu); |
|
|
|
|
gpr_mu_destroy(&pollset->mu); |
|
|
|
@ -675,16 +675,16 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, |
|
|
|
|
// pollset has been observed to be inactive, we need to move back to the
|
|
|
|
|
// active list
|
|
|
|
|
bool is_reassigning = false; |
|
|
|
|
if (!pollset->reassigning_neighbourhood) { |
|
|
|
|
if (!pollset->reassigning_neighborhood) { |
|
|
|
|
is_reassigning = true; |
|
|
|
|
pollset->reassigning_neighbourhood = true; |
|
|
|
|
pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()]; |
|
|
|
|
pollset->reassigning_neighborhood = true; |
|
|
|
|
pollset->neighborhood = &g_neighborhoods[choose_neighborhood()]; |
|
|
|
|
} |
|
|
|
|
pollset_neighbourhood *neighbourhood = pollset->neighbourhood; |
|
|
|
|
pollset_neighborhood *neighborhood = pollset->neighborhood; |
|
|
|
|
gpr_mu_unlock(&pollset->mu); |
|
|
|
|
// pollset unlocked: state may change (even worker->kick_state)
|
|
|
|
|
retry_lock_neighbourhood: |
|
|
|
|
gpr_mu_lock(&neighbourhood->mu); |
|
|
|
|
retry_lock_neighborhood: |
|
|
|
|
gpr_mu_lock(&neighborhood->mu); |
|
|
|
|
gpr_mu_lock(&pollset->mu); |
|
|
|
|
if (GRPC_TRACER_ON(grpc_polling_trace)) { |
|
|
|
|
gpr_log(GPR_ERROR, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d", |
|
|
|
@ -692,17 +692,17 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, |
|
|
|
|
is_reassigning); |
|
|
|
|
} |
|
|
|
|
if (pollset->seen_inactive) { |
|
|
|
|
if (neighbourhood != pollset->neighbourhood) { |
|
|
|
|
gpr_mu_unlock(&neighbourhood->mu); |
|
|
|
|
neighbourhood = pollset->neighbourhood; |
|
|
|
|
if (neighborhood != pollset->neighborhood) { |
|
|
|
|
gpr_mu_unlock(&neighborhood->mu); |
|
|
|
|
neighborhood = pollset->neighborhood; |
|
|
|
|
gpr_mu_unlock(&pollset->mu); |
|
|
|
|
goto retry_lock_neighbourhood; |
|
|
|
|
goto retry_lock_neighborhood; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* In the brief time we released the pollset locks above, the worker MAY
|
|
|
|
|
have been kicked. In this case, the worker should get out of this |
|
|
|
|
pollset ASAP and hence this should neither add the pollset to |
|
|
|
|
neighbourhood nor mark the pollset as active. |
|
|
|
|
neighborhood nor mark the pollset as active. |
|
|
|
|
|
|
|
|
|
On a side note, the only way a worker's kick state could have changed |
|
|
|
|
at this point is if it were "kicked specifically". Since the worker has |
|
|
|
@ -710,25 +710,25 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, |
|
|
|
|
not visible in the "kick any" path yet */ |
|
|
|
|
if (worker->kick_state == UNKICKED) { |
|
|
|
|
pollset->seen_inactive = false; |
|
|
|
|
if (neighbourhood->active_root == NULL) { |
|
|
|
|
neighbourhood->active_root = pollset->next = pollset->prev = pollset; |
|
|
|
|
if (neighborhood->active_root == NULL) { |
|
|
|
|
neighborhood->active_root = pollset->next = pollset->prev = pollset; |
|
|
|
|
/* Make this the designated poller if there isn't one already */ |
|
|
|
|
if (worker->kick_state == UNKICKED && |
|
|
|
|
gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) { |
|
|
|
|
SET_KICK_STATE(worker, DESIGNATED_POLLER); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
pollset->next = neighbourhood->active_root; |
|
|
|
|
pollset->next = neighborhood->active_root; |
|
|
|
|
pollset->prev = pollset->next->prev; |
|
|
|
|
pollset->next->prev = pollset->prev->next = pollset; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (is_reassigning) { |
|
|
|
|
GPR_ASSERT(pollset->reassigning_neighbourhood); |
|
|
|
|
pollset->reassigning_neighbourhood = false; |
|
|
|
|
GPR_ASSERT(pollset->reassigning_neighborhood); |
|
|
|
|
pollset->reassigning_neighborhood = false; |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&neighbourhood->mu); |
|
|
|
|
gpr_mu_unlock(&neighborhood->mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
worker_insert(pollset, worker); |
|
|
|
@ -763,7 +763,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* We release pollset lock in this function at a couple of places:
|
|
|
|
|
* 1. Briefly when assigning pollset to a neighbourhood |
|
|
|
|
* 1. Briefly when assigning pollset to a neighborhood |
|
|
|
|
* 2. When doing gpr_cv_wait() |
|
|
|
|
* It is possible that 'kicked_without_poller' was set to true during (1) and |
|
|
|
|
* 'shutting_down' is set to true during (1) or (2). If either of them is |
|
|
|
@ -781,12 +781,12 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, |
|
|
|
|
return worker->kick_state == DESIGNATED_POLLER && !pollset->shutting_down; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static bool check_neighbourhood_for_available_poller( |
|
|
|
|
pollset_neighbourhood *neighbourhood) { |
|
|
|
|
GPR_TIMER_BEGIN("check_neighbourhood_for_available_poller", 0); |
|
|
|
|
static bool check_neighborhood_for_available_poller( |
|
|
|
|
pollset_neighborhood *neighborhood) { |
|
|
|
|
GPR_TIMER_BEGIN("check_neighborhood_for_available_poller", 0); |
|
|
|
|
bool found_worker = false; |
|
|
|
|
do { |
|
|
|
|
grpc_pollset *inspect = neighbourhood->active_root; |
|
|
|
|
grpc_pollset *inspect = neighborhood->active_root; |
|
|
|
|
if (inspect == NULL) { |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
@ -831,8 +831,8 @@ static bool check_neighbourhood_for_available_poller( |
|
|
|
|
gpr_log(GPR_DEBUG, " .. mark pollset %p inactive", inspect); |
|
|
|
|
} |
|
|
|
|
inspect->seen_inactive = true; |
|
|
|
|
if (inspect == neighbourhood->active_root) { |
|
|
|
|
neighbourhood->active_root = |
|
|
|
|
if (inspect == neighborhood->active_root) { |
|
|
|
|
neighborhood->active_root = |
|
|
|
|
inspect->next == inspect ? NULL : inspect->next; |
|
|
|
|
} |
|
|
|
|
inspect->next->prev = inspect->prev; |
|
|
|
@ -841,7 +841,7 @@ static bool check_neighbourhood_for_available_poller( |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&inspect->mu); |
|
|
|
|
} while (!found_worker); |
|
|
|
|
GPR_TIMER_END("check_neighbourhood_for_available_poller", 0); |
|
|
|
|
GPR_TIMER_END("check_neighborhood_for_available_poller", 0); |
|
|
|
|
return found_worker; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -873,32 +873,31 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
gpr_atm_no_barrier_store(&g_active_poller, 0); |
|
|
|
|
size_t poller_neighbourhood_idx = |
|
|
|
|
(size_t)(pollset->neighbourhood - g_neighbourhoods); |
|
|
|
|
size_t poller_neighborhood_idx = |
|
|
|
|
(size_t)(pollset->neighborhood - g_neighborhoods); |
|
|
|
|
gpr_mu_unlock(&pollset->mu); |
|
|
|
|
bool found_worker = false; |
|
|
|
|
bool scan_state[MAX_NEIGHBOURHOODS]; |
|
|
|
|
for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) { |
|
|
|
|
pollset_neighbourhood *neighbourhood = |
|
|
|
|
&g_neighbourhoods[(poller_neighbourhood_idx + i) % |
|
|
|
|
g_num_neighbourhoods]; |
|
|
|
|
if (gpr_mu_trylock(&neighbourhood->mu)) { |
|
|
|
|
found_worker = |
|
|
|
|
check_neighbourhood_for_available_poller(neighbourhood); |
|
|
|
|
gpr_mu_unlock(&neighbourhood->mu); |
|
|
|
|
bool scan_state[MAX_NEIGHBORHOODS]; |
|
|
|
|
for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) { |
|
|
|
|
pollset_neighborhood *neighborhood = |
|
|
|
|
&g_neighborhoods[(poller_neighborhood_idx + i) % |
|
|
|
|
g_num_neighborhoods]; |
|
|
|
|
if (gpr_mu_trylock(&neighborhood->mu)) { |
|
|
|
|
found_worker = check_neighborhood_for_available_poller(neighborhood); |
|
|
|
|
gpr_mu_unlock(&neighborhood->mu); |
|
|
|
|
scan_state[i] = true; |
|
|
|
|
} else { |
|
|
|
|
scan_state[i] = false; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) { |
|
|
|
|
for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) { |
|
|
|
|
if (scan_state[i]) continue; |
|
|
|
|
pollset_neighbourhood *neighbourhood = |
|
|
|
|
&g_neighbourhoods[(poller_neighbourhood_idx + i) % |
|
|
|
|
g_num_neighbourhoods]; |
|
|
|
|
gpr_mu_lock(&neighbourhood->mu); |
|
|
|
|
found_worker = check_neighbourhood_for_available_poller(neighbourhood); |
|
|
|
|
gpr_mu_unlock(&neighbourhood->mu); |
|
|
|
|
pollset_neighborhood *neighborhood = |
|
|
|
|
&g_neighborhoods[(poller_neighborhood_idx + i) % |
|
|
|
|
g_num_neighborhoods]; |
|
|
|
|
gpr_mu_lock(&neighborhood->mu); |
|
|
|
|
found_worker = check_neighborhood_for_available_poller(neighborhood); |
|
|
|
|
gpr_mu_unlock(&neighborhood->mu); |
|
|
|
|
} |
|
|
|
|
grpc_exec_ctx_flush(exec_ctx); |
|
|
|
|
gpr_mu_lock(&pollset->mu); |
|
|
|
|