Merge branch 'master' into cc-tsan-1

pull/13294/head
Sree Kuchibhotla 7 years ago
commit c4a973ca5c
  1. 4
      src/core/ext/filters/client_channel/uri_parser.cc
  2. 30
      src/core/lib/iomgr/ev_epoll1_linux.cc
  3. 28
      src/core/lib/iomgr/ev_epollex_linux.cc
  4. 28
      src/core/lib/iomgr/ev_epollsig_linux.cc
  5. 113
      src/core/lib/iomgr/lockfree_event.cc
  6. 42
      src/core/lib/iomgr/lockfree_event.h
  7. 10
      test/core/end2end/data/client_certs.cc
  8. 4
      test/core/end2end/data/server1_cert.cc
  9. 4
      test/core/end2end/data/server1_key.cc
  10. 4
      test/core/end2end/data/test_root_cert.cc

@ -59,7 +59,9 @@ static grpc_uri* bad_uri(const char* uri_text, size_t pos, const char* section,
static char* decode_and_copy_component(grpc_exec_ctx* exec_ctx, const char* src, static char* decode_and_copy_component(grpc_exec_ctx* exec_ctx, const char* src,
size_t begin, size_t end) { size_t begin, size_t end) {
grpc_slice component = grpc_slice component =
grpc_slice_from_copied_buffer(src + begin, end - begin); (begin == NOT_SET || end == NOT_SET)
? grpc_empty_slice()
: grpc_slice_from_copied_buffer(src + begin, end - begin);
grpc_slice decoded_component = grpc_slice decoded_component =
grpc_permissive_percent_decode_slice(component); grpc_permissive_percent_decode_slice(component);
char* out = grpc_dump_slice(decoded_component, GPR_DUMP_ASCII); char* out = grpc_dump_slice(decoded_component, GPR_DUMP_ASCII);

@ -46,7 +46,6 @@
#include "src/core/lib/iomgr/lockfree_event.h" #include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h" #include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/manual_constructor.h"
#include "src/core/lib/support/string.h" #include "src/core/lib/support/string.h"
static grpc_wakeup_fd global_wakeup_fd; static grpc_wakeup_fd global_wakeup_fd;
@ -112,8 +111,8 @@ static void epoll_set_shutdown() {
struct grpc_fd { struct grpc_fd {
int fd; int fd;
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure; gpr_atm read_closure;
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure; gpr_atm write_closure;
struct grpc_fd* freelist_next; struct grpc_fd* freelist_next;
@ -265,8 +264,8 @@ static grpc_fd* fd_create(int fd, const char* name) {
} }
new_fd->fd = fd; new_fd->fd = fd;
new_fd->read_closure.Init(); grpc_lfev_init(&new_fd->read_closure);
new_fd->write_closure.Init(); grpc_lfev_init(&new_fd->write_closure);
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = NULL; new_fd->freelist_next = NULL;
@ -298,11 +297,12 @@ static int fd_wrapped_fd(grpc_fd* fd) { return fd->fd; }
* shutdown() syscall on that fd) */ * shutdown() syscall on that fd) */
static void fd_shutdown_internal(grpc_exec_ctx* exec_ctx, grpc_fd* fd, static void fd_shutdown_internal(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
grpc_error* why, bool releasing_fd) { grpc_error* why, bool releasing_fd) {
if (fd->read_closure->SetShutdown(exec_ctx, GRPC_ERROR_REF(why))) { if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure,
GRPC_ERROR_REF(why))) {
if (!releasing_fd) { if (!releasing_fd) {
shutdown(fd->fd, SHUT_RDWR); shutdown(fd->fd, SHUT_RDWR);
} }
fd->write_closure->SetShutdown(exec_ctx, GRPC_ERROR_REF(why)); grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why));
} }
GRPC_ERROR_UNREF(why); GRPC_ERROR_UNREF(why);
} }
@ -318,7 +318,7 @@ static void fd_orphan(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
grpc_error* error = GRPC_ERROR_NONE; grpc_error* error = GRPC_ERROR_NONE;
bool is_release_fd = (release_fd != NULL); bool is_release_fd = (release_fd != NULL);
if (!fd->read_closure->IsShutdown()) { if (!grpc_lfev_is_shutdown(&fd->read_closure)) {
fd_shutdown_internal(exec_ctx, fd, fd_shutdown_internal(exec_ctx, fd,
GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason), GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason),
is_release_fd); is_release_fd);
@ -335,8 +335,8 @@ static void fd_orphan(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
GRPC_CLOSURE_SCHED(exec_ctx, on_done, GRPC_ERROR_REF(error)); GRPC_CLOSURE_SCHED(exec_ctx, on_done, GRPC_ERROR_REF(error));
grpc_iomgr_unregister_object(&fd->iomgr_object); grpc_iomgr_unregister_object(&fd->iomgr_object);
fd->read_closure.Destroy(); grpc_lfev_destroy(&fd->read_closure);
fd->write_closure.Destroy(); grpc_lfev_destroy(&fd->write_closure);
gpr_mu_lock(&fd_freelist_mu); gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist; fd->freelist_next = fd_freelist;
@ -351,28 +351,28 @@ static grpc_pollset* fd_get_read_notifier_pollset(grpc_exec_ctx* exec_ctx,
} }
static bool fd_is_shutdown(grpc_fd* fd) { static bool fd_is_shutdown(grpc_fd* fd) {
return fd->read_closure->IsShutdown(); return grpc_lfev_is_shutdown(&fd->read_closure);
} }
static void fd_notify_on_read(grpc_exec_ctx* exec_ctx, grpc_fd* fd, static void fd_notify_on_read(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
grpc_closure* closure) { grpc_closure* closure) {
fd->read_closure->NotifyOn(exec_ctx, closure); grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
} }
static void fd_notify_on_write(grpc_exec_ctx* exec_ctx, grpc_fd* fd, static void fd_notify_on_write(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
grpc_closure* closure) { grpc_closure* closure) {
fd->write_closure->NotifyOn(exec_ctx, closure); grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
} }
static void fd_become_readable(grpc_exec_ctx* exec_ctx, grpc_fd* fd, static void fd_become_readable(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
grpc_pollset* notifier) { grpc_pollset* notifier) {
fd->read_closure->SetReady(exec_ctx); grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
/* Use release store to match with acquire load in fd_get_read_notifier */ /* Use release store to match with acquire load in fd_get_read_notifier */
gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier); gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
} }
static void fd_become_writable(grpc_exec_ctx* exec_ctx, grpc_fd* fd) { static void fd_become_writable(grpc_exec_ctx* exec_ctx, grpc_fd* fd) {
fd->write_closure->SetReady(exec_ctx); grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
} }
/******************************************************************************* /*******************************************************************************

@ -48,7 +48,6 @@
#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h" #include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/manual_constructor.h"
#include "src/core/lib/support/spinlock.h" #include "src/core/lib/support/spinlock.h"
// debug aid: create workers on the heap (allows asan to spot // debug aid: create workers on the heap (allows asan to spot
@ -154,8 +153,8 @@ struct grpc_fd {
gpr_mu pollable_mu; gpr_mu pollable_mu;
pollable* pollable_obj; pollable* pollable_obj;
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure; gpr_atm read_closure;
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure; gpr_atm write_closure;
struct grpc_fd* freelist_next; struct grpc_fd* freelist_next;
grpc_closure* on_done_closure; grpc_closure* on_done_closure;
@ -287,8 +286,8 @@ static void fd_destroy(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
fd->freelist_next = fd_freelist; fd->freelist_next = fd_freelist;
fd_freelist = fd; fd_freelist = fd;
fd->read_closure.Destroy(); grpc_lfev_destroy(&fd->read_closure);
fd->write_closure.Destroy(); grpc_lfev_destroy(&fd->write_closure);
gpr_mu_unlock(&fd_freelist_mu); gpr_mu_unlock(&fd_freelist_mu);
} }
@ -348,8 +347,8 @@ static grpc_fd* fd_create(int fd, const char* name) {
new_fd->pollable_obj = NULL; new_fd->pollable_obj = NULL;
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd; new_fd->fd = fd;
new_fd->read_closure.Init(); grpc_lfev_init(&new_fd->read_closure);
new_fd->write_closure.Init(); grpc_lfev_init(&new_fd->write_closure);
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = NULL; new_fd->freelist_next = NULL;
@ -412,26 +411,27 @@ static grpc_pollset* fd_get_read_notifier_pollset(grpc_exec_ctx* exec_ctx,
} }
static bool fd_is_shutdown(grpc_fd* fd) { static bool fd_is_shutdown(grpc_fd* fd) {
return fd->read_closure->IsShutdown(); return grpc_lfev_is_shutdown(&fd->read_closure);
} }
/* Might be called multiple times */ /* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx* exec_ctx, grpc_fd* fd, grpc_error* why) { static void fd_shutdown(grpc_exec_ctx* exec_ctx, grpc_fd* fd, grpc_error* why) {
if (fd->read_closure->SetShutdown(exec_ctx, GRPC_ERROR_REF(why))) { if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure,
GRPC_ERROR_REF(why))) {
shutdown(fd->fd, SHUT_RDWR); shutdown(fd->fd, SHUT_RDWR);
fd->write_closure->SetShutdown(exec_ctx, GRPC_ERROR_REF(why)); grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why));
} }
GRPC_ERROR_UNREF(why); GRPC_ERROR_UNREF(why);
} }
static void fd_notify_on_read(grpc_exec_ctx* exec_ctx, grpc_fd* fd, static void fd_notify_on_read(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
grpc_closure* closure) { grpc_closure* closure) {
fd->read_closure->NotifyOn(exec_ctx, closure); grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
} }
static void fd_notify_on_write(grpc_exec_ctx* exec_ctx, grpc_fd* fd, static void fd_notify_on_write(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
grpc_closure* closure) { grpc_closure* closure) {
fd->write_closure->NotifyOn(exec_ctx, closure); grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
} }
/******************************************************************************* /*******************************************************************************
@ -702,7 +702,7 @@ static int poll_deadline_to_millis_timeout(grpc_exec_ctx* exec_ctx,
static void fd_become_readable(grpc_exec_ctx* exec_ctx, grpc_fd* fd, static void fd_become_readable(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
grpc_pollset* notifier) { grpc_pollset* notifier) {
fd->read_closure->SetReady(exec_ctx); grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
/* Note, it is possible that fd_become_readable might be called twice with /* Note, it is possible that fd_become_readable might be called twice with
different 'notifier's when an fd becomes readable and it is in two epoll different 'notifier's when an fd becomes readable and it is in two epoll
@ -714,7 +714,7 @@ static void fd_become_readable(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
} }
static void fd_become_writable(grpc_exec_ctx* exec_ctx, grpc_fd* fd) { static void fd_become_writable(grpc_exec_ctx* exec_ctx, grpc_fd* fd) {
fd->write_closure->SetReady(exec_ctx); grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
} }
static grpc_error* fd_get_or_become_pollable(grpc_fd* fd, pollable** p) { static grpc_error* fd_get_or_become_pollable(grpc_fd* fd, pollable** p) {

@ -50,7 +50,6 @@
#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h" #include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/manual_constructor.h"
#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1) #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)
@ -128,8 +127,8 @@ struct grpc_fd {
valid */ valid */
bool orphaned; bool orphaned;
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure; gpr_atm read_closure;
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure; gpr_atm write_closure;
struct grpc_fd* freelist_next; struct grpc_fd* freelist_next;
grpc_closure* on_done_closure; grpc_closure* on_done_closure;
@ -767,8 +766,8 @@ static void unref_by(grpc_fd* fd, int n) {
fd_freelist = fd; fd_freelist = fd;
grpc_iomgr_unregister_object(&fd->iomgr_object); grpc_iomgr_unregister_object(&fd->iomgr_object);
fd->read_closure.Destroy(); grpc_lfev_destroy(&fd->read_closure);
fd->write_closure.Destroy(); grpc_lfev_destroy(&fd->write_closure);
gpr_mu_unlock(&fd_freelist_mu); gpr_mu_unlock(&fd_freelist_mu);
} else { } else {
@ -833,8 +832,8 @@ static grpc_fd* fd_create(int fd, const char* name) {
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd; new_fd->fd = fd;
new_fd->orphaned = false; new_fd->orphaned = false;
new_fd->read_closure.Init(); grpc_lfev_init(&new_fd->read_closure);
new_fd->write_closure.Init(); grpc_lfev_init(&new_fd->write_closure);
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = NULL; new_fd->freelist_next = NULL;
@ -925,26 +924,27 @@ static grpc_pollset* fd_get_read_notifier_pollset(grpc_exec_ctx* exec_ctx,
} }
static bool fd_is_shutdown(grpc_fd* fd) { static bool fd_is_shutdown(grpc_fd* fd) {
return fd->read_closure->IsShutdown(); return grpc_lfev_is_shutdown(&fd->read_closure);
} }
/* Might be called multiple times */ /* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx* exec_ctx, grpc_fd* fd, grpc_error* why) { static void fd_shutdown(grpc_exec_ctx* exec_ctx, grpc_fd* fd, grpc_error* why) {
if (fd->read_closure->SetShutdown(exec_ctx, GRPC_ERROR_REF(why))) { if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure,
GRPC_ERROR_REF(why))) {
shutdown(fd->fd, SHUT_RDWR); shutdown(fd->fd, SHUT_RDWR);
fd->write_closure->SetShutdown(exec_ctx, GRPC_ERROR_REF(why)); grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why));
} }
GRPC_ERROR_UNREF(why); GRPC_ERROR_UNREF(why);
} }
static void fd_notify_on_read(grpc_exec_ctx* exec_ctx, grpc_fd* fd, static void fd_notify_on_read(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
grpc_closure* closure) { grpc_closure* closure) {
fd->read_closure->NotifyOn(exec_ctx, closure); grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
} }
static void fd_notify_on_write(grpc_exec_ctx* exec_ctx, grpc_fd* fd, static void fd_notify_on_write(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
grpc_closure* closure) { grpc_closure* closure) {
fd->write_closure->NotifyOn(exec_ctx, closure); grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
} }
/******************************************************************************* /*******************************************************************************
@ -1108,7 +1108,7 @@ static int poll_deadline_to_millis_timeout(grpc_exec_ctx* exec_ctx,
static void fd_become_readable(grpc_exec_ctx* exec_ctx, grpc_fd* fd, static void fd_become_readable(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
grpc_pollset* notifier) { grpc_pollset* notifier) {
fd->read_closure->SetReady(exec_ctx); grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
/* Note, it is possible that fd_become_readable might be called twice with /* Note, it is possible that fd_become_readable might be called twice with
different 'notifier's when an fd becomes readable and it is in two epoll different 'notifier's when an fd becomes readable and it is in two epoll
@ -1120,7 +1120,7 @@ static void fd_become_readable(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
} }
static void fd_become_writable(grpc_exec_ctx* exec_ctx, grpc_fd* fd) { static void fd_become_writable(grpc_exec_ctx* exec_ctx, grpc_fd* fd) {
fd->write_closure->SetReady(exec_ctx); grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
} }
static void pollset_release_polling_island(grpc_exec_ctx* exec_ctx, static void pollset_release_polling_island(grpc_exec_ctx* exec_ctx,

@ -26,79 +26,92 @@ extern grpc_tracer_flag grpc_polling_trace;
/* 'state' holds the to call when the fd is readable or writable respectively. /* 'state' holds the to call when the fd is readable or writable respectively.
It can contain one of the following values: It can contain one of the following values:
kClosureReady : The fd has an I/O event of interest but there is no CLOSURE_READY : The fd has an I/O event of interest but there is no
closure yet to execute closure yet to execute
kClosureNotReady : The fd has no I/O event of interest CLOSURE_NOT_READY : The fd has no I/O event of interest
closure ptr : The closure to be executed when the fd has an I/O closure ptr : The closure to be executed when the fd has an I/O
event of interest event of interest
shutdown_error | kShutdownBit : shutdown_error | FD_SHUTDOWN_BIT :
'shutdown_error' field ORed with kShutdownBit. 'shutdown_error' field ORed with FD_SHUTDOWN_BIT.
This indicates that the fd is shutdown. Since all This indicates that the fd is shutdown. Since all
memory allocations are word-aligned, the lower two memory allocations are word-aligned, the lower two
bits of the shutdown_error pointer are always 0. So bits of the shutdown_error pointer are always 0. So
it is safe to OR these with kShutdownBit it is safe to OR these with FD_SHUTDOWN_BIT
Valid state transitions: Valid state transitions:
<closure ptr> <-----3------ kClosureNotReady -----1-------> kClosureReady <closure ptr> <-----3------ CLOSURE_NOT_READY ----1----> CLOSURE_READY
| | ^ | ^ | | | | ^ | ^ | |
| | | | | | | | | | | | | |
| +--------------4----------+ 6 +---------2---------------+ | | +--------------4----------+ 6 +---------2---------------+ |
| | | | | |
| v | | v |
+-----5-------> [shutdown_error | kShutdownBit] <-------7---------+ +-----5-------> [shutdown_error | FD_SHUTDOWN_BIT] <----7---------+
For 1, 4 : See SetReady() function For 1, 4 : See grpc_lfev_set_ready() function
For 2, 3 : See NotifyOn() function For 2, 3 : See grpc_lfev_notify_on() function
For 5,6,7: See SetShutdown() function */ For 5,6,7: See grpc_lfev_set_shutdown() function */
namespace grpc_core { #define CLOSURE_NOT_READY ((gpr_atm)0)
#define CLOSURE_READY ((gpr_atm)2)
LockfreeEvent::~LockfreeEvent() { #define FD_SHUTDOWN_BIT ((gpr_atm)1)
gpr_atm curr = gpr_atm_no_barrier_load(&state_);
if (curr & kShutdownBit) { void grpc_lfev_init(gpr_atm* state) {
GRPC_ERROR_UNREF((grpc_error*)(curr & ~kShutdownBit)); gpr_atm_no_barrier_store(state, CLOSURE_NOT_READY);
}
void grpc_lfev_destroy(gpr_atm* state) {
gpr_atm curr = gpr_atm_no_barrier_load(state);
if (curr & FD_SHUTDOWN_BIT) {
GRPC_ERROR_UNREF((grpc_error*)(curr & ~FD_SHUTDOWN_BIT));
} else { } else {
GPR_ASSERT(curr == kClosureNotReady || curr == kClosureReady); GPR_ASSERT(curr == CLOSURE_NOT_READY || curr == CLOSURE_READY);
} }
} }
void LockfreeEvent::NotifyOn(grpc_exec_ctx* exec_ctx, grpc_closure* closure) { bool grpc_lfev_is_shutdown(gpr_atm* state) {
gpr_atm curr = gpr_atm_no_barrier_load(state);
return (curr & FD_SHUTDOWN_BIT) != 0;
}
void grpc_lfev_notify_on(grpc_exec_ctx* exec_ctx, gpr_atm* state,
grpc_closure* closure, const char* variable) {
while (true) { while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(&state_); gpr_atm curr = gpr_atm_no_barrier_load(state);
if (GRPC_TRACER_ON(grpc_polling_trace)) { if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, "LockfreeEvent::NotifyOn: %p curr=%p closure=%p", this, gpr_log(GPR_ERROR, "lfev_notify_on[%s]: %p curr=%p closure=%p", variable,
(void*)curr, closure); state, (void*)curr, closure);
} }
switch (curr) { switch (curr) {
case kClosureNotReady: { case CLOSURE_NOT_READY: {
/* kClosureNotReady -> <closure>. /* CLOSURE_NOT_READY -> <closure>.
We're guaranteed by API that there's an acquire barrier before here, We're guaranteed by API that there's an acquire barrier before here,
so there's no need to double-dip and this can be a release-only. so there's no need to double-dip and this can be a release-only.
The release itself pairs with the acquire half of a set_ready full The release itself pairs with the acquire half of a set_ready full
barrier. */ barrier. */
if (gpr_atm_rel_cas(&state_, kClosureNotReady, (gpr_atm)closure)) { if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, (gpr_atm)closure)) {
return; /* Successful. Return */ return; /* Successful. Return */
} }
break; /* retry */ break; /* retry */
} }
case kClosureReady: { case CLOSURE_READY: {
/* Change the state to kClosureNotReady. Schedule the closure if /* Change the state to CLOSURE_NOT_READY. Schedule the closure if
successful. If not, the state most likely transitioned to shutdown. successful. If not, the state most likely transitioned to shutdown.
We should retry. We should retry.
This can be a no-barrier cas since the state is being transitioned to This can be a no-barrier cas since the state is being transitioned to
kClosureNotReady; set_ready and set_shutdown do not schedule any CLOSURE_NOT_READY; set_ready and set_shutdown do not schedule any
closure when transitioning out of CLOSURE_NO_READY state (i.e there closure when transitioning out of CLOSURE_NO_READY state (i.e there
is no other code that needs to 'happen-after' this) */ is no other code that needs to 'happen-after' this) */
if (gpr_atm_no_barrier_cas(&state_, kClosureReady, kClosureNotReady)) { if (gpr_atm_no_barrier_cas(state, CLOSURE_READY, CLOSURE_NOT_READY)) {
GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
return; /* Successful. Return */ return; /* Successful. Return */
} }
@ -110,8 +123,8 @@ void LockfreeEvent::NotifyOn(grpc_exec_ctx* exec_ctx, grpc_closure* closure) {
/* 'curr' is either a closure or the fd is shutdown(in which case 'curr' /* 'curr' is either a closure or the fd is shutdown(in which case 'curr'
contains a pointer to the shutdown-error). If the fd is shutdown, contains a pointer to the shutdown-error). If the fd is shutdown,
schedule the closure with the shutdown error */ schedule the closure with the shutdown error */
if ((curr & kShutdownBit) > 0) { if ((curr & FD_SHUTDOWN_BIT) > 0) {
grpc_error* shutdown_err = (grpc_error*)(curr & ~kShutdownBit); grpc_error* shutdown_err = (grpc_error*)(curr & ~FD_SHUTDOWN_BIT);
GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_CLOSURE_SCHED(exec_ctx, closure,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"FD Shutdown", &shutdown_err, 1)); "FD Shutdown", &shutdown_err, 1));
@ -120,8 +133,7 @@ void LockfreeEvent::NotifyOn(grpc_exec_ctx* exec_ctx, grpc_closure* closure) {
/* There is already a closure!. This indicates a bug in the code */ /* There is already a closure!. This indicates a bug in the code */
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"LockfreeEvent::NotifyOn: notify_on called with a previous " "notify_on called with a previous callback still pending");
"callback still pending");
abort(); abort();
} }
} }
@ -130,22 +142,22 @@ void LockfreeEvent::NotifyOn(grpc_exec_ctx* exec_ctx, grpc_closure* closure) {
GPR_UNREACHABLE_CODE(return ); GPR_UNREACHABLE_CODE(return );
} }
bool LockfreeEvent::SetShutdown(grpc_exec_ctx* exec_ctx, bool grpc_lfev_set_shutdown(grpc_exec_ctx* exec_ctx, gpr_atm* state,
grpc_error* shutdown_err) { grpc_error* shutdown_err) {
gpr_atm new_state = (gpr_atm)shutdown_err | kShutdownBit; gpr_atm new_state = (gpr_atm)shutdown_err | FD_SHUTDOWN_BIT;
while (true) { while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(&state_); gpr_atm curr = gpr_atm_no_barrier_load(state);
if (GRPC_TRACER_ON(grpc_polling_trace)) { if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, "LockfreeEvent::SetShutdown: %p curr=%p err=%s", gpr_log(GPR_ERROR, "lfev_set_shutdown: %p curr=%p err=%s", state,
&state_, (void*)curr, grpc_error_string(shutdown_err)); (void*)curr, grpc_error_string(shutdown_err));
} }
switch (curr) { switch (curr) {
case kClosureReady: case CLOSURE_READY:
case kClosureNotReady: case CLOSURE_NOT_READY:
/* Need a full barrier here so that the initial load in notify_on /* Need a full barrier here so that the initial load in notify_on
doesn't need a barrier */ doesn't need a barrier */
if (gpr_atm_full_cas(&state_, curr, new_state)) { if (gpr_atm_full_cas(state, curr, new_state)) {
return true; /* early out */ return true; /* early out */
} }
break; /* retry */ break; /* retry */
@ -154,7 +166,7 @@ bool LockfreeEvent::SetShutdown(grpc_exec_ctx* exec_ctx,
/* 'curr' is either a closure or the fd is already shutdown */ /* 'curr' is either a closure or the fd is already shutdown */
/* If fd is already shutdown, we are done */ /* If fd is already shutdown, we are done */
if ((curr & kShutdownBit) > 0) { if ((curr & FD_SHUTDOWN_BIT) > 0) {
GRPC_ERROR_UNREF(shutdown_err); GRPC_ERROR_UNREF(shutdown_err);
return false; return false;
} }
@ -164,7 +176,7 @@ bool LockfreeEvent::SetShutdown(grpc_exec_ctx* exec_ctx,
Needs an acquire to pair with setting the closure (and get a Needs an acquire to pair with setting the closure (and get a
happens-after on that edge), and a release to pair with anything happens-after on that edge), and a release to pair with anything
loading the shutdown state. */ loading the shutdown state. */
if (gpr_atm_full_cas(&state_, curr, new_state)) { if (gpr_atm_full_cas(state, curr, new_state)) {
GRPC_CLOSURE_SCHED(exec_ctx, (grpc_closure*)curr, GRPC_CLOSURE_SCHED(exec_ctx, (grpc_closure*)curr,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"FD Shutdown", &shutdown_err, 1)); "FD Shutdown", &shutdown_err, 1));
@ -181,25 +193,26 @@ bool LockfreeEvent::SetShutdown(grpc_exec_ctx* exec_ctx,
GPR_UNREACHABLE_CODE(return false); GPR_UNREACHABLE_CODE(return false);
} }
void LockfreeEvent::SetReady(grpc_exec_ctx* exec_ctx) { void grpc_lfev_set_ready(grpc_exec_ctx* exec_ctx, gpr_atm* state,
const char* variable) {
while (true) { while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(&state_); gpr_atm curr = gpr_atm_no_barrier_load(state);
if (GRPC_TRACER_ON(grpc_polling_trace)) { if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, "LockfreeEvent::SetReady: %p curr=%p", &state_, gpr_log(GPR_ERROR, "lfev_set_ready[%s]: %p curr=%p", variable, state,
(void*)curr); (void*)curr);
} }
switch (curr) { switch (curr) {
case kClosureReady: { case CLOSURE_READY: {
/* Already ready. We are done here */ /* Already ready. We are done here */
return; return;
} }
case kClosureNotReady: { case CLOSURE_NOT_READY: {
/* No barrier required as we're transitioning to a state that does not /* No barrier required as we're transitioning to a state that does not
involve a closure */ involve a closure */
if (gpr_atm_no_barrier_cas(&state_, kClosureNotReady, kClosureReady)) { if (gpr_atm_no_barrier_cas(state, CLOSURE_NOT_READY, CLOSURE_READY)) {
return; /* early out */ return; /* early out */
} }
break; /* retry */ break; /* retry */
@ -207,14 +220,14 @@ void LockfreeEvent::SetReady(grpc_exec_ctx* exec_ctx) {
default: { default: {
/* 'curr' is either a closure or the fd is shutdown */ /* 'curr' is either a closure or the fd is shutdown */
if ((curr & kShutdownBit) > 0) { if ((curr & FD_SHUTDOWN_BIT) > 0) {
/* The fd is shutdown. Do nothing */ /* The fd is shutdown. Do nothing */
return; return;
} }
/* Full cas: acquire pairs with this cas' release in the event of a /* Full cas: acquire pairs with this cas' release in the event of a
spurious set_ready; release pairs with this or the acquire in spurious set_ready; release pairs with this or the acquire in
notify_on (or set_shutdown) */ notify_on (or set_shutdown) */
else if (gpr_atm_full_cas(&state_, curr, kClosureNotReady)) { else if (gpr_atm_full_cas(state, curr, CLOSURE_NOT_READY)) {
GRPC_CLOSURE_SCHED(exec_ctx, (grpc_closure*)curr, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(exec_ctx, (grpc_closure*)curr, GRPC_ERROR_NONE);
return; return;
} }
@ -226,5 +239,3 @@ void LockfreeEvent::SetReady(grpc_exec_ctx* exec_ctx) {
} }
} }
} }
} // namespace grpc_core

@ -25,30 +25,24 @@
#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/exec_ctx.h"
namespace grpc_core { #ifdef __cplusplus
extern "C" {
class LockfreeEvent { #endif
public:
LockfreeEvent() = default; void grpc_lfev_init(gpr_atm* state);
~LockfreeEvent(); void grpc_lfev_destroy(gpr_atm* state);
bool grpc_lfev_is_shutdown(gpr_atm* state);
LockfreeEvent(const LockfreeEvent&) = delete;
LockfreeEvent& operator=(const LockfreeEvent&) = delete; void grpc_lfev_notify_on(grpc_exec_ctx* exec_ctx, gpr_atm* state,
grpc_closure* closure, const char* variable);
bool IsShutdown() const { /* Returns true on first successful shutdown */
return (gpr_atm_no_barrier_load(&state_) & kShutdownBit) != 0; bool grpc_lfev_set_shutdown(grpc_exec_ctx* exec_ctx, gpr_atm* state,
grpc_error* shutdown_err);
void grpc_lfev_set_ready(grpc_exec_ctx* exec_ctx, gpr_atm* state,
const char* variable);
#ifdef __cplusplus
} }
#endif
void NotifyOn(grpc_exec_ctx* exec_ctx, grpc_closure* closure);
bool SetShutdown(grpc_exec_ctx* exec_ctx, grpc_error* error);
void SetReady(grpc_exec_ctx* exec_ctx);
private:
enum State { kClosureNotReady = 0, kClosureReady = 2, kShutdownBit = 1 };
gpr_atm state_ = kClosureNotReady;
};
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_IOMGR_LOCKFREE_EVENT_H */ #endif /* GRPC_CORE_LIB_IOMGR_LOCKFREE_EVENT_H */

@ -16,9 +16,7 @@
* *
*/ */
#include "test/core/end2end/data/ssl_test_data.h" extern "C" const char test_self_signed_client_cert[] = {
const char test_self_signed_client_cert[] = {
0x2d, 0x2d, 0x2d, 0x2d, 0x2d, 0x42, 0x45, 0x47, 0x49, 0x4e, 0x20, 0x43, 0x2d, 0x2d, 0x2d, 0x2d, 0x2d, 0x42, 0x45, 0x47, 0x49, 0x4e, 0x20, 0x43,
0x45, 0x52, 0x54, 0x49, 0x46, 0x49, 0x43, 0x41, 0x54, 0x45, 0x2d, 0x2d, 0x45, 0x52, 0x54, 0x49, 0x46, 0x49, 0x43, 0x41, 0x54, 0x45, 0x2d, 0x2d,
0x2d, 0x2d, 0x2d, 0x0a, 0x4d, 0x49, 0x49, 0x43, 0x6f, 0x44, 0x43, 0x43, 0x2d, 0x2d, 0x2d, 0x0a, 0x4d, 0x49, 0x49, 0x43, 0x6f, 0x44, 0x43, 0x43,
@ -102,7 +100,7 @@ const char test_self_signed_client_cert[] = {
0x49, 0x46, 0x49, 0x43, 0x41, 0x54, 0x45, 0x2d, 0x2d, 0x2d, 0x2d, 0x2d, 0x49, 0x46, 0x49, 0x43, 0x41, 0x54, 0x45, 0x2d, 0x2d, 0x2d, 0x2d, 0x2d,
0x0a, 0x00}; 0x0a, 0x00};
const char test_self_signed_client_key[] = { extern "C" const char test_self_signed_client_key[] = {
0x2d, 0x2d, 0x2d, 0x2d, 0x2d, 0x42, 0x45, 0x47, 0x49, 0x4e, 0x20, 0x50, 0x2d, 0x2d, 0x2d, 0x2d, 0x2d, 0x42, 0x45, 0x47, 0x49, 0x4e, 0x20, 0x50,
0x52, 0x49, 0x56, 0x41, 0x54, 0x45, 0x20, 0x4b, 0x45, 0x59, 0x2d, 0x2d, 0x52, 0x49, 0x56, 0x41, 0x54, 0x45, 0x20, 0x4b, 0x45, 0x59, 0x2d, 0x2d,
0x2d, 0x2d, 0x2d, 0x0a, 0x4d, 0x49, 0x49, 0x43, 0x64, 0x77, 0x49, 0x42, 0x2d, 0x2d, 0x2d, 0x0a, 0x4d, 0x49, 0x49, 0x43, 0x64, 0x77, 0x49, 0x42,
@ -181,7 +179,7 @@ const char test_self_signed_client_key[] = {
0x52, 0x49, 0x56, 0x41, 0x54, 0x45, 0x20, 0x4b, 0x45, 0x59, 0x2d, 0x2d, 0x52, 0x49, 0x56, 0x41, 0x54, 0x45, 0x20, 0x4b, 0x45, 0x59, 0x2d, 0x2d,
0x2d, 0x2d, 0x2d, 0x0a, 0x00}; 0x2d, 0x2d, 0x2d, 0x0a, 0x00};
const char test_signed_client_cert[] = { extern "C" const char test_signed_client_cert[] = {
0x2d, 0x2d, 0x2d, 0x2d, 0x2d, 0x42, 0x45, 0x47, 0x49, 0x4e, 0x20, 0x43, 0x2d, 0x2d, 0x2d, 0x2d, 0x2d, 0x42, 0x45, 0x47, 0x49, 0x4e, 0x20, 0x43,
0x45, 0x52, 0x54, 0x49, 0x46, 0x49, 0x43, 0x41, 0x54, 0x45, 0x2d, 0x2d, 0x45, 0x52, 0x54, 0x49, 0x46, 0x49, 0x43, 0x41, 0x54, 0x45, 0x2d, 0x2d,
0x2d, 0x2d, 0x2d, 0x0a, 0x4d, 0x49, 0x49, 0x43, 0x48, 0x7a, 0x43, 0x43, 0x2d, 0x2d, 0x2d, 0x0a, 0x4d, 0x49, 0x49, 0x43, 0x48, 0x7a, 0x43, 0x43,
@ -250,7 +248,7 @@ const char test_signed_client_cert[] = {
0x20, 0x43, 0x45, 0x52, 0x54, 0x49, 0x46, 0x49, 0x43, 0x41, 0x54, 0x45, 0x20, 0x43, 0x45, 0x52, 0x54, 0x49, 0x46, 0x49, 0x43, 0x41, 0x54, 0x45,
0x2d, 0x2d, 0x2d, 0x2d, 0x2d, 0x0a, 0x00}; 0x2d, 0x2d, 0x2d, 0x2d, 0x2d, 0x0a, 0x00};
const char test_signed_client_key[] = { extern "C" const char test_signed_client_key[] = {
0x2d, 0x2d, 0x2d, 0x2d, 0x2d, 0x42, 0x45, 0x47, 0x49, 0x4e, 0x20, 0x50, 0x2d, 0x2d, 0x2d, 0x2d, 0x2d, 0x42, 0x45, 0x47, 0x49, 0x4e, 0x20, 0x50,
0x52, 0x49, 0x56, 0x41, 0x54, 0x45, 0x20, 0x4b, 0x45, 0x59, 0x2d, 0x2d, 0x52, 0x49, 0x56, 0x41, 0x54, 0x45, 0x20, 0x4b, 0x45, 0x59, 0x2d, 0x2d,
0x2d, 0x2d, 0x2d, 0x0a, 0x4d, 0x49, 0x49, 0x43, 0x65, 0x51, 0x49, 0x42, 0x2d, 0x2d, 0x2d, 0x0a, 0x4d, 0x49, 0x49, 0x43, 0x65, 0x51, 0x49, 0x42,

@ -16,9 +16,7 @@
* *
*/ */
#include "test/core/end2end/data/ssl_test_data.h" extern "C" const char test_server1_cert[] = {
const char test_server1_cert[] = {
0x2d, 0x2d, 0x2d, 0x2d, 0x2d, 0x42, 0x45, 0x47, 0x49, 0x4e, 0x20, 0x43, 0x2d, 0x2d, 0x2d, 0x2d, 0x2d, 0x42, 0x45, 0x47, 0x49, 0x4e, 0x20, 0x43,
0x45, 0x52, 0x54, 0x49, 0x46, 0x49, 0x43, 0x41, 0x54, 0x45, 0x2d, 0x2d, 0x45, 0x52, 0x54, 0x49, 0x46, 0x49, 0x43, 0x41, 0x54, 0x45, 0x2d, 0x2d,
0x2d, 0x2d, 0x2d, 0x0a, 0x4d, 0x49, 0x49, 0x43, 0x6e, 0x44, 0x43, 0x43, 0x2d, 0x2d, 0x2d, 0x0a, 0x4d, 0x49, 0x49, 0x43, 0x6e, 0x44, 0x43, 0x43,

@ -16,9 +16,7 @@
* *
*/ */
#include "test/core/end2end/data/ssl_test_data.h" extern "C" const char test_server1_key[] = {
const char test_server1_key[] = {
0x2d, 0x2d, 0x2d, 0x2d, 0x2d, 0x42, 0x45, 0x47, 0x49, 0x4e, 0x20, 0x52, 0x2d, 0x2d, 0x2d, 0x2d, 0x2d, 0x42, 0x45, 0x47, 0x49, 0x4e, 0x20, 0x52,
0x53, 0x41, 0x20, 0x50, 0x52, 0x49, 0x56, 0x41, 0x54, 0x45, 0x20, 0x4b, 0x53, 0x41, 0x20, 0x50, 0x52, 0x49, 0x56, 0x41, 0x54, 0x45, 0x20, 0x4b,
0x45, 0x59, 0x2d, 0x2d, 0x2d, 0x2d, 0x2d, 0x0a, 0x4d, 0x49, 0x49, 0x43, 0x45, 0x59, 0x2d, 0x2d, 0x2d, 0x2d, 0x2d, 0x0a, 0x4d, 0x49, 0x49, 0x43,

@ -16,9 +16,7 @@
* *
*/ */
#include "test/core/end2end/data/ssl_test_data.h" extern "C" const char test_root_cert[] = {
const char test_root_cert[] = {
0x2d, 0x2d, 0x2d, 0x2d, 0x2d, 0x42, 0x45, 0x47, 0x49, 0x4e, 0x20, 0x43, 0x2d, 0x2d, 0x2d, 0x2d, 0x2d, 0x42, 0x45, 0x47, 0x49, 0x4e, 0x20, 0x43,
0x45, 0x52, 0x54, 0x49, 0x46, 0x49, 0x43, 0x41, 0x54, 0x45, 0x2d, 0x2d, 0x45, 0x52, 0x54, 0x49, 0x46, 0x49, 0x43, 0x41, 0x54, 0x45, 0x2d, 0x2d,
0x2d, 0x2d, 0x2d, 0x0a, 0x4d, 0x49, 0x49, 0x43, 0x49, 0x7a, 0x43, 0x43, 0x2d, 0x2d, 0x2d, 0x0a, 0x4d, 0x49, 0x49, 0x43, 0x49, 0x7a, 0x43, 0x43,

Loading…
Cancel
Save