Merge pull request #10218 from ctiller/bugscrub1-proposedfix1

Possible fix to TSAN race
pull/10375/head
Craig Tiller 8 years ago committed by GitHub
commit 5692dfd339
  1. 5
      include/grpc/impl/codegen/atm_gcc_atomic.h
  2. 1
      include/grpc/impl/codegen/atm_gcc_sync.h
  3. 135
      src/core/lib/iomgr/ev_epoll_linux.c
  4. 85
      test/core/iomgr/ev_epoll_linux_test.c

@ -85,6 +85,11 @@ static __inline int gpr_atm_rel_cas(gpr_atm *p, gpr_atm o, gpr_atm n) {
p, &o, n, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED));
}
static __inline int gpr_atm_full_cas(gpr_atm *p, gpr_atm o, gpr_atm n) {
return GPR_ATM_INC_CAS_THEN(__atomic_compare_exchange_n(
p, &o, n, 0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED));
}
#define gpr_atm_full_xchg(p, n) \
GPR_ATM_INC_CAS_THEN(__atomic_exchange_n((p), (n), __ATOMIC_ACQ_REL))

@ -83,6 +83,7 @@ static __inline void gpr_atm_no_barrier_store(gpr_atm *p, gpr_atm value) {
#define gpr_atm_no_barrier_cas(p, o, n) gpr_atm_acq_cas((p), (o), (n))
#define gpr_atm_acq_cas(p, o, n) (__sync_bool_compare_and_swap((p), (o), (n)))
#define gpr_atm_rel_cas(p, o, n) gpr_atm_acq_cas((p), (o), (n))
#define gpr_atm_full_cas(p, o, n) gpr_atm_acq_cas((p), (o), (n))
static __inline gpr_atm gpr_atm_full_xchg(gpr_atm *p, gpr_atm n) {
gpr_atm cur;

@ -1107,19 +1107,20 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
grpc_closure *closure) {
while (true) {
/* Fast-path: CLOSURE_NOT_READY -> <closure>.
The 'release' cas here matches the 'acquire' load in set_ready and
set_shutdown ensuring that the closure (scheduled by set_ready or
set_shutdown) happens-after the I/O event on the fd */
if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, (gpr_atm)closure)) {
return; /* Fast-path successful. Return */
}
/* Slowpath. The 'acquire' load matches the 'release' cas in set_ready and
set_shutdown */
gpr_atm curr = gpr_atm_acq_load(state);
gpr_atm curr = gpr_atm_no_barrier_load(state);
switch (curr) {
case CLOSURE_NOT_READY: {
/* CLOSURE_NOT_READY -> <closure>.
We're guaranteed by API that there's an acquire barrier before here,
so there's no need to double-dip and this can be a release-only.
The release itself pairs with the acquire half of a set_ready full
barrier. */
if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, (gpr_atm)closure)) {
return; /* Successful. Return */
}
break; /* retry */
}
@ -1134,7 +1135,7 @@ static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
is no other code that needs to 'happen-after' this) */
if (gpr_atm_no_barrier_cas(state, CLOSURE_READY, CLOSURE_NOT_READY)) {
grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
return; /* Slow-path successful. Return */
return; /* Successful. Return */
}
break; /* retry */
@ -1165,30 +1166,19 @@ static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
static void set_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
grpc_error *shutdown_err) {
/* Try the fast-path first (i.e expect the current value to be
CLOSURE_NOT_READY */
gpr_atm curr = CLOSURE_NOT_READY;
gpr_atm new_state = (gpr_atm)shutdown_err | FD_SHUTDOWN_BIT;
while (true) {
/* The 'release' cas here matches the 'acquire' load in notify_on to ensure
that the closure it schedules 'happens-after' the set_shutdown is called
on the fd */
if (gpr_atm_rel_cas(state, curr, new_state)) {
return; /* Fast-path successful. Return */
}
/* Fallback to slowpath. This 'acquire' load matches the 'release' cas in
notify_on and set_ready */
curr = gpr_atm_acq_load(state);
gpr_atm curr = gpr_atm_no_barrier_load(state);
switch (curr) {
case CLOSURE_READY: {
case CLOSURE_READY:
case CLOSURE_NOT_READY:
/* Need a full barrier here so that the initial load in notify_on
doesn't need a barrier */
if (gpr_atm_full_cas(state, curr, new_state)) {
return; /* early out */
}
break; /* retry */
}
case CLOSURE_NOT_READY: {
break; /* retry */
}
default: {
/* 'curr' is either a closure or the fd is already shutdown */
@ -1199,10 +1189,11 @@ static void set_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
}
/* Fd is not shutdown. Schedule the closure and move the state to
shutdown state. The 'release' cas here matches the 'acquire' load in
notify_on to ensure that the closure it schedules 'happens-after'
the set_shutdown is called on the fd */
if (gpr_atm_rel_cas(state, curr, new_state)) {
shutdown state.
Needs an acquire to pair with setting the closure (and get a
happens-after on that edge), and a release to pair with anything
loading the shutdown state. */
if (gpr_atm_full_cas(state, curr, new_state)) {
grpc_closure_sched(exec_ctx, (grpc_closure *)curr,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"FD Shutdown", &shutdown_err, 1));
@ -1220,52 +1211,42 @@ static void set_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
}
static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state) {
/* Try an optimistic case first (i.e assume current state is
CLOSURE_NOT_READY).
This 'release' cas matches the 'acquire' load in notify_on ensuring that
any closure (scheduled by notify_on) 'happens-after' the return from
epoll_pwait */
if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, CLOSURE_READY)) {
return; /* early out */
}
/* The 'acquire' load here matches the 'release' cas in notify_on and
set_shutdown */
gpr_atm curr = gpr_atm_acq_load(state);
switch (curr) {
case CLOSURE_READY: {
/* Already ready. We are done here */
break;
}
while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(state);
case CLOSURE_NOT_READY: {
/* The state was not CLOSURE_NOT_READY when we checked initially at the
beginning of this function but now it is CLOSURE_NOT_READY again.
This is only possible if the state transitioned out of
CLOSURE_NOT_READY to either CLOSURE_READY or <some closure> and then
back to CLOSURE_NOT_READY again (i.e after we entered this function,
the fd became "ready" and the necessary actions were already done).
So there is no need to make the state CLOSURE_READY now */
break;
}
switch (curr) {
case CLOSURE_READY: {
/* Already ready. We are done here */
return;
}
default: {
/* 'curr' is either a closure or the fd is shutdown */
if ((curr & FD_SHUTDOWN_BIT) > 0) {
/* The fd is shutdown. Do nothing */
} else if (gpr_atm_no_barrier_cas(state, curr, CLOSURE_NOT_READY)) {
/* The cas above was no-barrier since the state is being transitioned to
CLOSURE_NOT_READY; notify_on and set_shutdown do not schedule any
closures when transitioning out of CLOSURE_NO_READY state (i.e there
is no other code that needs to 'happen-after' this) */
case CLOSURE_NOT_READY: {
/* No barrier required as we're transitioning to a state that does not
involve a closure */
if (gpr_atm_no_barrier_cas(state, CLOSURE_NOT_READY, CLOSURE_READY)) {
return; /* early out */
}
break; /* retry */
}
grpc_closure_sched(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_NONE);
default: {
/* 'curr' is either a closure or the fd is shutdown */
if ((curr & FD_SHUTDOWN_BIT) > 0) {
/* The fd is shutdown. Do nothing */
return;
}
/* Full cas: acquire pairs with this cas' release in the event of a
spurious set_ready; release pairs with this or the acquire in
notify_on (or set_shutdown) */
else if (gpr_atm_full_cas(state, curr, CLOSURE_NOT_READY)) {
grpc_closure_sched(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_NONE);
return;
}
/* else the state changed again (only possible by either a racing
set_ready or set_shutdown functions. In both these cases, the closure
would have been scheduled for execution. So we are done here */
return;
}
/* else the state changed again (only possible by either a racing
set_ready or set_shutdown functions. In both these cases, the closure
would have been scheduled for execution. So we are done here */
break;
}
}
}

@ -43,6 +43,8 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/thd.h>
#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/workqueue.h"
@ -312,6 +314,88 @@ static void test_add_fd_to_pollset() {
#undef NUM_FDS
#undef NUM_POLLSETS
typedef struct threading_shared {
gpr_mu *mu;
grpc_pollset *pollset;
grpc_wakeup_fd *wakeup_fd;
grpc_fd *wakeup_desc;
grpc_closure on_wakeup;
int wakeups;
} threading_shared;
static __thread int thread_wakeups = 0;
static void test_threading_loop(void *arg) {
threading_shared *shared = arg;
while (thread_wakeups < 1000000) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_pollset_worker *worker;
gpr_mu_lock(shared->mu);
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"pollset_work",
grpc_pollset_work(&exec_ctx, shared->pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC))));
gpr_mu_unlock(shared->mu);
grpc_exec_ctx_finish(&exec_ctx);
}
}
static void test_threading_wakeup(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
threading_shared *shared = arg;
++shared->wakeups;
++thread_wakeups;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"consume_wakeup", grpc_wakeup_fd_consume_wakeup(shared->wakeup_fd)));
grpc_fd_notify_on_read(exec_ctx, shared->wakeup_desc, &shared->on_wakeup);
GPR_ASSERT(GRPC_LOG_IF_ERROR("wakeup_next",
grpc_wakeup_fd_wakeup(shared->wakeup_fd)));
}
static void test_threading(void) {
threading_shared shared;
shared.pollset = gpr_zalloc(grpc_pollset_size());
grpc_pollset_init(shared.pollset, &shared.mu);
gpr_thd_id thds[10];
for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
gpr_thd_options opt = gpr_thd_options_default();
gpr_thd_options_set_joinable(&opt);
gpr_thd_new(&thds[i], test_threading_loop, &shared, &opt);
}
grpc_wakeup_fd fd;
GPR_ASSERT(GRPC_LOG_IF_ERROR("wakeup_fd_init", grpc_wakeup_fd_init(&fd)));
shared.wakeup_fd = &fd;
shared.wakeup_desc = grpc_fd_create(fd.read_fd, "wakeup");
shared.wakeups = 0;
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_pollset_add_fd(&exec_ctx, shared.pollset, shared.wakeup_desc);
grpc_fd_notify_on_read(
&exec_ctx, shared.wakeup_desc,
grpc_closure_init(&shared.on_wakeup, test_threading_wakeup, &shared,
grpc_schedule_on_exec_ctx));
grpc_exec_ctx_finish(&exec_ctx);
}
GPR_ASSERT(GRPC_LOG_IF_ERROR("wakeup_first",
grpc_wakeup_fd_wakeup(shared.wakeup_fd)));
for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
gpr_thd_join(thds[i]);
}
fd.read_fd = 0;
grpc_wakeup_fd_destroy(&fd);
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_fd_orphan(&exec_ctx, shared.wakeup_desc, NULL, NULL, "done");
grpc_pollset_shutdown(&exec_ctx, shared.pollset,
grpc_closure_create(destroy_pollset, shared.pollset,
grpc_schedule_on_exec_ctx));
grpc_exec_ctx_finish(&exec_ctx);
}
gpr_free(shared.pollset);
}
int main(int argc, char **argv) {
const char *poll_strategy = NULL;
grpc_test_init(argc, argv);
@ -321,6 +405,7 @@ int main(int argc, char **argv) {
if (poll_strategy != NULL && strcmp(poll_strategy, "epoll") == 0) {
test_add_fd_to_pollset();
test_pollset_queue_merge_items();
test_threading();
} else {
gpr_log(GPR_INFO,
"Skipping the test. The test is only relevant for 'epoll' "

Loading…
Cancel
Save