|
|
|
@ -237,6 +237,7 @@ typedef struct polling_island { |
|
|
|
|
/* Wakeup fd used to wake pollers to check the contents of workqueue_items */ |
|
|
|
|
grpc_wakeup_fd workqueue_wakeup_fd; |
|
|
|
|
|
|
|
|
|
/* The list of workers waiting to do polling on this polling island */ |
|
|
|
|
gpr_mu worker_list_mu; |
|
|
|
|
worker_node worker_list_head; |
|
|
|
|
|
|
|
|
@ -265,7 +266,10 @@ struct grpc_pollset_worker { |
|
|
|
|
struct grpc_pollset_worker *next; |
|
|
|
|
struct grpc_pollset_worker *prev; |
|
|
|
|
|
|
|
|
|
/* Indicates if it is this worker's turn to do epoll */ |
|
|
|
|
gpr_atm is_polling_turn; |
|
|
|
|
|
|
|
|
|
/* Node in the polling island's worker list. */ |
|
|
|
|
worker_node pi_list_link; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
@ -1468,12 +1472,12 @@ static bool acquire_polling_lease(grpc_pollset_worker *worker, |
|
|
|
|
gpr_timespec *now) { |
|
|
|
|
bool is_lease_acquired = false; |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&pi->worker_list_mu); // Lock
|
|
|
|
|
gpr_mu_lock(&pi->worker_list_mu); // LOCK
|
|
|
|
|
long num_pollers = gpr_atm_no_barrier_load(&pi->poller_count); |
|
|
|
|
|
|
|
|
|
if (num_pollers >= GRPC_MAX_POLLERS_PER_ISLAND) { |
|
|
|
|
push_back_worker_node(&pi->worker_list_head, &worker->pi_list_link); |
|
|
|
|
gpr_mu_unlock(&pi->worker_list_mu); // Unlock
|
|
|
|
|
gpr_mu_unlock(&pi->worker_list_mu); // UNLOCK
|
|
|
|
|
|
|
|
|
|
bool is_timeout = false; |
|
|
|
|
int ret; |
|
|
|
@ -1487,51 +1491,54 @@ static bool acquire_polling_lease(grpc_pollset_worker *worker, |
|
|
|
|
|
|
|
|
|
if (ret == -1) { |
|
|
|
|
if (errno == EAGAIN) { |
|
|
|
|
// gpr_log(GPR_INFO, "timeout"); // TODO: sreek remove this
|
|
|
|
|
// log-line
|
|
|
|
|
is_timeout = true; |
|
|
|
|
} else { |
|
|
|
|
/* TODO: sreek This should not happen. If we see these log messages, it
|
|
|
|
|
* means we are most likely something incorrect in the setup needed for |
|
|
|
|
* sigwaitinfo/sigtimedwait */ |
|
|
|
|
gpr_log(GPR_ERROR, "Failed with retcode: %d (timeout_ms: %d)", errno, |
|
|
|
|
timeout_ms); |
|
|
|
|
} |
|
|
|
|
is_timeout = true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Did the worker come out of sigtimedwait due to a thread that just
|
|
|
|
|
* exited epoll and kicking it (in release_polling_lease function). */ |
|
|
|
|
bool is_polling_turn = gpr_atm_acq_load(&worker->is_polling_turn); |
|
|
|
|
/*
|
|
|
|
|
if (is_polling_turn) { |
|
|
|
|
/ gpr_log(GPR_ERROR, "do epoll is true (timeout_ms:%d)", |
|
|
|
|
timeout_ms); // TODO: sreek remove this logline
|
|
|
|
|
} |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
/* Did the worker come out of sigtimedwait due to a thread alerting it that
|
|
|
|
|
* some completion event was (likely) available in the completion queue */ |
|
|
|
|
bool is_kicked = gpr_atm_no_barrier_load(&worker->is_kicked); |
|
|
|
|
|
|
|
|
|
if (is_kicked || is_timeout) { |
|
|
|
|
*now = deadline; |
|
|
|
|
*now = deadline; /* Essentially make the epoll timeout = 0 */ |
|
|
|
|
} else if (is_polling_turn) { |
|
|
|
|
*now = gpr_now(GPR_CLOCK_MONOTONIC); |
|
|
|
|
*now = gpr_now(GPR_CLOCK_MONOTONIC); /* Reduce the epoll timeout */ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&pi->worker_list_mu); // Lock
|
|
|
|
|
gpr_mu_lock(&pi->worker_list_mu); // LOCK
|
|
|
|
|
/* The node might have already been removed from the list by the poller
|
|
|
|
|
that kicked this. However it is safe to call 'remove_worker_node' on |
|
|
|
|
an already detached node */ |
|
|
|
|
remove_worker_node(&worker->pi_list_link); |
|
|
|
|
/* It is important to read the num_pollers again under the lock so that we
|
|
|
|
|
* have the latest num_pollers value that doesn't change while we are doing |
|
|
|
|
* the "(num_pollers < GPRC_MAX_POLLERS_PER_ISLAND)" a a few lines below */ |
|
|
|
|
num_pollers = gpr_atm_no_barrier_load(&pi->poller_count); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (num_pollers < GRPC_MAX_POLLERS_PER_ISLAND) { |
|
|
|
|
gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1); // Add a poller
|
|
|
|
|
gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1); |
|
|
|
|
is_lease_acquired = true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
gpr_mu_unlock(&pi->worker_list_mu); // Unlock
|
|
|
|
|
gpr_mu_unlock(&pi->worker_list_mu); // UNLOCK
|
|
|
|
|
return is_lease_acquired; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void release_polling_lease(polling_island *pi, grpc_error **error) { |
|
|
|
|
gpr_mu_lock(&pi->worker_list_mu); // Lock
|
|
|
|
|
|
|
|
|
|
gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1); // Remove poller
|
|
|
|
|
gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1); |
|
|
|
|
worker_node *node = pop_front_worker_node(&pi->worker_list_head); |
|
|
|
|
if (node != NULL) { |
|
|
|
|
grpc_pollset_worker *next_worker = WORKER_FROM_WORKER_LIST_NODE(node); |
|
|
|
|