Stabilize mac build

pull/3610/head
Craig Tiller 10 years ago
parent dc17471545
commit 58d05a63df
  1. 241
      src/core/iomgr/fd_posix.c
  2. 14
      src/core/iomgr/fd_posix.h
  3. 30
      src/core/iomgr/pollset_multipoller_with_poll_posix.c
  4. 32
      src/core/iomgr/pollset_posix.c

@ -44,11 +44,10 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
#include <sys/syscall.h>
enum descriptor_state {
NOT_READY = 0,
READY = 1
}; /* or a pointer to a closure to call */
#define CLOSURE_NOT_READY ((grpc_closure*)0)
#define CLOSURE_READY ((grpc_closure*)1)
/* We need to keep a freelist not because of any concerns of malloc performance
* but instead so that implementations with multiple threads in (for example)
@ -70,6 +69,8 @@ enum descriptor_state {
static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;
static long gettid(void) { return syscall(__NR_gettid); }
static void freelist_fd(grpc_fd *fd) {
gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist;
@ -88,14 +89,15 @@ static grpc_fd *alloc_fd(int fd) {
gpr_mu_unlock(&fd_freelist_mu);
if (r == NULL) {
r = gpr_malloc(sizeof(grpc_fd));
gpr_mu_init(&r->set_state_mu);
gpr_mu_init(&r->watcher_mu);
gpr_mu_init(&r->mu);
r->cap_ev = 0;
r->ev = NULL;
}
gpr_atm_rel_store(&r->refst, 1);
gpr_atm_rel_store(&r->readst, NOT_READY);
gpr_atm_rel_store(&r->writest, NOT_READY);
gpr_atm_rel_store(&r->shutdown, 0);
r->shutdown = 0;
r->read_closure = CLOSURE_NOT_READY;
r->write_closure = CLOSURE_NOT_READY;
r->fd = fd;
r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
&r->inactive_watcher_root;
@ -103,12 +105,13 @@ static grpc_fd *alloc_fd(int fd) {
r->read_watcher = r->write_watcher = NULL;
r->on_done_closure = NULL;
r->closed = 0;
r->num_ev = 0;
return r;
}
static void destroy(grpc_fd *fd) {
gpr_mu_destroy(&fd->set_state_mu);
gpr_mu_destroy(&fd->watcher_mu);
gpr_mu_destroy(&fd->mu);
gpr_free(fd->ev);
gpr_free(fd);
}
@ -169,43 +172,70 @@ grpc_fd *grpc_fd_create(int fd, const char *name) {
return r;
}
static int count_inactive(grpc_fd *fd) {
int n = 0;
grpc_fd_watcher *w;
for (w = fd->inactive_watcher_root.next; w != &fd->inactive_watcher_root; w = w->next) {
n++;
}
return n;
}
static void fdev_add(fd_event_type type, grpc_fd *fd, grpc_pollset *pollset, grpc_pollset_worker *pollset_worker, grpc_fd_watcher *fd_watcher) {
fd_event *ev;
if (fd->num_ev == fd->cap_ev) {
fd->cap_ev = GPR_MAX(2 * fd->cap_ev, 32);
fd->ev = gpr_realloc(fd->ev, sizeof(*fd->ev) * fd->cap_ev);
}
ev = &fd->ev[fd->num_ev++];
ev->thread = gettid();
ev->type = type;
ev->pollset = pollset;
ev->pollset_worker = pollset_worker;
ev->watcher = fd_watcher;
ev->shutdown = fd->shutdown;
ev->closed = fd->closed;
ev->read_closure = fd->read_closure;
ev->write_closure = fd->write_closure;
ev->read_watcher = fd->read_watcher;
ev->write_watcher = fd->write_watcher;
ev->num_inactive = count_inactive(fd);
}
int grpc_fd_is_orphaned(grpc_fd *fd) {
return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}
static void pollset_kick_locked(grpc_fd_watcher *watcher) {
static void pollset_kick_locked(grpc_fd_watcher *watcher, fd_event_type type) {
fdev_add(type, watcher->fd, watcher->pollset, watcher->worker, watcher);
gpr_mu_lock(GRPC_POLLSET_MU(watcher->pollset));
GPR_ASSERT(watcher->worker);
grpc_pollset_kick_ex(watcher->pollset, watcher->worker, GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
gpr_mu_unlock(GRPC_POLLSET_MU(watcher->pollset));
fdev_add(type + 1, watcher->fd, watcher->pollset, watcher->worker, watcher);
}
static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
pollset_kick_locked(fd->inactive_watcher_root.next);
pollset_kick_locked(fd->inactive_watcher_root.next, FDEV_KICK_INACTIVE);
} else if (fd->read_watcher) {
pollset_kick_locked(fd->read_watcher);
pollset_kick_locked(fd->read_watcher, FDEV_KICK_READER);
} else if (fd->write_watcher) {
pollset_kick_locked(fd->write_watcher);
pollset_kick_locked(fd->write_watcher, FDEV_KICK_WRITER);
}
}
static void maybe_wake_one_watcher(grpc_fd *fd) {
gpr_mu_lock(&fd->watcher_mu);
maybe_wake_one_watcher_locked(fd);
gpr_mu_unlock(&fd->watcher_mu);
}
static void wake_all_watchers_locked(grpc_fd *fd) {
grpc_fd_watcher *watcher;
for (watcher = fd->inactive_watcher_root.next;
watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
pollset_kick_locked(watcher);
pollset_kick_locked(watcher, FDEV_KICK_INACTIVE);
}
if (fd->read_watcher) {
pollset_kick_locked(fd->read_watcher);
pollset_kick_locked(fd->read_watcher, FDEV_KICK_READER);
}
if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
pollset_kick_locked(fd->write_watcher);
pollset_kick_locked(fd->write_watcher, FDEV_KICK_WRITER);
}
}
@ -218,7 +248,7 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
const char *reason) {
fd->on_done_closure = on_done;
shutdown(fd->fd, SHUT_RDWR);
gpr_mu_lock(&fd->watcher_mu);
gpr_mu_lock(&fd->mu);
REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
if (!has_watchers(fd)) {
fd->closed = 1;
@ -227,7 +257,7 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
} else {
wake_all_watchers_locked(fd);
}
gpr_mu_unlock(&fd->watcher_mu);
gpr_mu_unlock(&fd->mu);
UNREF_BY(fd, 2, reason); /* drop the reference */
}
@ -247,125 +277,107 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *st,
static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st,
grpc_closure *closure) {
switch (gpr_atm_acq_load(st)) {
case NOT_READY:
/* There is no race if the descriptor is already ready, so we skip
the interlocked op in that case. As long as the app doesn't
try to set the same upcall twice (which it shouldn't) then
oldval should never be anything other than READY or NOT_READY. We
don't
check for user error on the fast path. */
if (gpr_atm_rel_cas(st, NOT_READY, (gpr_intptr)closure)) {
/* swap was successful -- the closure will run after the next
set_ready call. NOTE: we don't have an ABA problem here,
since we should never have concurrent calls to the same
notify_on function. */
maybe_wake_one_watcher(fd);
return;
}
/* swap was unsuccessful due to an intervening set_ready call.
Fall through to the READY code below */
case READY:
GPR_ASSERT(gpr_atm_no_barrier_load(st) == READY);
gpr_atm_rel_store(st, NOT_READY);
grpc_exec_ctx_enqueue(exec_ctx, closure,
!gpr_atm_acq_load(&fd->shutdown));
return;
default: /* WAITING */
/* upcallptr was set to a different closure. This is an error! */
gpr_log(GPR_ERROR,
"User called a notify_on function with a previous callback still "
"pending");
abort();
if (*st == CLOSURE_NOT_READY) {
*st = closure;
} else if (*st == CLOSURE_READY) {
*st = CLOSURE_NOT_READY;
grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown);
maybe_wake_one_watcher_locked(fd);
} else {
/* upcallptr was set to a different closure. This is an error! */
gpr_log(GPR_ERROR,
"User called a notify_on function with a previous callback still "
"pending");
abort();
}
gpr_log(GPR_ERROR, "Corrupt memory in &st->state");
abort();
}
static void set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
gpr_atm *st) {
gpr_intptr state = gpr_atm_acq_load(st);
switch (state) {
case READY:
/* duplicate ready, ignore */
return;
case NOT_READY:
if (gpr_atm_rel_cas(st, NOT_READY, READY)) {
/* swap was successful -- the closure will run after the next
notify_on call. */
return;
}
/* swap was unsuccessful due to an intervening set_ready call.
Fall through to the WAITING code below */
state = gpr_atm_acq_load(st);
default: /* waiting */
GPR_ASSERT(gpr_atm_no_barrier_load(st) != READY &&
gpr_atm_no_barrier_load(st) != NOT_READY);
grpc_exec_ctx_enqueue(exec_ctx, (grpc_closure *)state,
!gpr_atm_acq_load(&fd->shutdown));
gpr_atm_rel_store(st, NOT_READY);
return;
/* returns 1 if state becomes not ready */
static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure **st) {
if (*st == CLOSURE_READY) {
/* duplicate ready, ignore */
return 0;
} else if (*st == CLOSURE_NOT_READY) {
*st = CLOSURE_READY;
return 0;
} else {
grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown);
*st = CLOSURE_NOT_READY;
return 1;
}
}
static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *st) {
static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) {
/* only one set_ready can be active at once (but there may be a racing
notify_on) */
gpr_mu_lock(&fd->set_state_mu);
gpr_mu_lock(&fd->mu);
set_ready_locked(exec_ctx, fd, st);
gpr_mu_unlock(&fd->set_state_mu);
gpr_mu_unlock(&fd->mu);
}
void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
gpr_mu_lock(&fd->set_state_mu);
gpr_mu_lock(&fd->mu);
GPR_ASSERT(!gpr_atm_no_barrier_load(&fd->shutdown));
gpr_atm_rel_store(&fd->shutdown, 1);
set_ready_locked(exec_ctx, fd, &fd->readst);
set_ready_locked(exec_ctx, fd, &fd->writest);
gpr_mu_unlock(&fd->set_state_mu);
fd->shutdown = 1;
set_ready_locked(exec_ctx, fd, &fd->read_closure);
set_ready_locked(exec_ctx, fd, &fd->write_closure);
gpr_mu_unlock(&fd->mu);
}
void grpc_fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
notify_on(exec_ctx, fd, &fd->readst, closure);
gpr_mu_lock(&fd->mu);
fdev_add(FDEV_NOTIFY_ON_READ, fd, NULL, NULL, NULL);
notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
gpr_mu_unlock(&fd->mu);
}
void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
notify_on(exec_ctx, fd, &fd->writest, closure);
gpr_mu_lock(&fd->mu);
fdev_add(FDEV_NOTIFY_ON_WRITE, fd, NULL, NULL, NULL);
notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
gpr_mu_unlock(&fd->mu);
}
gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
grpc_pollset_worker *worker, gpr_uint32 read_mask,
gpr_uint32 write_mask, grpc_fd_watcher *watcher) {
gpr_uint32 mask = 0;
grpc_closure *cur;
int requested;
/* keep track of pollers that have requested our events, in case they change
*/
GRPC_FD_REF(fd, "poll");
gpr_mu_lock(&fd->watcher_mu);
gpr_mu_lock(&fd->mu);
fdev_add(FDEV_BEGIN_POLL, fd, pollset, worker, watcher);
/* if we are shutdown, then don't add to the watcher set */
if (gpr_atm_no_barrier_load(&fd->shutdown)) {
watcher->fd = NULL;
watcher->pollset = NULL;
watcher->worker = NULL;
gpr_mu_unlock(&fd->watcher_mu);
gpr_mu_unlock(&fd->mu);
GRPC_FD_UNREF(fd, "poll");
return 0;
}
/* if there is nobody polling for read, but we need to, then start doing so */
if (read_mask && !fd->read_watcher &&
(gpr_uintptr)gpr_atm_acq_load(&fd->readst) > READY) {
cur = fd->read_closure;
requested = cur != CLOSURE_READY;
if (read_mask && fd->read_watcher == NULL && requested) {
fd->read_watcher = watcher;
mask |= read_mask;
}
/* if there is nobody polling for write, but we need to, then start doing so
*/
if (write_mask && !fd->write_watcher &&
(gpr_uintptr)gpr_atm_acq_load(&fd->writest) > READY) {
cur = fd->write_closure;
requested = cur != CLOSURE_READY;
if (write_mask && fd->write_watcher == NULL && requested) {
fd->write_watcher = watcher;
mask |= write_mask;
}
@ -378,7 +390,7 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
watcher->pollset = pollset;
watcher->worker = worker;
watcher->fd = fd;
gpr_mu_unlock(&fd->watcher_mu);
gpr_mu_unlock(&fd->mu);
return mask;
}
@ -393,17 +405,24 @@ void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
return;
}
gpr_mu_lock(&fd->watcher_mu);
gpr_mu_lock(&fd->mu);
fdev_add(FDEV_END_POLL, watcher->fd, watcher->pollset, watcher->worker, watcher);
if (watcher == fd->read_watcher) {
/* remove read watcher, kick if we still need a read */
was_polling = 1;
kick = kick || !got_read;
if (!got_read) {
kick = 1;
}
fd->read_watcher = NULL;
}
if (watcher == fd->write_watcher) {
/* remove write watcher, kick if we still need a write */
was_polling = 1;
kick = kick || !got_write;
if (!got_write) {
kick = 1;
}
fd->write_watcher = NULL;
}
if (!was_polling && watcher->worker != NULL) {
@ -411,6 +430,16 @@ void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
watcher->next->prev = watcher->prev;
watcher->prev->next = watcher->next;
}
if (got_read) {
if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
kick = 1;
}
}
if (got_write) {
if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
kick = 1;
}
}
if (kick) {
maybe_wake_one_watcher_locked(fd);
}
@ -419,17 +448,17 @@ void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
close(fd->fd);
grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, 1);
}
gpr_mu_unlock(&fd->watcher_mu);
gpr_mu_unlock(&fd->mu);
GRPC_FD_UNREF(fd, "poll");
}
void grpc_fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
set_ready(exec_ctx, fd, &fd->readst);
set_ready(exec_ctx, fd, &fd->read_closure);
}
void grpc_fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
set_ready(exec_ctx, fd, &fd->writest);
set_ready(exec_ctx, fd, &fd->write_closure);
}
#endif

@ -59,8 +59,8 @@ struct grpc_fd {
and just unref by 1 when we're ready to flag the object as orphaned */
gpr_atm refst;
gpr_mu set_state_mu;
gpr_atm shutdown;
gpr_mu mu;
int shutdown;
int closed;
/* The watcher list.
@ -85,20 +85,22 @@ struct grpc_fd {
If at a later time there becomes need of a poller to poll, one of
the inactive pollers may be kicked out of their poll loops to take
that responsibility. */
gpr_mu watcher_mu;
grpc_fd_watcher inactive_watcher_root;
grpc_fd_watcher *read_watcher;
grpc_fd_watcher *write_watcher;
gpr_atm readst;
gpr_atm writest;
grpc_closure *read_closure;
grpc_closure *write_closure;
struct grpc_fd *freelist_next;
grpc_closure *on_done_closure;
grpc_closure *shutdown_closures[2];
grpc_iomgr_object iomgr_object;
size_t num_ev;
size_t cap_ev;
fd_event *ev;
};
/* Create a wrapped file descriptor.

@ -102,6 +102,9 @@ static void multipoll_with_poll_pollset_del_fd(grpc_exec_ctx *exec_ctx,
static void multipoll_with_poll_pollset_maybe_work_and_unlock(
grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec deadline, gpr_timespec now) {
#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
int timeout;
int r;
size_t i, j, fd_count;
@ -157,34 +160,29 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
r = grpc_poll_function(pfds, pfd_count, timeout);
GRPC_SCHEDULING_END_BLOCKING_REGION;
for (i = 2; i < pfd_count; i++) {
grpc_fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN,
pfds[i].revents & POLLOUT);
}
if (r < 0) {
if (errno != EINTR) {
gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
for (i = 2; i < pfd_count; i++) {
grpc_fd_end_poll(exec_ctx, &watchers[i], 0, 0);
}
} else if (r == 0) {
/* do nothing */
for (i = 2; i < pfd_count; i++) {
grpc_fd_end_poll(exec_ctx, &watchers[i], 0, 0);
}
} else {
if (pfds[0].revents & POLLIN) {
if (pfds[0].revents & POLLIN_CHECK) {
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
}
if (pfds[1].revents & POLLIN) {
if (pfds[1].revents & POLLIN_CHECK) {
grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
}
for (i = 2; i < pfd_count; i++) {
if (watchers[i].fd == NULL) {
grpc_fd_end_poll(exec_ctx, &watchers[i], 0, 0);
continue;
}
if (pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) {
grpc_fd_become_readable(exec_ctx, watchers[i].fd);
}
if (pfds[i].revents & (POLLOUT | POLLHUP | POLLERR)) {
grpc_fd_become_writable(exec_ctx, watchers[i].fd);
}
grpc_fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK,
pfds[i].revents & POLLOUT_CHECK);
}
}

@ -274,6 +274,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
if (worker->reevaluate_polling_on_wakeup) {
worker->reevaluate_polling_on_wakeup = 0;
pollset->kicked_without_pollers = 0;
if (queued_work) {
deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
}
@ -497,6 +498,9 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
grpc_pollset_worker *worker,
gpr_timespec deadline,
gpr_timespec now) {
#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
struct pollfd pfd[3];
grpc_fd *fd;
grpc_fd_watcher fd_watcher;
@ -539,31 +543,27 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
GRPC_SCHEDULING_END_BLOCKING_REGION;
GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);
if (fd) {
grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN,
pfd[2].revents & POLLOUT);
}
if (r < 0) {
if (errno != EINTR) {
gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
if (fd) {
grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
}
} else if (r == 0) {
/* do nothing */
if (fd) {
grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
}
} else {
if (pfd[0].revents & POLLIN) {
if (pfd[0].revents & POLLIN_CHECK) {
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
}
if (pfd[1].revents & POLLIN) {
if (pfd[1].revents & POLLIN_CHECK) {
grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
}
if (nfds > 2) {
if (pfd[2].revents & (POLLIN | POLLHUP | POLLERR)) {
grpc_fd_become_readable(exec_ctx, fd);
}
if (pfd[2].revents & (POLLOUT | POLLHUP | POLLERR)) {
grpc_fd_become_writable(exec_ctx, fd);
}
grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK,
pfd[2].revents & POLLOUT_CHECK);
} else if (fd) {
grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
}
}
}

Loading…
Cancel
Save