From 8fc1ca1e3f3fe3cfb6040405f764a150a95110ba Mon Sep 17 00:00:00 2001
From: Craig Tiller
Date: Fri, 7 Apr 2017 13:01:48 -0700
Subject: [PATCH] Initial pollset_set implementation

---
 src/core/lib/iomgr/ev_epollex_linux.c | 335 +++++++++++++++++++++++---
 1 file changed, 304 insertions(+), 31 deletions(-)

diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index e846263a4bf..6d79b82982d 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -73,13 +73,33 @@
 static grpc_wakeup_fd global_wakeup_fd;
 
 /*******************************************************************************
- * Fd Declarations
+ * Pollset-set sibling link
  */
 
-#define FD_FROM_PO(po) ((grpc_fd *)(po))
+/* Kinds of pollset_set member; also the index into grpc_pollset_set.roots. */
+typedef enum {
+  PSS_FD,
+  PSS_POLLSET,
+  PSS_POLLSET_SET,
+  PSS_OBJ_TYPE_COUNT
+} pss_obj_type;
 
-struct grpc_fd {
+/* Link header placed first in grpc_fd, grpc_pollset and grpc_pollset_set so
+ * that a pss_obj pointer can be cast back to its containing object. */
+typedef struct pss_obj {
   gpr_mu mu;
+  struct pss_obj *pss_next;
+  struct pss_obj *pss_prev;
+  int pss_refs;                 /* pss_add_obj calls not yet undone */
+  grpc_pollset_set *pss_master; /* owning master set; NULL when detached */
+} pss_obj;
+
+/*******************************************************************************
+ * Fd Declarations
+ */
+
+struct grpc_fd {
+  pss_obj po;
   int fd;
   /* refst format:
        bit 0    : 1=Active / 0=Orphaned
@@ -137,19 +157,26 @@ struct grpc_pollset_worker {
 };
 
 struct grpc_pollset {
-  gpr_mu mu;
+  pss_obj po;
   int epfd;
   int num_pollers;
   gpr_atm shutdown_atm;
   grpc_closure *shutdown_closure;
   grpc_wakeup_fd pollset_wakeup;
   grpc_pollset_worker *root_worker;
 };
 
 /*******************************************************************************
  * Pollset-set Declarations
  */
-struct grpc_pollset_set {};
+struct grpc_pollset_set {
+  pss_obj po;
+  gpr_refcount refs;
+  grpc_pollset_set *master; /* self until merged into another set */
+
+  /* roots are only used if master == self */
+  pss_obj *roots[PSS_OBJ_TYPE_COUNT];
+};
 
 /*******************************************************************************
  * Common helpers
@@ -242,7 +269,7 @@ static void fd_global_shutdown(void) {
   while (fd_freelist != NULL) {
     grpc_fd *fd = fd_freelist;
     fd_freelist = fd_freelist->freelist_next;
-    gpr_mu_destroy(&fd->mu);
+    gpr_mu_destroy(&fd->po.mu);
     gpr_free(fd);
   }
   gpr_mu_destroy(&fd_freelist_mu);
@@ -260,13 +287,18 @@ static grpc_fd *fd_create(int fd, const char *name) {
 
   if (new_fd == NULL) {
     new_fd = gpr_malloc(sizeof(grpc_fd));
-    gpr_mu_init(&new_fd->mu);
+    gpr_mu_init(&new_fd->po.mu);
   }
 
-  /* Note: It is not really needed to get the new_fd->mu lock here. If this
+  /* Note: It is not really needed to get the new_fd->po.mu lock here. If this
    * is a newly created fd (or an fd we got from the freelist), no one else
    * would be holding a lock to it anyway. */
-  gpr_mu_lock(&new_fd->mu);
+  gpr_mu_lock(&new_fd->po.mu);
+  /* The block comes straight from gpr_malloc or from the freelist, so reset
+   * the pollset_set link: pss_add_obj() expects a detached object. */
+  new_fd->po.pss_next = new_fd->po.pss_prev = NULL;
+  new_fd->po.pss_refs = 0;
+  new_fd->po.pss_master = NULL;
 
   gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
   new_fd->fd = fd;
@@ -285,7 +317,7 @@ static grpc_fd *fd_create(int fd, const char *name) {
   new_fd->freelist_next = NULL;
   new_fd->on_done_closure = NULL;
 
-  gpr_mu_unlock(&new_fd->mu);
+  gpr_mu_unlock(&new_fd->po.mu);
 
   char *fd_name;
   gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
@@ -299,11 +331,11 @@ static grpc_fd *fd_create(int fd, const char *name) {
 
 static int fd_wrapped_fd(grpc_fd *fd) {
   int ret_fd = -1;
-  gpr_mu_lock(&fd->mu);
+  gpr_mu_lock(&fd->po.mu);
   if (!fd->orphaned) {
     ret_fd = fd->fd;
   }
-  gpr_mu_unlock(&fd->mu);
+  gpr_mu_unlock(&fd->po.mu);
 
   return ret_fd;
 }
@@ -314,7 +346,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
   bool is_fd_closed = false;
   grpc_error *error = GRPC_ERROR_NONE;
 
-  gpr_mu_lock(&fd->mu);
+  gpr_mu_lock(&fd->po.mu);
   fd->on_done_closure = on_done;
 
   /* If release_fd is not NULL, we should be relinquishing control of the file
@@ -338,7 +370,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
 
   grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
 
-  gpr_mu_unlock(&fd->mu);
+  gpr_mu_unlock(&fd->po.mu);
   UNREF_BY(fd, 2, reason); /* Drop the reference */
   GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
   GRPC_ERROR_UNREF(error);
@@ -472,7 +504,7 @@ static void pollset_global_shutdown(void) {
   gpr_tls_destroy(&g_current_thread_worker);
 }
 
-/* p->mu must be held before calling this function */
+/* p->po.mu must be held before calling this function */
 static grpc_error *pollset_kick(grpc_pollset *p,
                                 grpc_pollset_worker *specific_worker) {
   if (specific_worker == NULL) {
@@ -497,7 +529,12 @@ static grpc_error *kick_poller(void) {
 }
 
 static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
-  gpr_mu_init(&pollset->mu);
+  gpr_mu_init(&pollset->po.mu);
+  /* NOTE(review): caller-provided storage is not assumed to be zeroed, so
+   * start explicitly detached from any pollset_set. */
+  pollset->po.pss_next = pollset->po.pss_prev = NULL;
+  pollset->po.pss_refs = 0;
+  pollset->po.pss_master = NULL;
   pollset->epfd = epoll_create1(EPOLL_CLOEXEC);
   if (pollset->epfd < 0) {
     GRPC_LOG_IF_ERROR("pollset_init", GRPC_OS_ERROR(errno, "epoll_create1"));
@@ -523,7 +560,7 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
     }
   }
   pollset->root_worker = NULL;
-  *mu = &pollset->mu;
+  *mu = &pollset->po.mu;
 }
 
 /* Convert a timespec to milliseconds:
@@ -588,7 +625,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
 
 /* pollset_shutdown is guaranteed to be called before pollset_destroy. */
 static void pollset_destroy(grpc_pollset *pollset) {
-  gpr_mu_destroy(&pollset->mu);
+  gpr_mu_destroy(&pollset->po.mu);
   if (pollset->epfd >= 0) close(pollset->epfd);
   grpc_wakeup_fd_destroy(&pollset->pollset_wakeup);
 }
@@ -669,7 +706,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
     worker->initialized_cv = true;
     gpr_cv_init(&worker->cv);
     while (pollset->root_worker != worker) {
-      if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline)) return false;
+      if (gpr_cv_wait(&worker->cv, &pollset->po.mu, deadline)) return false;
       if (worker->kicked) return false;
     }
   }
@@ -698,8 +735,8 @@ static void end_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
   }
 }
 
-/* pollset->mu lock must be held by the caller before calling this.
-   The function pollset_work() may temporarily release the lock (pollset->mu)
+/* pollset->po.mu lock must be held by the caller before calling this.
+   The function pollset_work() may temporarily release the lock (pollset->po.mu)
    during the course of its execution but it will always re-acquire the lock and
    ensure that it is held by the time the function returns */
 static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@@ -710,10 +747,10 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
   if (begin_worker(pollset, &worker, worker_hdl, deadline)) {
     GPR_ASSERT(!pollset->shutdown_closure);
     pollset->num_pollers++;
-    gpr_mu_unlock(&pollset->mu);
+    gpr_mu_unlock(&pollset->po.mu);
     error = pollset_poll(exec_ctx, pollset, now, deadline);
     grpc_exec_ctx_flush(exec_ctx);
-    gpr_mu_lock(&pollset->mu);
+    gpr_mu_lock(&pollset->po.mu);
     pollset->num_pollers--;
     if (pollset->num_pollers == 0 && pollset->shutdown_closure != NULL) {
       grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
@@ -758,45 +795,281 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
  */
 
 static grpc_pollset_set *pollset_set_create(void) {
-  grpc_pollset_set *pss = gpr_malloc(sizeof(*pss));
+  grpc_pollset_set *pss = gpr_zalloc(sizeof(*pss));
+  gpr_mu_init(&pss->po.mu);
+  /* the creator owns the initial reference, dropped in pollset_set_destroy */
+  gpr_ref_init(&pss->refs, 1);
+  /* a fresh set is its own master until pss_merge() folds it into another */
+  pss->master = pss;
+  pss->roots[PSS_POLLSET_SET] = &pss->po;
+  pss->po.pss_next = pss->po.pss_prev = &pss->po;
+  return pss;
+}
+
+/* Free a set whose last reference has been dropped. */
+static void pss_destroy(grpc_pollset_set *pss) {
+  gpr_mu_destroy(&pss->po.mu);
+  gpr_free(pss);
+}
+
+/* Take a reference on pss and return it so calls can be chained. */
+static grpc_pollset_set *pss_ref(grpc_pollset_set *pss) {
+  gpr_ref(&pss->refs);
   return pss;
 }
 
+/* Drop a reference on pss. A merged set holds a reference on the master it
+ * forwards to, so destroying one may release the next set up the chain. */
+static void pss_unref(grpc_pollset_set *pss) {
+  while (gpr_unref(&pss->refs)) {
+    grpc_pollset_set *master = pss->master;
+    bool forwarded = master != pss;
+    pss_destroy(pss);
+    if (!forwarded) break;
+    pss = master;
+  }
+}
+
 static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
                                 grpc_pollset_set *pss) {
-  gpr_free(pss);
+  pss_unref(pss);
+}
+
+/* Follow master links from master_or_slave to the set that is its own master.
+ * Returns that master with its mutex held and one reference taken; the caller
+ * must unlock it and then pss_unref() it. */
+static grpc_pollset_set *pss_ref_and_lock_master(
+    grpc_pollset_set *master_or_slave) {
+  pss_ref(master_or_slave);
+  gpr_mu_lock(&master_or_slave->po.mu);
+  while (master_or_slave != master_or_slave->master) {
+    grpc_pollset_set *master = pss_ref(master_or_slave->master);
+    gpr_mu_unlock(&master_or_slave->po.mu);
+    pss_unref(master_or_slave);
+    master_or_slave = master;
+    gpr_mu_lock(&master_or_slave->po.mu);
+  }
+  return master_or_slave;
+}
+
+/* Add the fd behind obj to every pollset currently linked into dst. */
+static void pss_broadcast_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *dst,
+                             pss_obj *obj) {
+  grpc_fd *fd = (grpc_fd *)obj;
+  if (dst->roots[PSS_POLLSET] == NULL) return;
+  pss_obj *tgt = dst->roots[PSS_POLLSET];
+  do {
+    pollset_add_fd(exec_ctx, (grpc_pollset *)tgt, fd);
+    tgt = tgt->pss_next;
+  } while (tgt != dst->roots[PSS_POLLSET]);
+}
+
+/* Add every fd currently linked into dst to the pollset behind obj. */
+static void pss_broadcast_pollset(grpc_exec_ctx *exec_ctx,
+                                  grpc_pollset_set *dst, pss_obj *obj) {
+  grpc_pollset *pollset = (grpc_pollset *)obj;
+  if (dst->roots[PSS_FD] == NULL) return;
+  pss_obj *tgt = dst->roots[PSS_FD];
+  do {
+    pollset_add_fd(exec_ctx, pollset, (grpc_fd *)tgt);
+    tgt = tgt->pss_next;
+  } while (tgt != dst->roots[PSS_FD]);
+}
+
+/* Join two circular lists (either may be NULL) and return the merged root. */
+static pss_obj *pss_splice(pss_obj *p, pss_obj *q) {
+  if (p == NULL) return q;
+  if (q == NULL) return p;
+  p->pss_next->pss_prev = q->pss_prev;
+  q->pss_prev->pss_next = p->pss_next;
+  p->pss_next = q;
+  q->pss_prev = p;
+  return p;
+}
+
+/* Broadcast function for each member type, indexed by pss_obj_type. */
+static void (*const broadcast[PSS_OBJ_TYPE_COUNT])(grpc_exec_ctx *exec_ctx,
+                                                   grpc_pollset_set *dst,
+                                                   pss_obj *obj) = {
+    pss_broadcast_fd, pss_broadcast_pollset, NULL};
+
+/* Fold b's members of the given type into a: cross-broadcast both rings,
+ * repoint b's members at a, then splice b's ring into a's. The caller holds
+ * both a->po.mu and b->po.mu. */
+static void pss_merge_broadcast_and_patch(grpc_exec_ctx *exec_ctx,
+                                          grpc_pollset_set *a,
+                                          grpc_pollset_set *b,
+                                          pss_obj_type type) {
+  pss_obj *obj;
+  if (a->roots[type] != NULL) {
+    obj = a->roots[type];
+    do {
+      broadcast[type](exec_ctx, b, obj);
+      obj = obj->pss_next;
+    } while (obj != a->roots[type]);
+  }
+  if (b->roots[type] != NULL) {
+    obj = b->roots[type];
+    do {
+      broadcast[type](exec_ctx, a, obj);
+      gpr_mu_lock(&obj->mu);
+      obj->pss_master = a;
+      gpr_mu_unlock(&obj->mu);
+      obj = obj->pss_next;
+    } while (obj != b->roots[type]);
+  }
+  a->roots[type] = pss_splice(a->roots[type], b->roots[type]);
+}
+
+/* Merge the sets containing a and b so they share a single master. */
+static void pss_merge(grpc_exec_ctx *exec_ctx, grpc_pollset_set *a,
+                      grpc_pollset_set *b) {
+  pss_ref(a);
+  pss_ref(b);
+  /* loop until a and b are both masters (or turn out to be the same set) */
+  while (a != b) {
+    /* always take the two mutexes in address order */
+    if (a < b) {
+      gpr_mu_lock(&a->po.mu);
+      gpr_mu_lock(&b->po.mu);
+    } else {
+      gpr_mu_lock(&b->po.mu);
+      gpr_mu_lock(&a->po.mu);
+    }
+    if (a != a->master) {
+      grpc_pollset_set *master = pss_ref(a->master);
+      gpr_mu_unlock(&a->po.mu);
+      gpr_mu_unlock(&b->po.mu);
+      pss_unref(a);
+      a = master;
+    } else if (b != b->master) {
+      grpc_pollset_set *master = pss_ref(b->master);
+      gpr_mu_unlock(&a->po.mu);
+      gpr_mu_unlock(&b->po.mu);
+      pss_unref(b);
+      b = master;
+    } else {
+      /* a, b locked and are at their respective masters */
+      pss_merge_broadcast_and_patch(exec_ctx, a, b, PSS_FD);
+      pss_merge_broadcast_and_patch(exec_ctx, a, b, PSS_POLLSET);
+      /* b now forwards to a and keeps it alive through this reference */
+      b->master = pss_ref(a);
+      gpr_mu_unlock(&a->po.mu);
+      gpr_mu_unlock(&b->po.mu);
+      break;
+    }
+  }
+  pss_unref(a);
+  pss_unref(b);
+}
+
+/* Add obj to pss (resolved to its master). An object that already belongs to
+ * another set causes the two sets to be merged instead. */
+static void pss_add_obj(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
+                        pss_obj *obj, pss_obj_type type) {
+  pss = pss_ref_and_lock_master(pss);
+  gpr_mu_lock(&obj->mu);
+  if (obj->pss_master == pss) {
+    /* obj is already a member -- just bump refcount */
+    obj->pss_refs++;
+    gpr_mu_unlock(&obj->mu);
+    gpr_mu_unlock(&pss->po.mu);
+    pss_unref(pss);
+    return;
+  } else if (obj->pss_master != NULL) {
+    grpc_pollset_set *other_pss = pss_ref(obj->pss_master);
+    obj->pss_refs++;
+    gpr_mu_unlock(&obj->mu);
+    gpr_mu_unlock(&pss->po.mu);
+    pss_merge(exec_ctx, pss, other_pss);
+    pss_unref(other_pss);
+    pss_unref(pss);
+  } else {
+    GPR_ASSERT(obj->pss_refs == 0);
+    obj->pss_refs = 1;
+    obj->pss_master = pss;
+    if (pss->roots[type] == NULL) {
+      pss->roots[type] = obj;
+      obj->pss_next = obj->pss_prev = obj;
+    } else {
+      obj->pss_next = pss->roots[type];
+      obj->pss_prev = obj->pss_next->pss_prev;
+      obj->pss_prev->pss_next = obj;
+      obj->pss_next->pss_prev = obj;
+    }
+    gpr_mu_unlock(&obj->mu);
+    switch (type) {
+      case PSS_FD:
+        pss_broadcast_fd(exec_ctx, pss, obj);
+        break;
+      case PSS_POLLSET:
+        pss_broadcast_pollset(exec_ctx, pss, obj);
+        break;
+      case PSS_POLLSET_SET:
+      case PSS_OBJ_TYPE_COUNT:
+        /* NOTE(review): pollset_set_add_pollset_set() lands here, so nesting
+         * pollset_sets is not supported yet -- confirm the intended path. */
+        GPR_UNREACHABLE_CODE(break);
+    }
+    gpr_mu_unlock(&pss->po.mu);
+    pss_unref(pss);
+  }
+}
+
+/* Drop one membership reference of obj in pss (resolved to its master); the
+ * object is unlinked from the set's ring once that count reaches zero. */
+static void pss_del_obj(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
+                        pss_obj *obj, pss_obj_type type) {
+  pss = pss_ref_and_lock_master(pss);
+  gpr_mu_lock(&obj->mu);
+  obj->pss_refs--;
+  if (obj->pss_refs == 0) {
+    obj->pss_master = NULL;
+    if (obj == pss->roots[type]) {
+      pss->roots[type] = obj->pss_next;
+    }
+    if (obj->pss_next == obj) {
+      pss->roots[type] = NULL;
+    } else {
+      obj->pss_next->pss_prev = obj->pss_prev;
+      obj->pss_prev->pss_next = obj->pss_next;
+    }
+  }
+  gpr_mu_unlock(&obj->mu);
+  gpr_mu_unlock(&pss->po.mu);
+  pss_unref(pss);
 }
 
 static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                                grpc_fd *fd) {
-  abort();
+  pss_add_obj(exec_ctx, pss, &fd->po, PSS_FD);
 }
 
 static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                                grpc_fd *fd) {
-  abort();
+  pss_del_obj(exec_ctx, pss, &fd->po, PSS_FD);
 }
 
 static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                     grpc_pollset_set *pss, grpc_pollset *ps) {
-  abort();
+  pss_add_obj(exec_ctx, pss, &ps->po, PSS_POLLSET);
 }
 
 static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                     grpc_pollset_set *pss, grpc_pollset *ps) {
-  abort();
+  pss_del_obj(exec_ctx, pss, &ps->po, PSS_POLLSET);
 }
 
 static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                         grpc_pollset_set *bag,
                                         grpc_pollset_set *item) {
-  abort();
+  pss_add_obj(exec_ctx, bag, &item->po, PSS_POLLSET_SET);
 }
 
 static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                         grpc_pollset_set *bag,
                                         grpc_pollset_set *item) {
-  abort();
+  pss_del_obj(exec_ctx, bag, &item->po, PSS_POLLSET_SET);
 }
 
/*******************************************************************************