More scalable unary polling

- admit only one poller for read and one for write at a time
  (poll is level-triggered, so this avoids a thundering herd on each event)
- wake only one poller when more pollers are needed, again avoiding a thundering herd
Branch: pull/1608/head
Author: Craig Tiller (10 years ago)
Commit: 886d7ecb7a (parent b5b4d16866)
Changed files (lines changed):

  src/core/iomgr/fd_posix.c (75)
  src/core/iomgr/fd_posix.h (4)
  src/core/iomgr/pollset_multipoller_with_poll_posix.c (2)
  src/core/iomgr/pollset_posix.c (4)
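Before the per-file hunks, a toy model of the scheme the message describes may help. Everything below is a sketch with hypothetical names (fd_state, begin_poll, end_poll, kick_one), not the gRPC API, and thread parking/waking is elided. It shows the two invariants the patch maintains: at most one admitted poller per direction of an fd, and a single targeted wakeup when a direction loses its poller before its event arrived.

  /* Toy model of one-poller-per-direction admission (hypothetical names
     throughout; the real patch keeps these slots in grpc_fd and kicks
     through grpc_pollset_force_kick). */
  #include <pthread.h>
  #include <stddef.h>

  typedef struct watcher {
    struct watcher *next, *prev; /* ring of parked (non-polling) watchers */
    int owns_read;               /* holds the fd's read-poller slot */
    int owns_write;              /* holds the fd's write-poller slot */
  } watcher;

  typedef struct fd_state {
    pthread_mutex_t mu;
    watcher root;         /* dummy head of the parked-watcher ring */
    watcher *read_poller; /* at most one poller per direction, so a
                             level-triggered event wakes one thread */
    watcher *write_poller;
  } fd_state;

  static void kick_one(watcher *w) { (void)w; /* wake w's thread (elided) */ }

  /* Grant each requested direction only if nobody covers it yet; otherwise
     park the watcher so coverage can be handed to it later. */
  static int begin_poll(fd_state *f, watcher *w, int want_read, int want_write) {
    int granted = 0;
    pthread_mutex_lock(&f->mu);
    w->owns_read = w->owns_write = 0;
    if (want_read && f->read_poller == NULL) {
      f->read_poller = w;
      w->owns_read = 1;
      granted |= 1; /* caller should poll for readability */
    }
    if (want_write && f->write_poller == NULL) {
      f->write_poller = w;
      w->owns_write = 1;
      granted |= 2; /* caller should poll for writability */
    }
    if (!granted) { /* both directions covered: park on the ring */
      w->next = &f->root;
      w->prev = f->root.prev;
      w->next->prev = w->prev->next = w;
    }
    pthread_mutex_unlock(&f->mu);
    return granted;
  }

  /* Release the watcher; if it gives up a direction whose event never
     arrived, wake exactly one parked watcher to take over. */
  static void end_poll(fd_state *f, watcher *w, int got_read, int got_write) {
    int need_successor = 0;
    pthread_mutex_lock(&f->mu);
    if (w->owns_read) {
      f->read_poller = NULL;
      need_successor |= !got_read;
    }
    if (w->owns_write) {
      f->write_poller = NULL;
      need_successor |= !got_write;
    }
    if (!w->owns_read && !w->owns_write) { /* was parked: unlink from ring */
      w->next->prev = w->prev;
      w->prev->next = w->next;
    }
    if (need_successor && f->root.next != &f->root) {
      kick_one(f->root.next); /* one wakeup, not a herd */
    }
    pthread_mutex_unlock(&f->mu);
  }

In the patch itself the ring is fd->watcher_root, the slots are fd->read_watcher and fd->write_watcher, and the targeted wakeup is maybe_wake_one_watcher_locked, which kicks only watcher_root.next.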

src/core/iomgr/fd_posix.c

@@ -98,6 +98,7 @@ static grpc_fd *alloc_fd(int fd) {
   r->fd = fd;
   r->watcher_root.next = r->watcher_root.prev = &r->watcher_root;
   r->freelist_next = NULL;
+  r->read_watcher = r->write_watcher = NULL;
   return r;
 }
@@ -147,14 +148,24 @@ int grpc_fd_is_orphaned(grpc_fd *fd) {
   return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
 }
 
-static void wake_watchers(grpc_fd *fd) {
-  grpc_fd_watcher *watcher;
+static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
+  if (fd->watcher_root.next != &fd->watcher_root) {
+    grpc_pollset_force_kick(fd->watcher_root.next->pollset);
+  }
+}
+
+static void maybe_wake_one_watcher(grpc_fd *fd) {
   gpr_mu_lock(&fd->watcher_mu);
+  maybe_wake_one_watcher_locked(fd);
+  gpr_mu_unlock(&fd->watcher_mu);
+}
+
+static void wake_all_watchers(grpc_fd *fd) {
+  grpc_fd_watcher *watcher;
+  gpr_mu_lock(&fd->watcher_mu);
   for (watcher = fd->watcher_root.next; watcher != &fd->watcher_root;
        watcher = watcher->next) {
     grpc_pollset_force_kick(watcher->pollset);
   }
   gpr_mu_unlock(&fd->watcher_mu);
 }
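Note the asymmetry introduced here: maybe_wake_one_watcher_locked kicks only the first parked watcher (watcher_root.next), whose poll() then returns so its thread can re-enter grpc_fd_begin_poll and claim the freed direction, while wake_all_watchers keeps the old kick-everyone behavior for the orphan path in the next hunk.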
@@ -162,7 +173,7 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) {
   fd->on_done_user_data = user_data;
   shutdown(fd->fd, SHUT_RDWR);
   ref_by(fd, 1); /* remove active status, but keep referenced */
-  wake_watchers(fd);
+  wake_all_watchers(fd);
   unref_by(fd, 2); /* drop the reference */
 }
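The orphan path has to wake every watcher rather than one: each polling thread holds a reference taken in grpc_fd_begin_poll, and all of them must return from poll() and release it before the fd's refcount can drain and on_done can run.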
@@ -204,7 +215,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
          set_ready call. NOTE: we don't have an ABA problem here,
          since we should never have concurrent calls to the same
          notify_on function. */
-      wake_watchers(fd);
+      maybe_wake_one_watcher(fd);
       return;
     }
     /* swap was unsuccessful due to an intervening set_ready call.
@@ -290,29 +301,61 @@ void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure) {
 gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
                               gpr_uint32 read_mask, gpr_uint32 write_mask,
                               grpc_fd_watcher *watcher) {
+  gpr_uint32 mask = 0;
   /* keep track of pollers that have requested our events, in case they change
    */
   grpc_fd_ref(fd);
   gpr_mu_lock(&fd->watcher_mu);
-  watcher->next = &fd->watcher_root;
-  watcher->prev = watcher->next->prev;
-  watcher->next->prev = watcher->prev->next = watcher;
+  /* if there is nobody polling for read, but we need to, then start doing so */
+  if (!fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) {
+    fd->read_watcher = watcher;
+    mask |= read_mask;
+  }
+  /* if there is nobody polling for write, but we need to, then start doing so */
+  if (!fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) {
+    fd->write_watcher = watcher;
+    mask |= write_mask;
+  }
+  /* if not polling, remember this watcher in case we need someone to later */
+  if (mask == 0) {
+    watcher->next = &fd->watcher_root;
+    watcher->prev = watcher->next->prev;
+    watcher->next->prev = watcher->prev->next = watcher;
+  }
   watcher->pollset = pollset;
   watcher->fd = fd;
   gpr_mu_unlock(&fd->watcher_mu);
 
-  return (gpr_atm_acq_load(&fd->readst) != READY ? read_mask : 0) |
-         (gpr_atm_acq_load(&fd->writest) != READY ? write_mask : 0);
+  return mask;
 }
 
-void grpc_fd_end_poll(grpc_fd_watcher *watcher) {
-  gpr_mu_lock(&watcher->fd->watcher_mu);
-  watcher->next->prev = watcher->prev;
-  watcher->prev->next = watcher->next;
-  gpr_mu_unlock(&watcher->fd->watcher_mu);
+void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) {
+  int was_polling = 0;
+  int kick = 0;
+  grpc_fd *fd = watcher->fd;
+  gpr_mu_lock(&fd->watcher_mu);
+  if (watcher == fd->read_watcher) {
+    was_polling = 1;
+    kick |= !got_read;
+    fd->read_watcher = NULL;
+  }
+  if (watcher == fd->write_watcher) {
+    was_polling = 1;
+    kick |= !got_write;
+    fd->write_watcher = NULL;
+  }
+  if (!was_polling) {
+    watcher->next->prev = watcher->prev;
+    watcher->prev->next = watcher->next;
+  }
+  if (kick) {
+    maybe_wake_one_watcher_locked(fd);
+  }
+  gpr_mu_unlock(&fd->watcher_mu);
 
-  grpc_fd_unref(watcher->fd);
+  grpc_fd_unref(fd);
 }
 
 void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) {
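Together these two functions define the poller-side contract that the pollset_posix.c hunk at the bottom of this commit follows: poll only the events grpc_fd_begin_poll grants, then report through got_read/got_write whether they actually fired, so that a designated poller returning empty-handed (timeout or kick) hands its direction to a parked watcher:

  pfd[1].events = grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher);
  r = poll(pfd, GPR_ARRAY_SIZE(pfd) - (pfd[1].events == 0), timeout);
  grpc_fd_end_poll(&fd_watcher, pfd[1].revents & POLLIN, pfd[1].revents & POLLOUT);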

src/core/iomgr/fd_posix.h

@@ -68,6 +68,8 @@ struct grpc_fd {
   gpr_mu watcher_mu;
   grpc_fd_watcher watcher_root;
+  grpc_fd_watcher *read_watcher;
+  grpc_fd_watcher *write_watcher;
 
   gpr_atm readst;
   gpr_atm writest;
@@ -103,7 +105,7 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
                               gpr_uint32 read_mask, gpr_uint32 write_mask,
                               grpc_fd_watcher *rec);
 /* Complete polling previously started with grpc_fd_begin_poll */
-void grpc_fd_end_poll(grpc_fd_watcher *rec);
+void grpc_fd_end_poll(grpc_fd_watcher *rec, int got_read, int got_write);
 
 /* Return 1 if this fd is orphaned, 0 otherwise */
 int grpc_fd_is_orphaned(grpc_fd *fd);

src/core/iomgr/pollset_multipoller_with_poll_posix.c

@@ -98,7 +98,7 @@ static void end_polling(grpc_pollset *pollset) {
   pollset_hdr *h;
   h = pollset->data.ptr;
   for (i = 1; i < h->pfd_count; i++) {
-    grpc_fd_end_poll(&h->watchers[i]);
+    grpc_fd_end_poll(&h->watchers[i], h->pfds[i].revents & POLLIN, h->pfds[i].revents & POLLOUT);
   }
 }

src/core/iomgr/pollset_posix.c

@@ -410,10 +410,10 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset,
   pfd[1].events = grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher);
 
-  r = poll(pfd, GPR_ARRAY_SIZE(pfd), timeout);
+  r = poll(pfd, GPR_ARRAY_SIZE(pfd) - (pfd[1].events == 0), timeout);
   GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);
 
-  grpc_fd_end_poll(&fd_watcher);
+  grpc_fd_end_poll(&fd_watcher, pfd[1].revents & POLLIN, pfd[1].revents & POLLOUT);
 
   if (r < 0) {
     if (errno != EINTR) {
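One detail worth calling out in this last hunk: pfd[0] holds the pollset's kick fd, so when grpc_fd_begin_poll grants nothing (another pollset already covers both directions) the poll set shrinks to the kicker alone via GPR_ARRAY_SIZE(pfd) - (pfd[1].events == 0), and the thread simply sleeps until it is kicked to take over a direction or pick up other work.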
