diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index c71b1fe8538..172847f6c84 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -601,11 +601,20 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { grpc_lfev_set_ready(exec_ctx, &fd->write_closure); } +static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset) { + if (pollset->shutdown_closure != NULL && pollset->num_pollers == 0 && + pollset->po.pss_master == NULL) { + grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE); + } +} + /* pollset->po.mu lock must be held by the caller before calling this */ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure) { GPR_ASSERT(pollset->shutdown_closure == NULL); pollset->shutdown_closure = closure; + gpr_atm_no_barrier_store(&pollset->shutdown_atm, 1); if (pollset->num_pollers > 0) { struct epoll_event ev = {.events = EPOLLIN, .data.ptr = &pollset->pollset_wakeup}; @@ -613,9 +622,16 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, &ev); GRPC_LOG_IF_ERROR("pollset_shutdown", grpc_wakeup_fd_wakeup(&pollset->pollset_wakeup)); - } else { - grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); } + if (pollset->root_worker != NULL) { + for (grpc_pollset_worker *worker = pollset->root_worker->next; + worker != pollset->root_worker; worker = worker->next) { + if (worker->initialized_cv) { + gpr_cv_signal(&worker->cv); + } + } + } + pollset_maybe_finish_shutdown(exec_ctx, pollset); } /* pollset_shutdown is guaranteed to be called before pollset_destroy. */ @@ -719,6 +735,7 @@ static void end_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, pollset->root_worker = worker->next; worker->prev->next = worker->next; worker->next->prev = worker->prev; + gpr_cv_signal(&pollset->root_worker->cv); } } else { worker->prev->next = worker->next; @@ -747,9 +764,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_exec_ctx_flush(exec_ctx); gpr_mu_lock(&pollset->po.mu); pollset->num_pollers--; - if (pollset->num_pollers == 0 && pollset->shutdown_closure != NULL) { - grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE); - } + pollset_maybe_finish_shutdown(exec_ctx, pollset); } end_worker(pollset, &worker, worker_hdl); return error; @@ -979,12 +994,14 @@ static void pss_add_obj(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, obj->pss_prev->pss_next = obj; obj->pss_next->pss_prev = obj; } - gpr_mu_unlock(&obj->mu); switch (type) { case PSS_FD: + REF_BY((grpc_fd *)obj, 2, "pollset_set"); + gpr_mu_unlock(&obj->mu); pss_broadcast_fd(exec_ctx, pss, obj); break; case PSS_POLLSET: + gpr_mu_unlock(&obj->mu); pss_broadcast_pollset(exec_ctx, pss, obj); break; case PSS_POLLSET_SET: @@ -998,6 +1015,7 @@ static void pss_add_obj(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, static void pss_del_obj(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, pss_obj *obj, pss_obj_type type) { + bool unref = false; pss = pss_ref_and_lock_master(pss); gpr_mu_lock(&obj->mu); obj->pss_refs--; @@ -1012,10 +1030,12 @@ static void pss_del_obj(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, obj->pss_next->pss_prev = obj->pss_prev; obj->pss_prev->pss_next = obj->pss_next; } + unref = true; } gpr_mu_unlock(&obj->mu); gpr_mu_unlock(&pss->po.mu); pss_unref(pss); + if (unref && type == PSS_FD) UNREF_BY((grpc_fd *)obj, 2, "pollset_set"); } static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,