attempt #2 at pollset_set

pull/10507/head
Craig Tiller 8 years ago
parent 4848028545
commit f18286bda6
  1. 508
      src/core/lib/iomgr/ev_epollex_linux.c

@ -77,34 +77,48 @@ static grpc_wakeup_fd global_wakeup_fd;
*/
typedef enum {
PSS_FD,
PSS_POLLSET,
PSS_POLLSET_SET,
PSS_OBJ_TYPE_COUNT
} pss_obj_type;
PO_FD,
PO_POLLSET,
PO_POLLSET_SET,
PO_POLLING_GROUP,
PO_COUNT
} polling_obj_type;
typedef struct pss_obj {
typedef struct polling_obj polling_obj;
typedef struct polling_group polling_group;
struct polling_obj {
gpr_mu mu;
struct pss_obj *pss_next;
struct pss_obj *pss_prev;
int pss_refs;
grpc_pollset_set *pss_master;
} pss_obj;
static void pss_obj_init(pss_obj *obj) {
gpr_mu_init(&obj->mu);
obj->pss_refs = 0;
obj->pss_next = NULL;
obj->pss_prev = NULL;
obj->pss_master = NULL;
}
polling_obj_type type;
polling_group *group;
struct polling_obj *next;
struct polling_obj *prev;
};
struct polling_group {
polling_obj po;
gpr_refcount refs;
};
static void po_init(polling_obj *po, polling_obj_type type);
static void po_destroy(polling_obj *po);
static void po_join(grpc_exec_ctx *exec_ctx, polling_obj *a, polling_obj *b);
static void pg_create(grpc_exec_ctx *exec_ctx, polling_obj **initial_po,
size_t initial_po_count);
static polling_group *pg_ref(polling_group *pg);
static void pg_unref(polling_group *pg);
static void pg_merge(grpc_exec_ctx *exec_ctx, polling_group *a,
polling_group *b);
static void pg_join(grpc_exec_ctx *exec_ctx, polling_group *pg,
polling_obj *po);
/*******************************************************************************
* Fd Declarations
*/
struct grpc_fd {
pss_obj po;
polling_obj po;
int fd;
/* refst format:
bit 0 : 1=Active / 0=Orphaned
@ -162,7 +176,7 @@ struct grpc_pollset_worker {
};
struct grpc_pollset {
pss_obj po;
polling_obj po;
int epfd;
int num_pollers;
bool kicked_without_poller;
@ -181,11 +195,7 @@ struct grpc_pollset {
* Pollset-set Declarations
*/
struct grpc_pollset_set {
pss_obj po;
gpr_refcount refs;
/* roots are only used if master == self */
pss_obj *roots[PSS_OBJ_TYPE_COUNT];
polling_obj po;
};
/*******************************************************************************
@ -257,10 +267,11 @@ static void unref_by(grpc_fd *fd, int n) {
old = gpr_atm_full_fetch_add(&fd->refst, -n);
if (old == n) {
/* Add the fd to the freelist */
grpc_iomgr_unregister_object(&fd->iomgr_object);
po_destroy(&fd->po);
gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist;
fd_freelist = fd;
grpc_iomgr_unregister_object(&fd->iomgr_object);
grpc_lfev_destroy(&fd->read_closure);
grpc_lfev_destroy(&fd->write_closure);
@ -279,7 +290,6 @@ static void fd_global_shutdown(void) {
while (fd_freelist != NULL) {
grpc_fd *fd = fd_freelist;
fd_freelist = fd_freelist->freelist_next;
gpr_mu_destroy(&fd->po.mu);
gpr_free(fd);
}
gpr_mu_destroy(&fd_freelist_mu);
@ -299,7 +309,7 @@ static grpc_fd *fd_create(int fd, const char *name) {
new_fd = gpr_malloc(sizeof(grpc_fd));
}
pss_obj_init(&new_fd->po);
po_init(&new_fd->po, PO_FD);
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd;
@ -536,7 +546,7 @@ static grpc_error *kick_poller(void) {
}
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
pss_obj_init(&pollset->po);
po_init(&pollset->po, PO_POLLSET);
pollset->kicked_without_poller = false;
pollset->epfd = epoll_create1(EPOLL_CLOEXEC);
if (pollset->epfd < 0) {
@ -610,8 +620,7 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset) {
if (pollset->shutdown_closure != NULL && pollset->num_pollers == 0 &&
pollset->po.pss_master == NULL) {
if (pollset->shutdown_closure != NULL && pollset->num_pollers == 0) {
grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
}
}
@ -643,7 +652,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
/* pollset_shutdown is guaranteed to be called before pollset_destroy. */
static void pollset_destroy(grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->po.mu);
po_destroy(&pollset->po);
if (pollset->epfd >= 0) close(pollset->epfd);
grpc_wakeup_fd_destroy(&pollset->pollset_wakeup);
}
@ -821,268 +830,257 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
static grpc_pollset_set *pollset_set_create(void) {
grpc_pollset_set *pss = gpr_zalloc(sizeof(*pss));
pss_obj_init(&pss->po);
gpr_ref_init(&pss->refs, 1);
pss->roots[PSS_POLLSET_SET] = &pss->po;
pss->po.pss_master = pss;
pss->po.pss_next = pss->po.pss_prev = &pss->po;
po_init(&pss->po, PO_POLLSET_SET);
return pss;
}
static void pss_unref(grpc_pollset_set *pss);
static void pss_destroy(grpc_pollset_set *pss) {
gpr_mu_destroy(&pss->po.mu);
GPR_ASSERT(pss->roots[PSS_FD] == NULL);
GPR_ASSERT(pss->roots[PSS_POLLSET] == NULL);
GPR_ASSERT(pss->roots[PSS_POLLSET_SET] == &pss->po);
for (pss_obj *child = pss->roots[PSS_POLLSET_SET]; child != &pss->po;
child = child->pss_next) {
pss_unref((grpc_pollset_set *)child);
}
gpr_free(pss);
static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pss) {
po_destroy(&pss->po);
}
static grpc_pollset_set *pss_ref(grpc_pollset_set *pss) {
gpr_ref(&pss->refs);
return pss;
static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
grpc_fd *fd) {
po_join(exec_ctx, &pss->po, &fd->po);
}
static void pss_unref(grpc_pollset_set *pss) {
if (gpr_unref(&pss->refs)) pss_destroy(pss);
}
static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
grpc_fd *fd) {}
static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pss) {
pss_unref(pss);
static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pss, grpc_pollset *ps) {
po_join(exec_ctx, &pss->po, &ps->po);
}
static grpc_pollset_set *pss_ref_and_lock_master(
grpc_pollset_set *master_or_slave) {
pss_ref(master_or_slave);
gpr_mu_lock(&master_or_slave->po.mu);
while (master_or_slave != master_or_slave->po.pss_master) {
grpc_pollset_set *master = pss_ref(master_or_slave->po.pss_master);
gpr_mu_unlock(&master_or_slave->po.mu);
pss_unref(master_or_slave);
master_or_slave = master;
gpr_mu_lock(&master_or_slave->po.mu);
}
return master_or_slave;
}
static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pss, grpc_pollset *ps) {}
static void pss_broadcast_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *dst,
pss_obj *obj) {
grpc_fd *fd = (grpc_fd *)obj;
if (dst->roots[PSS_POLLSET] == NULL) return;
pss_obj *tgt = dst->roots[PSS_POLLSET];
do {
pollset_add_fd(exec_ctx, (grpc_pollset *)tgt, fd);
tgt = tgt->pss_next;
} while (tgt != dst->roots[PSS_POLLSET]);
static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *bag,
grpc_pollset_set *item) {
po_join(exec_ctx, &bag->po, &item->po);
}
static void pss_broadcast_pollset(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *dst, pss_obj *obj) {
grpc_pollset *pollset = (grpc_pollset *)obj;
if (dst->roots[PSS_FD] == NULL) return;
pss_obj *tgt = dst->roots[PSS_FD];
do {
pollset_add_fd(exec_ctx, pollset, (grpc_fd *)tgt);
tgt = tgt->pss_next;
} while (tgt != dst->roots[PSS_FD]);
}
static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *bag,
grpc_pollset_set *item) {}
static pss_obj *pss_splice(pss_obj *p, pss_obj *q) {
if (p == NULL) return q;
if (q == NULL) return p;
p->pss_next->pss_prev = q->pss_prev;
q->pss_prev->pss_next = p->pss_next;
p->pss_next = q;
q->pss_prev = p;
return p;
static void po_init(polling_obj *po, polling_obj_type type) {
gpr_mu_init(&po->mu);
po->type = type;
po->group = NULL;
po->next = po;
po->prev = po;
}
static void (*const broadcast[PSS_OBJ_TYPE_COUNT])(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *dst,
pss_obj *obj) = {
pss_broadcast_fd, pss_broadcast_pollset, NULL};
static void pss_merge_broadcast_and_patch(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *a,
grpc_pollset_set *b,
pss_obj_type type) {
pss_obj *obj;
if (a->roots[type] != NULL) {
obj = a->roots[type];
do {
broadcast[type](exec_ctx, b, obj);
obj = obj->pss_next;
} while (obj != a->roots[type]);
}
if (b->roots[type] != NULL) {
obj = b->roots[type];
do {
broadcast[type](exec_ctx, a, obj);
gpr_mu_lock(&obj->mu);
obj->pss_master = a;
gpr_mu_unlock(&obj->mu);
obj = obj->pss_next;
} while (obj != b->roots[type]);
static polling_group *pg_lock_latest(polling_group *pg) {
/* assumes pg unlocked; consumes ref, returns ref */
gpr_mu_lock(&pg->po.mu);
while (pg->po.group != NULL) {
polling_group *new_pg = pg_ref(pg->po.group);
gpr_mu_unlock(&pg->po.mu);
pg_unref(pg);
pg = new_pg;
gpr_mu_lock(&pg->po.mu);
}
a->roots[type] = pss_splice(a->roots[type], b->roots[type]);
return pg;
}
static void pss_merge(grpc_exec_ctx *exec_ctx, grpc_pollset_set *a,
grpc_pollset_set *b) {
pss_ref(a);
pss_ref(b);
for (;;) {
if (a == b) {
pss_unref(a);
pss_unref(b);
return;
} else if (a < b) {
gpr_mu_lock(&a->po.mu);
gpr_mu_lock(&b->po.mu);
} else {
gpr_mu_lock(&b->po.mu);
gpr_mu_lock(&a->po.mu);
}
if (a != a->po.pss_master) {
grpc_pollset_set *master = pss_ref(a->po.pss_master);
gpr_mu_unlock(&a->po.mu);
gpr_mu_unlock(&b->po.mu);
pss_unref(a);
a = master;
} else if (b != b->po.pss_master) {
grpc_pollset_set *master = pss_ref(b->po.pss_master);
gpr_mu_unlock(&a->po.mu);
gpr_mu_unlock(&b->po.mu);
pss_unref(b);
b = master;
} else {
/* a, b locked and are at their respective masters */
pss_merge_broadcast_and_patch(exec_ctx, a, b, PSS_FD);
pss_merge_broadcast_and_patch(exec_ctx, a, b, PSS_POLLSET);
b->po.pss_master = a;
a->roots[PSS_POLLSET_SET] =
pss_splice(a->roots[PSS_POLLSET_SET], b->roots[PSS_POLLSET_SET]);
gpr_mu_unlock(&a->po.mu);
gpr_mu_unlock(&b->po.mu);
pss_unref(a);
/* a now owns a ref to b */
return;
}
static void po_destroy(polling_obj *po) {
if (po->group != NULL) {
polling_group *pg = pg_lock_latest(po->group);
po->prev->next = po->next;
po->next->prev = po->prev;
pg_unref(pg);
}
gpr_mu_destroy(&po->mu);
}
static void pss_add_obj(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
pss_obj *obj, pss_obj_type type) {
pss = pss_ref_and_lock_master(pss);
gpr_mu_lock(&obj->mu);
if (obj->pss_master == pss) {
/* obj is already a member -- just bump refcount */
obj->pss_refs++;
gpr_mu_unlock(&obj->mu);
gpr_mu_unlock(&pss->po.mu);
pss_unref(pss);
return;
} else if (obj->pss_master != NULL) {
grpc_pollset_set *other_pss = pss_ref(obj->pss_master);
obj->pss_refs++;
gpr_mu_unlock(&obj->mu);
gpr_mu_unlock(&pss->po.mu);
pss_merge(exec_ctx, pss, other_pss);
pss_unref(other_pss);
pss_unref(pss);
} else {
GPR_ASSERT(obj->pss_refs == 0);
obj->pss_refs = 1;
obj->pss_master = pss;
if (pss->roots[type] == NULL) {
pss->roots[type] = obj;
obj->pss_next = obj->pss_prev = obj;
} else {
obj->pss_next = pss->roots[type];
obj->pss_prev = obj->pss_next->pss_prev;
obj->pss_prev->pss_next = obj;
obj->pss_next->pss_prev = obj;
}
switch (type) {
case PSS_FD:
REF_BY((grpc_fd *)obj, 2, "pollset_set");
gpr_mu_unlock(&obj->mu);
pss_broadcast_fd(exec_ctx, pss, obj);
break;
case PSS_POLLSET:
gpr_mu_unlock(&obj->mu);
pss_broadcast_pollset(exec_ctx, pss, obj);
break;
case PSS_POLLSET_SET:
case PSS_OBJ_TYPE_COUNT:
GPR_UNREACHABLE_CODE(break);
}
gpr_mu_unlock(&pss->po.mu);
pss_unref(pss);
static polling_group *pg_ref(polling_group *pg) {
gpr_ref(&pg->refs);
return pg;
}
static void pg_unref(polling_group *pg) {
if (gpr_unref(&pg->refs)) {
po_destroy(&pg->po);
gpr_free(pg);
}
}
static void pss_del_obj(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
pss_obj *obj, pss_obj_type type) {
bool unref = false;
pss = pss_ref_and_lock_master(pss);
gpr_mu_lock(&obj->mu);
obj->pss_refs--;
if (obj->pss_refs == 0) {
obj->pss_master = NULL;
if (obj == pss->roots[type]) {
pss->roots[type] = obj->pss_next;
}
if (obj->pss_next == obj) {
pss->roots[type] = NULL;
static void po_join(grpc_exec_ctx *exec_ctx, polling_obj *a, polling_obj *b) {
if (a == b) return;
if (a > b) GPR_SWAP(polling_obj *, a, b);
gpr_mu_lock(&a->mu);
gpr_mu_lock(&b->mu);
if (a->group == NULL) {
if (b->group == NULL) {
polling_obj *initial_po[] = {a, b};
pg_create(exec_ctx, initial_po, GPR_ARRAY_SIZE(initial_po));
gpr_mu_unlock(&a->mu);
gpr_mu_unlock(&b->mu);
} else {
obj->pss_next->pss_prev = obj->pss_prev;
obj->pss_prev->pss_next = obj->pss_next;
polling_group *b_group = pg_ref(b->group);
gpr_mu_unlock(&b->mu);
gpr_mu_unlock(&a->mu);
pg_join(exec_ctx, b_group, a);
}
unref = true;
} else if (b->group == NULL) {
polling_group *a_group = pg_ref(a->group);
gpr_mu_unlock(&a->mu);
gpr_mu_unlock(&b->mu);
pg_join(exec_ctx, a_group, b);
} else if (a->group == b->group) {
/* nothing to do */
gpr_mu_unlock(&a->mu);
gpr_mu_unlock(&b->mu);
} else {
polling_group *a_group = pg_ref(a->group);
polling_group *b_group = pg_ref(b->group);
gpr_mu_unlock(&a->mu);
gpr_mu_unlock(&b->mu);
pg_merge(exec_ctx, a_group, b_group);
}
gpr_mu_unlock(&obj->mu);
gpr_mu_unlock(&pss->po.mu);
pss_unref(pss);
if (unref && type == PSS_FD) UNREF_BY((grpc_fd *)obj, 2, "pollset_set");
}
static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
grpc_fd *fd) {
pss_add_obj(exec_ctx, pss, &fd->po, PSS_FD);
#define POTYPES(a, b) (((a)*PO_COUNT) + (b))
static void pg_notify(grpc_exec_ctx *exec_ctx, polling_obj *a, polling_obj *b) {
GPR_ASSERT(a != b);
switch (POTYPES(a->type, b->type)) {
case POTYPES(PO_FD, PO_POLLSET):
pollset_add_fd(exec_ctx, (grpc_pollset *)b, (grpc_fd *)a);
break;
case POTYPES(PO_POLLSET, PO_FD):
pollset_add_fd(exec_ctx, (grpc_pollset *)a, (grpc_fd *)b);
break;
}
}
static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
grpc_fd *fd) {
pss_del_obj(exec_ctx, pss, &fd->po, PSS_FD);
static void pg_broadcast(grpc_exec_ctx *exec_ctx, polling_group *from,
polling_group *to) {
for (polling_obj *a = from->po.next; a != &from->po; a = a->next) {
for (polling_obj *b = to->po.next; b != &to->po; b = b->next) {
pg_notify(exec_ctx, a, b);
}
}
}
static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pss, grpc_pollset *ps) {
pss_add_obj(exec_ctx, pss, &ps->po, PSS_POLLSET);
static void pg_create(grpc_exec_ctx *exec_ctx, polling_obj **initial_po,
size_t initial_po_count) {
/* assumes all polling objects in initial_po are locked */
polling_group *pg = gpr_malloc(sizeof(*pg));
po_init(&pg->po, PO_POLLING_GROUP);
gpr_ref_init(&pg->refs, initial_po_count);
GPR_ASSERT(initial_po[0]->group == NULL);
initial_po[0]->next = initial_po[0]->prev = initial_po[0];
initial_po[0]->group = pg;
for (size_t i = 0; i < initial_po_count; i++) {
GPR_ASSERT(initial_po[i]->group == NULL);
initial_po[i]->group = pg;
}
for (size_t i = 1; i < initial_po_count; i++) {
initial_po[i]->prev = initial_po[i - 1];
}
for (size_t i = 0; i < initial_po_count - 1; i++) {
initial_po[i]->next = initial_po[i + 1];
}
initial_po[0]->prev = &pg->po;
initial_po[initial_po_count - 1]->next = &pg->po;
pg->po.next = initial_po[0];
pg->po.prev = initial_po[initial_po_count - 1];
for (size_t i = 1; i < initial_po_count; i++) {
for (size_t j = 0; j < i; j++) {
pg_notify(exec_ctx, initial_po[i], initial_po[j]);
}
}
}
static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pss, grpc_pollset *ps) {
pss_del_obj(exec_ctx, pss, &ps->po, PSS_POLLSET);
static void pg_join(grpc_exec_ctx *exec_ctx, polling_group *pg,
polling_obj *po) {
/* assumes neither pg nor po are locked; consumes one ref to pg */
pg = pg_lock_latest(pg);
/* pg locked */
gpr_mu_lock(&po->mu);
if (po->group != NULL) {
gpr_mu_unlock(&pg->po.mu);
polling_group *po_group = pg_ref(po->group);
gpr_mu_unlock(&po->mu);
pg_merge(exec_ctx, pg, po_group);
/* early exit: polling obj picked up a group before joining: we now need
to do a full merge */
return;
}
/* pg, po locked */
for (polling_obj *existing = pg->po.next /* skip pg - it's just a stub */;
existing != &pg->po; existing = existing->next) {
pg_notify(exec_ctx, po, existing);
}
po->group = pg;
po->next = &pg->po;
po->prev = pg->po.prev;
po->prev->next = po->next->prev = po;
gpr_mu_unlock(&pg->po.mu);
gpr_mu_unlock(&po->mu);
}
static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *bag,
grpc_pollset_set *item) {
pss_merge(exec_ctx, bag, item);
static void pg_merge(grpc_exec_ctx *exec_ctx, polling_group *a,
polling_group *b) {
for (;;) {
if (a == b) return;
if (a > b) GPR_SWAP(polling_group *, a, b);
gpr_mu_lock(&a->po.mu);
gpr_mu_lock(&b->po.mu);
if (a->po.group != NULL) {
polling_group *m2 = pg_ref(a->po.group);
gpr_mu_unlock(&a->po.mu);
gpr_mu_unlock(&b->po.mu);
pg_unref(a);
a = m2;
} else if (b->po.group != NULL) {
polling_group *m2 = pg_ref(b->po.group);
gpr_mu_unlock(&a->po.mu);
gpr_mu_unlock(&b->po.mu);
pg_unref(b);
b = m2;
} else {
break;
}
}
polling_group **unref = NULL;
size_t unref_count = 0;
size_t unref_cap = 0;
b->po.group = a;
pg_broadcast(exec_ctx, a, b);
pg_broadcast(exec_ctx, b, a);
while (b->po.next != &b->po) {
polling_obj *po = b->po.next;
gpr_mu_lock(&po->mu);
if (unref_count == unref_cap) {
unref_cap = GPR_MAX(8, 3 * unref_cap / 2);
unref = gpr_realloc(unref, unref_cap * sizeof(*unref));
}
unref[unref_count++] = po->group;
po->group = pg_ref(a);
// unlink from b
po->prev->next = po->next;
po->next->prev = po->prev;
// link to a
po->next = &a->po;
po->prev = a->po.prev;
po->next->prev = po->prev->next = po;
gpr_mu_unlock(&po->mu);
}
gpr_mu_unlock(&a->po.mu);
gpr_mu_unlock(&b->po.mu);
for (size_t i = 0; i < unref_count; i++) {
pg_unref(unref[i]);
}
gpr_free(unref);
}
static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *bag,
grpc_pollset_set *item) {}
/*******************************************************************************
* Event engine binding
*/

Loading…
Cancel
Save