|
|
|
@ -31,10 +31,10 @@ |
|
|
|
|
* |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
#include "src/core/iomgr/alarm.h" |
|
|
|
|
#include "src/core/iomgr/timer.h" |
|
|
|
|
|
|
|
|
|
#include "src/core/iomgr/alarm_heap.h" |
|
|
|
|
#include "src/core/iomgr/alarm_internal.h" |
|
|
|
|
#include "src/core/iomgr/timer_heap.h" |
|
|
|
|
#include "src/core/iomgr/timer_internal.h" |
|
|
|
|
#include "src/core/iomgr/time_averaged_stats.h" |
|
|
|
|
#include <grpc/support/log.h> |
|
|
|
|
#include <grpc/support/sync.h> |
|
|
|
@ -51,37 +51,37 @@ |
|
|
|
|
typedef struct { |
|
|
|
|
gpr_mu mu; |
|
|
|
|
grpc_time_averaged_stats stats; |
|
|
|
|
/* All and only alarms with deadlines <= this will be in the heap. */ |
|
|
|
|
/* All and only timers with deadlines <= this will be in the heap. */ |
|
|
|
|
gpr_timespec queue_deadline_cap; |
|
|
|
|
gpr_timespec min_deadline; |
|
|
|
|
/* Index in the g_shard_queue */ |
|
|
|
|
gpr_uint32 shard_queue_index; |
|
|
|
|
/* This holds all alarms with deadlines < queue_deadline_cap. Alarms in this
|
|
|
|
|
/* This holds all timers with deadlines < queue_deadline_cap. Timers in this
|
|
|
|
|
list have the top bit of their deadline set to 0. */ |
|
|
|
|
grpc_alarm_heap heap; |
|
|
|
|
/* This holds alarms whose deadline is >= queue_deadline_cap. */ |
|
|
|
|
grpc_alarm list; |
|
|
|
|
grpc_timer_heap heap; |
|
|
|
|
/* This holds timers whose deadline is >= queue_deadline_cap. */ |
|
|
|
|
grpc_timer list; |
|
|
|
|
} shard_type; |
|
|
|
|
|
|
|
|
|
/* Protects g_shard_queue */ |
|
|
|
|
static gpr_mu g_mu; |
|
|
|
|
/* Allow only one run_some_expired_alarms at once */ |
|
|
|
|
/* Allow only one run_some_expired_timers at once */ |
|
|
|
|
static gpr_mu g_checker_mu; |
|
|
|
|
static gpr_clock_type g_clock_type; |
|
|
|
|
static shard_type g_shards[NUM_SHARDS]; |
|
|
|
|
/* Protected by g_mu */ |
|
|
|
|
static shard_type *g_shard_queue[NUM_SHARDS]; |
|
|
|
|
|
|
|
|
|
static int run_some_expired_alarms(grpc_exec_ctx *exec_ctx, gpr_timespec now, |
|
|
|
|
static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, |
|
|
|
|
gpr_timespec *next, int success); |
|
|
|
|
|
|
|
|
|
static gpr_timespec compute_min_deadline(shard_type *shard) { |
|
|
|
|
return grpc_alarm_heap_is_empty(&shard->heap) |
|
|
|
|
return grpc_timer_heap_is_empty(&shard->heap) |
|
|
|
|
? shard->queue_deadline_cap |
|
|
|
|
: grpc_alarm_heap_top(&shard->heap)->deadline; |
|
|
|
|
: grpc_timer_heap_top(&shard->heap)->deadline; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_alarm_list_init(gpr_timespec now) { |
|
|
|
|
void grpc_timer_list_init(gpr_timespec now) { |
|
|
|
|
gpr_uint32 i; |
|
|
|
|
|
|
|
|
|
gpr_mu_init(&g_mu); |
|
|
|
@ -95,27 +95,27 @@ void grpc_alarm_list_init(gpr_timespec now) { |
|
|
|
|
0.5); |
|
|
|
|
shard->queue_deadline_cap = now; |
|
|
|
|
shard->shard_queue_index = i; |
|
|
|
|
grpc_alarm_heap_init(&shard->heap); |
|
|
|
|
grpc_timer_heap_init(&shard->heap); |
|
|
|
|
shard->list.next = shard->list.prev = &shard->list; |
|
|
|
|
shard->min_deadline = compute_min_deadline(shard); |
|
|
|
|
g_shard_queue[i] = shard; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_alarm_list_shutdown(grpc_exec_ctx *exec_ctx) { |
|
|
|
|
void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { |
|
|
|
|
int i; |
|
|
|
|
run_some_expired_alarms(exec_ctx, gpr_inf_future(g_clock_type), NULL, 0); |
|
|
|
|
run_some_expired_timers(exec_ctx, gpr_inf_future(g_clock_type), NULL, 0); |
|
|
|
|
for (i = 0; i < NUM_SHARDS; i++) { |
|
|
|
|
shard_type *shard = &g_shards[i]; |
|
|
|
|
gpr_mu_destroy(&shard->mu); |
|
|
|
|
grpc_alarm_heap_destroy(&shard->heap); |
|
|
|
|
grpc_timer_heap_destroy(&shard->heap); |
|
|
|
|
} |
|
|
|
|
gpr_mu_destroy(&g_mu); |
|
|
|
|
gpr_mu_destroy(&g_checker_mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* This is a cheap, but good enough, pointer hash for sharding the tasks: */ |
|
|
|
|
static size_t shard_idx(const grpc_alarm *info) { |
|
|
|
|
static size_t shard_idx(const grpc_timer *info) { |
|
|
|
|
size_t x = (size_t)info; |
|
|
|
|
return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) & (NUM_SHARDS - 1); |
|
|
|
|
} |
|
|
|
@ -132,15 +132,15 @@ static gpr_timespec dbl_to_ts(double d) { |
|
|
|
|
return ts; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void list_join(grpc_alarm *head, grpc_alarm *alarm) { |
|
|
|
|
alarm->next = head; |
|
|
|
|
alarm->prev = head->prev; |
|
|
|
|
alarm->next->prev = alarm->prev->next = alarm; |
|
|
|
|
static void list_join(grpc_timer *head, grpc_timer *timer) { |
|
|
|
|
timer->next = head; |
|
|
|
|
timer->prev = head->prev; |
|
|
|
|
timer->next->prev = timer->prev->next = timer; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void list_remove(grpc_alarm *alarm) { |
|
|
|
|
alarm->next->prev = alarm->prev; |
|
|
|
|
alarm->prev->next = alarm->next; |
|
|
|
|
static void list_remove(grpc_timer *timer) { |
|
|
|
|
timer->next->prev = timer->prev; |
|
|
|
|
timer->prev->next = timer->next; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void swap_adjacent_shards_in_queue(gpr_uint32 first_shard_queue_index) { |
|
|
|
@ -170,16 +170,16 @@ static void note_deadline_change(shard_type *shard) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_alarm_init(grpc_exec_ctx *exec_ctx, grpc_alarm *alarm, |
|
|
|
|
gpr_timespec deadline, grpc_iomgr_cb_func alarm_cb, |
|
|
|
|
void *alarm_cb_arg, gpr_timespec now) { |
|
|
|
|
int is_first_alarm = 0; |
|
|
|
|
shard_type *shard = &g_shards[shard_idx(alarm)]; |
|
|
|
|
void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, |
|
|
|
|
gpr_timespec deadline, grpc_iomgr_cb_func timer_cb, |
|
|
|
|
void *timer_cb_arg, gpr_timespec now) { |
|
|
|
|
int is_first_timer = 0; |
|
|
|
|
shard_type *shard = &g_shards[shard_idx(timer)]; |
|
|
|
|
GPR_ASSERT(deadline.clock_type == g_clock_type); |
|
|
|
|
GPR_ASSERT(now.clock_type == g_clock_type); |
|
|
|
|
grpc_closure_init(&alarm->closure, alarm_cb, alarm_cb_arg); |
|
|
|
|
alarm->deadline = deadline; |
|
|
|
|
alarm->triggered = 0; |
|
|
|
|
grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg); |
|
|
|
|
timer->deadline = deadline; |
|
|
|
|
timer->triggered = 0; |
|
|
|
|
|
|
|
|
|
/* TODO(ctiller): check deadline expired */ |
|
|
|
|
|
|
|
|
@ -187,25 +187,25 @@ void grpc_alarm_init(grpc_exec_ctx *exec_ctx, grpc_alarm *alarm, |
|
|
|
|
grpc_time_averaged_stats_add_sample(&shard->stats, |
|
|
|
|
ts_to_dbl(gpr_time_sub(deadline, now))); |
|
|
|
|
if (gpr_time_cmp(deadline, shard->queue_deadline_cap) < 0) { |
|
|
|
|
is_first_alarm = grpc_alarm_heap_add(&shard->heap, alarm); |
|
|
|
|
is_first_timer = grpc_timer_heap_add(&shard->heap, timer); |
|
|
|
|
} else { |
|
|
|
|
alarm->heap_index = INVALID_HEAP_INDEX; |
|
|
|
|
list_join(&shard->list, alarm); |
|
|
|
|
timer->heap_index = INVALID_HEAP_INDEX; |
|
|
|
|
list_join(&shard->list, timer); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&shard->mu); |
|
|
|
|
|
|
|
|
|
/* Deadline may have decreased, we need to adjust the master queue. Note
|
|
|
|
|
that there is a potential racy unlocked region here. There could be a |
|
|
|
|
reordering of multiple grpc_alarm_init calls, at this point, but the < test |
|
|
|
|
reordering of multiple grpc_timer_init calls, at this point, but the < test |
|
|
|
|
below should ensure that we err on the side of caution. There could |
|
|
|
|
also be a race with grpc_alarm_check, which might beat us to the lock. In |
|
|
|
|
that case, it is possible that the alarm that we added will have already |
|
|
|
|
also be a race with grpc_timer_check, which might beat us to the lock. In |
|
|
|
|
that case, it is possible that the timer that we added will have already |
|
|
|
|
run by the time we hold the lock, but that too is a safe error. |
|
|
|
|
Finally, it's possible that the grpc_alarm_check that intervened failed to |
|
|
|
|
trigger the new alarm because the min_deadline hadn't yet been reduced. |
|
|
|
|
In that case, the alarm will simply have to wait for the next |
|
|
|
|
grpc_alarm_check. */ |
|
|
|
|
if (is_first_alarm) { |
|
|
|
|
Finally, it's possible that the grpc_timer_check that intervened failed to |
|
|
|
|
trigger the new timer because the min_deadline hadn't yet been reduced. |
|
|
|
|
In that case, the timer will simply have to wait for the next |
|
|
|
|
grpc_timer_check. */ |
|
|
|
|
if (is_first_timer) { |
|
|
|
|
gpr_mu_lock(&g_mu); |
|
|
|
|
if (gpr_time_cmp(deadline, shard->min_deadline) < 0) { |
|
|
|
|
gpr_timespec old_min_deadline = g_shard_queue[0]->min_deadline; |
|
|
|
@ -220,16 +220,16 @@ void grpc_alarm_init(grpc_exec_ctx *exec_ctx, grpc_alarm *alarm, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_alarm_cancel(grpc_exec_ctx *exec_ctx, grpc_alarm *alarm) { |
|
|
|
|
shard_type *shard = &g_shards[shard_idx(alarm)]; |
|
|
|
|
void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { |
|
|
|
|
shard_type *shard = &g_shards[shard_idx(timer)]; |
|
|
|
|
gpr_mu_lock(&shard->mu); |
|
|
|
|
if (!alarm->triggered) { |
|
|
|
|
grpc_exec_ctx_enqueue(exec_ctx, &alarm->closure, 0); |
|
|
|
|
alarm->triggered = 1; |
|
|
|
|
if (alarm->heap_index == INVALID_HEAP_INDEX) { |
|
|
|
|
list_remove(alarm); |
|
|
|
|
if (!timer->triggered) { |
|
|
|
|
grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, 0); |
|
|
|
|
timer->triggered = 1; |
|
|
|
|
if (timer->heap_index == INVALID_HEAP_INDEX) { |
|
|
|
|
list_remove(timer); |
|
|
|
|
} else { |
|
|
|
|
grpc_alarm_heap_remove(&shard->heap, alarm); |
|
|
|
|
grpc_timer_heap_remove(&shard->heap, timer); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&shard->mu); |
|
|
|
@ -237,7 +237,7 @@ void grpc_alarm_cancel(grpc_exec_ctx *exec_ctx, grpc_alarm *alarm) { |
|
|
|
|
|
|
|
|
|
/* This is called when the queue is empty and "now" has reached the
|
|
|
|
|
queue_deadline_cap. We compute a new queue deadline and then scan the map |
|
|
|
|
for alarms that fall at or under it. Returns true if the queue is no |
|
|
|
|
for timers that fall at or under it. Returns true if the queue is no |
|
|
|
|
longer empty. |
|
|
|
|
REQUIRES: shard->mu locked */ |
|
|
|
|
static int refill_queue(shard_type *shard, gpr_timespec now) { |
|
|
|
@ -248,49 +248,49 @@ static int refill_queue(shard_type *shard, gpr_timespec now) { |
|
|
|
|
double deadline_delta = |
|
|
|
|
GPR_CLAMP(computed_deadline_delta, MIN_QUEUE_WINDOW_DURATION, |
|
|
|
|
MAX_QUEUE_WINDOW_DURATION); |
|
|
|
|
grpc_alarm *alarm, *next; |
|
|
|
|
grpc_timer *timer, *next; |
|
|
|
|
|
|
|
|
|
/* Compute the new cap and put all alarms under it into the queue: */ |
|
|
|
|
/* Compute the new cap and put all timers under it into the queue: */ |
|
|
|
|
shard->queue_deadline_cap = gpr_time_add( |
|
|
|
|
gpr_time_max(now, shard->queue_deadline_cap), dbl_to_ts(deadline_delta)); |
|
|
|
|
for (alarm = shard->list.next; alarm != &shard->list; alarm = next) { |
|
|
|
|
next = alarm->next; |
|
|
|
|
for (timer = shard->list.next; timer != &shard->list; timer = next) { |
|
|
|
|
next = timer->next; |
|
|
|
|
|
|
|
|
|
if (gpr_time_cmp(alarm->deadline, shard->queue_deadline_cap) < 0) { |
|
|
|
|
list_remove(alarm); |
|
|
|
|
grpc_alarm_heap_add(&shard->heap, alarm); |
|
|
|
|
if (gpr_time_cmp(timer->deadline, shard->queue_deadline_cap) < 0) { |
|
|
|
|
list_remove(timer); |
|
|
|
|
grpc_timer_heap_add(&shard->heap, timer); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return !grpc_alarm_heap_is_empty(&shard->heap); |
|
|
|
|
return !grpc_timer_heap_is_empty(&shard->heap); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* This pops the next non-cancelled alarm with deadline <= now from the queue,
|
|
|
|
|
/* This pops the next non-cancelled timer with deadline <= now from the queue,
|
|
|
|
|
or returns NULL if there isn't one. |
|
|
|
|
REQUIRES: shard->mu locked */ |
|
|
|
|
static grpc_alarm *pop_one(shard_type *shard, gpr_timespec now) { |
|
|
|
|
grpc_alarm *alarm; |
|
|
|
|
static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) { |
|
|
|
|
grpc_timer *timer; |
|
|
|
|
for (;;) { |
|
|
|
|
if (grpc_alarm_heap_is_empty(&shard->heap)) { |
|
|
|
|
if (grpc_timer_heap_is_empty(&shard->heap)) { |
|
|
|
|
if (gpr_time_cmp(now, shard->queue_deadline_cap) < 0) return NULL; |
|
|
|
|
if (!refill_queue(shard, now)) return NULL; |
|
|
|
|
} |
|
|
|
|
alarm = grpc_alarm_heap_top(&shard->heap); |
|
|
|
|
if (gpr_time_cmp(alarm->deadline, now) > 0) return NULL; |
|
|
|
|
alarm->triggered = 1; |
|
|
|
|
grpc_alarm_heap_pop(&shard->heap); |
|
|
|
|
return alarm; |
|
|
|
|
timer = grpc_timer_heap_top(&shard->heap); |
|
|
|
|
if (gpr_time_cmp(timer->deadline, now) > 0) return NULL; |
|
|
|
|
timer->triggered = 1; |
|
|
|
|
grpc_timer_heap_pop(&shard->heap); |
|
|
|
|
return timer; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* REQUIRES: shard->mu unlocked */ |
|
|
|
|
static size_t pop_alarms(grpc_exec_ctx *exec_ctx, shard_type *shard, |
|
|
|
|
static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, |
|
|
|
|
gpr_timespec now, gpr_timespec *new_min_deadline, |
|
|
|
|
int success) { |
|
|
|
|
size_t n = 0; |
|
|
|
|
grpc_alarm *alarm; |
|
|
|
|
grpc_timer *timer; |
|
|
|
|
gpr_mu_lock(&shard->mu); |
|
|
|
|
while ((alarm = pop_one(shard, now))) { |
|
|
|
|
grpc_exec_ctx_enqueue(exec_ctx, &alarm->closure, success); |
|
|
|
|
while ((timer = pop_one(shard, now))) { |
|
|
|
|
grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, success); |
|
|
|
|
n++; |
|
|
|
|
} |
|
|
|
|
*new_min_deadline = compute_min_deadline(shard); |
|
|
|
@ -298,11 +298,11 @@ static size_t pop_alarms(grpc_exec_ctx *exec_ctx, shard_type *shard, |
|
|
|
|
return n; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static int run_some_expired_alarms(grpc_exec_ctx *exec_ctx, gpr_timespec now, |
|
|
|
|
static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, |
|
|
|
|
gpr_timespec *next, int success) { |
|
|
|
|
size_t n = 0; |
|
|
|
|
|
|
|
|
|
/* TODO(ctiller): verify that there are any alarms (atomically) here */ |
|
|
|
|
/* TODO(ctiller): verify that there are any timers (atomically) here */ |
|
|
|
|
|
|
|
|
|
if (gpr_mu_trylock(&g_checker_mu)) { |
|
|
|
|
gpr_mu_lock(&g_mu); |
|
|
|
@ -310,16 +310,16 @@ static int run_some_expired_alarms(grpc_exec_ctx *exec_ctx, gpr_timespec now, |
|
|
|
|
while (gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) { |
|
|
|
|
gpr_timespec new_min_deadline; |
|
|
|
|
|
|
|
|
|
/* For efficiency, we pop as many available alarms as we can from the
|
|
|
|
|
shard. This may violate perfect alarm deadline ordering, but that |
|
|
|
|
/* For efficiency, we pop as many available timers as we can from the
|
|
|
|
|
shard. This may violate perfect timer deadline ordering, but that |
|
|
|
|
shouldn't be a big deal because we don't make ordering guarantees. */ |
|
|
|
|
n += pop_alarms(exec_ctx, g_shard_queue[0], now, &new_min_deadline, |
|
|
|
|
n += pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline, |
|
|
|
|
success); |
|
|
|
|
|
|
|
|
|
/* An grpc_alarm_init() on the shard could intervene here, adding a new
|
|
|
|
|
alarm that is earlier than new_min_deadline. However, |
|
|
|
|
grpc_alarm_init() will block on the master_lock before it can call |
|
|
|
|
set_min_deadline, so this one will complete first and then the AddAlarm |
|
|
|
|
/* An grpc_timer_init() on the shard could intervene here, adding a new
|
|
|
|
|
timer that is earlier than new_min_deadline. However, |
|
|
|
|
grpc_timer_init() will block on the master_lock before it can call |
|
|
|
|
set_min_deadline, so this one will complete first and then the Addtimer |
|
|
|
|
will reduce the min_deadline (perhaps unnecessarily). */ |
|
|
|
|
g_shard_queue[0]->min_deadline = new_min_deadline; |
|
|
|
|
note_deadline_change(g_shard_queue[0]); |
|
|
|
@ -336,15 +336,15 @@ static int run_some_expired_alarms(grpc_exec_ctx *exec_ctx, gpr_timespec now, |
|
|
|
|
return (int)n; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int grpc_alarm_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, |
|
|
|
|
int grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, |
|
|
|
|
gpr_timespec *next) { |
|
|
|
|
GPR_ASSERT(now.clock_type == g_clock_type); |
|
|
|
|
return run_some_expired_alarms( |
|
|
|
|
return run_some_expired_timers( |
|
|
|
|
exec_ctx, now, next, |
|
|
|
|
gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
gpr_timespec grpc_alarm_list_next_timeout(void) { |
|
|
|
|
gpr_timespec grpc_timer_list_next_timeout(void) { |
|
|
|
|
gpr_timespec out; |
|
|
|
|
gpr_mu_lock(&g_mu); |
|
|
|
|
out = g_shard_queue[0]->min_deadline; |