Call fd_orphan callback a little earlier

pull/2229/head
Craig Tiller 10 years ago
parent 1aa041ab1a
commit 8b6cb8d9c7
  1. 30
      src/core/iomgr/fd_posix.c

@ -100,6 +100,7 @@ static grpc_fd *alloc_fd(int fd) {
&r->inactive_watcher_root; &r->inactive_watcher_root;
r->freelist_next = NULL; r->freelist_next = NULL;
r->read_watcher = r->write_watcher = NULL; r->read_watcher = r->write_watcher = NULL;
r->on_done_closure = NULL;
return r; return r;
} }
@ -138,9 +139,6 @@ static void unref_by(grpc_fd *fd, int n) {
#endif #endif
old = gpr_atm_full_fetch_add(&fd->refst, -n); old = gpr_atm_full_fetch_add(&fd->refst, -n);
if (old == n) { if (old == n) {
if (fd->on_done_closure) {
grpc_iomgr_add_callback(fd->on_done_closure);
}
grpc_iomgr_unregister_object(&fd->iomgr_object); grpc_iomgr_unregister_object(&fd->iomgr_object);
freelist_fd(fd); freelist_fd(fd);
} else { } else {
@ -199,13 +197,23 @@ static void wake_all_watchers_locked(grpc_fd *fd) {
} }
} }
static int has_watchers(grpc_fd *fd) {
return fd->read_watcher != NULL || fd->write_watcher != NULL || fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
}
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done, void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
const char *reason) { const char *reason) {
fd->on_done_closure = on_done; fd->on_done_closure = on_done;
shutdown(fd->fd, SHUT_RDWR); shutdown(fd->fd, SHUT_RDWR);
REF_BY(fd, 1, reason); /* remove active status, but keep referenced */ REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
gpr_mu_lock(&fd->watcher_mu); gpr_mu_lock(&fd->watcher_mu);
wake_all_watchers_locked(fd); if (!has_watchers(fd)) {
if (fd->on_done_closure) {
grpc_iomgr_add_callback(fd->on_done_closure);
}
} else {
wake_all_watchers_locked(fd);
}
gpr_mu_unlock(&fd->watcher_mu); gpr_mu_unlock(&fd->watcher_mu);
UNREF_BY(fd, 2, reason); /* drop the reference */ UNREF_BY(fd, 2, reason); /* drop the reference */
} }
@ -354,6 +362,13 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
GRPC_FD_REF(fd, "poll"); GRPC_FD_REF(fd, "poll");
gpr_mu_lock(&fd->watcher_mu); gpr_mu_lock(&fd->watcher_mu);
/* if we are shutdown, then don't add to the watcher set */
if (gpr_atm_no_barrier_load(&fd->shutdown)) {
watcher->fd = NULL;
watcher->pollset = NULL;
gpr_mu_unlock(&fd->watcher_mu);
return 0;
}
/* if there is nobody polling for read, but we need to, then start doing so */ /* if there is nobody polling for read, but we need to, then start doing so */
if (!fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) { if (!fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) {
fd->read_watcher = watcher; fd->read_watcher = watcher;
@ -383,6 +398,10 @@ void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) {
int kick = 0; int kick = 0;
grpc_fd *fd = watcher->fd; grpc_fd *fd = watcher->fd;
if (fd == NULL) {
return;
}
gpr_mu_lock(&fd->watcher_mu); gpr_mu_lock(&fd->watcher_mu);
if (watcher == fd->read_watcher) { if (watcher == fd->read_watcher) {
/* remove read watcher, kick if we still need a read */ /* remove read watcher, kick if we still need a read */
@ -404,6 +423,9 @@ void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) {
if (kick) { if (kick) {
maybe_wake_one_watcher_locked(fd); maybe_wake_one_watcher_locked(fd);
} }
if (fd->on_done_closure != NULL && !has_watchers(fd)) {
grpc_iomgr_add_callback(fd->on_done_closure);
}
gpr_mu_unlock(&fd->watcher_mu); gpr_mu_unlock(&fd->watcher_mu);
GRPC_FD_UNREF(fd, "poll"); GRPC_FD_UNREF(fd, "poll");

Loading…
Cancel
Save