Fix TSAN-reported lock inversion in epoll fd addition

pull/2713/head
Craig Tiller 9 years ago
parent 06aeac6470
commit e97c9b4d86
  1. 60
      src/core/iomgr/pollset_multipoller_with_epoll.c
  2. 16
      src/core/iomgr/pollset_posix.c
  3. 4
      src/core/iomgr/pollset_posix.h

@ -50,23 +50,23 @@ typedef struct wakeup_fd_hdl {
struct wakeup_fd_hdl *next;
} wakeup_fd_hdl;
/* Argument bundle for perform_delayed_add: records which fd should be added
   to which pollset once the add can run without the pollset lock held. */
typedef struct {
  grpc_pollset *pollset;
  grpc_fd *fd;
  grpc_iomgr_closure closure; /* iomgr callback used to schedule the deferred add */
} delayed_add;
/* Per-pollset private state for the epoll-based multipoller. */
typedef struct {
  int epoll_fd; /* epoll instance this pollset registers fds with */
  wakeup_fd_hdl *free_wakeup_fds; /* presumably a freelist of reusable wakeup
                                     fds -- see wakeup_fd_hdl; confirm against
                                     the allocation sites (not in view) */
} pollset_hdr;
static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
grpc_fd *fd,
int and_unlock_pollset) {
static void finally_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
pollset_hdr *h = pollset->data.ptr;
struct epoll_event ev;
int err;
grpc_fd_watcher watcher;
if (and_unlock_pollset) {
gpr_mu_unlock(&pollset->mu);
}
/* We pretend to be polling whilst adding an fd to keep the fd from being
closed during the add. This may result in a spurious wakeup being assigned
to this pollset whilst adding, but that should be benign. */
@ -86,6 +86,52 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
grpc_fd_end_poll(&watcher, 0, 0);
}
/* iomgr callback that performs the fd addition deferred by
   multipoll_with_epoll_pollset_add_fd.  It runs WITHOUT the pollset lock
   held while touching the fd (finally_add_fd), which is what breaks the
   fd-lock / pollset-lock inversion this change fixes.  iomgr_status is
   accepted to match the closure signature; it is not consulted here. */
static void perform_delayed_add(void *arg, int iomgr_status) {
  delayed_add *da = arg;
  int do_shutdown_cb = 0;

  /* The fd may have been orphaned while this callback sat in the queue;
     only register it if it is still alive.  No pollset lock is held here. */
  if (!grpc_fd_is_orphaned(da->fd)) {
    finally_add_fd(da->pollset, da->fd);
  }

  gpr_mu_lock(&da->pollset->mu);
  /* This callback was counted in in_flight_cbs when it was scheduled;
     retire it and see whether we are the last thing a shutting-down
     pollset was waiting on. */
  da->pollset->in_flight_cbs--;
  if (da->pollset->shutting_down) {
    /* We don't care about this pollset anymore. */
    if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) {
      GPR_ASSERT(!grpc_pollset_has_workers(da->pollset));
      da->pollset->called_shutdown = 1;
      /* Defer the actual callback until after the lock is dropped. */
      do_shutdown_cb = 1;
    }
  }
  gpr_mu_unlock(&da->pollset->mu);

  /* Drop the ref taken when the delayed add was scheduled. */
  GRPC_FD_UNREF(da->fd, "delayed_add");

  /* Run the shutdown-done callback outside the pollset lock. */
  if (do_shutdown_cb) {
    da->pollset->shutdown_done_cb(da->pollset->shutdown_done_arg);
  }

  gpr_free(da);
}
/* Add fd to pollset's epoll set.  If the caller is willing to have its lock
   released (and_unlock_pollset != 0), the add happens immediately with the
   lock dropped; otherwise the work is handed off to an iomgr callback so it
   can run without the pollset lock held. */
static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
                                                grpc_fd *fd,
                                                int and_unlock_pollset) {
  if (!and_unlock_pollset) {
    /* Caller keeps holding pollset->mu: schedule the add for later instead
       of performing it under the lock. */
    delayed_add *deferred = gpr_malloc(sizeof(*deferred));
    deferred->pollset = pollset;
    deferred->fd = fd;
    /* Keep the fd alive until perform_delayed_add runs and unrefs it. */
    GRPC_FD_REF(fd, "delayed_add");
    grpc_iomgr_closure_init(&deferred->closure, perform_delayed_add, deferred);
    pollset->in_flight_cbs++;
    grpc_iomgr_add_callback(&deferred->closure);
    return;
  }

  /* Lock may be dropped: release it and register the fd right away. */
  gpr_mu_unlock(&pollset->mu);
  finally_add_fd(pollset, fd);
}
static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset,
grpc_fd *fd,
int and_unlock_pollset) {

@ -62,12 +62,12 @@ static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->next->prev = worker->prev;
}
static int has_workers(grpc_pollset *p) {
/* Return nonzero iff the pollset's intrusive worker list is non-empty, i.e.
   some thread is currently in grpc_pollset_work for this pollset.  Per the
   declaration in pollset_posix.h, the pollset must be locked by the caller. */
int grpc_pollset_has_workers(grpc_pollset *p) {
  return p->root_worker.next != &p->root_worker;
}
static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
if (has_workers(p)) {
if (grpc_pollset_has_workers(p)) {
grpc_pollset_worker *w = p->root_worker.next;
remove_worker(p, w);
return w;
@ -204,7 +204,7 @@ done:
remove_worker(pollset, worker);
}
if (pollset->shutting_down) {
if (has_workers(pollset)) {
if (grpc_pollset_has_workers(pollset)) {
grpc_pollset_kick(pollset, NULL);
} else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) {
pollset->called_shutdown = 1;
@ -228,7 +228,7 @@ void grpc_pollset_shutdown(grpc_pollset *pollset,
GPR_ASSERT(!pollset->shutting_down);
pollset->shutting_down = 1;
if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 &&
!has_workers(pollset)) {
!grpc_pollset_has_workers(pollset)) {
pollset->called_shutdown = 1;
call_shutdown = 1;
}
@ -245,7 +245,7 @@ void grpc_pollset_shutdown(grpc_pollset *pollset,
void grpc_pollset_destroy(grpc_pollset *pollset) {
GPR_ASSERT(pollset->shutting_down);
GPR_ASSERT(pollset->in_flight_cbs == 0);
GPR_ASSERT(!has_workers(pollset));
GPR_ASSERT(!grpc_pollset_has_workers(pollset));
pollset->vtable->destroy(pollset);
gpr_mu_destroy(&pollset->mu);
}
@ -297,7 +297,7 @@ static void basic_do_promote(void *args, int success) {
gpr_mu_lock(&pollset->mu);
/* First we need to ensure that nobody is polling concurrently */
if (has_workers(pollset)) {
if (grpc_pollset_has_workers(pollset)) {
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
grpc_iomgr_add_callback(&up_args->promotion_closure);
gpr_mu_unlock(&pollset->mu);
@ -314,7 +314,7 @@ static void basic_do_promote(void *args, int success) {
if (pollset->shutting_down) {
/* We don't care about this pollset anymore. */
if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) {
GPR_ASSERT(!has_workers(pollset));
GPR_ASSERT(!grpc_pollset_has_workers(pollset));
pollset->called_shutdown = 1;
do_shutdown_cb = 1;
}
@ -357,7 +357,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
GPR_ASSERT(fd);
if (fd == pollset->data.ptr) goto exit;
if (!has_workers(pollset)) {
if (!grpc_pollset_has_workers(pollset)) {
/* Fast path -- no in flight cbs */
/* TODO(klempner): Comment this out and fix any test failures or establish
* they are due to timing issues */

@ -113,4 +113,8 @@ extern grpc_platform_become_multipoller_type grpc_platform_become_multipoller;
void grpc_poll_become_multipoller(grpc_pollset *pollset, struct grpc_fd **fds,
size_t fd_count);
/* Return 1 if the pollset has active threads in grpc_pollset_work (pollset must
* be locked) */
int grpc_pollset_has_workers(grpc_pollset *pollset);
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */

Loading…
Cancel
Save