From 8223f4172f0a5739813fe6bb82863e68a2f588c9 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 17 Oct 2017 22:05:55 +0000 Subject: [PATCH] Fixes --- src/core/lib/iomgr/ev_epollex_linux.cc | 67 +++++++++++++++----------- 1 file changed, 39 insertions(+), 28 deletions(-) diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 566986a7747..fb94b8e5134 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -54,6 +54,9 @@ // use-after-destruction) //#define GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP 1 +#define MAX_EPOLL_EVENTS 100 +#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5 + #ifndef NDEBUG grpc_tracer_flag grpc_trace_pollable_refcount = GRPC_TRACER_INITIALIZER(false, "pollable_refcount"); @@ -83,6 +86,10 @@ struct pollable { gpr_mu mu; grpc_pollset_worker *root_worker; + + int event_cursor; + int event_count; + struct epoll_event events[MAX_EPOLL_EVENTS]; }; static const char *pollable_type_string(pollable_type t) { @@ -174,9 +181,6 @@ struct grpc_pollset_worker { pwlink links[PWLINK_COUNT]; }; -#define MAX_EPOLL_EVENTS 100 -#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5 - struct grpc_pollset { gpr_mu mu; pollable *active_pollable; @@ -184,10 +188,6 @@ struct grpc_pollset { grpc_closure *shutdown_closure; grpc_pollset_worker *root_worker; int containing_pollset_set_count; - - int event_cursor; - int event_count; - struct epoll_event events[MAX_EPOLL_EVENTS]; }; /******************************************************************************* @@ -725,15 +725,15 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset_maybe_finish_shutdown(exec_ctx, pollset); } -static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, bool drain) { +static grpc_error *pollable_process_events(grpc_exec_ctx *exec_ctx, grpc_pollset*pollset, + pollable *pollable_obj, bool drain) { static const char *err_desc = "pollset_process_events"; grpc_error *error = GRPC_ERROR_NONE; for (int i = 0; (drain || i < MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) && - pollset->event_cursor != pollset->event_count; + pollable_obj->event_cursor != pollable_obj->event_count; i++) { - int n = pollset->event_cursor++; - struct epoll_event *ev = &pollset->events[n]; + int n = pollable_obj->event_cursor++; + struct epoll_event *ev = &pollable_obj->events[n]; void *data_ptr = ev->data.ptr; if (1 & (intptr_t)data_ptr) { if (GRPC_TRACER_ON(grpc_polling_trace)) { @@ -770,8 +770,6 @@ static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx, static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { POLLABLE_UNREF(pollset->active_pollable, "pollset"); pollset->active_pollable = NULL; - GRPC_LOG_IF_ERROR("pollset_process_events", - pollset_process_events(exec_ctx, pollset, true)); } static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -790,7 +788,7 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, int r; do { GRPC_STATS_INC_SYSCALL_POLL(exec_ctx); - r = epoll_wait(p->epfd, pollset->events, MAX_EPOLL_EVENTS, timeout); + r = epoll_wait(p->epfd, p->events, MAX_EPOLL_EVENTS, timeout); } while (r < 0 && errno == EINTR); if (timeout != 0) { GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx); @@ -802,8 +800,8 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_log(GPR_DEBUG, "PS:%p poll %p got %d events", pollset, p, r); } - pollset->event_cursor = 0; - pollset->event_count = r; + p->event_cursor = 0; + p->event_count = r; return GRPC_ERROR_NONE; } @@ -852,7 +850,7 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, grpc_pollset_worker **worker_hdl, grpc_millis deadline) { - bool do_poll = true; + bool do_poll = (pollset->shutdown_closure == nullptr); if (worker_hdl != NULL) *worker_hdl = worker; worker->initialized_cv = false; worker->kicked = false; @@ -899,23 +897,33 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_mu_unlock(&worker->pollable_obj->mu); return do_poll; - // && pollset->shutdown_closure == NULL && pollset->active_pollable == - // worker->pollable_obj; } static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, grpc_pollset_worker **worker_hdl) { + gpr_mu_lock(&pollset->mu); gpr_mu_lock(&worker->pollable_obj->mu); - if (worker_remove(&worker->pollable_obj->root_worker, worker, - PWLINK_POLLABLE) == WRR_NEW_ROOT) { - grpc_pollset_worker *new_root = worker->pollable_obj->root_worker; - GPR_ASSERT(new_root->initialized_cv); - gpr_cv_signal(&new_root->cv); + switch (worker_remove(&worker->pollable_obj->root_worker, worker, + PWLINK_POLLABLE)) { + case WRR_NEW_ROOT: { + // wakeup new poller + grpc_pollset_worker *new_root = worker->pollable_obj->root_worker; + GPR_ASSERT(new_root->initialized_cv); + gpr_cv_signal(&new_root->cv); + break; + } + case WRR_EMPTIED: + if (pollset->active_pollable != worker->pollable_obj) { + // pollable no longer being polled: flush events + pollable_process_events(exec_ctx, pollset, worker->pollable_obj, true); + } + break; + case WRR_REMOVED: + break; } gpr_mu_unlock(&worker->pollable_obj->mu); POLLABLE_UNREF(worker->pollable_obj, "pollset_worker"); - gpr_mu_lock(&pollset->mu); if (worker_remove(&pollset->root_worker, worker, PWLINK_POLLSET) == WRR_EMPTIED) { pollset_maybe_finish_shutdown(exec_ctx, pollset); @@ -957,12 +965,14 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (begin_worker(exec_ctx, pollset, WORKER_PTR, worker_hdl, deadline)) { gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); gpr_tls_set(&g_current_thread_worker, (intptr_t)WORKER_PTR); - if (pollset->event_cursor == pollset->event_count) { + if (WORKER_PTR->pollable_obj->event_cursor == WORKER_PTR->pollable_obj->event_count) { append_error(&error, pollset_epoll(exec_ctx, pollset, WORKER_PTR->pollable_obj, deadline), err_desc); } - append_error(&error, pollset_process_events(exec_ctx, pollset, false), + append_error(&error, + pollable_process_events( + exec_ctx, pollset, WORKER_PTR->pollable_obj, false), err_desc); grpc_exec_ctx_flush(exec_ctx); gpr_tls_set(&g_current_thread_pollset, 0); @@ -973,6 +983,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, #ifdef GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP gpr_free(worker); #endif +#undef WORKER_PTR return error; }