Merge pull request #13311 from ctiller/lfe3

Fix races in rolled-back LockfreeEvent code
pull/13281/head^2
Craig Tiller 7 years ago committed by GitHub
commit 2575141dc0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 30
      src/core/lib/iomgr/ev_epoll1_linux.cc
  2. 28
      src/core/lib/iomgr/ev_epollex_linux.cc
  3. 28
      src/core/lib/iomgr/ev_epollsig_linux.cc
  4. 130
      src/core/lib/iomgr/lockfree_event.cc
  5. 44
      src/core/lib/iomgr/lockfree_event.h

@ -46,6 +46,7 @@
#include "src/core/lib/iomgr/lockfree_event.h" #include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h" #include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/manual_constructor.h"
#include "src/core/lib/support/string.h" #include "src/core/lib/support/string.h"
static grpc_wakeup_fd global_wakeup_fd; static grpc_wakeup_fd global_wakeup_fd;
@ -111,8 +112,8 @@ static void epoll_set_shutdown() {
struct grpc_fd { struct grpc_fd {
int fd; int fd;
gpr_atm read_closure; grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
gpr_atm write_closure; grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
struct grpc_fd* freelist_next; struct grpc_fd* freelist_next;
@ -264,8 +265,8 @@ static grpc_fd* fd_create(int fd, const char* name) {
} }
new_fd->fd = fd; new_fd->fd = fd;
grpc_lfev_init(&new_fd->read_closure); new_fd->read_closure.Init();
grpc_lfev_init(&new_fd->write_closure); new_fd->write_closure.Init();
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = NULL; new_fd->freelist_next = NULL;
@ -297,12 +298,11 @@ static int fd_wrapped_fd(grpc_fd* fd) { return fd->fd; }
* shutdown() syscall on that fd) */ * shutdown() syscall on that fd) */
static void fd_shutdown_internal(grpc_exec_ctx* exec_ctx, grpc_fd* fd, static void fd_shutdown_internal(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
grpc_error* why, bool releasing_fd) { grpc_error* why, bool releasing_fd) {
if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure, if (fd->read_closure->SetShutdown(exec_ctx, GRPC_ERROR_REF(why))) {
GRPC_ERROR_REF(why))) {
if (!releasing_fd) { if (!releasing_fd) {
shutdown(fd->fd, SHUT_RDWR); shutdown(fd->fd, SHUT_RDWR);
} }
grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why)); fd->write_closure->SetShutdown(exec_ctx, GRPC_ERROR_REF(why));
} }
GRPC_ERROR_UNREF(why); GRPC_ERROR_UNREF(why);
} }
@ -318,7 +318,7 @@ static void fd_orphan(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
grpc_error* error = GRPC_ERROR_NONE; grpc_error* error = GRPC_ERROR_NONE;
bool is_release_fd = (release_fd != NULL); bool is_release_fd = (release_fd != NULL);
if (!grpc_lfev_is_shutdown(&fd->read_closure)) { if (!fd->read_closure->IsShutdown()) {
fd_shutdown_internal(exec_ctx, fd, fd_shutdown_internal(exec_ctx, fd,
GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason), GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason),
is_release_fd); is_release_fd);
@ -335,8 +335,8 @@ static void fd_orphan(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
GRPC_CLOSURE_SCHED(exec_ctx, on_done, GRPC_ERROR_REF(error)); GRPC_CLOSURE_SCHED(exec_ctx, on_done, GRPC_ERROR_REF(error));
grpc_iomgr_unregister_object(&fd->iomgr_object); grpc_iomgr_unregister_object(&fd->iomgr_object);
grpc_lfev_destroy(&fd->read_closure); fd->read_closure.Destroy();
grpc_lfev_destroy(&fd->write_closure); fd->write_closure.Destroy();
gpr_mu_lock(&fd_freelist_mu); gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist; fd->freelist_next = fd_freelist;
@ -351,28 +351,28 @@ static grpc_pollset* fd_get_read_notifier_pollset(grpc_exec_ctx* exec_ctx,
} }
static bool fd_is_shutdown(grpc_fd* fd) { static bool fd_is_shutdown(grpc_fd* fd) {
return grpc_lfev_is_shutdown(&fd->read_closure); return fd->read_closure->IsShutdown();
} }
static void fd_notify_on_read(grpc_exec_ctx* exec_ctx, grpc_fd* fd, static void fd_notify_on_read(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
grpc_closure* closure) { grpc_closure* closure) {
grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read"); fd->read_closure->NotifyOn(exec_ctx, closure);
} }
static void fd_notify_on_write(grpc_exec_ctx* exec_ctx, grpc_fd* fd, static void fd_notify_on_write(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
grpc_closure* closure) { grpc_closure* closure) {
grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write"); fd->write_closure->NotifyOn(exec_ctx, closure);
} }
static void fd_become_readable(grpc_exec_ctx* exec_ctx, grpc_fd* fd, static void fd_become_readable(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
grpc_pollset* notifier) { grpc_pollset* notifier) {
grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read"); fd->read_closure->SetReady(exec_ctx);
/* Use release store to match with acquire load in fd_get_read_notifier */ /* Use release store to match with acquire load in fd_get_read_notifier */
gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier); gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
} }
static void fd_become_writable(grpc_exec_ctx* exec_ctx, grpc_fd* fd) { static void fd_become_writable(grpc_exec_ctx* exec_ctx, grpc_fd* fd) {
grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write"); fd->write_closure->SetReady(exec_ctx);
} }
/******************************************************************************* /*******************************************************************************

@ -48,6 +48,7 @@
#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h" #include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/manual_constructor.h"
#include "src/core/lib/support/spinlock.h" #include "src/core/lib/support/spinlock.h"
// debug aid: create workers on the heap (allows asan to spot // debug aid: create workers on the heap (allows asan to spot
@ -153,8 +154,8 @@ struct grpc_fd {
gpr_mu pollable_mu; gpr_mu pollable_mu;
pollable* pollable_obj; pollable* pollable_obj;
gpr_atm read_closure; grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
gpr_atm write_closure; grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
struct grpc_fd* freelist_next; struct grpc_fd* freelist_next;
grpc_closure* on_done_closure; grpc_closure* on_done_closure;
@ -286,8 +287,8 @@ static void fd_destroy(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
fd->freelist_next = fd_freelist; fd->freelist_next = fd_freelist;
fd_freelist = fd; fd_freelist = fd;
grpc_lfev_destroy(&fd->read_closure); fd->read_closure.Destroy();
grpc_lfev_destroy(&fd->write_closure); fd->write_closure.Destroy();
gpr_mu_unlock(&fd_freelist_mu); gpr_mu_unlock(&fd_freelist_mu);
} }
@ -347,8 +348,8 @@ static grpc_fd* fd_create(int fd, const char* name) {
new_fd->pollable_obj = NULL; new_fd->pollable_obj = NULL;
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd; new_fd->fd = fd;
grpc_lfev_init(&new_fd->read_closure); new_fd->read_closure.Init();
grpc_lfev_init(&new_fd->write_closure); new_fd->write_closure.Init();
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = NULL; new_fd->freelist_next = NULL;
@ -411,27 +412,26 @@ static grpc_pollset* fd_get_read_notifier_pollset(grpc_exec_ctx* exec_ctx,
} }
static bool fd_is_shutdown(grpc_fd* fd) { static bool fd_is_shutdown(grpc_fd* fd) {
return grpc_lfev_is_shutdown(&fd->read_closure); return fd->read_closure->IsShutdown();
} }
/* Might be called multiple times */ /* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx* exec_ctx, grpc_fd* fd, grpc_error* why) { static void fd_shutdown(grpc_exec_ctx* exec_ctx, grpc_fd* fd, grpc_error* why) {
if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure, if (fd->read_closure->SetShutdown(exec_ctx, GRPC_ERROR_REF(why))) {
GRPC_ERROR_REF(why))) {
shutdown(fd->fd, SHUT_RDWR); shutdown(fd->fd, SHUT_RDWR);
grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why)); fd->write_closure->SetShutdown(exec_ctx, GRPC_ERROR_REF(why));
} }
GRPC_ERROR_UNREF(why); GRPC_ERROR_UNREF(why);
} }
static void fd_notify_on_read(grpc_exec_ctx* exec_ctx, grpc_fd* fd, static void fd_notify_on_read(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
grpc_closure* closure) { grpc_closure* closure) {
grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read"); fd->read_closure->NotifyOn(exec_ctx, closure);
} }
static void fd_notify_on_write(grpc_exec_ctx* exec_ctx, grpc_fd* fd, static void fd_notify_on_write(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
grpc_closure* closure) { grpc_closure* closure) {
grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write"); fd->write_closure->NotifyOn(exec_ctx, closure);
} }
/******************************************************************************* /*******************************************************************************
@ -702,7 +702,7 @@ static int poll_deadline_to_millis_timeout(grpc_exec_ctx* exec_ctx,
static void fd_become_readable(grpc_exec_ctx* exec_ctx, grpc_fd* fd, static void fd_become_readable(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
grpc_pollset* notifier) { grpc_pollset* notifier) {
grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read"); fd->read_closure->SetReady(exec_ctx);
/* Note, it is possible that fd_become_readable might be called twice with /* Note, it is possible that fd_become_readable might be called twice with
different 'notifier's when an fd becomes readable and it is in two epoll different 'notifier's when an fd becomes readable and it is in two epoll
@ -714,7 +714,7 @@ static void fd_become_readable(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
} }
static void fd_become_writable(grpc_exec_ctx* exec_ctx, grpc_fd* fd) { static void fd_become_writable(grpc_exec_ctx* exec_ctx, grpc_fd* fd) {
grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write"); fd->write_closure->SetReady(exec_ctx);
} }
static grpc_error* fd_get_or_become_pollable(grpc_fd* fd, pollable** p) { static grpc_error* fd_get_or_become_pollable(grpc_fd* fd, pollable** p) {

@ -50,6 +50,7 @@
#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h" #include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/manual_constructor.h"
#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1) #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)
@ -127,8 +128,8 @@ struct grpc_fd {
valid */ valid */
bool orphaned; bool orphaned;
gpr_atm read_closure; grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
gpr_atm write_closure; grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
struct grpc_fd* freelist_next; struct grpc_fd* freelist_next;
grpc_closure* on_done_closure; grpc_closure* on_done_closure;
@ -766,8 +767,8 @@ static void unref_by(grpc_fd* fd, int n) {
fd_freelist = fd; fd_freelist = fd;
grpc_iomgr_unregister_object(&fd->iomgr_object); grpc_iomgr_unregister_object(&fd->iomgr_object);
grpc_lfev_destroy(&fd->read_closure); fd->read_closure.Destroy();
grpc_lfev_destroy(&fd->write_closure); fd->write_closure.Destroy();
gpr_mu_unlock(&fd_freelist_mu); gpr_mu_unlock(&fd_freelist_mu);
} else { } else {
@ -832,8 +833,8 @@ static grpc_fd* fd_create(int fd, const char* name) {
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd; new_fd->fd = fd;
new_fd->orphaned = false; new_fd->orphaned = false;
grpc_lfev_init(&new_fd->read_closure); new_fd->read_closure.Init();
grpc_lfev_init(&new_fd->write_closure); new_fd->write_closure.Init();
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = NULL; new_fd->freelist_next = NULL;
@ -924,27 +925,26 @@ static grpc_pollset* fd_get_read_notifier_pollset(grpc_exec_ctx* exec_ctx,
} }
static bool fd_is_shutdown(grpc_fd* fd) { static bool fd_is_shutdown(grpc_fd* fd) {
return grpc_lfev_is_shutdown(&fd->read_closure); return fd->read_closure->IsShutdown();
} }
/* Might be called multiple times */ /* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx* exec_ctx, grpc_fd* fd, grpc_error* why) { static void fd_shutdown(grpc_exec_ctx* exec_ctx, grpc_fd* fd, grpc_error* why) {
if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure, if (fd->read_closure->SetShutdown(exec_ctx, GRPC_ERROR_REF(why))) {
GRPC_ERROR_REF(why))) {
shutdown(fd->fd, SHUT_RDWR); shutdown(fd->fd, SHUT_RDWR);
grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why)); fd->write_closure->SetShutdown(exec_ctx, GRPC_ERROR_REF(why));
} }
GRPC_ERROR_UNREF(why); GRPC_ERROR_UNREF(why);
} }
static void fd_notify_on_read(grpc_exec_ctx* exec_ctx, grpc_fd* fd, static void fd_notify_on_read(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
grpc_closure* closure) { grpc_closure* closure) {
grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read"); fd->read_closure->NotifyOn(exec_ctx, closure);
} }
static void fd_notify_on_write(grpc_exec_ctx* exec_ctx, grpc_fd* fd, static void fd_notify_on_write(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
grpc_closure* closure) { grpc_closure* closure) {
grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write"); fd->write_closure->NotifyOn(exec_ctx, closure);
} }
/******************************************************************************* /*******************************************************************************
@ -1108,7 +1108,7 @@ static int poll_deadline_to_millis_timeout(grpc_exec_ctx* exec_ctx,
static void fd_become_readable(grpc_exec_ctx* exec_ctx, grpc_fd* fd, static void fd_become_readable(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
grpc_pollset* notifier) { grpc_pollset* notifier) {
grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read"); fd->read_closure->SetReady(exec_ctx);
/* Note, it is possible that fd_become_readable might be called twice with /* Note, it is possible that fd_become_readable might be called twice with
different 'notifier's when an fd becomes readable and it is in two epoll different 'notifier's when an fd becomes readable and it is in two epoll
@ -1120,7 +1120,7 @@ static void fd_become_readable(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
} }
static void fd_become_writable(grpc_exec_ctx* exec_ctx, grpc_fd* fd) { static void fd_become_writable(grpc_exec_ctx* exec_ctx, grpc_fd* fd) {
grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write"); fd->write_closure->SetReady(exec_ctx);
} }
static void pollset_release_polling_island(grpc_exec_ctx* exec_ctx, static void pollset_release_polling_island(grpc_exec_ctx* exec_ctx,

@ -26,92 +26,96 @@ extern grpc_tracer_flag grpc_polling_trace;
/* 'state' holds the to call when the fd is readable or writable respectively. /* 'state' holds the to call when the fd is readable or writable respectively.
It can contain one of the following values: It can contain one of the following values:
CLOSURE_READY : The fd has an I/O event of interest but there is no kClosureReady : The fd has an I/O event of interest but there is no
closure yet to execute closure yet to execute
CLOSURE_NOT_READY : The fd has no I/O event of interest kClosureNotReady : The fd has no I/O event of interest
closure ptr : The closure to be executed when the fd has an I/O closure ptr : The closure to be executed when the fd has an I/O
event of interest event of interest
shutdown_error | FD_SHUTDOWN_BIT : shutdown_error | kShutdownBit :
'shutdown_error' field ORed with FD_SHUTDOWN_BIT. 'shutdown_error' field ORed with kShutdownBit.
This indicates that the fd is shutdown. Since all This indicates that the fd is shutdown. Since all
memory allocations are word-aligned, the lower two memory allocations are word-aligned, the lower two
bits of the shutdown_error pointer are always 0. So bits of the shutdown_error pointer are always 0. So
it is safe to OR these with FD_SHUTDOWN_BIT it is safe to OR these with kShutdownBit
Valid state transitions: Valid state transitions:
<closure ptr> <-----3------ CLOSURE_NOT_READY ----1----> CLOSURE_READY <closure ptr> <-----3------ kClosureNotReady -----1-------> kClosureReady
| | ^ | ^ | | | | ^ | ^ | |
| | | | | | | | | | | | | |
| +--------------4----------+ 6 +---------2---------------+ | | +--------------4----------+ 6 +---------2---------------+ |
| | | | | |
| v | | v |
+-----5-------> [shutdown_error | FD_SHUTDOWN_BIT] <----7---------+ +-----5-------> [shutdown_error | kShutdownBit] <-------7---------+
For 1, 4 : See grpc_lfev_set_ready() function For 1, 4 : See SetReady() function
For 2, 3 : See grpc_lfev_notify_on() function For 2, 3 : See NotifyOn() function
For 5,6,7: See grpc_lfev_set_shutdown() function */ For 5,6,7: See SetShutdown() function */
#define CLOSURE_NOT_READY ((gpr_atm)0) namespace grpc_core {
#define CLOSURE_READY ((gpr_atm)2)
#define FD_SHUTDOWN_BIT ((gpr_atm)1) LockfreeEvent::LockfreeEvent() {
/* Perform an atomic store to start the state machine.
void grpc_lfev_init(gpr_atm* state) { Note carefully that LockfreeEvent *MAY* be used whilst in a destroyed
gpr_atm_no_barrier_store(state, CLOSURE_NOT_READY); state, while a file descriptor is on a freelist. In such a state it may
be SetReady'd, and so we need to perform an atomic operation here to
ensure no races */
gpr_atm_no_barrier_store(&state_, kClosureNotReady);
} }
void grpc_lfev_destroy(gpr_atm* state) { LockfreeEvent::~LockfreeEvent() {
gpr_atm curr = gpr_atm_no_barrier_load(state); gpr_atm curr;
if (curr & FD_SHUTDOWN_BIT) { do {
GRPC_ERROR_UNREF((grpc_error*)(curr & ~FD_SHUTDOWN_BIT)); curr = gpr_atm_no_barrier_load(&state_);
} else { if (curr & kShutdownBit) {
GPR_ASSERT(curr == CLOSURE_NOT_READY || curr == CLOSURE_READY); GRPC_ERROR_UNREF((grpc_error*)(curr & ~kShutdownBit));
} } else {
} GPR_ASSERT(curr == kClosureNotReady || curr == kClosureReady);
}
bool grpc_lfev_is_shutdown(gpr_atm* state) { /* we CAS in a shutdown, no error value here. If this event is interacted
gpr_atm curr = gpr_atm_no_barrier_load(state); with post-deletion (see the note in the constructor) we want the bit
return (curr & FD_SHUTDOWN_BIT) != 0; pattern to prevent error retention in a deleted object */
} while (!gpr_atm_no_barrier_cas(&state_, curr,
kShutdownBit /* shutdown, no error */));
} }
void grpc_lfev_notify_on(grpc_exec_ctx* exec_ctx, gpr_atm* state, void LockfreeEvent::NotifyOn(grpc_exec_ctx* exec_ctx, grpc_closure* closure) {
grpc_closure* closure, const char* variable) {
while (true) { while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(state); gpr_atm curr = gpr_atm_no_barrier_load(&state_);
if (GRPC_TRACER_ON(grpc_polling_trace)) { if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, "lfev_notify_on[%s]: %p curr=%p closure=%p", variable, gpr_log(GPR_ERROR, "LockfreeEvent::NotifyOn: %p curr=%p closure=%p", this,
state, (void*)curr, closure); (void*)curr, closure);
} }
switch (curr) { switch (curr) {
case CLOSURE_NOT_READY: { case kClosureNotReady: {
/* CLOSURE_NOT_READY -> <closure>. /* kClosureNotReady -> <closure>.
We're guaranteed by API that there's an acquire barrier before here, We're guaranteed by API that there's an acquire barrier before here,
so there's no need to double-dip and this can be a release-only. so there's no need to double-dip and this can be a release-only.
The release itself pairs with the acquire half of a set_ready full The release itself pairs with the acquire half of a set_ready full
barrier. */ barrier. */
if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, (gpr_atm)closure)) { if (gpr_atm_rel_cas(&state_, kClosureNotReady, (gpr_atm)closure)) {
return; /* Successful. Return */ return; /* Successful. Return */
} }
break; /* retry */ break; /* retry */
} }
case CLOSURE_READY: { case kClosureReady: {
/* Change the state to CLOSURE_NOT_READY. Schedule the closure if /* Change the state to kClosureNotReady. Schedule the closure if
successful. If not, the state most likely transitioned to shutdown. successful. If not, the state most likely transitioned to shutdown.
We should retry. We should retry.
This can be a no-barrier cas since the state is being transitioned to This can be a no-barrier cas since the state is being transitioned to
CLOSURE_NOT_READY; set_ready and set_shutdown do not schedule any kClosureNotReady; set_ready and set_shutdown do not schedule any
closure when transitioning out of CLOSURE_NO_READY state (i.e there closure when transitioning out of CLOSURE_NO_READY state (i.e there
is no other code that needs to 'happen-after' this) */ is no other code that needs to 'happen-after' this) */
if (gpr_atm_no_barrier_cas(state, CLOSURE_READY, CLOSURE_NOT_READY)) { if (gpr_atm_no_barrier_cas(&state_, kClosureReady, kClosureNotReady)) {
GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
return; /* Successful. Return */ return; /* Successful. Return */
} }
@ -123,8 +127,8 @@ void grpc_lfev_notify_on(grpc_exec_ctx* exec_ctx, gpr_atm* state,
/* 'curr' is either a closure or the fd is shutdown(in which case 'curr' /* 'curr' is either a closure or the fd is shutdown(in which case 'curr'
contains a pointer to the shutdown-error). If the fd is shutdown, contains a pointer to the shutdown-error). If the fd is shutdown,
schedule the closure with the shutdown error */ schedule the closure with the shutdown error */
if ((curr & FD_SHUTDOWN_BIT) > 0) { if ((curr & kShutdownBit) > 0) {
grpc_error* shutdown_err = (grpc_error*)(curr & ~FD_SHUTDOWN_BIT); grpc_error* shutdown_err = (grpc_error*)(curr & ~kShutdownBit);
GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_CLOSURE_SCHED(exec_ctx, closure,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"FD Shutdown", &shutdown_err, 1)); "FD Shutdown", &shutdown_err, 1));
@ -133,7 +137,8 @@ void grpc_lfev_notify_on(grpc_exec_ctx* exec_ctx, gpr_atm* state,
/* There is already a closure!. This indicates a bug in the code */ /* There is already a closure!. This indicates a bug in the code */
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"notify_on called with a previous callback still pending"); "LockfreeEvent::NotifyOn: notify_on called with a previous "
"callback still pending");
abort(); abort();
} }
} }
@ -142,22 +147,22 @@ void grpc_lfev_notify_on(grpc_exec_ctx* exec_ctx, gpr_atm* state,
GPR_UNREACHABLE_CODE(return ); GPR_UNREACHABLE_CODE(return );
} }
bool grpc_lfev_set_shutdown(grpc_exec_ctx* exec_ctx, gpr_atm* state, bool LockfreeEvent::SetShutdown(grpc_exec_ctx* exec_ctx,
grpc_error* shutdown_err) { grpc_error* shutdown_err) {
gpr_atm new_state = (gpr_atm)shutdown_err | FD_SHUTDOWN_BIT; gpr_atm new_state = (gpr_atm)shutdown_err | kShutdownBit;
while (true) { while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(state); gpr_atm curr = gpr_atm_no_barrier_load(&state_);
if (GRPC_TRACER_ON(grpc_polling_trace)) { if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, "lfev_set_shutdown: %p curr=%p err=%s", state, gpr_log(GPR_ERROR, "LockfreeEvent::SetShutdown: %p curr=%p err=%s",
(void*)curr, grpc_error_string(shutdown_err)); &state_, (void*)curr, grpc_error_string(shutdown_err));
} }
switch (curr) { switch (curr) {
case CLOSURE_READY: case kClosureReady:
case CLOSURE_NOT_READY: case kClosureNotReady:
/* Need a full barrier here so that the initial load in notify_on /* Need a full barrier here so that the initial load in notify_on
doesn't need a barrier */ doesn't need a barrier */
if (gpr_atm_full_cas(state, curr, new_state)) { if (gpr_atm_full_cas(&state_, curr, new_state)) {
return true; /* early out */ return true; /* early out */
} }
break; /* retry */ break; /* retry */
@ -166,7 +171,7 @@ bool grpc_lfev_set_shutdown(grpc_exec_ctx* exec_ctx, gpr_atm* state,
/* 'curr' is either a closure or the fd is already shutdown */ /* 'curr' is either a closure or the fd is already shutdown */
/* If fd is already shutdown, we are done */ /* If fd is already shutdown, we are done */
if ((curr & FD_SHUTDOWN_BIT) > 0) { if ((curr & kShutdownBit) > 0) {
GRPC_ERROR_UNREF(shutdown_err); GRPC_ERROR_UNREF(shutdown_err);
return false; return false;
} }
@ -176,7 +181,7 @@ bool grpc_lfev_set_shutdown(grpc_exec_ctx* exec_ctx, gpr_atm* state,
Needs an acquire to pair with setting the closure (and get a Needs an acquire to pair with setting the closure (and get a
happens-after on that edge), and a release to pair with anything happens-after on that edge), and a release to pair with anything
loading the shutdown state. */ loading the shutdown state. */
if (gpr_atm_full_cas(state, curr, new_state)) { if (gpr_atm_full_cas(&state_, curr, new_state)) {
GRPC_CLOSURE_SCHED(exec_ctx, (grpc_closure*)curr, GRPC_CLOSURE_SCHED(exec_ctx, (grpc_closure*)curr,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"FD Shutdown", &shutdown_err, 1)); "FD Shutdown", &shutdown_err, 1));
@ -193,26 +198,25 @@ bool grpc_lfev_set_shutdown(grpc_exec_ctx* exec_ctx, gpr_atm* state,
GPR_UNREACHABLE_CODE(return false); GPR_UNREACHABLE_CODE(return false);
} }
void grpc_lfev_set_ready(grpc_exec_ctx* exec_ctx, gpr_atm* state, void LockfreeEvent::SetReady(grpc_exec_ctx* exec_ctx) {
const char* variable) {
while (true) { while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(state); gpr_atm curr = gpr_atm_no_barrier_load(&state_);
if (GRPC_TRACER_ON(grpc_polling_trace)) { if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, "lfev_set_ready[%s]: %p curr=%p", variable, state, gpr_log(GPR_ERROR, "LockfreeEvent::SetReady: %p curr=%p", &state_,
(void*)curr); (void*)curr);
} }
switch (curr) { switch (curr) {
case CLOSURE_READY: { case kClosureReady: {
/* Already ready. We are done here */ /* Already ready. We are done here */
return; return;
} }
case CLOSURE_NOT_READY: { case kClosureNotReady: {
/* No barrier required as we're transitioning to a state that does not /* No barrier required as we're transitioning to a state that does not
involve a closure */ involve a closure */
if (gpr_atm_no_barrier_cas(state, CLOSURE_NOT_READY, CLOSURE_READY)) { if (gpr_atm_no_barrier_cas(&state_, kClosureNotReady, kClosureReady)) {
return; /* early out */ return; /* early out */
} }
break; /* retry */ break; /* retry */
@ -220,14 +224,14 @@ void grpc_lfev_set_ready(grpc_exec_ctx* exec_ctx, gpr_atm* state,
default: { default: {
/* 'curr' is either a closure or the fd is shutdown */ /* 'curr' is either a closure or the fd is shutdown */
if ((curr & FD_SHUTDOWN_BIT) > 0) { if ((curr & kShutdownBit) > 0) {
/* The fd is shutdown. Do nothing */ /* The fd is shutdown. Do nothing */
return; return;
} }
/* Full cas: acquire pairs with this cas' release in the event of a /* Full cas: acquire pairs with this cas' release in the event of a
spurious set_ready; release pairs with this or the acquire in spurious set_ready; release pairs with this or the acquire in
notify_on (or set_shutdown) */ notify_on (or set_shutdown) */
else if (gpr_atm_full_cas(state, curr, CLOSURE_NOT_READY)) { else if (gpr_atm_full_cas(&state_, curr, kClosureNotReady)) {
GRPC_CLOSURE_SCHED(exec_ctx, (grpc_closure*)curr, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(exec_ctx, (grpc_closure*)curr, GRPC_ERROR_NONE);
return; return;
} }
@ -239,3 +243,5 @@ void grpc_lfev_set_ready(grpc_exec_ctx* exec_ctx, gpr_atm* state,
} }
} }
} }
} // namespace grpc_core

@ -25,24 +25,30 @@
#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/exec_ctx.h"
#ifdef __cplusplus namespace grpc_core {
extern "C" {
#endif class LockfreeEvent {
public:
void grpc_lfev_init(gpr_atm* state); LockfreeEvent();
void grpc_lfev_destroy(gpr_atm* state); ~LockfreeEvent();
bool grpc_lfev_is_shutdown(gpr_atm* state);
LockfreeEvent(const LockfreeEvent&) = delete;
void grpc_lfev_notify_on(grpc_exec_ctx* exec_ctx, gpr_atm* state, LockfreeEvent& operator=(const LockfreeEvent&) = delete;
grpc_closure* closure, const char* variable);
/* Returns true on first successful shutdown */ bool IsShutdown() const {
bool grpc_lfev_set_shutdown(grpc_exec_ctx* exec_ctx, gpr_atm* state, return (gpr_atm_no_barrier_load(&state_) & kShutdownBit) != 0;
grpc_error* shutdown_err); }
void grpc_lfev_set_ready(grpc_exec_ctx* exec_ctx, gpr_atm* state,
const char* variable); void NotifyOn(grpc_exec_ctx* exec_ctx, grpc_closure* closure);
bool SetShutdown(grpc_exec_ctx* exec_ctx, grpc_error* error);
#ifdef __cplusplus void SetReady(grpc_exec_ctx* exec_ctx);
}
#endif private:
enum State { kClosureNotReady = 0, kClosureReady = 2, kShutdownBit = 1 };
gpr_atm state_;
};
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_IOMGR_LOCKFREE_EVENT_H */ #endif /* GRPC_CORE_LIB_IOMGR_LOCKFREE_EVENT_H */

Loading…
Cancel
Save