From 7b2dd93362099eef75b49fe33b93692bb148b93f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 16 Mar 2017 16:25:12 -0700 Subject: [PATCH 01/24] Track milliseconds since process start in timer heap Allows reducing a lock-then-check to an atomic load and check on the fast path of timer checks. Reduces locks per RPC by about 5. --- include/grpc/impl/codegen/atm_gcc_atomic.h | 1 + include/grpc/impl/codegen/atm_gcc_sync.h | 1 + include/grpc/impl/codegen/atm_windows.h | 1 + src/core/lib/iomgr/timer_generic.c | 111 ++++++++++++--------- src/core/lib/iomgr/timer_generic.h | 2 +- src/core/lib/iomgr/timer_heap.c | 16 +-- test/core/iomgr/timer_heap_test.c | 21 ++-- test/core/iomgr/timer_list_test.c | 8 +- 8 files changed, 88 insertions(+), 73 deletions(-) diff --git a/include/grpc/impl/codegen/atm_gcc_atomic.h b/include/grpc/impl/codegen/atm_gcc_atomic.h index 4bd3b257413..c8832419dfb 100644 --- a/include/grpc/impl/codegen/atm_gcc_atomic.h +++ b/include/grpc/impl/codegen/atm_gcc_atomic.h @@ -39,6 +39,7 @@ #include typedef intptr_t gpr_atm; +#define GPR_ATM_MAX INTPTR_MAX #ifdef GPR_LOW_LEVEL_COUNTERS extern gpr_atm gpr_counter_atm_cas; diff --git a/include/grpc/impl/codegen/atm_gcc_sync.h b/include/grpc/impl/codegen/atm_gcc_sync.h index 9aa2b43189f..dd814760310 100644 --- a/include/grpc/impl/codegen/atm_gcc_sync.h +++ b/include/grpc/impl/codegen/atm_gcc_sync.h @@ -39,6 +39,7 @@ #include typedef intptr_t gpr_atm; +#define GPR_ATM_MAX INTPTR_MAX #define GPR_ATM_COMPILE_BARRIER_() __asm__ __volatile__("" : : : "memory") diff --git a/include/grpc/impl/codegen/atm_windows.h b/include/grpc/impl/codegen/atm_windows.h index 0ab70b95c4a..b8f63da7587 100644 --- a/include/grpc/impl/codegen/atm_windows.h +++ b/include/grpc/impl/codegen/atm_windows.h @@ -38,6 +38,7 @@ #include typedef intptr_t gpr_atm; +#define GPR_ATM_MAX INTPTR_MAX #define gpr_atm_full_barrier MemoryBarrier diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index d4df96c214d..090b4dc2d4d 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -56,8 +56,8 @@ typedef struct { gpr_mu mu; grpc_time_averaged_stats stats; /* All and only timers with deadlines <= this will be in the heap. */ - gpr_timespec queue_deadline_cap; - gpr_timespec min_deadline; + gpr_atm queue_deadline_cap; + gpr_atm min_deadline; /* Index in the g_shard_queue */ uint32_t shard_queue_index; /* This holds all timers with deadlines < queue_deadline_cap. 
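A minimal sketch (editorial, not part of the diff) of the fast path this commit's message describes: with deadlines encoded as a gpr_atm count of milliseconds since process start, the common "is any timer possibly due?" question becomes one atomic load plus an integer compare, instead of a lock followed by gpr_time_cmp. The names used below (timespec_to_atm, g_min_timer) are the helpers this patch introduces; the wrapper function itself is illustrative only.

    static bool timer_possibly_due(gpr_timespec now) {
      /* one relaxed atomic load replaces lock-then-gpr_time_cmp */
      return timespec_to_atm(now) >= gpr_atm_no_barrier_load(&g_min_timer);
    }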
Timers in this @@ -76,11 +76,32 @@ static shard_type g_shards[NUM_SHARDS]; /* Protected by g_mu */ static shard_type *g_shard_queue[NUM_SHARDS]; static bool g_initialized = false; +static gpr_timespec g_start_time; +static gpr_atm g_min_timer; -static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, - gpr_timespec *next, grpc_error *error); +static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, + gpr_atm *next, grpc_error *error); -static gpr_timespec compute_min_deadline(shard_type *shard) { +static gpr_timespec dbl_to_ts(double d) { + gpr_timespec ts; + ts.tv_sec = (int64_t)d; + ts.tv_nsec = (int32_t)(1e9 * (d - (double)ts.tv_sec)); + ts.clock_type = GPR_TIMESPAN; + return ts; +} + +static gpr_atm timespec_to_atm(gpr_timespec ts) { + double x = gpr_timespec_to_micros(gpr_time_sub(ts, g_start_time)) / 1000.0; + if (x < 0) return 0; + if (x > GPR_ATM_MAX) return GPR_ATM_MAX; + return (gpr_atm)x; +} + +static gpr_timespec atm_to_timespec(gpr_atm x) { + return gpr_time_add(g_start_time, dbl_to_ts((double)x / 1000.0)); +} + +static gpr_atm compute_min_deadline(shard_type *shard) { return grpc_timer_heap_is_empty(&shard->heap) ? shard->queue_deadline_cap : grpc_timer_heap_top(&shard->heap)->deadline; @@ -92,13 +113,15 @@ void grpc_timer_list_init(gpr_timespec now) { g_initialized = true; gpr_mu_init(&g_mu); g_clock_type = now.clock_type; + g_start_time = now; + g_min_timer = timespec_to_atm(now); for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; gpr_mu_init(&shard->mu); grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1, 0.5); - shard->queue_deadline_cap = now; + shard->queue_deadline_cap = timespec_to_atm(now); shard->shard_queue_index = i; grpc_timer_heap_init(&shard->heap); shard->list.next = shard->list.prev = &shard->list; @@ -109,7 +132,7 @@ void grpc_timer_list_init(gpr_timespec now) { void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { int i; - run_some_expired_timers(exec_ctx, gpr_inf_future(g_clock_type), NULL, + run_some_expired_timers(exec_ctx, GPR_ATM_MAX, NULL, GRPC_ERROR_CREATE("Timer list shutdown")); for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; @@ -124,14 +147,6 @@ static double ts_to_dbl(gpr_timespec ts) { return (double)ts.tv_sec + 1e-9 * ts.tv_nsec; } -static gpr_timespec dbl_to_ts(double d) { - gpr_timespec ts; - ts.tv_sec = (int64_t)d; - ts.tv_nsec = (int32_t)(1e9 * (d - (double)ts.tv_sec)); - ts.clock_type = GPR_TIMESPAN; - return ts; -} - static void list_join(grpc_timer *head, grpc_timer *timer) { timer->next = head; timer->prev = head->prev; @@ -157,15 +172,13 @@ static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) { static void note_deadline_change(shard_type *shard) { while (shard->shard_queue_index > 0 && - gpr_time_cmp( - shard->min_deadline, - g_shard_queue[shard->shard_queue_index - 1]->min_deadline) < 0) { + shard->min_deadline < + g_shard_queue[shard->shard_queue_index - 1]->min_deadline) { swap_adjacent_shards_in_queue(shard->shard_queue_index - 1); } while (shard->shard_queue_index < NUM_SHARDS - 1 && - gpr_time_cmp( - shard->min_deadline, - g_shard_queue[shard->shard_queue_index + 1]->min_deadline) > 0) { + shard->min_deadline > + g_shard_queue[shard->shard_queue_index + 1]->min_deadline) { swap_adjacent_shards_in_queue(shard->shard_queue_index); } } @@ -178,7 +191,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, GPR_ASSERT(deadline.clock_type == g_clock_type); GPR_ASSERT(now.clock_type == 
g_clock_type); timer->closure = closure; - timer->deadline = deadline; + timer->deadline = timespec_to_atm(deadline); if (!g_initialized) { timer->pending = false; @@ -200,7 +213,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, grpc_time_averaged_stats_add_sample(&shard->stats, ts_to_dbl(gpr_time_sub(deadline, now))); - if (gpr_time_cmp(deadline, shard->queue_deadline_cap) < 0) { + if (timer->deadline < shard->queue_deadline_cap) { is_first_timer = grpc_timer_heap_add(&shard->heap, timer); } else { timer->heap_index = INVALID_HEAP_INDEX; @@ -221,12 +234,12 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, grpc_timer_check. */ if (is_first_timer) { gpr_mu_lock(&g_mu); - if (gpr_time_cmp(deadline, shard->min_deadline) < 0) { - gpr_timespec old_min_deadline = g_shard_queue[0]->min_deadline; - shard->min_deadline = deadline; + if (timer->deadline < shard->min_deadline) { + gpr_atm old_min_deadline = g_shard_queue[0]->min_deadline; + shard->min_deadline = timer->deadline; note_deadline_change(shard); - if (shard->shard_queue_index == 0 && - gpr_time_cmp(deadline, old_min_deadline) < 0) { + if (shard->shard_queue_index == 0 && timer->deadline < old_min_deadline) { + gpr_atm_no_barrier_store(&g_min_timer, timer->deadline); grpc_kick_poller(); } } @@ -259,7 +272,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { for timers that fall at or under it. Returns true if the queue is no longer empty. REQUIRES: shard->mu locked */ -static int refill_queue(shard_type *shard, gpr_timespec now) { +static int refill_queue(shard_type *shard, gpr_atm now) { /* Compute the new queue window width and bound by the limits: */ double computed_deadline_delta = grpc_time_averaged_stats_update_average(&shard->stats) * @@ -270,12 +283,12 @@ static int refill_queue(shard_type *shard, gpr_timespec now) { grpc_timer *timer, *next; /* Compute the new cap and put all timers under it into the queue: */ - shard->queue_deadline_cap = gpr_time_add( - gpr_time_max(now, shard->queue_deadline_cap), dbl_to_ts(deadline_delta)); + shard->queue_deadline_cap = GPR_MAX(now, shard->queue_deadline_cap) + + (gpr_atm)(deadline_delta * 1000.0); for (timer = shard->list.next; timer != &shard->list; timer = next) { next = timer->next; - if (gpr_time_cmp(timer->deadline, shard->queue_deadline_cap) < 0) { + if (timer->deadline < shard->queue_deadline_cap) { list_remove(timer); grpc_timer_heap_add(&shard->heap, timer); } @@ -286,15 +299,15 @@ static int refill_queue(shard_type *shard, gpr_timespec now) { /* This pops the next non-cancelled timer with deadline <= now from the queue, or returns NULL if there isn't one. 
REQUIRES: shard->mu locked */ -static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) { +static grpc_timer *pop_one(shard_type *shard, gpr_atm now) { grpc_timer *timer; for (;;) { if (grpc_timer_heap_is_empty(&shard->heap)) { - if (gpr_time_cmp(now, shard->queue_deadline_cap) < 0) return NULL; + if (now < shard->queue_deadline_cap) return NULL; if (!refill_queue(shard, now)) return NULL; } timer = grpc_timer_heap_top(&shard->heap); - if (gpr_time_cmp(timer->deadline, now) > 0) return NULL; + if (timer->deadline > now) return NULL; timer->pending = false; grpc_timer_heap_pop(&shard->heap); return timer; @@ -303,7 +316,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) { /* REQUIRES: shard->mu unlocked */ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, - gpr_timespec now, gpr_timespec *new_min_deadline, + gpr_atm now, gpr_atm *new_min_deadline, grpc_error *error) { size_t n = 0; grpc_timer *timer; @@ -317,17 +330,19 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, return n; } -static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, - gpr_timespec *next, grpc_error *error) { +static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, + gpr_atm *next, grpc_error *error) { size_t n = 0; - /* TODO(ctiller): verify that there are any timers (atomically) here */ + if (now < gpr_atm_no_barrier_load(&g_min_timer)) { + return 0; + } if (gpr_spinlock_trylock(&g_checker_mu)) { gpr_mu_lock(&g_mu); - while (gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) { - gpr_timespec new_min_deadline; + while (g_shard_queue[0]->min_deadline < now) { + gpr_atm new_min_deadline; /* For efficiency, we pop as many available timers as we can from the shard. This may violate perfect timer deadline ordering, but that @@ -345,9 +360,10 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, } if (next) { - *next = gpr_time_min(*next, g_shard_queue[0]->min_deadline); + *next = GPR_MIN(*next, g_shard_queue[0]->min_deadline); } + gpr_atm_no_barrier_store(&g_min_timer, g_shard_queue[0]->min_deadline); gpr_mu_unlock(&g_mu); gpr_spinlock_unlock(&g_checker_mu); } else if (next != NULL) { /* TODO(ctiller): this forces calling code to do a short poll, and then retry the timer check (because this time through the timer list was contended). We could reduce the cost here dramatically by keeping a count of how many currently active pollers got through the uncontended case above successfully, and waking up other pollers IFF that count drops to zero. Once that count is in place, this entire else branch could disappear. */ - *next = gpr_time_min( - *next, gpr_time_add(now, gpr_time_from_millis(1, GPR_TIMESPAN))); + *next = GPR_MIN(*next, now + 1); } GRPC_ERROR_UNREF(error); @@ -372,11 +387,15 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_timespec *next) { GPR_ASSERT(now.clock_type == g_clock_type); - return run_some_expired_timers( - exec_ctx, now, next, + gpr_atm now_atm = timespec_to_atm(now); + gpr_atm next_atm; + bool r = run_some_expired_timers( + exec_ctx, now_atm, &next_atm, gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0 ?
GRPC_ERROR_NONE : GRPC_ERROR_CREATE("Shutting down timer system")); + if (next != NULL) *next = atm_to_timespec(next_atm); + return r; } #endif /* GRPC_TIMER_USE_GENERIC */ diff --git a/src/core/lib/iomgr/timer_generic.h b/src/core/lib/iomgr/timer_generic.h index 1608dce9fb1..c79a431aa03 100644 --- a/src/core/lib/iomgr/timer_generic.h +++ b/src/core/lib/iomgr/timer_generic.h @@ -38,7 +38,7 @@ #include "src/core/lib/iomgr/exec_ctx.h" struct grpc_timer { - gpr_timespec deadline; + gpr_atm deadline; uint32_t heap_index; /* INVALID_HEAP_INDEX if not in heap */ bool pending; struct grpc_timer *next; diff --git a/src/core/lib/iomgr/timer_heap.c b/src/core/lib/iomgr/timer_heap.c index f736d335e6c..03ccfe023a5 100644 --- a/src/core/lib/iomgr/timer_heap.c +++ b/src/core/lib/iomgr/timer_heap.c @@ -50,7 +50,7 @@ static void adjust_upwards(grpc_timer **first, uint32_t i, grpc_timer *t) { while (i > 0) { uint32_t parent = (uint32_t)(((int)i - 1) / 2); - if (gpr_time_cmp(first[parent]->deadline, t->deadline) <= 0) break; + if (first[parent]->deadline <= t->deadline) break; first[i] = first[parent]; first[i]->heap_index = i; i = parent; @@ -68,12 +68,12 @@ static void adjust_downwards(grpc_timer **first, uint32_t i, uint32_t length, uint32_t left_child = 1u + 2u * i; if (left_child >= length) break; uint32_t right_child = left_child + 1; - uint32_t next_i = right_child < length && - gpr_time_cmp(first[left_child]->deadline, - first[right_child]->deadline) > 0 - ? right_child - : left_child; - if (gpr_time_cmp(t->deadline, first[next_i]->deadline) <= 0) break; + uint32_t next_i = + right_child < length && + first[left_child]->deadline > first[right_child]->deadline + ? right_child + : left_child; + if (t->deadline <= first[next_i]->deadline) break; first[i] = first[next_i]; first[i]->heap_index = i; i = next_i; @@ -97,7 +97,7 @@ static void maybe_shrink(grpc_timer_heap *heap) { static void note_changed_priority(grpc_timer_heap *heap, grpc_timer *timer) { uint32_t i = timer->heap_index; uint32_t parent = (uint32_t)(((int)i - 1) / 2); - if (gpr_time_cmp(heap->timers[parent]->deadline, timer->deadline) > 0) { + if (heap->timers[parent]->deadline > timer->deadline) { adjust_upwards(heap->timers, i, timer); } else { adjust_downwards(heap->timers, i, heap->timer_count, timer); diff --git a/test/core/iomgr/timer_heap_test.c b/test/core/iomgr/timer_heap_test.c index 410d972313b..153304fa7b4 100644 --- a/test/core/iomgr/timer_heap_test.c +++ b/test/core/iomgr/timer_heap_test.c @@ -47,13 +47,7 @@ #include "test/core/util/test_config.h" -static gpr_timespec random_deadline(void) { - gpr_timespec ts; - ts.tv_sec = rand(); - ts.tv_nsec = rand(); - ts.clock_type = GPR_CLOCK_REALTIME; - return ts; -} +static gpr_atm random_deadline(void) { return rand(); } static grpc_timer *create_test_elements(size_t num_elements) { grpc_timer *elems = gpr_malloc(num_elements * sizeof(grpc_timer)); @@ -78,12 +72,10 @@ static void check_valid(grpc_timer_heap *pq) { size_t left_child = 1u + 2u * i; size_t right_child = left_child + 1u; if (left_child < pq->timer_count) { - GPR_ASSERT(gpr_time_cmp(pq->timers[i]->deadline, - pq->timers[left_child]->deadline) <= 0); + GPR_ASSERT(pq->timers[i]->deadline <= pq->timers[left_child]->deadline); } if (right_child < pq->timer_count) { - GPR_ASSERT(gpr_time_cmp(pq->timers[i]->deadline, - pq->timers[right_child]->deadline) <= 0); + GPR_ASSERT(pq->timers[i]->deadline <= pq->timers[right_child]->deadline); } } } @@ -227,20 +219,19 @@ static void test2(void) { } if (num_inserted) { - gpr_timespec 
*min_deadline = NULL; + gpr_atm *min_deadline = NULL; for (size_t i = 0; i < elems_size; i++) { if (elems[i].inserted) { if (min_deadline == NULL) { min_deadline = &elems[i].elem.deadline; } else { - if (gpr_time_cmp(elems[i].elem.deadline, *min_deadline) < 0) { + if (elems[i].elem.deadline < *min_deadline) { min_deadline = &elems[i].elem.deadline; } } } } - GPR_ASSERT( - 0 == gpr_time_cmp(grpc_timer_heap_top(&pq)->deadline, *min_deadline)); + GPR_ASSERT(grpc_timer_heap_top(&pq)->deadline == *min_deadline); } } diff --git a/test/core/iomgr/timer_list_test.c b/test/core/iomgr/timer_list_test.c index 85ad5277cc0..5c397b32cbc 100644 --- a/test/core/iomgr/timer_list_test.c +++ b/test/core/iomgr/timer_list_test.c @@ -57,6 +57,8 @@ static void add_test(void) { grpc_timer timers[20]; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + gpr_log(GPR_INFO, "add_test"); + grpc_timer_list_init(start); memset(cb_called, 0, sizeof(cb_called)); @@ -120,9 +122,7 @@ static void add_test(void) { } static gpr_timespec tfm(int m) { - gpr_timespec t = gpr_time_from_millis(m, GPR_TIMESPAN); - t.clock_type = GPR_CLOCK_REALTIME; - return t; + return gpr_time_from_millis(m, GPR_CLOCK_REALTIME); } /* Cleaning up a list with pending timers. */ @@ -130,6 +130,8 @@ void destruction_test(void) { grpc_timer timers[5]; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + gpr_log(GPR_INFO, "destruction_test"); + grpc_timer_list_init(gpr_time_0(GPR_CLOCK_REALTIME)); memset(cb_called, 0, sizeof(cb_called)); From 185f6c9e047d67ffa8cc4cc9fc1844456e92edfe Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 17 Mar 2017 08:33:19 -0700 Subject: [PATCH 02/24] Fix rounding, reduce contention on global shared state --- include/grpc/impl/codegen/port_platform.h | 2 + include/grpc/support/tls.h | 2 +- src/core/lib/iomgr/ev_epoll_linux.c | 2 + src/core/lib/iomgr/ev_poll_posix.c | 2 + src/core/lib/iomgr/timer.h | 3 + src/core/lib/iomgr/timer_generic.c | 104 ++++++++++++++-------- 6 files changed, 75 insertions(+), 40 deletions(-) diff --git a/include/grpc/impl/codegen/port_platform.h b/include/grpc/impl/codegen/port_platform.h index e565cd31d75..3d490db1a55 100644 --- a/include/grpc/impl/codegen/port_platform.h +++ b/include/grpc/impl/codegen/port_platform.h @@ -367,8 +367,10 @@ typedef unsigned __int64 uint64_t; #ifndef GRPC_MUST_USE_RESULT #if defined(__GNUC__) && !defined(__MINGW32__) #define GRPC_MUST_USE_RESULT __attribute__((warn_unused_result)) +#define GPR_ALIGN_STRUCT(n) __attribute__((aligned(n))) #else #define GRPC_MUST_USE_RESULT +#define GPR_ALIGN_STRUCT(n) #endif #endif diff --git a/include/grpc/support/tls.h b/include/grpc/support/tls.h index a45e1f0a4d5..5365449f0da 100644 --- a/include/grpc/support/tls.h +++ b/include/grpc/support/tls.h @@ -58,7 +58,7 @@ gpr_tls_set(&foo, new_value); Accessing a thread local: - current_value = gpr_tls_get(&foo, value); + current_value = gpr_tls_get(&foo); ALL functions here may be implemented as macros. 
*/ diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 11208b9ad13..400d4057a7c 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -56,6 +56,7 @@ #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_internal.h" +#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" @@ -1669,6 +1670,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, for (int i = 0; i < ep_rv; ++i) { void *data_ptr = ep_ev[i].data.ptr; if (data_ptr == &global_wakeup_fd) { + grpc_timer_consume_kick(); append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd), err_desc); } else if (data_ptr == &pi->workqueue_wakeup_fd) { diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 5ddd5313e2b..f27eb888436 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -52,6 +52,7 @@ #include #include "src/core/lib/iomgr/iomgr_internal.h" +#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_cv.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" @@ -1004,6 +1005,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } else { if (pfds[0].revents & POLLIN_CHECK) { + grpc_timer_consume_kick(); work_combine_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd)); } diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h index d84a278b183..e0338f93c7d 100644 --- a/src/core/lib/iomgr/timer.h +++ b/src/core/lib/iomgr/timer.h @@ -101,6 +101,9 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, void grpc_timer_list_init(gpr_timespec now); void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx); +/* Consume a kick issued by grpc_kick_poller */ +void grpc_timer_consume_kick(void); + /* the following must be implemented by each iomgr implementation */ void grpc_kick_poller(void); diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index 090b4dc2d4d..900731c37c6 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -39,6 +39,7 @@ #include #include +#include #include #include "src/core/lib/iomgr/time_averaged_stats.h" #include "src/core/lib/iomgr/timer_heap.h" @@ -67,17 +68,25 @@ typedef struct { grpc_timer list; } shard_type; -/* Protects g_shard_queue */ -static gpr_mu g_mu; -/* Allow only one run_some_expired_timers at once */ -static gpr_spinlock g_checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER; +struct shared_mutables { + gpr_atm min_timer; + /* Allow only one run_some_expired_timers at once */ + gpr_spinlock checker_mu; + bool initialized; + /* Protects g_shard_queue */ + gpr_mu mu; +} GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE); + +static struct shared_mutables g_shared_mutables = { + .checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER, .initialized = false, +}; static gpr_clock_type g_clock_type; static shard_type g_shards[NUM_SHARDS]; -/* Protected by g_mu */ +/* Protected by g_shared_mutables.mu */ static shard_type *g_shard_queue[NUM_SHARDS]; -static bool g_initialized = false; static gpr_timespec g_start_time; -static gpr_atm g_min_timer; + +GPR_TLS_DECL(g_last_seen_min_timer); static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, gpr_atm *next, grpc_error *error); @@ -90,8 +99,17 @@ static gpr_timespec dbl_to_ts(double d) 
{ return ts; } -static gpr_atm timespec_to_atm(gpr_timespec ts) { - double x = gpr_timespec_to_micros(gpr_time_sub(ts, g_start_time)) / 1000.0; +static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) { + double x = GPR_MS_PER_SEC * (double)ts.tv_sec + + (double)ts.tv_nsec / GPR_NS_PER_MS + 1.0; + if (x < 0) return 0; + if (x > GPR_ATM_MAX) return GPR_ATM_MAX; + return (gpr_atm)x; +} + +static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) { + double x = + GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS; if (x < 0) return 0; if (x > GPR_ATM_MAX) return GPR_ATM_MAX; return (gpr_atm)x; @@ -110,18 +128,19 @@ static gpr_atm compute_min_deadline(shard_type *shard) { void grpc_timer_list_init(gpr_timespec now) { uint32_t i; - g_initialized = true; - gpr_mu_init(&g_mu); + g_shared_mutables.initialized = true; + gpr_mu_init(&g_shared_mutables.mu); g_clock_type = now.clock_type; g_start_time = now; - g_min_timer = timespec_to_atm(now); + g_shared_mutables.min_timer = timespec_to_atm_round_down(now); + gpr_tls_init(&g_last_seen_min_timer); for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; gpr_mu_init(&shard->mu); grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1, 0.5); - shard->queue_deadline_cap = timespec_to_atm(now); + shard->queue_deadline_cap = g_shared_mutables.min_timer; shard->shard_queue_index = i; grpc_timer_heap_init(&shard->heap); shard->list.next = shard->list.prev = &shard->list; @@ -139,8 +158,9 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { gpr_mu_destroy(&shard->mu); grpc_timer_heap_destroy(&shard->heap); } - gpr_mu_destroy(&g_mu); - g_initialized = false; + gpr_mu_destroy(&g_shared_mutables.mu); + gpr_tls_destroy(&g_last_seen_min_timer); + g_shared_mutables.initialized = false; } static double ts_to_dbl(gpr_timespec ts) { @@ -191,9 +211,9 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, GPR_ASSERT(deadline.clock_type == g_clock_type); GPR_ASSERT(now.clock_type == g_clock_type); timer->closure = closure; - timer->deadline = timespec_to_atm(deadline); + timer->deadline = timespec_to_atm_round_up(deadline); - if (!g_initialized) { + if (!g_shared_mutables.initialized) { timer->pending = false; grpc_closure_sched( exec_ctx, timer->closure, @@ -233,22 +253,27 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, In that case, the timer will simply have to wait for the next grpc_timer_check. 
*/ if (is_first_timer) { - gpr_mu_lock(&g_mu); + gpr_mu_lock(&g_shared_mutables.mu); if (timer->deadline < shard->min_deadline) { gpr_atm old_min_deadline = g_shard_queue[0]->min_deadline; shard->min_deadline = timer->deadline; note_deadline_change(shard); if (shard->shard_queue_index == 0 && timer->deadline < old_min_deadline) { - gpr_atm_no_barrier_store(&g_min_timer, timer->deadline); + gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, timer->deadline); grpc_kick_poller(); } } - gpr_mu_unlock(&g_mu); + gpr_mu_unlock(&g_shared_mutables.mu); } } +void grpc_timer_consume_kick(void) { + /* force re-evaluation of last seen min */ + gpr_tls_set(&g_last_seen_min_timer, 0); +} + void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { - if (!g_initialized) { + if (!g_shared_mutables.initialized) { /* must have already been cancelled, also the shard mutex is invalid */ return; } @@ -334,12 +359,23 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, gpr_atm *next, grpc_error *error) { size_t n = 0; - if (now < gpr_atm_no_barrier_load(&g_min_timer)) { + /* fetch from a thread-local first: this avoids contention on a globally + mutable cacheline in the common case */ + gpr_atm min_timer = gpr_tls_get(&g_last_seen_min_timer); + if (now < min_timer) { + if (next != NULL) *next = GPR_MIN(*next, min_timer); return 0; } - if (gpr_spinlock_trylock(&g_checker_mu)) { - gpr_mu_lock(&g_mu); + min_timer = gpr_atm_no_barrier_load(&g_shared_mutables.min_timer); + gpr_tls_set(&g_last_seen_min_timer, min_timer); + if (now < min_timer) { + if (next != NULL) *next = GPR_MIN(*next, min_timer); + return 0; + } + + if (gpr_spinlock_trylock(&g_shared_mutables.checker_mu)) { + gpr_mu_lock(&g_shared_mutables.mu); while (g_shard_queue[0]->min_deadline < now) { gpr_atm new_min_deadline; @@ -363,20 +399,10 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, *next = GPR_MIN(*next, g_shard_queue[0]->min_deadline); } - gpr_atm_no_barrier_store(&g_min_timer, g_shard_queue[0]->min_deadline); - gpr_mu_unlock(&g_mu); - gpr_spinlock_unlock(&g_checker_mu); - } else if (next != NULL) { - /* TODO(ctiller): this forces calling code to do a short poll, and - then retry the timer check (because this time through the timer list was - contended). - - We could reduce the cost here dramatically by keeping a count of how many - currently active pollers got through the uncontended case above - successfully, and waking up other pollers IFF that count drops to zero. - - Once that count is in place, this entire else branch could disappear.
*/ - *next = GPR_MIN(*next, now + 1); + gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, + g_shard_queue[0]->min_deadline); + gpr_mu_unlock(&g_shared_mutables.mu); + gpr_spinlock_unlock(&g_shared_mutables.checker_mu); } GRPC_ERROR_UNREF(error); @@ -387,7 +413,7 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_timespec *next) { GPR_ASSERT(now.clock_type == g_clock_type); - gpr_atm now_atm = timespec_to_atm(now); + gpr_atm now_atm = timespec_to_atm_round_down(now); gpr_atm next_atm; bool r = run_some_expired_timers( exec_ctx, now_atm, &next_atm, From bd0af4fc6cad1f4d9b371e5cea19cf5c91d8c8c2 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 20 Mar 2017 08:33:02 -0700 Subject: [PATCH 03/24] Fix signed overflow on queue drain --- src/core/lib/iomgr/timer_generic.c | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index 900731c37c6..d52613e713f 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -88,6 +88,13 @@ static gpr_timespec g_start_time; GPR_TLS_DECL(g_last_seen_min_timer); +static gpr_atm saturating_add(gpr_atm a, gpr_atm b) { + if (a > GPR_ATM_MAX - b) { + return GPR_ATM_MAX; + } + return a + b; +} + static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, gpr_atm *next, grpc_error *error); @@ -308,8 +315,9 @@ static int refill_queue(shard_type *shard, gpr_atm now) { grpc_timer *timer, *next; /* Compute the new cap and put all timers under it into the queue: */ - shard->queue_deadline_cap = GPR_MAX(now, shard->queue_deadline_cap) + - (gpr_atm)(deadline_delta * 1000.0); + shard->queue_deadline_cap = + saturating_add(GPR_MAX(now, shard->queue_deadline_cap), + (gpr_atm)(deadline_delta * 1000.0)); for (timer = shard->list.next; timer != &shard->list; timer = next) { next = timer->next; From 18e15064849b9b5c85f28ea9e8f0e56c059fb57b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 20 Mar 2017 09:35:39 -0700 Subject: [PATCH 04/24] Fix uninitialized variable --- src/core/lib/iomgr/timer_generic.c | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index d52613e713f..048cf3dc78a 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -422,13 +422,17 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_timespec *next) { GPR_ASSERT(now.clock_type == g_clock_type); gpr_atm now_atm = timespec_to_atm_round_down(now); - gpr_atm next_atm; - bool r = run_some_expired_timers( - exec_ctx, now_atm, &next_atm, + grpc_error *shutdown_error = gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0 ? 
GRPC_ERROR_NONE - : GRPC_ERROR_CREATE("Shutting down timer system")); - if (next != NULL) *next = atm_to_timespec(next_atm); + : GRPC_ERROR_CREATE("Shutting down timer system"); + if (next == NULL) { + r = run_some_expired_timers(exec_ctx, now_atm, NULL, shutdown_error); + } else { + gpr_atm next_atm = timespec_to_atm_round_down(*next); + r = run_some_expired_timers(exec_ctx, now_atm, &next_atm, shutdown_error); + *next = atm_to_timespec(next_atm); + } return r; } From 99c718b8f7fc1cba779894c5e07988c68ea84801 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 20 Mar 2017 20:03:52 -0700 Subject: [PATCH 05/24] Update timer_generic.c --- src/core/lib/iomgr/timer_generic.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index 048cf3dc78a..5a6490bf736 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -426,6 +426,7 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0 ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE("Shutting down timer system"); + bool r; if (next == NULL) { r = run_some_expired_timers(exec_ctx, now_atm, NULL, shutdown_error); } else { From 4e4647ae443ca43b4b72e79927409c46d14da7a9 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 21 Mar 2017 08:44:43 -0700 Subject: [PATCH 06/24] Fix crash on mac --- src/core/lib/iomgr/timer_generic.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index 5a6490bf736..05cc51422e1 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -141,6 +141,7 @@ void grpc_timer_list_init(gpr_timespec now) { g_start_time = now; g_shared_mutables.min_timer = timespec_to_atm_round_down(now); gpr_tls_init(&g_last_seen_min_timer); + gpr_tls_set(&g_last_seen_min_timer, 0); for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; From 2a1949e566d2120bfafdf4ea87919cc7c3feccef Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 21 Mar 2017 09:29:12 -0700 Subject: [PATCH 07/24] Add tracing for timers, fix a time shifting bug --- doc/environment_variables.md | 1 + src/core/lib/iomgr/timer_generic.c | 47 ++++++++++++++++++++++++++++-- 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/doc/environment_variables.md b/doc/environment_variables.md index a3806732334..a92cadd8fcc 100644 --- a/doc/environment_variables.md +++ b/doc/environment_variables.md @@ -55,6 +55,7 @@ some configuration as environment variables that can be set. 
- queue_timeout - server_channel - lightweight trace of significant server channel events - secure_endpoint - traces bytes flowing through encrypted channels + - timer - timers (alarms) in the grpc internals - transport_security - traces metadata about secure channel establishment - tcp - traces bytes in and out of a channel diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index 05cc51422e1..4316ece8c3a 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -37,10 +37,13 @@ #include "src/core/lib/iomgr/timer.h" +#include #include +#include #include #include #include +#include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/time_averaged_stats.h" #include "src/core/lib/iomgr/timer_heap.h" #include "src/core/lib/support/spinlock.h" @@ -53,6 +56,9 @@ #define MIN_QUEUE_WINDOW_DURATION 0.01 #define MAX_QUEUE_WINDOW_DURATION 1 +static int grpc_timer_trace = 0; +static int grpc_timer_check_trace = 0; + typedef struct { gpr_mu mu; grpc_time_averaged_stats stats; @@ -107,6 +113,7 @@ static gpr_timespec dbl_to_ts(double d) { } static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) { + ts = gpr_time_sub(ts, g_start_time); double x = GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS + 1.0; if (x < 0) return 0; @@ -115,6 +122,7 @@ static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) { } static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) { + ts = gpr_time_sub(ts, g_start_time); double x = GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS; if (x < 0) return 0; @@ -142,6 +150,8 @@ void grpc_timer_list_init(gpr_timespec now) { g_shared_mutables.min_timer = timespec_to_atm_round_down(now); gpr_tls_init(&g_last_seen_min_timer); gpr_tls_set(&g_last_seen_min_timer, 0); + grpc_register_tracer("timer", &grpc_timer_trace); + grpc_register_tracer("timer_check", &grpc_timer_check_trace); for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; @@ -221,6 +231,13 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, timer->closure = closure; timer->deadline = timespec_to_atm_round_up(deadline); + if (grpc_timer_trace) { + gpr_log(GPR_DEBUG, "TIMER %p: SET %" PRIdPTR ".%09d now %" PRIdPTR + ".%09d [%" PRIdPTR "] call %p[%p]", + timer, deadline.tv_sec, deadline.tv_nsec, now.tv_sec, now.tv_nsec, + timer->deadline, closure, closure->cb); + } + if (!g_shared_mutables.initialized) { timer->pending = false; grpc_closure_sched( @@ -427,15 +444,41 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0 ? 
GRPC_ERROR_NONE : GRPC_ERROR_CREATE("Shutting down timer system"); + if (grpc_timer_check_trace) { + char *next_str; + if (next == NULL) { + next_str = gpr_strdup("NULL"); + } else { + gpr_asprintf(&next_str, "%" PRIdPTR ".%09d [%" PRIdPTR "]", next->tv_sec, + next->tv_nsec, timespec_to_atm_round_down(*next)); + } + gpr_log(GPR_DEBUG, + "TIMER CHECK BEGIN: now=%" PRIdPTR ".%09d [%" PRIdPTR "] next=%s", + now.tv_sec, now.tv_nsec, now_atm, next_str); + gpr_free(next_str); + } bool r; + gpr_atm next_atm; if (next == NULL) { r = run_some_expired_timers(exec_ctx, now_atm, NULL, shutdown_error); } else { - gpr_atm next_atm = timespec_to_atm_round_down(*next); + next_atm = timespec_to_atm_round_down(*next); r = run_some_expired_timers(exec_ctx, now_atm, &next_atm, shutdown_error); *next = atm_to_timespec(next_atm); } - return r; + if (grpc_timer_check_trace) { + char *next_str; + if (next == NULL) { + next_str = gpr_strdup("NULL"); + } else { + gpr_asprintf(&next_str, "%" PRIdPTR ".%09d [%" PRIdPTR "]", next->tv_sec, + next->tv_nsec, next_atm); + } + gpr_log(GPR_DEBUG, "TIMER CHECK END: %d timers triggered; next=%s", r, + next_str); + gpr_free(next_str); + } + return r > 0; } #endif /* GRPC_TIMER_USE_GENERIC */ From afb168bc2e2fbe2cb7bfa26b3d0d5d7e8bd54b7d Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 21 Mar 2017 12:48:57 -0700 Subject: [PATCH 08/24] Fix race that ended up forcing us to spin poll with every thread on the timer stack --- src/core/lib/iomgr/timer_generic.c | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index 4316ece8c3a..459717212a3 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -232,10 +232,11 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, timer->deadline = timespec_to_atm_round_up(deadline); if (grpc_timer_trace) { - gpr_log(GPR_DEBUG, "TIMER %p: SET %" PRIdPTR ".%09d now %" PRIdPTR - ".%09d [%" PRIdPTR "] call %p[%p]", - timer, deadline.tv_sec, deadline.tv_nsec, now.tv_sec, now.tv_nsec, - timer->deadline, closure, closure->cb); + gpr_log(GPR_DEBUG, "TIMER %p: SET %" PRIdPTR ".%09d [%" PRIdPTR + "] now %" PRIdPTR ".%09d [%" PRIdPTR "] call %p[%p]", + timer, deadline.tv_sec, deadline.tv_nsec, timer->deadline, + now.tv_sec, now.tv_nsec, timespec_to_atm_round_down(now), closure, + closure->cb); } if (!g_shared_mutables.initialized) { @@ -358,7 +359,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_atm now) { if (!refill_queue(shard, now)) return NULL; } timer = grpc_timer_heap_top(&shard->heap); - if (timer->deadline > now) return NULL; + if (timer->deadline >= now) return NULL; timer->pending = false; grpc_timer_heap_pop(&shard->heap); return timer; @@ -452,9 +453,11 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_asprintf(&next_str, "%" PRIdPTR ".%09d [%" PRIdPTR "]", next->tv_sec, next->tv_nsec, timespec_to_atm_round_down(*next)); } - gpr_log(GPR_DEBUG, - "TIMER CHECK BEGIN: now=%" PRIdPTR ".%09d [%" PRIdPTR "] next=%s", - now.tv_sec, now.tv_nsec, now_atm, next_str); + gpr_log(GPR_DEBUG, "TIMER CHECK BEGIN: now=%" PRIdPTR ".%09d [%" PRIdPTR + "] next=%s tls_min=%" PRIdPTR " glob_min=%" PRIdPTR, + now.tv_sec, now.tv_nsec, now_atm, next_str, + gpr_tls_get(&g_last_seen_min_timer), + gpr_atm_no_barrier_load(&g_shared_mutables.min_timer)); gpr_free(next_str); } bool r; From 0b4c531805f52c1b12710257e3e770ed0ce86ada Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 24 
Mar 2017 15:23:57 -0700 Subject: [PATCH 09/24] Add tracing, fix off-by-one error --- src/core/lib/iomgr/timer_generic.c | 73 +++++++++++++++++++++++------- test/core/iomgr/timer_list_test.c | 8 ++++ 2 files changed, 65 insertions(+), 16 deletions(-) diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index f70a36fb7c9..d6c63e724af 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -56,8 +56,8 @@ #define MIN_QUEUE_WINDOW_DURATION 0.01 #define MAX_QUEUE_WINDOW_DURATION 1 -static int grpc_timer_trace = 0; -static int grpc_timer_check_trace = 0; +int grpc_timer_trace = 0; +int grpc_timer_check_trace = 0; typedef struct { gpr_mu mu; @@ -136,7 +136,7 @@ static gpr_timespec atm_to_timespec(gpr_atm x) { static gpr_atm compute_min_deadline(shard_type *shard) { return grpc_timer_heap_is_empty(&shard->heap) - ? shard->queue_deadline_cap + ? saturating_add(shard->queue_deadline_cap, 1) : grpc_timer_heap_top(&shard->heap)->deadline; } @@ -186,10 +186,13 @@ static double ts_to_dbl(gpr_timespec ts) { return (double)ts.tv_sec + 1e-9 * ts.tv_nsec; } -static void list_join(grpc_timer *head, grpc_timer *timer) { +/* returns true if the first element in the list */ +static bool list_join(grpc_timer *head, grpc_timer *timer) { + bool is_first = head->next == head; timer->next = head; timer->prev = head->prev; timer->next->prev = timer->prev->next = timer; + return is_first; } static void list_remove(grpc_timer *timer) { @@ -233,8 +236,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, timer->deadline = timespec_to_atm_round_up(deadline); if (grpc_timer_trace) { - gpr_log(GPR_DEBUG, "TIMER %p: SET %" PRIdPTR ".%09d [%" PRIdPTR - "] now %" PRIdPTR ".%09d [%" PRIdPTR "] call %p[%p]", + gpr_log(GPR_DEBUG, "TIMER %p: SET %" PRId64 ".%09d [%" PRIdPTR + "] now %" PRId64 ".%09d [%" PRIdPTR "] call %p[%p]", timer, deadline.tv_sec, deadline.tv_nsec, timer->deadline, now.tv_sec, now.tv_nsec, timespec_to_atm_round_down(now), closure, closure->cb); @@ -264,7 +267,14 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, is_first_timer = grpc_timer_heap_add(&shard->heap, timer); } else { timer->heap_index = INVALID_HEAP_INDEX; - list_join(&shard->list, timer); + is_first_timer = list_join(&shard->list, timer) && + grpc_timer_heap_is_empty(&shard->heap); + } + if (grpc_timer_trace) { + gpr_log(GPR_DEBUG, " .. add to shard %d with queue_deadline_cap=%" PRIdPTR + " => is_first_timer=%s", + (int)(shard - g_shards), shard->queue_deadline_cap, + is_first_timer ? "true" : "false"); } gpr_mu_unlock(&shard->mu); @@ -281,6 +291,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, grpc_timer_check. */ if (is_first_timer) { gpr_mu_lock(&g_shared_mutables.mu); + gpr_log(GPR_DEBUG, " .. old shard min_deadline=%" PRIdPTR, + shard->min_deadline); if (timer->deadline < shard->min_deadline) { gpr_atm old_min_deadline = g_shard_queue[0]->min_deadline; shard->min_deadline = timer->deadline; @@ -338,10 +350,17 @@ static int refill_queue(shard_type *shard, gpr_atm now) { shard->queue_deadline_cap = saturating_add(GPR_MAX(now, shard->queue_deadline_cap), (gpr_atm)(deadline_delta * 1000.0)); + + if (grpc_timer_check_trace) { + gpr_log(GPR_DEBUG, " .. shard[%d]->queue_deadline_cap --> %" PRIdPTR, + (int)(shard - g_shards), shard->queue_deadline_cap); + } for (timer = shard->list.next; timer != &shard->list; timer = next) { next = timer->next; if (timer->deadline < shard->queue_deadline_cap) { + gpr_log(GPR_DEBUG, " .. 
add timer with deadline %" PRIdPTR " to heap", + timer->deadline); list_remove(timer); grpc_timer_heap_add(&shard->heap, timer); } @@ -355,12 +374,20 @@ static int refill_queue(shard_type *shard, gpr_atm now) { static grpc_timer *pop_one(shard_type *shard, gpr_atm now) { grpc_timer *timer; for (;;) { + if (grpc_timer_check_trace) { + gpr_log(GPR_DEBUG, " .. shard[%d]: heap_empty=%s", + (int)(shard - g_shards), + grpc_timer_heap_is_empty(&shard->heap) ? "true" : "false"); + } if (grpc_timer_heap_is_empty(&shard->heap)) { if (now < shard->queue_deadline_cap) return NULL; if (!refill_queue(shard, now)) return NULL; } timer = grpc_timer_heap_top(&shard->heap); - if (timer->deadline >= now) return NULL; + gpr_log(GPR_DEBUG, + " .. check top timer deadline=%" PRIdPTR " now=%" PRIdPTR, + timer->deadline, now); + if (timer->deadline > now) return NULL; timer->pending = false; grpc_timer_heap_pop(&shard->heap); return timer; @@ -405,6 +432,12 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, if (gpr_spinlock_trylock(&g_shared_mutables.checker_mu)) { gpr_mu_lock(&g_shared_mutables.mu); + if (grpc_timer_check_trace) { + gpr_log(GPR_DEBUG, " .. shard[%d]->min_deadline = %" PRIdPTR, + (int)(g_shard_queue[0] - g_shards), + g_shard_queue[0]->min_deadline); + } + while (g_shard_queue[0]->min_deadline < now) { gpr_atm new_min_deadline; @@ -414,6 +447,14 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, n += pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline, error); + if (grpc_timer_check_trace) { + gpr_log(GPR_DEBUG, + " .. popped --> %" PRIdPTR + ", shard[%d]->min_deadline %" PRIdPTR " --> %" PRIdPTR, + n, (int)(g_shard_queue[0] - g_shards), + g_shard_queue[0]->min_deadline, new_min_deadline); + } + /* An grpc_timer_init() on the shard could intervene here, adding a new timer that is earlier than new_min_deadline. However, grpc_timer_init() will block on the master_lock before it can call @@ -440,28 +481,30 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_timespec *next) { + // prelude GPR_ASSERT(now.clock_type == g_clock_type); gpr_atm now_atm = timespec_to_atm_round_down(now); grpc_error *shutdown_error = gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0 ? 
GRPC_ERROR_NONE -<<<<<<< HEAD - : GRPC_ERROR_CREATE("Shutting down timer system"); + : GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system"); + // tracing if (grpc_timer_check_trace) { char *next_str; if (next == NULL) { next_str = gpr_strdup("NULL"); } else { - gpr_asprintf(&next_str, "%" PRIdPTR ".%09d [%" PRIdPTR "]", next->tv_sec, + gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec, next->tv_nsec, timespec_to_atm_round_down(*next)); } - gpr_log(GPR_DEBUG, "TIMER CHECK BEGIN: now=%" PRIdPTR ".%09d [%" PRIdPTR + gpr_log(GPR_DEBUG, "TIMER CHECK BEGIN: now=%" PRId64 ".%09d [%" PRIdPTR "] next=%s tls_min=%" PRIdPTR " glob_min=%" PRIdPTR, now.tv_sec, now.tv_nsec, now_atm, next_str, gpr_tls_get(&g_last_seen_min_timer), gpr_atm_no_barrier_load(&g_shared_mutables.min_timer)); gpr_free(next_str); } + // actual code bool r; gpr_atm next_atm; if (next == NULL) { @@ -471,12 +514,13 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, r = run_some_expired_timers(exec_ctx, now_atm, &next_atm, shutdown_error); *next = atm_to_timespec(next_atm); } + // tracing if (grpc_timer_check_trace) { char *next_str; if (next == NULL) { next_str = gpr_strdup("NULL"); } else { - gpr_asprintf(&next_str, "%" PRIdPTR ".%09d [%" PRIdPTR "]", next->tv_sec, + gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec, next->tv_nsec, next_atm); } gpr_log(GPR_DEBUG, "TIMER CHECK END: %d timers triggered; next=%s", r, @@ -484,9 +528,6 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_free(next_str); } return r > 0; -======= - : GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system")); ->>>>>>> 7e6b7df8d6bbb80c19ae1736e0c35b4eab06c541 } #endif /* GRPC_TIMER_USE_GENERIC */ diff --git a/test/core/iomgr/timer_list_test.c b/test/core/iomgr/timer_list_test.c index 5c397b32cbc..46e41dd4490 100644 --- a/test/core/iomgr/timer_list_test.c +++ b/test/core/iomgr/timer_list_test.c @@ -45,6 +45,9 @@ #define MAX_CB 30 +extern int grpc_timer_trace; +extern int grpc_timer_check_trace; + static int cb_called[MAX_CB][2]; static void cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -60,6 +63,8 @@ static void add_test(void) { gpr_log(GPR_INFO, "add_test"); grpc_timer_list_init(start); + grpc_timer_trace = 1; + grpc_timer_check_trace = 1; memset(cb_called, 0, sizeof(cb_called)); /* 10 ms timers. will expire in the current epoch */ @@ -133,6 +138,8 @@ void destruction_test(void) { gpr_log(GPR_INFO, "destruction_test"); grpc_timer_list_init(gpr_time_0(GPR_CLOCK_REALTIME)); + grpc_timer_trace = 1; + grpc_timer_check_trace = 1; memset(cb_called, 0, sizeof(cb_called)); grpc_timer_init( @@ -172,6 +179,7 @@ void destruction_test(void) { int main(int argc, char **argv) { grpc_test_init(argc, argv); + gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG); add_test(); destruction_test(); return 0; From 18e7465fae0da39cc2d2f93110ec9b6b1edc4e8d Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 24 Mar 2017 15:44:27 -0700 Subject: [PATCH 10/24] Spam fixes --- src/core/lib/iomgr/timer_generic.c | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index d6c63e724af..639911166e4 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -291,8 +291,10 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, grpc_timer_check. 
*/ if (is_first_timer) { gpr_mu_lock(&g_shared_mutables.mu); - gpr_log(GPR_DEBUG, " .. old shard min_deadline=%" PRIdPTR, - shard->min_deadline); + if (grpc_timer_trace) { + gpr_log(GPR_DEBUG, " .. old shard min_deadline=%" PRIdPTR, + shard->min_deadline); + } if (timer->deadline < shard->min_deadline) { gpr_atm old_min_deadline = g_shard_queue[0]->min_deadline; shard->min_deadline = timer->deadline; @@ -359,8 +361,10 @@ static int refill_queue(shard_type *shard, gpr_atm now) { next = timer->next; if (timer->deadline < shard->queue_deadline_cap) { - gpr_log(GPR_DEBUG, " .. add timer with deadline %" PRIdPTR " to heap", - timer->deadline); + if (grpc_timer_check_trace) { + gpr_log(GPR_DEBUG, " .. add timer with deadline %" PRIdPTR " to heap", + timer->deadline); + } list_remove(timer); grpc_timer_heap_add(&shard->heap, timer); } @@ -384,9 +388,11 @@ static grpc_timer *pop_one(shard_type *shard, gpr_atm now) { if (!refill_queue(shard, now)) return NULL; } timer = grpc_timer_heap_top(&shard->heap); - gpr_log(GPR_DEBUG, - " .. check top timer deadline=%" PRIdPTR " now=%" PRIdPTR, - timer->deadline, now); + if (grpc_timer_check_trace) { + gpr_log(GPR_DEBUG, + " .. check top timer deadline=%" PRIdPTR " now=%" PRIdPTR, + timer->deadline, now); + } if (timer->deadline > now) return NULL; timer->pending = false; grpc_timer_heap_pop(&shard->heap); From 883243ae97cfbc58a3e125b72bae7b6e6433ed49 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 24 Mar 2017 16:08:08 -0700 Subject: [PATCH 11/24] Fix rounding --- src/core/lib/iomgr/timer_generic.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index 639911166e4..6bbe4a507a5 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -115,7 +115,8 @@ static gpr_timespec dbl_to_ts(double d) { static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) { ts = gpr_time_sub(ts, g_start_time); double x = GPR_MS_PER_SEC * (double)ts.tv_sec + - (double)ts.tv_nsec / GPR_NS_PER_MS + 1.0; + (double)ts.tv_nsec / GPR_NS_PER_MS + + (double)(GPR_NS_PER_SEC - 1) / (double)GPR_NS_PER_SEC; if (x < 0) return 0; if (x > GPR_ATM_MAX) return GPR_ATM_MAX; return (gpr_atm)x; From a0bfee91e7a44afccb361ce49cc58a9083350f22 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 27 Mar 2017 09:21:11 -0700 Subject: [PATCH 12/24] Better logging --- src/core/lib/iomgr/timer_generic.c | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index 639911166e4..a77a6403444 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -95,8 +95,8 @@ static gpr_timespec g_start_time; GPR_TLS_DECL(g_last_seen_min_timer); static gpr_atm saturating_add(gpr_atm a, gpr_atm b) { - if (a > GPR_ATM_MAX - b) { - return GPR_ATM_MAX; + if (a > GPR_ATM_MAX - 1 - b) { + return GPR_ATM_MAX - 1; } return a + b; } @@ -117,7 +117,10 @@ static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) { double x = GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS + 1.0; if (x < 0) return 0; - if (x > GPR_ATM_MAX) return GPR_ATM_MAX; + // when rounding up (for deadlines) we clamp at 1ms before 'infinity', so that + // when executing with now=infinity (at shutdown) all timers are guaranteed to + // fire + if (x > GPR_ATM_MAX - 1) return GPR_ATM_MAX - 1; return (gpr_atm)x; } @@ -321,6 +324,10 @@ void grpc_timer_cancel(grpc_exec_ctx 
*exec_ctx, grpc_timer *timer) { shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; gpr_mu_lock(&shard->mu); + if (grpc_timer_trace) { + gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer, + timer->pending ? "true" : "false"); + } if (timer->pending) { grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); timer->pending = false; @@ -394,6 +401,10 @@ static grpc_timer *pop_one(shard_type *shard, gpr_atm now) { timer->deadline, now); } if (timer->deadline > now) return NULL; + if (grpc_timer_trace) { + gpr_log(GPR_DEBUG, "TIMER %p: FIRE %" PRIdPTR "ms late", timer, + now - timer->deadline); + } timer->pending = false; grpc_timer_heap_pop(&shard->heap); return timer; @@ -444,7 +455,7 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, g_shard_queue[0]->min_deadline); } - while (g_shard_queue[0]->min_deadline < now) { + while (g_shard_queue[0]->min_deadline <= now) { gpr_atm new_min_deadline; /* For efficiency, we pop as many available timers as we can from the @@ -454,11 +465,11 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline, error); if (grpc_timer_check_trace) { - gpr_log(GPR_DEBUG, - " .. popped --> %" PRIdPTR - ", shard[%d]->min_deadline %" PRIdPTR " --> %" PRIdPTR, + gpr_log(GPR_DEBUG, " .. popped --> %" PRIdPTR + ", shard[%d]->min_deadline %" PRIdPTR + " --> %" PRIdPTR ", now=%" PRIdPTR, n, (int)(g_shard_queue[0] - g_shards), - g_shard_queue[0]->min_deadline, new_min_deadline); + g_shard_queue[0]->min_deadline, new_min_deadline, now); } /* An grpc_timer_init() on the shard could intervene here, adding a new From 041bf64dddf059b58cd9f28c4354733c066b2af8 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 27 Mar 2017 09:26:07 -0700 Subject: [PATCH 13/24] Fix infinite loop --- src/core/lib/iomgr/timer_generic.c | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index a77a6403444..4e4af9abc18 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -95,8 +95,8 @@ static gpr_timespec g_start_time; GPR_TLS_DECL(g_last_seen_min_timer); static gpr_atm saturating_add(gpr_atm a, gpr_atm b) { - if (a > GPR_ATM_MAX - 1 - b) { - return GPR_ATM_MAX - 1; + if (a > GPR_ATM_MAX - b) { + return GPR_ATM_MAX; } return a + b; } @@ -117,10 +117,7 @@ static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) { double x = GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS + 1.0; if (x < 0) return 0; - // when rounding up (for deadlines) we clamp at 1ms before 'infinity', so that - // when executing with now=infinity (at shutdown) all timers are guaranteed to - // fire - if (x > GPR_ATM_MAX - 1) return GPR_ATM_MAX - 1; + if (x > GPR_ATM_MAX) return GPR_ATM_MAX; return (gpr_atm)x; } @@ -455,7 +452,8 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, g_shard_queue[0]->min_deadline); } - while (g_shard_queue[0]->min_deadline <= now) { + while (g_shard_queue[0]->min_deadline < now || + (now != GPR_ATM_MAX && g_shard_queue[0]->min_deadline == now)) { gpr_atm new_min_deadline; /* For efficiency, we pop as many available timers as we can from the From 8e2fd9ca4d8cdf610dcb9fc89cceeb6e51ad2467 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 27 Mar 2017 09:32:24 -0700 Subject: [PATCH 14/24] Avoid spin polling --- src/core/lib/iomgr/ev_epoll_linux.c | 5 +++-- 1 file changed, 3 
insertions(+), 2 deletions(-) diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index e6478c11b05..168980ab394 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -1487,8 +1487,9 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, return 0; } timeout = gpr_time_sub(deadline, now); - return gpr_time_to_millis(gpr_time_add( - timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); + return GPR_MAX( + 1, gpr_time_to_millis(gpr_time_add( + timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)))); } static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, From 799e7e8a4077c1d64fa48b595badba331b9e0498 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 27 Mar 2017 12:42:34 -0700 Subject: [PATCH 15/24] Avoid re-evaluation --- src/core/lib/iomgr/ev_epoll_linux.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 168980ab394..4e472bd6ecd 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -1487,9 +1487,9 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, return 0; } timeout = gpr_time_sub(deadline, now); - return GPR_MAX( - 1, gpr_time_to_millis(gpr_time_add( - timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)))); + int millis = gpr_time_to_millis(gpr_time_add( + timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); + return millis >= 1 ? millis : 1; } static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, From 1a7692683fc40579180dee72bb2863a9bca827b6 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 27 Mar 2017 12:52:59 -0700 Subject: [PATCH 16/24] Easier to read spam --- src/core/lib/iomgr/timer_generic.c | 35 ++++++++++++++++++------------ 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index 4e4af9abc18..208572e7ca6 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -187,12 +187,10 @@ static double ts_to_dbl(gpr_timespec ts) { } /* returns true if the first element in the list */ -static bool list_join(grpc_timer *head, grpc_timer *timer) { - bool is_first = head->next == head; +static void list_join(grpc_timer *head, grpc_timer *timer) { timer->next = head; timer->prev = head->prev; timer->next->prev = timer->prev->next = timer; - return is_first; } static void list_remove(grpc_timer *timer) { @@ -267,8 +265,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, is_first_timer = grpc_timer_heap_add(&shard->heap, timer); } else { timer->heap_index = INVALID_HEAP_INDEX; - is_first_timer = list_join(&shard->list, timer) && - grpc_timer_heap_is_empty(&shard->heap); + list_join(&shard->list, timer); } if (grpc_timer_trace) { gpr_log(GPR_DEBUG, " .. 
From 1a7692683fc40579180dee72bb2863a9bca827b6 Mon Sep 17 00:00:00 2001
From: Craig Tiller
Date: Mon, 27 Mar 2017 12:52:59 -0700
Subject: [PATCH 16/24] Easier to read spam

---
 src/core/lib/iomgr/timer_generic.c | 35 ++++++++++++++++++------------
 1 file changed, 21 insertions(+), 14 deletions(-)

diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index 4e4af9abc18..208572e7ca6 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -187,12 +187,10 @@ static double ts_to_dbl(gpr_timespec ts) {
 }
 
 /* returns true if the first element in the list */
-static bool list_join(grpc_timer *head, grpc_timer *timer) {
-  bool is_first = head->next == head;
+static void list_join(grpc_timer *head, grpc_timer *timer) {
   timer->next = head;
   timer->prev = head->prev;
   timer->next->prev = timer->prev->next = timer;
-  return is_first;
 }
 
 static void list_remove(grpc_timer *timer) {
@@ -267,8 +265,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
     is_first_timer = grpc_timer_heap_add(&shard->heap, timer);
   } else {
     timer->heap_index = INVALID_HEAP_INDEX;
-    is_first_timer = list_join(&shard->list, timer) &&
-                     grpc_timer_heap_is_empty(&shard->heap);
+    list_join(&shard->list, timer);
   }
   if (grpc_timer_trace) {
     gpr_log(GPR_DEBUG, " .. add to shard %d with queue_deadline_cap=%" PRIdPTR
@@ -428,15 +425,7 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
                                    gpr_atm *next, grpc_error *error) {
   size_t n = 0;
 
-  /* fetch from a thread-local first: this avoids contention on a globally
-     mutable cacheline in the common case */
-  gpr_atm min_timer = gpr_tls_get(&g_last_seen_min_timer);
-  if (now < min_timer) {
-    if (next != NULL) *next = GPR_MIN(*next, min_timer);
-    return 0;
-  }
-
-  min_timer = gpr_atm_no_barrier_load(&g_shared_mutables.min_timer);
+  gpr_atm min_timer = gpr_atm_no_barrier_load(&g_shared_mutables.min_timer);
   gpr_tls_set(&g_last_seen_min_timer, min_timer);
   if (now < min_timer) {
     if (next != NULL) *next = GPR_MIN(*next, min_timer);
@@ -499,10 +488,28 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
   // prelude
   GPR_ASSERT(now.clock_type == g_clock_type);
   gpr_atm now_atm = timespec_to_atm_round_down(now);
+
+  /* fetch from a thread-local first: this avoids contention on a globally
+     mutable cacheline in the common case */
+  gpr_atm min_timer = gpr_tls_get(&g_last_seen_min_timer);
+  if (now_atm < min_timer) {
+    if (next != NULL) {
+      *next =
+          atm_to_timespec(GPR_MIN(timespec_to_atm_round_up(*next), min_timer));
+    }
+    if (grpc_timer_check_trace) {
+      gpr_log(GPR_DEBUG,
+              "TIMER CHECK SKIP: now_atm=%" PRId64 " min_timer=%" PRId64,
+              now_atm, min_timer);
+    }
+    return 0;
+  }
+
   grpc_error *shutdown_error =
       gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0
           ? GRPC_ERROR_NONE
           : GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system");
+
   // tracing
   if (grpc_timer_check_trace) {
     char *next_str;
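Note on PATCH 16/24: besides the log cleanup, this moves the thread-local
fast path out of run_some_expired_timers and into grpc_timer_check, so the
common "nothing due yet" case consults only a value this thread already saw
and never touches the shared, frequently written atomic. A minimal sketch of
the pattern, with hypothetical names rather than the gRPC API:

    #include <stdatomic.h>

    static _Atomic long shared_min_deadline;  /* updated by timer code */
    static _Thread_local long last_seen_min;  /* per-thread cache */

    int timers_maybe_due(long now) {
      /* fast path: purely thread-local, no shared-cacheline traffic */
      if (now < last_seen_min) return 0;
      /* slow path: refresh the cache from the shared atomic, re-check */
      last_seen_min = atomic_load_explicit(&shared_min_deadline,
                                           memory_order_relaxed);
      return now >= last_seen_min;
    }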
From 97d401128bf00254ab626b86dfebef4289e6369c Mon Sep 17 00:00:00 2001
From: Craig Tiller
Date: Wed, 29 Mar 2017 16:46:13 -0700
Subject: [PATCH 17/24] Fix compile error

---
 src/core/lib/iomgr/timer_generic.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index e3e0e7fc3f6..c7ce76ad936 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -500,7 +500,7 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
     }
     if (grpc_timer_check_trace) {
       gpr_log(GPR_DEBUG,
-              "TIMER CHECK SKIP: now_atm=%" PRId64 " min_timer=%" PRId64,
+              "TIMER CHECK SKIP: now_atm=%" PRIdPTR " min_timer=%" PRIdPTR,
              now_atm, min_timer);
     }
     return 0;

From cd90c4b534e3897c6e8ad0ae6fbf543d2d7a2fc8 Mon Sep 17 00:00:00 2001
From: Craig Tiller
Date: Fri, 31 Mar 2017 06:41:04 -0700
Subject: [PATCH 18/24] Sensitivity

---
 tools/profiling/microbenchmarks/bm_diff.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tools/profiling/microbenchmarks/bm_diff.py b/tools/profiling/microbenchmarks/bm_diff.py
index 339e9595185..d6119332a77 100755
--- a/tools/profiling/microbenchmarks/bm_diff.py
+++ b/tools/profiling/microbenchmarks/bm_diff.py
@@ -65,8 +65,8 @@ nanos = {
   'pct_diff': 5,
 }
 counter = {
-  'abs_diff': 1,
-  'pct_diff': 1,
+  'abs_diff': 0.5,
+  'pct_diff': 0.5,
 }
 
 _INTERESTING = {

From f5b7e27016c629b45a03734b97e6d6da98ebd06f Mon Sep 17 00:00:00 2001
From: Craig Tiller
Date: Fri, 31 Mar 2017 06:41:23 -0700
Subject: [PATCH 19/24] Revert "Spam cleanup"

This reverts commit 1463d0e74df002e8d48515ad03d132f5f4cf7ad2.
---
 tools/profiling/microbenchmarks/bm_diff.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/tools/profiling/microbenchmarks/bm_diff.py b/tools/profiling/microbenchmarks/bm_diff.py
index d6119332a77..6f39738f594 100755
--- a/tools/profiling/microbenchmarks/bm_diff.py
+++ b/tools/profiling/microbenchmarks/bm_diff.py
@@ -185,6 +185,9 @@ class Benchmark:
       old_mdn = median(old)
       delta = new_mdn - old_mdn
       ratio = changed_ratio(new_mdn, old_mdn)
+      print 'new=%r old=%r new_mdn=%f old_mdn=%f delta=%f ratio=%f p=%f' % (
+        new, old, new_mdn, old_mdn, delta, ratio, p
+      )
       if p < args.p_threshold and abs(delta) > _INTERESTING[f]['abs_diff'] and abs(ratio) > _INTERESTING[f]['pct_diff']:
         self.final[f] = delta
     return self.final.keys()
@@ -209,16 +212,19 @@ for bm in comparables:
     js_old_opt = json.loads(f.read())
 
   for row in bm_json.expand_json(js_new_ctr, js_new_opt):
+    print row
     name = row['cpp_name']
     if name.endswith('_mean') or name.endswith('_stddev'): continue
     benchmarks[name].add_sample(row, True)
   for row in bm_json.expand_json(js_old_ctr, js_old_opt):
+    print row
     name = row['cpp_name']
     if name.endswith('_mean') or name.endswith('_stddev'): continue
     benchmarks[name].add_sample(row, False)
 
 really_interesting = set()
 for name, bm in benchmarks.items():
+  print name
   really_interesting.update(bm.process())
 
 fields = [f for f in args.track if f in args.track]

From 27703c8e3bac847f137f2d3065810857e9ca6f02 Mon Sep 17 00:00:00 2001
From: Craig Tiller
Date: Fri, 31 Mar 2017 06:51:48 -0700
Subject: [PATCH 20/24] Fixes

---
 tools/profiling/microbenchmarks/bm_diff.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/tools/profiling/microbenchmarks/bm_diff.py b/tools/profiling/microbenchmarks/bm_diff.py
index 6f39738f594..4255e29ff2f 100755
--- a/tools/profiling/microbenchmarks/bm_diff.py
+++ b/tools/profiling/microbenchmarks/bm_diff.py
@@ -176,7 +176,7 @@ class Benchmark:
     self.samples[new][f].append(float(data[f]))
 
   def process(self):
-    for f in args.track:
+    for f in sorted(args.track):
       new = self.samples[True][f]
       old = self.samples[False][f]
       if not new or not old: continue
@@ -185,10 +185,10 @@ class Benchmark:
       old_mdn = median(old)
       delta = new_mdn - old_mdn
       ratio = changed_ratio(new_mdn, old_mdn)
-      print 'new=%r old=%r new_mdn=%f old_mdn=%f delta=%f ratio=%f p=%f' % (
-        new, old, new_mdn, old_mdn, delta, ratio, p
-      )
-      if p < args.p_threshold and abs(delta) > _INTERESTING[f]['abs_diff'] and abs(ratio) > _INTERESTING[f]['pct_diff']:
+      print '%s: new=%r old=%r new_mdn=%f old_mdn=%f delta=%f(%f:%f) ratio=%f(%f:%f) p=%f' % (
+        f, new, old, new_mdn, old_mdn, delta, abs(delta), _INTERESTING[f]['abs_diff'], ratio, abs(ratio), _INTERESTING[f]['pct_diff']/100.0, p
+      )
+      if p < args.p_threshold and abs(delta) > _INTERESTING[f]['abs_diff'] and abs(ratio) > _INTERESTING[f]['pct_diff']/100.0:
         self.final[f] = delta
     return self.final.keys()
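Note on PATCHES 18-20/24: the substantive change in PATCH 20 is the trailing
/100.0. The _INTERESTING thresholds are written as percentages (pct_diff of
5, or 0.5 after PATCH 18), while changed_ratio evidently reports a fraction,
so the old comparison required e.g. a 500% swing before a nanos result
counted as interesting. Restated in C (the signature is illustrative, not
the script's API):

    #include <math.h>
    #include <stdbool.h>

    /* A result is interesting only if it is statistically significant and
       clears both an absolute and a relative threshold; pct_diff is a
       percentage and ratio a fraction, hence the /100.0 scaling. */
    bool is_interesting(double p, double p_threshold, double delta,
                        double abs_diff, double ratio, double pct_diff) {
      return p < p_threshold && fabs(delta) > abs_diff &&
             fabs(ratio) > pct_diff / 100.0;
    }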
From ac50b27992697eaf806ad9a52332938c5f6b5c89 Mon Sep 17 00:00:00 2001
From: Craig Tiller
Date: Fri, 31 Mar 2017 14:54:02 -0700
Subject: [PATCH 21/24] Fix a bug whereby we miss some wakeups in highly
 concurrent situations

---
 src/core/lib/iomgr/timer_generic.c | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index c7ce76ad936..497875a2f20 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -477,6 +477,8 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
                            g_shard_queue[0]->min_deadline);
     gpr_mu_unlock(&g_shared_mutables.mu);
     gpr_spinlock_unlock(&g_shared_mutables.checker_mu);
+  } else {
+    if (next != NULL) *next = GPR_MIN(*next, min_timer);
   }
 
   GRPC_ERROR_UNREF(error);

From a046ff1d95a58ffec653770498742e221ae27c5a Mon Sep 17 00:00:00 2001
From: Craig Tiller
Date: Fri, 31 Mar 2017 14:57:18 -0700
Subject: [PATCH 22/24] Restore old branch

---
 src/core/lib/iomgr/timer_generic.c | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index 497875a2f20..d8e60684312 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -477,8 +477,17 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
                            g_shard_queue[0]->min_deadline);
     gpr_mu_unlock(&g_shared_mutables.mu);
     gpr_spinlock_unlock(&g_shared_mutables.checker_mu);
-  } else {
-    if (next != NULL) *next = GPR_MIN(*next, min_timer);
+  } else if (next != NULL) {
+    /* TODO(ctiller): this forces calling code to do a short poll, and
+       then retry the timer check (because this time through the timer list
+       was contended).
+
+       We could reduce the cost here dramatically by keeping a count of how
+       many currently active pollers got through the uncontended case above
+       successfully, and waking up other pollers IFF that count drops to
+       zero.
+
+       Once that count is in place, this entire else branch could disappear. */
+    *next = GPR_MIN(*next, now + 1);
   }
 
   GRPC_ERROR_UNREF(error);

From 9d6f04d51976c94690092573c62fb5c5ba7544a9 Mon Sep 17 00:00:00 2001
From: Craig Tiller
Date: Mon, 3 Apr 2017 08:17:16 -0700
Subject: [PATCH 23/24] Increase grace period: 300ms is way too short in our
 test environments

---
 test/core/end2end/tests/max_connection_age.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test/core/end2end/tests/max_connection_age.c b/test/core/end2end/tests/max_connection_age.c
index 1de54e08252..59bfdbabb9b 100644
--- a/test/core/end2end/tests/max_connection_age.c
+++ b/test/core/end2end/tests/max_connection_age.c
@@ -57,7 +57,7 @@
    should be shorter than CALL_DEADLINE_S - CQ_MAX_CONNECTION_AGE_WAIT_TIME_S */
 #define CQ_MAX_CONNECTION_AGE_GRACE_WAIT_TIME_S 2
 /* The grace period for the test to observe the channel shutdown process */
-#define IMMEDIATE_SHUTDOWN_GRACE_TIME_MS 300
+#define IMMEDIATE_SHUTDOWN_GRACE_TIME_MS 3000
 
 static void *tag(intptr_t t) { return (void *)t; }

From 3eeba6de6b2bed5a38a6935fba50bfd933d5c387 Mon Sep 17 00:00:00 2001
From: Craig Tiller
Date: Mon, 3 Apr 2017 12:36:02 -0700
Subject: [PATCH 24/24] Increase grace time

---
 test/core/end2end/tests/max_connection_idle.c | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/test/core/end2end/tests/max_connection_idle.c b/test/core/end2end/tests/max_connection_idle.c
index 9dc1ee47664..c0984e4d14e 100644
--- a/test/core/end2end/tests/max_connection_idle.c
+++ b/test/core/end2end/tests/max_connection_idle.c
@@ -89,8 +89,8 @@ static void test_max_connection_idle(grpc_end2end_test_config config) {
   /* wait for the channel to reach its maximum idle time */
   grpc_channel_watch_connectivity_state(
       f.client, GRPC_CHANNEL_READY,
-      grpc_timeout_milliseconds_to_deadline(MAX_CONNECTION_IDLE_MS + 500), f.cq,
-      tag(99));
+      grpc_timeout_milliseconds_to_deadline(MAX_CONNECTION_IDLE_MS + 3000),
+      f.cq, tag(99));
   CQ_EXPECT_COMPLETION(cqv, tag(99), 1);
   cq_verify(cqv);
   state = grpc_channel_check_connectivity_state(f.client, 0);
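Note on PATCHES 21-22/24: PATCH 21 made a contended checker propagate the
pending minimum deadline so its caller would not sleep past a due timer, and
PATCH 22 replaces that with `now + 1`, forcing a short poll and a retry. The
TODO sketches a cheaper scheme: count the pollers that get through the
uncontended check, and kick waiters only when that count drops to zero. A
hypothetical sketch of that idea, not code from this series (kick_all_pollers
is an assumed wakeup primitive):

    #include <stdatomic.h>

    extern void kick_all_pollers(void); /* assumed wakeup primitive */

    static atomic_int uncontended_pollers;

    void timer_check_enter_uncontended(void) {
      atomic_fetch_add(&uncontended_pollers, 1);
    }

    void timer_check_leave_uncontended(void) {
      /* fetch_sub returns the previous value: if this was the last
         uncontended poller, wake the others so someone re-checks the
         timer list, instead of every contended caller short-polling */
      if (atomic_fetch_sub(&uncontended_pollers, 1) == 1) {
        kick_all_pollers();
      }
    }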