Fix a TSAN-reported error

We now pass down pointers to closures instead of separate (callback, arg)
pairs. This allows us to store one word atomically, fixing a race
condition.

All call sites have been updated to the new API. No new allocations are
incurred. grpc_fd_state is deleted to avoid any temptation to ever add
anything there again.
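
The race, in miniature: the old code published the callback with two plain
stores (st->cb, st->cb_arg) and only then CAS'd a separate state word, so a
racing set_ready could observe a half-written pair. Once the pair lives
behind a single grpc_iomgr_closure pointer, the state word itself can carry
it. Below is a minimal sketch of the scheme using C11 atomics in place of
gRPC's gpr_atm wrappers; every name in it is illustrative, not the gRPC API.

#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

typedef struct {
  void (*cb)(void *arg, int success);
  void *cb_arg;
} closure;

/* The word holds NOT_READY, READY, or (uintptr_t)(closure *). Valid
   pointers are never 0 or 1, so the three cases cannot collide. */
enum { NOT_READY = 0, READY = 1 };

/* One side parks a closure; mirrors notify_on in this commit. */
static void notify_on(atomic_uintptr_t *st, closure *c) {
  uintptr_t prev = NOT_READY;
  if (atomic_compare_exchange_strong(st, &prev, (uintptr_t)c)) {
    return; /* parked: the set_ready side will run it */
  }
  assert(prev == READY); /* anything else is a double registration */
  atomic_store(st, NOT_READY);
  c->cb(c->cb_arg, 1); /* already ready: run immediately */
}

/* The other side signals readiness; mirrors set_ready_locked. Assumes,
   as the real code does via set_state_mu, that only one set_ready runs
   at a time. */
static void set_ready(atomic_uintptr_t *st) {
  uintptr_t prev = NOT_READY;
  if (atomic_compare_exchange_strong(st, &prev, READY)) {
    return; /* no closure parked yet: remember readiness */
  }
  if (prev == READY) return; /* duplicate ready, ignore */
  atomic_store(st, NOT_READY);
  ((closure *)prev)->cb(((closure *)prev)->cb_arg, 1);
}

static void on_ready(void *arg, int success) {
  printf("%s (success=%d)\n", (char *)arg, success);
}

int main(void) {
  atomic_uintptr_t st = NOT_READY;
  closure c = {on_ready, "closure ran"};
  notify_on(&st, &c); /* parks &c in the word, atomically */
  set_ready(&st);     /* swaps it out and runs it */
  return 0;
}

With this layout every cross-thread handoff is a single atomic operation on
one word, which is exactly the kind of handoff TSAN can reason about.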
Commit 0fcd53c701 (parent 9be83eec1d) on pull/592/head
Craig Tiller, 10 years ago
Changed files:
1. src/core/iomgr/fd_posix.c (78 changed lines)
2. src/core/iomgr/fd_posix.h (14 changed lines)
3. src/core/iomgr/tcp_client_posix.c (7 changed lines)
4. src/core/iomgr/tcp_posix.c (15 changed lines)
5. src/core/iomgr/tcp_server_posix.c (7 changed lines)
6. test/core/iomgr/fd_posix_test.c (30 changed lines)

src/core/iomgr/fd_posix.c
@@ -45,7 +45,7 @@
 #include <grpc/support/log.h>
 #include <grpc/support/useful.h>
 
-enum descriptor_state { NOT_READY, READY, WAITING };
+enum descriptor_state { NOT_READY = 0, READY = 1 }; /* or a pointer to a closure to call */
 
 /* We need to keep a freelist not because of any concerns of malloc performance
  * but instead so that implementations with multiple threads in (for example)
@@ -88,8 +88,8 @@ static grpc_fd *alloc_fd(int fd) {
     gpr_mu_init(&r->watcher_mu);
   }
   gpr_atm_rel_store(&r->refst, 1);
-  gpr_atm_rel_store(&r->readst.state, NOT_READY);
-  gpr_atm_rel_store(&r->writest.state, NOT_READY);
+  gpr_atm_rel_store(&r->readst, NOT_READY);
+  gpr_atm_rel_store(&r->writest, NOT_READY);
   gpr_atm_rel_store(&r->shutdown, 0);
   r->fd = fd;
   r->watcher_root.next = r->watcher_root.prev = &r->watcher_root;
@@ -166,11 +166,6 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
 void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
 
-typedef struct {
-  grpc_iomgr_cb_func cb;
-  void *arg;
-} callback;
-
 static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success,
                           int allow_synchronous_callback) {
   if (allow_synchronous_callback) {
@@ -180,18 +175,18 @@ static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success,
   }
 }
 
-static void make_callbacks(callback *callbacks, size_t n, int success,
+static void make_callbacks(grpc_iomgr_closure *callbacks, size_t n, int success,
                            int allow_synchronous_callback) {
   size_t i;
   for (i = 0; i < n; i++) {
-    make_callback(callbacks[i].cb, callbacks[i].arg, success,
+    make_callback(callbacks[i].cb, callbacks[i].cb_arg, success,
                   allow_synchronous_callback);
   }
 }
 
-static void notify_on(grpc_fd *fd, grpc_fd_state *st, grpc_iomgr_cb_func cb,
-                      void *arg, int allow_synchronous_callback) {
-  switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) {
+static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
+                      int allow_synchronous_callback) {
+  switch (gpr_atm_acq_load(st)) {
     case NOT_READY:
       /* There is no race if the descriptor is already ready, so we skip
          the interlocked op in that case. As long as the app doesn't
@@ -199,9 +194,7 @@ static void notify_on(grpc_fd *fd, grpc_fd_state *st, grpc_iomgr_cb_func cb,
          oldval should never be anything other than READY or NOT_READY. We
          don't
          check for user error on the fast path. */
-      st->cb = cb;
-      st->cb_arg = arg;
-      if (gpr_atm_rel_cas(&st->state, NOT_READY, WAITING)) {
+      if (gpr_atm_rel_cas(st, NOT_READY, (gpr_intptr)closure)) {
         /* swap was successful -- the closure will run after the next
            set_ready call. NOTE: we don't have an ABA problem here,
            since we should never have concurrent calls to the same
@@ -212,12 +205,12 @@ static void notify_on(grpc_fd *fd, grpc_fd_state *st, grpc_iomgr_cb_func cb,
       /* swap was unsuccessful due to an intervening set_ready call.
          Fall through to the READY code below */
     case READY:
-      assert(gpr_atm_acq_load(&st->state) == READY);
-      gpr_atm_rel_store(&st->state, NOT_READY);
-      make_callback(cb, arg, !gpr_atm_acq_load(&fd->shutdown),
+      assert(gpr_atm_acq_load(st) == READY);
+      gpr_atm_rel_store(st, NOT_READY);
+      make_callback(closure->cb, closure->cb_arg, !gpr_atm_acq_load(&fd->shutdown),
                     allow_synchronous_callback);
       return;
-    case WAITING:
+    default: /* WAITING */
       /* upcallptr was set to a different closure. This is an error! */
       gpr_log(GPR_ERROR,
               "User called a notify_on function with a previous callback still "
@@ -228,38 +221,37 @@ static void notify_on(grpc_fd *fd, grpc_fd_state *st, grpc_iomgr_cb_func cb,
   abort();
 }
 
-static void set_ready_locked(grpc_fd_state *st, callback *callbacks,
+static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure *callbacks,
                              size_t *ncallbacks) {
-  callback *c;
+  gpr_intptr state = gpr_atm_acq_load(st);
 
-  switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) {
+  switch (state) {
+    case READY:
+      /* duplicate ready, ignore */
+      return;
     case NOT_READY:
-      if (gpr_atm_rel_cas(&st->state, NOT_READY, READY)) {
+      if (gpr_atm_rel_cas(st, NOT_READY, READY)) {
        /* swap was successful -- the closure will run after the next
           notify_on call. */
         return;
       }
       /* swap was unsuccessful due to an intervening set_ready call.
          Fall through to the WAITING code below */
-    case WAITING:
-      assert(gpr_atm_acq_load(&st->state) == WAITING);
-      c = &callbacks[(*ncallbacks)++];
-      c->cb = st->cb;
-      c->arg = st->cb_arg;
-      gpr_atm_rel_store(&st->state, NOT_READY);
-      return;
-    case READY:
-      /* duplicate ready, ignore */
+      state = gpr_atm_acq_load(st);
+    default: /* waiting */
+      assert(gpr_atm_acq_load(st) != READY && gpr_atm_acq_load(st) != NOT_READY);
+      callbacks[(*ncallbacks)++] = *(grpc_iomgr_closure *)state;
+      gpr_atm_rel_store(st, NOT_READY);
       return;
   }
 }
 
-static void set_ready(grpc_fd *fd, grpc_fd_state *st,
+static void set_ready(grpc_fd *fd, gpr_atm *st,
                       int allow_synchronous_callback) {
   /* only one set_ready can be active at once (but there may be a racing
      notify_on) */
   int success;
-  callback cb;
+  grpc_iomgr_closure cb;
   size_t ncb = 0;
   gpr_mu_lock(&fd->set_state_mu);
   set_ready_locked(st, &cb, &ncb);
@@ -269,7 +261,7 @@ static void set_ready(grpc_fd *fd, grpc_fd_state *st,
 }
 
 void grpc_fd_shutdown(grpc_fd *fd) {
-  callback cb[2];
+  grpc_iomgr_closure cb[2];
   size_t ncb = 0;
   gpr_mu_lock(&fd->set_state_mu);
   GPR_ASSERT(!gpr_atm_acq_load(&fd->shutdown));
@@ -280,14 +272,12 @@ void grpc_fd_shutdown(grpc_fd *fd) {
   make_callbacks(cb, ncb, 0, 0);
 }
 
-void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_cb_func read_cb,
-                            void *read_cb_arg) {
-  notify_on(fd, &fd->readst, read_cb, read_cb_arg, 0);
+void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure) {
+  notify_on(fd, &fd->readst, closure, 0);
 }
 
-void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb,
-                             void *write_cb_arg) {
-  notify_on(fd, &fd->writest, write_cb, write_cb_arg, 0);
+void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure) {
+  notify_on(fd, &fd->writest, closure, 0);
 }
@@ -303,8 +293,8 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
   watcher->fd = fd;
   gpr_mu_unlock(&fd->watcher_mu);
 
-  return (gpr_atm_acq_load(&fd->readst.state) != READY ? read_mask : 0) |
-         (gpr_atm_acq_load(&fd->writest.state) != READY ? write_mask : 0);
+  return (gpr_atm_acq_load(&fd->readst) != READY ? read_mask : 0) |
+         (gpr_atm_acq_load(&fd->writest) != READY ? write_mask : 0);
 }
 
 void grpc_fd_end_poll(grpc_fd_watcher *watcher) {

src/core/iomgr/fd_posix.h
@@ -43,9 +43,7 @@
 typedef struct {
   grpc_iomgr_cb_func cb;
   void *cb_arg;
-  int success;
-  gpr_atm state;
-} grpc_fd_state;
+} grpc_iomgr_closure;
 
 typedef struct grpc_fd grpc_fd;
@@ -71,8 +69,8 @@ struct grpc_fd {
   gpr_mu watcher_mu;
   grpc_fd_watcher watcher_root;
 
-  grpc_fd_state readst;
-  grpc_fd_state writest;
+  gpr_atm readst;
+  gpr_atm writest;
 
   grpc_iomgr_cb_func on_done;
   void *on_done_user_data;
@@ -126,12 +124,10 @@ void grpc_fd_shutdown(grpc_fd *fd);
    underlying platform. This means that users must drain fd in read_cb before
    calling notify_on_read again. Users are also expected to handle spurious
    events, i.e read_cb is called while nothing can be readable from fd */
-void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_cb_func read_cb,
-                            void *read_cb_arg);
+void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure);
 
 /* Exactly the same semantics as above, except based on writable events. */
-void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb,
-                             void *write_cb_arg);
+void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure);
 
 /* Notification from the poller to an fd that it has become readable or
    writable.
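
The contract spelled out in the notify_on_read comment above (edge-triggered
semantics: drain the fd before re-arming, tolerate spurious wakeups) combines
with the new closure API roughly as follows. A hedged sketch: conn,
process_bytes, and start_reading are illustrative names; only grpc_fd,
grpc_iomgr_closure, and grpc_fd_notify_on_read come from this header.

#include <errno.h>
#include <unistd.h>
#include "src/core/iomgr/fd_posix.h"

typedef struct {
  grpc_fd *em_fd;
  grpc_iomgr_closure read_closure; /* set up once; reused on every re-arm */
  char buf[4096];
} conn;

static void process_bytes(conn *c, ssize_t n) { (void)c; (void)n; }

/* Drain until EAGAIN, then re-arm with the embedded closure: no allocation,
   and no (callback, arg) pair to tear during a race. */
static void on_readable(void *arg, int success) {
  conn *c = arg;
  if (!success) return; /* fd is being shut down */
  for (;;) {
    ssize_t n = read(c->em_fd->fd, c->buf, sizeof(c->buf));
    if (n > 0) {
      process_bytes(c, n); /* keep reading: the edge won't fire again */
    } else if (n < 0 && errno == EINTR) {
      continue;
    } else if (n < 0 && errno == EAGAIN) {
      grpc_fd_notify_on_read(c->em_fd, &c->read_closure); /* drained: re-arm */
      return;
    } else {
      return; /* n == 0 (EOF) or a hard error: tear down elsewhere */
    }
  }
}

/* Registration, in the same shape as the call sites in this commit: */
static void start_reading(conn *c) {
  c->read_closure.cb = on_readable;
  c->read_closure.cb_arg = c;
  grpc_fd_notify_on_read(c->em_fd, &c->read_closure);
}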

src/core/iomgr/tcp_client_posix.c
@@ -60,6 +60,7 @@ typedef struct {
   gpr_timespec deadline;
   grpc_alarm alarm;
   int refs;
+  grpc_iomgr_closure write_closure;
 } async_connect;
 
 static int prepare_socket(const struct sockaddr *addr, int fd) {
@@ -136,7 +137,7 @@ static void on_writable(void *acp, int success) {
          opened too many network connections. The "easy" fix:
          don't do that! */
       gpr_log(GPR_ERROR, "kernel out of buffers");
-      grpc_fd_notify_on_write(ac->fd, on_writable, ac);
+      grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
       return;
     } else {
       switch (so_error) {
@@ -229,9 +230,11 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
   ac->fd = grpc_fd_create(fd);
   gpr_mu_init(&ac->mu);
   ac->refs = 2;
+  ac->write_closure.cb = on_writable;
+  ac->write_closure.cb_arg = ac;
   grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now());
-  grpc_fd_notify_on_write(ac->fd, on_writable, ac);
+  grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
 }
 
 #endif

src/core/iomgr/tcp_posix.c
@@ -263,6 +263,9 @@ typedef struct {
   void *write_user_data;
   grpc_tcp_slice_state write_state;
 
+  grpc_iomgr_closure read_closure;
+  grpc_iomgr_closure write_closure;
 } grpc_tcp;
 
 static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success);
@@ -370,7 +373,7 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
     } else {
       /* Spurious read event, consume it here */
       slice_state_destroy(&read_state);
-      grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp);
+      grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
     }
   } else {
     /* TODO(klempner): Log interesting errors */
@@ -405,7 +408,7 @@ static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
   tcp->read_cb = cb;
   tcp->read_user_data = user_data;
   gpr_ref(&tcp->refcount);
-  grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp);
+  grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
 }
 
 #define MAX_WRITE_IOVEC 16
@@ -468,7 +471,7 @@ static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) {
   write_status = grpc_tcp_flush(tcp);
 
   if (write_status == GRPC_ENDPOINT_WRITE_PENDING) {
-    grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp);
+    grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
   } else {
     slice_state_destroy(&tcp->write_state);
     if (write_status == GRPC_ENDPOINT_WRITE_DONE) {
@@ -513,7 +516,7 @@ static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep,
     gpr_ref(&tcp->refcount);
     tcp->write_cb = cb;
     tcp->write_user_data = user_data;
-    grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp);
+    grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
   }
 
   return status;
@@ -541,6 +544,10 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) {
   /* paired with unref in grpc_tcp_destroy */
   gpr_ref_init(&tcp->refcount, 1);
   tcp->em_fd = em_fd;
+  tcp->read_closure.cb = grpc_tcp_handle_read;
+  tcp->read_closure.cb_arg = tcp;
+  tcp->write_closure.cb = grpc_tcp_handle_write;
+  tcp->write_closure.cb_arg = tcp;
   return &tcp->base;
 }

src/core/iomgr/tcp_server_posix.c
@@ -82,6 +82,7 @@ typedef struct {
     struct sockaddr_un un;
   } addr;
   int addr_len;
+  grpc_iomgr_closure read_closure;
 } server_port;
 
 static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) {
@@ -244,7 +245,7 @@ static void on_read(void *arg, int success) {
         case EINTR:
           continue;
         case EAGAIN:
-          grpc_fd_notify_on_read(sp->emfd, on_read, sp);
+          grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
           return;
         default:
           gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno));
@@ -393,7 +394,9 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollsets,
     for (j = 0; j < pollset_count; j++) {
       grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd);
     }
-    grpc_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i]);
+    s->ports[i].read_closure.cb = on_read;
+    s->ports[i].read_closure.cb_arg = &s->ports[i];
+    grpc_fd_notify_on_read(s->ports[i].emfd, &s->ports[i].read_closure);
     s->active_ports++;
   }
   gpr_mu_unlock(&s->mu);

test/core/iomgr/fd_posix_test.c
@@ -97,6 +97,7 @@ typedef struct {
   gpr_mu mu;      /* protect done and done_cv */
   gpr_cv done_cv; /* signaled when a server finishes serving */
   int done;       /* set to 1 when a server finishes serving */
+  grpc_iomgr_closure listen_closure;
 } server;
 
 static void server_init(server *sv) {
@@ -112,6 +113,7 @@ typedef struct {
   server *sv;              /* not owned by a single session */
   grpc_fd *em_fd;          /* fd to read upload bytes */
   char read_buf[BUF_SIZE]; /* buffer to store upload bytes */
+  grpc_iomgr_closure session_read_closure;
 } session;
 
 /* Called when an upload session can be safely shutdown.
@@ -162,7 +164,7 @@ static void session_read_cb(void *arg, /*session*/
        TODO(chenw): in multi-threaded version, callback and polling can be
        run in different threads. polling may catch a persist read edge event
        before notify_on_read is called. */
-    grpc_fd_notify_on_read(se->em_fd, session_read_cb, se);
+    grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
   } else {
     gpr_log(GPR_ERROR, "Unhandled read error %s", strerror(errno));
     abort();
@@ -207,9 +209,11 @@ static void listen_cb(void *arg, /*=sv_arg*/
   se = gpr_malloc(sizeof(*se));
   se->sv = sv;
   se->em_fd = grpc_fd_create(fd);
-  grpc_fd_notify_on_read(se->em_fd, session_read_cb, se);
+  se->session_read_closure.cb = session_read_cb;
+  se->session_read_closure.cb_arg = se;
+  grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
 
-  grpc_fd_notify_on_read(listen_em_fd, listen_cb, sv);
+  grpc_fd_notify_on_read(listen_em_fd, &sv->listen_closure);
 }
 
 /* Max number of connections pending to be accepted by listen(). */
@@ -234,7 +238,9 @@ static int server_start(server *sv) {
   sv->em_fd = grpc_fd_create(fd);
 
   /* Register to be interested in reading from listen_fd. */
-  grpc_fd_notify_on_read(sv->em_fd, listen_cb, sv);
+  sv->listen_closure.cb = listen_cb;
+  sv->listen_closure.cb_arg = sv;
+  grpc_fd_notify_on_read(sv->em_fd, &sv->listen_closure);
 
   return port;
 }
@@ -268,6 +274,7 @@ typedef struct {
   gpr_mu mu;      /* protect done and done_cv */
   gpr_cv done_cv; /* signaled when a client finishes sending */
   int done;       /* set to 1 when a client finishes sending */
+  grpc_iomgr_closure write_closure;
 } client;
 
 static void client_init(client *cl) {
@@ -309,7 +316,9 @@ static void client_session_write(void *arg, /*client*/
     if (errno == EAGAIN) {
       gpr_mu_lock(&cl->mu);
       if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
-        grpc_fd_notify_on_write(cl->em_fd, client_session_write, cl);
+        cl->write_closure.cb = client_session_write;
+        cl->write_closure.cb_arg = cl;
+        grpc_fd_notify_on_write(cl->em_fd, &cl->write_closure);
         cl->client_write_cnt++;
       } else {
         client_session_shutdown_cb(arg, 1);
@@ -421,6 +430,13 @@ static void test_grpc_fd_change(void) {
   int sv[2];
   char data;
   int result;
+  grpc_iomgr_closure first_closure;
+  grpc_iomgr_closure second_closure;
+
+  first_closure.cb = first_read_callback;
+  first_closure.cb_arg = &a;
+  second_closure.cb = second_read_callback;
+  second_closure.cb_arg = &b;
 
   init_change_data(&a);
   init_change_data(&b);
@@ -434,7 +450,7 @@ static void test_grpc_fd_change(void) {
   em_fd = grpc_fd_create(sv[0]);
 
   /* Register the first callback, then make its FD readable */
-  grpc_fd_notify_on_read(em_fd, first_read_callback, &a);
+  grpc_fd_notify_on_read(em_fd, &first_closure);
   data = 0;
   result = write(sv[1], &data, 1);
   GPR_ASSERT(result == 1);
@@ -453,7 +469,7 @@ static void test_grpc_fd_change(void) {
   /* Now register a second callback with distinct change data, and do the same
      thing again. */
-  grpc_fd_notify_on_read(em_fd, second_read_callback, &b);
+  grpc_fd_notify_on_read(em_fd, &second_closure);
   data = 0;
   result = write(sv[1], &data, 1);
   GPR_ASSERT(result == 1);
