Add poll_object struct (and related changes to fix compilation errors). No other functionality changes

reviewable/pr8797/r1
Sree Kuchibhotla 8 years ago
parent 501a4fc19c
commit f6f33d7354
  1. 404
      src/core/lib/iomgr/ev_epoll_linux.c

@ -90,10 +90,39 @@ void grpc_use_signal(int signum) {
struct polling_island; struct polling_island;
typedef enum {
POLL_OBJ_FD,
POLL_OBJ_POLLSET,
POLL_OBJ_POLLSET_SET
} poll_obj_type;
typedef struct poll_obj {
gpr_mu mu;
struct polling_island *pi;
} poll_obj;
const char *poll_obj_string(poll_obj_type po_type) {
switch (po_type) {
case POLL_OBJ_FD:
return "fd";
case POLL_OBJ_POLLSET:
return "pollset";
case POLL_OBJ_POLLSET_SET:
return "pollset_set";
}
GPR_UNREACHABLE_CODE(return "UNKNOWN");
}
/******************************************************************************* /*******************************************************************************
* Fd Declarations * Fd Declarations
*/ */
#define FD_FROM_PO(po) ((grpc_fd *)(po))
struct grpc_fd { struct grpc_fd {
poll_obj po;
int fd; int fd;
/* refst format: /* refst format:
bit 0 : 1=Active / 0=Orphaned bit 0 : 1=Active / 0=Orphaned
@ -101,8 +130,6 @@ struct grpc_fd {
Ref/Unref by two to avoid altering the orphaned bit */ Ref/Unref by two to avoid altering the orphaned bit */
gpr_atm refst; gpr_atm refst;
gpr_mu mu;
/* Indicates that the fd is shutdown and that any pending read/write closures /* Indicates that the fd is shutdown and that any pending read/write closures
should fail */ should fail */
bool shutdown; bool shutdown;
@ -115,9 +142,6 @@ struct grpc_fd {
grpc_closure *read_closure; grpc_closure *read_closure;
grpc_closure *write_closure; grpc_closure *write_closure;
/* The polling island to which this fd belongs to (protected by mu) */
struct polling_island *polling_island;
struct grpc_fd *freelist_next; struct grpc_fd *freelist_next;
grpc_closure *on_done_closure; grpc_closure *on_done_closure;
@ -220,16 +244,14 @@ struct grpc_pollset_worker {
}; };
struct grpc_pollset { struct grpc_pollset {
gpr_mu mu; poll_obj po;
grpc_pollset_worker root_worker; grpc_pollset_worker root_worker;
bool kicked_without_pollers; bool kicked_without_pollers;
bool shutting_down; /* Is the pollset shutting down ? */ bool shutting_down; /* Is the pollset shutting down ? */
bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */ bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
grpc_closure *shutdown_done; /* Called after after shutdown is complete */ grpc_closure *shutdown_done; /* Called after after shutdown is complete */
/* The polling island to which this pollset belongs to */
struct polling_island *polling_island;
}; };
/******************************************************************************* /*******************************************************************************
@ -242,7 +264,7 @@ struct grpc_pollset {
* simplify the grpc_fd structure since we would no longer need to explicitly * simplify the grpc_fd structure since we would no longer need to explicitly
* maintain the orphaned state */ * maintain the orphaned state */
struct grpc_pollset_set { struct grpc_pollset_set {
gpr_mu mu; poll_obj po;
size_t pollset_count; size_t pollset_count;
size_t pollset_capacity; size_t pollset_capacity;
@ -916,7 +938,7 @@ static void fd_global_shutdown(void) {
while (fd_freelist != NULL) { while (fd_freelist != NULL) {
grpc_fd *fd = fd_freelist; grpc_fd *fd = fd_freelist;
fd_freelist = fd_freelist->freelist_next; fd_freelist = fd_freelist->freelist_next;
gpr_mu_destroy(&fd->mu); gpr_mu_destroy(&fd->po.mu);
gpr_free(fd); gpr_free(fd);
} }
gpr_mu_destroy(&fd_freelist_mu); gpr_mu_destroy(&fd_freelist_mu);
@ -934,13 +956,14 @@ static grpc_fd *fd_create(int fd, const char *name) {
if (new_fd == NULL) { if (new_fd == NULL) {
new_fd = gpr_malloc(sizeof(grpc_fd)); new_fd = gpr_malloc(sizeof(grpc_fd));
gpr_mu_init(&new_fd->mu); gpr_mu_init(&new_fd->po.mu);
} }
/* Note: It is not really needed to get the new_fd->mu lock here. If this is a /* Note: It is not really needed to get the new_fd->po.mu lock here. If this
newly created fd (or an fd we got from the freelist), no one else would be * is a newly created fd (or an fd we got from the freelist), no one else
holding a lock to it anyway. */ * would be holding a lock to it anyway. */
gpr_mu_lock(&new_fd->mu); gpr_mu_lock(&new_fd->po.mu);
new_fd->po.pi = NULL;
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd; new_fd->fd = fd;
@ -948,12 +971,11 @@ static grpc_fd *fd_create(int fd, const char *name) {
new_fd->orphaned = false; new_fd->orphaned = false;
new_fd->read_closure = CLOSURE_NOT_READY; new_fd->read_closure = CLOSURE_NOT_READY;
new_fd->write_closure = CLOSURE_NOT_READY; new_fd->write_closure = CLOSURE_NOT_READY;
new_fd->polling_island = NULL;
new_fd->freelist_next = NULL; new_fd->freelist_next = NULL;
new_fd->on_done_closure = NULL; new_fd->on_done_closure = NULL;
new_fd->read_notifier_pollset = NULL; new_fd->read_notifier_pollset = NULL;
gpr_mu_unlock(&new_fd->mu); gpr_mu_unlock(&new_fd->po.mu);
char *fd_name; char *fd_name;
gpr_asprintf(&fd_name, "%s fd=%d", name, fd); gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
@ -971,11 +993,11 @@ static bool fd_is_orphaned(grpc_fd *fd) {
static int fd_wrapped_fd(grpc_fd *fd) { static int fd_wrapped_fd(grpc_fd *fd) {
int ret_fd = -1; int ret_fd = -1;
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->po.mu);
if (!fd->orphaned) { if (!fd->orphaned) {
ret_fd = fd->fd; ret_fd = fd->fd;
} }
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->po.mu);
return ret_fd; return ret_fd;
} }
@ -987,7 +1009,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_error *error = GRPC_ERROR_NONE; grpc_error *error = GRPC_ERROR_NONE;
polling_island *unref_pi = NULL; polling_island *unref_pi = NULL;
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->po.mu);
fd->on_done_closure = on_done; fd->on_done_closure = on_done;
/* If release_fd is not NULL, we should be relinquishing control of the file /* If release_fd is not NULL, we should be relinquishing control of the file
@ -1007,25 +1029,25 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
/* Remove the fd from the polling island: /* Remove the fd from the polling island:
- Get a lock on the latest polling island (i.e the last island in the - Get a lock on the latest polling island (i.e the last island in the
linked list pointed by fd->polling_island). This is the island that linked list pointed by fd->po.pi). This is the island that
would actually contain the fd would actually contain the fd
- Remove the fd from the latest polling island - Remove the fd from the latest polling island
- Unlock the latest polling island - Unlock the latest polling island
- Set fd->polling_island to NULL (but remove the ref on the polling island - Set fd->po.pi to NULL (but remove the ref on the polling island
before doing this.) */ before doing this.) */
if (fd->polling_island != NULL) { if (fd->po.pi != NULL) {
polling_island *pi_latest = polling_island_lock(fd->polling_island); polling_island *pi_latest = polling_island_lock(fd->po.pi);
polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error); polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
gpr_mu_unlock(&pi_latest->mu); gpr_mu_unlock(&pi_latest->mu);
unref_pi = fd->polling_island; unref_pi = fd->po.pi;
fd->polling_island = NULL; fd->po.pi = NULL;
} }
grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error), grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error),
NULL); NULL);
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->po.mu);
UNREF_BY(fd, 2, reason); /* Drop the reference */ UNREF_BY(fd, 2, reason); /* Drop the reference */
if (unref_pi != NULL) { if (unref_pi != NULL) {
/* Unref stale polling island here, outside the fd lock above. /* Unref stale polling island here, outside the fd lock above.
@ -1090,23 +1112,23 @@ static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
grpc_fd *fd) { grpc_fd *fd) {
grpc_pollset *notifier = NULL; grpc_pollset *notifier = NULL;
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->po.mu);
notifier = fd->read_notifier_pollset; notifier = fd->read_notifier_pollset;
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->po.mu);
return notifier; return notifier;
} }
static bool fd_is_shutdown(grpc_fd *fd) { static bool fd_is_shutdown(grpc_fd *fd) {
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->po.mu);
const bool r = fd->shutdown; const bool r = fd->shutdown;
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->po.mu);
return r; return r;
} }
/* Might be called multiple times */ /* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->po.mu);
/* Do the actual shutdown only once */ /* Do the actual shutdown only once */
if (!fd->shutdown) { if (!fd->shutdown) {
fd->shutdown = true; fd->shutdown = true;
@ -1117,28 +1139,28 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
set_ready_locked(exec_ctx, fd, &fd->read_closure); set_ready_locked(exec_ctx, fd, &fd->read_closure);
set_ready_locked(exec_ctx, fd, &fd->write_closure); set_ready_locked(exec_ctx, fd, &fd->write_closure);
} }
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->po.mu);
} }
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) { grpc_closure *closure) {
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->po.mu);
notify_on_locked(exec_ctx, fd, &fd->read_closure, closure); notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->po.mu);
} }
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) { grpc_closure *closure) {
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->po.mu);
notify_on_locked(exec_ctx, fd, &fd->write_closure, closure); notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->po.mu);
} }
static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->po.mu);
grpc_workqueue *workqueue = GRPC_WORKQUEUE_REF( grpc_workqueue *workqueue = GRPC_WORKQUEUE_REF(
(grpc_workqueue *)fd->polling_island, "fd_get_workqueue"); (grpc_workqueue *)fd->po.pi, "fd_get_workqueue");
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->po.mu);
return workqueue; return workqueue;
} }
@ -1278,8 +1300,9 @@ static grpc_error *kick_poller(void) {
} }
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
gpr_mu_init(&pollset->mu); gpr_mu_init(&pollset->po.mu);
*mu = &pollset->mu; *mu = &pollset->po.mu;
pollset->po.pi = NULL;
pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
pollset->kicked_without_pollers = false; pollset->kicked_without_pollers = false;
@ -1287,8 +1310,6 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
pollset->shutting_down = false; pollset->shutting_down = false;
pollset->finish_shutdown_called = false; pollset->finish_shutdown_called = false;
pollset->shutdown_done = NULL; pollset->shutdown_done = NULL;
pollset->polling_island = NULL;
} }
/* Convert a timespec to milliseconds: /* Convert a timespec to milliseconds:
@ -1318,26 +1339,26 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) { grpc_pollset *notifier) {
/* Need the fd->mu since we might be racing with fd_notify_on_read */ /* Need the fd->po.mu since we might be racing with fd_notify_on_read */
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->po.mu);
set_ready_locked(exec_ctx, fd, &fd->read_closure); set_ready_locked(exec_ctx, fd, &fd->read_closure);
fd->read_notifier_pollset = notifier; fd->read_notifier_pollset = notifier;
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->po.mu);
} }
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
/* Need the fd->mu since we might be racing with fd_notify_on_write */ /* Need the fd->po.mu since we might be racing with fd_notify_on_write */
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->po.mu);
set_ready_locked(exec_ctx, fd, &fd->write_closure); set_ready_locked(exec_ctx, fd, &fd->write_closure);
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->po.mu);
} }
static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx, static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
grpc_pollset *ps, char *reason) { grpc_pollset *ps, char *reason) {
if (ps->polling_island != NULL) { if (ps->po.pi != NULL) {
PI_UNREF(exec_ctx, ps->polling_island, reason); PI_UNREF(exec_ctx, ps->po.pi, reason);
} }
ps->polling_island = NULL; ps->po.pi = NULL;
} }
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx, static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
@ -1347,12 +1368,12 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
pollset->finish_shutdown_called = true; pollset->finish_shutdown_called = true;
/* Release the ref and set pollset->polling_island to NULL */ /* Release the ref and set pollset->po.pi to NULL */
pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown"); pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL); grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
} }
/* pollset->mu lock must be held by the caller before calling this */ /* pollset->po.mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure) { grpc_closure *closure) {
GPR_TIMER_BEGIN("pollset_shutdown", 0); GPR_TIMER_BEGIN("pollset_shutdown", 0);
@ -1377,7 +1398,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
* here */ * here */
static void pollset_destroy(grpc_pollset *pollset) { static void pollset_destroy(grpc_pollset *pollset) {
GPR_ASSERT(!pollset_has_workers(pollset)); GPR_ASSERT(!pollset_has_workers(pollset));
gpr_mu_destroy(&pollset->mu); gpr_mu_destroy(&pollset->po.mu);
} }
static void pollset_reset(grpc_pollset *pollset) { static void pollset_reset(grpc_pollset *pollset) {
@ -1387,7 +1408,7 @@ static void pollset_reset(grpc_pollset *pollset) {
pollset->finish_shutdown_called = false; pollset->finish_shutdown_called = false;
pollset->kicked_without_pollers = false; pollset->kicked_without_pollers = false;
pollset->shutdown_done = NULL; pollset->shutdown_done = NULL;
GPR_ASSERT(pollset->polling_island == NULL); GPR_ASSERT(pollset->po.pi == NULL);
} }
static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx, static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
@ -1427,7 +1448,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
GPR_TIMER_BEGIN("pollset_work_and_unlock", 0); GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
/* We need to get the epoll_fd to wait on. The epoll_fd is in inside the /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
latest polling island pointed by pollset->polling_island. latest polling island pointed by pollset->po.pi
Since epoll_fd is immutable, we can read it without obtaining the polling Since epoll_fd is immutable, we can read it without obtaining the polling
island lock. There is however a possibility that the polling island (from island lock. There is however a possibility that the polling island (from
@ -1436,36 +1457,36 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
right-away from epoll_wait() and pick up the latest polling_island the next right-away from epoll_wait() and pick up the latest polling_island the next
this function (i.e pollset_work_and_unlock()) is called */ this function (i.e pollset_work_and_unlock()) is called */
if (pollset->polling_island == NULL) { if (pollset->po.pi == NULL) {
pollset->polling_island = polling_island_create(exec_ctx, NULL, error); pollset->po.pi = polling_island_create(exec_ctx, NULL, error);
if (pollset->polling_island == NULL) { if (pollset->po.pi == NULL) {
GPR_TIMER_END("pollset_work_and_unlock", 0); GPR_TIMER_END("pollset_work_and_unlock", 0);
return; /* Fatal error. We cannot continue */ return; /* Fatal error. We cannot continue */
} }
PI_ADD_REF(pollset->polling_island, "ps"); PI_ADD_REF(pollset->po.pi, "ps");
GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p", GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
(void *)pollset, (void *)pollset->polling_island); (void *)pollset, (void *)pollset->po.pi);
} }
pi = polling_island_maybe_get_latest(pollset->polling_island); pi = polling_island_maybe_get_latest(pollset->po.pi);
epoll_fd = pi->epoll_fd; epoll_fd = pi->epoll_fd;
/* Update the pollset->polling_island since the island being pointed by /* Update the pollset->po.pi since the island being pointed by
pollset->polling_island maybe older than the one pointed by pi) */ pollset->po.pi maybe older than the one pointed by pi) */
if (pollset->polling_island != pi) { if (pollset->po.pi != pi) {
/* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
polling island to be deleted */ polling island to be deleted */
PI_ADD_REF(pi, "ps"); PI_ADD_REF(pi, "ps");
PI_UNREF(exec_ctx, pollset->polling_island, "ps"); PI_UNREF(exec_ctx, pollset->po.pi, "ps");
pollset->polling_island = pi; pollset->po.pi = pi;
} }
/* Add an extra ref so that the island does not get destroyed (which means /* Add an extra ref so that the island does not get destroyed (which means
the epoll_fd won't be closed) while we are are doing an epoll_wait() on the the epoll_fd won't be closed) while we are are doing an epoll_wait() on the
epoll_fd */ epoll_fd */
PI_ADD_REF(pi, "ps_work"); PI_ADD_REF(pi, "ps_work");
gpr_mu_unlock(&pollset->mu); gpr_mu_unlock(&pollset->po.mu);
/* If we get some workqueue work to do, it might end up completing an item on /* If we get some workqueue work to do, it might end up completing an item on
the completion queue, so there's no need to poll... so we skip that and the completion queue, so there's no need to poll... so we skip that and
@ -1540,17 +1561,17 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(pi != NULL); GPR_ASSERT(pi != NULL);
/* Before leaving, release the extra ref we added to the polling island. It /* Before leaving, release the extra ref we added to the polling island. It
is important to use "pi" here (i.e our old copy of pollset->polling_island is important to use "pi" here (i.e our old copy of pollset->po.pi
that we got before releasing the polling island lock). This is because that we got before releasing the polling island lock). This is because
pollset->polling_island pointer might get udpated in other parts of the pollset->po.pi pointer might get udpated in other parts of the
code when there is an island merge while we are doing epoll_wait() above */ code when there is an island merge while we are doing epoll_wait() above */
PI_UNREF(exec_ctx, pi, "ps_work"); PI_UNREF(exec_ctx, pi, "ps_work");
GPR_TIMER_END("pollset_work_and_unlock", 0); GPR_TIMER_END("pollset_work_and_unlock", 0);
} }
/* pollset->mu lock must be held by the caller before calling this. /* pollset->po.mu lock must be held by the caller before calling this.
The function pollset_work() may temporarily release the lock (pollset->mu) The function pollset_work() may temporarily release the lock (pollset->po.mu)
during the course of its execution but it will always re-acquire the lock and during the course of its execution but it will always re-acquire the lock and
ensure that it is held by the time the function returns */ ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@ -1620,7 +1641,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
&g_orig_sigmask, &error); &g_orig_sigmask, &error);
grpc_exec_ctx_flush(exec_ctx); grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&pollset->mu); gpr_mu_lock(&pollset->po.mu);
/* Note: There is no need to reset worker.is_kicked to 0 since we are no /* Note: There is no need to reset worker.is_kicked to 0 since we are no
longer going to use this worker */ longer going to use this worker */
@ -1640,9 +1661,9 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0); GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
finish_shutdown_locked(exec_ctx, pollset); finish_shutdown_locked(exec_ctx, pollset);
gpr_mu_unlock(&pollset->mu); gpr_mu_unlock(&pollset->po.mu);
grpc_exec_ctx_flush(exec_ctx); grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&pollset->mu); gpr_mu_lock(&pollset->po.mu);
} }
*worker_hdl = NULL; *worker_hdl = NULL;
@ -1656,42 +1677,159 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
return error; return error;
} }
#if 0
static void add_poll_object(grpc_exec_ctx *exec_ctx, poll_obj *bag,
poll_obj *item, poll_obj_type bag_type,
poll_obj_type item_type) {
GPR_TIMER_BEGIN("add_poll_object", 0);
grpc_error *error = GRPC_ERROR_NONE;
polling_island *pi_new = NULL;
gpr_mu_lock(&bag->mu);
gpr_mu_lock(&item->mu);
retry:
/*
* 1) If item->pi and bag->pi are both non-NULL and equal, do nothing
* 2) If item->pi and bag->pi are both NULL, create a new polling island (with
* a refcount of 2) and point item->pi and bag->pi to the new island
* 3) If exactly one of item->pi or bag->pi is NULL, update it to point to
* the other's non-NULL pi
* 4) Finally if item->pi and bag-pi are non-NULL and not-equal, merge the
* polling islands and update item->pi and bag->pi to point to the new
* island
*/
/* Early out if we are trying to add an 'fd' to a 'bag' but the fd is already
* orphaned */
if (item_type == POLL_OBJ_FD && (FD_FROM_PO(item))->orphaned) {
gpr_mu_unlock(&item->mu);
gpr_mu_unlock(&bag->mu);
return;
}
if (item->pi == bag->pi) {
pi_new = item->pi;
if (pi_new == NULL) {
/* GPR_ASSERT(item->pi == bag->pi == NULL) */
/* If we are adding an fd to a bag (i.e pollset or pollset_set), then
* we need to do some extra work to make TSAN happy */
if (item_type == POLL_OBJ_FD) {
/* Unlock before creating a new polling island: the polling island will
create a workqueue which creates a file descriptor, and holding an fd
lock here can eventually cause a loop to appear to TSAN (making it
unhappy). We don't think it's a real loop (there's an epoch point
where that loop possibility disappears), but the advantages of
keeping TSAN happy outweigh any performance advantage we might have
by keeping the lock held. */
gpr_mu_unlock(&item->mu);
pi_new = polling_island_create(exec_ctx, FD_FROM_PO(item), &error);
gpr_mu_lock(&item->mu);
/* Need to reverify any assumptions made between the initial lock and
getting to this branch: if they've changed, we need to throw away our
work and figure things out again. */
if (item->pi != NULL) {
/* No need to lock 'pi_new' here since this is a new polling island
* and no one has a reference to it yet */
polling_island_remove_all_fds_locked(pi_new, true, &error);
/* Ref and unref so that the polling island gets deleted during unref
*/
PI_ADD_REF(pi_new, "dance_of_destruction");
PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
goto retry;
}
} else {
pi_new = polling_island_create(exec_ctx, NULL, &error);
}
}
} else if (item->pi == NULL) {
/* GPR_ASSERT(bag->pi != NULL) */
/* Make pi_new point to latest pi*/
pi_new = polling_island_lock(bag->pi);
if (item_type == POLL_OBJ_FD) {
grpc_fd *fd = FD_FROM_PO(item);
polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
}
gpr_mu_unlock(&pi_new->mu);
} else if (bag->pi == NULL) {
/* GPR_ASSERT(item->pi != NULL) */
/* Make pi_new to point to latest pi */
pi_new = polling_island_lock(item->pi);
gpr_mu_unlock(&pi_new->mu);
} else {
pi_new = polling_island_merge(item->pi, bag->pi, &error);
}
/* At this point, pi_new is the polling island that both item->pi and bag->pi
MUST be pointing to */
if (item->pi != pi_new) {
PI_ADD_REF(pi_new, poll_obj_string(item_type));
if (item->pi != NULL) {
PI_UNREF(exec_ctx, item->pi, poll_obj_string(item_type));
}
item->pi = pi_new;
}
if (bag->pi != pi_new) {
PI_ADD_REF(pi_new, poll_obj_string(bag_type));
if (bag->pi != NULL) {
PI_UNREF(exec_ctx, bag->pi, poll_obj_string(bag_type));
}
bag->pi = pi_new;
}
gpr_mu_unlock(&item->mu);
gpr_mu_unlock(&bag->mu);
GRPC_LOG_IF_ERROR("add_poll_object", error);
GPR_TIMER_END("add_poll_object", 0);
}
#endif
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_fd *fd) { grpc_fd *fd) {
GPR_TIMER_BEGIN("pollset_add_fd", 0); GPR_TIMER_BEGIN("pollset_add_fd", 0);
grpc_error *error = GRPC_ERROR_NONE; grpc_error *error = GRPC_ERROR_NONE;
gpr_mu_lock(&pollset->mu); gpr_mu_lock(&pollset->po.mu);
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->po.mu);
polling_island *pi_new = NULL; polling_island *pi_new = NULL;
retry: retry:
/* 1) If fd->polling_island and pollset->polling_island are both non-NULL and /* 1) If fd->po.pi and pollset->po.pi are both non-NULL and
* equal, do nothing. * equal, do nothing.
* 2) If fd->polling_island and pollset->polling_island are both NULL, create * 2) If fd->po.pi and pollset->po.pi are both NULL, create
* a new polling island (with a refcount of 2) and make the polling_island * a new polling island (with a refcount of 2) and make the polling_island
* fields in both fd and pollset to point to the new island * fields in both fd and pollset to point to the new island
* 3) If one of fd->polling_island or pollset->polling_island is NULL, update * 3) If one of fd->po.pi or pollset->po.pi is NULL, update
* the NULL polling_island field to point to the non-NULL polling_island * the NULL polling_island field to point to the non-NULL polling_island
* field (ensure that the refcount on the polling island is incremented by * field (ensure that the refcount on the polling island is incremented by
* 1 to account for the newly added reference) * 1 to account for the newly added reference)
* 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL * 4) Finally, if fd->po.pi and pollset->po.pi are non-NULL
* and different, merge both the polling islands and update the * and different, merge both the polling islands and update the
* polling_island fields in both fd and pollset to point to the merged * polling_island fields in both fd and pollset to point to the merged
* polling island. * polling island.
*/ */
if (fd->orphaned) { if (fd->orphaned) {
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->po.mu);
gpr_mu_unlock(&pollset->mu); gpr_mu_unlock(&pollset->po.mu);
/* early out */ /* early out */
return; return;
} }
if (fd->polling_island == pollset->polling_island) { if (fd->po.pi == pollset->po.pi) {
pi_new = fd->polling_island; pi_new = fd->po.pi;
if (pi_new == NULL) { if (pi_new == NULL) {
/* Unlock before creating a new polling island: the polling island will /* Unlock before creating a new polling island: the polling island will
create a workqueue which creates a file descriptor, and holding an fd create a workqueue which creates a file descriptor, and holding an fd
@ -1700,13 +1838,13 @@ retry:
that loop possibility disappears), but the advantages of keeping TSAN that loop possibility disappears), but the advantages of keeping TSAN
happy outweigh any performance advantage we might have by keeping the happy outweigh any performance advantage we might have by keeping the
lock held. */ lock held. */
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->po.mu);
pi_new = polling_island_create(exec_ctx, fd, &error); pi_new = polling_island_create(exec_ctx, fd, &error);
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->po.mu);
/* Need to reverify any assumptions made between the initial lock and /* Need to reverify any assumptions made between the initial lock and
getting to this branch: if they've changed, we need to throw away our getting to this branch: if they've changed, we need to throw away our
work and figure things out again. */ work and figure things out again. */
if (fd->polling_island != NULL) { if (fd->po.pi != NULL) {
GRPC_POLLING_TRACE( GRPC_POLLING_TRACE(
"pollset_add_fd: Raced creating new polling island. pi_new: %p " "pollset_add_fd: Raced creating new polling island. pi_new: %p "
"(fd: %d, pollset: %p)", "(fd: %d, pollset: %p)",
@ -1727,55 +1865,53 @@ retry:
(void *)pi_new, fd->fd, (void *)pollset); (void *)pi_new, fd->fd, (void *)pollset);
} }
} }
} else if (fd->polling_island == NULL) { } else if (fd->po.pi == NULL) {
pi_new = polling_island_lock(pollset->polling_island); pi_new = polling_island_lock(pollset->po.pi);
polling_island_add_fds_locked(pi_new, &fd, 1, true, &error); polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
gpr_mu_unlock(&pi_new->mu); gpr_mu_unlock(&pi_new->mu);
GRPC_POLLING_TRACE( GRPC_POLLING_TRACE(
"pollset_add_fd: fd->pi was NULL. pi_new: %p (fd: %d, pollset: %p, " "pollset_add_fd: fd->pi was NULL. pi_new: %p (fd: %d, pollset: %p, "
"pollset->pi: %p)", "pollset->pi: %p)",
(void *)pi_new, fd->fd, (void *)pollset, (void *)pi_new, fd->fd, (void *)pollset, (void *)pollset->po.pi);
(void *)pollset->polling_island); } else if (pollset->po.pi == NULL) {
} else if (pollset->polling_island == NULL) { pi_new = polling_island_lock(fd->po.pi);
pi_new = polling_island_lock(fd->polling_island);
gpr_mu_unlock(&pi_new->mu); gpr_mu_unlock(&pi_new->mu);
GRPC_POLLING_TRACE( GRPC_POLLING_TRACE(
"pollset_add_fd: pollset->pi was NULL. pi_new: %p (fd: %d, pollset: " "pollset_add_fd: pollset->pi was NULL. pi_new: %p (fd: %d, pollset: "
"%p, fd->pi: %p", "%p, fd->pi: %p",
(void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island); (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->po.pi);
} else { } else {
pi_new = polling_island_merge(fd->polling_island, pollset->polling_island, pi_new = polling_island_merge(fd->po.pi, pollset->po.pi, &error);
&error);
GRPC_POLLING_TRACE( GRPC_POLLING_TRACE(
"pollset_add_fd: polling islands merged. pi_new: %p (fd: %d, pollset: " "pollset_add_fd: polling islands merged. pi_new: %p (fd: %d, pollset: "
"%p, fd->pi: %p, pollset->pi: %p)", "%p, fd->pi: %p, pollset->pi: %p)",
(void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island, (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->po.pi,
(void *)pollset->polling_island); (void *)pollset->po.pi);
} }
/* At this point, pi_new is the polling island that both fd->polling_island /* At this point, pi_new is the polling island that both fd->po.pi
and pollset->polling_island must be pointing to */ and pollset->po.pi must be pointing to */
if (fd->polling_island != pi_new) { if (fd->po.pi != pi_new) {
PI_ADD_REF(pi_new, "fd"); PI_ADD_REF(pi_new, "fd");
if (fd->polling_island != NULL) { if (fd->po.pi != NULL) {
PI_UNREF(exec_ctx, fd->polling_island, "fd"); PI_UNREF(exec_ctx, fd->po.pi, "fd");
} }
fd->polling_island = pi_new; fd->po.pi = pi_new;
} }
if (pollset->polling_island != pi_new) { if (pollset->po.pi != pi_new) {
PI_ADD_REF(pi_new, "ps"); PI_ADD_REF(pi_new, "ps");
if (pollset->polling_island != NULL) { if (pollset->po.pi != NULL) {
PI_UNREF(exec_ctx, pollset->polling_island, "ps"); PI_UNREF(exec_ctx, pollset->po.pi, "ps");
} }
pollset->polling_island = pi_new; pollset->po.pi = pi_new;
} }
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->po.mu);
gpr_mu_unlock(&pollset->mu); gpr_mu_unlock(&pollset->po.mu);
GRPC_LOG_IF_ERROR("pollset_add_fd", error); GRPC_LOG_IF_ERROR("pollset_add_fd", error);
@ -1789,13 +1925,13 @@ retry:
static grpc_pollset_set *pollset_set_create(void) { static grpc_pollset_set *pollset_set_create(void) {
grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set)); grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
memset(pollset_set, 0, sizeof(*pollset_set)); memset(pollset_set, 0, sizeof(*pollset_set));
gpr_mu_init(&pollset_set->mu); gpr_mu_init(&pollset_set->po.mu);
return pollset_set; return pollset_set;
} }
static void pollset_set_destroy(grpc_pollset_set *pollset_set) { static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
size_t i; size_t i;
gpr_mu_destroy(&pollset_set->mu); gpr_mu_destroy(&pollset_set->po.mu);
for (i = 0; i < pollset_set->fd_count; i++) { for (i = 0; i < pollset_set->fd_count; i++) {
GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set"); GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
} }
@ -1808,7 +1944,7 @@ static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pollset_set, grpc_fd *fd) { grpc_pollset_set *pollset_set, grpc_fd *fd) {
size_t i; size_t i;
gpr_mu_lock(&pollset_set->mu); gpr_mu_lock(&pollset_set->po.mu);
if (pollset_set->fd_count == pollset_set->fd_capacity) { if (pollset_set->fd_count == pollset_set->fd_capacity) {
pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity); pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
pollset_set->fds = gpr_realloc( pollset_set->fds = gpr_realloc(
@ -1822,13 +1958,13 @@ static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
for (i = 0; i < pollset_set->pollset_set_count; i++) { for (i = 0; i < pollset_set->pollset_set_count; i++) {
pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd); pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
} }
gpr_mu_unlock(&pollset_set->mu); gpr_mu_unlock(&pollset_set->po.mu);
} }
static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pollset_set, grpc_fd *fd) { grpc_pollset_set *pollset_set, grpc_fd *fd) {
size_t i; size_t i;
gpr_mu_lock(&pollset_set->mu); gpr_mu_lock(&pollset_set->po.mu);
for (i = 0; i < pollset_set->fd_count; i++) { for (i = 0; i < pollset_set->fd_count; i++) {
if (pollset_set->fds[i] == fd) { if (pollset_set->fds[i] == fd) {
pollset_set->fd_count--; pollset_set->fd_count--;
@ -1841,14 +1977,14 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
for (i = 0; i < pollset_set->pollset_set_count; i++) { for (i = 0; i < pollset_set->pollset_set_count; i++) {
pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd); pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
} }
gpr_mu_unlock(&pollset_set->mu); gpr_mu_unlock(&pollset_set->po.mu);
} }
static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pollset_set, grpc_pollset_set *pollset_set,
grpc_pollset *pollset) { grpc_pollset *pollset) {
size_t i, j; size_t i, j;
gpr_mu_lock(&pollset_set->mu); gpr_mu_lock(&pollset_set->po.mu);
if (pollset_set->pollset_count == pollset_set->pollset_capacity) { if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
pollset_set->pollset_capacity = pollset_set->pollset_capacity =
GPR_MAX(8, 2 * pollset_set->pollset_capacity); GPR_MAX(8, 2 * pollset_set->pollset_capacity);
@ -1866,14 +2002,14 @@ static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
} }
} }
pollset_set->fd_count = j; pollset_set->fd_count = j;
gpr_mu_unlock(&pollset_set->mu); gpr_mu_unlock(&pollset_set->po.mu);
} }
static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pollset_set, grpc_pollset_set *pollset_set,
grpc_pollset *pollset) { grpc_pollset *pollset) {
size_t i; size_t i;
gpr_mu_lock(&pollset_set->mu); gpr_mu_lock(&pollset_set->po.mu);
for (i = 0; i < pollset_set->pollset_count; i++) { for (i = 0; i < pollset_set->pollset_count; i++) {
if (pollset_set->pollsets[i] == pollset) { if (pollset_set->pollsets[i] == pollset) {
pollset_set->pollset_count--; pollset_set->pollset_count--;
@ -1882,14 +2018,14 @@ static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
break; break;
} }
} }
gpr_mu_unlock(&pollset_set->mu); gpr_mu_unlock(&pollset_set->po.mu);
} }
static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *bag, grpc_pollset_set *bag,
grpc_pollset_set *item) { grpc_pollset_set *item) {
size_t i, j; size_t i, j;
gpr_mu_lock(&bag->mu); gpr_mu_lock(&bag->po.mu);
if (bag->pollset_set_count == bag->pollset_set_capacity) { if (bag->pollset_set_count == bag->pollset_set_capacity) {
bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity); bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
bag->pollset_sets = bag->pollset_sets =
@ -1906,14 +2042,14 @@ static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
} }
} }
bag->fd_count = j; bag->fd_count = j;
gpr_mu_unlock(&bag->mu); gpr_mu_unlock(&bag->po.mu);
} }
static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *bag, grpc_pollset_set *bag,
grpc_pollset_set *item) { grpc_pollset_set *item) {
size_t i; size_t i;
gpr_mu_lock(&bag->mu); gpr_mu_lock(&bag->po.mu);
for (i = 0; i < bag->pollset_set_count; i++) { for (i = 0; i < bag->pollset_set_count; i++) {
if (bag->pollset_sets[i] == item) { if (bag->pollset_sets[i] == item) {
bag->pollset_set_count--; bag->pollset_set_count--;
@ -1922,7 +2058,7 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
break; break;
} }
} }
gpr_mu_unlock(&bag->mu); gpr_mu_unlock(&bag->po.mu);
} }
/* Test helper functions /* Test helper functions
@ -1930,9 +2066,9 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
void *grpc_fd_get_polling_island(grpc_fd *fd) { void *grpc_fd_get_polling_island(grpc_fd *fd) {
polling_island *pi; polling_island *pi;
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->po.mu);
pi = fd->polling_island; pi = fd->po.pi;
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->po.mu);
return pi; return pi;
} }
@ -1940,9 +2076,9 @@ void *grpc_fd_get_polling_island(grpc_fd *fd) {
void *grpc_pollset_get_polling_island(grpc_pollset *ps) { void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
polling_island *pi; polling_island *pi;
gpr_mu_lock(&ps->mu); gpr_mu_lock(&ps->po.mu);
pi = ps->polling_island; pi = ps->po.pi;
gpr_mu_unlock(&ps->mu); gpr_mu_unlock(&ps->po.mu);
return pi; return pi;
} }

Loading…
Cancel
Save