|
|
|
@ -516,6 +516,21 @@ static void polling_island_global_init() { |
|
|
|
|
g_pi_freelist = NULL; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void polling_island_global_shutdown() { |
|
|
|
|
polling_island *next; |
|
|
|
|
gpr_mu_lock(&g_pi_freelist_mu); |
|
|
|
|
gpr_mu_unlock(&g_pi_freelist_mu); |
|
|
|
|
while (g_pi_freelist != NULL) { |
|
|
|
|
next = g_pi_freelist->next_free; |
|
|
|
|
gpr_mu_destroy(&g_pi_freelist->mu); |
|
|
|
|
gpr_free(g_pi_freelist->fds); |
|
|
|
|
gpr_free(g_pi_freelist); |
|
|
|
|
g_pi_freelist = next; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
gpr_mu_destroy(&g_pi_freelist_mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*******************************************************************************
|
|
|
|
|
* Fd Definitions |
|
|
|
|
*/ |
|
|
|
@ -784,7 +799,7 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
static void sig_handler(int sig_num) { |
|
|
|
|
#ifdef GPRC_EPOLL_DEBUG |
|
|
|
|
#ifdef GRPC_EPOLL_DEBUG |
|
|
|
|
gpr_log(GPR_INFO, "Received signal %d", sig_num); |
|
|
|
|
#endif |
|
|
|
|
} |
|
|
|
@ -792,7 +807,7 @@ static void sig_handler(int sig_num) { |
|
|
|
|
/* Global state management */ |
|
|
|
|
static void pollset_global_init(void) { |
|
|
|
|
grpc_wakeup_fd_init(&grpc_global_wakeup_fd); |
|
|
|
|
signal(SIGUSR1, sig_handler); |
|
|
|
|
signal(SIGUSR1, sig_handler); /* TODO: sreek - Do not hardcode SIGUSR1 */ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void pollset_global_shutdown(void) { |
|
|
|
@ -840,7 +855,6 @@ static void pollset_kick(grpc_pollset *p, |
|
|
|
|
grpc_pollset_worker *worker = specific_worker; |
|
|
|
|
if (worker != NULL) { |
|
|
|
|
if (worker == GRPC_POLLSET_KICK_BROADCAST) { |
|
|
|
|
gpr_log(GPR_DEBUG, "pollset_kick: broadcast!"); |
|
|
|
|
if (pollset_has_workers(p)) { |
|
|
|
|
GPR_TIMER_BEGIN("pollset_kick.broadcast", 0); |
|
|
|
|
for (worker = p->root_worker.next; worker != &p->root_worker; |
|
|
|
@ -848,12 +862,10 @@ static void pollset_kick(grpc_pollset *p, |
|
|
|
|
pthread_kill(worker->pt_id, SIGUSR1); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
gpr_log(GPR_DEBUG, "pollset_kick: (broadcast) Kicked without pollers"); |
|
|
|
|
p->kicked_without_pollers = true; |
|
|
|
|
} |
|
|
|
|
GPR_TIMER_END("pollset_kick.broadcast", 0); |
|
|
|
|
} else { |
|
|
|
|
gpr_log(GPR_DEBUG, "pollset_kick: kicked kicked_specifically"); |
|
|
|
|
GPR_TIMER_MARK("kicked_specifically", 0); |
|
|
|
|
worker->kicked_specifically = true; |
|
|
|
|
pthread_kill(worker->pt_id, SIGUSR1); |
|
|
|
@ -864,11 +876,9 @@ static void pollset_kick(grpc_pollset *p, |
|
|
|
|
if (worker != NULL) { |
|
|
|
|
GPR_TIMER_MARK("finally_kick", 0); |
|
|
|
|
push_back_worker(p, worker); |
|
|
|
|
gpr_log(GPR_DEBUG, "pollset_kick: anonymous kick"); |
|
|
|
|
pthread_kill(worker->pt_id, SIGUSR1); |
|
|
|
|
} else { |
|
|
|
|
GPR_TIMER_MARK("kicked_no_pollers", 0); |
|
|
|
|
gpr_log(GPR_DEBUG, "pollset_kick: kicked without pollers"); |
|
|
|
|
p->kicked_without_pollers = true; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -941,7 +951,6 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, |
|
|
|
|
struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS]; |
|
|
|
|
int epoll_fd = -1; |
|
|
|
|
int ep_rv; |
|
|
|
|
gpr_log(GPR_DEBUG, "pollset_work_and_unlock: Entering.."); |
|
|
|
|
GPR_TIMER_BEGIN("pollset_work_and_unlock", 0); |
|
|
|
|
|
|
|
|
|
/* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
|
|
|
|
@ -952,22 +961,27 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, |
|
|
|
|
- pollset->polling_island->mu */ |
|
|
|
|
gpr_mu_lock(&pollset->pi_mu); |
|
|
|
|
|
|
|
|
|
if (pollset->polling_island != NULL) { |
|
|
|
|
pollset->polling_island = |
|
|
|
|
polling_island_update_and_lock(pollset->polling_island, 1, 0); |
|
|
|
|
epoll_fd = pollset->polling_island->epoll_fd; |
|
|
|
|
if (pollset->polling_island->fd_cnt == 0) { |
|
|
|
|
gpr_log(GPR_DEBUG, "pollset_work_and_unlock: epoll_fd: %d, No other fds", |
|
|
|
|
epoll_fd); |
|
|
|
|
} |
|
|
|
|
for (size_t i = 0; i < pollset->polling_island->fd_cnt; i++) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"pollset_work_and_unlock: epoll_fd: %d, fd_count: %d, fd[%d]: %d", |
|
|
|
|
epoll_fd, pollset->polling_island->fd_cnt, i, |
|
|
|
|
pollset->polling_island->fds[i]->fd); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&pollset->polling_island->mu); |
|
|
|
|
if (pollset->polling_island == NULL) { |
|
|
|
|
pollset->polling_island = polling_island_create(NULL, 1); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pollset->polling_island = |
|
|
|
|
polling_island_update_and_lock(pollset->polling_island, 1, 0); |
|
|
|
|
epoll_fd = pollset->polling_island->epoll_fd; |
|
|
|
|
|
|
|
|
|
#ifdef GRPC_EPOLL_DEBUG |
|
|
|
|
if (pollset->polling_island->fd_cnt == 0) { |
|
|
|
|
gpr_log(GPR_DEBUG, "pollset_work_and_unlock: epoll_fd: %d, No other fds", |
|
|
|
|
epoll_fd); |
|
|
|
|
} |
|
|
|
|
for (size_t i = 0; i < pollset->polling_island->fd_cnt; i++) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"pollset_work_and_unlock: epoll_fd: %d, fd_count: %d, fd[%d]: %d", |
|
|
|
|
epoll_fd, pollset->polling_island->fd_cnt, i, |
|
|
|
|
pollset->polling_island->fds[i]->fd); |
|
|
|
|
} |
|
|
|
|
#endif |
|
|
|
|
gpr_mu_unlock(&pollset->polling_island->mu); |
|
|
|
|
|
|
|
|
|
gpr_mu_unlock(&pollset->pi_mu); |
|
|
|
|
gpr_mu_unlock(&pollset->mu); |
|
|
|
@ -975,16 +989,8 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, |
|
|
|
|
/* If epoll_fd == -1, this is a blank pollset and does not have any fds yet */ |
|
|
|
|
if (epoll_fd != -1) { |
|
|
|
|
do { |
|
|
|
|
gpr_timespec before_epoll = gpr_now(GPR_CLOCK_PRECISE); |
|
|
|
|
gpr_log(GPR_DEBUG, "pollset_work_and_unlock: epoll_wait()...."); |
|
|
|
|
ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, |
|
|
|
|
sig_mask); |
|
|
|
|
gpr_timespec after_epoll = gpr_now(GPR_CLOCK_PRECISE); |
|
|
|
|
int dur = gpr_time_to_millis(gpr_time_sub(after_epoll, before_epoll)); |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"pollset_work_and_unlock: DONE epoll_wait() : %d ms, ep_rv: %d", |
|
|
|
|
dur, ep_rv); |
|
|
|
|
|
|
|
|
|
if (ep_rv < 0) { |
|
|
|
|
if (errno != EINTR) { |
|
|
|
|
/* TODO (sreek) - Do not log an error in case of bad file descriptor
|
|
|
|
@ -993,9 +999,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, |
|
|
|
|
* closed) */ |
|
|
|
|
gpr_log(GPR_ERROR, "epoll_pwait() failed: %s", strerror(errno)); |
|
|
|
|
} else { |
|
|
|
|
gpr_log(GPR_DEBUG, "pollset_work_and_unlock: 0-timeout epoll_wait()"); |
|
|
|
|
ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0); |
|
|
|
|
gpr_log(GPR_DEBUG, "pollset_work_and_unlock: ep_rv: %d", ep_rv); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1018,7 +1022,6 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, |
|
|
|
|
} |
|
|
|
|
} while (ep_rv == GRPC_EPOLL_MAX_EVENTS); |
|
|
|
|
} |
|
|
|
|
gpr_log(GPR_DEBUG, "pollset_work_and_unlock: Leaving.."); |
|
|
|
|
GPR_TIMER_END("pollset_work_and_unlock", 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1093,7 +1096,6 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
grpc_pollset_worker **worker_hdl, gpr_timespec now, |
|
|
|
|
gpr_timespec deadline) { |
|
|
|
|
GPR_TIMER_BEGIN("pollset_work", 0); |
|
|
|
|
gpr_log(GPR_DEBUG, "pollset_work: enter"); |
|
|
|
|
int timeout_ms = poll_deadline_to_millis_timeout(deadline, now); |
|
|
|
|
|
|
|
|
|
sigset_t new_mask; |
|
|
|
@ -1112,7 +1114,6 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
work that needs attention like an event on the completion queue or an |
|
|
|
|
alarm */ |
|
|
|
|
GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0); |
|
|
|
|
gpr_log(GPR_INFO, "pollset_work: kicked without pollers.."); |
|
|
|
|
pollset->kicked_without_pollers = 0; |
|
|
|
|
} else if (!pollset->shutting_down) { |
|
|
|
|
sigemptyset(&new_mask); |
|
|
|
@ -1147,14 +1148,12 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
gpr_mu_lock(&pollset->mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
gpr_log(GPR_DEBUG, "pollset_work(): leaving"); |
|
|
|
|
*worker_hdl = NULL; |
|
|
|
|
GPR_TIMER_END("pollset_work", 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
grpc_fd *fd) { |
|
|
|
|
gpr_log(GPR_DEBUG, "pollset_add_fd: pollset: %p, fd: %d", pollset, fd->fd); |
|
|
|
|
/* TODO sreek - Double check if we need to get a pollset->mu lock here */ |
|
|
|
|
gpr_mu_lock(&pollset->pi_mu); |
|
|
|
|
gpr_mu_lock(&fd->pi_mu); |
|
|
|
@ -1347,6 +1346,7 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, |
|
|
|
|
static void shutdown_engine(void) { |
|
|
|
|
fd_global_shutdown(); |
|
|
|
|
pollset_global_shutdown(); |
|
|
|
|
polling_island_global_shutdown(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static const grpc_event_engine_vtable vtable = { |
|
|
|
|