@@ -40,7 +40,6 @@

#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <poll.h>
#include <pthread.h>
#include <signal.h>
@@ -63,7 +62,6 @@
#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
#include "src/core/lib/support/env.h"

/* TODO: sreek - Move this to init.c and initialize this like other tracers. */
static int grpc_polling_trace = 0; /* Disabled by default */
@@ -75,10 +73,6 @@ static int grpc_polling_trace = 0; /* Disabled by default */
/* Uncomment the following to enable extra checks on poll_object operations */
/* #define PO_DEBUG */

/* The maximum number of polling threads per polling island. By default no
   limit */
static int g_max_pollers_per_pi = INT_MAX;

static int grpc_wakeup_signal = -1;
static bool is_grpc_wakeup_signal_initialized = false;
@@ -201,11 +195,6 @@ static void fd_global_shutdown(void);

#endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */

typedef struct worker_node {
  struct worker_node *next;
  struct worker_node *prev;
} worker_node;

/* This is also used as grpc_workqueue (by directly casting it) */
typedef struct polling_island {
  grpc_closure_scheduler workqueue_scheduler;
@@ -240,10 +229,6 @@ typedef struct polling_island {
  /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
  grpc_wakeup_fd workqueue_wakeup_fd;

  /* The list of workers waiting to do polling on this polling island */
  gpr_mu worker_list_mu;
  worker_node worker_list_head;

  /* The fd of the underlying epoll set */
  int epoll_fd;
@@ -256,24 +241,14 @@ typedef struct polling_island {
/*******************************************************************************
 * Pollset Declarations
 */
#define WORKER_FROM_WORKER_LIST_NODE(p) \
  (struct grpc_pollset_worker *)(((char *)(p)) - \
                                 offsetof(grpc_pollset_worker, pi_list_link))
struct grpc_pollset_worker {
  /* Thread id of this worker */
  pthread_t pt_id;

  /* Used to prevent a worker from getting kicked multiple times */
  gpr_atm is_kicked;

  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;

  /* Indicates if it is this worker's turn to do epoll */
  gpr_atm is_polling_turn;

  /* Node in the polling island's worker list. */
  worker_node pi_list_link;
};

struct grpc_pollset {
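
WORKER_FROM_WORKER_LIST_NODE above is the usual container_of idiom: it recovers the enclosing grpc_pollset_worker from a pointer to its embedded pi_list_link member. A minimal, self-contained sketch of the same idiom; the names here are illustrative and not part of this patch:

#include <stddef.h>
#include <stdio.h>

struct node { struct node *next, *prev; };
struct worker {
  int id;
  struct node link; /* embedded list node, like pi_list_link */
};

/* Recover the enclosing struct from a pointer to one of its members. */
#define CONTAINER_OF(ptr, type, member) \
  ((type *)((char *)(ptr)-offsetof(type, member)))

int main(void) {
  struct worker w = {.id = 42};
  struct node *n = &w.link;
  struct worker *back = CONTAINER_OF(n, struct worker, link);
  printf("%d\n", back->id); /* prints 42 */
  return 0;
}
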
@@ -417,47 +392,7 @@ static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
  }
}

static void worker_node_init(worker_node *node) {
  node->next = node->prev = node;
}

/* Not thread safe. Do under a list-level lock */
static void push_back_worker_node(worker_node *head, worker_node *node) {
  node->next = head;
  node->prev = head->prev;
  head->prev->next = node;
  head->prev = node;
}

/* Not thread safe. Do under a list-level lock */
static void remove_worker_node(worker_node *node) {
  node->next->prev = node->prev;
  node->prev->next = node->next;
  /* If node's next and prev point to itself, the node is considered detached
   * from the list */
  node->next = node->prev = node;
}

/* Not thread safe. Do under a list-level lock */
static worker_node *pop_front_worker_node(worker_node *head) {
  worker_node *node = head->next;
  if (node != head) {
    remove_worker_node(node);
  } else {
    node = NULL;
  }

  return node;
}

/* Returns true if the node's next and prev are pointing to itself (which
   indicates that the node is not in the list) */
static bool is_worker_node_detached(worker_node *node) {
  return (node->next == node->prev && node->next == node);
}

/* The caller is expected to hold pi->mu lock before calling this function
 */
/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs,
                                          grpc_error **error) {
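
The worker_node helpers in this hunk implement a sentinel-headed, circular doubly-linked list in which a detached node points back to itself. A self-contained restatement of that list discipline, with the type and functions renamed so they do not collide with the patch's definitions (purely illustrative):

#include <assert.h>
#include <stddef.h>

typedef struct wn { struct wn *next, *prev; } wn;

static void wn_init(wn *n) { n->next = n->prev = n; }

static void wn_push_back(wn *head, wn *n) {
  n->next = head;
  n->prev = head->prev;
  head->prev->next = n;
  head->prev = n;
}

static void wn_remove(wn *n) {
  n->next->prev = n->prev;
  n->prev->next = n->next;
  n->next = n->prev = n; /* detached node points to itself */
}

static wn *wn_pop_front(wn *head) {
  wn *n = head->next;
  if (n == head) return NULL; /* empty list: head points to itself */
  wn_remove(n);
  return n;
}

int main(void) {
  wn head, a, b;
  wn_init(&head);
  wn_init(&a);
  wn_init(&b);
  wn_push_back(&head, &a);
  wn_push_back(&head, &b);
  assert(wn_pop_front(&head) == &a); /* FIFO order */
  assert(wn_pop_front(&head) == &b);
  assert(wn_pop_front(&head) == NULL);
  return 0;
}
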
@@ -611,9 +546,6 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
  gpr_atm_rel_store(&pi->poller_count, 0);
  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  gpr_mu_init(&pi->worker_list_mu);
  worker_node_init(&pi->worker_list_head);

  if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd),
                    err_desc)) {
    goto done;
@@ -652,9 +584,6 @@ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
  gpr_mpscq_destroy(&pi->workqueue_items);
  gpr_mu_destroy(&pi->mu);
  grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd);
  gpr_mu_destroy(&pi->worker_list_mu);
  GPR_ASSERT(is_worker_node_detached(&pi->worker_list_head));

  gpr_free(pi->fds);
  gpr_free(pi);
}
@@ -1173,7 +1102,6 @@ GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
static __thread bool g_initialized_sigmask;
static __thread sigset_t g_orig_sigmask;
static __thread sigset_t g_wakeup_sig_set;

static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
@@ -1181,14 +1109,6 @@ static void sig_handler(int sig_num) {
#endif
}

static void pollset_worker_init(grpc_pollset_worker *worker) {
  worker->pt_id = pthread_self();
  worker->next = worker->prev = NULL;
  gpr_atm_no_barrier_store(&worker->is_kicked, (gpr_atm)0);
  gpr_atm_no_barrier_store(&worker->is_polling_turn, (gpr_atm)0);
  worker_node_init(&worker->pi_list_link);
}

static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }

/* Global state management */
@@ -1205,12 +1125,11 @@ static void pollset_global_shutdown(void) {
  gpr_tls_destroy(&g_current_thread_worker);
}

static grpc_error *worker_kick(grpc_pollset_worker *worker,
                               gpr_atm *is_kicked) {
static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
  grpc_error *err = GRPC_ERROR_NONE;

  /* Kick the worker only if it was not already kicked */
  if (gpr_atm_no_barrier_cas(is_kicked, (gpr_atm)0, (gpr_atm)1)) {
  if (gpr_atm_no_barrier_cas(&worker->is_kicked, (gpr_atm)0, (gpr_atm)1)) {
    GRPC_POLLING_TRACE(
        "pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
        (void *)worker, (long int)worker->pt_id);
@@ -1222,14 +1141,6 @@ static grpc_error *worker_kick(grpc_pollset_worker *worker,
  return err;
}

static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
  return worker_kick(worker, &worker->is_kicked);
}

static grpc_error *poller_kick(grpc_pollset_worker *worker) {
  return worker_kick(worker, &worker->is_polling_turn);
}

/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked) */
static int pollset_has_workers(grpc_pollset *p) {
@@ -1335,22 +1246,6 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  pollset->shutdown_done = NULL;
}

/* Convert millis to timespec (clock-type is assumed to be GPR_TIMESPAN) */
static struct timespec millis_to_timespec(int millis) {
  struct timespec linux_ts;
  gpr_timespec gpr_ts;

  if (millis == -1) {
    gpr_ts = gpr_inf_future(GPR_TIMESPAN);
  } else {
    gpr_ts = gpr_time_from_millis(millis, GPR_TIMESPAN);
  }

  linux_ts.tv_sec = (time_t)gpr_ts.tv_sec;
  linux_ts.tv_nsec = gpr_ts.tv_nsec;
  return linux_ts;
}

/* Convert a timespec to milliseconds:
   - Very small or negative poll times are clamped to zero to do a non-blocking
     poll (which becomes spin polling)
@@ -1469,200 +1364,35 @@ static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
  return false;
}

/* NOTE: This function may modify 'now' */
static bool acquire_polling_lease(grpc_pollset_worker *worker,
                                  polling_island *pi, gpr_timespec deadline,
                                  gpr_timespec *now) {
  bool is_lease_acquired = false;

  gpr_mu_lock(&pi->worker_list_mu);  // LOCK
  long num_pollers = gpr_atm_no_barrier_load(&pi->poller_count);

  if (num_pollers >= g_max_pollers_per_pi) {
    push_back_worker_node(&pi->worker_list_head, &worker->pi_list_link);
    gpr_mu_unlock(&pi->worker_list_mu);  // UNLOCK

    bool is_timeout = false;
    int ret;
    int timeout_ms = poll_deadline_to_millis_timeout(deadline, *now);
    if (timeout_ms == -1) {
      ret = sigwaitinfo(&g_wakeup_sig_set, NULL);
    } else {
      struct timespec sigwait_timeout = millis_to_timespec(timeout_ms);
      GRPC_SCHEDULING_START_BLOCKING_REGION;
      ret = sigtimedwait(&g_wakeup_sig_set, NULL, &sigwait_timeout);
      GRPC_SCHEDULING_END_BLOCKING_REGION;
    }

    if (ret == -1) {
      if (errno == EAGAIN) {
        is_timeout = true;
      } else {
        /* NOTE: This should not happen. If we see these log messages, it means
           we are most likely doing something incorrect in the setup needed
           for sigwaitinfo/sigtimedwait */
        gpr_log(GPR_ERROR,
                "sigtimedwait failed with retcode: %d (timeout_ms: %d)", errno,
                timeout_ms);
      }
    }

    /* Did the worker come out of sigtimedwait due to a thread that just
       exited epoll and kicked it (in the release_polling_lease function)? */
    bool is_polling_turn = gpr_atm_acq_load(&worker->is_polling_turn);

    /* Did the worker come out of sigtimedwait due to a thread alerting it that
       some completion event was (likely) available in the completion queue */
    bool is_kicked = gpr_atm_no_barrier_load(&worker->is_kicked);

    if (is_kicked || is_timeout) {
      *now = deadline; /* Essentially make the epoll timeout = 0 */
    } else if (is_polling_turn) {
      *now = gpr_now(GPR_CLOCK_MONOTONIC); /* Reduce the epoll timeout */
    }

    gpr_mu_lock(&pi->worker_list_mu);  // LOCK
    /* The node might have already been removed from the list by the poller
       that kicked this worker. However it is safe to call 'remove_worker_node'
       on an already detached node */
    remove_worker_node(&worker->pi_list_link);
    /* It is important to read num_pollers again under the lock so that we
     * have the latest value that does not change while we are doing the
     * "(num_pollers < g_max_pollers_per_pi)" check a few lines below */
    num_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
  }

  if (num_pollers < g_max_pollers_per_pi) {
    gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
    is_lease_acquired = true;
  }

  gpr_mu_unlock(&pi->worker_list_mu);  // UNLOCK
  return is_lease_acquired;
}

static void release_polling_lease(polling_island *pi, grpc_error **error) {
  gpr_mu_lock(&pi->worker_list_mu);

  gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
  worker_node *node = pop_front_worker_node(&pi->worker_list_head);
  if (node != NULL) {
    grpc_pollset_worker *next_worker = WORKER_FROM_WORKER_LIST_NODE(node);
    append_error(error, poller_kick(next_worker), "poller kick error");
  }

  gpr_mu_unlock(&pi->worker_list_mu);
}

#define GRPC_EPOLL_MAX_EVENTS 100
static void pollset_do_epoll_pwait(grpc_exec_ctx *exec_ctx, int epoll_fd,
                                   grpc_pollset *pollset, polling_island *pi,
                                   grpc_pollset_worker *worker,
                                   gpr_timespec now, gpr_timespec deadline,
                                   sigset_t *sig_mask, grpc_error **error) {
  /* Only g_max_pollers_per_pi threads can be doing polling in parallel.
     If we cannot get a lease, we cannot continue to do epoll_pwait() */
  if (!acquire_polling_lease(worker, pi, deadline, &now)) {
    return;
  }

  struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
  int ep_rv;
  char *err_msg;
  const char *err_desc = "pollset_work_and_unlock";

  /* timeout_ms is the time between 'now' and 'deadline' */
  int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  GRPC_SCHEDULING_START_BLOCKING_REGION;
  ep_rv =
      epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask);
  GRPC_SCHEDULING_END_BLOCKING_REGION;

  /* Give back the lease right away so that some other thread can enter */
  release_polling_lease(pi, error);

  if (ep_rv < 0) {
    if (errno != EINTR) {
      gpr_asprintf(&err_msg,
                   "epoll_wait() epoll fd: %d failed with error: %d (%s)",
                   epoll_fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
    } else {
      /* We were interrupted. Save an iteration by doing a zero timeout
         epoll_wait to see if there are any other events of interest */
      GRPC_POLLING_TRACE("pollset_work: pollset: %p, worker: %p received kick",
                         (void *)pollset, (void *)worker);
      ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
    }
  }

#ifdef GRPC_TSAN
  /* See the definition of g_poll_sync for more details */
  gpr_atm_acq_load(&g_epoll_sync);
#endif /* defined(GRPC_TSAN) */

  for (int i = 0; i < ep_rv; ++i) {
    void *data_ptr = ep_ev[i].data.ptr;
    if (data_ptr == &global_wakeup_fd) {
      grpc_timer_consume_kick();
      append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
                   err_desc);
    } else if (data_ptr == &pi->workqueue_wakeup_fd) {
      append_error(error,
                   grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd),
                   err_desc);
      maybe_do_workqueue_work(exec_ctx, pi);
    } else if (data_ptr == &polling_island_wakeup_fd) {
      GRPC_POLLING_TRACE(
          "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
          "%d) got merged",
          (void *)pollset, (void *)worker, epoll_fd);
      /* This means that our polling island is merged with a different
         island. We do not have to do anything here since the subsequent call
         to the function pollset_work_and_unlock() will pick up the correct
         epoll_fd */
    } else {
      grpc_fd *fd = data_ptr;
      int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
      int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
      int write_ev = ep_ev[i].events & EPOLLOUT;
      if (read_ev || cancel) {
        fd_become_readable(exec_ctx, fd, pollset);
      }
      if (write_ev || cancel) {
        fd_become_writable(exec_ctx, fd);
      }
    }
  }
}

/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset *pollset,
                                    grpc_pollset_worker *worker,
                                    gpr_timespec now, gpr_timespec deadline,
                                    grpc_pollset_worker *worker, int timeout_ms,
                                    sigset_t *sig_mask, grpc_error **error) {
  struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
  int epoll_fd = -1;
  int ep_rv;
  polling_island *pi = NULL;
  char *err_msg;
  const char *err_desc = "pollset_work_and_unlock";
  GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);

  /* We need to get the epoll_fd to wait on. The epoll_fd is inside the
     latest polling island pointed to by pollset->po.pi

     Since epoll_fd is immutable, it is safe to read it without a lock on the
     polling island. There is however a possibility that the polling island
     from which we got the epoll_fd got merged with another island in the
     meantime. This is okay because in such a case we will wake up right away
     from epoll_pwait() (because any merge will poison the old polling island's
     epoll set 'polling_island_wakeup_fd') and then pick up the latest
     polling_island the next time this function (pollset_work_and_unlock()) is
     called */
     Since epoll_fd is immutable, we can read it without obtaining the polling
     island lock. There is however a possibility that the polling island (from
     which we got the epoll_fd) got merged with another island while we are
     in this function. This is still okay because in such a case we will wake
     up right away from epoll_wait() and pick up the latest polling_island the
     next time this function (i.e. pollset_work_and_unlock()) is called */

  if (pollset->po.pi == NULL) {
    pollset->po.pi = polling_island_create(exec_ctx, NULL, error);
    if (pollset->po.pi == NULL) {
      GPR_TIMER_END("pollset_work_and_unlock", 0);
      return; /* Fatal error. Cannot continue */
      return; /* Fatal error. We cannot continue */
    }

    PI_ADD_REF(pollset->po.pi, "ps");
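
acquire_polling_lease/release_polling_lease in this hunk cap the number of threads inside epoll_pwait() per polling island at g_max_pollers_per_pi: a surplus worker queues itself on the island's worker list and parks in sigtimedwait() until the departing poller kicks it. A condition-variable sketch of the same admission policy, heavily simplified (no deadline handling, no signals; illustrative only, not the patch's implementation):

#include <pthread.h>

/* Simplified stand-in for the per-island lease: at most 'max_pollers' threads
   may poll at once. The real code parks surplus workers with sigtimedwait()
   and hands the turn over via poller_kick(); a condition variable is used
   here purely for illustration. */
typedef struct {
  pthread_mutex_t mu;
  pthread_cond_t cv;
  int pollers;
  int max_pollers; /* plays the role of g_max_pollers_per_pi */
} poller_lease;

#define POLLER_LEASE_INIT(max) \
  { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, (max) }

static void lease_acquire(poller_lease *l) {
  pthread_mutex_lock(&l->mu);
  while (l->pollers >= l->max_pollers) {
    pthread_cond_wait(&l->cv, &l->mu); /* parked, like a queued worker */
  }
  l->pollers++; /* we now hold one of the polling slots */
  pthread_mutex_unlock(&l->mu);
}

static void lease_release(poller_lease *l) {
  pthread_mutex_lock(&l->mu);
  l->pollers--;
  pthread_cond_signal(&l->cv); /* wake the next waiter, like poller_kick() */
  pthread_mutex_unlock(&l->mu);
}

/* Usage sketch inside a polling thread:
     static poller_lease g_lease = POLLER_LEASE_INIT(4);
     lease_acquire(&g_lease);
     ... epoll_pwait(...) ...
     lease_release(&g_lease);
   Unlike acquire_polling_lease(), this version simply blocks until a slot
   frees instead of shortening the epoll timeout. */
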
@@ -1693,10 +1423,70 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
     the completion queue, so there's no need to poll... so we skip that and
     redo the complete loop to verify */
  if (!maybe_do_workqueue_work(exec_ctx, pi)) {
    gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
    g_current_thread_polling_island = pi;
    pollset_do_epoll_pwait(exec_ctx, epoll_fd, pollset, pi, worker, now,
                           deadline, sig_mask, error);

    GRPC_SCHEDULING_START_BLOCKING_REGION;
    ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
                        sig_mask);
    GRPC_SCHEDULING_END_BLOCKING_REGION;
    if (ep_rv < 0) {
      if (errno != EINTR) {
        gpr_asprintf(&err_msg,
                     "epoll_wait() epoll fd: %d failed with error: %d (%s)",
                     epoll_fd, errno, strerror(errno));
        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      } else {
        /* We were interrupted. Save an iteration by doing a zero timeout
           epoll_wait to see if there are any other events of interest */
        GRPC_POLLING_TRACE(
            "pollset_work: pollset: %p, worker: %p received kick",
            (void *)pollset, (void *)worker);
        ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
      }
    }

#ifdef GRPC_TSAN
    /* See the definition of g_poll_sync for more details */
    gpr_atm_acq_load(&g_epoll_sync);
#endif /* defined(GRPC_TSAN) */

    for (int i = 0; i < ep_rv; ++i) {
      void *data_ptr = ep_ev[i].data.ptr;
      if (data_ptr == &global_wakeup_fd) {
        grpc_timer_consume_kick();
        append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
                     err_desc);
      } else if (data_ptr == &pi->workqueue_wakeup_fd) {
        append_error(error,
                     grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd),
                     err_desc);
        maybe_do_workqueue_work(exec_ctx, pi);
      } else if (data_ptr == &polling_island_wakeup_fd) {
        GRPC_POLLING_TRACE(
            "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
            "%d) got merged",
            (void *)pollset, (void *)worker, epoll_fd);
        /* This means that our polling island is merged with a different
           island. We do not have to do anything here since the subsequent call
           to the function pollset_work_and_unlock() will pick up the correct
           epoll_fd */
      } else {
        grpc_fd *fd = data_ptr;
        int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
        int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
        int write_ev = ep_ev[i].events & EPOLLOUT;
        if (read_ev || cancel) {
          fd_become_readable(exec_ctx, fd, pollset);
        }
        if (write_ev || cancel) {
          fd_become_writable(exec_ctx, fd);
        }
      }
    }

    g_current_thread_polling_island = NULL;
    gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
  }

  GPR_ASSERT(pi != NULL);
@@ -1720,9 +1510,14 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                gpr_timespec now, gpr_timespec deadline) {
  GPR_TIMER_BEGIN("pollset_work", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  sigset_t new_mask;

  grpc_pollset_worker worker;
  pollset_worker_init(&worker);
  worker.next = worker.prev = NULL;
  worker.pt_id = pthread_self();
  gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);

  if (worker_hdl) *worker_hdl = &worker;
@@ -1756,9 +1551,9 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
       misses acting on a kick */

    if (!g_initialized_sigmask) {
      sigemptyset(&g_wakeup_sig_set);
      sigaddset(&g_wakeup_sig_set, grpc_wakeup_signal);
      pthread_sigmask(SIG_BLOCK, &g_wakeup_sig_set, &g_orig_sigmask);
      sigemptyset(&new_mask);
      sigaddset(&new_mask, grpc_wakeup_signal);
      pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask);
      sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
      g_initialized_sigmask = true;
      /* new_mask: The new thread mask which blocks 'grpc_wakeup_signal'.
@@ -1773,7 +1568,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,

    push_front_worker(pollset, &worker); /* Add worker to pollset */

    pollset_work_and_unlock(exec_ctx, pollset, &worker, now, deadline,
    pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
                            &g_orig_sigmask, &error);
    grpc_exec_ctx_flush(exec_ctx);
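
The sigmask handling around this call is the standard trick of keeping the wakeup signal blocked everywhere except inside epoll_pwait(), so a kick can only interrupt the worker while it is actually polling. A stripped-down sketch of that pattern (the function name is illustrative and error handling is omitted; SIGRTMIN + 6 matches the default the file passes to grpc_use_signal()):

#include <pthread.h>
#include <signal.h>
#include <sys/epoll.h>

/* No-op handler, analogous to sig_handler()/poller_kick_init() above. */
static void wakeup_handler(int sig) { (void)sig; }

static void poll_with_wakeup_signal(int epoll_fd) {
  sigset_t block_set, pwait_mask;
  struct epoll_event events[16];

  signal(SIGRTMIN + 6, wakeup_handler); /* cf. poller_kick_init() */

  sigemptyset(&block_set);
  sigaddset(&block_set, SIGRTMIN + 6);
  pthread_sigmask(SIG_BLOCK, &block_set, &pwait_mask); /* blocked elsewhere */
  sigdelset(&pwait_mask, SIGRTMIN + 6); /* but deliverable inside pwait */

  /* A pthread_kill(thread, SIGRTMIN + 6) from another thread can now only
     interrupt us here; epoll_pwait() then returns -1 with errno == EINTR. */
  (void)epoll_pwait(epoll_fd, events, 16, 1000 /* ms */, &pwait_mask);
}
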
|
|
|
|
@ -2126,24 +1921,6 @@ static bool is_epoll_available() { |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* This is mainly for testing purposes. Checks to see if environment variable
|
|
|
|
|
* GRPC_MAX_POLLERS_PER_PI is set and if so, assigns that value to |
|
|
|
|
* g_max_pollers_per_pi (any negative value is considered INT_MAX) */ |
|
|
|
|
static void set_max_pollers_per_island() { |
|
|
|
|
char *s = gpr_getenv("GRPC_MAX_POLLERS_PER_PI"); |
|
|
|
|
if (s) { |
|
|
|
|
g_max_pollers_per_pi = (int)strtol(s, NULL, 10); |
|
|
|
|
if (g_max_pollers_per_pi < 0) { |
|
|
|
|
g_max_pollers_per_pi = INT_MAX; |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
g_max_pollers_per_pi = INT_MAX; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
gpr_log(GPR_INFO, "Max number of pollers per polling island: %d", |
|
|
|
|
g_max_pollers_per_pi); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { |
|
|
|
|
/* If use of signals is disabled, we cannot use epoll engine*/ |
|
|
|
|
if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) { |
|
|
|
@ -2162,8 +1939,6 @@ const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { |
|
|
|
|
grpc_use_signal(SIGRTMIN + 6); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
set_max_pollers_per_island(); |
|
|
|
|
|
|
|
|
|
fd_global_init(); |
|
|
|
|
|
|
|
|
|
if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) { |
|
|
|
|