Track milliseconds since process start in timer heap

Allows reducing a lock-then-check to an atomic load and check on the
fast path of timer checks.

Reduces locks per RPC by about 5.
pull/10194/head
Craig Tiller 8 years ago
parent da73580d2c
commit 7b2dd93362
  1. 1
      include/grpc/impl/codegen/atm_gcc_atomic.h
  2. 1
      include/grpc/impl/codegen/atm_gcc_sync.h
  3. 1
      include/grpc/impl/codegen/atm_windows.h
  4. 111
      src/core/lib/iomgr/timer_generic.c
  5. 2
      src/core/lib/iomgr/timer_generic.h
  6. 16
      src/core/lib/iomgr/timer_heap.c
  7. 21
      test/core/iomgr/timer_heap_test.c
  8. 8
      test/core/iomgr/timer_list_test.c

@ -39,6 +39,7 @@
#include <grpc/impl/codegen/port_platform.h>
typedef intptr_t gpr_atm;
#define GPR_ATM_MAX INTPTR_MAX
#ifdef GPR_LOW_LEVEL_COUNTERS
extern gpr_atm gpr_counter_atm_cas;

@ -39,6 +39,7 @@
#include <grpc/impl/codegen/port_platform.h>
typedef intptr_t gpr_atm;
#define GPR_ATM_MAX INTPTR_MAX
#define GPR_ATM_COMPILE_BARRIER_() __asm__ __volatile__("" : : : "memory")

@ -38,6 +38,7 @@
#include <grpc/impl/codegen/port_platform.h>
typedef intptr_t gpr_atm;
#define GPR_ATM_MAX INTPTR_MAX
#define gpr_atm_full_barrier MemoryBarrier

@ -56,8 +56,8 @@ typedef struct {
gpr_mu mu;
grpc_time_averaged_stats stats;
/* All and only timers with deadlines <= this will be in the heap. */
gpr_timespec queue_deadline_cap;
gpr_timespec min_deadline;
gpr_atm queue_deadline_cap;
gpr_atm min_deadline;
/* Index in the g_shard_queue */
uint32_t shard_queue_index;
/* This holds all timers with deadlines < queue_deadline_cap. Timers in this
@ -76,11 +76,32 @@ static shard_type g_shards[NUM_SHARDS];
/* Protected by g_mu */
static shard_type *g_shard_queue[NUM_SHARDS];
static bool g_initialized = false;
static gpr_timespec g_start_time;
static gpr_atm g_min_timer;
static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_timespec *next, grpc_error *error);
static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
gpr_atm *next, grpc_error *error);
static gpr_timespec compute_min_deadline(shard_type *shard) {
/* Convert a duration expressed as a double count of seconds into a
   gpr_timespec with clock type GPR_TIMESPAN.
   NOTE(review): tv_sec truncates toward zero, so for negative d the
   fractional tv_nsec comes out negative — callers in this file only
   pass non-negative values (ms-since-start / 1000.0). */
static gpr_timespec dbl_to_ts(double d) {
gpr_timespec ts;
/* whole seconds (truncated toward zero) */
ts.tv_sec = (int64_t)d;
/* remaining fraction of a second, in nanoseconds */
ts.tv_nsec = (int32_t)(1e9 * (d - (double)ts.tv_sec));
ts.clock_type = GPR_TIMESPAN;
return ts;
}
/* Map an absolute gpr_timespec onto the timer heap's gpr_atm time
   representation: milliseconds elapsed since g_start_time (see the
   commit description "Track milliseconds since process start").
   The result is clamped to [0, GPR_ATM_MAX]; sub-millisecond
   precision is discarded by the double -> gpr_atm cast. */
static gpr_atm timespec_to_atm(gpr_timespec ts) {
/* elapsed time since process start, converted to milliseconds */
double x = gpr_timespec_to_micros(gpr_time_sub(ts, g_start_time)) / 1000.0;
if (x < 0) return 0;
if (x > GPR_ATM_MAX) return GPR_ATM_MAX;
return (gpr_atm)x;
}
/* Inverse of timespec_to_atm: convert milliseconds-since-start back
   into an absolute gpr_timespec anchored at g_start_time. */
static gpr_timespec atm_to_timespec(gpr_atm x) {
return gpr_time_add(g_start_time, dbl_to_ts((double)x / 1000.0));
}
static gpr_atm compute_min_deadline(shard_type *shard) {
return grpc_timer_heap_is_empty(&shard->heap)
? shard->queue_deadline_cap
: grpc_timer_heap_top(&shard->heap)->deadline;
@ -92,13 +113,15 @@ void grpc_timer_list_init(gpr_timespec now) {
g_initialized = true;
gpr_mu_init(&g_mu);
g_clock_type = now.clock_type;
g_start_time = now;
g_min_timer = timespec_to_atm(now);
for (i = 0; i < NUM_SHARDS; i++) {
shard_type *shard = &g_shards[i];
gpr_mu_init(&shard->mu);
grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1,
0.5);
shard->queue_deadline_cap = now;
shard->queue_deadline_cap = timespec_to_atm(now);
shard->shard_queue_index = i;
grpc_timer_heap_init(&shard->heap);
shard->list.next = shard->list.prev = &shard->list;
@ -109,7 +132,7 @@ void grpc_timer_list_init(gpr_timespec now) {
void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {
int i;
run_some_expired_timers(exec_ctx, gpr_inf_future(g_clock_type), NULL,
run_some_expired_timers(exec_ctx, GPR_ATM_MAX, NULL,
GRPC_ERROR_CREATE("Timer list shutdown"));
for (i = 0; i < NUM_SHARDS; i++) {
shard_type *shard = &g_shards[i];
@ -124,14 +147,6 @@ static double ts_to_dbl(gpr_timespec ts) {
return (double)ts.tv_sec + 1e-9 * ts.tv_nsec;
}
/* Convert a duration expressed as a double count of seconds into a
   gpr_timespec with clock type GPR_TIMESPAN (this copy is the one
   being removed from its old location by the diff). */
static gpr_timespec dbl_to_ts(double d) {
gpr_timespec ts;
/* whole seconds (truncated toward zero) */
ts.tv_sec = (int64_t)d;
/* remaining fraction of a second, in nanoseconds */
ts.tv_nsec = (int32_t)(1e9 * (d - (double)ts.tv_sec));
ts.clock_type = GPR_TIMESPAN;
return ts;
}
static void list_join(grpc_timer *head, grpc_timer *timer) {
timer->next = head;
timer->prev = head->prev;
@ -157,15 +172,13 @@ static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) {
static void note_deadline_change(shard_type *shard) {
while (shard->shard_queue_index > 0 &&
gpr_time_cmp(
shard->min_deadline,
g_shard_queue[shard->shard_queue_index - 1]->min_deadline) < 0) {
shard->min_deadline <
g_shard_queue[shard->shard_queue_index - 1]->min_deadline) {
swap_adjacent_shards_in_queue(shard->shard_queue_index - 1);
}
while (shard->shard_queue_index < NUM_SHARDS - 1 &&
gpr_time_cmp(
shard->min_deadline,
g_shard_queue[shard->shard_queue_index + 1]->min_deadline) > 0) {
shard->min_deadline >
g_shard_queue[shard->shard_queue_index + 1]->min_deadline) {
swap_adjacent_shards_in_queue(shard->shard_queue_index);
}
}
@ -178,7 +191,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
GPR_ASSERT(deadline.clock_type == g_clock_type);
GPR_ASSERT(now.clock_type == g_clock_type);
timer->closure = closure;
timer->deadline = deadline;
timer->deadline = timespec_to_atm(deadline);
if (!g_initialized) {
timer->pending = false;
@ -200,7 +213,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
grpc_time_averaged_stats_add_sample(&shard->stats,
ts_to_dbl(gpr_time_sub(deadline, now)));
if (gpr_time_cmp(deadline, shard->queue_deadline_cap) < 0) {
if (timer->deadline < shard->queue_deadline_cap) {
is_first_timer = grpc_timer_heap_add(&shard->heap, timer);
} else {
timer->heap_index = INVALID_HEAP_INDEX;
@ -221,12 +234,12 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
grpc_timer_check. */
if (is_first_timer) {
gpr_mu_lock(&g_mu);
if (gpr_time_cmp(deadline, shard->min_deadline) < 0) {
gpr_timespec old_min_deadline = g_shard_queue[0]->min_deadline;
shard->min_deadline = deadline;
if (timer->deadline < shard->min_deadline) {
gpr_atm old_min_deadline = g_shard_queue[0]->min_deadline;
shard->min_deadline = timer->deadline;
note_deadline_change(shard);
if (shard->shard_queue_index == 0 &&
gpr_time_cmp(deadline, old_min_deadline) < 0) {
if (shard->shard_queue_index == 0 && timer->deadline < old_min_deadline) {
gpr_atm_no_barrier_store(&g_min_timer, timer->deadline);
grpc_kick_poller();
}
}
@ -259,7 +272,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
for timers that fall at or under it. Returns true if the queue is no
longer empty.
REQUIRES: shard->mu locked */
static int refill_queue(shard_type *shard, gpr_timespec now) {
static int refill_queue(shard_type *shard, gpr_atm now) {
/* Compute the new queue window width and bound by the limits: */
double computed_deadline_delta =
grpc_time_averaged_stats_update_average(&shard->stats) *
@ -270,12 +283,12 @@ static int refill_queue(shard_type *shard, gpr_timespec now) {
grpc_timer *timer, *next;
/* Compute the new cap and put all timers under it into the queue: */
shard->queue_deadline_cap = gpr_time_add(
gpr_time_max(now, shard->queue_deadline_cap), dbl_to_ts(deadline_delta));
shard->queue_deadline_cap = GPR_MAX(now, shard->queue_deadline_cap) +
(gpr_atm)(deadline_delta * 1000.0);
for (timer = shard->list.next; timer != &shard->list; timer = next) {
next = timer->next;
if (gpr_time_cmp(timer->deadline, shard->queue_deadline_cap) < 0) {
if (timer->deadline < shard->queue_deadline_cap) {
list_remove(timer);
grpc_timer_heap_add(&shard->heap, timer);
}
@ -286,15 +299,15 @@ static int refill_queue(shard_type *shard, gpr_timespec now) {
/* This pops the next non-cancelled timer with deadline <= now from the
queue, or returns NULL if there isn't one.
REQUIRES: shard->mu locked */
static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) {
static grpc_timer *pop_one(shard_type *shard, gpr_atm now) {
grpc_timer *timer;
for (;;) {
if (grpc_timer_heap_is_empty(&shard->heap)) {
if (gpr_time_cmp(now, shard->queue_deadline_cap) < 0) return NULL;
if (now < shard->queue_deadline_cap) return NULL;
if (!refill_queue(shard, now)) return NULL;
}
timer = grpc_timer_heap_top(&shard->heap);
if (gpr_time_cmp(timer->deadline, now) > 0) return NULL;
if (timer->deadline > now) return NULL;
timer->pending = false;
grpc_timer_heap_pop(&shard->heap);
return timer;
@ -303,7 +316,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) {
/* REQUIRES: shard->mu unlocked */
static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard,
gpr_timespec now, gpr_timespec *new_min_deadline,
gpr_atm now, gpr_atm *new_min_deadline,
grpc_error *error) {
size_t n = 0;
grpc_timer *timer;
@ -317,17 +330,19 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard,
return n;
}
static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_timespec *next, grpc_error *error) {
static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
gpr_atm *next, grpc_error *error) {
size_t n = 0;
/* TODO(ctiller): verify that there are any timers (atomically) here */
if (now < gpr_atm_no_barrier_load(&g_min_timer)) {
return 0;
}
if (gpr_spinlock_trylock(&g_checker_mu)) {
gpr_mu_lock(&g_mu);
while (gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) {
gpr_timespec new_min_deadline;
while (g_shard_queue[0]->min_deadline < now) {
gpr_atm new_min_deadline;
/* For efficiency, we pop as many available timers as we can from the
shard. This may violate perfect timer deadline ordering, but that
@ -345,9 +360,10 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
}
if (next) {
*next = gpr_time_min(*next, g_shard_queue[0]->min_deadline);
*next = GPR_MIN(*next, g_shard_queue[0]->min_deadline);
}
gpr_atm_no_barrier_store(&g_min_timer, g_shard_queue[0]->min_deadline);
gpr_mu_unlock(&g_mu);
gpr_spinlock_unlock(&g_checker_mu);
} else if (next != NULL) {
@ -360,8 +376,7 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
successfully, and waking up other pollers IFF that count drops to zero.
Once that count is in place, this entire else branch could disappear. */
*next = gpr_time_min(
*next, gpr_time_add(now, gpr_time_from_millis(1, GPR_TIMESPAN)));
*next = GPR_MIN(*next, now + 1);
}
GRPC_ERROR_UNREF(error);
@ -372,11 +387,15 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_timespec *next) {
GPR_ASSERT(now.clock_type == g_clock_type);
return run_some_expired_timers(
exec_ctx, now, next,
gpr_atm now_atm = timespec_to_atm(now);
gpr_atm next_atm;
bool r = run_some_expired_timers(
exec_ctx, now_atm, &next_atm,
gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0
? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE("Shutting down timer system"));
if (next != NULL) *next = atm_to_timespec(next_atm);
return r;
}
#endif /* GRPC_TIMER_USE_GENERIC */

@ -38,7 +38,7 @@
#include "src/core/lib/iomgr/exec_ctx.h"
struct grpc_timer {
gpr_timespec deadline;
gpr_atm deadline;
uint32_t heap_index; /* INVALID_HEAP_INDEX if not in heap */
bool pending;
struct grpc_timer *next;

@ -50,7 +50,7 @@
static void adjust_upwards(grpc_timer **first, uint32_t i, grpc_timer *t) {
while (i > 0) {
uint32_t parent = (uint32_t)(((int)i - 1) / 2);
if (gpr_time_cmp(first[parent]->deadline, t->deadline) <= 0) break;
if (first[parent]->deadline <= t->deadline) break;
first[i] = first[parent];
first[i]->heap_index = i;
i = parent;
@ -68,12 +68,12 @@ static void adjust_downwards(grpc_timer **first, uint32_t i, uint32_t length,
uint32_t left_child = 1u + 2u * i;
if (left_child >= length) break;
uint32_t right_child = left_child + 1;
uint32_t next_i = right_child < length &&
gpr_time_cmp(first[left_child]->deadline,
first[right_child]->deadline) > 0
? right_child
: left_child;
if (gpr_time_cmp(t->deadline, first[next_i]->deadline) <= 0) break;
uint32_t next_i =
right_child < length &&
first[left_child]->deadline > first[right_child]->deadline
? right_child
: left_child;
if (t->deadline <= first[next_i]->deadline) break;
first[i] = first[next_i];
first[i]->heap_index = i;
i = next_i;
@ -97,7 +97,7 @@ static void maybe_shrink(grpc_timer_heap *heap) {
static void note_changed_priority(grpc_timer_heap *heap, grpc_timer *timer) {
uint32_t i = timer->heap_index;
uint32_t parent = (uint32_t)(((int)i - 1) / 2);
if (gpr_time_cmp(heap->timers[parent]->deadline, timer->deadline) > 0) {
if (heap->timers[parent]->deadline > timer->deadline) {
adjust_upwards(heap->timers, i, timer);
} else {
adjust_downwards(heap->timers, i, heap->timer_count, timer);

@ -47,13 +47,7 @@
#include "test/core/util/test_config.h"
/* Produce a pseudo-random absolute deadline for the heap tests
   (old gpr_timespec version, removed by this commit).
   NOTE(review): tv_nsec is a raw rand() and may exceed 1e9 — this
   is presumably acceptable for ordering-only test comparisons;
   confirm against gpr_time_cmp semantics. */
static gpr_timespec random_deadline(void) {
gpr_timespec ts;
ts.tv_sec = rand();
ts.tv_nsec = rand();
ts.clock_type = GPR_CLOCK_REALTIME;
return ts;
}
static gpr_atm random_deadline(void) { return rand(); }
static grpc_timer *create_test_elements(size_t num_elements) {
grpc_timer *elems = gpr_malloc(num_elements * sizeof(grpc_timer));
@ -78,12 +72,10 @@ static void check_valid(grpc_timer_heap *pq) {
size_t left_child = 1u + 2u * i;
size_t right_child = left_child + 1u;
if (left_child < pq->timer_count) {
GPR_ASSERT(gpr_time_cmp(pq->timers[i]->deadline,
pq->timers[left_child]->deadline) <= 0);
GPR_ASSERT(pq->timers[i]->deadline <= pq->timers[left_child]->deadline);
}
if (right_child < pq->timer_count) {
GPR_ASSERT(gpr_time_cmp(pq->timers[i]->deadline,
pq->timers[right_child]->deadline) <= 0);
GPR_ASSERT(pq->timers[i]->deadline <= pq->timers[right_child]->deadline);
}
}
}
@ -227,20 +219,19 @@ static void test2(void) {
}
if (num_inserted) {
gpr_timespec *min_deadline = NULL;
gpr_atm *min_deadline = NULL;
for (size_t i = 0; i < elems_size; i++) {
if (elems[i].inserted) {
if (min_deadline == NULL) {
min_deadline = &elems[i].elem.deadline;
} else {
if (gpr_time_cmp(elems[i].elem.deadline, *min_deadline) < 0) {
if (elems[i].elem.deadline < *min_deadline) {
min_deadline = &elems[i].elem.deadline;
}
}
}
}
GPR_ASSERT(
0 == gpr_time_cmp(grpc_timer_heap_top(&pq)->deadline, *min_deadline));
GPR_ASSERT(grpc_timer_heap_top(&pq)->deadline == *min_deadline);
}
}

@ -57,6 +57,8 @@ static void add_test(void) {
grpc_timer timers[20];
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_log(GPR_INFO, "add_test");
grpc_timer_list_init(start);
memset(cb_called, 0, sizeof(cb_called));
@ -120,9 +122,7 @@ static void add_test(void) {
}
static gpr_timespec tfm(int m) {
gpr_timespec t = gpr_time_from_millis(m, GPR_TIMESPAN);
t.clock_type = GPR_CLOCK_REALTIME;
return t;
return gpr_time_from_millis(m, GPR_CLOCK_REALTIME);
}
/* Cleaning up a list with pending timers. */
@ -130,6 +130,8 @@ void destruction_test(void) {
grpc_timer timers[5];
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_log(GPR_INFO, "destruction_test");
grpc_timer_list_init(gpr_time_0(GPR_CLOCK_REALTIME));
memset(cb_called, 0, sizeof(cb_called));

Loading…
Cancel
Save