Merge pull request #11816 from sreecha/sreek-epoll1

More changes to epoll1 poller (not enabled in tests yet)

Branch: pull/11913/head
Sree Kuchibhotla committed 8 years ago (via GitHub)
Commit e8e05f90a4
Files changed:
  1. build.yaml (1 change)
  2. src/core/lib/iomgr/ev_epoll1_linux.c (282 changes)
  3. src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c (8 changes)
  4. src/core/lib/iomgr/ev_epoll_thread_pool_linux.c (8 changes)
  5. src/core/lib/iomgr/ev_epollex_linux.c (12 changes)
  6. src/core/lib/iomgr/ev_epollsig_linux.c (10 changes)
  7. src/core/lib/iomgr/lockfree_event.c (14 changes)
  8. src/core/lib/iomgr/lockfree_event.h (5 changes)
  9. test/core/surface/completion_queue_threading_test.c (3 changes)
  10. tools/run_tests/generated/tests.json (3 changes)

@@ -4496,6 +4496,7 @@ targets:
- grpc
- gpr_test_util
- gpr
timeout_seconds: 1200
- name: writes_per_rpc_test
gtest: true
cpu_cost: 0.5

@@ -45,6 +45,7 @@
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
#include "src/core/lib/support/string.h"
static grpc_wakeup_fd global_wakeup_fd;
static int g_epfd;
@@ -77,8 +78,21 @@ static void fd_global_shutdown(void);
typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;
static const char *kick_state_string(kick_state st) {
switch (st) {
case UNKICKED:
return "UNKICKED";
case KICKED:
return "KICKED";
case DESIGNATED_POLLER:
return "DESIGNATED_POLLER";
}
GPR_UNREACHABLE_CODE(return "UNKNOWN");
}
struct grpc_pollset_worker {
kick_state kick_state;
int kick_state_mutator; // which line of code last changed kick state
bool initialized_cv;
grpc_pollset_worker *next;
grpc_pollset_worker *prev;
@@ -86,6 +100,12 @@ struct grpc_pollset_worker {
grpc_closure_list schedule_on_end_work;
};
#define SET_KICK_STATE(worker, state) \
do { \
(worker)->kick_state = (state); \
(worker)->kick_state_mutator = __LINE__; \
} while (false)
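
The SET_KICK_STATE macro replaces bare assignments to worker->kick_state so
that every transition also records the source line that made it, which makes
stuck-worker bugs far easier to trace. A minimal self-contained sketch of the
pattern (demo_worker and main() are invented for illustration):

#include <stdbool.h>
#include <stdio.h>

typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;

typedef struct {
  kick_state kick_state;
  int kick_state_mutator; /* __LINE__ of the last transition */
} demo_worker;

/* Record the source line of every transition; the do/while(false) wrapper
   keeps the macro usable as a single statement inside if/else bodies. */
#define SET_KICK_STATE(worker, state)        \
  do {                                       \
    (worker)->kick_state = (state);          \
    (worker)->kick_state_mutator = __LINE__; \
  } while (false)

int main(void) {
  demo_worker w = {UNKICKED, 0};
  SET_KICK_STATE(&w, KICKED);
  /* A debugger (or a log line) can now report not just the state but the
     exact line that set it. */
  printf("kick_state=%d set at line %d\n", (int)w.kick_state,
         w.kick_state_mutator);
  return 0;
}
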
#define MAX_NEIGHBOURHOODS 1024
typedef struct pollset_neighbourhood {
@@ -100,10 +120,15 @@ struct grpc_pollset {
bool reassigning_neighbourhood;
grpc_pollset_worker *root_worker;
bool kicked_without_poller;
/* Set to true if the pollset is observed to have no workers available to
* poll */
bool seen_inactive;
bool shutting_down; /* Is the pollset shutting down ? */
bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
bool shutting_down; /* Is the pollset shutting down ? */
grpc_closure *shutdown_closure; /* Called after shutdown is complete */
/* Number of workers who are *about-to* attach themselves to the pollset
* worker list */
int begin_refs;
grpc_pollset *next;
@@ -263,29 +288,23 @@ static bool fd_is_shutdown(grpc_fd *fd) {
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
}
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) {
grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
/* Note, it is possible that fd_become_readable might be called twice with
different 'notifier's when an fd becomes readable and it is in two epoll
sets (This can happen briefly during polling island merges). In such cases
it does not really matter which notifier is set as the read_notifier_pollset
(They would both point to the same polling island anyway) */
grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
/* Use release store to match with acquire load in fd_get_read_notifier */
gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
/*******************************************************************************
@@ -410,18 +429,28 @@ static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
if (pollset->root_worker != NULL) {
grpc_pollset_worker *worker = pollset->root_worker;
do {
if (worker->initialized_cv) {
worker->kick_state = KICKED;
gpr_cv_signal(&worker->cv);
} else {
worker->kick_state = KICKED;
append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
"pollset_shutdown");
switch (worker->kick_state) {
case KICKED:
break;
case UNKICKED:
SET_KICK_STATE(worker, KICKED);
if (worker->initialized_cv) {
gpr_cv_signal(&worker->cv);
}
break;
case DESIGNATED_POLLER:
SET_KICK_STATE(worker, KICKED);
append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
"pollset_kick_all");
break;
}
worker = worker->next;
} while (worker != pollset->root_worker);
}
// TODO: sreek. Check if we need to set 'kicked_without_poller' to true here
// in the else case
return error;
}
@@ -437,7 +466,9 @@ static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure) {
GPR_ASSERT(pollset->shutdown_closure == NULL);
GPR_ASSERT(!pollset->shutting_down);
pollset->shutdown_closure = closure;
pollset->shutting_down = true;
GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
@@ -510,10 +541,14 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec deadline) {
if (worker_hdl != NULL) *worker_hdl = worker;
worker->initialized_cv = false;
worker->kick_state = UNKICKED;
SET_KICK_STATE(worker, UNKICKED);
worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
pollset->begin_refs++;
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, "PS:%p BEGIN_STARTS:%p", pollset, worker);
}
if (pollset->seen_inactive) {
// pollset has been observed to be inactive, we need to move back to the
// active list
@@ -529,6 +564,11 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
retry_lock_neighbourhood:
gpr_mu_lock(&neighbourhood->mu);
gpr_mu_lock(&pollset->mu);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
pollset, worker, kick_state_string(worker->kick_state),
is_reassigning);
}
if (pollset->seen_inactive) {
if (neighbourhood != pollset->neighbourhood) {
gpr_mu_unlock(&neighbourhood->mu);
@@ -539,8 +579,14 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
pollset->seen_inactive = false;
if (neighbourhood->active_root == NULL) {
neighbourhood->active_root = pollset->next = pollset->prev = pollset;
if (gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
worker->kick_state = DESIGNATED_POLLER;
/* TODO: sreek. Why would this worker state be other than UNKICKED
* here ? (since the worker isn't added to the pollset yet, there is no
* way it can be "found" by other threads to get kicked). */
/* If there is no designated poller, make this the designated poller */
if (worker->kick_state == UNKICKED &&
gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
SET_KICK_STATE(worker, DESIGNATED_POLLER);
}
} else {
pollset->next = neighbourhood->active_root;
@@ -554,24 +600,53 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
}
gpr_mu_unlock(&neighbourhood->mu);
}
worker_insert(pollset, worker);
pollset->begin_refs--;
if (worker->kick_state == UNKICKED) {
if (worker->kick_state == UNKICKED && !pollset->kicked_without_poller) {
GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
worker->initialized_cv = true;
gpr_cv_init(&worker->cv);
while (worker->kick_state == UNKICKED &&
pollset->shutdown_closure == NULL) {
while (worker->kick_state == UNKICKED && !pollset->shutting_down) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d",
pollset, worker, kick_state_string(worker->kick_state),
pollset->shutting_down);
}
if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) &&
worker->kick_state == UNKICKED) {
worker->kick_state = KICKED;
/* If gpr_cv_wait returns true (i.e. a timeout), pretend that the worker
received a kick */
SET_KICK_STATE(worker, KICKED);
}
}
*now = gpr_now(now->clock_type);
}
return worker->kick_state == DESIGNATED_POLLER &&
pollset->shutdown_closure == NULL;
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR,
"PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d "
"kicked_without_poller: %d",
pollset, worker, kick_state_string(worker->kick_state),
pollset->shutting_down, pollset->kicked_without_poller);
}
/* We release the pollset lock in this function in two places:
* 1. Briefly when assigning the pollset to a neighbourhood
* 2. When doing gpr_cv_wait()
* It is possible that 'kicked_without_poller' was set to true during (1) and
* that 'shutting_down' was set to true during (1) or (2). If either is true,
* this worker cannot do polling */
/* TODO(sreek): Perhaps there is a better way to handle kicked_without_poller
* case; especially when the worker is the DESIGNATED_POLLER */
if (pollset->kicked_without_poller) {
pollset->kicked_without_poller = false;
return false;
}
return worker->kick_state == DESIGNATED_POLLER && !pollset->shutting_down;
}
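
The promotion logic in begin_worker above hinges on a single atomic slot,
g_active_poller: whichever worker CASes it from 0 to itself becomes the
DESIGNATED_POLLER, and everyone else blocks on a condition variable until
kicked. A simplified model using C11 atomics in place of gpr_atm (demo_worker
and both function names are invented; pollsets, neighbourhoods, and the
condition-variable parking are omitted):

#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

typedef struct { const char *name; } demo_worker;

static _Atomic(demo_worker *) g_active_poller; /* NULL: the slot is free */

/* begin_worker analogue: returns true iff this worker won the CAS and is
   now the one thread allowed to sit in epoll_wait(). */
static bool try_become_designated_poller(demo_worker *w) {
  demo_worker *expected = NULL;
  return atomic_compare_exchange_strong(&g_active_poller, &expected, w);
}

/* end_worker analogue: vacate the slot so a peer can be promoted. */
static void resign_designated_poller(demo_worker *w) {
  demo_worker *expected = w;
  atomic_compare_exchange_strong(&g_active_poller, &expected, NULL);
}

int main(void) {
  demo_worker a = {"a"}, b = {"b"};
  printf("a wins: %d\n", try_become_designated_poller(&a)); /* prints 1 */
  printf("b wins: %d\n", try_become_designated_poller(&b)); /* prints 0 */
  resign_designated_poller(&a);
  printf("b wins now: %d\n", try_become_designated_poller(&b)); /* prints 1 */
  return 0;
}
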
static bool check_neighbourhood_for_available_poller(
@@ -591,10 +666,18 @@ static bool check_neighbourhood_for_available_poller(
case UNKICKED:
if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
(gpr_atm)inspect_worker)) {
inspect_worker->kick_state = DESIGNATED_POLLER;
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, " .. choose next poller to be %p",
inspect_worker);
}
SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
if (inspect_worker->initialized_cv) {
gpr_cv_signal(&inspect_worker->cv);
}
} else {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, " .. beaten to choose next poller");
}
}
// even if we didn't win the cas, there's a worker, we can stop
found_worker = true;
@@ -607,9 +690,12 @@ static bool check_neighbourhood_for_available_poller(
break;
}
inspect_worker = inspect_worker->next;
} while (inspect_worker != inspect->root_worker);
} while (!found_worker && inspect_worker != inspect->root_worker);
}
if (!found_worker) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, " .. mark pollset %p inactive", inspect);
}
inspect->seen_inactive = true;
if (inspect == neighbourhood->active_root) {
neighbourhood->active_root =
@@ -627,15 +713,22 @@ static bool check_neighbourhood_for_available_poller(
static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "PS:%p END_WORKER:%p", pollset, worker);
}
if (worker_hdl != NULL) *worker_hdl = NULL;
worker->kick_state = KICKED;
/* Make sure we appear kicked */
SET_KICK_STATE(worker, KICKED);
grpc_closure_list_move(&worker->schedule_on_end_work,
&exec_ctx->closure_list);
if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
if (worker->next != worker && worker->next->kick_state == UNKICKED) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, " .. choose next poller to be peer %p", worker);
}
GPR_ASSERT(worker->next->initialized_cv);
gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
worker->next->kick_state = DESIGNATED_POLLER;
SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
gpr_cv_signal(&worker->next->cv);
if (grpc_exec_ctx_has_work(exec_ctx)) {
gpr_mu_unlock(&pollset->mu);
@@ -644,9 +737,9 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
} else {
gpr_atm_no_barrier_store(&g_active_poller, 0);
gpr_mu_unlock(&pollset->mu);
size_t poller_neighbourhood_idx =
(size_t)(pollset->neighbourhood - g_neighbourhoods);
gpr_mu_unlock(&pollset->mu);
bool found_worker = false;
bool scan_state[MAX_NEIGHBOURHOODS];
for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
@@ -682,6 +775,9 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (worker->initialized_cv) {
gpr_cv_destroy(&worker->cv);
}
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, " .. remove worker");
}
if (EMPTIED == worker_remove(pollset, worker)) {
pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
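
When the designated poller exits, end_worker looks for a replacement by
scanning every neighbourhood, starting at the departing pollset's own and
wrapping around modulo g_num_neighbourhoods. A sketch of just that
wrap-around walk (NUM_NEIGHBOURHOODS and check_neighbourhood are stand-ins;
the real code additionally records neighbourhoods it could not lock in
scan_state and retries them):

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

#define NUM_NEIGHBOURHOODS 8

/* Stand-in for check_neighbourhood_for_available_poller(). */
static bool check_neighbourhood(size_t idx) { return idx == 5; }

int main(void) {
  size_t start = 3; /* poller_neighbourhood_idx analogue */
  bool found_worker = false;
  for (size_t i = 0; !found_worker && i < NUM_NEIGHBOURHOODS; i++) {
    size_t idx = (start + i) % NUM_NEIGHBOURHOODS; /* visits 3,4,5,...,2 */
    found_worker = check_neighbourhood(idx);
    if (found_worker) printf("found poller in neighbourhood %zu\n", idx);
  }
  return found_worker ? 0 : 1;
}
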
@@ -702,16 +798,18 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset->kicked_without_poller = false;
return GRPC_ERROR_NONE;
}
gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) {
gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
GPR_ASSERT(!pollset->shutdown_closure);
GPR_ASSERT(!pollset->shutting_down);
GPR_ASSERT(!pollset->seen_inactive);
gpr_mu_unlock(&pollset->mu);
append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline),
err_desc);
gpr_mu_lock(&pollset->mu);
gpr_tls_set(&g_current_thread_worker, 0);
} else {
gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
}
end_worker(exec_ctx, pollset, &worker, worker_hdl);
gpr_tls_set(&g_current_thread_pollset, 0);
@@ -720,46 +818,136 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
static grpc_error *pollset_kick(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_strvec log;
gpr_strvec_init(&log);
char *tmp;
gpr_asprintf(
&tmp, "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset,
specific_worker, (void *)gpr_tls_get(&g_current_thread_pollset),
(void *)gpr_tls_get(&g_current_thread_worker), pollset->root_worker);
gpr_strvec_add(&log, tmp);
if (pollset->root_worker != NULL) {
gpr_asprintf(&tmp, " {kick_state=%s next=%p {kick_state=%s}}",
kick_state_string(pollset->root_worker->kick_state),
pollset->root_worker->next,
kick_state_string(pollset->root_worker->next->kick_state));
gpr_strvec_add(&log, tmp);
}
if (specific_worker != NULL) {
gpr_asprintf(&tmp, " worker_kick_state=%s",
kick_state_string(specific_worker->kick_state));
gpr_strvec_add(&log, tmp);
}
tmp = gpr_strvec_flatten(&log, NULL);
gpr_strvec_destroy(&log);
gpr_log(GPR_ERROR, "%s", tmp);
gpr_free(tmp);
}
if (specific_worker == NULL) {
if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
grpc_pollset_worker *root_worker = pollset->root_worker;
if (root_worker == NULL) {
pollset->kicked_without_poller = true;
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. kicked_without_poller");
}
return GRPC_ERROR_NONE;
}
grpc_pollset_worker *next_worker = root_worker->next;
if (root_worker == next_worker &&
root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(
&g_active_poller)) {
root_worker->kick_state = KICKED;
if (root_worker->kick_state == KICKED) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. already kicked %p", root_worker);
}
SET_KICK_STATE(root_worker, KICKED);
return GRPC_ERROR_NONE;
} else if (next_worker->kick_state == KICKED) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. already kicked %p", next_worker);
}
SET_KICK_STATE(next_worker, KICKED);
return GRPC_ERROR_NONE;
} else if (root_worker ==
next_worker && // only try to wake up a poller if
// there is no next worker
root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(
&g_active_poller)) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. kicked %p", root_worker);
}
SET_KICK_STATE(root_worker, KICKED);
return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
} else if (next_worker->kick_state == UNKICKED) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. kicked %p", next_worker);
}
GPR_ASSERT(next_worker->initialized_cv);
next_worker->kick_state = KICKED;
SET_KICK_STATE(next_worker, KICKED);
gpr_cv_signal(&next_worker->cv);
return GRPC_ERROR_NONE;
} else if (next_worker->kick_state == DESIGNATED_POLLER) {
if (root_worker->kick_state != DESIGNATED_POLLER) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(
GPR_ERROR,
" .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)",
root_worker, root_worker->initialized_cv, next_worker);
}
SET_KICK_STATE(root_worker, KICKED);
if (root_worker->initialized_cv) {
gpr_cv_signal(&root_worker->cv);
}
return GRPC_ERROR_NONE;
} else {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. non-root poller %p (root=%p)", next_worker,
root_worker);
}
SET_KICK_STATE(next_worker, KICKED);
return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
}
} else {
GPR_ASSERT(next_worker->kick_state == KICKED);
SET_KICK_STATE(next_worker, KICKED);
return GRPC_ERROR_NONE;
}
} else {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. kicked while waking up");
}
return GRPC_ERROR_NONE;
}
} else if (specific_worker->kick_state == KICKED) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. specific worker already kicked");
}
return GRPC_ERROR_NONE;
} else if (gpr_tls_get(&g_current_thread_worker) ==
(intptr_t)specific_worker) {
specific_worker->kick_state = KICKED;
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. mark %p kicked", specific_worker);
}
SET_KICK_STATE(specific_worker, KICKED);
return GRPC_ERROR_NONE;
} else if (specific_worker ==
(grpc_pollset_worker *)gpr_atm_no_barrier_load(&g_active_poller)) {
specific_worker->kick_state = KICKED;
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. kick active poller");
}
SET_KICK_STATE(specific_worker, KICKED);
return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
} else if (specific_worker->initialized_cv) {
specific_worker->kick_state = KICKED;
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. kick waiting worker");
}
SET_KICK_STATE(specific_worker, KICKED);
gpr_cv_signal(&specific_worker->cv);
return GRPC_ERROR_NONE;
} else {
specific_worker->kick_state = KICKED;
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. kick non-waiting worker");
}
SET_KICK_STATE(specific_worker, KICKED);
return GRPC_ERROR_NONE;
}
}
@@ -805,6 +993,7 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
static void shutdown_engine(void) {
fd_global_shutdown();
pollset_global_shutdown();
close(g_epfd);
}
static const grpc_event_engine_vtable vtable = {
@@ -841,9 +1030,6 @@ static const grpc_event_engine_vtable vtable = {
/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
* Create a dummy epoll_fd to make sure epoll support is available */
const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
/* TODO(ctiller): temporary, until this stabilizes */
if (!explicit_request) return NULL;
if (!grpc_has_wakeup_fd()) {
return NULL;
}
@@ -862,6 +1048,8 @@ const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
return NULL;
}
gpr_log(GPR_ERROR, "grpc epoll fd: %d", g_epfd);
return &vtable;
}

@@ -1007,12 +1007,12 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
}
/*******************************************************************************
@@ -1223,7 +1223,7 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) {
grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
/* Note, it is possible that fd_become_readable might be called twice with
different 'notifier's when an fd becomes readable and it is in two epoll
@@ -1235,7 +1235,7 @@ static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,

@@ -560,12 +560,12 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
}
/*******************************************************************************
@@ -696,11 +696,11 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
}
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
static void pollset_release_epoll_set(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,

@@ -438,12 +438,12 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
}
/*******************************************************************************
@@ -710,7 +710,7 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) {
grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
/* Note, it is possible that fd_become_readable might be called twice with
different 'notifier's when an fd becomes readable and it is in two epoll
@@ -722,7 +722,7 @@ static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
static grpc_error *fd_become_pollable_locked(grpc_fd *fd) {
@@ -1049,8 +1049,8 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
/* Introduce a spurious completion.
If we do not, then it may be that the fd-specific epoll set consumed
a completion without being polled, leading to a missed edge going up. */
grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure);
grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure);
grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure, "read");
grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure, "write");
pollset_kick_all(exec_ctx, pollset);
pollset->current_pollable = &pollset->pollable;
if (append_error(&error, pollable_materialize(&pollset->pollable),

@@ -933,12 +933,12 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
}
/*******************************************************************************
@@ -1115,7 +1115,7 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) {
grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
/* Note, it is possible that fd_become_readable might be called twice with
different 'notifier's when an fd becomes readable and it is in two epoll
@@ -1127,7 +1127,7 @@ static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
@@ -1731,7 +1731,7 @@ const grpc_event_engine_vtable *grpc_init_epollsig_linux(
if (!is_grpc_wakeup_signal_initialized) {
/* TODO(ctiller): when other epoll engines are ready, remove the true || to
* force this to be explicitly chosen if needed */
if (true || explicit_request) {
if (explicit_request) {
grpc_use_signal(SIGRTMIN + 6);
} else {
return NULL;
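
Taken together with the removal of the "if (!explicit_request) return NULL;"
guard from grpc_init_epoll1_linux above, this inverts the defaults: epoll1
may now be selected automatically, while the signal-based epollsig engine
must be requested by name. A sketch of that selection behaviour (the engine
struct and candidate list here are invented; gRPC's real registry lives in
ev_posix.c):

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

typedef struct {
  const char *name;
  /* Returns a vtable if usable, or NULL to fall through to the next. */
  const void *(*init)(bool explicit_request);
} engine;

static const void *init_epoll1(bool explicit_request) {
  (void)explicit_request; /* guard removed: available by default */
  return "epoll1-vtable";
}

static const void *init_epollsig(bool explicit_request) {
  if (!explicit_request) return NULL; /* opt-in only, per this diff */
  return "epollsig-vtable";
}

int main(void) {
  const engine candidates[] = {{"epoll1", init_epoll1},
                               {"epollsig", init_epollsig}};
  for (size_t i = 0; i < sizeof(candidates) / sizeof(candidates[0]); i++) {
    const void *vtable = candidates[i].init(false /* nothing requested */);
    if (vtable != NULL) {
      printf("selected %s\n", candidates[i].name); /* prints: epoll1 */
      return 0;
    }
  }
  return 1;
}
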

@@ -79,12 +79,12 @@ bool grpc_lfev_is_shutdown(gpr_atm *state) {
}
void grpc_lfev_notify_on(grpc_exec_ctx *exec_ctx, gpr_atm *state,
grpc_closure *closure) {
grpc_closure *closure, const char *variable) {
while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(state);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "lfev_notify_on: %p curr=%p closure=%p", state,
(void *)curr, closure);
gpr_log(GPR_ERROR, "lfev_notify_on[%s]: %p curr=%p closure=%p", variable,
state, (void *)curr, closure);
}
switch (curr) {
case CLOSURE_NOT_READY: {
@@ -149,7 +149,7 @@ bool grpc_lfev_set_shutdown(grpc_exec_ctx *exec_ctx, gpr_atm *state,
while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(state);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "lfev_set_shutdown: %p curr=%p err=%s", state,
gpr_log(GPR_ERROR, "lfev_set_shutdown: %p curr=%p err=%s", state,
(void *)curr, grpc_error_string(shutdown_err));
}
switch (curr) {
@@ -193,12 +193,14 @@ bool grpc_lfev_set_shutdown(grpc_exec_ctx *exec_ctx, gpr_atm *state,
GPR_UNREACHABLE_CODE(return false);
}
void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state) {
void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state,
const char *variable) {
while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(state);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "lfev_set_ready: %p curr=%p", state, (void *)curr);
gpr_log(GPR_ERROR, "lfev_set_ready[%s]: %p curr=%p", variable, state,
(void *)curr);
}
switch (curr) {

@@ -30,10 +30,11 @@ void grpc_lfev_destroy(gpr_atm *state);
bool grpc_lfev_is_shutdown(gpr_atm *state);
void grpc_lfev_notify_on(grpc_exec_ctx *exec_ctx, gpr_atm *state,
grpc_closure *closure);
grpc_closure *closure, const char *variable);
/* Returns true on first successful shutdown */
bool grpc_lfev_set_shutdown(grpc_exec_ctx *exec_ctx, gpr_atm *state,
grpc_error *shutdown_err);
void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state);
void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state,
const char *variable);
#endif /* GRPC_CORE_LIB_IOMGR_LOCKFREE_EVENT_H */
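
The only change to the lockfree-event API is this extra 'variable' argument,
threaded through purely so polling-trace output can say whether an event
belongs to the read or the write half of an fd. A toy illustration (lfev_t
below is a stand-in; the real state is a lock-free gpr_atm CAS machine that
only logs when grpc_polling_trace is enabled):

#include <stdio.h>

typedef struct { int ready; } lfev_t; /* toy; the real state is a gpr_atm */

static void lfev_set_ready(lfev_t *ev, const char *variable) {
  /* The label turns an ambiguous "lfev_set_ready: 0x7f.." trace line into
     an unambiguous "lfev_set_ready[read]: 0x7f..". */
  printf("lfev_set_ready[%s]: %p\n", variable, (void *)ev);
  ev->ready = 1;
}

int main(void) {
  lfev_t read_closure = {0}, write_closure = {0};
  lfev_set_ready(&read_closure, "read");
  lfev_set_ready(&write_closure, "write");
  return 0;
}
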

@@ -190,7 +190,8 @@ static void consumer_thread(void *arg) {
gpr_log(GPR_INFO, "consumer %d phase 2", opt->id);
for (;;) {
ev = grpc_completion_queue_next(opt->cc, ten_seconds_time(), NULL);
ev = grpc_completion_queue_next(opt->cc,
gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL);
switch (ev.type) {
case GRPC_OP_COMPLETE:
GPR_ASSERT(ev.success);
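
The consumer now waits with an infinite deadline instead of re-arming a
ten-second one, so a heavily loaded test machine can no longer produce a
spurious timeout. A pthread model of the difference (queue_t and queue_pop
are invented stand-ins for grpc_completion_queue_next called with
gpr_inf_future):

#include <pthread.h>
#include <stdio.h>

typedef struct {
  pthread_mutex_t mu;
  pthread_cond_t cv;
  int pending; /* queued events */
} queue_t;

/* Waits indefinitely: with no timeout path there is no spurious
   GRPC_QUEUE_TIMEOUT case for the consumer loop to handle. */
static void queue_pop(queue_t *q) {
  pthread_mutex_lock(&q->mu);
  while (q->pending == 0) {
    pthread_cond_wait(&q->cv, &q->mu); /* the gpr_inf_future analogue */
  }
  q->pending--;
  pthread_mutex_unlock(&q->mu);
}

int main(void) {
  queue_t q = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 1};
  queue_pop(&q); /* returns immediately: one event is already queued */
  printf("got event\n");
  return 0;
}
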

@@ -3993,7 +3993,8 @@
"mac",
"posix",
"windows"
]
],
"timeout_seconds": 1200
},
{
"args": [],
