Update epollers

reviewable/pr11119/r1
Craig Tiller 8 years ago
parent 3e9f98ef11
commit 61f96c1683
  1. src/core/lib/iomgr/ev_epoll1_linux.c (100 changed lines)
  2. src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c (198 changed lines)
  3. src/core/lib/iomgr/ev_epoll_thread_pool_linux.c (172 changed lines)
  4. src/core/lib/iomgr/ev_epollex_linux.c (156 changed lines)
  5. src/core/lib/iomgr/ev_epollsig_linux.c (279 changed lines)
  6. src/core/lib/iomgr/executor.c (6 changed lines)
  7. test/core/iomgr/ev_epollsig_linux_test.c (82 changed lines)

src/core/lib/iomgr/ev_epoll1_linux.c:

@@ -58,7 +58,6 @@
 #include "src/core/lib/iomgr/iomgr_internal.h"
 #include "src/core/lib/iomgr/lockfree_event.h"
 #include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/iomgr/workqueue.h"
 #include "src/core/lib/profiling/timers.h"
 #include "src/core/lib/support/block_annotate.h"

@@ -130,7 +129,9 @@ struct grpc_pollset {
  * Pollset-set Declarations
  */

-struct grpc_pollset_set {};
+struct grpc_pollset_set {
+  char unused;
+};

 /*******************************************************************************
  * Common helpers
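Note on this change (the same edit appears in ev_epoll_thread_pool_linux.c below): a struct with no members is a constraint violation in ISO C; GCC accepts it as a zero-sized extension, so the empty grpc_pollset_set only compiled by grace of that extension. Adding a single char member keeps the type valid for any conforming compiler. A standalone illustration, not part of the patch:

    #include <stdio.h>

    /* struct empty {}; */      /* not ISO C; GNU C allows it, with sizeof == 0 */

    struct padded {
      char unused;              /* the dummy member the patch adds */
    };

    int main(void) {
      printf("sizeof(struct padded) = %zu\n", sizeof(struct padded)); /* 1 */
      return 0;
    }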
@@ -283,10 +284,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
   grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
 }

-static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
-  return (grpc_workqueue *)0xb0b51ed;
-}
-
 static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                                grpc_pollset *notifier) {
   grpc_lfev_set_ready(exec_ctx, &fd->read_closure);

@@ -313,8 +310,6 @@ GPR_TLS_DECL(g_current_thread_worker);
 static gpr_atm g_active_poller;
 static pollset_neighbourhood *g_neighbourhoods;
 static size_t g_num_neighbourhoods;
-static gpr_mu g_wq_mu;
-static grpc_closure_list g_wq_items;

 /* Return true if first in list */
 static bool worker_insert(grpc_pollset *pollset, grpc_pollset_worker *worker) {

@@ -363,8 +358,6 @@ static grpc_error *pollset_global_init(void) {
   gpr_atm_no_barrier_store(&g_active_poller, 0);
   global_wakeup_fd.read_fd = -1;
   grpc_error *err = grpc_wakeup_fd_init(&global_wakeup_fd);
-  gpr_mu_init(&g_wq_mu);
-  g_wq_items = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
   if (err != GRPC_ERROR_NONE) return err;
   struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
                            .data.ptr = &global_wakeup_fd};

@@ -383,7 +376,6 @@ static grpc_error *pollset_global_init(void) {
 static void pollset_global_shutdown(void) {
   gpr_tls_destroy(&g_current_thread_pollset);
   gpr_tls_destroy(&g_current_thread_worker);
-  gpr_mu_destroy(&g_wq_mu);
   if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
   for (size_t i = 0; i < g_num_neighbourhoods; i++) {
     gpr_mu_destroy(&g_neighbourhoods[i].mu);

@@ -507,9 +499,6 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
   for (int i = 0; i < r; i++) {
     void *data_ptr = events[i].data.ptr;
     if (data_ptr == &global_wakeup_fd) {
-      gpr_mu_lock(&g_wq_mu);
-      grpc_closure_list_move(&g_wq_items, &exec_ctx->closure_list);
-      gpr_mu_unlock(&g_wq_mu);
       append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
                    err_desc);
     } else {

@@ -791,84 +780,6 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
 static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                            grpc_fd *fd) {}

-/*******************************************************************************
- * Workqueue Definitions
- */
-
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
-                                     const char *file, int line,
-                                     const char *reason) {
-  return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
-                            const char *file, int line, const char *reason) {}
-#else
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
-  return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx,
-                            grpc_workqueue *workqueue) {}
-#endif
-
-static void wq_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
-                     grpc_error *error) {
-  // find a neighbourhood to wakeup
-  bool scheduled = false;
-  size_t initial_neighbourhood = choose_neighbourhood();
-  for (size_t i = 0; !scheduled && i < g_num_neighbourhoods; i++) {
-    pollset_neighbourhood *neighbourhood =
-        &g_neighbourhoods[(initial_neighbourhood + i) % g_num_neighbourhoods];
-    if (gpr_mu_trylock(&neighbourhood->mu)) {
-      if (neighbourhood->active_root != NULL) {
-        grpc_pollset *inspect = neighbourhood->active_root;
-        do {
-          if (gpr_mu_trylock(&inspect->mu)) {
-            if (inspect->root_worker != NULL) {
-              grpc_pollset_worker *inspect_worker = inspect->root_worker;
-              do {
-                if (inspect_worker->kick_state == UNKICKED) {
-                  inspect_worker->kick_state = KICKED;
-                  grpc_closure_list_append(
-                      &inspect_worker->schedule_on_end_work, closure, error);
-                  if (inspect_worker->initialized_cv) {
-                    gpr_cv_signal(&inspect_worker->cv);
-                  }
-                  scheduled = true;
-                }
-                inspect_worker = inspect_worker->next;
-              } while (!scheduled && inspect_worker != inspect->root_worker);
-            }
-            gpr_mu_unlock(&inspect->mu);
-          }
-          inspect = inspect->next;
-        } while (!scheduled && inspect != neighbourhood->active_root);
-      }
-      gpr_mu_unlock(&neighbourhood->mu);
-    }
-  }
-  if (!scheduled) {
-    gpr_mu_lock(&g_wq_mu);
-    grpc_closure_list_append(&g_wq_items, closure, error);
-    gpr_mu_unlock(&g_wq_mu);
-    GRPC_LOG_IF_ERROR("workqueue_scheduler",
-                      grpc_wakeup_fd_wakeup(&global_wakeup_fd));
-  }
-}
-
-static const grpc_closure_scheduler_vtable
-    singleton_workqueue_scheduler_vtable = {wq_sched, wq_sched,
-                                            "epoll1_workqueue"};
-
-static grpc_closure_scheduler singleton_workqueue_scheduler = {
-    &singleton_workqueue_scheduler_vtable};
-
-static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
-  return &singleton_workqueue_scheduler;
-}
-
 /*******************************************************************************
  * Pollset-set Definitions
  */
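Reviewer note: with the epoll1 workqueue gone, nothing hands closures directly to a sleeping poller any more; wq_sched's job (kick an UNKICKED worker, else queue on g_wq_items and wake global_wakeup_fd) disappears, and callers are expected to go through the exec_ctx/executor path instead (executor.c is also touched by this commit). A hedged before/after sketch; grpc_closure_init, grpc_closure_sched, grpc_schedule_on_exec_ctx, and grpc_fd_get_workqueue are the closure-scheduling entry points of this era, but treat the exact spellings as assumptions rather than part of the patch:

    /* before (hypothetical caller): run the closure on the fd's workqueue */
    grpc_closure_init(&c, on_done, arg,
                      grpc_workqueue_scheduler(grpc_fd_get_workqueue(fd)));
    grpc_closure_sched(exec_ctx, &c, GRPC_ERROR_NONE);

    /* after: schedule on the exec_ctx; the closure runs when the current
       exec_ctx flushes (the executor covers work that must not run inline) */
    grpc_closure_init(&c, on_done, arg, grpc_schedule_on_exec_ctx);
    grpc_closure_sched(exec_ctx, &c, GRPC_ERROR_NONE);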
@@ -920,7 +831,6 @@ static const grpc_event_engine_vtable vtable = {
     .fd_notify_on_read = fd_notify_on_read,
     .fd_notify_on_write = fd_notify_on_write,
     .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
-    .fd_get_workqueue = fd_get_workqueue,

     .pollset_init = pollset_init,
     .pollset_shutdown = pollset_shutdown,

@@ -938,10 +848,6 @@ static const grpc_event_engine_vtable vtable = {
     .pollset_set_add_fd = pollset_set_add_fd,
     .pollset_set_del_fd = pollset_set_del_fd,

-    .workqueue_ref = workqueue_ref,
-    .workqueue_unref = workqueue_unref,
-    .workqueue_scheduler = workqueue_scheduler,
-
     .shutdown_engine = shutdown_engine,
 };

src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c:

@@ -61,7 +61,6 @@
 #include "src/core/lib/iomgr/lockfree_event.h"
 #include "src/core/lib/iomgr/timer.h"
 #include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/iomgr/workqueue.h"
 #include "src/core/lib/profiling/timers.h"
 #include "src/core/lib/support/block_annotate.h"
 #include "src/core/lib/support/env.h"

@@ -184,13 +183,15 @@ static void fd_global_shutdown(void);
  * Polling island Declarations
  */

-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+//#define PI_REFCOUNT_DEBUG
+
+#ifdef PI_REFCOUNT_DEBUG
 #define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
 #define PI_UNREF(exec_ctx, p, r) \
   pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)
-#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */
+#else /* defined(PI_REFCOUNT_DEBUG) */
 #define PI_ADD_REF(p, r) pi_add_ref((p))
 #define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))
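Note: ref-count debugging for polling islands is no longer keyed to the workqueue's GRPC_WORKQUEUE_REFCOUNT_DEBUG flag; it becomes a local, opt-in marker you uncomment in the file (ev_epollsig_linux.c gets the same PI_REFCOUNT_DEBUG marker below, ev_epoll_thread_pool_linux.c an analogous EPS_REFCOUNT_DEBUG). A standalone sketch of the pattern, with illustrative names:

    #include <stdio.h>

    /* #define PI_REFCOUNT_DEBUG */ /* uncomment, as in the patch, to trace refs */

    #ifdef PI_REFCOUNT_DEBUG
    #define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
    #else
    #define PI_ADD_REF(p, r) pi_add_ref((p))
    #endif

    static void pi_add_ref(long *refs) { ++*refs; }

    #ifdef PI_REFCOUNT_DEBUG
    static void pi_add_ref_dbg(long *refs, const char *reason, const char *file,
                               int line) {
      fprintf(stderr, "Add ref (%s) - (%s, %d)\n", reason, file, line);
      ++*refs;
    }
    #endif

    int main(void) {
      long refs = 0;
      PI_ADD_REF(&refs, "demo");
      printf("refs = %ld\n", refs);
      return 0;
    }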
@@ -204,8 +205,6 @@ typedef struct worker_node {
 /* This is also used as grpc_workqueue (by directly casing it) */
 typedef struct polling_island {
-  grpc_closure_scheduler workqueue_scheduler;
-
   gpr_mu mu;
   /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
      the refcount.

@@ -226,15 +225,6 @@ typedef struct polling_island {
   /* Number of threads currently polling on this island */
   gpr_atm poller_count;

-  /* Mutex guarding the read end of the workqueue (must be held to pop from
-   * workqueue_items) */
-  gpr_mu workqueue_read_mu;
-  /* Queue of closures to be executed */
-  gpr_mpscq workqueue_items;
-  /* Count of items in workqueue_items */
-  gpr_atm workqueue_item_count;
-  /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
-  grpc_wakeup_fd workqueue_wakeup_fd;
-
   /* The list of workers waiting to do polling on this polling island */
   gpr_mu worker_list_mu;

@@ -323,8 +313,6 @@ static __thread polling_island *g_current_thread_polling_island;

 /* Forward declaration */
 static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
-                              grpc_error *error);

 #ifdef GRPC_TSAN
 /* Currently TSAN may incorrectly flag data races between epoll_ctl and

@@ -337,13 +325,10 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
 gpr_atm g_epoll_sync;
 #endif /* defined(GRPC_TSAN) */

-static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
-    workqueue_enqueue, workqueue_enqueue, "workqueue"};
-
 static void pi_add_ref(polling_island *pi);
 static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);

-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+#ifdef PI_REFCOUNT_DEBUG
 static void pi_add_ref_dbg(polling_island *pi, const char *reason,
                            const char *file, int line) {
   long old_cnt = gpr_atm_acq_load(&pi->ref_count);

@@ -359,36 +344,6 @@ static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
   gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
           (void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
 }
-
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
-                                     const char *file, int line,
-                                     const char *reason) {
-  if (workqueue != NULL) {
-    pi_add_ref_dbg((polling_island *)workqueue, reason, file, line);
-  }
-  return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
-                            const char *file, int line, const char *reason) {
-  if (workqueue != NULL) {
-    pi_unref_dbg(exec_ctx, (polling_island *)workqueue, reason, file, line);
-  }
-}
-#else
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
-  if (workqueue != NULL) {
-    pi_add_ref((polling_island *)workqueue);
-  }
-  return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx,
-                            grpc_workqueue *workqueue) {
-  if (workqueue != NULL) {
-    pi_unref(exec_ctx, (polling_island *)workqueue);
-  }
-}
 #endif

 static void pi_add_ref(polling_island *pi) {

@@ -592,17 +547,12 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
   *error = GRPC_ERROR_NONE;

   pi = gpr_malloc(sizeof(*pi));
-  pi->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
   gpr_mu_init(&pi->mu);
   pi->fd_cnt = 0;
   pi->fd_capacity = 0;
   pi->fds = NULL;
   pi->epoll_fd = -1;

-  gpr_mu_init(&pi->workqueue_read_mu);
-  gpr_mpscq_init(&pi->workqueue_items);
-  gpr_atm_rel_store(&pi->workqueue_item_count, 0);
-
   gpr_atm_rel_store(&pi->ref_count, 0);
   gpr_atm_rel_store(&pi->poller_count, 0);
   gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

@@ -610,11 +560,6 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
   gpr_mu_init(&pi->worker_list_mu);
   worker_node_init(&pi->worker_list_head);

-  if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd),
-                    err_desc)) {
-    goto done;
-  }
-
   pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

   if (pi->epoll_fd < 0) {

@@ -622,8 +567,6 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
     goto done;
   }

-  polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);
-
   if (initial_fd != NULL) {
     polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
   }

@@ -642,11 +585,7 @@ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
   if (pi->epoll_fd >= 0) {
     close(pi->epoll_fd);
   }
-  GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0);
-  gpr_mu_destroy(&pi->workqueue_read_mu);
-  gpr_mpscq_destroy(&pi->workqueue_items);
   gpr_mu_destroy(&pi->mu);
-  grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd);
   gpr_mu_destroy(&pi->worker_list_mu);
   GPR_ASSERT(is_worker_node_detached(&pi->worker_list_head));

@@ -794,45 +733,6 @@ static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
   }
 }

-static void workqueue_maybe_wakeup(polling_island *pi) {
-  /* If this thread is the current poller, then it may be that it's about to
-     decrement the current poller count, so we need to look past this thread */
-  bool is_current_poller = (g_current_thread_polling_island == pi);
-  gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0;
-  gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
-  /* Only issue a wakeup if it's likely that some poller could come in and take
-     it right now. Note that since we do an anticipatory mpscq_pop every poll
-     loop, it's ok if we miss the wakeup here, as we'll get the work item when
-     the next poller enters anyway. */
-  if (current_pollers > min_current_pollers_for_wakeup) {
-    GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
-                      grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd));
-  }
-}
-
-static void workqueue_move_items_to_parent(polling_island *q) {
-  polling_island *p = (polling_island *)gpr_atm_no_barrier_load(&q->merged_to);
-  if (p == NULL) {
-    return;
-  }
-  gpr_mu_lock(&q->workqueue_read_mu);
-  int num_added = 0;
-  while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) {
-    gpr_mpscq_node *n = gpr_mpscq_pop(&q->workqueue_items);
-    if (n != NULL) {
-      gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1);
-      gpr_atm_no_barrier_fetch_add(&p->workqueue_item_count, 1);
-      gpr_mpscq_push(&p->workqueue_items, n);
-      num_added++;
-    }
-  }
-  gpr_mu_unlock(&q->workqueue_read_mu);
-  if (num_added > 0) {
-    workqueue_maybe_wakeup(p);
-  }
-  workqueue_move_items_to_parent(p);
-}
-
 static polling_island *polling_island_merge(polling_island *p,
                                             polling_island *q,
                                             grpc_error **error) {

@@ -857,8 +757,6 @@ static polling_island *polling_island_merge(polling_island *p,
     /* Add the 'merged_to' link from p --> q */
     gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
     PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
-
-    workqueue_move_items_to_parent(p);
   }
   /* else if p == q, nothing needs to be done */

@@ -869,32 +767,6 @@ static polling_island *polling_island_merge(polling_island *p,
   return q;
 }

-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
-                              grpc_error *error) {
-  GPR_TIMER_BEGIN("workqueue.enqueue", 0);
-  grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler;
-  /* take a ref to the workqueue: otherwise it can happen that whatever events
-   * this kicks off ends up destroying the workqueue before this function
-   * completes */
-  GRPC_WORKQUEUE_REF(workqueue, "enqueue");
-  polling_island *pi = (polling_island *)workqueue;
-  gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1);
-  closure->error_data.error = error;
-  gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next);
-  if (last == 0) {
-    workqueue_maybe_wakeup(pi);
-  }
-  workqueue_move_items_to_parent(pi);
-  GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
-  GPR_TIMER_END("workqueue.enqueue", 0);
-}
-
-static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
-  polling_island *pi = (polling_island *)workqueue;
-  return workqueue == NULL ? grpc_schedule_on_exec_ctx
-                           : &pi->workqueue_scheduler;
-}
-
 static grpc_error *polling_island_global_init() {
   grpc_error *error = GRPC_ERROR_NONE;
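Note on the deleted enqueue path: it pays for at most one wakeup-fd write per batch by waking only on the empty-to-non-empty transition, and it bumps the counter before pushing so that a consumer seeing a NULL pop with a positive count knows to retry rather than sleep. A self-contained sketch of the idiom in C11 atomics (the gRPC code uses gpr_atm and gpr_mpscq; every name below is illustrative):

    #include <stdatomic.h>

    extern void mpscq_push(void *node); /* stand-in for gpr_mpscq_push */
    extern void kick_one_poller(void);  /* stand-in for the wakeup-fd write */

    static atomic_long item_count;

    void enqueue(void *node) {
      /* count first, then push: a consumer that pops NULL while the count is
         still positive treats it as "push in flight", not "queue empty" */
      long last = atomic_fetch_add(&item_count, 1);
      mpscq_push(node);
      if (last == 0) {
        kick_one_poller(); /* only the 0 -> 1 producer pays for a wakeup */
      }
    }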
@@ -1153,14 +1025,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
   grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
 }

-static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
-  gpr_mu_lock(&fd->po.mu);
-  grpc_workqueue *workqueue =
-      GRPC_WORKQUEUE_REF((grpc_workqueue *)fd->po.pi, "fd_get_workqueue");
-  gpr_mu_unlock(&fd->po.mu);
-  return workqueue;
-}
-
 /*******************************************************************************
  * Pollset Definitions
  */

@@ -1432,33 +1296,6 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
   gpr_mu_destroy(&pollset->po.mu);
 }

-static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
-                                    polling_island *pi) {
-  if (gpr_mu_trylock(&pi->workqueue_read_mu)) {
-    gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items);
-    gpr_mu_unlock(&pi->workqueue_read_mu);
-    if (n != NULL) {
-      if (gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) > 1) {
-        workqueue_maybe_wakeup(pi);
-      }
-      grpc_closure *c = (grpc_closure *)n;
-      grpc_error *error = c->error_data.error;
-#ifndef NDEBUG
-      c->scheduled = false;
-#endif
-      c->cb(exec_ctx, c->cb_arg, error);
-      GRPC_ERROR_UNREF(error);
-      return true;
-    } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
-      /* n == NULL might mean there's work but it's not available to be popped
-       * yet - try to ensure another workqueue wakes up to check shortly if so
-       */
-      workqueue_maybe_wakeup(pi);
-    }
-  }
-  return false;
-}
-
 /* NOTE: This function may modify 'now' */
 static bool acquire_polling_lease(grpc_pollset_worker *worker,
                                   polling_island *pi, gpr_timespec deadline,

@@ -1594,12 +1431,7 @@ static void pollset_do_epoll_pwait(grpc_exec_ctx *exec_ctx, int epoll_fd,
   for (int i = 0; i < ep_rv; ++i) {
     void *data_ptr = ep_ev[i].data.ptr;
-    if (data_ptr == &pi->workqueue_wakeup_fd) {
-      append_error(error,
-                   grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd),
-                   err_desc);
-      maybe_do_workqueue_work(exec_ctx, pi);
-    } else if (data_ptr == &polling_island_wakeup_fd) {
+    if (data_ptr == &polling_island_wakeup_fd) {
       GRPC_POLLING_TRACE(
           "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
           "%d) got merged",

@@ -1675,15 +1507,10 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
     PI_ADD_REF(pi, "ps_work");
     gpr_mu_unlock(&pollset->po.mu);

-    /* If we get some workqueue work to do, it might end up completing an item on
-       the completion queue, so there's no need to poll... so we skip that and
-       redo the complete loop to verify */
-    if (!maybe_do_workqueue_work(exec_ctx, pi)) {
-      g_current_thread_polling_island = pi;
-      pollset_do_epoll_pwait(exec_ctx, epoll_fd, pollset, pi, worker, now,
-                             deadline, sig_mask, error);
-      g_current_thread_polling_island = NULL;
-    }
+    g_current_thread_polling_island = pi;
+    pollset_do_epoll_pwait(exec_ctx, epoll_fd, pollset, pi, worker, now, deadline,
+                           sig_mask, error);
+    g_current_thread_polling_island = NULL;

     GPR_ASSERT(pi != NULL);
@@ -2036,7 +1863,6 @@ static const grpc_event_engine_vtable vtable = {
     .fd_notify_on_read = fd_notify_on_read,
     .fd_notify_on_write = fd_notify_on_write,
     .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
-    .fd_get_workqueue = fd_get_workqueue,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,

@@ -2054,10 +1880,6 @@ static const grpc_event_engine_vtable vtable = {
     .pollset_set_add_fd = pollset_set_add_fd,
     .pollset_set_del_fd = pollset_set_del_fd,

-    .workqueue_ref = workqueue_ref,
-    .workqueue_unref = workqueue_unref,
-    .workqueue_scheduler = workqueue_scheduler,
-
     .shutdown_engine = shutdown_engine,
 };

src/core/lib/iomgr/ev_epoll_thread_pool_linux.c:

@@ -61,7 +61,6 @@
 #include "src/core/lib/iomgr/lockfree_event.h"
 #include "src/core/lib/iomgr/timer.h"
 #include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/iomgr/workqueue.h"
 #include "src/core/lib/profiling/timers.h"
 #include "src/core/lib/support/block_annotate.h"

@@ -109,23 +108,22 @@ static void fd_global_shutdown(void);
  * epoll set Declarations
  */

-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+//#define EPS_REFCOUNT_DEBUG
+
+#ifdef EPS_REFCOUNT_DEBUG
 #define EPS_ADD_REF(p, r) eps_add_ref_dbg((p), (r), __FILE__, __LINE__)
 #define EPS_UNREF(exec_ctx, p, r) \
   eps_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)
-#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */
+#else /* defined(EPS_REFCOUNT_DEBUG) */
 #define EPS_ADD_REF(p, r) eps_add_ref((p))
 #define EPS_UNREF(exec_ctx, p, r) eps_unref((exec_ctx), (p))
 #endif /* !defined(GRPC_EPS_REF_COUNT_DEBUG) */

-/* This is also used as grpc_workqueue (by directly casting it) */
 typedef struct epoll_set {
-  grpc_closure_scheduler workqueue_scheduler;
-
   /* Mutex poller should acquire to poll this. This enforces that only one
    * poller can be polling on epoll_set at any time */
   gpr_mu mu;

@@ -139,15 +137,6 @@ typedef struct epoll_set {
   /* Number of threads currently polling on this epoll set*/
   gpr_atm poller_count;

-  /* Mutex guarding the read end of the workqueue (must be held to pop from
-   * workqueue_items) */
-  gpr_mu workqueue_read_mu;
-  /* Queue of closures to be executed */
-  gpr_mpscq workqueue_items;
-  /* Count of items in workqueue_items */
-  gpr_atm workqueue_item_count;
-  /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
-  grpc_wakeup_fd workqueue_wakeup_fd;
-
   /* Is the epoll set shutdown */
   gpr_atm is_shutdown;

@@ -181,7 +170,9 @@ struct grpc_pollset {
 /*******************************************************************************
  * Pollset-set Declarations
  */
-struct grpc_pollset_set {};
+struct grpc_pollset_set {
+  char unused;
+};

 /*****************************************************************************
  * Dedicated polling threads and pollsets - Declarations

@@ -235,8 +226,6 @@ static __thread epoll_set *g_current_thread_epoll_set;

 /* Forward declaration */
 static void epoll_set_delete(epoll_set *eps);
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
-                              grpc_error *error);

 #ifdef GRPC_TSAN
 /* Currently TSAN may incorrectly flag data races between epoll_ctl and

@@ -249,13 +238,10 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
 gpr_atm g_epoll_sync;
 #endif /* defined(GRPC_TSAN) */

-static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
-    workqueue_enqueue, workqueue_enqueue, "workqueue"};
-
 static void eps_add_ref(epoll_set *eps);
 static void eps_unref(grpc_exec_ctx *exec_ctx, epoll_set *eps);

-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+#ifdef EPS_REFCOUNT_DEBUG
 static void eps_add_ref_dbg(epoll_set *eps, const char *reason,
                             const char *file, int line) {
   long old_cnt = gpr_atm_acq_load(&eps->ref_count);

@@ -271,36 +257,6 @@ static void eps_unref_dbg(grpc_exec_ctx *exec_ctx, epoll_set *eps,
   gpr_log(GPR_DEBUG, "Unref eps: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
           (void *)eps, old_cnt, (old_cnt - 1), reason, file, line);
 }
-
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
-                                     const char *file, int line,
-                                     const char *reason) {
-  if (workqueue != NULL) {
-    eps_add_ref_dbg((epoll_set *)workqueue, reason, file, line);
-  }
-  return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
-                            const char *file, int line, const char *reason) {
-  if (workqueue != NULL) {
-    eps_unref_dbg(exec_ctx, (epoll_set *)workqueue, reason, file, line);
-  }
-}
-#else
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
-  if (workqueue != NULL) {
-    eps_add_ref((epoll_set *)workqueue);
-  }
-  return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx,
-                            grpc_workqueue *workqueue) {
-  if (workqueue != NULL) {
-    eps_unref(exec_ctx, (epoll_set *)workqueue);
-  }
-}
 #endif

 static void eps_add_ref(epoll_set *eps) {

@@ -394,24 +350,15 @@ static epoll_set *epoll_set_create(grpc_error **error) {
   *error = GRPC_ERROR_NONE;

   eps = gpr_malloc(sizeof(*eps));
-  eps->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
   eps->epoll_fd = -1;

   gpr_mu_init(&eps->mu);
-  gpr_mu_init(&eps->workqueue_read_mu);
-  gpr_mpscq_init(&eps->workqueue_items);
-  gpr_atm_rel_store(&eps->workqueue_item_count, 0);
-
   gpr_atm_rel_store(&eps->ref_count, 0);
   gpr_atm_rel_store(&eps->poller_count, 0);

   gpr_atm_rel_store(&eps->is_shutdown, false);

-  if (!append_error(error, grpc_wakeup_fd_init(&eps->workqueue_wakeup_fd),
-                    err_desc)) {
-    goto done;
-  }
-
   eps->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

   if (eps->epoll_fd < 0) {

@@ -419,8 +366,6 @@ static epoll_set *epoll_set_create(grpc_error **error) {
     goto done;
   }

-  epoll_set_add_wakeup_fd_locked(eps, &eps->workqueue_wakeup_fd, error);
-
 done:
   if (*error != GRPC_ERROR_NONE) {
     epoll_set_delete(eps);

@@ -434,57 +379,11 @@ static void epoll_set_delete(epoll_set *eps) {
     close(eps->epoll_fd);
   }

-  GPR_ASSERT(gpr_atm_no_barrier_load(&eps->workqueue_item_count) == 0);
   gpr_mu_destroy(&eps->mu);
-  gpr_mu_destroy(&eps->workqueue_read_mu);
-  gpr_mpscq_destroy(&eps->workqueue_items);
-  grpc_wakeup_fd_destroy(&eps->workqueue_wakeup_fd);

   gpr_free(eps);
 }

-static void workqueue_maybe_wakeup(epoll_set *eps) {
-  /* If this thread is the current poller, then it may be that it's about to
-     decrement the current poller count, so we need to look past this thread */
-  bool is_current_poller = (g_current_thread_epoll_set == eps);
-  gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0;
-  gpr_atm current_pollers = gpr_atm_no_barrier_load(&eps->poller_count);
-  /* Only issue a wakeup if it's likely that some poller could come in and take
-     it right now. Note that since we do an anticipatory mpscq_pop every poll
-     loop, it's ok if we miss the wakeup here, as we'll get the work item when
-     the next poller enters anyway. */
-  if (current_pollers > min_current_pollers_for_wakeup) {
-    GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
-                      grpc_wakeup_fd_wakeup(&eps->workqueue_wakeup_fd));
-  }
-}
-
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
-                              grpc_error *error) {
-  GPR_TIMER_BEGIN("workqueue.enqueue", 0);
-  grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler;
-  /* take a ref to the workqueue: otherwise it can happen that whatever events
-   * this kicks off ends up destroying the workqueue before this function
-   * completes */
-  GRPC_WORKQUEUE_REF(workqueue, "enqueue");
-  epoll_set *eps = (epoll_set *)workqueue;
-  gpr_atm last = gpr_atm_no_barrier_fetch_add(&eps->workqueue_item_count, 1);
-  closure->error_data.error = error;
-  gpr_mpscq_push(&eps->workqueue_items, &closure->next_data.atm_next);
-  if (last == 0) {
-    workqueue_maybe_wakeup(eps);
-  }
-  GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
-  GPR_TIMER_END("workqueue.enqueue", 0);
-}
-
-static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
-  epoll_set *eps = (epoll_set *)workqueue;
-  return workqueue == NULL ? grpc_schedule_on_exec_ctx
-                           : &eps->workqueue_scheduler;
-}
-
 static grpc_error *epoll_set_global_init() {
   grpc_error *error = GRPC_ERROR_NONE;

@@ -680,8 +579,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
   grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
 }

-static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; }
-
 /*******************************************************************************
  * Pollset Definitions
  */

@@ -865,32 +762,6 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
   gpr_mu_destroy(&pollset->mu);
 }

-static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx, epoll_set *eps) {
-  if (gpr_mu_trylock(&eps->workqueue_read_mu)) {
-    gpr_mpscq_node *n = gpr_mpscq_pop(&eps->workqueue_items);
-    gpr_mu_unlock(&eps->workqueue_read_mu);
-    if (n != NULL) {
-      if (gpr_atm_full_fetch_add(&eps->workqueue_item_count, -1) > 1) {
-        workqueue_maybe_wakeup(eps);
-      }
-      grpc_closure *c = (grpc_closure *)n;
-      grpc_error *error = c->error_data.error;
-#ifndef NDEBUG
-      c->scheduled = false;
-#endif
-      c->cb(exec_ctx, c->cb_arg, error);
-      GRPC_ERROR_UNREF(error);
-      return true;
-    } else if (gpr_atm_no_barrier_load(&eps->workqueue_item_count) > 0) {
-      /* n == NULL might mean there's work but it's not available to be popped
-       * yet - try to ensure another workqueue wakes up to check shortly if so
-       */
-      workqueue_maybe_wakeup(eps);
-    }
-  }
-  return false;
-}
-
 /* Blocking call */
 static void acquire_epoll_lease(epoll_set *eps) {
   if (g_num_threads_per_eps > 1) {
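Note on the consumer side being deleted here: gpr_mpscq is a Vyukov-style multi-producer single-consumer queue, so a pop can transiently return NULL while a producer is mid-push; the removed code compensates by re-checking the item count and re-arming the wakeup instead of treating NULL as empty. A self-contained sketch of that consumer logic (names are illustrative):

    #include <stdatomic.h>
    #include <stddef.h>

    extern void *mpscq_pop(void);  /* may spuriously return NULL */
    extern void kick_one_poller(void);
    extern void run_closure(void *node);

    static atomic_long item_count;

    void drain_one(void) {
      void *n = mpscq_pop();
      if (n != NULL) {
        if (atomic_fetch_sub(&item_count, 1) > 1) {
          kick_one_poller();  /* more items remain: keep pollers moving */
        }
        run_closure(n);
      } else if (atomic_load(&item_count) > 0) {
        kick_one_poller();    /* push in flight: have someone retry shortly */
      }
    }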
@@ -934,12 +805,7 @@ static void do_epoll_wait(grpc_exec_ctx *exec_ctx, int epoll_fd, epoll_set *eps,
   for (int i = 0; i < ep_rv; ++i) {
     void *data_ptr = ep_ev[i].data.ptr;
-    if (data_ptr == &eps->workqueue_wakeup_fd) {
-      append_error(error,
-                   grpc_wakeup_fd_consume_wakeup(&eps->workqueue_wakeup_fd),
-                   err_desc);
-      maybe_do_workqueue_work(exec_ctx, eps);
-    } else if (data_ptr == &epoll_set_wakeup_fd) {
+    if (data_ptr == &epoll_set_wakeup_fd) {
       gpr_atm_rel_store(&eps->is_shutdown, 1);
       gpr_log(GPR_INFO, "pollset poller: shutdown set");
     } else {

@@ -966,18 +832,13 @@ static void epoll_set_work(grpc_exec_ctx *exec_ctx, epoll_set *eps,
      epoll set. */
   epoll_fd = eps->epoll_fd;

-  /* If we get some workqueue work to do, it might end up completing an item on
-     the completion queue, so there's no need to poll... so we skip that and
-     redo the complete loop to verify */
-  if (!maybe_do_workqueue_work(exec_ctx, eps)) {
-    gpr_atm_no_barrier_fetch_add(&eps->poller_count, 1);
-    g_current_thread_epoll_set = eps;
-
-    do_epoll_wait(exec_ctx, epoll_fd, eps, error);
-
-    g_current_thread_epoll_set = NULL;
-    gpr_atm_no_barrier_fetch_add(&eps->poller_count, -1);
-  }
+  gpr_atm_no_barrier_fetch_add(&eps->poller_count, 1);
+  g_current_thread_epoll_set = eps;
+
+  do_epoll_wait(exec_ctx, epoll_fd, eps, error);
+
+  g_current_thread_epoll_set = NULL;
+  gpr_atm_no_barrier_fetch_add(&eps->poller_count, -1);

   GPR_TIMER_END("epoll_set_work", 0);
 }

@@ -1120,7 +981,6 @@ static const grpc_event_engine_vtable vtable = {
     .fd_notify_on_read = fd_notify_on_read,
     .fd_notify_on_write = fd_notify_on_write,
     .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
-    .fd_get_workqueue = fd_get_workqueue,

     .pollset_init = pollset_init,
     .pollset_shutdown = pollset_shutdown,

@@ -1138,10 +998,6 @@ static const grpc_event_engine_vtable vtable = {
     .pollset_set_add_fd = pollset_set_add_fd,
     .pollset_set_del_fd = pollset_set_del_fd,

-    .workqueue_ref = workqueue_ref,
-    .workqueue_unref = workqueue_unref,
-    .workqueue_scheduler = workqueue_scheduler,
-
     .shutdown_engine = shutdown_engine,
 };

src/core/lib/iomgr/ev_epollex_linux.c:

@@ -59,7 +59,6 @@
 #include "src/core/lib/iomgr/sys_epoll_wrapper.h"
 #include "src/core/lib/iomgr/timer.h"
 #include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/iomgr/workqueue.h"
 #include "src/core/lib/profiling/timers.h"
 #include "src/core/lib/support/block_annotate.h"
 #include "src/core/lib/support/spinlock.h"

@@ -139,17 +138,6 @@ struct grpc_fd {
      Ref/Unref by two to avoid altering the orphaned bit */
   gpr_atm refst;

-  /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
-  grpc_wakeup_fd workqueue_wakeup_fd;
-  grpc_closure_scheduler workqueue_scheduler;
-  /* Spinlock guarding the read end of the workqueue (must be held to pop from
-   * workqueue_items) */
-  gpr_spinlock workqueue_read_mu;
-  /* Queue of closures to be executed */
-  gpr_mpscq workqueue_items;
-  /* Count of items in workqueue_items */
-  gpr_atm workqueue_item_count;
-
   /* The fd is either closed or we relinquished control of it. In either
      cases, this indicates that the 'fd' on this structure is no longer
      valid */

@@ -172,12 +160,6 @@ struct grpc_fd {
 static void fd_global_init(void);
 static void fd_global_shutdown(void);

-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
-                              grpc_error *error);
-
-static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
-    workqueue_enqueue, workqueue_enqueue, "workqueue"};
-
 /*******************************************************************************
  * Pollset Declarations
  */

@@ -347,13 +329,6 @@ static grpc_fd *fd_create(int fd, const char *name) {
   grpc_lfev_init(&new_fd->write_closure);
   gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);

-  GRPC_LOG_IF_ERROR("fd_create",
-                    grpc_wakeup_fd_init(&new_fd->workqueue_wakeup_fd));
-  new_fd->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
-  new_fd->workqueue_read_mu = GPR_SPINLOCK_INITIALIZER;
-  gpr_mpscq_init(&new_fd->workqueue_items);
-  gpr_atm_no_barrier_store(&new_fd->workqueue_item_count, 0);
-
   new_fd->freelist_next = NULL;
   new_fd->on_done_closure = NULL;

@@ -446,91 +421,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
   grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
 }

-static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
-  REF_BY(fd, 2, "return_workqueue");
-  return (grpc_workqueue *)fd;
-}
-
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
-                                     const char *file, int line,
-                                     const char *reason) {
-  if (workqueue != NULL) {
-    ref_by((grpc_fd *)workqueue, 2, file, line, reason);
-  }
-  return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
-                            const char *file, int line, const char *reason) {
-  if (workqueue != NULL) {
-    unref_by(exec_ctx, (grpc_fd *)workqueue, 2, file, line, reason);
-  }
-}
-#else
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
-  if (workqueue != NULL) {
-    ref_by((grpc_fd *)workqueue, 2);
-  }
-  return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx,
-                            grpc_workqueue *workqueue) {
-  if (workqueue != NULL) {
-    unref_by(exec_ctx, (grpc_fd *)workqueue, 2);
-  }
-}
-#endif
-
-static void workqueue_wakeup(grpc_fd *fd) {
-  GRPC_LOG_IF_ERROR("workqueue_enqueue",
-                    grpc_wakeup_fd_wakeup(&fd->workqueue_wakeup_fd));
-}
-
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
-                              grpc_error *error) {
-  GPR_TIMER_BEGIN("workqueue.enqueue", 0);
-  grpc_fd *fd = (grpc_fd *)(((char *)closure->scheduler) -
-                            offsetof(grpc_fd, workqueue_scheduler));
-  REF_BY(fd, 2, "workqueue_enqueue");
-  gpr_atm last = gpr_atm_no_barrier_fetch_add(&fd->workqueue_item_count, 1);
-  closure->error_data.error = error;
-  gpr_mpscq_push(&fd->workqueue_items, &closure->next_data.atm_next);
-  if (last == 0) {
-    workqueue_wakeup(fd);
-  }
-  UNREF_BY(exec_ctx, fd, 2, "workqueue_enqueue");
-}
-
-static void fd_invoke_workqueue(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
-  /* handle spurious wakeups */
-  if (!gpr_spinlock_trylock(&fd->workqueue_read_mu)) return;
-  gpr_mpscq_node *n = gpr_mpscq_pop(&fd->workqueue_items);
-  gpr_spinlock_unlock(&fd->workqueue_read_mu);
-  if (n != NULL) {
-    if (gpr_atm_full_fetch_add(&fd->workqueue_item_count, -1) > 1) {
-      workqueue_wakeup(fd);
-    }
-    grpc_closure *c = (grpc_closure *)n;
-    grpc_error *error = c->error_data.error;
-#ifndef NDEBUG
-    c->scheduled = false;
-#endif
-    c->cb(exec_ctx, c->cb_arg, error);
-    GRPC_ERROR_UNREF(error);
-  } else if (gpr_atm_no_barrier_load(&fd->workqueue_item_count) > 0) {
-    /* n == NULL might mean there's work but it's not available to be popped
-     * yet - try to ensure another workqueue wakes up to check shortly if so
-     */
-    workqueue_wakeup(fd);
-  }
-}
-
-static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
-  return &((grpc_fd *)workqueue)->workqueue_scheduler;
-}
-
 /*******************************************************************************
  * Pollable Definitions
  */

@@ -596,22 +486,7 @@ static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) {
                              .data.ptr = fd};
   if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) {
     switch (errno) {
-      case EEXIST: /* if this fd is already in the epoll set, the workqueue fd
-                      must also be - just return */
-        gpr_mu_unlock(&fd->orphaned_mu);
-        return GRPC_ERROR_NONE;
-      default:
-        append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), err_desc);
-    }
-  }
-
-  struct epoll_event ev_wq = {
-      .events = (uint32_t)(EPOLLET | EPOLLIN | EPOLLEXCLUSIVE),
-      .data.ptr = (void *)(1 + (intptr_t)fd)};
-  if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->workqueue_wakeup_fd.read_fd, &ev_wq) !=
-      0) {
-    switch (errno) {
-      case EEXIST: /* if the workqueue fd is already in the epoll set we're ok
-                      - no need to do anything special */
+      case EEXIST:
         break;
       default:
         append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), err_desc);

@@ -874,29 +749,21 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
         }
         append_error(&error, grpc_wakeup_fd_consume_wakeup(&p->wakeup), err_desc);
       } else {
-        grpc_fd *fd = (grpc_fd *)(((intptr_t)data_ptr) & ~(intptr_t)1);
-        bool is_workqueue = (((intptr_t)data_ptr) & 1) != 0;
+        grpc_fd *fd = (grpc_fd *)data_ptr;
         bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0;
         bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0;
         bool write_ev = (events[i].events & EPOLLOUT) != 0;
         if (GRPC_TRACER_ON(grpc_polling_trace)) {
           gpr_log(GPR_DEBUG,
-                  "PS:%p poll %p got fd %p: is_wq=%d cancel=%d read=%d "
+                  "PS:%p poll %p got fd %p: cancel=%d read=%d "
                   "write=%d",
-                  pollset, p, fd, is_workqueue, cancel, read_ev, write_ev);
+                  pollset, p, fd, cancel, read_ev, write_ev);
         }
-        if (is_workqueue) {
-          append_error(&error,
-                       grpc_wakeup_fd_consume_wakeup(&fd->workqueue_wakeup_fd),
-                       err_desc);
-          fd_invoke_workqueue(exec_ctx, fd);
-        } else {
-          if (read_ev || cancel) {
-            fd_become_readable(exec_ctx, fd, pollset);
-          }
-          if (write_ev || cancel) {
-            fd_become_writable(exec_ctx, fd);
-          }
+        if (read_ev || cancel) {
+          fd_become_readable(exec_ctx, fd, pollset);
+        }
+        if (write_ev || cancel) {
+          fd_become_writable(exec_ctx, fd);
         }
       }
     }
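Note: this hunk retires a low-bit pointer-tagging trick. Each grpc_fd used to register two epoll entries, the fd itself with data.ptr = fd, and its workqueue wakeup fd with data.ptr = fd | 1, which is safe only because grpc_fd allocations are aligned more strictly than one byte, so the low bit is always free. With the workqueue gone, data.ptr is a plain grpc_fd pointer again. A standalone sketch of the scheme being removed:

    #include <stdint.h>
    #include <stdbool.h>

    /* valid for any pointer to a type whose alignment is at least 2 */
    static inline void *tag_ptr(void *p) { return (void *)(1 + (intptr_t)p); }
    static inline bool is_tagged(const void *p) {
      return (((intptr_t)p) & 1) != 0;
    }
    static inline void *untag_ptr(void *p) {
      return (void *)(((intptr_t)p) & ~(intptr_t)1);
    }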
@@ -1449,7 +1316,6 @@ static const grpc_event_engine_vtable vtable = {
     .fd_notify_on_read = fd_notify_on_read,
     .fd_notify_on_write = fd_notify_on_write,
     .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
-    .fd_get_workqueue = fd_get_workqueue,

     .pollset_init = pollset_init,
     .pollset_shutdown = pollset_shutdown,

@@ -1467,10 +1333,6 @@ static const grpc_event_engine_vtable vtable = {
     .pollset_set_add_fd = pollset_set_add_fd,
     .pollset_set_del_fd = pollset_set_del_fd,

-    .workqueue_ref = workqueue_ref,
-    .workqueue_unref = workqueue_unref,
-    .workqueue_scheduler = workqueue_scheduler,
-
     .shutdown_engine = shutdown_engine,
 };

@ -59,7 +59,6 @@
#include "src/core/lib/iomgr/lockfree_event.h" #include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h" #include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/block_annotate.h"
@ -177,7 +176,9 @@ static void fd_global_shutdown(void);
* Polling island Declarations * Polling island Declarations
*/ */
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG //#define PI_REFCOUNT_DEBUG
#ifdef PI_REFCOUNT_DEBUG
#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__) #define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(exec_ctx, p, r) \ #define PI_UNREF(exec_ctx, p, r) \
@ -192,8 +193,6 @@ static void fd_global_shutdown(void);
/* This is also used as grpc_workqueue (by directly casing it) */ /* This is also used as grpc_workqueue (by directly casing it) */
typedef struct polling_island { typedef struct polling_island {
grpc_closure_scheduler workqueue_scheduler;
gpr_mu mu; gpr_mu mu;
/* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
the refcount. the refcount.
@ -214,15 +213,6 @@ typedef struct polling_island {
/* Number of threads currently polling on this island */ /* Number of threads currently polling on this island */
gpr_atm poller_count; gpr_atm poller_count;
/* Mutex guarding the read end of the workqueue (must be held to pop from
* workqueue_items) */
gpr_mu workqueue_read_mu;
/* Queue of closures to be executed */
gpr_mpscq workqueue_items;
/* Count of items in workqueue_items */
gpr_atm workqueue_item_count;
/* Wakeup fd used to wake pollers to check the contents of workqueue_items */
grpc_wakeup_fd workqueue_wakeup_fd;
/* The fd of the underlying epoll set */ /* The fd of the underlying epoll set */
int epoll_fd; int epoll_fd;
@ -297,8 +287,6 @@ static __thread polling_island *g_current_thread_polling_island;
/* Forward declaration */ /* Forward declaration */
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi); static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error);
#ifdef GRPC_TSAN #ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and /* Currently TSAN may incorrectly flag data races between epoll_ctl and
@ -311,13 +299,10 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
gpr_atm g_epoll_sync; gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */ #endif /* defined(GRPC_TSAN) */
static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
workqueue_enqueue, workqueue_enqueue, "workqueue"};
static void pi_add_ref(polling_island *pi); static void pi_add_ref(polling_island *pi);
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi); static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG #ifdef PI_REFCOUNT_DEBUG
static void pi_add_ref_dbg(polling_island *pi, const char *reason, static void pi_add_ref_dbg(polling_island *pi, const char *reason,
const char *file, int line) { const char *file, int line) {
long old_cnt = gpr_atm_acq_load(&pi->ref_count); long old_cnt = gpr_atm_acq_load(&pi->ref_count);
@ -333,36 +318,6 @@ static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)", gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
(void *)pi, old_cnt, (old_cnt - 1), reason, file, line); (void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
} }
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
const char *file, int line,
const char *reason) {
if (workqueue != NULL) {
pi_add_ref_dbg((polling_island *)workqueue, reason, file, line);
}
return workqueue;
}
static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
const char *file, int line, const char *reason) {
if (workqueue != NULL) {
pi_unref_dbg(exec_ctx, (polling_island *)workqueue, reason, file, line);
}
}
#else
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
if (workqueue != NULL) {
pi_add_ref((polling_island *)workqueue);
}
return workqueue;
}
static void workqueue_unref(grpc_exec_ctx *exec_ctx,
grpc_workqueue *workqueue) {
if (workqueue != NULL) {
pi_unref(exec_ctx, (polling_island *)workqueue);
}
}
#endif #endif
static void pi_add_ref(polling_island *pi) { static void pi_add_ref(polling_island *pi) {
@ -526,26 +481,16 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
*error = GRPC_ERROR_NONE; *error = GRPC_ERROR_NONE;
pi = gpr_malloc(sizeof(*pi)); pi = gpr_malloc(sizeof(*pi));
pi->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
gpr_mu_init(&pi->mu); gpr_mu_init(&pi->mu);
pi->fd_cnt = 0; pi->fd_cnt = 0;
pi->fd_capacity = 0; pi->fd_capacity = 0;
pi->fds = NULL; pi->fds = NULL;
pi->epoll_fd = -1; pi->epoll_fd = -1;
gpr_mu_init(&pi->workqueue_read_mu);
gpr_mpscq_init(&pi->workqueue_items);
gpr_atm_rel_store(&pi->workqueue_item_count, 0);
gpr_atm_rel_store(&pi->ref_count, 0); gpr_atm_rel_store(&pi->ref_count, 0);
gpr_atm_rel_store(&pi->poller_count, 0); gpr_atm_rel_store(&pi->poller_count, 0);
gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL); gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd),
err_desc)) {
goto done;
}
pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC); pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
if (pi->epoll_fd < 0) { if (pi->epoll_fd < 0) {
@ -553,8 +498,6 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
goto done; goto done;
} }
polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);
if (initial_fd != NULL) { if (initial_fd != NULL) {
polling_island_add_fds_locked(pi, &initial_fd, 1, true, error); polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
} }
@ -573,11 +516,7 @@ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
if (pi->epoll_fd >= 0) { if (pi->epoll_fd >= 0) {
close(pi->epoll_fd); close(pi->epoll_fd);
} }
GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0);
gpr_mu_destroy(&pi->workqueue_read_mu);
gpr_mpscq_destroy(&pi->workqueue_items);
gpr_mu_destroy(&pi->mu); gpr_mu_destroy(&pi->mu);
grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd);
gpr_free(pi->fds); gpr_free(pi->fds);
gpr_free(pi); gpr_free(pi);
} }
@ -722,45 +661,6 @@ static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
} }
} }
static void workqueue_maybe_wakeup(polling_island *pi) {
/* If this thread is the current poller, then it may be that it's about to
decrement the current poller count, so we need to look past this thread */
bool is_current_poller = (g_current_thread_polling_island == pi);
gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0;
gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
/* Only issue a wakeup if it's likely that some poller could come in and take
it right now. Note that since we do an anticipatory mpscq_pop every poll
loop, it's ok if we miss the wakeup here, as we'll get the work item when
the next poller enters anyway. */
if (current_pollers > min_current_pollers_for_wakeup) {
GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd));
}
}
static void workqueue_move_items_to_parent(polling_island *q) {
polling_island *p = (polling_island *)gpr_atm_no_barrier_load(&q->merged_to);
if (p == NULL) {
return;
}
gpr_mu_lock(&q->workqueue_read_mu);
int num_added = 0;
while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) {
gpr_mpscq_node *n = gpr_mpscq_pop(&q->workqueue_items);
if (n != NULL) {
gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1);
gpr_atm_no_barrier_fetch_add(&p->workqueue_item_count, 1);
gpr_mpscq_push(&p->workqueue_items, n);
num_added++;
}
}
gpr_mu_unlock(&q->workqueue_read_mu);
if (num_added > 0) {
workqueue_maybe_wakeup(p);
}
workqueue_move_items_to_parent(p);
}
static polling_island *polling_island_merge(polling_island *p, static polling_island *polling_island_merge(polling_island *p,
polling_island *q, polling_island *q,
grpc_error **error) { grpc_error **error) {
@ -785,8 +685,6 @@ static polling_island *polling_island_merge(polling_island *p,
/* Add the 'merged_to' link from p --> q */ /* Add the 'merged_to' link from p --> q */
gpr_atm_rel_store(&p->merged_to, (gpr_atm)q); gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */ PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
workqueue_move_items_to_parent(p);
} }
/* else if p == q, nothing needs to be done */ /* else if p == q, nothing needs to be done */
@ -797,32 +695,6 @@ static polling_island *polling_island_merge(polling_island *p,
return q; return q;
} }
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
-                              grpc_error *error) {
-  GPR_TIMER_BEGIN("workqueue.enqueue", 0);
-  grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler;
-  /* take a ref to the workqueue: otherwise it can happen that whatever events
-   * this kicks off ends up destroying the workqueue before this function
-   * completes */
-  GRPC_WORKQUEUE_REF(workqueue, "enqueue");
-  polling_island *pi = (polling_island *)workqueue;
-  gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1);
-  closure->error_data.error = error;
-  gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next);
-  if (last == 0) {
-    workqueue_maybe_wakeup(pi);
-  }
-  workqueue_move_items_to_parent(pi);
-  GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
-  GPR_TIMER_END("workqueue.enqueue", 0);
-}
-
-static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
-  polling_island *pi = (polling_island *)workqueue;
-  return workqueue == NULL ? grpc_schedule_on_exec_ctx
-                           : &pi->workqueue_scheduler;
-}
 static grpc_error *polling_island_global_init() {
   grpc_error *error = GRPC_ERROR_NONE;

@@ -1081,14 +953,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
   grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
 }

-static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
-  gpr_mu_lock(&fd->po.mu);
-  grpc_workqueue *workqueue =
-      GRPC_WORKQUEUE_REF((grpc_workqueue *)fd->po.pi, "fd_get_workqueue");
-  gpr_mu_unlock(&fd->po.mu);
-  return workqueue;
-}

 /*******************************************************************************
  * Pollset Definitions
  */

@@ -1326,33 +1190,6 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
   gpr_mu_destroy(&pollset->po.mu);
 }
-static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
-                                    polling_island *pi) {
-  if (gpr_mu_trylock(&pi->workqueue_read_mu)) {
-    gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items);
-    gpr_mu_unlock(&pi->workqueue_read_mu);
-    if (n != NULL) {
-      if (gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) > 1) {
-        workqueue_maybe_wakeup(pi);
-      }
-      grpc_closure *c = (grpc_closure *)n;
-      grpc_error *error = c->error_data.error;
-#ifndef NDEBUG
-      c->scheduled = false;
-#endif
-      c->cb(exec_ctx, c->cb_arg, error);
-      GRPC_ERROR_UNREF(error);
-      return true;
-    } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
-      /* n == NULL might mean there's work but it's not available to be popped
-       * yet - try to ensure another workqueue wakes up to check shortly if so
-       */
-      workqueue_maybe_wakeup(pi);
-    }
-  }
-  return false;
-}
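
The deleted maybe_do_workqueue_work enforces a single-consumer discipline: the mpscq allows only one popper at a time, so threads race for a trylock and losers simply go poll instead of blocking. A minimal sketch of that discipline (hypothetical stand-in types; the real code pops a gpr_mpscq node and runs the popped closure):

#include <pthread.h>
#include <stdbool.h>

typedef struct {
  pthread_mutex_t read_mu; /* guards the single-consumer pop */
  int items;               /* stand-in for workqueue_item_count */
} wq_t;

/* Returns true if one unit of work was executed. */
static bool maybe_do_work(wq_t *wq) {
  if (pthread_mutex_trylock(&wq->read_mu) != 0) {
    return false; /* someone else is draining; fall through to polling */
  }
  bool did_work = false;
  if (wq->items > 0) {
    wq->items--; /* the real code executes the popped closure here */
    did_work = true;
  }
  pthread_mutex_unlock(&wq->read_mu);
  return did_work;
}

int main(void) {
  wq_t wq = {PTHREAD_MUTEX_INITIALIZER, 2};
  while (maybe_do_work(&wq)) {
  }
  return wq.items; /* 0 */
}
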
 #define GRPC_EPOLL_MAX_EVENTS 100

 /* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
 static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,

@@ -1408,72 +1245,61 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
   PI_ADD_REF(pi, "ps_work");
   gpr_mu_unlock(&pollset->po.mu);
-  /* If we get some workqueue work to do, it might end up completing an item on
-     the completion queue, so there's no need to poll... so we skip that and
-     redo the complete loop to verify */
-  if (!maybe_do_workqueue_work(exec_ctx, pi)) {
-    gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
-    g_current_thread_polling_island = pi;
-
-    GRPC_SCHEDULING_START_BLOCKING_REGION;
-    ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
-                        sig_mask);
-    GRPC_SCHEDULING_END_BLOCKING_REGION;
-    if (ep_rv < 0) {
-      if (errno != EINTR) {
-        gpr_asprintf(&err_msg,
-                     "epoll_wait() epoll fd: %d failed with error: %d (%s)",
-                     epoll_fd, errno, strerror(errno));
-        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
-      } else {
-        /* We were interrupted. Save an iteration by doing a zero timeout
-           epoll_wait to see if there are any other events of interest */
-        GRPC_POLLING_TRACE(
-            "pollset_work: pollset: %p, worker: %p received kick",
-            (void *)pollset, (void *)worker);
-        ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
-      }
-    }
+  gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
+  g_current_thread_polling_island = pi;
+
+  GRPC_SCHEDULING_START_BLOCKING_REGION;
+  ep_rv =
+      epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask);
+  GRPC_SCHEDULING_END_BLOCKING_REGION;
+  if (ep_rv < 0) {
+    if (errno != EINTR) {
+      gpr_asprintf(&err_msg,
+                   "epoll_wait() epoll fd: %d failed with error: %d (%s)",
+                   epoll_fd, errno, strerror(errno));
+      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
+    } else {
+      /* We were interrupted. Save an iteration by doing a zero timeout
+         epoll_wait to see if there are any other events of interest */
+      GRPC_POLLING_TRACE("pollset_work: pollset: %p, worker: %p received kick",
+                         (void *)pollset, (void *)worker);
+      ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
+    }
+  }

 #ifdef GRPC_TSAN
-    /* See the definition of g_poll_sync for more details */
-    gpr_atm_acq_load(&g_epoll_sync);
+  /* See the definition of g_poll_sync for more details */
+  gpr_atm_acq_load(&g_epoll_sync);
 #endif /* defined(GRPC_TSAN) */

-    for (int i = 0; i < ep_rv; ++i) {
-      void *data_ptr = ep_ev[i].data.ptr;
-      if (data_ptr == &pi->workqueue_wakeup_fd) {
-        append_error(error,
-                     grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd),
-                     err_desc);
-        maybe_do_workqueue_work(exec_ctx, pi);
-      } else if (data_ptr == &polling_island_wakeup_fd) {
-        GRPC_POLLING_TRACE(
-            "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
-            "%d) got merged",
-            (void *)pollset, (void *)worker, epoll_fd);
-        /* This means that our polling island is merged with a different
-           island. We do not have to do anything here since the subsequent call
-           to the function pollset_work_and_unlock() will pick up the correct
-           epoll_fd */
-      } else {
-        grpc_fd *fd = data_ptr;
-        int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
-        int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
-        int write_ev = ep_ev[i].events & EPOLLOUT;
-        if (read_ev || cancel) {
-          fd_become_readable(exec_ctx, fd, pollset);
-        }
-        if (write_ev || cancel) {
-          fd_become_writable(exec_ctx, fd);
-        }
-      }
-    }
-    g_current_thread_polling_island = NULL;
-    gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
-  }
+  for (int i = 0; i < ep_rv; ++i) {
+    void *data_ptr = ep_ev[i].data.ptr;
+    if (data_ptr == &polling_island_wakeup_fd) {
+      GRPC_POLLING_TRACE(
+          "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
+          "%d) got merged",
+          (void *)pollset, (void *)worker, epoll_fd);
+      /* This means that our polling island is merged with a different
+         island. We do not have to do anything here since the subsequent call
+         to the function pollset_work_and_unlock() will pick up the correct
+         epoll_fd */
+    } else {
+      grpc_fd *fd = data_ptr;
+      int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
+      int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
+      int write_ev = ep_ev[i].events & EPOLLOUT;
+      if (read_ev || cancel) {
+        fd_become_readable(exec_ctx, fd, pollset);
+      }
+      if (write_ev || cancel) {
+        fd_become_writable(exec_ctx, fd);
+      }
+    }
+  }
+  g_current_thread_polling_island = NULL;
+  gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
   GPR_ASSERT(pi != NULL);

   /* Before leaving, release the extra ref we added to the polling island. It
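
The EINTR handling retained above is a common epoll idiom: if epoll_pwait is interrupted by a signal (typically a kick), retry once with a zero timeout so any already-pending events are still harvested before the caller loops. A self-contained, Linux-only illustration with error handling trimmed:

#include <errno.h>
#include <stdio.h>
#include <sys/epoll.h>
#include <unistd.h>

#define MAX_EVENTS 100

int main(void) {
  int epfd = epoll_create1(0);
  if (epfd < 0) return 1;
  struct epoll_event evs[MAX_EVENTS];
  int n = epoll_pwait(epfd, evs, MAX_EVENTS, 100 /* ms */, NULL);
  if (n < 0 && errno == EINTR) {
    /* Interrupted: do one cheap, non-blocking sweep for real events. */
    n = epoll_wait(epfd, evs, MAX_EVENTS, 0);
  }
  printf("events: %d\n", n < 0 ? 0 : n);
  close(epfd);
  return 0;
}
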
@@ -1864,7 +1690,6 @@ static const grpc_event_engine_vtable vtable = {
     .fd_notify_on_read = fd_notify_on_read,
     .fd_notify_on_write = fd_notify_on_write,
     .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
-    .fd_get_workqueue = fd_get_workqueue,

     .pollset_init = pollset_init,
     .pollset_shutdown = pollset_shutdown,

@@ -1882,10 +1707,6 @@ static const grpc_event_engine_vtable vtable = {
     .pollset_set_add_fd = pollset_set_add_fd,
     .pollset_set_del_fd = pollset_set_del_fd,

-    .workqueue_ref = workqueue_ref,
-    .workqueue_unref = workqueue_unref,
-    .workqueue_scheduler = workqueue_scheduler,

     .shutdown_engine = shutdown_engine,
 };

@@ -148,9 +148,9 @@ static void executor_thread(void *arg) {

 static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
                           grpc_error *error) {
   thread_state *ts = (thread_state *)gpr_tls_get(&g_this_thread_state);
-  gpr_atm cur_thread_count = gpr_atm_no_barrier_load(&g_cur_threads);
+  size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
   if (ts == NULL) {
-    ts = &g_thread_state[rand() % cur_thread_count];
+    ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)];
   }
   gpr_mu_lock(&ts->mu);
   grpc_closure_list_append(&ts->elems, closure, error);

@@ -159,7 +159,7 @@ static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
       ts->depth > MAX_DEPTH && cur_thread_count < g_max_threads;
   gpr_mu_unlock(&ts->mu);
   if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) {
-    cur_thread_count = gpr_atm_no_barrier_load(&g_cur_threads);
+    cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
     if (cur_thread_count < g_max_threads) {
       gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1);
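
The rand() -> GPR_HASH_POINTER change above swaps a global PRNG (which may synchronize internally in some libcs) for a cheap hash of the caller's exec_ctx address, so a given context also maps stably to one thread shard. A sketch of pointer-hash shard selection; hash_pointer here is a hypothetical stand-in, not the exact GPR_HASH_POINTER definition from grpc/support/useful.h:

#include <stddef.h>
#include <stdio.h>

static size_t hash_pointer(const void *p, size_t range) {
  /* Drop low bits (alignment makes them near-constant), then reduce. */
  return (((size_t)p) >> 4) % range;
}

int main(void) {
  int dummy_exec_ctx; /* stands in for the caller's stack-allocated exec_ctx */
  size_t num_threads = 8;
  size_t shard = hash_pointer(&dummy_exec_ctx, num_threads);
  printf("closure goes to thread %zu\n", shard);
  return 0;
}
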

@@ -47,7 +47,6 @@
 #include <grpc/support/useful.h>

 #include "src/core/lib/iomgr/iomgr.h"
-#include "src/core/lib/iomgr/workqueue.h"
 #include "test/core/util/test_config.h"

 typedef struct test_pollset {

@@ -131,86 +130,6 @@ static void test_pollset_cleanup(grpc_exec_ctx *exec_ctx,
   }
 }
-static void increment(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
-  ++*(int *)arg;
-}
-
-/*
- * Validate that merging two workqueues preserves the closures in each queue.
- * This is a regression test for a bug in
- * polling_island_merge()[ev_epoll_linux.c], where the parent relationship was
- * inverted.
- */
-
-#define NUM_FDS 2
-#define NUM_POLLSETS 2
-#define NUM_CLOSURES 4
-
-static void test_pollset_queue_merge_items() {
-  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-  test_fd tfds[NUM_FDS];
-  int fds[NUM_FDS];
-  test_pollset pollsets[NUM_POLLSETS];
-  grpc_closure closures[NUM_CLOSURES];
-  int i;
-  int result = 0;
-
-  test_fd_init(tfds, fds, NUM_FDS);
-  test_pollset_init(pollsets, NUM_POLLSETS);
-
-  /* Two distinct polling islands, each with their own FD and pollset. */
-  for (i = 0; i < NUM_FDS; i++) {
-    grpc_pollset_add_fd(&exec_ctx, pollsets[i].pollset, tfds[i].fd);
-    grpc_exec_ctx_flush(&exec_ctx);
-  }
-
-  /* Enqueue the closures, 3 to polling island 0 and 1 to polling island 1. */
-  grpc_closure_init(
-      closures, increment, &result,
-      grpc_workqueue_scheduler(grpc_fd_get_polling_island(tfds[0].fd)));
-  grpc_closure_init(
-      closures + 1, increment, &result,
-      grpc_workqueue_scheduler(grpc_fd_get_polling_island(tfds[0].fd)));
-  grpc_closure_init(
-      closures + 2, increment, &result,
-      grpc_workqueue_scheduler(grpc_fd_get_polling_island(tfds[0].fd)));
-  grpc_closure_init(
-      closures + 3, increment, &result,
-      grpc_workqueue_scheduler(grpc_fd_get_polling_island(tfds[1].fd)));
-  for (i = 0; i < NUM_CLOSURES; ++i) {
-    grpc_closure_sched(&exec_ctx, closures + i, GRPC_ERROR_NONE);
-  }
-
-  /* Merge the two polling islands. */
-  grpc_pollset_add_fd(&exec_ctx, pollsets[0].pollset, tfds[1].fd);
-  grpc_exec_ctx_flush(&exec_ctx);
-
-  /*
-   * Execute the closures, verify we see each one execute when executing work
-   * on the merged polling island.
-   */
-  grpc_pollset_worker *worker = NULL;
-  for (i = 0; i < NUM_CLOSURES; ++i) {
-    const gpr_timespec deadline = gpr_time_add(
-        gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(2, GPR_TIMESPAN));
-    gpr_mu_lock(pollsets[1].mu);
-    GRPC_LOG_IF_ERROR(
-        "grpc_pollset_work",
-        grpc_pollset_work(&exec_ctx, pollsets[1].pollset, &worker,
-                          gpr_now(GPR_CLOCK_MONOTONIC), deadline));
-    gpr_mu_unlock(pollsets[1].mu);
-  }
-  GPR_ASSERT(result == NUM_CLOSURES);
-
-  test_fd_cleanup(&exec_ctx, tfds, NUM_FDS);
-  test_pollset_cleanup(&exec_ctx, pollsets, NUM_POLLSETS);
-  grpc_exec_ctx_finish(&exec_ctx);
-}
-
-#undef NUM_FDS
-#undef NUM_POLLSETS
-#undef NUM_CLOSURES
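
The deleted test goes away with the workqueue API, but its drive-the-pollset pattern (run grpc_pollset_work under the pollset mutex with a bounded deadline until the expected side effect appears) remains useful in other iomgr tests. A hedged sketch of that loop as a helper; the calls are the same gRPC-internal APIs the test used, with `ps`, `mu`, and `done` supplied by the caller:

#include <stdbool.h>

#include <grpc/support/sync.h>
#include <grpc/support/time.h>

#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/pollset.h"

/* Drives `ps` until *done becomes true, two seconds at a time. */
static void drive_pollset(grpc_pollset *ps, gpr_mu *mu, volatile bool *done) {
  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
  while (!*done) {
    const gpr_timespec deadline = gpr_time_add(
        gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(2, GPR_TIMESPAN));
    grpc_pollset_worker *worker = NULL;
    gpr_mu_lock(mu);
    GRPC_LOG_IF_ERROR(
        "grpc_pollset_work",
        grpc_pollset_work(&exec_ctx, ps, &worker,
                          gpr_now(GPR_CLOCK_MONOTONIC), deadline));
    gpr_mu_unlock(mu);
    grpc_exec_ctx_flush(&exec_ctx);
  }
  grpc_exec_ctx_finish(&exec_ctx);
}
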
 /*
  * Cases to test:
  *   case 1) Polling islands of both fd and pollset are NULL

@@ -408,7 +327,6 @@ int main(int argc, char **argv) {
   poll_strategy = grpc_get_poll_strategy_name();
   if (poll_strategy != NULL && strcmp(poll_strategy, "epollsig") == 0) {
     test_add_fd_to_pollset();
-    test_pollset_queue_merge_items();
     test_threading();
   } else {
     gpr_log(GPR_INFO,
