From e3a69338c38fc63a8a7e3d0e61802b9dff13439b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 18 Apr 2017 16:45:40 +0000 Subject: [PATCH] Fix wakeup bugs --- src/core/lib/iomgr/ev_epollex_linux.c | 73 +++++++++++++++++++-------- 1 file changed, 52 insertions(+), 21 deletions(-) diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index af2908f16d4..4f221a5f22f 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -666,6 +666,7 @@ static grpc_error *pollset_kick_all(grpc_pollset *pollset) { grpc_pollset_worker *worker = pollset->root_worker; do { if (worker->initialized_cv) { + worker->kicked = true; gpr_cv_signal(&worker->cv); } else { append_error(&error, grpc_wakeup_fd_wakeup(&worker->pollable->wakeup), @@ -727,6 +728,7 @@ static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, if (grpc_polling_trace) { gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_cv", p); } + specific_worker->kicked = true; gpr_cv_signal(&specific_worker->cv); return GRPC_ERROR_NONE; } @@ -850,13 +852,16 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, int timeout = poll_deadline_to_millis_timeout(deadline, now); if (grpc_polling_trace) { - gpr_log(GPR_DEBUG, "PS:%p poll for %dms", pollset, timeout); + gpr_log(GPR_DEBUG, "PS:%p poll %p for %dms", pollset, p, timeout); } if (timeout != 0) { GRPC_SCHEDULING_START_BLOCKING_REGION; } - int r = epoll_wait(p->epfd, events, MAX_EPOLL_EVENTS, timeout); + int r; + do { + r = epoll_wait(p->epfd, events, MAX_EPOLL_EVENTS, timeout); + } while (r < 0 && errno == EINTR); if (timeout != 0) { GRPC_SCHEDULING_END_BLOCKING_REGION; } @@ -864,7 +869,7 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait"); if (grpc_polling_trace) { - gpr_log(GPR_DEBUG, "PS:%p poll got %d events", pollset, r); + gpr_log(GPR_DEBUG, "PS:%p poll %p got %d events", pollset, p, r); } grpc_error *error = GRPC_ERROR_NONE; @@ -872,7 +877,7 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, void *data_ptr = events[i].data.ptr; if (data_ptr == &global_wakeup_fd) { if (grpc_polling_trace) { - gpr_log(GPR_DEBUG, "PS:%p poll got global_wakeup_fd", pollset); + gpr_log(GPR_DEBUG, "PS:%p poll %p got global_wakeup_fd", pollset, p); } grpc_timer_consume_kick(); @@ -880,7 +885,7 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, err_desc); } else if (data_ptr == &p->wakeup) { if (grpc_polling_trace) { - gpr_log(GPR_DEBUG, "PS:%p poll got pollset_wakeup", pollset); + gpr_log(GPR_DEBUG, "PS:%p poll %p got pollset_wakeup", pollset, p); } append_error(&error, grpc_wakeup_fd_consume_wakeup(&p->wakeup), err_desc); } else { @@ -890,11 +895,11 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0; bool write_ev = (events[i].events & EPOLLOUT) != 0; if (grpc_polling_trace) { - gpr_log( - GPR_DEBUG, - "PS:%p poll got fd %p(%d/%d): is_wq=%d cancel=%d read=%d write=%d", - pollset, fd, fd->fd, fd->workqueue_wakeup_fd.read_fd, is_workqueue, - cancel, read_ev, write_ev); + gpr_log(GPR_DEBUG, + "PS:%p poll %p got fd %p(%d/%d): is_wq=%d cancel=%d read=%d " + "write=%d", + pollset, p, fd, fd->fd, fd->workqueue_wakeup_fd.read_fd, + is_workqueue, cancel, read_ev, write_ev); } if (is_workqueue) { append_error(&error, @@ -956,8 +961,9 @@ static worker_remove_result worker_remove(grpc_pollset_worker **root, /* Return true if this thread should poll */ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, - grpc_pollset_worker **worker_hdl, + grpc_pollset_worker **worker_hdl, gpr_timespec *now, gpr_timespec deadline) { + bool do_poll = true; if (worker_hdl != NULL) *worker_hdl = worker; worker->initialized_cv = false; worker->kicked = false; @@ -972,18 +978,43 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, if (!worker_insert(&worker->pollable->root_worker, PWL_POLLABLE, worker)) { worker->initialized_cv = true; gpr_cv_init(&worker->cv); - while (worker->pollable->root_worker != worker) { - if (gpr_cv_wait(&worker->cv, &pollset->current_pollable->po.mu, - deadline)) { - return false; - } - if (worker->kicked) { - return false; + if (worker->pollable != &pollset->pollable) { + gpr_mu_unlock(&pollset->pollable.po.mu); + } + if (grpc_polling_trace && worker->pollable->root_worker != worker) { + gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset, + worker->pollable, worker, + poll_deadline_to_millis_timeout(deadline, *now)); + } + while (do_poll && worker->pollable->root_worker != worker) { + if (gpr_cv_wait(&worker->cv, &worker->pollable->po.mu, deadline)) { + if (grpc_polling_trace) { + gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset, + worker->pollable, worker); + } + do_poll = false; + } else if (worker->kicked) { + if (grpc_polling_trace) { + gpr_log(GPR_DEBUG, "PS:%p wakeup %p w=%p", pollset, worker->pollable, + worker); + } + do_poll = false; + } else if (grpc_polling_trace && + worker->pollable->root_worker != worker) { + gpr_log(GPR_DEBUG, "PS:%p spurious_wakeup %p w=%p", pollset, + worker->pollable, worker); } } + if (worker->pollable != &pollset->pollable) { + gpr_mu_unlock(&worker->pollable->po.mu); + gpr_mu_lock(&pollset->pollable.po.mu); + gpr_mu_lock(&worker->pollable->po.mu); + } + *now = gpr_now(now->clock_type); } - return pollset->shutdown_closure == NULL; + return do_poll && pollset->shutdown_closure == NULL && + pollset->current_pollable == worker->pollable; } static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -1010,7 +1041,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { grpc_pollset_worker worker; - if (grpc_polling_trace) { + if (0 && grpc_polling_trace) { gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRId64 ".%09d deadline=%" PRId64 ".%09d kwp=%d root_worker=%p", pollset, worker_hdl, &worker, now.tv_sec, now.tv_nsec, @@ -1026,7 +1057,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (pollset->current_pollable != &pollset->pollable) { gpr_mu_lock(&pollset->current_pollable->po.mu); } - if (begin_worker(pollset, &worker, worker_hdl, deadline)) { + if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) { gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); GPR_ASSERT(!pollset->shutdown_closure);