diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 6c666bd420f..2ac1866a662 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -114,7 +114,8 @@ static void destroy(grpc_fd *fd) { #define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__) static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file, int line) { gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n, - fd->refst, fd->refst + n, reason, file, line); + gpr_atm_no_barrier_load(&fd->refst), + gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line); #else #define REF_BY(fd, n, reason) ref_by(fd, n) #define UNREF_BY(fd, n, reason) unref_by(fd, n) @@ -127,7 +128,8 @@ static void ref_by(grpc_fd *fd, int n) { static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file, int line) { gpr_atm old; gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n, - fd->refst, fd->refst - n, reason, file, line); + gpr_atm_no_barrier_load(&fd->refst), + gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line); #else static void unref_by(grpc_fd *fd, int n) { gpr_atm old; @@ -195,14 +197,15 @@ static void wake_all_watchers_locked(grpc_fd *fd) { } } -void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done) { +void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done, + const char *reason) { fd->on_done_closure = on_done; shutdown(fd->fd, SHUT_RDWR); - REF_BY(fd, 1, "orphan"); /* remove active status, but keep referenced */ + REF_BY(fd, 1, reason); /* remove active status, but keep referenced */ gpr_mu_lock(&fd->watcher_mu); wake_all_watchers_locked(fd); gpr_mu_unlock(&fd->watcher_mu); - UNREF_BY(fd, 2, "orphan"); /* drop the reference */ + UNREF_BY(fd, 2, reason); /* drop the reference */ } /* increment refcount by two to avoid changing the orphan bit */ diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index 1de5d088c5d..ae85550abab 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -109,7 +109,8 @@ grpc_fd *grpc_fd_create(int fd, const char *name); If on_done is NULL, no callback will be made. Requires: *fd initialized; no outstanding notify_on_read or notify_on_write. */ -void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done); +void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done, + const char *reason); /* Begin polling on an fd. Registers that the given pollset is interested in this fd - so that if read @@ -159,6 +160,8 @@ void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback); void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback); /* Reference counting for fds */ +#define GRPC_FD_REF_COUNT_DEBUG + #ifdef GRPC_FD_REF_COUNT_DEBUG void grpc_fd_ref(grpc_fd *fd, const char *reason, const char *file, int line); void grpc_fd_unref(grpc_fd *fd, const char *reason, const char *file, int line); diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index 93c14c4aaba..2cd4aa2f450 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -164,7 +164,7 @@ static void on_writable(void *acp, int success) { finish: gpr_mu_lock(&ac->mu); if (!ep) { - grpc_fd_orphan(ac->fd, NULL); + grpc_fd_orphan(ac->fd, NULL, "tcp_client_orphan"); } done = (--ac->refs == 0); gpr_mu_unlock(&ac->mu); @@ -220,7 +220,6 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), gpr_asprintf(&name, "tcp-client:%s", addr_str); fdobj = grpc_fd_create(fd, name); - grpc_pollset_set_add_fd(interested_parties, fdobj); if (err >= 0) { cb(arg, @@ -229,12 +228,14 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), } if (errno != EWOULDBLOCK && errno != EINPROGRESS) { - gpr_log(GPR_ERROR, "connect error to '%s': %s", strerror(errno)); - grpc_fd_orphan(fdobj, NULL); + gpr_log(GPR_ERROR, "connect error to '%s': %s", addr_str, strerror(errno)); + grpc_fd_orphan(fdobj, NULL, "tcp_client_connect_error"); cb(arg, NULL); goto done; } + grpc_pollset_set_add_fd(interested_parties, fdobj); + ac = gpr_malloc(sizeof(async_connect)); ac->cb = cb; ac->cb_arg = arg; diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 4fbbaa7c7d6..e3289f68061 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -295,7 +295,7 @@ static void grpc_tcp_shutdown(grpc_endpoint *ep) { static void grpc_tcp_unref(grpc_tcp *tcp) { int refcount_zero = gpr_unref(&tcp->refcount); if (refcount_zero) { - grpc_fd_orphan(tcp->em_fd, NULL); + grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan"); gpr_free(tcp); } } diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index fe71bdfe6fd..759a4937957 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -178,7 +178,7 @@ static void deactivated_all_ports(grpc_tcp_server *s) { } sp->destroyed_closure.cb = destroyed_port; sp->destroyed_closure.cb_arg = s; - grpc_fd_orphan(sp->emfd, &sp->destroyed_closure); + grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, "tcp_listener_shutdown"); } gpr_mu_unlock(&s->mu); } else { diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index 2d1ae45bc58..b1be316a4ed 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -120,7 +120,7 @@ static void session_shutdown_cb(void *arg, /*session*/ int success) { session *se = arg; server *sv = se->sv; - grpc_fd_orphan(se->em_fd, NULL); + grpc_fd_orphan(se->em_fd, NULL, "a"); gpr_free(se); /* Start to shutdown listen fd. */ grpc_fd_shutdown(sv->em_fd); @@ -175,7 +175,7 @@ static void session_read_cb(void *arg, /*session*/ static void listen_shutdown_cb(void *arg /*server*/, int success) { server *sv = arg; - grpc_fd_orphan(sv->em_fd, NULL); + grpc_fd_orphan(sv->em_fd, NULL, "b"); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); sv->done = 1; @@ -284,7 +284,7 @@ static void client_init(client *cl) { /* Called when a client upload session is ready to shutdown. */ static void client_session_shutdown_cb(void *arg /*client*/, int success) { client *cl = arg; - grpc_fd_orphan(cl->em_fd, NULL); + grpc_fd_orphan(cl->em_fd, NULL, "c"); cl->done = 1; grpc_pollset_kick(&g_pollset); } @@ -472,7 +472,7 @@ static void test_grpc_fd_change(void) { GPR_ASSERT(b.cb_that_ran == second_read_callback); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - grpc_fd_orphan(em_fd, NULL); + grpc_fd_orphan(em_fd, NULL, "d"); destroy_change_data(&a); destroy_change_data(&b); close(sv[1]);