|
|
|
@ -60,6 +60,14 @@ |
|
|
|
|
#include "src/core/lib/profiling/timers.h" |
|
|
|
|
#include "src/core/lib/support/block_annotate.h" |
|
|
|
|
|
|
|
|
|
/* TODO: sreek - Move this to init.c and initialize this like other tracers.
|
|
|
|
|
* Also, enable this trace by default for now. */ |
|
|
|
|
static int grpc_polling_trace = 1; |
|
|
|
|
#define GRPC_POLLING_TRACE(fmt, ...) \ |
|
|
|
|
if (grpc_polling_trace) { \
|
|
|
|
|
gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static int grpc_wakeup_signal = -1; |
|
|
|
|
static bool is_grpc_wakeup_signal_initialized = false; |
|
|
|
|
|
|
|
|
@ -1058,6 +1066,8 @@ static void pollset_global_shutdown(void) { |
|
|
|
|
|
|
|
|
|
static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) { |
|
|
|
|
grpc_error *err = GRPC_ERROR_NONE; |
|
|
|
|
GRPC_POLLING_TRACE("pollset_worker_kick: Kicking worker: %p (thread id: %ld)", |
|
|
|
|
(void *)worker, worker->pt_id); |
|
|
|
|
int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal); |
|
|
|
|
if (err_num != 0) { |
|
|
|
|
err = GRPC_OS_ERROR(err_num, "pthread_kill"); |
|
|
|
@ -1104,7 +1114,6 @@ static grpc_error *pollset_kick(grpc_pollset *p, |
|
|
|
|
GPR_TIMER_BEGIN("pollset_kick", 0); |
|
|
|
|
grpc_error *error = GRPC_ERROR_NONE; |
|
|
|
|
const char *err_desc = "Kick Failure"; |
|
|
|
|
|
|
|
|
|
grpc_pollset_worker *worker = specific_worker; |
|
|
|
|
if (worker != NULL) { |
|
|
|
|
if (worker == GRPC_POLLSET_KICK_BROADCAST) { |
|
|
|
@ -1270,7 +1279,8 @@ static void pollset_reset(grpc_pollset *pollset) { |
|
|
|
|
#define GRPC_EPOLL_MAX_EVENTS 1000 |
|
|
|
|
/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */ |
|
|
|
|
static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_pollset *pollset, int timeout_ms, |
|
|
|
|
grpc_pollset *pollset, |
|
|
|
|
grpc_pollset_worker *worker, int timeout_ms, |
|
|
|
|
sigset_t *sig_mask, grpc_error **error) { |
|
|
|
|
struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS]; |
|
|
|
|
int epoll_fd = -1; |
|
|
|
@ -1298,6 +1308,8 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
PI_ADD_REF(pollset->polling_island, "ps"); |
|
|
|
|
GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p", |
|
|
|
|
(void *)pollset, (void *)pollset->polling_island); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pi = polling_island_maybe_get_latest(pollset->polling_island); |
|
|
|
@ -1331,6 +1343,9 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, |
|
|
|
|
} else { |
|
|
|
|
/* We were interrupted. Save an interation by doing a zero timeout
|
|
|
|
|
epoll_wait to see if there are any other events of interest */ |
|
|
|
|
GRPC_POLLING_TRACE( |
|
|
|
|
"pollset_work: pollset: %p, worker: %p received kick", |
|
|
|
|
(void *)pollset, (void *)worker); |
|
|
|
|
ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -1347,6 +1362,10 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd), |
|
|
|
|
err_desc); |
|
|
|
|
} else if (data_ptr == &polling_island_wakeup_fd) { |
|
|
|
|
GRPC_POLLING_TRACE( |
|
|
|
|
"pollset_work: pollset: %p, worker: %p polling island (epoll_fd: " |
|
|
|
|
"%d) got merged", |
|
|
|
|
(void *)pollset, (void *)worker, epoll_fd); |
|
|
|
|
/* This means that our polling island is merged with a different
|
|
|
|
|
island. We do not have to do anything here since the subsequent call |
|
|
|
|
to the function pollset_work_and_unlock() will pick up the correct |
|
|
|
@ -1442,8 +1461,8 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
|
|
|
|
|
push_front_worker(pollset, &worker); /* Add worker to pollset */ |
|
|
|
|
|
|
|
|
|
pollset_work_and_unlock(exec_ctx, pollset, timeout_ms, &g_orig_sigmask, |
|
|
|
|
&error); |
|
|
|
|
pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms, |
|
|
|
|
&g_orig_sigmask, &error); |
|
|
|
|
grpc_exec_ctx_flush(exec_ctx); |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&pollset->mu); |
|
|
|
@ -1506,17 +1525,38 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
pi_new = fd->polling_island; |
|
|
|
|
if (pi_new == NULL) { |
|
|
|
|
pi_new = polling_island_create(fd, &error); |
|
|
|
|
|
|
|
|
|
GRPC_POLLING_TRACE( |
|
|
|
|
"pollset_add_fd: Created new polling island. pi_new:%p (fd: %d, " |
|
|
|
|
"pollset: %p)", |
|
|
|
|
(void *)pi_new, fd->fd, (void *)pollset); |
|
|
|
|
} |
|
|
|
|
} else if (fd->polling_island == NULL) { |
|
|
|
|
pi_new = polling_island_lock(pollset->polling_island); |
|
|
|
|
polling_island_add_fds_locked(pi_new, &fd, 1, true, &error); |
|
|
|
|
gpr_mu_unlock(&pi_new->mu); |
|
|
|
|
|
|
|
|
|
GRPC_POLLING_TRACE( |
|
|
|
|
"pollset_add_fd: fd->pi was NULL. pi_new: %p (fd: %d, pollset: %p, " |
|
|
|
|
"pollset->pi: %p)", |
|
|
|
|
(void *)pi_new, fd->fd, (void *)pollset, |
|
|
|
|
(void *)pollset->polling_island); |
|
|
|
|
} else if (pollset->polling_island == NULL) { |
|
|
|
|
pi_new = polling_island_lock(fd->polling_island); |
|
|
|
|
gpr_mu_unlock(&pi_new->mu); |
|
|
|
|
|
|
|
|
|
GRPC_POLLING_TRACE( |
|
|
|
|
"pollset_add_fd: pollset->pi was NULL. pi_new: %p (fd: %d, pollset: " |
|
|
|
|
"%p, fd->pi: %p", |
|
|
|
|
(void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island); |
|
|
|
|
} else { |
|
|
|
|
pi_new = polling_island_merge(fd->polling_island, pollset->polling_island, |
|
|
|
|
&error); |
|
|
|
|
GRPC_POLLING_TRACE( |
|
|
|
|
"pollset_add_fd: polling islands merged. pi_new: %p (fd: %d, pollset: " |
|
|
|
|
"%p, fd->pi: %p, pollset->pi: %p)", |
|
|
|
|
(void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island, |
|
|
|
|
(void *)pollset->polling_island); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* At this point, pi_new is the polling island that both fd->polling_island
|
|
|
|
|