Merge pull request #8797 from sreecha/pss_pi

Rewrite pollset_set implementation
pull/8893/head^2
David G. Quintas 8 years ago committed by GitHub
commit 48b1558823
  1. 585
      src/core/lib/iomgr/ev_epoll_linux.c

@ -69,6 +69,9 @@ static int grpc_polling_trace = 0; /* Disabled by default */
gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \ gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
} }
/* Uncomment the following to enable extra checks on poll_object operations */
/* #define PO_DEBUG */
static int grpc_wakeup_signal = -1; static int grpc_wakeup_signal = -1;
static bool is_grpc_wakeup_signal_initialized = false; static bool is_grpc_wakeup_signal_initialized = false;
@ -95,10 +98,42 @@ void grpc_use_signal(int signum) {
struct polling_island; struct polling_island;
/* Tags the kind of object that embeds a poll_obj header. The tag is stored in
 * poll_obj.obj_type when PO_DEBUG is defined and is also passed explicitly to
 * add_poll_object() and poll_obj_string(). */
typedef enum {
  POLL_OBJ_FD = 0,         /* header belongs to a grpc_fd */
  POLL_OBJ_POLLSET = 1,    /* header belongs to a grpc_pollset */
  POLL_OBJ_POLLSET_SET = 2 /* header belongs to a grpc_pollset_set */
} poll_obj_type;
/* Common header embedded as the 'po' member of grpc_fd, grpc_pollset and
 * grpc_pollset_set. It holds the state add_poll_object() needs to operate on
 * any of those objects uniformly: a mutex and a polling island pointer. */
typedef struct poll_obj {
#ifdef PO_DEBUG
/* Kind of the enclosing object. Only present in PO_DEBUG builds, where
 * add_poll_object() asserts it against the type its caller passed in. */
poll_obj_type obj_type;
#endif
/* Mutex of the enclosing object; add_poll_object() holds it while reading or
 * updating 'pi'. */
gpr_mu mu;
/* Polling island this object currently points to; NULL until one has been
 * assigned. */
struct polling_island *pi;
} poll_obj;
/* Returns a short, statically allocated, human-readable name for 'po_type'.
 * The result is used as a label in trace messages and as the reason string
 * for polling island ref/unref calls; callers must not free it.
 *
 * Every declared poll_obj_type value is handled; any other value falls through
 * to GPR_UNREACHABLE_CODE. */
const char *poll_obj_string(poll_obj_type po_type) {
  if (po_type == POLL_OBJ_FD) {
    return "fd";
  }
  if (po_type == POLL_OBJ_POLLSET) {
    return "pollset";
  }
  if (po_type == POLL_OBJ_POLLSET_SET) {
    return "pollset_set";
  }

  /* Not a valid poll_obj_type value. */
  GPR_UNREACHABLE_CODE(return "UNKNOWN");
}
/******************************************************************************* /*******************************************************************************
* Fd Declarations * Fd Declarations
*/ */
/* Converts a pointer to the poll_obj header of a grpc_fd back into a pointer
 * to the grpc_fd itself. This is a plain cast, so it is only valid because
 * 'po' is declared as the first member of struct grpc_fd, and only when the
 * poll_obj really is embedded in a grpc_fd (i.e. its type is POLL_OBJ_FD). */
#define FD_FROM_PO(po) ((grpc_fd *)(po))
struct grpc_fd { struct grpc_fd {
poll_obj po;
int fd; int fd;
/* refst format: /* refst format:
bit 0 : 1=Active / 0=Orphaned bit 0 : 1=Active / 0=Orphaned
@ -106,8 +141,6 @@ struct grpc_fd {
Ref/Unref by two to avoid altering the orphaned bit */ Ref/Unref by two to avoid altering the orphaned bit */
gpr_atm refst; gpr_atm refst;
gpr_mu mu;
/* Indicates that the fd is shutdown and that any pending read/write closures /* Indicates that the fd is shutdown and that any pending read/write closures
should fail */ should fail */
bool shutdown; bool shutdown;
@ -120,9 +153,6 @@ struct grpc_fd {
grpc_closure *read_closure; grpc_closure *read_closure;
grpc_closure *write_closure; grpc_closure *write_closure;
/* The polling island to which this fd belongs to (protected by mu) */
struct polling_island *polling_island;
struct grpc_fd *freelist_next; struct grpc_fd *freelist_next;
grpc_closure *on_done_closure; grpc_closure *on_done_closure;
@ -225,41 +255,21 @@ struct grpc_pollset_worker {
}; };
struct grpc_pollset { struct grpc_pollset {
gpr_mu mu; poll_obj po;
grpc_pollset_worker root_worker; grpc_pollset_worker root_worker;
bool kicked_without_pollers; bool kicked_without_pollers;
bool shutting_down; /* Is the pollset shutting down ? */ bool shutting_down; /* Is the pollset shutting down ? */
bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */ bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
grpc_closure *shutdown_done; /* Called after after shutdown is complete */ grpc_closure *shutdown_done; /* Called after after shutdown is complete */
/* The polling island to which this pollset belongs to */
struct polling_island *polling_island;
}; };
/******************************************************************************* /*******************************************************************************
* Pollset-set Declarations * Pollset-set Declarations
*/ */
/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
* directly points to a polling_island (and adding an fd/pollset/pollset_set to
* the current pollset_set would result in polling island merges. This would
* remove the need to maintain fd_count here. This will also significantly
* simplify the grpc_fd structure since we would no longer need to explicitly
* maintain the orphaned state */
struct grpc_pollset_set { struct grpc_pollset_set {
gpr_mu mu; poll_obj po;
size_t pollset_count;
size_t pollset_capacity;
grpc_pollset **pollsets;
size_t pollset_set_count;
size_t pollset_set_capacity;
struct grpc_pollset_set **pollset_sets;
size_t fd_count;
size_t fd_capacity;
grpc_fd **fds;
}; };
/******************************************************************************* /*******************************************************************************
@ -915,7 +925,7 @@ static void fd_global_shutdown(void) {
while (fd_freelist != NULL) { while (fd_freelist != NULL) {
grpc_fd *fd = fd_freelist; grpc_fd *fd = fd_freelist;
fd_freelist = fd_freelist->freelist_next; fd_freelist = fd_freelist->freelist_next;
gpr_mu_destroy(&fd->mu); gpr_mu_destroy(&fd->po.mu);
gpr_free(fd); gpr_free(fd);
} }
gpr_mu_destroy(&fd_freelist_mu); gpr_mu_destroy(&fd_freelist_mu);
@ -933,13 +943,17 @@ static grpc_fd *fd_create(int fd, const char *name) {
if (new_fd == NULL) { if (new_fd == NULL) {
new_fd = gpr_malloc(sizeof(grpc_fd)); new_fd = gpr_malloc(sizeof(grpc_fd));
gpr_mu_init(&new_fd->mu); gpr_mu_init(&new_fd->po.mu);
} }
/* Note: It is not really needed to get the new_fd->mu lock here. If this is a /* Note: It is not really needed to get the new_fd->po.mu lock here. If this
newly created fd (or an fd we got from the freelist), no one else would be * is a newly created fd (or an fd we got from the freelist), no one else
holding a lock to it anyway. */ * would be holding a lock to it anyway. */
gpr_mu_lock(&new_fd->mu); gpr_mu_lock(&new_fd->po.mu);
new_fd->po.pi = NULL;
#ifdef PO_DEBUG
new_fd->po.obj_type = POLL_OBJ_FD;
#endif
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd; new_fd->fd = fd;
@ -947,12 +961,11 @@ static grpc_fd *fd_create(int fd, const char *name) {
new_fd->orphaned = false; new_fd->orphaned = false;
new_fd->read_closure = CLOSURE_NOT_READY; new_fd->read_closure = CLOSURE_NOT_READY;
new_fd->write_closure = CLOSURE_NOT_READY; new_fd->write_closure = CLOSURE_NOT_READY;
new_fd->polling_island = NULL;
new_fd->freelist_next = NULL; new_fd->freelist_next = NULL;
new_fd->on_done_closure = NULL; new_fd->on_done_closure = NULL;
new_fd->read_notifier_pollset = NULL; new_fd->read_notifier_pollset = NULL;
gpr_mu_unlock(&new_fd->mu); gpr_mu_unlock(&new_fd->po.mu);
char *fd_name; char *fd_name;
gpr_asprintf(&fd_name, "%s fd=%d", name, fd); gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
@ -964,17 +977,13 @@ static grpc_fd *fd_create(int fd, const char *name) {
return new_fd; return new_fd;
} }
static bool fd_is_orphaned(grpc_fd *fd) {
return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}
static int fd_wrapped_fd(grpc_fd *fd) { static int fd_wrapped_fd(grpc_fd *fd) {
int ret_fd = -1; int ret_fd = -1;
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->po.mu);
if (!fd->orphaned) { if (!fd->orphaned) {
ret_fd = fd->fd; ret_fd = fd->fd;
} }
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->po.mu);
return ret_fd; return ret_fd;
} }
@ -986,7 +995,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_error *error = GRPC_ERROR_NONE; grpc_error *error = GRPC_ERROR_NONE;
polling_island *unref_pi = NULL; polling_island *unref_pi = NULL;
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->po.mu);
fd->on_done_closure = on_done; fd->on_done_closure = on_done;
/* If release_fd is not NULL, we should be relinquishing control of the file /* If release_fd is not NULL, we should be relinquishing control of the file
@ -1006,25 +1015,25 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
/* Remove the fd from the polling island: /* Remove the fd from the polling island:
- Get a lock on the latest polling island (i.e the last island in the - Get a lock on the latest polling island (i.e the last island in the
linked list pointed by fd->polling_island). This is the island that linked list pointed by fd->po.pi). This is the island that
would actually contain the fd would actually contain the fd
- Remove the fd from the latest polling island - Remove the fd from the latest polling island
- Unlock the latest polling island - Unlock the latest polling island
- Set fd->polling_island to NULL (but remove the ref on the polling island - Set fd->po.pi to NULL (but remove the ref on the polling island
before doing this.) */ before doing this.) */
if (fd->polling_island != NULL) { if (fd->po.pi != NULL) {
polling_island *pi_latest = polling_island_lock(fd->polling_island); polling_island *pi_latest = polling_island_lock(fd->po.pi);
polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error); polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
gpr_mu_unlock(&pi_latest->mu); gpr_mu_unlock(&pi_latest->mu);
unref_pi = fd->polling_island; unref_pi = fd->po.pi;
fd->polling_island = NULL; fd->po.pi = NULL;
} }
grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error), grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error),
NULL); NULL);
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->po.mu);
UNREF_BY(fd, 2, reason); /* Drop the reference */ UNREF_BY(fd, 2, reason); /* Drop the reference */
if (unref_pi != NULL) { if (unref_pi != NULL) {
/* Unref stale polling island here, outside the fd lock above. /* Unref stale polling island here, outside the fd lock above.
@ -1089,23 +1098,23 @@ static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
grpc_fd *fd) { grpc_fd *fd) {
grpc_pollset *notifier = NULL; grpc_pollset *notifier = NULL;
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->po.mu);
notifier = fd->read_notifier_pollset; notifier = fd->read_notifier_pollset;
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->po.mu);
return notifier; return notifier;
} }
static bool fd_is_shutdown(grpc_fd *fd) { static bool fd_is_shutdown(grpc_fd *fd) {
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->po.mu);
const bool r = fd->shutdown; const bool r = fd->shutdown;
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->po.mu);
return r; return r;
} }
/* Might be called multiple times */ /* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->po.mu);
/* Do the actual shutdown only once */ /* Do the actual shutdown only once */
if (!fd->shutdown) { if (!fd->shutdown) {
fd->shutdown = true; fd->shutdown = true;
@ -1116,28 +1125,28 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
set_ready_locked(exec_ctx, fd, &fd->read_closure); set_ready_locked(exec_ctx, fd, &fd->read_closure);
set_ready_locked(exec_ctx, fd, &fd->write_closure); set_ready_locked(exec_ctx, fd, &fd->write_closure);
} }
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->po.mu);
} }
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) { grpc_closure *closure) {
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->po.mu);
notify_on_locked(exec_ctx, fd, &fd->read_closure, closure); notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->po.mu);
} }
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) { grpc_closure *closure) {
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->po.mu);
notify_on_locked(exec_ctx, fd, &fd->write_closure, closure); notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->po.mu);
} }
static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->po.mu);
grpc_workqueue *workqueue = GRPC_WORKQUEUE_REF( grpc_workqueue *workqueue =
(grpc_workqueue *)fd->polling_island, "fd_get_workqueue"); GRPC_WORKQUEUE_REF((grpc_workqueue *)fd->po.pi, "fd_get_workqueue");
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->po.mu);
return workqueue; return workqueue;
} }
@ -1277,8 +1286,12 @@ static grpc_error *kick_poller(void) {
} }
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
gpr_mu_init(&pollset->mu); gpr_mu_init(&pollset->po.mu);
*mu = &pollset->mu; *mu = &pollset->po.mu;
pollset->po.pi = NULL;
#ifdef PO_DEBUG
pollset->po.obj_type = POLL_OBJ_POLLSET;
#endif
pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
pollset->kicked_without_pollers = false; pollset->kicked_without_pollers = false;
@ -1286,8 +1299,6 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
pollset->shutting_down = false; pollset->shutting_down = false;
pollset->finish_shutdown_called = false; pollset->finish_shutdown_called = false;
pollset->shutdown_done = NULL; pollset->shutdown_done = NULL;
pollset->polling_island = NULL;
} }
/* Convert a timespec to milliseconds: /* Convert a timespec to milliseconds:
@ -1317,26 +1328,26 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) { grpc_pollset *notifier) {
/* Need the fd->mu since we might be racing with fd_notify_on_read */ /* Need the fd->po.mu since we might be racing with fd_notify_on_read */
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->po.mu);
set_ready_locked(exec_ctx, fd, &fd->read_closure); set_ready_locked(exec_ctx, fd, &fd->read_closure);
fd->read_notifier_pollset = notifier; fd->read_notifier_pollset = notifier;
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->po.mu);
} }
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
/* Need the fd->mu since we might be racing with fd_notify_on_write */ /* Need the fd->po.mu since we might be racing with fd_notify_on_write */
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->po.mu);
set_ready_locked(exec_ctx, fd, &fd->write_closure); set_ready_locked(exec_ctx, fd, &fd->write_closure);
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->po.mu);
} }
static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx, static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
grpc_pollset *ps, char *reason) { grpc_pollset *ps, char *reason) {
if (ps->polling_island != NULL) { if (ps->po.pi != NULL) {
PI_UNREF(exec_ctx, ps->polling_island, reason); PI_UNREF(exec_ctx, ps->po.pi, reason);
} }
ps->polling_island = NULL; ps->po.pi = NULL;
} }
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx, static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
@ -1346,12 +1357,12 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
pollset->finish_shutdown_called = true; pollset->finish_shutdown_called = true;
/* Release the ref and set pollset->polling_island to NULL */ /* Release the ref and set pollset->po.pi to NULL */
pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown"); pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL); grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
} }
/* pollset->mu lock must be held by the caller before calling this */ /* pollset->po.mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure) { grpc_closure *closure) {
GPR_TIMER_BEGIN("pollset_shutdown", 0); GPR_TIMER_BEGIN("pollset_shutdown", 0);
@ -1376,7 +1387,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
* here */ * here */
static void pollset_destroy(grpc_pollset *pollset) { static void pollset_destroy(grpc_pollset *pollset) {
GPR_ASSERT(!pollset_has_workers(pollset)); GPR_ASSERT(!pollset_has_workers(pollset));
gpr_mu_destroy(&pollset->mu); gpr_mu_destroy(&pollset->po.mu);
} }
static void pollset_reset(grpc_pollset *pollset) { static void pollset_reset(grpc_pollset *pollset) {
@ -1386,7 +1397,7 @@ static void pollset_reset(grpc_pollset *pollset) {
pollset->finish_shutdown_called = false; pollset->finish_shutdown_called = false;
pollset->kicked_without_pollers = false; pollset->kicked_without_pollers = false;
pollset->shutdown_done = NULL; pollset->shutdown_done = NULL;
GPR_ASSERT(pollset->polling_island == NULL); GPR_ASSERT(pollset->po.pi == NULL);
} }
static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx, static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
@ -1426,7 +1437,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
GPR_TIMER_BEGIN("pollset_work_and_unlock", 0); GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
/* We need to get the epoll_fd to wait on. The epoll_fd is in inside the /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
latest polling island pointed by pollset->polling_island. latest polling island pointed by pollset->po.pi
Since epoll_fd is immutable, we can read it without obtaining the polling Since epoll_fd is immutable, we can read it without obtaining the polling
island lock. There is however a possibility that the polling island (from island lock. There is however a possibility that the polling island (from
@ -1435,36 +1446,36 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
right-away from epoll_wait() and pick up the latest polling_island the next right-away from epoll_wait() and pick up the latest polling_island the next
this function (i.e pollset_work_and_unlock()) is called */ this function (i.e pollset_work_and_unlock()) is called */
if (pollset->polling_island == NULL) { if (pollset->po.pi == NULL) {
pollset->polling_island = polling_island_create(exec_ctx, NULL, error); pollset->po.pi = polling_island_create(exec_ctx, NULL, error);
if (pollset->polling_island == NULL) { if (pollset->po.pi == NULL) {
GPR_TIMER_END("pollset_work_and_unlock", 0); GPR_TIMER_END("pollset_work_and_unlock", 0);
return; /* Fatal error. We cannot continue */ return; /* Fatal error. We cannot continue */
} }
PI_ADD_REF(pollset->polling_island, "ps"); PI_ADD_REF(pollset->po.pi, "ps");
GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p", GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
(void *)pollset, (void *)pollset->polling_island); (void *)pollset, (void *)pollset->po.pi);
} }
pi = polling_island_maybe_get_latest(pollset->polling_island); pi = polling_island_maybe_get_latest(pollset->po.pi);
epoll_fd = pi->epoll_fd; epoll_fd = pi->epoll_fd;
/* Update the pollset->polling_island since the island being pointed by /* Update the pollset->po.pi since the island being pointed by
pollset->polling_island maybe older than the one pointed by pi) */ pollset->po.pi maybe older than the one pointed by pi) */
if (pollset->polling_island != pi) { if (pollset->po.pi != pi) {
/* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
polling island to be deleted */ polling island to be deleted */
PI_ADD_REF(pi, "ps"); PI_ADD_REF(pi, "ps");
PI_UNREF(exec_ctx, pollset->polling_island, "ps"); PI_UNREF(exec_ctx, pollset->po.pi, "ps");
pollset->polling_island = pi; pollset->po.pi = pi;
} }
/* Add an extra ref so that the island does not get destroyed (which means /* Add an extra ref so that the island does not get destroyed (which means
the epoll_fd won't be closed) while we are are doing an epoll_wait() on the the epoll_fd won't be closed) while we are are doing an epoll_wait() on the
epoll_fd */ epoll_fd */
PI_ADD_REF(pi, "ps_work"); PI_ADD_REF(pi, "ps_work");
gpr_mu_unlock(&pollset->mu); gpr_mu_unlock(&pollset->po.mu);
/* If we get some workqueue work to do, it might end up completing an item on /* If we get some workqueue work to do, it might end up completing an item on
the completion queue, so there's no need to poll... so we skip that and the completion queue, so there's no need to poll... so we skip that and
@ -1537,17 +1548,17 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(pi != NULL); GPR_ASSERT(pi != NULL);
/* Before leaving, release the extra ref we added to the polling island. It /* Before leaving, release the extra ref we added to the polling island. It
is important to use "pi" here (i.e our old copy of pollset->polling_island is important to use "pi" here (i.e our old copy of pollset->po.pi
that we got before releasing the polling island lock). This is because that we got before releasing the polling island lock). This is because
pollset->polling_island pointer might get udpated in other parts of the pollset->po.pi pointer might get udpated in other parts of the
code when there is an island merge while we are doing epoll_wait() above */ code when there is an island merge while we are doing epoll_wait() above */
PI_UNREF(exec_ctx, pi, "ps_work"); PI_UNREF(exec_ctx, pi, "ps_work");
GPR_TIMER_END("pollset_work_and_unlock", 0); GPR_TIMER_END("pollset_work_and_unlock", 0);
} }
/* pollset->mu lock must be held by the caller before calling this. /* pollset->po.mu lock must be held by the caller before calling this.
The function pollset_work() may temporarily release the lock (pollset->mu) The function pollset_work() may temporarily release the lock (pollset->po.mu)
during the course of its execution but it will always re-acquire the lock and during the course of its execution but it will always re-acquire the lock and
ensure that it is held by the time the function returns */ ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@ -1617,7 +1628,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
&g_orig_sigmask, &error); &g_orig_sigmask, &error);
grpc_exec_ctx_flush(exec_ctx); grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&pollset->mu); gpr_mu_lock(&pollset->po.mu);
/* Note: There is no need to reset worker.is_kicked to 0 since we are no /* Note: There is no need to reset worker.is_kicked to 0 since we are no
longer going to use this worker */ longer going to use this worker */
@ -1637,9 +1648,9 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0); GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
finish_shutdown_locked(exec_ctx, pollset); finish_shutdown_locked(exec_ctx, pollset);
gpr_mu_unlock(&pollset->mu); gpr_mu_unlock(&pollset->po.mu);
grpc_exec_ctx_flush(exec_ctx); grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&pollset->mu); gpr_mu_lock(&pollset->po.mu);
} }
*worker_hdl = NULL; *worker_hdl = NULL;
@ -1653,130 +1664,160 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
return error; return error;
} }
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static void add_poll_object(grpc_exec_ctx *exec_ctx, poll_obj *bag,
grpc_fd *fd) { poll_obj_type bag_type, poll_obj *item,
GPR_TIMER_BEGIN("pollset_add_fd", 0); poll_obj_type item_type) {
GPR_TIMER_BEGIN("add_poll_object", 0);
grpc_error *error = GRPC_ERROR_NONE;
gpr_mu_lock(&pollset->mu); #ifdef PO_DEBUG
gpr_mu_lock(&fd->mu); GPR_ASSERT(item->obj_type == item_type);
GPR_ASSERT(bag->obj_type == bag_type);
#endif
grpc_error *error = GRPC_ERROR_NONE;
polling_island *pi_new = NULL; polling_island *pi_new = NULL;
gpr_mu_lock(&bag->mu);
gpr_mu_lock(&item->mu);
retry: retry:
/* 1) If fd->polling_island and pollset->polling_island are both non-NULL and /*
* equal, do nothing. * 1) If item->pi and bag->pi are both non-NULL and equal, do nothing
* 2) If fd->polling_island and pollset->polling_island are both NULL, create * 2) If item->pi and bag->pi are both NULL, create a new polling island (with
* a new polling island (with a refcount of 2) and make the polling_island * a refcount of 2) and point item->pi and bag->pi to the new island
* fields in both fd and pollset to point to the new island * 3) If exactly one of item->pi or bag->pi is NULL, update it to point to
* 3) If one of fd->polling_island or pollset->polling_island is NULL, update * the other's non-NULL pi
* the NULL polling_island field to point to the non-NULL polling_island * 4) Finally if item->pi and bag-pi are non-NULL and not-equal, merge the
* field (ensure that the refcount on the polling island is incremented by * polling islands and update item->pi and bag->pi to point to the new
* 1 to account for the newly added reference) * island
* 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
* and different, merge both the polling islands and update the
* polling_island fields in both fd and pollset to point to the merged
* polling island.
*/ */
if (fd->orphaned) { /* Early out if we are trying to add an 'fd' to a 'bag' but the fd is already
gpr_mu_unlock(&fd->mu); * orphaned */
gpr_mu_unlock(&pollset->mu); if (item_type == POLL_OBJ_FD && (FD_FROM_PO(item))->orphaned) {
/* early out */ gpr_mu_unlock(&item->mu);
gpr_mu_unlock(&bag->mu);
return; return;
} }
if (fd->polling_island == pollset->polling_island) { if (item->pi == bag->pi) {
pi_new = fd->polling_island; pi_new = item->pi;
if (pi_new == NULL) { if (pi_new == NULL) {
/* Unlock before creating a new polling island: the polling island will /* GPR_ASSERT(item->pi == bag->pi == NULL) */
create a workqueue which creates a file descriptor, and holding an fd
lock here can eventually cause a loop to appear to TSAN (making it /* If we are adding an fd to a bag (i.e pollset or pollset_set), then
unhappy). We don't think it's a real loop (there's an epoch point where * we need to do some extra work to make TSAN happy */
that loop possibility disappears), but the advantages of keeping TSAN if (item_type == POLL_OBJ_FD) {
happy outweigh any performance advantage we might have by keeping the /* Unlock before creating a new polling island: the polling island will
lock held. */ create a workqueue which creates a file descriptor, and holding an fd
gpr_mu_unlock(&fd->mu); lock here can eventually cause a loop to appear to TSAN (making it
pi_new = polling_island_create(exec_ctx, fd, &error); unhappy). We don't think it's a real loop (there's an epoch point
gpr_mu_lock(&fd->mu); where that loop possibility disappears), but the advantages of
/* Need to reverify any assumptions made between the initial lock and keeping TSAN happy outweigh any performance advantage we might have
getting to this branch: if they've changed, we need to throw away our by keeping the lock held. */
work and figure things out again. */ gpr_mu_unlock(&item->mu);
if (fd->polling_island != NULL) { pi_new = polling_island_create(exec_ctx, FD_FROM_PO(item), &error);
GRPC_POLLING_TRACE( gpr_mu_lock(&item->mu);
"pollset_add_fd: Raced creating new polling island. pi_new: %p "
"(fd: %d, pollset: %p)", /* Need to reverify any assumptions made between the initial lock and
(void *)pi_new, fd->fd, (void *)pollset); getting to this branch: if they've changed, we need to throw away our
work and figure things out again. */
/* No need to lock 'pi_new' here since this is a new polling island and if (item->pi != NULL) {
* no one has a reference to it yet */ GRPC_POLLING_TRACE(
polling_island_remove_all_fds_locked(pi_new, true, &error); "add_poll_object: Raced creating new polling island. pi_new: %p "
"(fd: %d, %s: %p)",
/* Ref and unref so that the polling island gets deleted during unref */ (void *)pi_new, FD_FROM_PO(item)->fd, poll_obj_string(bag_type),
PI_ADD_REF(pi_new, "dance_of_destruction"); (void *)bag);
PI_UNREF(exec_ctx, pi_new, "dance_of_destruction"); /* No need to lock 'pi_new' here since this is a new polling island
goto retry; * and no one has a reference to it yet */
polling_island_remove_all_fds_locked(pi_new, true, &error);
/* Ref and unref so that the polling island gets deleted during unref
*/
PI_ADD_REF(pi_new, "dance_of_destruction");
PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
goto retry;
}
} else { } else {
GRPC_POLLING_TRACE( pi_new = polling_island_create(exec_ctx, NULL, &error);
"pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, "
"pollset: %p)",
(void *)pi_new, fd->fd, (void *)pollset);
} }
GRPC_POLLING_TRACE(
"add_poll_object: Created new polling island. pi_new: %p (%s: %p, "
"%s: %p)",
(void *)pi_new, poll_obj_string(item_type), (void *)item,
poll_obj_string(bag_type), (void *)bag);
} else {
GRPC_POLLING_TRACE(
"add_poll_object: Same polling island. pi: %p (%s, %s)",
(void *)pi_new, poll_obj_string(item_type),
poll_obj_string(bag_type));
}
} else if (item->pi == NULL) {
/* GPR_ASSERT(bag->pi != NULL) */
/* Make pi_new point to latest pi*/
pi_new = polling_island_lock(bag->pi);
if (item_type == POLL_OBJ_FD) {
grpc_fd *fd = FD_FROM_PO(item);
polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
} }
} else if (fd->polling_island == NULL) {
pi_new = polling_island_lock(pollset->polling_island);
polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
gpr_mu_unlock(&pi_new->mu);
gpr_mu_unlock(&pi_new->mu);
GRPC_POLLING_TRACE( GRPC_POLLING_TRACE(
"pollset_add_fd: fd->pi was NULL. pi_new: %p (fd: %d, pollset: %p, " "add_poll_obj: item->pi was NULL. pi_new: %p (item(%s): %p, "
"pollset->pi: %p)", "bag(%s): %p)",
(void *)pi_new, fd->fd, (void *)pollset, (void *)pi_new, poll_obj_string(item_type), (void *)item,
(void *)pollset->polling_island); poll_obj_string(bag_type), (void *)bag);
} else if (pollset->polling_island == NULL) { } else if (bag->pi == NULL) {
pi_new = polling_island_lock(fd->polling_island); /* GPR_ASSERT(item->pi != NULL) */
/* Make pi_new to point to latest pi */
pi_new = polling_island_lock(item->pi);
gpr_mu_unlock(&pi_new->mu); gpr_mu_unlock(&pi_new->mu);
GRPC_POLLING_TRACE( GRPC_POLLING_TRACE(
"pollset_add_fd: pollset->pi was NULL. pi_new: %p (fd: %d, pollset: " "add_poll_obj: bag->pi was NULL. pi_new: %p (item(%s): %p, "
"%p, fd->pi: %p", "bag(%s): %p)",
(void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island); (void *)pi_new, poll_obj_string(item_type), (void *)item,
poll_obj_string(bag_type), (void *)bag);
} else { } else {
pi_new = polling_island_merge(fd->polling_island, pollset->polling_island, pi_new = polling_island_merge(item->pi, bag->pi, &error);
&error);
GRPC_POLLING_TRACE( GRPC_POLLING_TRACE(
"pollset_add_fd: polling islands merged. pi_new: %p (fd: %d, pollset: " "add_poll_obj: polling islands merged. pi_new: %p (item(%s): %p, "
"%p, fd->pi: %p, pollset->pi: %p)", "bag(%s): %p)",
(void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island, (void *)pi_new, poll_obj_string(item_type), (void *)item,
(void *)pollset->polling_island); poll_obj_string(bag_type), (void *)bag);
} }
/* At this point, pi_new is the polling island that both fd->polling_island /* At this point, pi_new is the polling island that both item->pi and bag->pi
and pollset->polling_island must be pointing to */ MUST be pointing to */
if (fd->polling_island != pi_new) { if (item->pi != pi_new) {
PI_ADD_REF(pi_new, "fd"); PI_ADD_REF(pi_new, poll_obj_string(item_type));
if (fd->polling_island != NULL) { if (item->pi != NULL) {
PI_UNREF(exec_ctx, fd->polling_island, "fd"); PI_UNREF(exec_ctx, item->pi, poll_obj_string(item_type));
} }
fd->polling_island = pi_new; item->pi = pi_new;
} }
if (pollset->polling_island != pi_new) { if (bag->pi != pi_new) {
PI_ADD_REF(pi_new, "ps"); PI_ADD_REF(pi_new, poll_obj_string(bag_type));
if (pollset->polling_island != NULL) { if (bag->pi != NULL) {
PI_UNREF(exec_ctx, pollset->polling_island, "ps"); PI_UNREF(exec_ctx, bag->pi, poll_obj_string(bag_type));
} }
pollset->polling_island = pi_new; bag->pi = pi_new;
} }
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&item->mu);
gpr_mu_unlock(&pollset->mu); gpr_mu_unlock(&bag->mu);
GRPC_LOG_IF_ERROR("pollset_add_fd", error); GRPC_LOG_IF_ERROR("add_poll_object", error);
GPR_TIMER_END("add_poll_object", 0);
}
GPR_TIMER_END("pollset_add_fd", 0); static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_fd *fd) {
add_poll_object(exec_ctx, &pollset->po, POLL_OBJ_POLLSET, &fd->po,
POLL_OBJ_FD);
} }
/******************************************************************************* /*******************************************************************************
@ -1784,142 +1825,60 @@ retry:
*/ */
static grpc_pollset_set *pollset_set_create(void) { static grpc_pollset_set *pollset_set_create(void) {
grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set)); grpc_pollset_set *pss = gpr_malloc(sizeof(*pss));
memset(pollset_set, 0, sizeof(*pollset_set)); gpr_mu_init(&pss->po.mu);
gpr_mu_init(&pollset_set->mu); pss->po.pi = NULL;
return pollset_set; #ifdef PO_DEBUG
pss->po.obj_type = POLL_OBJ_POLLSET_SET;
#endif
return pss;
} }
static void pollset_set_destroy(grpc_pollset_set *pollset_set) { static void pollset_set_destroy(grpc_pollset_set *pss) {
size_t i; gpr_mu_destroy(&pss->po.mu);
gpr_mu_destroy(&pollset_set->mu);
for (i = 0; i < pollset_set->fd_count; i++) { if (pss->po.pi != NULL) {
GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set"); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
PI_UNREF(&exec_ctx, pss->po.pi, "pss_destroy");
grpc_exec_ctx_finish(&exec_ctx);
} }
gpr_free(pollset_set->pollsets);
gpr_free(pollset_set->pollset_sets); gpr_free(pss);
gpr_free(pollset_set->fds);
gpr_free(pollset_set);
} }
static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
grpc_pollset_set *pollset_set, grpc_fd *fd) { grpc_fd *fd) {
size_t i; add_poll_object(exec_ctx, &pss->po, POLL_OBJ_POLLSET_SET, &fd->po,
gpr_mu_lock(&pollset_set->mu); POLL_OBJ_FD);
if (pollset_set->fd_count == pollset_set->fd_capacity) {
pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
pollset_set->fds = gpr_realloc(
pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
}
GRPC_FD_REF(fd, "pollset_set");
pollset_set->fds[pollset_set->fd_count++] = fd;
for (i = 0; i < pollset_set->pollset_count; i++) {
pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
}
for (i = 0; i < pollset_set->pollset_set_count; i++) {
pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
}
gpr_mu_unlock(&pollset_set->mu);
} }
static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
grpc_pollset_set *pollset_set, grpc_fd *fd) { grpc_fd *fd) {
size_t i; /* Nothing to do */
gpr_mu_lock(&pollset_set->mu);
for (i = 0; i < pollset_set->fd_count; i++) {
if (pollset_set->fds[i] == fd) {
pollset_set->fd_count--;
GPR_SWAP(grpc_fd *, pollset_set->fds[i],
pollset_set->fds[pollset_set->fd_count]);
GRPC_FD_UNREF(fd, "pollset_set");
break;
}
}
for (i = 0; i < pollset_set->pollset_set_count; i++) {
pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
}
gpr_mu_unlock(&pollset_set->mu);
} }
static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pollset_set, grpc_pollset_set *pss, grpc_pollset *ps) {
grpc_pollset *pollset) { add_poll_object(exec_ctx, &pss->po, POLL_OBJ_POLLSET_SET, &ps->po,
size_t i, j; POLL_OBJ_POLLSET);
gpr_mu_lock(&pollset_set->mu);
if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
pollset_set->pollset_capacity =
GPR_MAX(8, 2 * pollset_set->pollset_capacity);
pollset_set->pollsets =
gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
sizeof(*pollset_set->pollsets));
}
pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
if (fd_is_orphaned(pollset_set->fds[i])) {
GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
} else {
pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
pollset_set->fds[j++] = pollset_set->fds[i];
}
}
pollset_set->fd_count = j;
gpr_mu_unlock(&pollset_set->mu);
} }
static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pollset_set, grpc_pollset_set *pss, grpc_pollset *ps) {
grpc_pollset *pollset) { /* Nothing to do */
size_t i;
gpr_mu_lock(&pollset_set->mu);
for (i = 0; i < pollset_set->pollset_count; i++) {
if (pollset_set->pollsets[i] == pollset) {
pollset_set->pollset_count--;
GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
pollset_set->pollsets[pollset_set->pollset_count]);
break;
}
}
gpr_mu_unlock(&pollset_set->mu);
} }
static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *bag, grpc_pollset_set *bag,
grpc_pollset_set *item) { grpc_pollset_set *item) {
size_t i, j; add_poll_object(exec_ctx, &bag->po, POLL_OBJ_POLLSET_SET, &item->po,
gpr_mu_lock(&bag->mu); POLL_OBJ_POLLSET_SET);
if (bag->pollset_set_count == bag->pollset_set_capacity) {
bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
bag->pollset_sets =
gpr_realloc(bag->pollset_sets,
bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
}
bag->pollset_sets[bag->pollset_set_count++] = item;
for (i = 0, j = 0; i < bag->fd_count; i++) {
if (fd_is_orphaned(bag->fds[i])) {
GRPC_FD_UNREF(bag->fds[i], "pollset_set");
} else {
pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
bag->fds[j++] = bag->fds[i];
}
}
bag->fd_count = j;
gpr_mu_unlock(&bag->mu);
} }
static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *bag, grpc_pollset_set *bag,
grpc_pollset_set *item) { grpc_pollset_set *item) {
size_t i; /* Nothing to do */
gpr_mu_lock(&bag->mu);
for (i = 0; i < bag->pollset_set_count; i++) {
if (bag->pollset_sets[i] == item) {
bag->pollset_set_count--;
GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
bag->pollset_sets[bag->pollset_set_count]);
break;
}
}
gpr_mu_unlock(&bag->mu);
} }
/* Test helper functions /* Test helper functions
@ -1927,9 +1886,9 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
void *grpc_fd_get_polling_island(grpc_fd *fd) { void *grpc_fd_get_polling_island(grpc_fd *fd) {
polling_island *pi; polling_island *pi;
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->po.mu);
pi = fd->polling_island; pi = fd->po.pi;
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->po.mu);
return pi; return pi;
} }
@ -1937,9 +1896,9 @@ void *grpc_fd_get_polling_island(grpc_fd *fd) {
void *grpc_pollset_get_polling_island(grpc_pollset *ps) { void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
polling_island *pi; polling_island *pi;
gpr_mu_lock(&ps->mu); gpr_mu_lock(&ps->po.mu);
pi = ps->polling_island; pi = ps->po.pi;
gpr_mu_unlock(&ps->mu); gpr_mu_unlock(&ps->po.mu);
return pi; return pi;
} }

Loading…
Cancel
Save