pull/12789/head
Craig Tiller 8 years ago
parent 2c75fd0f80
commit 8223f4172f
  1. 67
      src/core/lib/iomgr/ev_epollex_linux.cc

@ -54,6 +54,9 @@
// use-after-destruction) // use-after-destruction)
//#define GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP 1 //#define GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP 1
#define MAX_EPOLL_EVENTS 100
#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5
#ifndef NDEBUG #ifndef NDEBUG
grpc_tracer_flag grpc_trace_pollable_refcount = grpc_tracer_flag grpc_trace_pollable_refcount =
GRPC_TRACER_INITIALIZER(false, "pollable_refcount"); GRPC_TRACER_INITIALIZER(false, "pollable_refcount");
@ -83,6 +86,10 @@ struct pollable {
gpr_mu mu; gpr_mu mu;
grpc_pollset_worker *root_worker; grpc_pollset_worker *root_worker;
int event_cursor;
int event_count;
struct epoll_event events[MAX_EPOLL_EVENTS];
}; };
static const char *pollable_type_string(pollable_type t) { static const char *pollable_type_string(pollable_type t) {
@ -174,9 +181,6 @@ struct grpc_pollset_worker {
pwlink links[PWLINK_COUNT]; pwlink links[PWLINK_COUNT];
}; };
#define MAX_EPOLL_EVENTS 100
#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5
struct grpc_pollset { struct grpc_pollset {
gpr_mu mu; gpr_mu mu;
pollable *active_pollable; pollable *active_pollable;
@ -184,10 +188,6 @@ struct grpc_pollset {
grpc_closure *shutdown_closure; grpc_closure *shutdown_closure;
grpc_pollset_worker *root_worker; grpc_pollset_worker *root_worker;
int containing_pollset_set_count; int containing_pollset_set_count;
int event_cursor;
int event_count;
struct epoll_event events[MAX_EPOLL_EVENTS];
}; };
/******************************************************************************* /*******************************************************************************
@ -725,15 +725,15 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset_maybe_finish_shutdown(exec_ctx, pollset); pollset_maybe_finish_shutdown(exec_ctx, pollset);
} }
static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx, static grpc_error *pollable_process_events(grpc_exec_ctx *exec_ctx, grpc_pollset*pollset,
grpc_pollset *pollset, bool drain) { pollable *pollable_obj, bool drain) {
static const char *err_desc = "pollset_process_events"; static const char *err_desc = "pollset_process_events";
grpc_error *error = GRPC_ERROR_NONE; grpc_error *error = GRPC_ERROR_NONE;
for (int i = 0; (drain || i < MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) && for (int i = 0; (drain || i < MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) &&
pollset->event_cursor != pollset->event_count; pollable_obj->event_cursor != pollable_obj->event_count;
i++) { i++) {
int n = pollset->event_cursor++; int n = pollable_obj->event_cursor++;
struct epoll_event *ev = &pollset->events[n]; struct epoll_event *ev = &pollable_obj->events[n];
void *data_ptr = ev->data.ptr; void *data_ptr = ev->data.ptr;
if (1 & (intptr_t)data_ptr) { if (1 & (intptr_t)data_ptr) {
if (GRPC_TRACER_ON(grpc_polling_trace)) { if (GRPC_TRACER_ON(grpc_polling_trace)) {
@ -770,8 +770,6 @@ static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx,
static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
POLLABLE_UNREF(pollset->active_pollable, "pollset"); POLLABLE_UNREF(pollset->active_pollable, "pollset");
pollset->active_pollable = NULL; pollset->active_pollable = NULL;
GRPC_LOG_IF_ERROR("pollset_process_events",
pollset_process_events(exec_ctx, pollset, true));
} }
static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@ -790,7 +788,7 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
int r; int r;
do { do {
GRPC_STATS_INC_SYSCALL_POLL(exec_ctx); GRPC_STATS_INC_SYSCALL_POLL(exec_ctx);
r = epoll_wait(p->epfd, pollset->events, MAX_EPOLL_EVENTS, timeout); r = epoll_wait(p->epfd, p->events, MAX_EPOLL_EVENTS, timeout);
} while (r < 0 && errno == EINTR); } while (r < 0 && errno == EINTR);
if (timeout != 0) { if (timeout != 0) {
GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx); GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx);
@ -802,8 +800,8 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
gpr_log(GPR_DEBUG, "PS:%p poll %p got %d events", pollset, p, r); gpr_log(GPR_DEBUG, "PS:%p poll %p got %d events", pollset, p, r);
} }
pollset->event_cursor = 0; p->event_cursor = 0;
pollset->event_count = r; p->event_count = r;
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }
@ -852,7 +850,7 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker, grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl, grpc_pollset_worker **worker_hdl,
grpc_millis deadline) { grpc_millis deadline) {
bool do_poll = true; bool do_poll = (pollset->shutdown_closure == nullptr);
if (worker_hdl != NULL) *worker_hdl = worker; if (worker_hdl != NULL) *worker_hdl = worker;
worker->initialized_cv = false; worker->initialized_cv = false;
worker->kicked = false; worker->kicked = false;
@ -899,23 +897,33 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
gpr_mu_unlock(&worker->pollable_obj->mu); gpr_mu_unlock(&worker->pollable_obj->mu);
return do_poll; return do_poll;
// && pollset->shutdown_closure == NULL && pollset->active_pollable ==
// worker->pollable_obj;
} }
static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker, grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl) { grpc_pollset_worker **worker_hdl) {
gpr_mu_lock(&pollset->mu);
gpr_mu_lock(&worker->pollable_obj->mu); gpr_mu_lock(&worker->pollable_obj->mu);
if (worker_remove(&worker->pollable_obj->root_worker, worker, switch (worker_remove(&worker->pollable_obj->root_worker, worker,
PWLINK_POLLABLE) == WRR_NEW_ROOT) { PWLINK_POLLABLE)) {
grpc_pollset_worker *new_root = worker->pollable_obj->root_worker; case WRR_NEW_ROOT: {
GPR_ASSERT(new_root->initialized_cv); // wakeup new poller
gpr_cv_signal(&new_root->cv); grpc_pollset_worker *new_root = worker->pollable_obj->root_worker;
GPR_ASSERT(new_root->initialized_cv);
gpr_cv_signal(&new_root->cv);
break;
}
case WRR_EMPTIED:
if (pollset->active_pollable != worker->pollable_obj) {
// pollable no longer being polled: flush events
pollable_process_events(exec_ctx, pollset, worker->pollable_obj, true);
}
break;
case WRR_REMOVED:
break;
} }
gpr_mu_unlock(&worker->pollable_obj->mu); gpr_mu_unlock(&worker->pollable_obj->mu);
POLLABLE_UNREF(worker->pollable_obj, "pollset_worker"); POLLABLE_UNREF(worker->pollable_obj, "pollset_worker");
gpr_mu_lock(&pollset->mu);
if (worker_remove(&pollset->root_worker, worker, PWLINK_POLLSET) == if (worker_remove(&pollset->root_worker, worker, PWLINK_POLLSET) ==
WRR_EMPTIED) { WRR_EMPTIED) {
pollset_maybe_finish_shutdown(exec_ctx, pollset); pollset_maybe_finish_shutdown(exec_ctx, pollset);
@ -957,12 +965,14 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (begin_worker(exec_ctx, pollset, WORKER_PTR, worker_hdl, deadline)) { if (begin_worker(exec_ctx, pollset, WORKER_PTR, worker_hdl, deadline)) {
gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
gpr_tls_set(&g_current_thread_worker, (intptr_t)WORKER_PTR); gpr_tls_set(&g_current_thread_worker, (intptr_t)WORKER_PTR);
if (pollset->event_cursor == pollset->event_count) { if (WORKER_PTR->pollable_obj->event_cursor == WORKER_PTR->pollable_obj->event_count) {
append_error(&error, pollset_epoll(exec_ctx, pollset, append_error(&error, pollset_epoll(exec_ctx, pollset,
WORKER_PTR->pollable_obj, deadline), WORKER_PTR->pollable_obj, deadline),
err_desc); err_desc);
} }
append_error(&error, pollset_process_events(exec_ctx, pollset, false), append_error(&error,
pollable_process_events(
exec_ctx, pollset, WORKER_PTR->pollable_obj, false),
err_desc); err_desc);
grpc_exec_ctx_flush(exec_ctx); grpc_exec_ctx_flush(exec_ctx);
gpr_tls_set(&g_current_thread_pollset, 0); gpr_tls_set(&g_current_thread_pollset, 0);
@ -973,6 +983,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
#ifdef GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP #ifdef GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP
gpr_free(worker); gpr_free(worker);
#endif #endif
#undef WORKER_PTR
return error; return error;
} }

Loading…
Cancel
Save