|
|
|
@ -25,6 +25,7 @@ |
|
|
|
|
#include "src/core/lib/iomgr/timer.h" |
|
|
|
|
|
|
|
|
|
#include <grpc/support/alloc.h> |
|
|
|
|
#include <grpc/support/cpu.h> |
|
|
|
|
#include <grpc/support/log.h> |
|
|
|
|
#include <grpc/support/string_util.h> |
|
|
|
|
#include <grpc/support/sync.h> |
|
|
|
@ -37,14 +38,15 @@ |
|
|
|
|
|
|
|
|
|
#define INVALID_HEAP_INDEX 0xffffffffu |
|
|
|
|
|
|
|
|
|
#define LOG2_NUM_SHARDS 5 |
|
|
|
|
#define NUM_SHARDS (1 << LOG2_NUM_SHARDS) |
|
|
|
|
#define ADD_DEADLINE_SCALE 0.33 |
|
|
|
|
#define MIN_QUEUE_WINDOW_DURATION 0.01 |
|
|
|
|
#define MAX_QUEUE_WINDOW_DURATION 1 |
|
|
|
|
|
|
|
|
|
grpc_core::TraceFlag grpc_timer_trace(false, "timer"); |
|
|
|
|
grpc_core::TraceFlag grpc_timer_check_trace(false, "timer_check"); |
|
|
|
|
extern "C" { |
|
|
|
|
grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false, "timer"); |
|
|
|
|
grpc_tracer_flag grpc_timer_check_trace = |
|
|
|
|
GRPC_TRACER_INITIALIZER(false, "timer_check"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* A "timer shard". Contains a 'heap' and a 'list' of timers. All timers with
|
|
|
|
|
* deadlines earlier than 'queue_deadline" cap are maintained in the heap and |
|
|
|
@ -71,14 +73,16 @@ typedef struct { |
|
|
|
|
grpc_timer list; |
|
|
|
|
} timer_shard; |
|
|
|
|
|
|
|
|
|
static size_t g_num_shards; |
|
|
|
|
|
|
|
|
|
/* Array of timer shards. Whenever a timer (grpc_timer *) is added, its address
|
|
|
|
|
* is hashed to select the timer shard to add the timer to */ |
|
|
|
|
static timer_shard g_shards[NUM_SHARDS]; |
|
|
|
|
static timer_shard* g_shards; |
|
|
|
|
|
|
|
|
|
/* Maintains a sorted list of timer shards (sorted by their min_deadline, i.e
|
|
|
|
|
* the deadline of the next timer in each shard). |
|
|
|
|
* Access to this is protected by g_shared_mutables.mu */ |
|
|
|
|
static timer_shard* g_shard_queue[NUM_SHARDS]; |
|
|
|
|
static timer_shard** g_shard_queue; |
|
|
|
|
|
|
|
|
|
#ifndef NDEBUG |
|
|
|
|
|
|
|
|
@ -87,7 +91,7 @@ static timer_shard* g_shard_queue[NUM_SHARDS]; |
|
|
|
|
#define NUM_HASH_BUCKETS 1009 /* Prime number close to 1000 */ |
|
|
|
|
|
|
|
|
|
static gpr_mu g_hash_mu[NUM_HASH_BUCKETS]; /* One mutex per bucket */ |
|
|
|
|
static grpc_timer* g_timer_ht[NUM_HASH_BUCKETS] = {NULL}; |
|
|
|
|
static grpc_timer* g_timer_ht[NUM_HASH_BUCKETS] = {nullptr}; |
|
|
|
|
|
|
|
|
|
static void init_timer_ht() { |
|
|
|
|
for (int i = 0; i < NUM_HASH_BUCKETS; i++) { |
|
|
|
@ -100,7 +104,7 @@ static bool is_in_ht(grpc_timer* t) { |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&g_hash_mu[i]); |
|
|
|
|
grpc_timer* p = g_timer_ht[i]; |
|
|
|
|
while (p != NULL && p != t) { |
|
|
|
|
while (p != nullptr && p != t) { |
|
|
|
|
p = p->hash_table_next; |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&g_hash_mu[i]); |
|
|
|
@ -114,7 +118,7 @@ static void add_to_ht(grpc_timer* t) { |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&g_hash_mu[i]); |
|
|
|
|
grpc_timer* p = g_timer_ht[i]; |
|
|
|
|
while (p != NULL && p != t) { |
|
|
|
|
while (p != nullptr && p != t) { |
|
|
|
|
p = p->hash_table_next; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -142,9 +146,9 @@ static void remove_from_ht(grpc_timer* t) { |
|
|
|
|
if (g_timer_ht[i] == t) { |
|
|
|
|
g_timer_ht[i] = g_timer_ht[i]->hash_table_next; |
|
|
|
|
removed = true; |
|
|
|
|
} else if (g_timer_ht[i] != NULL) { |
|
|
|
|
} else if (g_timer_ht[i] != nullptr) { |
|
|
|
|
grpc_timer* p = g_timer_ht[i]; |
|
|
|
|
while (p->hash_table_next != NULL && p->hash_table_next != t) { |
|
|
|
|
while (p->hash_table_next != nullptr && p->hash_table_next != t) { |
|
|
|
|
p = p->hash_table_next; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -165,7 +169,7 @@ static void remove_from_ht(grpc_timer* t) { |
|
|
|
|
abort(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
t->hash_table_next = NULL; |
|
|
|
|
t->hash_table_next = nullptr; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* If a timer is added to a timer shard (either heap or a list), it cannot
|
|
|
|
@ -238,14 +242,21 @@ static gpr_atm compute_min_deadline(timer_shard* shard) { |
|
|
|
|
void grpc_timer_list_init(grpc_exec_ctx* exec_ctx) { |
|
|
|
|
uint32_t i; |
|
|
|
|
|
|
|
|
|
g_num_shards = GPR_MIN(1, 2 * gpr_cpu_num_cores()); |
|
|
|
|
g_shards = (timer_shard*)gpr_zalloc(g_num_shards * sizeof(*g_shards)); |
|
|
|
|
g_shard_queue = |
|
|
|
|
(timer_shard**)gpr_zalloc(g_num_shards * sizeof(*g_shard_queue)); |
|
|
|
|
|
|
|
|
|
g_shared_mutables.initialized = true; |
|
|
|
|
g_shared_mutables.checker_mu = GPR_SPINLOCK_INITIALIZER; |
|
|
|
|
gpr_mu_init(&g_shared_mutables.mu); |
|
|
|
|
g_shared_mutables.min_timer = grpc_exec_ctx_now(exec_ctx); |
|
|
|
|
gpr_tls_init(&g_last_seen_min_timer); |
|
|
|
|
gpr_tls_set(&g_last_seen_min_timer, 0); |
|
|
|
|
grpc_register_tracer(&grpc_timer_trace); |
|
|
|
|
grpc_register_tracer(&grpc_timer_check_trace); |
|
|
|
|
|
|
|
|
|
for (i = 0; i < NUM_SHARDS; i++) { |
|
|
|
|
for (i = 0; i < g_num_shards; i++) { |
|
|
|
|
timer_shard* shard = &g_shards[i]; |
|
|
|
|
gpr_mu_init(&shard->mu); |
|
|
|
|
grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1, |
|
|
|
@ -262,17 +273,19 @@ void grpc_timer_list_init(grpc_exec_ctx* exec_ctx) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_timer_list_shutdown(grpc_exec_ctx* exec_ctx) { |
|
|
|
|
int i; |
|
|
|
|
size_t i; |
|
|
|
|
run_some_expired_timers( |
|
|
|
|
exec_ctx, GPR_ATM_MAX, NULL, |
|
|
|
|
exec_ctx, GPR_ATM_MAX, nullptr, |
|
|
|
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown")); |
|
|
|
|
for (i = 0; i < NUM_SHARDS; i++) { |
|
|
|
|
for (i = 0; i < g_num_shards; i++) { |
|
|
|
|
timer_shard* shard = &g_shards[i]; |
|
|
|
|
gpr_mu_destroy(&shard->mu); |
|
|
|
|
grpc_timer_heap_destroy(&shard->heap); |
|
|
|
|
} |
|
|
|
|
gpr_mu_destroy(&g_shared_mutables.mu); |
|
|
|
|
gpr_tls_destroy(&g_last_seen_min_timer); |
|
|
|
|
gpr_free(g_shards); |
|
|
|
|
gpr_free(g_shard_queue); |
|
|
|
|
g_shared_mutables.initialized = false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -306,7 +319,7 @@ static void note_deadline_change(timer_shard* shard) { |
|
|
|
|
g_shard_queue[shard->shard_queue_index - 1]->min_deadline) { |
|
|
|
|
swap_adjacent_shards_in_queue(shard->shard_queue_index - 1); |
|
|
|
|
} |
|
|
|
|
while (shard->shard_queue_index < NUM_SHARDS - 1 && |
|
|
|
|
while (shard->shard_queue_index < g_num_shards - 1 && |
|
|
|
|
shard->min_deadline > |
|
|
|
|
g_shard_queue[shard->shard_queue_index + 1]->min_deadline) { |
|
|
|
|
swap_adjacent_shards_in_queue(shard->shard_queue_index); |
|
|
|
@ -318,15 +331,15 @@ void grpc_timer_init_unset(grpc_timer* timer) { timer->pending = false; } |
|
|
|
|
void grpc_timer_init(grpc_exec_ctx* exec_ctx, grpc_timer* timer, |
|
|
|
|
grpc_millis deadline, grpc_closure* closure) { |
|
|
|
|
int is_first_timer = 0; |
|
|
|
|
timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; |
|
|
|
|
timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)]; |
|
|
|
|
timer->closure = closure; |
|
|
|
|
timer->deadline = deadline; |
|
|
|
|
|
|
|
|
|
#ifndef NDEBUG |
|
|
|
|
timer->hash_table_next = NULL; |
|
|
|
|
timer->hash_table_next = nullptr; |
|
|
|
|
#endif |
|
|
|
|
|
|
|
|
|
if (grpc_timer_trace.enabled()) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_timer_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"TIMER %p: SET %" PRIdPTR " now %" PRIdPTR " call %p[%p]", timer, |
|
|
|
|
deadline, grpc_exec_ctx_now(exec_ctx), closure, closure->cb); |
|
|
|
@ -362,7 +375,7 @@ void grpc_timer_init(grpc_exec_ctx* exec_ctx, grpc_timer* timer, |
|
|
|
|
timer->heap_index = INVALID_HEAP_INDEX; |
|
|
|
|
list_join(&shard->list, timer); |
|
|
|
|
} |
|
|
|
|
if (grpc_timer_trace.enabled()) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_timer_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
" .. add to shard %d with queue_deadline_cap=%" PRIdPTR |
|
|
|
|
" => is_first_timer=%s", |
|
|
|
@ -384,7 +397,7 @@ void grpc_timer_init(grpc_exec_ctx* exec_ctx, grpc_timer* timer, |
|
|
|
|
grpc_timer_check. */ |
|
|
|
|
if (is_first_timer) { |
|
|
|
|
gpr_mu_lock(&g_shared_mutables.mu); |
|
|
|
|
if (grpc_timer_trace.enabled()) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_timer_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, " .. old shard min_deadline=%" PRIdPTR, |
|
|
|
|
shard->min_deadline); |
|
|
|
|
} |
|
|
|
@ -412,9 +425,9 @@ void grpc_timer_cancel(grpc_exec_ctx* exec_ctx, grpc_timer* timer) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; |
|
|
|
|
timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)]; |
|
|
|
|
gpr_mu_lock(&shard->mu); |
|
|
|
|
if (grpc_timer_trace.enabled()) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_timer_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer, |
|
|
|
|
timer->pending ? "true" : "false"); |
|
|
|
|
} |
|
|
|
@ -455,7 +468,7 @@ static int refill_heap(timer_shard* shard, gpr_atm now) { |
|
|
|
|
saturating_add(GPR_MAX(now, shard->queue_deadline_cap), |
|
|
|
|
(gpr_atm)(deadline_delta * 1000.0)); |
|
|
|
|
|
|
|
|
|
if (grpc_timer_check_trace.enabled()) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_timer_check_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, " .. shard[%d]->queue_deadline_cap --> %" PRIdPTR, |
|
|
|
|
(int)(shard - g_shards), shard->queue_deadline_cap); |
|
|
|
|
} |
|
|
|
@ -463,7 +476,7 @@ static int refill_heap(timer_shard* shard, gpr_atm now) { |
|
|
|
|
next = timer->next; |
|
|
|
|
|
|
|
|
|
if (timer->deadline < shard->queue_deadline_cap) { |
|
|
|
|
if (grpc_timer_check_trace.enabled()) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_timer_check_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, " .. add timer with deadline %" PRIdPTR " to heap", |
|
|
|
|
timer->deadline); |
|
|
|
|
} |
|
|
|
@ -480,23 +493,23 @@ static int refill_heap(timer_shard* shard, gpr_atm now) { |
|
|
|
|
static grpc_timer* pop_one(timer_shard* shard, gpr_atm now) { |
|
|
|
|
grpc_timer* timer; |
|
|
|
|
for (;;) { |
|
|
|
|
if (grpc_timer_check_trace.enabled()) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_timer_check_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, " .. shard[%d]: heap_empty=%s", |
|
|
|
|
(int)(shard - g_shards), |
|
|
|
|
grpc_timer_heap_is_empty(&shard->heap) ? "true" : "false"); |
|
|
|
|
} |
|
|
|
|
if (grpc_timer_heap_is_empty(&shard->heap)) { |
|
|
|
|
if (now < shard->queue_deadline_cap) return NULL; |
|
|
|
|
if (!refill_heap(shard, now)) return NULL; |
|
|
|
|
if (now < shard->queue_deadline_cap) return nullptr; |
|
|
|
|
if (!refill_heap(shard, now)) return nullptr; |
|
|
|
|
} |
|
|
|
|
timer = grpc_timer_heap_top(&shard->heap); |
|
|
|
|
if (grpc_timer_check_trace.enabled()) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_timer_check_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
" .. check top timer deadline=%" PRIdPTR " now=%" PRIdPTR, |
|
|
|
|
timer->deadline, now); |
|
|
|
|
} |
|
|
|
|
if (timer->deadline > now) return NULL; |
|
|
|
|
if (grpc_timer_trace.enabled()) { |
|
|
|
|
if (timer->deadline > now) return nullptr; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_timer_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "TIMER %p: FIRE %" PRIdPTR "ms late via %s scheduler", |
|
|
|
|
timer, now - timer->deadline, |
|
|
|
|
timer->closure->scheduler->vtable->name); |
|
|
|
@ -521,7 +534,7 @@ static size_t pop_timers(grpc_exec_ctx* exec_ctx, timer_shard* shard, |
|
|
|
|
} |
|
|
|
|
*new_min_deadline = compute_min_deadline(shard); |
|
|
|
|
gpr_mu_unlock(&shard->mu); |
|
|
|
|
if (grpc_timer_check_trace.enabled()) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_timer_check_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, " .. shard[%d] popped %" PRIdPTR, |
|
|
|
|
(int)(shard - g_shards), n); |
|
|
|
|
} |
|
|
|
@ -537,7 +550,7 @@ static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx* exec_ctx, |
|
|
|
|
gpr_atm min_timer = gpr_atm_no_barrier_load(&g_shared_mutables.min_timer); |
|
|
|
|
gpr_tls_set(&g_last_seen_min_timer, min_timer); |
|
|
|
|
if (now < min_timer) { |
|
|
|
|
if (next != NULL) *next = GPR_MIN(*next, min_timer); |
|
|
|
|
if (next != nullptr) *next = GPR_MIN(*next, min_timer); |
|
|
|
|
return GRPC_TIMERS_CHECKED_AND_EMPTY; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -545,7 +558,7 @@ static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx* exec_ctx, |
|
|
|
|
gpr_mu_lock(&g_shared_mutables.mu); |
|
|
|
|
result = GRPC_TIMERS_CHECKED_AND_EMPTY; |
|
|
|
|
|
|
|
|
|
if (grpc_timer_check_trace.enabled()) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_timer_check_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, " .. shard[%d]->min_deadline = %" PRIdPTR, |
|
|
|
|
(int)(g_shard_queue[0] - g_shards), |
|
|
|
|
g_shard_queue[0]->min_deadline); |
|
|
|
@ -563,7 +576,7 @@ static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx* exec_ctx, |
|
|
|
|
result = GRPC_TIMERS_FIRED; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (grpc_timer_check_trace.enabled()) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_timer_check_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
" .. result --> %d" |
|
|
|
|
", shard[%d]->min_deadline %" PRIdPTR " --> %" PRIdPTR |
|
|
|
@ -605,10 +618,10 @@ grpc_timer_check_result grpc_timer_check(grpc_exec_ctx* exec_ctx, |
|
|
|
|
mutable cacheline in the common case */ |
|
|
|
|
grpc_millis min_timer = gpr_tls_get(&g_last_seen_min_timer); |
|
|
|
|
if (now < min_timer) { |
|
|
|
|
if (next != NULL) { |
|
|
|
|
if (next != nullptr) { |
|
|
|
|
*next = GPR_MIN(*next, min_timer); |
|
|
|
|
} |
|
|
|
|
if (grpc_timer_check_trace.enabled()) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_timer_check_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"TIMER CHECK SKIP: now=%" PRIdPTR " min_timer=%" PRIdPTR, now, |
|
|
|
|
min_timer); |
|
|
|
@ -622,9 +635,9 @@ grpc_timer_check_result grpc_timer_check(grpc_exec_ctx* exec_ctx, |
|
|
|
|
: GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system"); |
|
|
|
|
|
|
|
|
|
// tracing
|
|
|
|
|
if (grpc_timer_check_trace.enabled()) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_timer_check_trace)) { |
|
|
|
|
char* next_str; |
|
|
|
|
if (next == NULL) { |
|
|
|
|
if (next == nullptr) { |
|
|
|
|
next_str = gpr_strdup("NULL"); |
|
|
|
|
} else { |
|
|
|
|
gpr_asprintf(&next_str, "%" PRIdPTR, *next); |
|
|
|
@ -640,9 +653,9 @@ grpc_timer_check_result grpc_timer_check(grpc_exec_ctx* exec_ctx, |
|
|
|
|
grpc_timer_check_result r = |
|
|
|
|
run_some_expired_timers(exec_ctx, now, next, shutdown_error); |
|
|
|
|
// tracing
|
|
|
|
|
if (grpc_timer_check_trace.enabled()) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_timer_check_trace)) { |
|
|
|
|
char* next_str; |
|
|
|
|
if (next == NULL) { |
|
|
|
|
if (next == nullptr) { |
|
|
|
|
next_str = gpr_strdup("NULL"); |
|
|
|
|
} else { |
|
|
|
|
gpr_asprintf(&next_str, "%" PRIdPTR, *next); |
|
|
|
|