Fix threading problem on early orphaning

pull/1888/head
Craig Tiller 10 years ago
parent 79e11c6d05
commit 4b678bd722
  1. 13
      src/core/iomgr/fd_posix.c
  2. 5
      src/core/iomgr/fd_posix.h
  3. 9
      src/core/iomgr/tcp_client_posix.c
  4. 2
      src/core/iomgr/tcp_posix.c
  5. 2
      src/core/iomgr/tcp_server_posix.c
  6. 8
      test/core/iomgr/fd_posix_test.c

@ -114,7 +114,8 @@ static void destroy(grpc_fd *fd) {
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file, int line) {
gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
fd->refst, fd->refst + n, reason, file, line);
gpr_atm_no_barrier_load(&fd->refst),
gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
@ -127,7 +128,8 @@ static void ref_by(grpc_fd *fd, int n) {
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file, int line) {
gpr_atm old;
gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
fd->refst, fd->refst - n, reason, file, line);
gpr_atm_no_barrier_load(&fd->refst),
gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
gpr_atm old;
@ -195,14 +197,15 @@ static void wake_all_watchers_locked(grpc_fd *fd) {
}
}
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done) {
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
const char *reason) {
fd->on_done_closure = on_done;
shutdown(fd->fd, SHUT_RDWR);
REF_BY(fd, 1, "orphan"); /* remove active status, but keep referenced */
REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
gpr_mu_lock(&fd->watcher_mu);
wake_all_watchers_locked(fd);
gpr_mu_unlock(&fd->watcher_mu);
UNREF_BY(fd, 2, "orphan"); /* drop the reference */
UNREF_BY(fd, 2, reason); /* drop the reference */
}
/* increment refcount by two to avoid changing the orphan bit */

@ -109,7 +109,8 @@ grpc_fd *grpc_fd_create(int fd, const char *name);
If on_done is NULL, no callback will be made.
Requires: *fd initialized; no outstanding notify_on_read or
notify_on_write. */
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done);
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
const char *reason);
/* Begin polling on an fd.
Registers that the given pollset is interested in this fd - so that if read
@ -159,6 +160,8 @@ void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback);
void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback);
/* Reference counting for fds */
#define GRPC_FD_REF_COUNT_DEBUG
#ifdef GRPC_FD_REF_COUNT_DEBUG
void grpc_fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
void grpc_fd_unref(grpc_fd *fd, const char *reason, const char *file, int line);

@ -164,7 +164,7 @@ static void on_writable(void *acp, int success) {
finish:
gpr_mu_lock(&ac->mu);
if (!ep) {
grpc_fd_orphan(ac->fd, NULL);
grpc_fd_orphan(ac->fd, NULL, "tcp_client_orphan");
}
done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu);
@ -220,7 +220,6 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
gpr_asprintf(&name, "tcp-client:%s", addr_str);
fdobj = grpc_fd_create(fd, name);
grpc_pollset_set_add_fd(interested_parties, fdobj);
if (err >= 0) {
cb(arg,
@ -229,12 +228,14 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
}
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
gpr_log(GPR_ERROR, "connect error to '%s': %s", strerror(errno));
grpc_fd_orphan(fdobj, NULL);
gpr_log(GPR_ERROR, "connect error to '%s': %s", addr_str, strerror(errno));
grpc_fd_orphan(fdobj, NULL, "tcp_client_connect_error");
cb(arg, NULL);
goto done;
}
grpc_pollset_set_add_fd(interested_parties, fdobj);
ac = gpr_malloc(sizeof(async_connect));
ac->cb = cb;
ac->cb_arg = arg;

@ -295,7 +295,7 @@ static void grpc_tcp_shutdown(grpc_endpoint *ep) {
static void grpc_tcp_unref(grpc_tcp *tcp) {
int refcount_zero = gpr_unref(&tcp->refcount);
if (refcount_zero) {
grpc_fd_orphan(tcp->em_fd, NULL);
grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan");
gpr_free(tcp);
}
}

@ -178,7 +178,7 @@ static void deactivated_all_ports(grpc_tcp_server *s) {
}
sp->destroyed_closure.cb = destroyed_port;
sp->destroyed_closure.cb_arg = s;
grpc_fd_orphan(sp->emfd, &sp->destroyed_closure);
grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, "tcp_listener_shutdown");
}
gpr_mu_unlock(&s->mu);
} else {

@ -120,7 +120,7 @@ static void session_shutdown_cb(void *arg, /*session*/
int success) {
session *se = arg;
server *sv = se->sv;
grpc_fd_orphan(se->em_fd, NULL);
grpc_fd_orphan(se->em_fd, NULL, "a");
gpr_free(se);
/* Start to shutdown listen fd. */
grpc_fd_shutdown(sv->em_fd);
@ -175,7 +175,7 @@ static void session_read_cb(void *arg, /*session*/
static void listen_shutdown_cb(void *arg /*server*/, int success) {
server *sv = arg;
grpc_fd_orphan(sv->em_fd, NULL);
grpc_fd_orphan(sv->em_fd, NULL, "b");
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
sv->done = 1;
@ -284,7 +284,7 @@ static void client_init(client *cl) {
/* Called when a client upload session is ready to shutdown. */
static void client_session_shutdown_cb(void *arg /*client*/, int success) {
client *cl = arg;
grpc_fd_orphan(cl->em_fd, NULL);
grpc_fd_orphan(cl->em_fd, NULL, "c");
cl->done = 1;
grpc_pollset_kick(&g_pollset);
}
@ -472,7 +472,7 @@ static void test_grpc_fd_change(void) {
GPR_ASSERT(b.cb_that_ran == second_read_callback);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
grpc_fd_orphan(em_fd, NULL);
grpc_fd_orphan(em_fd, NULL, "d");
destroy_change_data(&a);
destroy_change_data(&b);
close(sv[1]);

Loading…
Cancel
Save