|
|
@ -37,9 +37,13 @@ |
|
|
|
|
|
|
|
|
|
|
|
#include "src/core/lib/iomgr/timer.h" |
|
|
|
#include "src/core/lib/iomgr/timer.h" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#include <grpc/support/alloc.h> |
|
|
|
#include <grpc/support/log.h> |
|
|
|
#include <grpc/support/log.h> |
|
|
|
|
|
|
|
#include <grpc/support/string_util.h> |
|
|
|
#include <grpc/support/sync.h> |
|
|
|
#include <grpc/support/sync.h> |
|
|
|
|
|
|
|
#include <grpc/support/tls.h> |
|
|
|
#include <grpc/support/useful.h> |
|
|
|
#include <grpc/support/useful.h> |
|
|
|
|
|
|
|
#include "src/core/lib/debug/trace.h" |
|
|
|
#include "src/core/lib/iomgr/time_averaged_stats.h" |
|
|
|
#include "src/core/lib/iomgr/time_averaged_stats.h" |
|
|
|
#include "src/core/lib/iomgr/timer_heap.h" |
|
|
|
#include "src/core/lib/iomgr/timer_heap.h" |
|
|
|
#include "src/core/lib/support/spinlock.h" |
|
|
|
#include "src/core/lib/support/spinlock.h" |
|
|
@ -52,12 +56,15 @@ |
|
|
|
#define MIN_QUEUE_WINDOW_DURATION 0.01 |
|
|
|
#define MIN_QUEUE_WINDOW_DURATION 0.01 |
|
|
|
#define MAX_QUEUE_WINDOW_DURATION 1 |
|
|
|
#define MAX_QUEUE_WINDOW_DURATION 1 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int grpc_timer_trace = 0; |
|
|
|
|
|
|
|
int grpc_timer_check_trace = 0; |
|
|
|
|
|
|
|
|
|
|
|
typedef struct { |
|
|
|
typedef struct { |
|
|
|
gpr_mu mu; |
|
|
|
gpr_mu mu; |
|
|
|
grpc_time_averaged_stats stats; |
|
|
|
grpc_time_averaged_stats stats; |
|
|
|
/* All and only timers with deadlines <= this will be in the heap. */ |
|
|
|
/* All and only timers with deadlines <= this will be in the heap. */ |
|
|
|
gpr_timespec queue_deadline_cap; |
|
|
|
gpr_atm queue_deadline_cap; |
|
|
|
gpr_timespec min_deadline; |
|
|
|
gpr_atm min_deadline; |
|
|
|
/* Index in the g_shard_queue */ |
|
|
|
/* Index in the g_shard_queue */ |
|
|
|
uint32_t shard_queue_index; |
|
|
|
uint32_t shard_queue_index; |
|
|
|
/* This holds all timers with deadlines < queue_deadline_cap. Timers in this
|
|
|
|
/* This holds all timers with deadlines < queue_deadline_cap. Timers in this
|
|
|
@ -67,38 +74,92 @@ typedef struct { |
|
|
|
grpc_timer list; |
|
|
|
grpc_timer list; |
|
|
|
} shard_type; |
|
|
|
} shard_type; |
|
|
|
|
|
|
|
|
|
|
|
/* Protects g_shard_queue */ |
|
|
|
struct shared_mutables { |
|
|
|
static gpr_mu g_mu; |
|
|
|
gpr_atm min_timer; |
|
|
|
/* Allow only one run_some_expired_timers at once */ |
|
|
|
/* Allow only one run_some_expired_timers at once */ |
|
|
|
static gpr_spinlock g_checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER; |
|
|
|
gpr_spinlock checker_mu; |
|
|
|
|
|
|
|
bool initialized; |
|
|
|
|
|
|
|
/* Protects g_shard_queue */ |
|
|
|
|
|
|
|
gpr_mu mu; |
|
|
|
|
|
|
|
} GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static struct shared_mutables g_shared_mutables = { |
|
|
|
|
|
|
|
.checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER, .initialized = false, |
|
|
|
|
|
|
|
}; |
|
|
|
static gpr_clock_type g_clock_type; |
|
|
|
static gpr_clock_type g_clock_type; |
|
|
|
static shard_type g_shards[NUM_SHARDS]; |
|
|
|
static shard_type g_shards[NUM_SHARDS]; |
|
|
|
/* Protected by g_mu */ |
|
|
|
/* Protected by g_shared_mutables.mu */ |
|
|
|
static shard_type *g_shard_queue[NUM_SHARDS]; |
|
|
|
static shard_type *g_shard_queue[NUM_SHARDS]; |
|
|
|
static bool g_initialized = false; |
|
|
|
static gpr_timespec g_start_time; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
GPR_TLS_DECL(g_last_seen_min_timer); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static gpr_atm saturating_add(gpr_atm a, gpr_atm b) { |
|
|
|
|
|
|
|
if (a > GPR_ATM_MAX - b) { |
|
|
|
|
|
|
|
return GPR_ATM_MAX; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
return a + b; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, |
|
|
|
|
|
|
|
gpr_atm *next, grpc_error *error); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static gpr_timespec dbl_to_ts(double d) { |
|
|
|
|
|
|
|
gpr_timespec ts; |
|
|
|
|
|
|
|
ts.tv_sec = (int64_t)d; |
|
|
|
|
|
|
|
ts.tv_nsec = (int32_t)(1e9 * (d - (double)ts.tv_sec)); |
|
|
|
|
|
|
|
ts.clock_type = GPR_TIMESPAN; |
|
|
|
|
|
|
|
return ts; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) { |
|
|
|
|
|
|
|
ts = gpr_time_sub(ts, g_start_time); |
|
|
|
|
|
|
|
double x = GPR_MS_PER_SEC * (double)ts.tv_sec + |
|
|
|
|
|
|
|
(double)ts.tv_nsec / GPR_NS_PER_MS + |
|
|
|
|
|
|
|
(double)(GPR_NS_PER_SEC - 1) / (double)GPR_NS_PER_SEC; |
|
|
|
|
|
|
|
if (x < 0) return 0; |
|
|
|
|
|
|
|
if (x > GPR_ATM_MAX) return GPR_ATM_MAX; |
|
|
|
|
|
|
|
return (gpr_atm)x; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) { |
|
|
|
|
|
|
|
ts = gpr_time_sub(ts, g_start_time); |
|
|
|
|
|
|
|
double x = |
|
|
|
|
|
|
|
GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS; |
|
|
|
|
|
|
|
if (x < 0) return 0; |
|
|
|
|
|
|
|
if (x > GPR_ATM_MAX) return GPR_ATM_MAX; |
|
|
|
|
|
|
|
return (gpr_atm)x; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, |
|
|
|
static gpr_timespec atm_to_timespec(gpr_atm x) { |
|
|
|
gpr_timespec *next, grpc_error *error); |
|
|
|
return gpr_time_add(g_start_time, dbl_to_ts((double)x / 1000.0)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static gpr_timespec compute_min_deadline(shard_type *shard) { |
|
|
|
static gpr_atm compute_min_deadline(shard_type *shard) { |
|
|
|
return grpc_timer_heap_is_empty(&shard->heap) |
|
|
|
return grpc_timer_heap_is_empty(&shard->heap) |
|
|
|
? shard->queue_deadline_cap |
|
|
|
? saturating_add(shard->queue_deadline_cap, 1) |
|
|
|
: grpc_timer_heap_top(&shard->heap)->deadline; |
|
|
|
: grpc_timer_heap_top(&shard->heap)->deadline; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void grpc_timer_list_init(gpr_timespec now) { |
|
|
|
void grpc_timer_list_init(gpr_timespec now) { |
|
|
|
uint32_t i; |
|
|
|
uint32_t i; |
|
|
|
|
|
|
|
|
|
|
|
g_initialized = true; |
|
|
|
g_shared_mutables.initialized = true; |
|
|
|
gpr_mu_init(&g_mu); |
|
|
|
gpr_mu_init(&g_shared_mutables.mu); |
|
|
|
g_clock_type = now.clock_type; |
|
|
|
g_clock_type = now.clock_type; |
|
|
|
|
|
|
|
g_start_time = now; |
|
|
|
|
|
|
|
g_shared_mutables.min_timer = timespec_to_atm_round_down(now); |
|
|
|
|
|
|
|
gpr_tls_init(&g_last_seen_min_timer); |
|
|
|
|
|
|
|
gpr_tls_set(&g_last_seen_min_timer, 0); |
|
|
|
|
|
|
|
grpc_register_tracer("timer", &grpc_timer_trace); |
|
|
|
|
|
|
|
grpc_register_tracer("timer_check", &grpc_timer_check_trace); |
|
|
|
|
|
|
|
|
|
|
|
for (i = 0; i < NUM_SHARDS; i++) { |
|
|
|
for (i = 0; i < NUM_SHARDS; i++) { |
|
|
|
shard_type *shard = &g_shards[i]; |
|
|
|
shard_type *shard = &g_shards[i]; |
|
|
|
gpr_mu_init(&shard->mu); |
|
|
|
gpr_mu_init(&shard->mu); |
|
|
|
grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1, |
|
|
|
grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1, |
|
|
|
0.5); |
|
|
|
0.5); |
|
|
|
shard->queue_deadline_cap = now; |
|
|
|
shard->queue_deadline_cap = g_shared_mutables.min_timer; |
|
|
|
shard->shard_queue_index = i; |
|
|
|
shard->shard_queue_index = i; |
|
|
|
grpc_timer_heap_init(&shard->heap); |
|
|
|
grpc_timer_heap_init(&shard->heap); |
|
|
|
shard->list.next = shard->list.prev = &shard->list; |
|
|
|
shard->list.next = shard->list.prev = &shard->list; |
|
|
@ -110,29 +171,23 @@ void grpc_timer_list_init(gpr_timespec now) { |
|
|
|
void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { |
|
|
|
void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { |
|
|
|
int i; |
|
|
|
int i; |
|
|
|
run_some_expired_timers( |
|
|
|
run_some_expired_timers( |
|
|
|
exec_ctx, gpr_inf_future(g_clock_type), NULL, |
|
|
|
exec_ctx, GPR_ATM_MAX, NULL, |
|
|
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown")); |
|
|
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown")); |
|
|
|
for (i = 0; i < NUM_SHARDS; i++) { |
|
|
|
for (i = 0; i < NUM_SHARDS; i++) { |
|
|
|
shard_type *shard = &g_shards[i]; |
|
|
|
shard_type *shard = &g_shards[i]; |
|
|
|
gpr_mu_destroy(&shard->mu); |
|
|
|
gpr_mu_destroy(&shard->mu); |
|
|
|
grpc_timer_heap_destroy(&shard->heap); |
|
|
|
grpc_timer_heap_destroy(&shard->heap); |
|
|
|
} |
|
|
|
} |
|
|
|
gpr_mu_destroy(&g_mu); |
|
|
|
gpr_mu_destroy(&g_shared_mutables.mu); |
|
|
|
g_initialized = false; |
|
|
|
gpr_tls_destroy(&g_last_seen_min_timer); |
|
|
|
|
|
|
|
g_shared_mutables.initialized = false; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static double ts_to_dbl(gpr_timespec ts) { |
|
|
|
static double ts_to_dbl(gpr_timespec ts) { |
|
|
|
return (double)ts.tv_sec + 1e-9 * ts.tv_nsec; |
|
|
|
return (double)ts.tv_sec + 1e-9 * ts.tv_nsec; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static gpr_timespec dbl_to_ts(double d) { |
|
|
|
/* returns true if the first element in the list */ |
|
|
|
gpr_timespec ts; |
|
|
|
|
|
|
|
ts.tv_sec = (int64_t)d; |
|
|
|
|
|
|
|
ts.tv_nsec = (int32_t)(1e9 * (d - (double)ts.tv_sec)); |
|
|
|
|
|
|
|
ts.clock_type = GPR_TIMESPAN; |
|
|
|
|
|
|
|
return ts; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void list_join(grpc_timer *head, grpc_timer *timer) { |
|
|
|
static void list_join(grpc_timer *head, grpc_timer *timer) { |
|
|
|
timer->next = head; |
|
|
|
timer->next = head; |
|
|
|
timer->prev = head->prev; |
|
|
|
timer->prev = head->prev; |
|
|
@ -158,15 +213,13 @@ static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) { |
|
|
|
|
|
|
|
|
|
|
|
static void note_deadline_change(shard_type *shard) { |
|
|
|
static void note_deadline_change(shard_type *shard) { |
|
|
|
while (shard->shard_queue_index > 0 && |
|
|
|
while (shard->shard_queue_index > 0 && |
|
|
|
gpr_time_cmp( |
|
|
|
shard->min_deadline < |
|
|
|
shard->min_deadline, |
|
|
|
g_shard_queue[shard->shard_queue_index - 1]->min_deadline) { |
|
|
|
g_shard_queue[shard->shard_queue_index - 1]->min_deadline) < 0) { |
|
|
|
|
|
|
|
swap_adjacent_shards_in_queue(shard->shard_queue_index - 1); |
|
|
|
swap_adjacent_shards_in_queue(shard->shard_queue_index - 1); |
|
|
|
} |
|
|
|
} |
|
|
|
while (shard->shard_queue_index < NUM_SHARDS - 1 && |
|
|
|
while (shard->shard_queue_index < NUM_SHARDS - 1 && |
|
|
|
gpr_time_cmp( |
|
|
|
shard->min_deadline > |
|
|
|
shard->min_deadline, |
|
|
|
g_shard_queue[shard->shard_queue_index + 1]->min_deadline) { |
|
|
|
g_shard_queue[shard->shard_queue_index + 1]->min_deadline) > 0) { |
|
|
|
|
|
|
|
swap_adjacent_shards_in_queue(shard->shard_queue_index); |
|
|
|
swap_adjacent_shards_in_queue(shard->shard_queue_index); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -179,9 +232,17 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, |
|
|
|
GPR_ASSERT(deadline.clock_type == g_clock_type); |
|
|
|
GPR_ASSERT(deadline.clock_type == g_clock_type); |
|
|
|
GPR_ASSERT(now.clock_type == g_clock_type); |
|
|
|
GPR_ASSERT(now.clock_type == g_clock_type); |
|
|
|
timer->closure = closure; |
|
|
|
timer->closure = closure; |
|
|
|
timer->deadline = deadline; |
|
|
|
timer->deadline = timespec_to_atm_round_up(deadline); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (grpc_timer_trace) { |
|
|
|
|
|
|
|
gpr_log(GPR_DEBUG, "TIMER %p: SET %" PRId64 ".%09d [%" PRIdPTR |
|
|
|
|
|
|
|
"] now %" PRId64 ".%09d [%" PRIdPTR "] call %p[%p]", |
|
|
|
|
|
|
|
timer, deadline.tv_sec, deadline.tv_nsec, timer->deadline, |
|
|
|
|
|
|
|
now.tv_sec, now.tv_nsec, timespec_to_atm_round_down(now), closure, |
|
|
|
|
|
|
|
closure->cb); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (!g_initialized) { |
|
|
|
if (!g_shared_mutables.initialized) { |
|
|
|
timer->pending = false; |
|
|
|
timer->pending = false; |
|
|
|
grpc_closure_sched(exec_ctx, timer->closure, |
|
|
|
grpc_closure_sched(exec_ctx, timer->closure, |
|
|
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
@ -201,12 +262,18 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, |
|
|
|
|
|
|
|
|
|
|
|
grpc_time_averaged_stats_add_sample(&shard->stats, |
|
|
|
grpc_time_averaged_stats_add_sample(&shard->stats, |
|
|
|
ts_to_dbl(gpr_time_sub(deadline, now))); |
|
|
|
ts_to_dbl(gpr_time_sub(deadline, now))); |
|
|
|
if (gpr_time_cmp(deadline, shard->queue_deadline_cap) < 0) { |
|
|
|
if (timer->deadline < shard->queue_deadline_cap) { |
|
|
|
is_first_timer = grpc_timer_heap_add(&shard->heap, timer); |
|
|
|
is_first_timer = grpc_timer_heap_add(&shard->heap, timer); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
timer->heap_index = INVALID_HEAP_INDEX; |
|
|
|
timer->heap_index = INVALID_HEAP_INDEX; |
|
|
|
list_join(&shard->list, timer); |
|
|
|
list_join(&shard->list, timer); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if (grpc_timer_trace) { |
|
|
|
|
|
|
|
gpr_log(GPR_DEBUG, " .. add to shard %d with queue_deadline_cap=%" PRIdPTR |
|
|
|
|
|
|
|
" => is_first_timer=%s", |
|
|
|
|
|
|
|
(int)(shard - g_shards), shard->queue_deadline_cap, |
|
|
|
|
|
|
|
is_first_timer ? "true" : "false"); |
|
|
|
|
|
|
|
} |
|
|
|
gpr_mu_unlock(&shard->mu); |
|
|
|
gpr_mu_unlock(&shard->mu); |
|
|
|
|
|
|
|
|
|
|
|
/* Deadline may have decreased, we need to adjust the master queue. Note
|
|
|
|
/* Deadline may have decreased, we need to adjust the master queue. Note
|
|
|
@ -221,28 +288,41 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, |
|
|
|
In that case, the timer will simply have to wait for the next |
|
|
|
In that case, the timer will simply have to wait for the next |
|
|
|
grpc_timer_check. */ |
|
|
|
grpc_timer_check. */ |
|
|
|
if (is_first_timer) { |
|
|
|
if (is_first_timer) { |
|
|
|
gpr_mu_lock(&g_mu); |
|
|
|
gpr_mu_lock(&g_shared_mutables.mu); |
|
|
|
if (gpr_time_cmp(deadline, shard->min_deadline) < 0) { |
|
|
|
if (grpc_timer_trace) { |
|
|
|
gpr_timespec old_min_deadline = g_shard_queue[0]->min_deadline; |
|
|
|
gpr_log(GPR_DEBUG, " .. old shard min_deadline=%" PRIdPTR, |
|
|
|
shard->min_deadline = deadline; |
|
|
|
shard->min_deadline); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
if (timer->deadline < shard->min_deadline) { |
|
|
|
|
|
|
|
gpr_atm old_min_deadline = g_shard_queue[0]->min_deadline; |
|
|
|
|
|
|
|
shard->min_deadline = timer->deadline; |
|
|
|
note_deadline_change(shard); |
|
|
|
note_deadline_change(shard); |
|
|
|
if (shard->shard_queue_index == 0 && |
|
|
|
if (shard->shard_queue_index == 0 && timer->deadline < old_min_deadline) { |
|
|
|
gpr_time_cmp(deadline, old_min_deadline) < 0) { |
|
|
|
gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, timer->deadline); |
|
|
|
grpc_kick_poller(); |
|
|
|
grpc_kick_poller(); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
gpr_mu_unlock(&g_mu); |
|
|
|
gpr_mu_unlock(&g_shared_mutables.mu); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void grpc_timer_consume_kick(void) { |
|
|
|
|
|
|
|
/* force re-evaluation of last seeen min */ |
|
|
|
|
|
|
|
gpr_tls_set(&g_last_seen_min_timer, 0); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { |
|
|
|
void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { |
|
|
|
if (!g_initialized) { |
|
|
|
if (!g_shared_mutables.initialized) { |
|
|
|
/* must have already been cancelled, also the shard mutex is invalid */ |
|
|
|
/* must have already been cancelled, also the shard mutex is invalid */ |
|
|
|
return; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; |
|
|
|
shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; |
|
|
|
gpr_mu_lock(&shard->mu); |
|
|
|
gpr_mu_lock(&shard->mu); |
|
|
|
|
|
|
|
if (grpc_timer_trace) { |
|
|
|
|
|
|
|
gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer, |
|
|
|
|
|
|
|
timer->pending ? "true" : "false"); |
|
|
|
|
|
|
|
} |
|
|
|
if (timer->pending) { |
|
|
|
if (timer->pending) { |
|
|
|
grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); |
|
|
|
grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); |
|
|
|
timer->pending = false; |
|
|
|
timer->pending = false; |
|
|
@ -260,7 +340,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { |
|
|
|
for timers that fall at or under it. Returns true if the queue is no |
|
|
|
for timers that fall at or under it. Returns true if the queue is no |
|
|
|
longer empty. |
|
|
|
longer empty. |
|
|
|
REQUIRES: shard->mu locked */ |
|
|
|
REQUIRES: shard->mu locked */ |
|
|
|
static int refill_queue(shard_type *shard, gpr_timespec now) { |
|
|
|
static int refill_queue(shard_type *shard, gpr_atm now) { |
|
|
|
/* Compute the new queue window width and bound by the limits: */ |
|
|
|
/* Compute the new queue window width and bound by the limits: */ |
|
|
|
double computed_deadline_delta = |
|
|
|
double computed_deadline_delta = |
|
|
|
grpc_time_averaged_stats_update_average(&shard->stats) * |
|
|
|
grpc_time_averaged_stats_update_average(&shard->stats) * |
|
|
@ -271,12 +351,22 @@ static int refill_queue(shard_type *shard, gpr_timespec now) { |
|
|
|
grpc_timer *timer, *next; |
|
|
|
grpc_timer *timer, *next; |
|
|
|
|
|
|
|
|
|
|
|
/* Compute the new cap and put all timers under it into the queue: */ |
|
|
|
/* Compute the new cap and put all timers under it into the queue: */ |
|
|
|
shard->queue_deadline_cap = gpr_time_add( |
|
|
|
shard->queue_deadline_cap = |
|
|
|
gpr_time_max(now, shard->queue_deadline_cap), dbl_to_ts(deadline_delta)); |
|
|
|
saturating_add(GPR_MAX(now, shard->queue_deadline_cap), |
|
|
|
|
|
|
|
(gpr_atm)(deadline_delta * 1000.0)); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (grpc_timer_check_trace) { |
|
|
|
|
|
|
|
gpr_log(GPR_DEBUG, " .. shard[%d]->queue_deadline_cap --> %" PRIdPTR, |
|
|
|
|
|
|
|
(int)(shard - g_shards), shard->queue_deadline_cap); |
|
|
|
|
|
|
|
} |
|
|
|
for (timer = shard->list.next; timer != &shard->list; timer = next) { |
|
|
|
for (timer = shard->list.next; timer != &shard->list; timer = next) { |
|
|
|
next = timer->next; |
|
|
|
next = timer->next; |
|
|
|
|
|
|
|
|
|
|
|
if (gpr_time_cmp(timer->deadline, shard->queue_deadline_cap) < 0) { |
|
|
|
if (timer->deadline < shard->queue_deadline_cap) { |
|
|
|
|
|
|
|
if (grpc_timer_check_trace) { |
|
|
|
|
|
|
|
gpr_log(GPR_DEBUG, " .. add timer with deadline %" PRIdPTR " to heap", |
|
|
|
|
|
|
|
timer->deadline); |
|
|
|
|
|
|
|
} |
|
|
|
list_remove(timer); |
|
|
|
list_remove(timer); |
|
|
|
grpc_timer_heap_add(&shard->heap, timer); |
|
|
|
grpc_timer_heap_add(&shard->heap, timer); |
|
|
|
} |
|
|
|
} |
|
|
@ -287,15 +377,29 @@ static int refill_queue(shard_type *shard, gpr_timespec now) { |
|
|
|
/* This pops the next non-cancelled timer with deadline <= now from the
|
|
|
|
/* This pops the next non-cancelled timer with deadline <= now from the
|
|
|
|
queue, or returns NULL if there isn't one. |
|
|
|
queue, or returns NULL if there isn't one. |
|
|
|
REQUIRES: shard->mu locked */ |
|
|
|
REQUIRES: shard->mu locked */ |
|
|
|
static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) { |
|
|
|
static grpc_timer *pop_one(shard_type *shard, gpr_atm now) { |
|
|
|
grpc_timer *timer; |
|
|
|
grpc_timer *timer; |
|
|
|
for (;;) { |
|
|
|
for (;;) { |
|
|
|
|
|
|
|
if (grpc_timer_check_trace) { |
|
|
|
|
|
|
|
gpr_log(GPR_DEBUG, " .. shard[%d]: heap_empty=%s", |
|
|
|
|
|
|
|
(int)(shard - g_shards), |
|
|
|
|
|
|
|
grpc_timer_heap_is_empty(&shard->heap) ? "true" : "false"); |
|
|
|
|
|
|
|
} |
|
|
|
if (grpc_timer_heap_is_empty(&shard->heap)) { |
|
|
|
if (grpc_timer_heap_is_empty(&shard->heap)) { |
|
|
|
if (gpr_time_cmp(now, shard->queue_deadline_cap) < 0) return NULL; |
|
|
|
if (now < shard->queue_deadline_cap) return NULL; |
|
|
|
if (!refill_queue(shard, now)) return NULL; |
|
|
|
if (!refill_queue(shard, now)) return NULL; |
|
|
|
} |
|
|
|
} |
|
|
|
timer = grpc_timer_heap_top(&shard->heap); |
|
|
|
timer = grpc_timer_heap_top(&shard->heap); |
|
|
|
if (gpr_time_cmp(timer->deadline, now) > 0) return NULL; |
|
|
|
if (grpc_timer_check_trace) { |
|
|
|
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
|
|
|
" .. check top timer deadline=%" PRIdPTR " now=%" PRIdPTR, |
|
|
|
|
|
|
|
timer->deadline, now); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
if (timer->deadline > now) return NULL; |
|
|
|
|
|
|
|
if (grpc_timer_trace) { |
|
|
|
|
|
|
|
gpr_log(GPR_DEBUG, "TIMER %p: FIRE %" PRIdPTR "ms late", timer, |
|
|
|
|
|
|
|
now - timer->deadline); |
|
|
|
|
|
|
|
} |
|
|
|
timer->pending = false; |
|
|
|
timer->pending = false; |
|
|
|
grpc_timer_heap_pop(&shard->heap); |
|
|
|
grpc_timer_heap_pop(&shard->heap); |
|
|
|
return timer; |
|
|
|
return timer; |
|
|
@ -304,7 +408,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) { |
|
|
|
|
|
|
|
|
|
|
|
/* REQUIRES: shard->mu unlocked */ |
|
|
|
/* REQUIRES: shard->mu unlocked */ |
|
|
|
static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, |
|
|
|
static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, |
|
|
|
gpr_timespec now, gpr_timespec *new_min_deadline, |
|
|
|
gpr_atm now, gpr_atm *new_min_deadline, |
|
|
|
grpc_error *error) { |
|
|
|
grpc_error *error) { |
|
|
|
size_t n = 0; |
|
|
|
size_t n = 0; |
|
|
|
grpc_timer *timer; |
|
|
|
grpc_timer *timer; |
|
|
@ -318,17 +422,29 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, |
|
|
|
return n; |
|
|
|
return n; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, |
|
|
|
static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, |
|
|
|
gpr_timespec *next, grpc_error *error) { |
|
|
|
gpr_atm *next, grpc_error *error) { |
|
|
|
size_t n = 0; |
|
|
|
size_t n = 0; |
|
|
|
|
|
|
|
|
|
|
|
/* TODO(ctiller): verify that there are any timers (atomically) here */ |
|
|
|
gpr_atm min_timer = gpr_atm_no_barrier_load(&g_shared_mutables.min_timer); |
|
|
|
|
|
|
|
gpr_tls_set(&g_last_seen_min_timer, min_timer); |
|
|
|
|
|
|
|
if (now < min_timer) { |
|
|
|
|
|
|
|
if (next != NULL) *next = GPR_MIN(*next, min_timer); |
|
|
|
|
|
|
|
return 0; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (gpr_spinlock_trylock(&g_checker_mu)) { |
|
|
|
if (gpr_spinlock_trylock(&g_shared_mutables.checker_mu)) { |
|
|
|
gpr_mu_lock(&g_mu); |
|
|
|
gpr_mu_lock(&g_shared_mutables.mu); |
|
|
|
|
|
|
|
|
|
|
|
while (gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) { |
|
|
|
if (grpc_timer_check_trace) { |
|
|
|
gpr_timespec new_min_deadline; |
|
|
|
gpr_log(GPR_DEBUG, " .. shard[%d]->min_deadline = %" PRIdPTR, |
|
|
|
|
|
|
|
(int)(g_shard_queue[0] - g_shards), |
|
|
|
|
|
|
|
g_shard_queue[0]->min_deadline); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
while (g_shard_queue[0]->min_deadline < now || |
|
|
|
|
|
|
|
(now != GPR_ATM_MAX && g_shard_queue[0]->min_deadline == now)) { |
|
|
|
|
|
|
|
gpr_atm new_min_deadline; |
|
|
|
|
|
|
|
|
|
|
|
/* For efficiency, we pop as many available timers as we can from the
|
|
|
|
/* For efficiency, we pop as many available timers as we can from the
|
|
|
|
shard. This may violate perfect timer deadline ordering, but that |
|
|
|
shard. This may violate perfect timer deadline ordering, but that |
|
|
@ -336,6 +452,14 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, |
|
|
|
n += |
|
|
|
n += |
|
|
|
pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline, error); |
|
|
|
pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline, error); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (grpc_timer_check_trace) { |
|
|
|
|
|
|
|
gpr_log(GPR_DEBUG, " .. popped --> %" PRIdPTR |
|
|
|
|
|
|
|
", shard[%d]->min_deadline %" PRIdPTR |
|
|
|
|
|
|
|
" --> %" PRIdPTR ", now=%" PRIdPTR, |
|
|
|
|
|
|
|
n, (int)(g_shard_queue[0] - g_shards), |
|
|
|
|
|
|
|
g_shard_queue[0]->min_deadline, new_min_deadline, now); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/* An grpc_timer_init() on the shard could intervene here, adding a new
|
|
|
|
/* An grpc_timer_init() on the shard could intervene here, adding a new
|
|
|
|
timer that is earlier than new_min_deadline. However, |
|
|
|
timer that is earlier than new_min_deadline. However, |
|
|
|
grpc_timer_init() will block on the master_lock before it can call |
|
|
|
grpc_timer_init() will block on the master_lock before it can call |
|
|
@ -346,23 +470,24 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (next) { |
|
|
|
if (next) { |
|
|
|
*next = gpr_time_min(*next, g_shard_queue[0]->min_deadline); |
|
|
|
*next = GPR_MIN(*next, g_shard_queue[0]->min_deadline); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
gpr_mu_unlock(&g_mu); |
|
|
|
gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, |
|
|
|
gpr_spinlock_unlock(&g_checker_mu); |
|
|
|
g_shard_queue[0]->min_deadline); |
|
|
|
|
|
|
|
gpr_mu_unlock(&g_shared_mutables.mu); |
|
|
|
|
|
|
|
gpr_spinlock_unlock(&g_shared_mutables.checker_mu); |
|
|
|
} else if (next != NULL) { |
|
|
|
} else if (next != NULL) { |
|
|
|
/* TODO(ctiller): this forces calling code to do an short poll, and
|
|
|
|
/* TODO(ctiller): this forces calling code to do an short poll, and
|
|
|
|
then retry the timer check (because this time through the timer list was |
|
|
|
then retry the timer check (because this time through the timer list was |
|
|
|
contended). |
|
|
|
contended). |
|
|
|
|
|
|
|
|
|
|
|
We could reduce the cost here dramatically by keeping a count of how many |
|
|
|
We could reduce the cost here dramatically by keeping a count of how |
|
|
|
currently active pollers got through the uncontended case above |
|
|
|
many currently active pollers got through the uncontended case above |
|
|
|
successfully, and waking up other pollers IFF that count drops to zero. |
|
|
|
successfully, and waking up other pollers IFF that count drops to zero. |
|
|
|
|
|
|
|
|
|
|
|
Once that count is in place, this entire else branch could disappear. */ |
|
|
|
Once that count is in place, this entire else branch could disappear. */ |
|
|
|
*next = gpr_time_min( |
|
|
|
*next = GPR_MIN(*next, now + 1); |
|
|
|
*next, gpr_time_add(now, gpr_time_from_millis(1, GPR_TIMESPAN))); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
@ -372,12 +497,71 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, |
|
|
|
|
|
|
|
|
|
|
|
bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, |
|
|
|
bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, |
|
|
|
gpr_timespec *next) { |
|
|
|
gpr_timespec *next) { |
|
|
|
|
|
|
|
// prelude
|
|
|
|
GPR_ASSERT(now.clock_type == g_clock_type); |
|
|
|
GPR_ASSERT(now.clock_type == g_clock_type); |
|
|
|
return run_some_expired_timers( |
|
|
|
gpr_atm now_atm = timespec_to_atm_round_down(now); |
|
|
|
exec_ctx, now, next, |
|
|
|
|
|
|
|
|
|
|
|
/* fetch from a thread-local first: this avoids contention on a globally
|
|
|
|
|
|
|
|
mutable cacheline in the common case */ |
|
|
|
|
|
|
|
gpr_atm min_timer = gpr_tls_get(&g_last_seen_min_timer); |
|
|
|
|
|
|
|
if (now_atm < min_timer) { |
|
|
|
|
|
|
|
if (next != NULL) { |
|
|
|
|
|
|
|
*next = |
|
|
|
|
|
|
|
atm_to_timespec(GPR_MIN(timespec_to_atm_round_up(*next), min_timer)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
if (grpc_timer_check_trace) { |
|
|
|
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
|
|
|
"TIMER CHECK SKIP: now_atm=%" PRIdPTR " min_timer=%" PRIdPTR, |
|
|
|
|
|
|
|
now_atm, min_timer); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
return 0; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
grpc_error *shutdown_error = |
|
|
|
gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0 |
|
|
|
gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0 |
|
|
|
? GRPC_ERROR_NONE |
|
|
|
? GRPC_ERROR_NONE |
|
|
|
: GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system")); |
|
|
|
: GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system"); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// tracing
|
|
|
|
|
|
|
|
if (grpc_timer_check_trace) { |
|
|
|
|
|
|
|
char *next_str; |
|
|
|
|
|
|
|
if (next == NULL) { |
|
|
|
|
|
|
|
next_str = gpr_strdup("NULL"); |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec, |
|
|
|
|
|
|
|
next->tv_nsec, timespec_to_atm_round_down(*next)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
gpr_log(GPR_DEBUG, "TIMER CHECK BEGIN: now=%" PRId64 ".%09d [%" PRIdPTR |
|
|
|
|
|
|
|
"] next=%s tls_min=%" PRIdPTR " glob_min=%" PRIdPTR, |
|
|
|
|
|
|
|
now.tv_sec, now.tv_nsec, now_atm, next_str, |
|
|
|
|
|
|
|
gpr_tls_get(&g_last_seen_min_timer), |
|
|
|
|
|
|
|
gpr_atm_no_barrier_load(&g_shared_mutables.min_timer)); |
|
|
|
|
|
|
|
gpr_free(next_str); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// actual code
|
|
|
|
|
|
|
|
bool r; |
|
|
|
|
|
|
|
gpr_atm next_atm; |
|
|
|
|
|
|
|
if (next == NULL) { |
|
|
|
|
|
|
|
r = run_some_expired_timers(exec_ctx, now_atm, NULL, shutdown_error); |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
next_atm = timespec_to_atm_round_down(*next); |
|
|
|
|
|
|
|
r = run_some_expired_timers(exec_ctx, now_atm, &next_atm, shutdown_error); |
|
|
|
|
|
|
|
*next = atm_to_timespec(next_atm); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// tracing
|
|
|
|
|
|
|
|
if (grpc_timer_check_trace) { |
|
|
|
|
|
|
|
char *next_str; |
|
|
|
|
|
|
|
if (next == NULL) { |
|
|
|
|
|
|
|
next_str = gpr_strdup("NULL"); |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec, |
|
|
|
|
|
|
|
next->tv_nsec, next_atm); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
gpr_log(GPR_DEBUG, "TIMER CHECK END: %d timers triggered; next=%s", r, |
|
|
|
|
|
|
|
next_str); |
|
|
|
|
|
|
|
gpr_free(next_str); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
return r > 0; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
#endif /* GRPC_TIMER_USE_GENERIC */ |
|
|
|
#endif /* GRPC_TIMER_USE_GENERIC */ |
|
|
|