diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index de780ec4172..958f4d50280 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -77,34 +77,48 @@ static grpc_wakeup_fd global_wakeup_fd; */ typedef enum { - PSS_FD, - PSS_POLLSET, - PSS_POLLSET_SET, - PSS_OBJ_TYPE_COUNT -} pss_obj_type; + PO_FD, + PO_POLLSET, + PO_POLLSET_SET, + PO_POLLING_GROUP, + PO_COUNT +} polling_obj_type; -typedef struct pss_obj { +typedef struct polling_obj polling_obj; +typedef struct polling_group polling_group; + +struct polling_obj { gpr_mu mu; - struct pss_obj *pss_next; - struct pss_obj *pss_prev; - int pss_refs; - grpc_pollset_set *pss_master; -} pss_obj; - -static void pss_obj_init(pss_obj *obj) { - gpr_mu_init(&obj->mu); - obj->pss_refs = 0; - obj->pss_next = NULL; - obj->pss_prev = NULL; - obj->pss_master = NULL; -} + polling_obj_type type; + polling_group *group; + struct polling_obj *next; + struct polling_obj *prev; +}; + +struct polling_group { + polling_obj po; + gpr_refcount refs; +}; + +static void po_init(polling_obj *po, polling_obj_type type); +static void po_destroy(polling_obj *po); +static void po_join(grpc_exec_ctx *exec_ctx, polling_obj *a, polling_obj *b); + +static void pg_create(grpc_exec_ctx *exec_ctx, polling_obj **initial_po, + size_t initial_po_count); +static polling_group *pg_ref(polling_group *pg); +static void pg_unref(polling_group *pg); +static void pg_merge(grpc_exec_ctx *exec_ctx, polling_group *a, + polling_group *b); +static void pg_join(grpc_exec_ctx *exec_ctx, polling_group *pg, + polling_obj *po); /******************************************************************************* * Fd Declarations */ struct grpc_fd { - pss_obj po; + polling_obj po; int fd; /* refst format: bit 0 : 1=Active / 0=Orphaned @@ -162,7 +176,7 @@ struct grpc_pollset_worker { }; struct grpc_pollset { - pss_obj po; + polling_obj po; int epfd; int num_pollers; bool kicked_without_poller; @@ 
-181,11 +195,7 @@ struct grpc_pollset { * Pollset-set Declarations */ struct grpc_pollset_set { - pss_obj po; - gpr_refcount refs; - - /* roots are only used if master == self */ - pss_obj *roots[PSS_OBJ_TYPE_COUNT]; + polling_obj po; }; /******************************************************************************* @@ -257,10 +267,11 @@ static void unref_by(grpc_fd *fd, int n) { old = gpr_atm_full_fetch_add(&fd->refst, -n); if (old == n) { /* Add the fd to the freelist */ + grpc_iomgr_unregister_object(&fd->iomgr_object); + po_destroy(&fd->po); gpr_mu_lock(&fd_freelist_mu); fd->freelist_next = fd_freelist; fd_freelist = fd; - grpc_iomgr_unregister_object(&fd->iomgr_object); grpc_lfev_destroy(&fd->read_closure); grpc_lfev_destroy(&fd->write_closure); @@ -279,7 +290,6 @@ static void fd_global_shutdown(void) { while (fd_freelist != NULL) { grpc_fd *fd = fd_freelist; fd_freelist = fd_freelist->freelist_next; - gpr_mu_destroy(&fd->po.mu); gpr_free(fd); } gpr_mu_destroy(&fd_freelist_mu); @@ -299,7 +309,7 @@ static grpc_fd *fd_create(int fd, const char *name) { new_fd = gpr_malloc(sizeof(grpc_fd)); } - pss_obj_init(&new_fd->po); + po_init(&new_fd->po, PO_FD); gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; @@ -536,7 +546,7 @@ static grpc_error *kick_poller(void) { } static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { - pss_obj_init(&pollset->po); + po_init(&pollset->po, PO_POLLSET); pollset->kicked_without_poller = false; pollset->epfd = epoll_create1(EPOLL_CLOEXEC); if (pollset->epfd < 0) { @@ -610,8 +620,7 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { - if (pollset->shutdown_closure != NULL && pollset->num_pollers == 0 && - pollset->po.pss_master == NULL) { + if (pollset->shutdown_closure != NULL && pollset->num_pollers == 0) { grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE); } } @@ -643,7 +652,7 @@ 
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, /* pollset_shutdown is guaranteed to be called before pollset_destroy. */ static void pollset_destroy(grpc_pollset *pollset) { - gpr_mu_destroy(&pollset->po.mu); + po_destroy(&pollset->po); if (pollset->epfd >= 0) close(pollset->epfd); grpc_wakeup_fd_destroy(&pollset->pollset_wakeup); } @@ -821,268 +830,257 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static grpc_pollset_set *pollset_set_create(void) { grpc_pollset_set *pss = gpr_zalloc(sizeof(*pss)); - pss_obj_init(&pss->po); - gpr_ref_init(&pss->refs, 1); - pss->roots[PSS_POLLSET_SET] = &pss->po; - pss->po.pss_master = pss; - pss->po.pss_next = pss->po.pss_prev = &pss->po; + po_init(&pss->po, PO_POLLSET_SET); return pss; } -static void pss_unref(grpc_pollset_set *pss); - -static void pss_destroy(grpc_pollset_set *pss) { - gpr_mu_destroy(&pss->po.mu); - GPR_ASSERT(pss->roots[PSS_FD] == NULL); - GPR_ASSERT(pss->roots[PSS_POLLSET] == NULL); - GPR_ASSERT(pss->roots[PSS_POLLSET_SET] == &pss->po); - for (pss_obj *child = pss->roots[PSS_POLLSET_SET]; child != &pss->po; - child = child->pss_next) { - pss_unref((grpc_pollset_set *)child); - } - gpr_free(pss); +/* Tears down a pollset_set: detaches it from its polling group (if any) and + releases the storage obtained in pollset_set_create(). */ +static void pollset_set_destroy(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pss) { + po_destroy(&pss->po); + /* pollset_set_create() heap-allocates pss with gpr_zalloc(); the removed + pss_destroy() used to free it, so it must be freed here to avoid a leak */ + gpr_free(pss); } -static grpc_pollset_set *pss_ref(grpc_pollset_set *pss) { - gpr_ref(&pss->refs); - return pss; +static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, + grpc_fd *fd) { + po_join(exec_ctx, &pss->po, &fd->po); } -static void pss_unref(grpc_pollset_set *pss) { - if (gpr_unref(&pss->refs)) pss_destroy(pss); -} +static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, + grpc_fd *fd) {} -static void pollset_set_destroy(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pss) { - pss_unref(pss); +static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pss, grpc_pollset *ps) { +
po_join(exec_ctx, &pss->po, &ps->po); } -static grpc_pollset_set *pss_ref_and_lock_master( - grpc_pollset_set *master_or_slave) { - pss_ref(master_or_slave); - gpr_mu_lock(&master_or_slave->po.mu); - while (master_or_slave != master_or_slave->po.pss_master) { - grpc_pollset_set *master = pss_ref(master_or_slave->po.pss_master); - gpr_mu_unlock(&master_or_slave->po.mu); - pss_unref(master_or_slave); - master_or_slave = master; - gpr_mu_lock(&master_or_slave->po.mu); - } - return master_or_slave; -} +static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pss, grpc_pollset *ps) {} -static void pss_broadcast_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *dst, - pss_obj *obj) { - grpc_fd *fd = (grpc_fd *)obj; - if (dst->roots[PSS_POLLSET] == NULL) return; - pss_obj *tgt = dst->roots[PSS_POLLSET]; - do { - pollset_add_fd(exec_ctx, (grpc_pollset *)tgt, fd); - tgt = tgt->pss_next; - } while (tgt != dst->roots[PSS_POLLSET]); +static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item) { + po_join(exec_ctx, &bag->po, &item->po); } -static void pss_broadcast_pollset(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *dst, pss_obj *obj) { - grpc_pollset *pollset = (grpc_pollset *)obj; - if (dst->roots[PSS_FD] == NULL) return; - pss_obj *tgt = dst->roots[PSS_FD]; - do { - pollset_add_fd(exec_ctx, pollset, (grpc_fd *)tgt); - tgt = tgt->pss_next; - } while (tgt != dst->roots[PSS_FD]); -} +static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item) {} -static pss_obj *pss_splice(pss_obj *p, pss_obj *q) { - if (p == NULL) return q; - if (q == NULL) return p; - p->pss_next->pss_prev = q->pss_prev; - q->pss_prev->pss_next = p->pss_next; - p->pss_next = q; - q->pss_prev = p; - return p; +static void po_init(polling_obj *po, polling_obj_type type) { + gpr_mu_init(&po->mu); + po->type = type; + po->group = NULL; + po->next = po; + po->prev = po; } -static 
void (*const broadcast[PSS_OBJ_TYPE_COUNT])(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *dst, - pss_obj *obj) = { - pss_broadcast_fd, pss_broadcast_pollset, NULL}; - -static void pss_merge_broadcast_and_patch(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *a, - grpc_pollset_set *b, - pss_obj_type type) { - pss_obj *obj; - if (a->roots[type] != NULL) { - obj = a->roots[type]; - do { - broadcast[type](exec_ctx, b, obj); - obj = obj->pss_next; - } while (obj != a->roots[type]); - } - if (b->roots[type] != NULL) { - obj = b->roots[type]; - do { - broadcast[type](exec_ctx, a, obj); - gpr_mu_lock(&obj->mu); - obj->pss_master = a; - gpr_mu_unlock(&obj->mu); - obj = obj->pss_next; - } while (obj != b->roots[type]); +static polling_group *pg_lock_latest(polling_group *pg) { + /* assumes pg unlocked; consumes ref, returns ref */ + gpr_mu_lock(&pg->po.mu); + while (pg->po.group != NULL) { + polling_group *new_pg = pg_ref(pg->po.group); + gpr_mu_unlock(&pg->po.mu); + pg_unref(pg); + pg = new_pg; + gpr_mu_lock(&pg->po.mu); } - a->roots[type] = pss_splice(a->roots[type], b->roots[type]); + return pg; } -static void pss_merge(grpc_exec_ctx *exec_ctx, grpc_pollset_set *a, - grpc_pollset_set *b) { - pss_ref(a); - pss_ref(b); - for (;;) { - if (a == b) { - pss_unref(a); - pss_unref(b); - return; - } else if (a < b) { - gpr_mu_lock(&a->po.mu); - gpr_mu_lock(&b->po.mu); - } else { - gpr_mu_lock(&b->po.mu); - gpr_mu_lock(&a->po.mu); - } - if (a != a->po.pss_master) { - grpc_pollset_set *master = pss_ref(a->po.pss_master); - gpr_mu_unlock(&a->po.mu); - gpr_mu_unlock(&b->po.mu); - pss_unref(a); - a = master; - } else if (b != b->po.pss_master) { - grpc_pollset_set *master = pss_ref(b->po.pss_master); - gpr_mu_unlock(&a->po.mu); - gpr_mu_unlock(&b->po.mu); - pss_unref(b); - b = master; - } else { - /* a, b locked and are at their respective masters */ - pss_merge_broadcast_and_patch(exec_ctx, a, b, PSS_FD); - pss_merge_broadcast_and_patch(exec_ctx, a, b, PSS_POLLSET); - b->po.pss_master = a; 
- a->roots[PSS_POLLSET_SET] = - pss_splice(a->roots[PSS_POLLSET_SET], b->roots[PSS_POLLSET_SET]); - gpr_mu_unlock(&a->po.mu); - gpr_mu_unlock(&b->po.mu); - pss_unref(a); - /* a now owns a ref to b */ - return; - } +/* Unlinks po from its polling group's member list (if it has a group), drops + po's reference on that group, and destroys po's mutex. */ +static void po_destroy(polling_obj *po) { + if (po->group != NULL) { + polling_group *pg = pg_lock_latest(po->group); + po->prev->next = po->next; + po->next->prev = po->prev; + /* pg_lock_latest() returns with pg->po.mu held: release it before + dropping the ref, since the final unref destroys that mutex */ + gpr_mu_unlock(&pg->po.mu); + pg_unref(pg); } + gpr_mu_destroy(&po->mu); } -static void pss_add_obj(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, - pss_obj *obj, pss_obj_type type) { - pss = pss_ref_and_lock_master(pss); - gpr_mu_lock(&obj->mu); - if (obj->pss_master == pss) { - /* obj is already a member -- just bump refcount */ - obj->pss_refs++; - gpr_mu_unlock(&obj->mu); - gpr_mu_unlock(&pss->po.mu); - pss_unref(pss); - return; - } else if (obj->pss_master != NULL) { - grpc_pollset_set *other_pss = pss_ref(obj->pss_master); - obj->pss_refs++; - gpr_mu_unlock(&obj->mu); - gpr_mu_unlock(&pss->po.mu); - pss_merge(exec_ctx, pss, other_pss); - pss_unref(other_pss); - pss_unref(pss); - } else { - GPR_ASSERT(obj->pss_refs == 0); - obj->pss_refs = 1; - obj->pss_master = pss; - if (pss->roots[type] == NULL) { - pss->roots[type] = obj; - obj->pss_next = obj->pss_prev = obj; - } else { - obj->pss_next = pss->roots[type]; - obj->pss_prev = obj->pss_next->pss_prev; - obj->pss_prev->pss_next = obj; - obj->pss_next->pss_prev = obj; - } - switch (type) { - case PSS_FD: - REF_BY((grpc_fd *)obj, 2, "pollset_set"); - gpr_mu_unlock(&obj->mu); - pss_broadcast_fd(exec_ctx, pss, obj); - break; - case PSS_POLLSET: - gpr_mu_unlock(&obj->mu); - pss_broadcast_pollset(exec_ctx, pss, obj); - break; - case PSS_POLLSET_SET: - case PSS_OBJ_TYPE_COUNT: - GPR_UNREACHABLE_CODE(break); - } - gpr_mu_unlock(&pss->po.mu); - pss_unref(pss); +static polling_group *pg_ref(polling_group *pg) { + gpr_ref(&pg->refs); + return pg; +} + +static void pg_unref(polling_group *pg) { + if (gpr_unref(&pg->refs)) { + po_destroy(&pg->po); + gpr_free(pg); }
} -static void pss_del_obj(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, - pss_obj *obj, pss_obj_type type) { - bool unref = false; - pss = pss_ref_and_lock_master(pss); - gpr_mu_lock(&obj->mu); - obj->pss_refs--; - if (obj->pss_refs == 0) { - obj->pss_master = NULL; - if (obj == pss->roots[type]) { - pss->roots[type] = obj->pss_next; - } - if (obj->pss_next == obj) { - pss->roots[type] = NULL; +static void po_join(grpc_exec_ctx *exec_ctx, polling_obj *a, polling_obj *b) { + if (a == b) return; + if (a > b) GPR_SWAP(polling_obj *, a, b); + + gpr_mu_lock(&a->mu); + gpr_mu_lock(&b->mu); + + if (a->group == NULL) { + if (b->group == NULL) { + polling_obj *initial_po[] = {a, b}; + pg_create(exec_ctx, initial_po, GPR_ARRAY_SIZE(initial_po)); + gpr_mu_unlock(&a->mu); + gpr_mu_unlock(&b->mu); } else { - obj->pss_next->pss_prev = obj->pss_prev; - obj->pss_prev->pss_next = obj->pss_next; + polling_group *b_group = pg_ref(b->group); + gpr_mu_unlock(&b->mu); + gpr_mu_unlock(&a->mu); + pg_join(exec_ctx, b_group, a); } - unref = true; + } else if (b->group == NULL) { + polling_group *a_group = pg_ref(a->group); + gpr_mu_unlock(&a->mu); + gpr_mu_unlock(&b->mu); + pg_join(exec_ctx, a_group, b); + } else if (a->group == b->group) { + /* nothing to do */ + gpr_mu_unlock(&a->mu); + gpr_mu_unlock(&b->mu); + } else { + polling_group *a_group = pg_ref(a->group); + polling_group *b_group = pg_ref(b->group); + gpr_mu_unlock(&a->mu); + gpr_mu_unlock(&b->mu); + pg_merge(exec_ctx, a_group, b_group); } - gpr_mu_unlock(&obj->mu); - gpr_mu_unlock(&pss->po.mu); - pss_unref(pss); - if (unref && type == PSS_FD) UNREF_BY((grpc_fd *)obj, 2, "pollset_set"); } -static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, - grpc_fd *fd) { - pss_add_obj(exec_ctx, pss, &fd->po, PSS_FD); +#define POTYPES(a, b) (((a)*PO_COUNT) + (b)) + +static void pg_notify(grpc_exec_ctx *exec_ctx, polling_obj *a, polling_obj *b) { + GPR_ASSERT(a != b); + switch (POTYPES(a->type, b->type)) { + case 
POTYPES(PO_FD, PO_POLLSET): + pollset_add_fd(exec_ctx, (grpc_pollset *)b, (grpc_fd *)a); + break; + case POTYPES(PO_POLLSET, PO_FD): + pollset_add_fd(exec_ctx, (grpc_pollset *)a, (grpc_fd *)b); + break; + } } -static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, - grpc_fd *fd) { - pss_del_obj(exec_ctx, pss, &fd->po, PSS_FD); +static void pg_broadcast(grpc_exec_ctx *exec_ctx, polling_group *from, + polling_group *to) { + for (polling_obj *a = from->po.next; a != &from->po; a = a->next) { + for (polling_obj *b = to->po.next; b != &to->po; b = b->next) { + pg_notify(exec_ctx, a, b); + } + } } -static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pss, grpc_pollset *ps) { - pss_add_obj(exec_ctx, pss, &ps->po, PSS_POLLSET); +/* Allocates a new polling group whose members are the initial_po_count + objects in initial_po, chains them into the group's circular list in array + order (the group's own po is the list head), and notifies each pair of + members about each other. The group starts with one ref per member. */ +static void pg_create(grpc_exec_ctx *exec_ctx, polling_obj **initial_po, + size_t initial_po_count) { + /* assumes all polling objects in initial_po are locked */ + polling_group *pg = gpr_malloc(sizeof(*pg)); + po_init(&pg->po, PO_POLLING_GROUP); + gpr_ref_init(&pg->refs, (int)initial_po_count); + /* claim every member exactly once: each must be ungrouped on entry, so no + member's group may be assigned before this loop's assertion runs */ + for (size_t i = 0; i < initial_po_count; i++) { + GPR_ASSERT(initial_po[i]->group == NULL); + initial_po[i]->group = pg; + } + for (size_t i = 1; i < initial_po_count; i++) { + initial_po[i]->prev = initial_po[i - 1]; + } + for (size_t i = 0; i < initial_po_count - 1; i++) { + initial_po[i]->next = initial_po[i + 1]; + } + initial_po[0]->prev = &pg->po; + initial_po[initial_po_count - 1]->next = &pg->po; + pg->po.next = initial_po[0]; + pg->po.prev = initial_po[initial_po_count - 1]; + for (size_t i = 1; i < initial_po_count; i++) { + for (size_t j = 0; j < i; j++) { + pg_notify(exec_ctx, initial_po[i], initial_po[j]); + } + } } -static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pss, grpc_pollset *ps) { - pss_del_obj(exec_ctx, pss, &ps->po,
PSS_POLLSET); +static void pg_join(grpc_exec_ctx *exec_ctx, polling_group *pg, + polling_obj *po) { + /* assumes neither pg nor po are locked; consumes one ref to pg */ + pg = pg_lock_latest(pg); + /* pg locked */ + gpr_mu_lock(&po->mu); + if (po->group != NULL) { + gpr_mu_unlock(&pg->po.mu); + polling_group *po_group = pg_ref(po->group); + gpr_mu_unlock(&po->mu); + pg_merge(exec_ctx, pg, po_group); + /* early exit: polling obj picked up a group before joining: we now need + to do a full merge */ + return; + } + /* pg, po locked */ + for (polling_obj *existing = pg->po.next /* skip pg - it's just a stub */; + existing != &pg->po; existing = existing->next) { + pg_notify(exec_ctx, po, existing); + } + po->group = pg; + po->next = &pg->po; + po->prev = pg->po.prev; + po->prev->next = po->next->prev = po; + gpr_mu_unlock(&pg->po.mu); + gpr_mu_unlock(&po->mu); } -static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *bag, - grpc_pollset_set *item) { - pss_merge(exec_ctx, bag, item); +static void pg_merge(grpc_exec_ctx *exec_ctx, polling_group *a, + polling_group *b) { + for (;;) { + if (a == b) return; + if (a > b) GPR_SWAP(polling_group *, a, b); + gpr_mu_lock(&a->po.mu); + gpr_mu_lock(&b->po.mu); + if (a->po.group != NULL) { + polling_group *m2 = pg_ref(a->po.group); + gpr_mu_unlock(&a->po.mu); + gpr_mu_unlock(&b->po.mu); + pg_unref(a); + a = m2; + } else if (b->po.group != NULL) { + polling_group *m2 = pg_ref(b->po.group); + gpr_mu_unlock(&a->po.mu); + gpr_mu_unlock(&b->po.mu); + pg_unref(b); + b = m2; + } else { + break; + } + } + polling_group **unref = NULL; + size_t unref_count = 0; + size_t unref_cap = 0; + b->po.group = a; + pg_broadcast(exec_ctx, a, b); + pg_broadcast(exec_ctx, b, a); + while (b->po.next != &b->po) { + polling_obj *po = b->po.next; + gpr_mu_lock(&po->mu); + if (unref_count == unref_cap) { + unref_cap = GPR_MAX(8, 3 * unref_cap / 2); + unref = gpr_realloc(unref, unref_cap * sizeof(*unref)); + } + 
unref[unref_count++] = po->group; + po->group = pg_ref(a); + // unlink from b + po->prev->next = po->next; + po->next->prev = po->prev; + // link to a + po->next = &a->po; + po->prev = a->po.prev; + po->next->prev = po->prev->next = po; + gpr_mu_unlock(&po->mu); + } + gpr_mu_unlock(&a->po.mu); + gpr_mu_unlock(&b->po.mu); + for (size_t i = 0; i < unref_count; i++) { + pg_unref(unref[i]); + } + gpr_free(unref); } -static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *bag, - grpc_pollset_set *item) {} - /******************************************************************************* * Event engine binding */