Rename polling_island to epoll_set

pull/10970/head
Sree Kuchibhotla 8 years ago
parent 8ed56f5a4b
commit aa033db15e
  1 changed file (382 changed lines):
      src/core/lib/iomgr/ev_epoll_thread_pool_linux.c

@@ -82,14 +82,14 @@ static int grpc_polling_trace = 0; /* Disabled by default */
* needed) */
static grpc_wakeup_fd global_wakeup_fd;
struct polling_island;
struct epoll_set;
/*******************************************************************************
* Fd Declarations
*/
struct grpc_fd {
gpr_mu mu;
struct polling_island *pi;
struct epoll_set *eps;
int fd;
@@ -115,25 +115,25 @@ static void fd_global_shutdown(void);
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(exec_ctx, p, r) \
pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)
#define EPS_ADD_REF(p, r) eps_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define EPS_UNREF(exec_ctx, p, r) \
eps_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)
#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */
#define PI_ADD_REF(p, r) pi_add_ref((p))
#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))
#define EPS_ADD_REF(p, r) eps_add_ref((p))
#define EPS_UNREF(exec_ctx, p, r) eps_unref((exec_ctx), (p))
#endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */
#endif /* !defined(GRPC_EPS_REF_COUNT_DEBUG) */
/* This is also used as grpc_workqueue (by directly casting it) */
typedef struct polling_island {
typedef struct epoll_set {
grpc_closure_scheduler workqueue_scheduler;
/* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
/* Ref count. Use EPS_ADD_REF() and EPS_UNREF() macros to increment/decrement
the refcount. Once the ref count becomes zero, this structure is destroyed
which means we should ensure that there is never a scenario where a
PI_ADD_REF() is racing with a PI_UNREF() that just made the ref_count
EPS_ADD_REF() is racing with an EPS_UNREF() that just made the ref_count
zero. */
gpr_atm ref_count;
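
Reviewer note: the EPS_ADD_REF()/EPS_UNREF() macros above follow the usual atomic-refcount pattern, with debug variants that log the call site. A minimal standalone sketch in plain C11 (stdatomic rather than the gpr_atm API; `my_obj` and the `OBJ_*` names are hypothetical stand-ins, not part of this change):

```c
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct my_obj {
  atomic_long ref_count; /* plays the role of epoll_set::ref_count */
} my_obj;

static void obj_add_ref(my_obj *o) {
  atomic_fetch_add_explicit(&o->ref_count, 1, memory_order_relaxed);
}

static void obj_unref(my_obj *o) {
  /* Whoever drops the count to zero owns destruction; no lock is needed
     because nobody else can hold a reference at that point. */
  if (atomic_fetch_sub_explicit(&o->ref_count, 1, memory_order_acq_rel) == 1) {
    free(o);
  }
}

#ifdef REFCOUNT_DEBUG
static void obj_add_ref_dbg(my_obj *o, const char *reason, const char *file,
                            int line) {
  long old = atomic_load_explicit(&o->ref_count, memory_order_acquire);
  obj_add_ref(o);
  fprintf(stderr, "Add ref %p: %ld -> %ld (%s) - (%s, %d)\n", (void *)o, old,
          old + 1, reason, file, line);
}
#define OBJ_ADD_REF(o, r) obj_add_ref_dbg((o), (r), __FILE__, __LINE__)
#else
#define OBJ_ADD_REF(o, r) obj_add_ref((o))
#endif
```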
@@ -154,7 +154,7 @@ typedef struct polling_island {
/* The fd of the underlying epoll set */
int epoll_fd;
} polling_island;
} epoll_set;
/*******************************************************************************
* Pollset Declarations
@@ -168,7 +168,7 @@ struct grpc_pollset_worker {
struct grpc_pollset {
gpr_mu mu;
struct polling_island *pi;
struct epoll_set *eps;
grpc_pollset_worker root_worker;
bool kicked_without_pollers;
@@ -187,9 +187,9 @@ struct grpc_pollset_set {};
* Dedicated polling threads and pollsets - Declarations
*/
size_t g_num_pi = 1;
struct polling_island **g_polling_islands = NULL;
size_t g_num_threads_per_pi = 1;
size_t g_num_eps = 1;
struct epoll_set **g_epoll_sets = NULL;
size_t g_num_threads_per_eps = 1;
gpr_thd_id *g_poller_threads = NULL;
/* Used as read-notifier pollsets for fds. We won't be using read notifier
@@ -197,9 +197,9 @@ gpr_thd_id *g_poller_threads = NULL;
* return */
grpc_pollset g_read_notifier;
static void add_fd_to_dedicated_pi(grpc_fd *fd);
static bool init_dedicated_polling_islands();
static void shutdown_dedicated_polling_islands();
static void add_fd_to_dedicated_eps(grpc_fd *fd);
static bool init_dedicated_epoll_sets();
static void shutdown_dedicated_epoll_sets();
static void poller_thread_loop(void *arg);
static void start_dedicated_poller_threads();
static void shutdown_dedicated_poller_threads();
@@ -229,14 +229,14 @@ static bool append_error(grpc_error **composite, grpc_error *error,
NOTE: This fd is initialized to be readable and MUST NOT be consumed, i.e. the
threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;
static grpc_wakeup_fd epoll_set_wakeup_fd;
/* The polling island being polled right now.
See comments in workqueue_maybe_wakeup for why this is tracked. */
static __thread polling_island *g_current_thread_polling_island;
static __thread epoll_set *g_current_thread_epoll_set;
/* Forward declaration */
static void polling_island_delete(polling_island *pi);
static void epoll_set_delete(epoll_set *eps);
static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error);
@@ -254,31 +254,31 @@ gpr_atm g_epoll_sync;
static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
workqueue_enqueue, workqueue_enqueue, "workqueue"};
static void pi_add_ref(polling_island *pi);
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
static void eps_add_ref(epoll_set *eps);
static void eps_unref(grpc_exec_ctx *exec_ctx, epoll_set *eps);
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
static void pi_add_ref_dbg(polling_island *pi, const char *reason,
static void eps_add_ref_dbg(epoll_set *eps, const char *reason,
const char *file, int line) {
long old_cnt = gpr_atm_acq_load(&pi->ref_count);
pi_add_ref(pi);
gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
(void *)pi, old_cnt, old_cnt + 1, reason, file, line);
long old_cnt = gpr_atm_acq_load(&eps->ref_count);
eps_add_ref(eps);
gpr_log(GPR_DEBUG, "Add ref eps: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
(void *)eps, old_cnt, old_cnt + 1, reason, file, line);
}
static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
static void eps_unref_dbg(grpc_exec_ctx *exec_ctx, epoll_set *eps,
const char *reason, const char *file, int line) {
long old_cnt = gpr_atm_acq_load(&pi->ref_count);
pi_unref(exec_ctx, pi);
gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
(void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
long old_cnt = gpr_atm_acq_load(&eps->ref_count);
eps_unref(exec_ctx, eps);
gpr_log(GPR_DEBUG, "Unref eps: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
(void *)eps, old_cnt, (old_cnt - 1), reason, file, line);
}
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
const char *file, int line,
const char *reason) {
if (workqueue != NULL) {
pi_add_ref_dbg((polling_island *)workqueue, reason, file, line);
eps_add_ref_dbg((epoll_set *)workqueue, reason, file, line);
}
return workqueue;
}
@@ -286,13 +286,13 @@ static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
const char *file, int line, const char *reason) {
if (workqueue != NULL) {
pi_unref_dbg(exec_ctx, (polling_island *)workqueue, reason, file, line);
eps_unref_dbg(exec_ctx, (epoll_set *)workqueue, reason, file, line);
}
}
#else
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
if (workqueue != NULL) {
pi_add_ref((polling_island *)workqueue);
eps_add_ref((epoll_set *)workqueue);
}
return workqueue;
}
@@ -300,31 +300,31 @@ static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
static void workqueue_unref(grpc_exec_ctx *exec_ctx,
grpc_workqueue *workqueue) {
if (workqueue != NULL) {
pi_unref(exec_ctx, (polling_island *)workqueue);
eps_unref(exec_ctx, (epoll_set *)workqueue);
}
}
#endif
static void pi_add_ref(polling_island *pi) {
gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1);
static void eps_add_ref(epoll_set *eps) {
gpr_atm_no_barrier_fetch_add(&eps->ref_count, 1);
}
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
static void eps_unref(grpc_exec_ctx *exec_ctx, epoll_set *eps) {
/* If ref count went to zero, delete the polling island. This deletion is
not done under a lock since once the ref count goes to zero, we are
guaranteed that no one else holds a reference to the polling island (and
that there is no racing pi_add_ref() call either).*/
if (1 == gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
polling_island_delete(pi);
that there is no racing eps_add_ref() call either).*/
if (1 == gpr_atm_full_fetch_add(&eps->ref_count, -1)) {
epoll_set_delete(eps);
}
}
static void polling_island_add_fd_locked(polling_island *pi, grpc_fd *fd,
static void epoll_set_add_fd_locked(epoll_set *eps, grpc_fd *fd,
grpc_error **error) {
int err;
struct epoll_event ev;
char *err_msg;
const char *err_desc = "polling_island_add_fd_locked";
const char *err_desc = "epoll_set_add_fd_locked";
#ifdef GRPC_TSAN
/* See the definition of g_epoll_sync for more context */
@@ -333,55 +333,55 @@ static void polling_island_add_fd_locked(polling_island *pi, grpc_fd *fd,
ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
ev.data.ptr = fd;
err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
err = epoll_ctl(eps->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
if (err < 0 && errno != EEXIST) {
gpr_asprintf(
&err_msg,
"epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
pi->epoll_fd, fd->fd, errno, strerror(errno));
eps->epoll_fd, fd->fd, errno, strerror(errno));
append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
gpr_free(err_msg);
}
}
static void polling_island_add_wakeup_fd_locked(polling_island *pi,
static void epoll_set_add_wakeup_fd_locked(epoll_set *eps,
grpc_wakeup_fd *wakeup_fd,
grpc_error **error) {
struct epoll_event ev;
int err;
char *err_msg;
const char *err_desc = "polling_island_add_wakeup_fd";
const char *err_desc = "epoll_set_add_wakeup_fd";
ev.events = (uint32_t)(EPOLLIN | EPOLLET);
ev.data.ptr = wakeup_fd;
err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
err = epoll_ctl(eps->epoll_fd, EPOLL_CTL_ADD,
GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
if (err < 0 && errno != EEXIST) {
gpr_asprintf(&err_msg,
"epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
"error: %d (%s)",
pi->epoll_fd, GRPC_WAKEUP_FD_GET_READ_FD(&global_wakeup_fd),
eps->epoll_fd, GRPC_WAKEUP_FD_GET_READ_FD(&global_wakeup_fd),
errno, strerror(errno));
append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
gpr_free(err_msg);
}
}
static void polling_island_remove_fd(polling_island *pi, grpc_fd *fd,
static void epoll_set_remove_fd(epoll_set *eps, grpc_fd *fd,
bool is_fd_closed, grpc_error **error) {
int err;
char *err_msg;
const char *err_desc = "polling_island_remove_fd";
const char *err_desc = "epoll_set_remove_fd";
/* If fd is already closed, then it would have been automatically removed
from the epoll set */
if (!is_fd_closed) {
err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
err = epoll_ctl(eps->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
if (err < 0 && errno != ENOENT) {
gpr_asprintf(
&err_msg,
"epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
pi->epoll_fd, fd->fd, errno, strerror(errno));
eps->epoll_fd, fd->fd, errno, strerror(errno));
append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
gpr_free(err_msg);
}
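
For context on the EEXIST/ENOENT handling in the two helpers above: adding an fd that is already in the set and removing one that is already gone are both treated as success. A standalone sketch against the raw epoll API (the function names are hypothetical):

```c
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/epoll.h>

/* Add fd edge-triggered, tolerating EEXIST the way epoll_set_add_fd_locked
   does. Returns 0 on success, -1 on a real error. */
static int add_fd_edge_triggered(int epoll_fd, int fd, void *tag) {
  struct epoll_event ev;
  ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
  ev.data.ptr = tag; /* the event loop dispatches on this pointer */
  if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0 && errno != EEXIST) {
    fprintf(stderr, "epoll_ctl(ADD, %d): %s\n", fd, strerror(errno));
    return -1;
  }
  return 0;
}

/* Delete, tolerating ENOENT the way epoll_set_remove_fd does (a closed fd
   leaves the epoll set automatically). */
static int del_fd(int epoll_fd, int fd) {
  if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) < 0 && errno != ENOENT) {
    fprintf(stderr, "epoll_ctl(DEL, %d): %s\n", fd, strerror(errno));
    return -1;
  }
  return 0;
}
```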
@@ -389,74 +389,74 @@ static void polling_island_remove_fd(polling_island *pi, grpc_fd *fd,
}
/* Might return NULL in case of an error */
static polling_island *polling_island_create(grpc_error **error) {
polling_island *pi = NULL;
const char *err_desc = "polling_island_create";
static epoll_set *epoll_set_create(grpc_error **error) {
epoll_set *eps = NULL;
const char *err_desc = "epoll_set_create";
*error = GRPC_ERROR_NONE;
pi = gpr_malloc(sizeof(*pi));
pi->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
pi->epoll_fd = -1;
eps = gpr_malloc(sizeof(*eps));
eps->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
eps->epoll_fd = -1;
gpr_mu_init(&pi->workqueue_read_mu);
gpr_mpscq_init(&pi->workqueue_items);
gpr_atm_rel_store(&pi->workqueue_item_count, 0);
gpr_mu_init(&eps->workqueue_read_mu);
gpr_mpscq_init(&eps->workqueue_items);
gpr_atm_rel_store(&eps->workqueue_item_count, 0);
gpr_atm_rel_store(&pi->ref_count, 0);
gpr_atm_rel_store(&pi->poller_count, 0);
gpr_atm_rel_store(&eps->ref_count, 0);
gpr_atm_rel_store(&eps->poller_count, 0);
gpr_atm_rel_store(&pi->is_shutdown, false);
gpr_atm_rel_store(&eps->is_shutdown, false);
if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd),
if (!append_error(error, grpc_wakeup_fd_init(&eps->workqueue_wakeup_fd),
err_desc)) {
goto done;
}
pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
eps->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
if (pi->epoll_fd < 0) {
if (eps->epoll_fd < 0) {
append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc);
goto done;
}
polling_island_add_wakeup_fd_locked(pi, &global_wakeup_fd, error);
polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);
epoll_set_add_wakeup_fd_locked(eps, &global_wakeup_fd, error);
epoll_set_add_wakeup_fd_locked(eps, &eps->workqueue_wakeup_fd, error);
done:
if (*error != GRPC_ERROR_NONE) {
polling_island_delete(pi);
pi = NULL;
epoll_set_delete(eps);
eps = NULL;
}
return pi;
return eps;
}
static void polling_island_delete(polling_island *pi) {
if (pi->epoll_fd >= 0) {
close(pi->epoll_fd);
static void epoll_set_delete(epoll_set *eps) {
if (eps->epoll_fd >= 0) {
close(eps->epoll_fd);
}
GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0);
gpr_mu_destroy(&pi->workqueue_read_mu);
gpr_mpscq_destroy(&pi->workqueue_items);
grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd);
GPR_ASSERT(gpr_atm_no_barrier_load(&eps->workqueue_item_count) == 0);
gpr_mu_destroy(&eps->workqueue_read_mu);
gpr_mpscq_destroy(&eps->workqueue_items);
grpc_wakeup_fd_destroy(&eps->workqueue_wakeup_fd);
gpr_free(pi);
gpr_free(eps);
}
static void workqueue_maybe_wakeup(polling_island *pi) {
static void workqueue_maybe_wakeup(epoll_set *eps) {
/* If this thread is the current poller, then it may be that it's about to
decrement the current poller count, so we need to look past this thread */
bool is_current_poller = (g_current_thread_polling_island == pi);
bool is_current_poller = (g_current_thread_epoll_set == eps);
gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0;
gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
gpr_atm current_pollers = gpr_atm_no_barrier_load(&eps->poller_count);
/* Only issue a wakeup if it's likely that some poller could come in and take
it right now. Note that since we do an anticipatory mpscq_pop every poll
loop, it's ok if we miss the wakeup here, as we'll get the work item when
the next poller enters anyway. */
if (current_pollers > min_current_pollers_for_wakeup) {
GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd));
grpc_wakeup_fd_wakeup(&eps->workqueue_wakeup_fd));
}
}
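
A compact sketch of the create/delete lifecycle above, using raw epoll_create1 plus an eventfd in place of grpc_wakeup_fd (`simple_eps` and its fields are hypothetical stand-ins); like epoll_set_create, it returns NULL and cleans up on any error:

```c
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>

typedef struct simple_eps {
  int epoll_fd;
  int wakeup_fd; /* stands in for workqueue_wakeup_fd */
} simple_eps;

/* Might return NULL in case of an error, like epoll_set_create above. */
static simple_eps *simple_eps_create(void) {
  simple_eps *eps = malloc(sizeof(*eps));
  if (eps == NULL) return NULL;
  eps->wakeup_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
  eps->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
  if (eps->wakeup_fd < 0 || eps->epoll_fd < 0) goto fail;
  struct epoll_event ev = {.events = EPOLLIN | EPOLLET,
                           .data.ptr = &eps->wakeup_fd};
  if (epoll_ctl(eps->epoll_fd, EPOLL_CTL_ADD, eps->wakeup_fd, &ev) < 0)
    goto fail;
  return eps;
fail:
  if (eps->epoll_fd >= 0) close(eps->epoll_fd);
  if (eps->wakeup_fd >= 0) close(eps->wakeup_fd);
  free(eps);
  return NULL;
}

static void simple_eps_delete(simple_eps *eps) {
  close(eps->epoll_fd);
  close(eps->wakeup_fd);
  free(eps);
}
```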
@@ -468,12 +468,12 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
* this kicks off ends up destroying the workqueue before this function
* completes */
GRPC_WORKQUEUE_REF(workqueue, "enqueue");
polling_island *pi = (polling_island *)workqueue;
gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1);
epoll_set *eps = (epoll_set *)workqueue;
gpr_atm last = gpr_atm_no_barrier_fetch_add(&eps->workqueue_item_count, 1);
closure->error_data.error = error;
gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next);
gpr_mpscq_push(&eps->workqueue_items, &closure->next_data.atm_next);
if (last == 0) {
workqueue_maybe_wakeup(pi);
workqueue_maybe_wakeup(eps);
}
GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
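
The hunks above preserve a two-part wakeup heuristic: enqueue only kicks pollers on the empty-to-non-empty transition, and workqueue_maybe_wakeup discounts the calling thread if it is itself the current poller (it may be about to stop polling). A standalone sketch under those assumptions (`wq`, `g_current_wq`, and the eventfd kick are hypothetical; the MPSC push is elided):

```c
#include <stdatomic.h>
#include <stdint.h>
#include <sys/eventfd.h>
#include <unistd.h>

typedef struct wq {
  atomic_long item_count;
  atomic_long poller_count;
  int wakeup_fd; /* eventfd registered in the epoll set */
} wq;

/* Pollers would set this around their epoll_wait, like
   g_current_thread_epoll_set above. */
static _Thread_local wq *g_current_wq;

static void maybe_wakeup(wq *q) {
  /* If this thread is itself polling q, it may be about to decrement the
     poller count, so require at least one *other* poller before kicking. */
  long min_pollers = (g_current_wq == q) ? 1 : 0;
  if (atomic_load_explicit(&q->poller_count, memory_order_relaxed) >
      min_pollers) {
    uint64_t one = 1;
    (void)write(q->wakeup_fd, &one, sizeof(one)); /* wake one poller */
  }
}

static void enqueue(wq *q /*, closure pushed onto an MPSC queue */) {
  long last = atomic_fetch_add_explicit(&q->item_count, 1,
                                        memory_order_relaxed);
  /* ... gpr_mpscq_push(&q->items, node) would go here ... */
  if (last == 0) maybe_wakeup(q); /* 0 -> 1: the queue was empty */
}
```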
@@ -481,24 +481,24 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
}
static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
polling_island *pi = (polling_island *)workqueue;
epoll_set *eps = (epoll_set *)workqueue;
return workqueue == NULL ? grpc_schedule_on_exec_ctx
: &pi->workqueue_scheduler;
: &eps->workqueue_scheduler;
}
static grpc_error *polling_island_global_init() {
static grpc_error *epoll_set_global_init() {
grpc_error *error = GRPC_ERROR_NONE;
error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
error = grpc_wakeup_fd_init(&epoll_set_wakeup_fd);
if (error == GRPC_ERROR_NONE) {
error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
error = grpc_wakeup_fd_wakeup(&epoll_set_wakeup_fd);
}
return error;
}
static void polling_island_global_shutdown() {
grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
static void epoll_set_global_shutdown() {
grpc_wakeup_fd_destroy(&epoll_set_wakeup_fd);
}
/*******************************************************************************
@@ -570,7 +570,7 @@ static grpc_fd *fd_create(int fd, const char *name) {
* is a newly created fd (or an fd we got from the freelist), no one else
* would be holding a lock to it anyway. */
gpr_mu_lock(&new_fd->mu);
new_fd->pi = NULL;
new_fd->eps = NULL;
new_fd->fd = fd;
new_fd->orphaned = false;
@@ -588,8 +588,8 @@ static grpc_fd *fd_create(int fd, const char *name) {
gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
gpr_free(fd_name);
/* Associate the fd with one of the dedicated pi */
add_fd_to_dedicated_pi(new_fd);
/* Associate the fd with one of the dedicated eps */
add_fd_to_dedicated_eps(new_fd);
return new_fd;
}
@@ -609,7 +609,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
const char *reason) {
bool is_fd_closed = false;
grpc_error *error = GRPC_ERROR_NONE;
polling_island *unref_pi = NULL;
epoll_set *unref_eps = NULL;
gpr_mu_lock(&fd->mu);
fd->on_done_closure = on_done;
@@ -626,10 +626,10 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
fd->orphaned = true;
/* Remove the fd from the polling island */
if (fd->pi != NULL) {
polling_island_remove_fd(fd->pi, fd, is_fd_closed, &error);
unref_pi = fd->pi;
fd->pi = NULL;
if (fd->eps != NULL) {
epoll_set_remove_fd(fd->eps, fd, is_fd_closed, &error);
unref_eps = fd->eps;
fd->eps = NULL;
}
grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
@@ -639,12 +639,12 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
/* We are done with this fd. Release it (i.e add back to freelist) */
add_fd_to_freelist(fd);
if (unref_pi != NULL) {
if (unref_eps != NULL) {
/* Unref stale polling island here, outside the fd lock above.
The polling island owns a workqueue which owns an fd, and unreffing
inside the lock can cause an eventual lock loop that makes TSAN very
unhappy. */
PI_UNREF(exec_ctx, unref_pi, "fd_orphan");
EPS_UNREF(exec_ctx, unref_eps, "fd_orphan");
}
GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
GRPC_ERROR_UNREF(error);
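
The comment above (unref outside the fd lock so destruction can't run inside it) describes a common detach-then-release pattern. A minimal pthreads sketch, with `obj`/`obj_unref` as hypothetical stand-ins for epoll_set and EPS_UNREF:

```c
#include <pthread.h>
#include <stddef.h>

typedef struct obj { int refs; } obj;          /* hypothetical epoll_set */
static void obj_unref(obj *o) { (void)o; }     /* last unref would destroy o */

typedef struct fd_rec {
  pthread_mutex_t mu;
  obj *eps;
} fd_rec;

static void orphan(fd_rec *fd) {
  obj *unref_eps = NULL;
  pthread_mutex_lock(&fd->mu);
  if (fd->eps != NULL) {
    unref_eps = fd->eps; /* detach while holding the lock */
    fd->eps = NULL;
  }
  pthread_mutex_unlock(&fd->mu);
  /* Release only after unlocking: if this was the last ref, the destructor
     runs here, outside the lock, so no lock-order cycle can form. */
  if (unref_eps != NULL) obj_unref(unref_eps);
}
```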
@@ -805,7 +805,7 @@ static grpc_error *kick_poller(void) {
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
gpr_mu_init(&pollset->mu);
*mu = &pollset->mu;
pollset->pi = NULL;
pollset->eps = NULL;
pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
pollset->kicked_without_pollers = false;
@@ -823,12 +823,12 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
}
static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
static void pollset_release_epoll_set(grpc_exec_ctx *exec_ctx,
grpc_pollset *ps, char *reason) {
if (ps->pi != NULL) {
PI_UNREF(exec_ctx, ps->pi, reason);
if (ps->eps != NULL) {
EPS_UNREF(exec_ctx, ps->eps, reason);
}
ps->pi = NULL;
ps->eps = NULL;
}
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
@@ -838,8 +838,8 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
pollset->finish_shutdown_called = true;
/* Release the ref and set pollset->pi to NULL */
pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
/* Release the ref and set pollset->eps to NULL */
pollset_release_epoll_set(exec_ctx, pollset, "ps_shutdown");
grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
}
@@ -872,13 +872,13 @@ static void pollset_destroy(grpc_pollset *pollset) {
}
static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
polling_island *pi) {
if (gpr_mu_trylock(&pi->workqueue_read_mu)) {
gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items);
gpr_mu_unlock(&pi->workqueue_read_mu);
epoll_set *eps) {
if (gpr_mu_trylock(&eps->workqueue_read_mu)) {
gpr_mpscq_node *n = gpr_mpscq_pop(&eps->workqueue_items);
gpr_mu_unlock(&eps->workqueue_read_mu);
if (n != NULL) {
if (gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) > 1) {
workqueue_maybe_wakeup(pi);
if (gpr_atm_full_fetch_add(&eps->workqueue_item_count, -1) > 1) {
workqueue_maybe_wakeup(eps);
}
grpc_closure *c = (grpc_closure *)n;
grpc_error *error = c->error_data.error;
@@ -888,11 +888,11 @@ static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
c->cb(exec_ctx, c->cb_arg, error);
GRPC_ERROR_UNREF(error);
return true;
} else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
} else if (gpr_atm_no_barrier_load(&eps->workqueue_item_count) > 0) {
/* n == NULL might mean there's work but it's not available to be popped
* yet - try to ensure another workqueue wakes up to check shortly if so
*/
workqueue_maybe_wakeup(pi);
workqueue_maybe_wakeup(eps);
}
}
return false;
@@ -900,7 +900,7 @@ static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
#define GRPC_EPOLL_MAX_EVENTS 100
static void do_epoll_wait(grpc_exec_ctx *exec_ctx, int epoll_fd,
polling_island *pi, grpc_error **error) {
epoll_set *eps, grpc_error **error) {
struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
int ep_rv;
char *err_msg;
@@ -930,13 +930,13 @@ static void do_epoll_wait(grpc_exec_ctx *exec_ctx, int epoll_fd,
grpc_timer_consume_kick();
append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
err_desc);
} else if (data_ptr == &pi->workqueue_wakeup_fd) {
} else if (data_ptr == &eps->workqueue_wakeup_fd) {
append_error(error,
grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd),
grpc_wakeup_fd_consume_wakeup(&eps->workqueue_wakeup_fd),
err_desc);
maybe_do_workqueue_work(exec_ctx, pi);
} else if (data_ptr == &polling_island_wakeup_fd) {
gpr_atm_rel_store(&pi->is_shutdown, 1);
maybe_do_workqueue_work(exec_ctx, eps);
} else if (data_ptr == &epoll_set_wakeup_fd) {
gpr_atm_rel_store(&eps->is_shutdown, 1);
gpr_log(GPR_INFO, "pollset poller: shutdown set");
} else {
grpc_fd *fd = data_ptr;
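
do_epoll_wait above distinguishes wakeup fds from real fds by comparing ev.data.ptr against well-known sentinel addresses. A sketch of that dispatch (`fd_record`, the sentinel variables, and the flag argument are hypothetical):

```c
#include <stdio.h>
#include <sys/epoll.h>

typedef struct fd_record { int fd; } fd_record;
static int g_global_wakeup_tag; /* its address tags the global wakeup fd */
static int g_shutdown_tag;      /* its address tags the shutdown wakeup fd */

static void dispatch(const struct epoll_event *ev, int *shutdown_seen) {
  void *tag = ev->data.ptr;
  if (tag == &g_global_wakeup_tag) {
    /* consume the wakeup (e.g. read() the eventfd) and carry on */
  } else if (tag == &g_shutdown_tag) {
    *shutdown_seen = 1; /* like gpr_atm_rel_store(&eps->is_shutdown, 1) */
  } else {
    const fd_record *r = tag; /* anything else is a registered fd */
    if (ev->events & (EPOLLIN | EPOLLPRI)) printf("fd %d readable\n", r->fd);
    if (ev->events & EPOLLOUT) printf("fd %d writable\n", r->fd);
  }
}
```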
@@ -953,41 +953,41 @@ static void do_epoll_wait(grpc_exec_ctx *exec_ctx, int epoll_fd,
}
}
static void polling_island_work(grpc_exec_ctx *exec_ctx, polling_island *pi,
static void epoll_set_work(grpc_exec_ctx *exec_ctx, epoll_set *eps,
grpc_error **error) {
int epoll_fd = -1;
GPR_TIMER_BEGIN("polling_island_work", 0);
GPR_TIMER_BEGIN("epoll_set_work", 0);
/* Since epoll_fd is immutable, it is safe to read it without a lock on the
polling island. */
epoll_fd = pi->epoll_fd;
epoll_fd = eps->epoll_fd;
/* Add an extra ref so that the island does not get destroyed (which means
the epoll_fd won't be closed) while we are doing an epoll_wait() on the
epoll_fd */
PI_ADD_REF(pi, "ps_work");
EPS_ADD_REF(eps, "ps_work");
/* If we get some workqueue work to do, it might end up completing an item on
the completion queue, so there's no need to poll... so we skip that and
redo the complete loop to verify */
if (!maybe_do_workqueue_work(exec_ctx, pi)) {
gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
g_current_thread_polling_island = pi;
if (!maybe_do_workqueue_work(exec_ctx, eps)) {
gpr_atm_no_barrier_fetch_add(&eps->poller_count, 1);
g_current_thread_epoll_set = eps;
do_epoll_wait(exec_ctx, epoll_fd, pi, error);
do_epoll_wait(exec_ctx, epoll_fd, eps, error);
g_current_thread_polling_island = NULL;
gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
g_current_thread_epoll_set = NULL;
gpr_atm_no_barrier_fetch_add(&eps->poller_count, -1);
}
/* Before leaving, release the extra ref we added to the polling island. It
is important to use "pi" here (i.e our old copy of pollset->pi
is important to use "eps" here (i.e our old copy of pollset->eps
that we got before releasing the polling island lock). This is because
pollset->pi pointer might get updated in other parts of the
pollset->eps pointer might get updated in other parts of the
code when there is an island merge while we are doing epoll_wait() above */
PI_UNREF(exec_ctx, pi, "ps_work");
EPS_UNREF(exec_ctx, eps, "ps_work");
GPR_TIMER_END("polling_island_work", 0);
GPR_TIMER_END("epoll_set_work", 0);
}
/* pollset->mu lock must be held by the caller before calling this.
@@ -1110,10 +1110,10 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
static void shutdown_engine(void) {
shutdown_dedicated_poller_threads();
shutdown_dedicated_polling_islands();
shutdown_dedicated_epoll_sets();
fd_global_shutdown();
pollset_global_shutdown();
polling_island_global_shutdown();
epoll_set_global_shutdown();
gpr_log(GPR_INFO, "ev-epoll-threadpool engine shutdown complete");
}
@@ -1158,13 +1158,13 @@ static const grpc_event_engine_vtable vtable = {
/*****************************************************************************
* Dedicated polling threads and pollsets - Definitions
*/
static void add_fd_to_dedicated_pi(grpc_fd *fd) {
GPR_ASSERT(fd->pi == NULL);
GPR_TIMER_BEGIN("add_fd_to_dedicated_pi", 0);
static void add_fd_to_dedicated_eps(grpc_fd *fd) {
GPR_ASSERT(fd->eps == NULL);
GPR_TIMER_BEGIN("add_fd_to_dedicated_eps", 0);
grpc_error *error = GRPC_ERROR_NONE;
size_t idx = ((size_t)rand()) % g_num_pi;
polling_island *pi = g_polling_islands[idx];
size_t idx = ((size_t)rand()) % g_num_eps;
epoll_set *eps = g_epoll_sets[idx];
gpr_mu_lock(&fd->mu);
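
The idx computation above assigns each new fd to a dedicated epoll set chosen uniformly at random, trading perfect balance for zero coordination. A tiny sketch (`pick_epoll_set` and its arguments are hypothetical):

```c
#include <stdlib.h>

/* Uniform random assignment spreads fds across the dedicated sets with no
   shared state to contend on, at the cost of only approximate balance. */
static int pick_epoll_set(const int *epoll_fds, size_t num_sets) {
  size_t idx = (size_t)rand() % num_sets;
  return epoll_fds[idx]; /* the fd stays in this set for its lifetime */
}
```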
@@ -1173,70 +1173,70 @@ static void add_fd_to_dedicated_pi(grpc_fd *fd) {
return; /* Early out */
}
polling_island_add_fd_locked(pi, fd, &error);
PI_ADD_REF(pi, "fd");
fd->pi = pi;
epoll_set_add_fd_locked(eps, fd, &error);
EPS_ADD_REF(eps, "fd");
fd->eps = eps;
GRPC_POLLING_TRACE("add_fd_to_dedicated_pi (fd: %d, pi idx = %ld)", fd->fd,
GRPC_POLLING_TRACE("add_fd_to_dedicated_eps (fd: %d, eps idx = %ld)", fd->fd,
idx);
gpr_mu_unlock(&fd->mu);
GRPC_LOG_IF_ERROR("add_fd_to_dedicated_pi", error);
GPR_TIMER_END("add_fd_to_dedicated_pi", 0);
GRPC_LOG_IF_ERROR("add_fd_to_dedicated_eps", error);
GPR_TIMER_END("add_fd_to_dedicated_eps", 0);
}
static bool init_dedicated_polling_islands() {
static bool init_dedicated_epoll_sets() {
grpc_error *error = GRPC_ERROR_NONE;
bool is_success = true;
g_polling_islands =
(polling_island **)malloc(g_num_pi * sizeof(polling_island *));
g_epoll_sets =
(epoll_set **)malloc(g_num_eps * sizeof(epoll_set *));
for (size_t i = 0; i < g_num_pi; i++) {
g_polling_islands[i] = polling_island_create(&error);
if (g_polling_islands[i] == NULL) {
for (size_t i = 0; i < g_num_eps; i++) {
g_epoll_sets[i] = epoll_set_create(&error);
if (g_epoll_sets[i] == NULL) {
gpr_log(GPR_ERROR, "Error in creating a dedicated polling island");
g_num_pi = i; /* Helps cleanup */
shutdown_dedicated_polling_islands();
g_num_eps = i; /* Helps cleanup */
shutdown_dedicated_epoll_sets();
is_success = false;
goto done;
}
PI_ADD_REF(g_polling_islands[i], "init_dedicated_polling_islands");
EPS_ADD_REF(g_epoll_sets[i], "init_dedicated_epoll_sets");
}
gpr_mu *mu;
pollset_init(&g_read_notifier, &mu);
done:
GRPC_LOG_IF_ERROR("init_dedicated_polling_islands", error);
GRPC_LOG_IF_ERROR("init_dedicated_epoll_sets", error);
return is_success;
}
static void shutdown_dedicated_polling_islands() {
if (!g_polling_islands) {
static void shutdown_dedicated_epoll_sets() {
if (!g_epoll_sets) {
return;
}
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
for (size_t i = 0; i < g_num_pi; i++) {
PI_UNREF(&exec_ctx, g_polling_islands[i],
"shutdown_dedicated_polling_islands");
for (size_t i = 0; i < g_num_eps; i++) {
EPS_UNREF(&exec_ctx, g_epoll_sets[i],
"shutdown_dedicated_epoll_sets");
}
grpc_exec_ctx_finish(&exec_ctx);
gpr_free(g_polling_islands);
g_polling_islands = NULL;
gpr_free(g_epoll_sets);
g_epoll_sets = NULL;
pollset_destroy(&g_read_notifier);
}
static void poller_thread_loop(void *arg) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_error *error = GRPC_ERROR_NONE;
polling_island *pi = (polling_island *)arg;
epoll_set *eps = (epoll_set *)arg;
while (!gpr_atm_acq_load(&pi->is_shutdown)) {
polling_island_work(&exec_ctx, pi, &error);
while (!gpr_atm_acq_load(&eps->is_shutdown)) {
epoll_set_work(&exec_ctx, eps, &error);
grpc_exec_ctx_flush(&exec_ctx);
}
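
poller_thread_loop above reduces each dedicated thread to "epoll_wait until the shutdown flag is set". A pthread-compatible sketch under the same assumptions (`poller_arg` is hypothetical; the 100-event cap mirrors GRPC_EPOLL_MAX_EVENTS; dispatch is elided):

```c
#include <stdatomic.h>
#include <sys/epoll.h>

typedef struct poller_arg {
  int epoll_fd;
  atomic_bool is_shutdown; /* set when the shutdown sentinel is seen */
} poller_arg;

/* Shaped like poller_thread_loop above; usable as a pthread start routine. */
static void *poller_thread_loop_sketch(void *arg) {
  poller_arg *eps = arg;
  struct epoll_event evs[100]; /* GRPC_EPOLL_MAX_EVENTS above is 100 */
  while (!atomic_load_explicit(&eps->is_shutdown, memory_order_acquire)) {
    int n = epoll_wait(eps->epoll_fd, evs, 100, -1);
    for (int i = 0; i < n; i++) {
      /* dispatch evs[i]; seeing the shutdown sentinel sets is_shutdown */
    }
  }
  return NULL;
}
```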
@@ -1244,37 +1244,37 @@ static void poller_thread_loop(void *arg) {
GRPC_LOG_IF_ERROR("poller_thread_loop", error);
}
/* g_polling_islands MUST be initialized before calling this */
/* g_epoll_sets MUST be initialized before calling this */
static void start_dedicated_poller_threads() {
GPR_ASSERT(g_polling_islands);
GPR_ASSERT(g_epoll_sets);
gpr_log(GPR_INFO, "Starting poller threads");
/* One thread per pollset */
g_poller_threads = (gpr_thd_id *)malloc(g_num_pi * sizeof(gpr_thd_id));
g_poller_threads = (gpr_thd_id *)malloc(g_num_eps * sizeof(gpr_thd_id));
gpr_thd_options options = gpr_thd_options_default();
gpr_thd_options_set_joinable(&options);
for (size_t i = 0; i < g_num_pi; i++) {
for (size_t i = 0; i < g_num_eps; i++) {
gpr_thd_new(&g_poller_threads[i], poller_thread_loop,
(void *)g_polling_islands[i], &options);
(void *)g_epoll_sets[i], &options);
}
}
static void shutdown_dedicated_poller_threads() {
GPR_ASSERT(g_poller_threads);
GPR_ASSERT(g_polling_islands);
GPR_ASSERT(g_epoll_sets);
grpc_error *error = GRPC_ERROR_NONE;
gpr_log(GPR_INFO, "Shutting down pollers");
polling_island *pi = NULL;
for (size_t i = 0; i < g_num_pi; i++) {
pi = g_polling_islands[i];
polling_island_add_wakeup_fd_locked(pi, &polling_island_wakeup_fd, &error);
epoll_set *eps = NULL;
for (size_t i = 0; i < g_num_eps; i++) {
eps = g_epoll_sets[i];
epoll_set_add_wakeup_fd_locked(eps, &epoll_set_wakeup_fd, &error);
}
for (size_t i = 0; i < g_num_pi; i++) {
for (size_t i = 0; i < g_num_eps; i++) {
gpr_thd_join(g_poller_threads[i]);
}
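
The shutdown sequence above relies on a wakeup fd that is made readable once and never consumed, so every poller that epoll_waits on it keeps waking until it observes the sentinel, flips its shutdown flag, and exits; the shutdown path then joins the threads. A hedged sketch (this version uses a level-triggered eventfd for simplicity, whereas the code above registers its wakeup fds edge-triggered; all names are hypothetical):

```c
#include <pthread.h>
#include <stdint.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>

static int g_shutdown_sentinel; /* its address tags the shutdown fd */

static void shutdown_pollers(const int *epoll_fds, size_t n_sets,
                             pthread_t *threads, size_t n_threads,
                             int shutdown_efd) {
  uint64_t one = 1;
  (void)write(shutdown_efd, &one, sizeof(one)); /* readable from now on */
  for (size_t i = 0; i < n_sets; i++) {
    /* The same eventfd may live in many epoll sets; it is never drained,
       so every poller in every set will wake. */
    struct epoll_event ev = {.events = EPOLLIN,
                             .data.ptr = &g_shutdown_sentinel};
    (void)epoll_ctl(epoll_fds[i], EPOLL_CTL_ADD, shutdown_efd, &ev);
  }
  for (size_t i = 0; i < n_threads; i++) pthread_join(threads[i], NULL);
}
```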
@@ -1316,12 +1316,12 @@ const grpc_event_engine_vtable *grpc_init_epoll_thread_pool_linux(void) {
return NULL;
}
if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
polling_island_global_init())) {
if (!GRPC_LOG_IF_ERROR("epoll_set_global_init",
epoll_set_global_init())) {
return NULL;
}
if (!init_dedicated_polling_islands()) {
if (!init_dedicated_epoll_sets()) {
return NULL;
}
