From 8c6878be3a5f748ccbc68be261303a3b57837854 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 7 Apr 2017 15:15:30 -0700 Subject: [PATCH 1/3] clang-format --- src/core/lib/iomgr/ev_epollex_linux.c | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index c75b59c4c40..89b416a60e1 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -404,7 +404,10 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure); } -static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { REF_BY(fd, 2, "return_workqueue"); return (grpc_workqueue*)fd; } +static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { + REF_BY(fd, 2, "return_workqueue"); + return (grpc_workqueue *)fd; +} #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue, @@ -574,7 +577,8 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, return 0; } - static const gpr_timespec round_up = {.clock_type = GPR_TIMESPAN, .tv_sec = 0, .tv_nsec = GPR_NS_PER_MS-1}; + static const gpr_timespec round_up = { + .clock_type = GPR_TIMESPAN, .tv_sec = 0, .tv_nsec = GPR_NS_PER_MS - 1}; timeout = gpr_time_sub(deadline, now); int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up)); return millis >= 1 ? millis : 1; @@ -634,9 +638,8 @@ static grpc_error *pollset_poll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } GRPC_SCHEDULING_START_BLOCKING_REGION; -int timeout=poll_deadline_to_millis_timeout(deadline, now); - int r = epoll_wait(pollset->epfd, events, MAX_EPOLL_EVENTS, - timeout); + int timeout = poll_deadline_to_millis_timeout(deadline, now); + int r = epoll_wait(pollset->epfd, events, MAX_EPOLL_EVENTS, timeout); GRPC_SCHEDULING_END_BLOCKING_REGION; if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait"); @@ -803,8 +806,9 @@ static void pss_destroy(grpc_pollset_set *pss) { GPR_ASSERT(pss->roots[PSS_FD] == NULL); GPR_ASSERT(pss->roots[PSS_POLLSET] == NULL); GPR_ASSERT(pss->roots[PSS_POLLSET_SET] == &pss->po); - for (pss_obj *child = pss->roots[PSS_POLLSET_SET]; child != &pss->po; child = child->pss_next) { - pss_unref((grpc_pollset_set*)child); + for (pss_obj *child = pss->roots[PSS_POLLSET_SET]; child != &pss->po; + child = child->pss_next) { + pss_unref((grpc_pollset_set *)child); } gpr_free(pss); } @@ -932,7 +936,8 @@ static void pss_merge(grpc_exec_ctx *exec_ctx, grpc_pollset_set *a, pss_merge_broadcast_and_patch(exec_ctx, a, b, PSS_FD); pss_merge_broadcast_and_patch(exec_ctx, a, b, PSS_POLLSET); b->po.pss_master = a; - a->roots[PSS_POLLSET_SET] = pss_splice(a->roots[PSS_POLLSET_SET], b->roots[PSS_POLLSET_SET]); + a->roots[PSS_POLLSET_SET] = + pss_splice(a->roots[PSS_POLLSET_SET], b->roots[PSS_POLLSET_SET]); gpr_mu_unlock(&a->po.mu); gpr_mu_unlock(&b->po.mu); pss_unref(a); From 2636f43a6789f63148448fdce2877aff6b0aaade Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 7 Apr 2017 15:16:56 -0700 Subject: [PATCH 2/3] less spam --- src/core/lib/iomgr/ev_epollex_linux.c | 36 ++++++++++++++++++--------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index 89b416a60e1..c71b1fe8538 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -1098,12 +1098,17 @@ static const grpc_event_engine_vtable vtable = { /* It is possible that GLIBC has epoll but the underlying kernel doesn't. * Create a dummy epoll_fd to make sure epoll support is available */ static bool is_epollex_available(void) { + static bool logged_why_not = false; + int fd = epoll_create1(EPOLL_CLOEXEC); if (fd < 0) { - gpr_log(GPR_ERROR, - "epoll_create1 failed with error: %d. Not using epollex polling " - "engine.", - fd); + if (!logged_why_not) { + gpr_log(GPR_ERROR, + "epoll_create1 failed with error: %d. Not using epollex polling " + "engine.", + fd); + logged_why_not = true; + } return false; } grpc_wakeup_fd wakeup; @@ -1119,19 +1124,26 @@ static bool is_epollex_available(void) { .data.ptr = NULL}; if (epoll_ctl(fd, EPOLL_CTL_ADD, wakeup.read_fd, &ev) != 0) { if (errno != EINVAL) { - gpr_log(GPR_ERROR, - "epoll_ctl with EPOLLEXCLUSIVE | EPOLLONESHOT failed with error: " - "%d. Not using epollex polling engine.", - errno); + if (!logged_why_not) { + gpr_log( + GPR_ERROR, + "epoll_ctl with EPOLLEXCLUSIVE | EPOLLONESHOT failed with error: " + "%d. Not using epollex polling engine.", + errno); + logged_why_not = true; + } close(fd); grpc_wakeup_fd_destroy(&wakeup); return false; } } else { - gpr_log(GPR_ERROR, - "epoll_ctl with EPOLLEXCLUSIVE | EPOLLONESHOT succeeded. This is " - "evidence of no EPOLLEXCLUSIVE support. Not using " - "epollex polling engine."); + if (!logged_why_not) { + gpr_log(GPR_ERROR, + "epoll_ctl with EPOLLEXCLUSIVE | EPOLLONESHOT succeeded. This is " + "evidence of no EPOLLEXCLUSIVE support. Not using " + "epollex polling engine."); + logged_why_not = true; + } close(fd); grpc_wakeup_fd_destroy(&wakeup); return false; From d9cd8f0abe6aa8562aaafb041b1db371247517d6 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 7 Apr 2017 16:26:02 -0700 Subject: [PATCH 3/3] shutdown progress --- src/core/lib/iomgr/ev_epollex_linux.c | 32 ++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index c71b1fe8538..172847f6c84 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -601,11 +601,20 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { grpc_lfev_set_ready(exec_ctx, &fd->write_closure); } +static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset) { + if (pollset->shutdown_closure != NULL && pollset->num_pollers == 0 && + pollset->po.pss_master == NULL) { + grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE); + } +} + /* pollset->po.mu lock must be held by the caller before calling this */ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure) { GPR_ASSERT(pollset->shutdown_closure == NULL); pollset->shutdown_closure = closure; + gpr_atm_no_barrier_store(&pollset->shutdown_atm, 1); if (pollset->num_pollers > 0) { struct epoll_event ev = {.events = EPOLLIN, .data.ptr = &pollset->pollset_wakeup}; @@ -613,9 +622,16 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, &ev); GRPC_LOG_IF_ERROR("pollset_shutdown", grpc_wakeup_fd_wakeup(&pollset->pollset_wakeup)); - } else { - grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); } + if (pollset->root_worker != NULL) { + for (grpc_pollset_worker *worker = pollset->root_worker->next; + worker != pollset->root_worker; worker = worker->next) { + if (worker->initialized_cv) { + gpr_cv_signal(&worker->cv); + } + } + } + pollset_maybe_finish_shutdown(exec_ctx, pollset); } /* pollset_shutdown is guaranteed to be called before pollset_destroy. */ @@ -719,6 +735,7 @@ static void end_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, pollset->root_worker = worker->next; worker->prev->next = worker->next; worker->next->prev = worker->prev; + gpr_cv_signal(&pollset->root_worker->cv); } } else { worker->prev->next = worker->next; @@ -747,9 +764,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_exec_ctx_flush(exec_ctx); gpr_mu_lock(&pollset->po.mu); pollset->num_pollers--; - if (pollset->num_pollers == 0 && pollset->shutdown_closure != NULL) { - grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE); - } + pollset_maybe_finish_shutdown(exec_ctx, pollset); } end_worker(pollset, &worker, worker_hdl); return error; @@ -979,12 +994,14 @@ static void pss_add_obj(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, obj->pss_prev->pss_next = obj; obj->pss_next->pss_prev = obj; } - gpr_mu_unlock(&obj->mu); switch (type) { case PSS_FD: + REF_BY((grpc_fd *)obj, 2, "pollset_set"); + gpr_mu_unlock(&obj->mu); pss_broadcast_fd(exec_ctx, pss, obj); break; case PSS_POLLSET: + gpr_mu_unlock(&obj->mu); pss_broadcast_pollset(exec_ctx, pss, obj); break; case PSS_POLLSET_SET: @@ -998,6 +1015,7 @@ static void pss_add_obj(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, static void pss_del_obj(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, pss_obj *obj, pss_obj_type type) { + bool unref = false; pss = pss_ref_and_lock_master(pss); gpr_mu_lock(&obj->mu); obj->pss_refs--; @@ -1012,10 +1030,12 @@ static void pss_del_obj(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, obj->pss_next->pss_prev = obj->pss_prev; obj->pss_prev->pss_next = obj->pss_next; } + unref = true; } gpr_mu_unlock(&obj->mu); gpr_mu_unlock(&pss->po.mu); pss_unref(pss); + if (unref && type == PSS_FD) UNREF_BY((grpc_fd *)obj, 2, "pollset_set"); } static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,