From 0317b3d082a2120900123744aeaadd871133e724 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 1 Jun 2015 21:57:03 -0700 Subject: [PATCH] Fix TSAN reported errors --- src/core/iomgr/fd_posix.c | 28 ++++++++++++---------------- src/core/iomgr/fd_posix.h | 4 ++-- src/core/iomgr/tcp_client_posix.c | 12 +++++++----- src/core/iomgr/tcp_posix.c | 2 +- src/core/iomgr/tcp_server_posix.c | 5 ++++- test/core/iomgr/fd_posix_test.c | 8 ++++---- 6 files changed, 30 insertions(+), 29 deletions(-) diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index a43e3ed2781..52a6920321d 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -134,7 +134,9 @@ static void unref_by(grpc_fd *fd, int n) { #endif old = gpr_atm_full_fetch_add(&fd->refst, -n); if (old == n) { - grpc_iomgr_add_callback(&fd->on_done_closure); + if (fd->on_done_closure) { + grpc_iomgr_add_callback(fd->on_done_closure); + } freelist_fd(fd); grpc_iomgr_unregister_object(&fd->iomgr_object); } else { @@ -153,8 +155,6 @@ void grpc_fd_global_shutdown(void) { gpr_mu_destroy(&fd_freelist_mu); } -static void do_nothing(void *ignored, int success) {} - grpc_fd *grpc_fd_create(int fd, const char *name) { grpc_fd *r = alloc_fd(fd); grpc_iomgr_register_object(&r->iomgr_object, name); @@ -195,9 +195,8 @@ static void wake_all_watchers_locked(grpc_fd *fd) { } } -void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) { - grpc_iomgr_closure_init(&fd->on_done_closure, on_done ? on_done : do_nothing, - user_data); +void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done) { + fd->on_done_closure = on_done; shutdown(fd->fd, SHUT_RDWR); REF_BY(fd, 1, "orphan"); /* remove active status, but keep referenced */ gpr_mu_lock(&fd->watcher_mu); @@ -208,21 +207,18 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) { /* increment refcount by two to avoid changing the orphan bit */ #ifdef GRPC_FD_REF_COUNT_DEBUG -void grpc_fd_ref(grpc_fd *fd, const char *reason, const char *file, int line) { - ref_by(fd, 2, reason, file, line); +void grpc_fd_ref(grpc_fd *fd, const char *reason, const char *file, int line) { + ref_by(fd, 2, reason, file, line); } -void grpc_fd_unref(grpc_fd *fd, const char *reason, const char *file, int line) { - unref_by(fd, 2, reason, file, line); +void grpc_fd_unref(grpc_fd *fd, const char *reason, const char *file, + int line) { + unref_by(fd, 2, reason, file, line); } #else -void grpc_fd_ref(grpc_fd *fd) { - ref_by(fd, 2); -} +void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); } -void grpc_fd_unref(grpc_fd *fd) { - unref_by(fd, 2); -} +void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); } #endif static void process_callback(grpc_iomgr_closure *closure, int success, diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index 70992aead55..1de5d088c5d 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -93,7 +93,7 @@ struct grpc_fd { struct grpc_fd *freelist_next; - grpc_iomgr_closure on_done_closure; + grpc_iomgr_closure *on_done_closure; grpc_iomgr_closure *shutdown_closures[2]; grpc_iomgr_object iomgr_object; @@ -109,7 +109,7 @@ grpc_fd *grpc_fd_create(int fd, const char *name); If on_done is NULL, no callback will be made. Requires: *fd initialized; no outstanding notify_on_read or notify_on_write. */ -void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data); +void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done); /* Begin polling on an fd. Registers that the given pollset is interested in this fd - so that if read diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index 981c326511c..93c14c4aaba 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -112,8 +112,6 @@ static void on_writable(void *acp, int success) { void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb; void *cb_arg = ac->cb_arg; - grpc_alarm_cancel(&ac->alarm); - if (success) { do { so_error_size = sizeof(so_error); @@ -166,13 +164,15 @@ static void on_writable(void *acp, int success) { finish: gpr_mu_lock(&ac->mu); if (!ep) { - grpc_fd_orphan(ac->fd, NULL, NULL); + grpc_fd_orphan(ac->fd, NULL); } done = (--ac->refs == 0); gpr_mu_unlock(&ac->mu); if (done) { gpr_mu_destroy(&ac->mu); gpr_free(ac); + } else { + grpc_alarm_cancel(&ac->alarm); } cb(cb_arg, ep); } @@ -230,7 +230,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), if (errno != EWOULDBLOCK && errno != EINPROGRESS) { gpr_log(GPR_ERROR, "connect error to '%s': %s", strerror(errno)); - grpc_fd_orphan(fdobj, NULL, NULL); + grpc_fd_orphan(fdobj, NULL); cb(arg, NULL); goto done; } @@ -244,8 +244,10 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), ac->write_closure.cb = on_writable; ac->write_closure.cb_arg = ac; - grpc_fd_notify_on_write(ac->fd, &ac->write_closure); + gpr_mu_lock(&ac->mu); grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now()); + grpc_fd_notify_on_write(ac->fd, &ac->write_closure); + gpr_mu_unlock(&ac->mu); done: gpr_free(name); diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 2f19f9d442f..4fbbaa7c7d6 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -295,7 +295,7 @@ static void grpc_tcp_shutdown(grpc_endpoint *ep) { static void grpc_tcp_unref(grpc_tcp *tcp) { int refcount_zero = gpr_unref(&tcp->refcount); if (refcount_zero) { - grpc_fd_orphan(tcp->em_fd, NULL, NULL); + grpc_fd_orphan(tcp->em_fd, NULL); gpr_free(tcp); } } diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 5c0203c3e3e..fe71bdfe6fd 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -84,6 +84,7 @@ typedef struct { } addr; int addr_len; grpc_iomgr_closure read_closure; + grpc_iomgr_closure destroyed_closure; } server_port; static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) { @@ -175,7 +176,9 @@ static void deactivated_all_ports(grpc_tcp_server *s) { if (sp->addr.sockaddr.sa_family == AF_UNIX) { unlink_if_unix_domain_socket(&sp->addr.un); } - grpc_fd_orphan(sp->emfd, destroyed_port, s); + sp->destroyed_closure.cb = destroyed_port; + sp->destroyed_closure.cb_arg = s; + grpc_fd_orphan(sp->emfd, &sp->destroyed_closure); } gpr_mu_unlock(&s->mu); } else { diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index 4faa888ca57..2d1ae45bc58 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -120,7 +120,7 @@ static void session_shutdown_cb(void *arg, /*session*/ int success) { session *se = arg; server *sv = se->sv; - grpc_fd_orphan(se->em_fd, NULL, NULL); + grpc_fd_orphan(se->em_fd, NULL); gpr_free(se); /* Start to shutdown listen fd. */ grpc_fd_shutdown(sv->em_fd); @@ -175,7 +175,7 @@ static void session_read_cb(void *arg, /*session*/ static void listen_shutdown_cb(void *arg /*server*/, int success) { server *sv = arg; - grpc_fd_orphan(sv->em_fd, NULL, NULL); + grpc_fd_orphan(sv->em_fd, NULL); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); sv->done = 1; @@ -284,7 +284,7 @@ static void client_init(client *cl) { /* Called when a client upload session is ready to shutdown. */ static void client_session_shutdown_cb(void *arg /*client*/, int success) { client *cl = arg; - grpc_fd_orphan(cl->em_fd, NULL, NULL); + grpc_fd_orphan(cl->em_fd, NULL); cl->done = 1; grpc_pollset_kick(&g_pollset); } @@ -472,7 +472,7 @@ static void test_grpc_fd_change(void) { GPR_ASSERT(b.cb_that_ran == second_read_callback); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - grpc_fd_orphan(em_fd, NULL, NULL); + grpc_fd_orphan(em_fd, NULL); destroy_change_data(&a); destroy_change_data(&b); close(sv[1]);