Add tracing, fix off-by-one error

pull/10194/head
Craig Tiller 8 years ago
parent c900063f7c
commit 0b4c531805
  1. 73
      src/core/lib/iomgr/timer_generic.c
  2. 8
      test/core/iomgr/timer_list_test.c

@ -56,8 +56,8 @@
#define MIN_QUEUE_WINDOW_DURATION 0.01 #define MIN_QUEUE_WINDOW_DURATION 0.01
#define MAX_QUEUE_WINDOW_DURATION 1 #define MAX_QUEUE_WINDOW_DURATION 1
static int grpc_timer_trace = 0; int grpc_timer_trace = 0;
static int grpc_timer_check_trace = 0; int grpc_timer_check_trace = 0;
typedef struct { typedef struct {
gpr_mu mu; gpr_mu mu;
@ -136,7 +136,7 @@ static gpr_timespec atm_to_timespec(gpr_atm x) {
static gpr_atm compute_min_deadline(shard_type *shard) { static gpr_atm compute_min_deadline(shard_type *shard) {
return grpc_timer_heap_is_empty(&shard->heap) return grpc_timer_heap_is_empty(&shard->heap)
? shard->queue_deadline_cap ? saturating_add(shard->queue_deadline_cap, 1)
: grpc_timer_heap_top(&shard->heap)->deadline; : grpc_timer_heap_top(&shard->heap)->deadline;
} }
@ -186,10 +186,13 @@ static double ts_to_dbl(gpr_timespec ts) {
return (double)ts.tv_sec + 1e-9 * ts.tv_nsec; return (double)ts.tv_sec + 1e-9 * ts.tv_nsec;
} }
static void list_join(grpc_timer *head, grpc_timer *timer) { /* returns true if the first element in the list */
static bool list_join(grpc_timer *head, grpc_timer *timer) {
bool is_first = head->next == head;
timer->next = head; timer->next = head;
timer->prev = head->prev; timer->prev = head->prev;
timer->next->prev = timer->prev->next = timer; timer->next->prev = timer->prev->next = timer;
return is_first;
} }
static void list_remove(grpc_timer *timer) { static void list_remove(grpc_timer *timer) {
@ -233,8 +236,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
timer->deadline = timespec_to_atm_round_up(deadline); timer->deadline = timespec_to_atm_round_up(deadline);
if (grpc_timer_trace) { if (grpc_timer_trace) {
gpr_log(GPR_DEBUG, "TIMER %p: SET %" PRIdPTR ".%09d [%" PRIdPTR gpr_log(GPR_DEBUG, "TIMER %p: SET %" PRId64 ".%09d [%" PRIdPTR
"] now %" PRIdPTR ".%09d [%" PRIdPTR "] call %p[%p]", "] now %" PRId64 ".%09d [%" PRIdPTR "] call %p[%p]",
timer, deadline.tv_sec, deadline.tv_nsec, timer->deadline, timer, deadline.tv_sec, deadline.tv_nsec, timer->deadline,
now.tv_sec, now.tv_nsec, timespec_to_atm_round_down(now), closure, now.tv_sec, now.tv_nsec, timespec_to_atm_round_down(now), closure,
closure->cb); closure->cb);
@ -264,7 +267,14 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
is_first_timer = grpc_timer_heap_add(&shard->heap, timer); is_first_timer = grpc_timer_heap_add(&shard->heap, timer);
} else { } else {
timer->heap_index = INVALID_HEAP_INDEX; timer->heap_index = INVALID_HEAP_INDEX;
list_join(&shard->list, timer); is_first_timer = list_join(&shard->list, timer) &&
grpc_timer_heap_is_empty(&shard->heap);
}
if (grpc_timer_trace) {
gpr_log(GPR_DEBUG, " .. add to shard %d with queue_deadline_cap=%" PRIdPTR
" => is_first_timer=%s",
(int)(shard - g_shards), shard->queue_deadline_cap,
is_first_timer ? "true" : "false");
} }
gpr_mu_unlock(&shard->mu); gpr_mu_unlock(&shard->mu);
@ -281,6 +291,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
grpc_timer_check. */ grpc_timer_check. */
if (is_first_timer) { if (is_first_timer) {
gpr_mu_lock(&g_shared_mutables.mu); gpr_mu_lock(&g_shared_mutables.mu);
gpr_log(GPR_DEBUG, " .. old shard min_deadline=%" PRIdPTR,
shard->min_deadline);
if (timer->deadline < shard->min_deadline) { if (timer->deadline < shard->min_deadline) {
gpr_atm old_min_deadline = g_shard_queue[0]->min_deadline; gpr_atm old_min_deadline = g_shard_queue[0]->min_deadline;
shard->min_deadline = timer->deadline; shard->min_deadline = timer->deadline;
@ -338,10 +350,17 @@ static int refill_queue(shard_type *shard, gpr_atm now) {
shard->queue_deadline_cap = shard->queue_deadline_cap =
saturating_add(GPR_MAX(now, shard->queue_deadline_cap), saturating_add(GPR_MAX(now, shard->queue_deadline_cap),
(gpr_atm)(deadline_delta * 1000.0)); (gpr_atm)(deadline_delta * 1000.0));
if (grpc_timer_check_trace) {
gpr_log(GPR_DEBUG, " .. shard[%d]->queue_deadline_cap --> %" PRIdPTR,
(int)(shard - g_shards), shard->queue_deadline_cap);
}
for (timer = shard->list.next; timer != &shard->list; timer = next) { for (timer = shard->list.next; timer != &shard->list; timer = next) {
next = timer->next; next = timer->next;
if (timer->deadline < shard->queue_deadline_cap) { if (timer->deadline < shard->queue_deadline_cap) {
gpr_log(GPR_DEBUG, " .. add timer with deadline %" PRIdPTR " to heap",
timer->deadline);
list_remove(timer); list_remove(timer);
grpc_timer_heap_add(&shard->heap, timer); grpc_timer_heap_add(&shard->heap, timer);
} }
@ -355,12 +374,20 @@ static int refill_queue(shard_type *shard, gpr_atm now) {
static grpc_timer *pop_one(shard_type *shard, gpr_atm now) { static grpc_timer *pop_one(shard_type *shard, gpr_atm now) {
grpc_timer *timer; grpc_timer *timer;
for (;;) { for (;;) {
if (grpc_timer_check_trace) {
gpr_log(GPR_DEBUG, " .. shard[%d]: heap_empty=%s",
(int)(shard - g_shards),
grpc_timer_heap_is_empty(&shard->heap) ? "true" : "false");
}
if (grpc_timer_heap_is_empty(&shard->heap)) { if (grpc_timer_heap_is_empty(&shard->heap)) {
if (now < shard->queue_deadline_cap) return NULL; if (now < shard->queue_deadline_cap) return NULL;
if (!refill_queue(shard, now)) return NULL; if (!refill_queue(shard, now)) return NULL;
} }
timer = grpc_timer_heap_top(&shard->heap); timer = grpc_timer_heap_top(&shard->heap);
if (timer->deadline >= now) return NULL; gpr_log(GPR_DEBUG,
" .. check top timer deadline=%" PRIdPTR " now=%" PRIdPTR,
timer->deadline, now);
if (timer->deadline > now) return NULL;
timer->pending = false; timer->pending = false;
grpc_timer_heap_pop(&shard->heap); grpc_timer_heap_pop(&shard->heap);
return timer; return timer;
@ -405,6 +432,12 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
if (gpr_spinlock_trylock(&g_shared_mutables.checker_mu)) { if (gpr_spinlock_trylock(&g_shared_mutables.checker_mu)) {
gpr_mu_lock(&g_shared_mutables.mu); gpr_mu_lock(&g_shared_mutables.mu);
if (grpc_timer_check_trace) {
gpr_log(GPR_DEBUG, " .. shard[%d]->min_deadline = %" PRIdPTR,
(int)(g_shard_queue[0] - g_shards),
g_shard_queue[0]->min_deadline);
}
while (g_shard_queue[0]->min_deadline < now) { while (g_shard_queue[0]->min_deadline < now) {
gpr_atm new_min_deadline; gpr_atm new_min_deadline;
@ -414,6 +447,14 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
n += n +=
pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline, error); pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline, error);
if (grpc_timer_check_trace) {
gpr_log(GPR_DEBUG,
" .. popped --> %" PRIdPTR
", shard[%d]->min_deadline %" PRIdPTR " --> %" PRIdPTR,
n, (int)(g_shard_queue[0] - g_shards),
g_shard_queue[0]->min_deadline, new_min_deadline);
}
/* An grpc_timer_init() on the shard could intervene here, adding a new /* An grpc_timer_init() on the shard could intervene here, adding a new
timer that is earlier than new_min_deadline. However, timer that is earlier than new_min_deadline. However,
grpc_timer_init() will block on the master_lock before it can call grpc_timer_init() will block on the master_lock before it can call
@ -440,28 +481,30 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_timespec *next) { gpr_timespec *next) {
// prelude
GPR_ASSERT(now.clock_type == g_clock_type); GPR_ASSERT(now.clock_type == g_clock_type);
gpr_atm now_atm = timespec_to_atm_round_down(now); gpr_atm now_atm = timespec_to_atm_round_down(now);
grpc_error *shutdown_error = grpc_error *shutdown_error =
gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0 gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0
? GRPC_ERROR_NONE ? GRPC_ERROR_NONE
<<<<<<< HEAD : GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system");
: GRPC_ERROR_CREATE("Shutting down timer system"); // tracing
if (grpc_timer_check_trace) { if (grpc_timer_check_trace) {
char *next_str; char *next_str;
if (next == NULL) { if (next == NULL) {
next_str = gpr_strdup("NULL"); next_str = gpr_strdup("NULL");
} else { } else {
gpr_asprintf(&next_str, "%" PRIdPTR ".%09d [%" PRIdPTR "]", next->tv_sec, gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec,
next->tv_nsec, timespec_to_atm_round_down(*next)); next->tv_nsec, timespec_to_atm_round_down(*next));
} }
gpr_log(GPR_DEBUG, "TIMER CHECK BEGIN: now=%" PRIdPTR ".%09d [%" PRIdPTR gpr_log(GPR_DEBUG, "TIMER CHECK BEGIN: now=%" PRId64 ".%09d [%" PRIdPTR
"] next=%s tls_min=%" PRIdPTR " glob_min=%" PRIdPTR, "] next=%s tls_min=%" PRIdPTR " glob_min=%" PRIdPTR,
now.tv_sec, now.tv_nsec, now_atm, next_str, now.tv_sec, now.tv_nsec, now_atm, next_str,
gpr_tls_get(&g_last_seen_min_timer), gpr_tls_get(&g_last_seen_min_timer),
gpr_atm_no_barrier_load(&g_shared_mutables.min_timer)); gpr_atm_no_barrier_load(&g_shared_mutables.min_timer));
gpr_free(next_str); gpr_free(next_str);
} }
// actual code
bool r; bool r;
gpr_atm next_atm; gpr_atm next_atm;
if (next == NULL) { if (next == NULL) {
@ -471,12 +514,13 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
r = run_some_expired_timers(exec_ctx, now_atm, &next_atm, shutdown_error); r = run_some_expired_timers(exec_ctx, now_atm, &next_atm, shutdown_error);
*next = atm_to_timespec(next_atm); *next = atm_to_timespec(next_atm);
} }
// tracing
if (grpc_timer_check_trace) { if (grpc_timer_check_trace) {
char *next_str; char *next_str;
if (next == NULL) { if (next == NULL) {
next_str = gpr_strdup("NULL"); next_str = gpr_strdup("NULL");
} else { } else {
gpr_asprintf(&next_str, "%" PRIdPTR ".%09d [%" PRIdPTR "]", next->tv_sec, gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec,
next->tv_nsec, next_atm); next->tv_nsec, next_atm);
} }
gpr_log(GPR_DEBUG, "TIMER CHECK END: %d timers triggered; next=%s", r, gpr_log(GPR_DEBUG, "TIMER CHECK END: %d timers triggered; next=%s", r,
@ -484,9 +528,6 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_free(next_str); gpr_free(next_str);
} }
return r > 0; return r > 0;
=======
: GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system"));
>>>>>>> 7e6b7df8d6bbb80c19ae1736e0c35b4eab06c541
} }
#endif /* GRPC_TIMER_USE_GENERIC */ #endif /* GRPC_TIMER_USE_GENERIC */

@ -45,6 +45,9 @@
#define MAX_CB 30 #define MAX_CB 30
extern int grpc_timer_trace;
extern int grpc_timer_check_trace;
static int cb_called[MAX_CB][2]; static int cb_called[MAX_CB][2];
static void cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { static void cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
@ -60,6 +63,8 @@ static void add_test(void) {
gpr_log(GPR_INFO, "add_test"); gpr_log(GPR_INFO, "add_test");
grpc_timer_list_init(start); grpc_timer_list_init(start);
grpc_timer_trace = 1;
grpc_timer_check_trace = 1;
memset(cb_called, 0, sizeof(cb_called)); memset(cb_called, 0, sizeof(cb_called));
/* 10 ms timers. will expire in the current epoch */ /* 10 ms timers. will expire in the current epoch */
@ -133,6 +138,8 @@ void destruction_test(void) {
gpr_log(GPR_INFO, "destruction_test"); gpr_log(GPR_INFO, "destruction_test");
grpc_timer_list_init(gpr_time_0(GPR_CLOCK_REALTIME)); grpc_timer_list_init(gpr_time_0(GPR_CLOCK_REALTIME));
grpc_timer_trace = 1;
grpc_timer_check_trace = 1;
memset(cb_called, 0, sizeof(cb_called)); memset(cb_called, 0, sizeof(cb_called));
grpc_timer_init( grpc_timer_init(
@ -172,6 +179,7 @@ void destruction_test(void) {
int main(int argc, char **argv) { int main(int argc, char **argv) {
grpc_test_init(argc, argv); grpc_test_init(argc, argv);
gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
add_test(); add_test();
destruction_test(); destruction_test();
return 0; return 0;

Loading…
Cancel
Save