From 50480b2a058fc74c3cf934ae5ae9981b2417005e Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 17 Apr 2017 16:34:52 +0000 Subject: [PATCH] Fixes --- src/core/lib/iomgr/ev_epollex_linux.c | 130 +++++++++++++++++--------- src/core/lib/iomgr/pollset.h | 2 +- 2 files changed, 86 insertions(+), 46 deletions(-) diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index eba1bf920c6..fdde21756bf 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -265,7 +265,7 @@ static gpr_mu fd_freelist_mu; #ifdef GRPC_FD_REF_COUNT_DEBUG #define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__) -#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__) +#define UNREF_BY(ec, fd, n, reason) unref_by(ec, fd, n, reason, __FILE__, __LINE__) static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file, int line) { gpr_log(GPR_DEBUG, "FD %d %p ref %d %ld -> %ld [%s; %s:%d]", fd->fd, @@ -273,25 +273,14 @@ static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file, gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line); #else #define REF_BY(fd, n, reason) ref_by(fd, n) -#define UNREF_BY(fd, n, reason) unref_by(fd, n) +#define UNREF_BY(ec, fd, n, reason) unref_by(ec, fd, n) static void ref_by(grpc_fd *fd, int n) { #endif GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0); } -#ifdef GRPC_FD_REF_COUNT_DEBUG -static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file, - int line) { - gpr_atm old; - gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd, - (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst), - gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line); -#else -static void unref_by(grpc_fd *fd, int n) { - gpr_atm old; -#endif - old = gpr_atm_full_fetch_add(&fd->refst, -n); - if (old == n) { +static void fd_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { +grpc_fd *fd = arg; /* Add the fd to the freelist */ grpc_iomgr_unregister_object(&fd->iomgr_object); pollable_destroy(&fd->pollable); @@ -303,6 +292,22 @@ static void unref_by(grpc_fd *fd, int n) { grpc_lfev_destroy(&fd->write_closure); gpr_mu_unlock(&fd_freelist_mu); +} + +#ifdef GRPC_FD_REF_COUNT_DEBUG +static void unref_by(grpc_exec_ctx *exec_ctx, grpc_fd *fd, int n, const char *reason, const char *file, + int line) { + gpr_atm old; + gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd, + (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst), + gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line); +#else +static void unref_by(grpc_exec_ctx *exec_ctx, grpc_fd *fd, int n) { + gpr_atm old; +#endif + old = gpr_atm_full_fetch_add(&fd->refst, -n); + if (old == n) { + grpc_closure_sched(exec_ctx, grpc_closure_create(fd_destroy, fd, grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); } else { GPR_ASSERT(old > n); } @@ -406,7 +411,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error)); gpr_mu_unlock(&fd->pollable.po.mu); - UNREF_BY(fd, 2, reason); /* Drop the reference */ + UNREF_BY(exec_ctx, fd, 2, reason); /* Drop the reference */ GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); } @@ -459,7 +464,7 @@ static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue, static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, const char *file, int line, const char *reason) { if (workqueue != NULL) { - unref_by((grpc_fd *)workqueue, 2, file, line, reason); + unref_by(exec_ctx, (grpc_fd *)workqueue, 2, file, line, reason); } } #else @@ -473,7 +478,7 @@ static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) { static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { if (workqueue != NULL) { - unref_by((grpc_fd *)workqueue, 2); + unref_by(exec_ctx, (grpc_fd *)workqueue, 2); } } #endif @@ -495,7 +500,7 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, if (last == 0) { workqueue_wakeup(fd); } - UNREF_BY(fd, 2, "workqueue_enqueue"); + UNREF_BY(exec_ctx, fd, 2, "workqueue_enqueue"); } static void fd_invoke_workqueue(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { @@ -778,14 +783,12 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { grpc_lfev_set_ready(exec_ctx, &fd->write_closure); } -static grpc_error *fd_become_pollable(grpc_fd *fd) { +static grpc_error *fd_become_pollable_locked(grpc_fd *fd) { grpc_error *error = GRPC_ERROR_NONE; static const char *err_desc = "fd_become_pollable"; - gpr_mu_lock(&fd->pollable.po.mu); if (append_error(&error, pollable_materialize(&fd->pollable), err_desc)) { append_error(&error, pollable_add_fd(&fd->pollable, fd), err_desc); } - gpr_mu_unlock(&fd->pollable.po.mu); return error; } @@ -810,10 +813,10 @@ static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable *p) { } /* pollset_shutdown is guaranteed to be called before pollset_destroy. */ -static void pollset_destroy(grpc_pollset *pollset) { +static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { pollable_destroy(&pollset->pollable); if (pollset_is_pollable_fd(pollset, pollset->current_pollable)) { - UNREF_BY((grpc_fd *)pollset->current_pollable, 2, "pollset_pollable"); + UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable, 2, "pollset_pollable"); } } @@ -975,7 +978,7 @@ static void end_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_cv_destroy(&worker->cv); } if (pollset_is_pollable_fd(pollset, worker->pollable)) { - UNREF_BY((grpc_fd *)worker->pollable, 2, "one_poll"); + UNREF_BY(exec_ctx, (grpc_fd *)worker->pollable, 2, "one_poll"); } } @@ -1031,35 +1034,45 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, return error; } -static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_fd *fd) { - grpc_error *error = GRPC_ERROR_NONE; - grpc_fd *unref_fd = NULL; +static void unref_fd_no_longer_poller(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + grpc_fd *fd = arg; + UNREF_BY(exec_ctx, fd, 2, "pollset_pollable"); +} + +/* expects pollsets locked, flag whether fd is locked or not */ +static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_fd *fd, bool fd_locked) { static const char *err_desc = "pollset_add_fd"; - gpr_mu_lock(&pollset->pollable.po.mu); + grpc_error *error = GRPC_ERROR_NONE; if (pollset->current_pollable == &g_empty_pollable) { /* empty pollable --> single fd pollable */ append_error(&error, pollset_kick_all(pollset), err_desc); pollset->current_pollable = &fd->pollable; - append_error(&error, fd_become_pollable(fd), err_desc); + if (!fd_locked) gpr_mu_lock(&fd->pollable.po.mu); + append_error(&error, fd_become_pollable_locked(fd), err_desc); + if (!fd_locked) gpr_mu_unlock(&fd->pollable.po.mu); REF_BY(fd, 2, "pollset_pollable"); } else if (pollset->current_pollable == &pollset->pollable) { append_error(&error, pollable_add_fd(pollset->current_pollable, fd), err_desc); } else if (pollset->current_pollable != &fd->pollable) { - unref_fd = (grpc_fd *)pollset->current_pollable; + grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable; pollset->current_pollable = &pollset->pollable; if (append_error(&error, pollable_materialize(&pollset->pollable), err_desc)) { - pollable_add_fd(&pollset->pollable, unref_fd); + pollable_add_fd(&pollset->pollable, had_fd); pollable_add_fd(&pollset->pollable, fd); } + grpc_closure_sched(exec_ctx, grpc_closure_create(unref_fd_no_longer_poller, had_fd, grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); } - gpr_mu_unlock(&pollset->pollable.po.mu); - if (unref_fd) { - UNREF_BY(unref_fd, 2, "pollset_pollable"); - } + return error; +} +static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_fd *fd) { + gpr_mu_lock(&pollset->pollable.po.mu); + grpc_error *error = pollset_add_fd_locked(exec_ctx, pollset, fd, false); + gpr_mu_unlock(&pollset->pollable.po.mu); GRPC_LOG_IF_ERROR("pollset_add_fd", error); } @@ -1202,9 +1215,9 @@ static void po_join(grpc_exec_ctx *exec_ctx, polling_obj *a, polling_obj *b) { static void pg_notify(grpc_exec_ctx *exec_ctx, polling_obj *a, polling_obj *b) { if (a->type == PO_FD && b->type == PO_POLLSET) { - pollset_add_fd(exec_ctx, (grpc_pollset *)b, (grpc_fd *)a); + pollset_add_fd_locked(exec_ctx, (grpc_pollset *)b, (grpc_fd *)a, true); } else if (a->type == PO_POLLSET && b->type == PO_FD) { - pollset_add_fd(exec_ctx, (grpc_pollset *)a, (grpc_fd *)b); + pollset_add_fd_locked(exec_ctx, (grpc_pollset *)a, (grpc_fd *)b, true); } } @@ -1212,7 +1225,14 @@ static void pg_broadcast(grpc_exec_ctx *exec_ctx, polling_group *from, polling_group *to) { for (polling_obj *a = from->po.next; a != &from->po; a = a->next) { for (polling_obj *b = to->po.next; b != &to->po; b = b->next) { + if (po_cmp(a, b) < 0) { + gpr_mu_lock(&a->mu); gpr_mu_lock(&b->mu); + } else { + GPR_ASSERT(po_cmp(a, b) != 0); + gpr_mu_lock(&b->mu); gpr_mu_lock(&a->mu); + } pg_notify(exec_ctx, a, b); + gpr_mu_unlock(&a->mu); gpr_mu_unlock(&b->mu); } } } @@ -1249,21 +1269,41 @@ static void pg_join(grpc_exec_ctx *exec_ctx, polling_group *pg, /* assumes neither pg nor po are locked; consumes one ref to pg */ pg = pg_lock_latest(pg); /* pg locked */ + for (polling_obj *existing = pg->po.next /* skip pg - it's just a stub */; + existing != &pg->po; existing = existing->next) { + if (po_cmp(po, existing) < 0) { + gpr_mu_lock(&po->mu); + gpr_mu_lock(&existing->mu); + } else { + GPR_ASSERT(po_cmp(po, existing) != 0); + gpr_mu_lock(&existing->mu); + gpr_mu_lock(&po->mu); + } + /* pg, po, existing locked */ + if (po->group != NULL) { + gpr_mu_unlock(&pg->po.mu); + polling_group *po_group = pg_ref(po->group); + gpr_mu_unlock(&po->mu); + gpr_mu_unlock(&existing->mu); + pg_merge(exec_ctx, pg, po_group); + /* early exit: polling obj picked up a group during joining: we needed + to do a full merge */ + return; + } + pg_notify(exec_ctx, po, existing); + gpr_mu_unlock(&po->mu); + gpr_mu_unlock(&existing->mu); + } gpr_mu_lock(&po->mu); if (po->group != NULL) { gpr_mu_unlock(&pg->po.mu); polling_group *po_group = pg_ref(po->group); gpr_mu_unlock(&po->mu); pg_merge(exec_ctx, pg, po_group); - /* early exit: polling obj picked up a group before joining: we needed + /* early exit: polling obj picked up a group during joining: we needed to do a full merge */ return; } - /* pg, po locked */ - for (polling_obj *existing = pg->po.next /* skip pg - it's just a stub */; - existing != &pg->po; existing = existing->next) { - pg_notify(exec_ctx, po, existing); - } po->group = pg; po->next = &pg->po; po->prev = pg->po.prev; diff --git a/src/core/lib/iomgr/pollset.h b/src/core/lib/iomgr/pollset.h index 6f3a51e7175..69e20098d70 100644 --- a/src/core/lib/iomgr/pollset.h +++ b/src/core/lib/iomgr/pollset.h @@ -57,7 +57,7 @@ void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu); * pollset's mutex must be held */ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure); -void grpc_pollset_destroy(grpc_pollset *pollset); +void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); /* Do some work on a pollset. May involve invoking asynchronous callbacks, or actually polling file