Rewrite async_exec_lock using mpscq

pull/6407/head
Craig Tiller 9 years ago
parent 0bc11711b7
commit ad3c8c1a5d
  1. 2
      BUILD
  2. 1
      Makefile
  3. 1
      binding.gyp
  4. 1
      build.yaml
  5. 1
      config.m4
  6. 1
      gRPC.podspec
  7. 1
      grpc.gemspec
  8. 1
      package.xml
  9. 103
      src/core/lib/iomgr/async_execution_lock.c
  10. 11
      src/core/lib/iomgr/async_execution_lock.h
  11. 1
      src/python/grpcio/grpc_core_dependencies.py
  12. 1
      tools/doxygen/Doxyfile.core.internal
  13. 1
      tools/run_tests/sources_and_headers.json
  14. 2
      vsprojects/vcxproj/gpr/gpr.vcxproj
  15. 3
      vsprojects/vcxproj/gpr/gpr.vcxproj.filters

@ -83,6 +83,7 @@ cc_library(
"src/core/lib/support/murmur_hash.c",
"src/core/lib/support/slice.c",
"src/core/lib/support/slice_buffer.c",
"src/core/lib/support/stack_lockfree.c",
"src/core/lib/support/string.c",
"src/core/lib/support/string_posix.c",
"src/core/lib/support/string_util_win32.c",
@ -1224,6 +1225,7 @@ objc_library(
"src/core/lib/support/murmur_hash.c",
"src/core/lib/support/slice.c",
"src/core/lib/support/slice_buffer.c",
"src/core/lib/support/stack_lockfree.c",
"src/core/lib/support/string.c",
"src/core/lib/support/string_posix.c",
"src/core/lib/support/string_util_win32.c",

@ -2352,6 +2352,7 @@ LIBGPR_SRC = \
src/core/lib/support/murmur_hash.c \
src/core/lib/support/slice.c \
src/core/lib/support/slice_buffer.c \
src/core/lib/support/stack_lockfree.c \
src/core/lib/support/string.c \
src/core/lib/support/string_posix.c \
src/core/lib/support/string_util_win32.c \

@ -520,6 +520,7 @@
'src/core/lib/support/murmur_hash.c',
'src/core/lib/support/slice.c',
'src/core/lib/support/slice_buffer.c',
'src/core/lib/support/stack_lockfree.c',
'src/core/lib/support/string.c',
'src/core/lib/support/string_posix.c',
'src/core/lib/support/string_util_win32.c',

@ -102,6 +102,7 @@ filegroups:
- src/core/lib/support/murmur_hash.c
- src/core/lib/support/slice.c
- src/core/lib/support/slice_buffer.c
- src/core/lib/support/stack_lockfree.c
- src/core/lib/support/string.c
- src/core/lib/support/string_posix.c
- src/core/lib/support/string_util_win32.c

@ -61,6 +61,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/support/murmur_hash.c \
src/core/lib/support/slice.c \
src/core/lib/support/slice_buffer.c \
src/core/lib/support/stack_lockfree.c \
src/core/lib/support/string.c \
src/core/lib/support/string_posix.c \
src/core/lib/support/string_util_win32.c \

@ -143,6 +143,7 @@ Pod::Spec.new do |s|
'src/core/lib/support/murmur_hash.c',
'src/core/lib/support/slice.c',
'src/core/lib/support/slice_buffer.c',
'src/core/lib/support/stack_lockfree.c',
'src/core/lib/support/string.c',
'src/core/lib/support/string_posix.c',
'src/core/lib/support/string_util_win32.c',

@ -123,6 +123,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/support/murmur_hash.c )
s.files += %w( src/core/lib/support/slice.c )
s.files += %w( src/core/lib/support/slice_buffer.c )
s.files += %w( src/core/lib/support/stack_lockfree.c )
s.files += %w( src/core/lib/support/string.c )
s.files += %w( src/core/lib/support/string_posix.c )
s.files += %w( src/core/lib/support/string_util_win32.c )

@ -130,6 +130,7 @@
<file baseinstalldir="/" name="src/core/lib/support/murmur_hash.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/slice.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/slice_buffer.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/stack_lockfree.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/string.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/string_posix.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/string_util_win32.c" role="src" />

@ -1,6 +1,6 @@
/*
*
* Copyright 2015, Google Inc.
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -40,105 +40,44 @@
#define NO_CONSUMER ((gpr_atm)1)
static void bad_action(grpc_exec_ctx *exec_ctx, void *arg) {
GPR_UNREACHABLE_CODE(return );
}
void grpc_aelock_init(grpc_aelock *lock, grpc_workqueue *optional_workqueue) {
lock->optional_workqueue = optional_workqueue;
gpr_atm_no_barrier_store(&lock->head, NO_CONSUMER);
gpr_atm_no_barrier_store(&lock->tombstone.next, 0);
lock->tombstone.action = bad_action;
lock->tail = &lock->tombstone;
gpr_atm_no_barrier_store(&lock->locked, 0);
gpr_mpscq_init(&lock->queue);
}
void grpc_aelock_destroy(grpc_aelock *lock) {
GPR_ASSERT(gpr_atm_no_barrier_load(&lock->head) == NO_CONSUMER);
GPR_ASSERT(gpr_atm_no_barrier_load(&lock->locked) == 0);
gpr_mpscq_destroy(&lock->queue);
}
static void finish(grpc_exec_ctx *exec_ctx, grpc_aelock *lock) {
for (;;) {
grpc_aelock_qnode *tail = lock->tail;
grpc_aelock_qnode *next =
(grpc_aelock_qnode *)gpr_atm_acq_load(&tail->next);
if (tail == &lock->tombstone) {
if (next == NULL) {
if (gpr_atm_rel_cas(&lock->head, (gpr_atm)&lock->tombstone,
NO_CONSUMER)) {
return;
}
// TODO(ctiller): consider sleeping
continue;
} else {
// skip the tombstone: we'll re-add it later
lock->tail = next;
tail = next;
next = (grpc_aelock_qnode *)gpr_atm_acq_load(&tail->next);
}
}
if (next != NULL) {
// found a node
lock->tail = next;
tail->action(exec_ctx, tail->arg);
gpr_free(tail);
} else {
// nothing there: might be in an incosistant state
grpc_aelock_qnode *head =
(grpc_aelock_qnode *)gpr_atm_acq_load(&lock->head);
if (head != tail) {
// non-empty list: spin for a bit
// TODO(ctiller): consider sleeping?
continue;
}
// must have swallowed tombstone above: re-add it
gpr_atm_no_barrier_store(&lock->tombstone.next, 0);
while (!gpr_atm_rel_cas(&lock->head, (gpr_atm)head,
(gpr_atm)&lock->tombstone)) {
head = (grpc_aelock_qnode *)gpr_atm_acq_load(&lock->head);
}
gpr_atm_rel_store(&head->next, (gpr_atm)&lock->tombstone);
while (gpr_atm_full_fetch_add(&lock->locked, -1) != 1) {
gpr_mpscq_node *n;
while ((n = gpr_mpscq_pop(&lock->queue)) == NULL) {
// TODO(ctiller): find something to fill in the time
}
grpc_aelock_qnode *ln = (grpc_aelock_qnode*)n;
ln->action(exec_ctx, ln->arg);
gpr_free(ln);
}
}
void grpc_aelock_execute(grpc_exec_ctx *exec_ctx, grpc_aelock *lock,
grpc_aelock_action action, void *arg,
size_t sizeof_arg) {
gpr_atm head;
retry_top:
head = gpr_atm_acq_load(&lock->head);
if (head == NO_CONSUMER) {
if (!gpr_atm_rel_cas(&lock->head, NO_CONSUMER, (gpr_atm)&lock->tombstone)) {
goto retry_top;
}
if (gpr_atm_full_fetch_add(&lock->locked, 1) == 0) {
action(exec_ctx, arg);
finish(exec_ctx, lock);
return; // early out
}
grpc_aelock_qnode *n = gpr_malloc(sizeof(*n) + sizeof_arg);
n->action = action;
if (sizeof_arg > 0) {
memcpy(n + 1, arg, sizeof_arg);
n->arg = n + 1;
} else {
n->arg = arg;
}
gpr_atm_rel_store(&n->next, 0);
while (!gpr_atm_rel_cas(&lock->head, head, (gpr_atm)n)) {
retry_queue_load:
head = gpr_atm_acq_load(&lock->head);
if (head == NO_CONSUMER) {
if (!gpr_atm_rel_cas(&lock->head, NO_CONSUMER,
(gpr_atm)&lock->tombstone)) {
goto retry_queue_load;
}
gpr_free(n);
action(exec_ctx, arg);
finish(exec_ctx, lock);
return; // early out
grpc_aelock_qnode *n = gpr_malloc(sizeof(*n) + sizeof_arg);
n->action = action;
if (sizeof_arg > 0) {
memcpy(n + 1, arg, sizeof_arg);
n->arg = n + 1;
} else {
n->arg = arg;
}
gpr_mpscq_push(&lock->queue, &n->mpscq_node);
}
GPR_ASSERT(gpr_atm_rel_cas(&((grpc_aelock_qnode *)head)->next, 0, (gpr_atm)n));
// gpr_atm_rel_store(&((grpc_aelock_qnode *)head)->next, (gpr_atm)n);
}

@ -1,6 +1,6 @@
/*
*
* Copyright 2015, Google Inc.
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -37,22 +37,21 @@
#include <stddef.h>
#include <grpc/support/atm.h>
#include "src/core/lib/support/mpscq.h"
#include "src/core/lib/iomgr/exec_ctx.h"
typedef void (*grpc_aelock_action)(grpc_exec_ctx *exec_ctx, void *arg);
typedef struct grpc_aelock_qnode {
gpr_mpscq_node mpscq_node;
grpc_aelock_action action;
void *arg;
gpr_atm next;
} grpc_aelock_qnode;
typedef struct grpc_aelock {
grpc_workqueue *optional_workqueue;
// grpc_aelock_qnode*
gpr_atm head;
grpc_aelock_qnode *tail;
grpc_aelock_qnode tombstone;
gpr_mpscq queue;
gpr_atm locked;
} grpc_aelock;
void grpc_aelock_init(grpc_aelock *lock, grpc_workqueue *optional_workqueue);

@ -55,6 +55,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/support/murmur_hash.c',
'src/core/lib/support/slice.c',
'src/core/lib/support/slice_buffer.c',
'src/core/lib/support/stack_lockfree.c',
'src/core/lib/support/string.c',
'src/core/lib/support/string_posix.c',
'src/core/lib/support/string_util_win32.c',

@ -1171,6 +1171,7 @@ src/core/lib/support/mpscq.c \
src/core/lib/support/murmur_hash.c \
src/core/lib/support/slice.c \
src/core/lib/support/slice_buffer.c \
src/core/lib/support/stack_lockfree.c \
src/core/lib/support/string.c \
src/core/lib/support/string_posix.c \
src/core/lib/support/string_util_win32.c \

@ -5529,6 +5529,7 @@
"src/core/lib/support/murmur_hash.h",
"src/core/lib/support/slice.c",
"src/core/lib/support/slice_buffer.c",
"src/core/lib/support/stack_lockfree.c",
"src/core/lib/support/stack_lockfree.h",
"src/core/lib/support/string.c",
"src/core/lib/support/string.h",

@ -256,6 +256,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\support\slice_buffer.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\support\stack_lockfree.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\support\string.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\support\string_posix.c">

@ -76,6 +76,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\support\slice_buffer.c">
<Filter>src\core\lib\support</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\support\stack_lockfree.c">
<Filter>src\core\lib\support</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\support\string.c">
<Filter>src\core\lib\support</Filter>
</ClCompile>

Loading…
Cancel
Save