From 96d23c3f081915a2e2f60b132941a09d6c3a632d Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Wed, 20 Sep 2017 17:10:37 -0700 Subject: [PATCH] Timer hash table for debug builds --- src/core/lib/iomgr/timer_generic.c | 143 ++++++++++++++++++----------- 1 file changed, 87 insertions(+), 56 deletions(-) diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index 5091c29ae8c..699947409bb 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -79,67 +79,121 @@ static timer_shard g_shards[NUM_SHARDS]; * Access to this is protected by g_shared_mutables.mu */ static timer_shard *g_shard_queue[NUM_SHARDS]; +#ifndef NDEBUG + +/* == Hash table for duplicate timer detection == */ + #define NUM_HASH_BUCKETS 1000 -#define NUM_SLOTS_PER_BUCKET 5 -static gpr_mu g_hash_mu; -static grpc_timer *g_timer_hash[1000][5] = {{NULL, NULL}}; +#define NUM_SLOTS_PER_BUCKET 30 -static void init_timer_hash() { gpr_mu_init(&g_hash_mu); } +static gpr_mu g_hash_mu[NUM_HASH_BUCKETS]; /* One mutex per bucket */ +static grpc_timer *g_timer_ht[NUM_HASH_BUCKETS] + [NUM_SLOTS_PER_BUCKET] = {{NULL, NULL}}; -static bool is_timer_present(grpc_timer *t) { +static void init_timer_ht() { + for (int i = 0; i < NUM_HASH_BUCKETS; i++) { + gpr_mu_init(&g_hash_mu[i]); + } +} + +static bool is_in_ht(grpc_timer *t) { size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS); bool is_found = false; - gpr_mu_lock(&g_hash_mu); + gpr_mu_lock(&g_hash_mu[i]); for (int j = 0; j < NUM_SLOTS_PER_BUCKET; j++) { - if (g_timer_hash[i][j] == t) { + if (g_timer_ht[i][j] == t) { is_found = true; break; } } - gpr_mu_unlock(&g_hash_mu); + gpr_mu_unlock(&g_hash_mu[i]); return is_found; } -static void check_and_add_timer(grpc_timer *t) { +static void add_to_ht(grpc_timer *t) { size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS); bool added = false; - gpr_mu_lock(&g_hash_mu); + gpr_mu_lock(&g_hash_mu[i]); for (int j = 0; j < NUM_SLOTS_PER_BUCKET; j++) { - if 
(g_timer_hash[i][j] == NULL) { - g_timer_hash[i][j] = t; + if (g_timer_ht[i][j] == NULL) { + g_timer_ht[i][j] = t; added = true; break; - } else if (g_timer_hash[i][j] == t) { - gpr_log(GPR_ERROR, "*** DUPLICATE TIMER BEING ADDED (%p) **", (void *)t); + } else if (g_timer_ht[i][j] == t) { + grpc_closure *c = t->closure; + gpr_log(GPR_ERROR, + "** Duplicate timer (%p) being added. Closure: (%p), created at: " + "(%s:%d), scheduled at: (%s:%d) **", + t, c, c->file_created, c->line_created, c->file_initiated, + c->line_initiated); abort(); } } - gpr_mu_unlock(&g_hash_mu); + + gpr_mu_unlock(&g_hash_mu[i]); if (!added) { - gpr_log(GPR_ERROR, "** NOT ENOUGH BUCKETS **"); + gpr_log(GPR_ERROR, + "** Not enough slots in the timer hash table. Please increase " + "NUM_SLOTS_PER_BUCKET **"); abort(); } } -static void remove_timer(grpc_timer *t) { +static void remove_from_ht(grpc_timer *t) { size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS); bool removed = false; - gpr_mu_lock(&g_hash_mu); + gpr_mu_lock(&g_hash_mu[i]); for (int j = 0; j < NUM_SLOTS_PER_BUCKET; j++) { - if (g_timer_hash[i][j] == t) { - g_timer_hash[i][j] = 0; + if (g_timer_ht[i][j] == t) { + g_timer_ht[i][j] = 0; removed = true; break; } } - gpr_mu_unlock(&g_hash_mu); + gpr_mu_unlock(&g_hash_mu[i]); if (!removed) { - gpr_log(GPR_ERROR, "*** Unable to remove %p. BUG! **", (void *)t); + grpc_closure *c = t->closure; + gpr_log(GPR_ERROR, + "** Removing timer (%p) that is not added to hash table. Closure " + "(%p), created at: (%s:%d), scheduled at: (%s:%d) **", + t, c, c->file_created, c->line_created, c->file_initiated, + c->line_initiated); abort(); } } +/* If a timer is added to a timer shard (either heap or a list), it must + * be pending. A timer is added to hash table only-if it is added to the + * timer shard.
+ * Therefore, if timer->pending is false, it cannot be in hash table */ +static void validate_non_pending_timer(grpc_timer *t) { + if (!t->pending && is_in_ht(t)) { + grpc_closure *c = t->closure; + gpr_log(GPR_ERROR, + "** gpr_timer_cancel() called on a non-pending timer (%p) which " + "is in the hash table. Closure: (%p), created at: (%s:%d), " + "scheduled at: (%s:%d) **", + t, c, c->file_created, c->line_created, c->file_initiated, + c->line_initiated); + abort(); + } +} + +#define INIT_TIMER_HASH_TABLE() init_timer_ht() +#define ADD_TO_HASH_TABLE(t) add_to_ht((t)) +#define REMOVE_FROM_HASH_TABLE(t) remove_from_ht((t)) +#define VALIDATE_NON_PENDING_TIMER(t) validate_non_pending_timer((t)) + +#else + +#define INIT_TIMER_HASH_TABLE() +#define ADD_TO_HASH_TABLE(t) +#define REMOVE_FROM_HASH_TABLE(t) +#define VALIDATE_NON_PENDING_TIMER(t) + +#endif + /* Thread local variable that stores the deadline of the next timer the thread * has last-seen. This is an optimization to prevent the thread from checking * shared_mutables.min_timer (which requires acquiring shared_mutables.mu lock, @@ -237,7 +291,7 @@ void grpc_timer_list_init(gpr_timespec now) { g_shard_queue[i] = shard; } - init_timer_hash(); + INIT_TIMER_HASH_TABLE(); } void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { @@ -309,9 +363,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, gpr_atm deadline_atm = timer->deadline = timespec_to_atm_round_up(deadline); if (GRPC_TRACER_ON(grpc_timer_trace)) { - gpr_log(GPR_DEBUG, - "TIMER %p: SET %" PRId64 ".%09d [%" PRIdPTR "] now %" PRId64 - ".%09d [%" PRIdPTR "] call %p[%p]", + gpr_log(GPR_DEBUG, "TIMER %p: SET %" PRId64 ".%09d [%" PRIdPTR + "] now %" PRId64 ".%09d [%" PRIdPTR "] call %p[%p]", timer, deadline.tv_sec, deadline.tv_nsec, deadline_atm, now.tv_sec, now.tv_nsec, timespec_to_atm_round_down(now), closure, closure->cb); } @@ -337,8 +390,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, 
grpc_time_averaged_stats_add_sample(&shard->stats, ts_to_dbl(gpr_time_sub(deadline, now))); - /** TODO: sreek - CHECK HERE AND ADD **/ - check_and_add_timer(timer); + ADD_TO_HASH_TABLE(timer); if (deadline_atm < shard->queue_deadline_cap) { is_first_timer = grpc_timer_heap_add(&shard->heap, timer); @@ -347,9 +399,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, list_join(&shard->list, timer); } if (GRPC_TRACER_ON(grpc_timer_trace)) { - gpr_log(GPR_DEBUG, - " .. add to shard %d with queue_deadline_cap=%" PRIdPTR - " => is_first_timer=%s", + gpr_log(GPR_DEBUG, " .. add to shard %d with queue_deadline_cap=%" PRIdPTR + " => is_first_timer=%s", (int)(shard - g_shards), shard->queue_deadline_cap, is_first_timer ? "true" : "false"); } @@ -404,8 +455,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { } if (timer->pending) { - /* TODO: sreek - Remove the timer here */ - remove_timer(timer); + REMOVE_FROM_HASH_TABLE(timer); GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); timer->pending = false; @@ -415,12 +465,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { grpc_timer_heap_remove(&shard->heap, timer); } } else { - if (is_timer_present(timer)) { - gpr_log(GPR_ERROR, - "** gpr_timer_cancel called on a non-pending timer! %p", - (void *)timer); - abort(); - } + VALIDATE_NON_PENDING_TIMER(timer); } gpr_mu_unlock(&shard->mu); } @@ -452,18 +497,6 @@ static int refill_heap(timer_shard *shard, gpr_atm now) { for (timer = shard->list.next; timer != &shard->list; timer = next) { next = timer->next; -#ifndef NDEBUG - if (next == timer && next != &shard->list) { - grpc_closure *c = timer->closure; - gpr_log(GPR_ERROR, - "We have a problem!!!! 
- timer %p closure: %p, created-at: " - "[%s,%d], scheduled-at: [%s, %d]", - timer, c, c->file_initiated, c->line_created, c->file_initiated, - c->line_initiated); - abort(); - } -#endif - if (timer->deadline < shard->queue_deadline_cap) { if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, " .. add timer with deadline %" PRIdPTR " to heap", @@ -516,8 +549,7 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, timer_shard *shard, grpc_timer *timer; gpr_mu_lock(&shard->mu); while ((timer = pop_one(shard, now))) { - /* TODO: sreek: Remove timer here */ - remove_timer(timer); + REMOVE_FROM_HASH_TABLE(timer); GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_REF(error)); n++; } @@ -630,9 +662,8 @@ grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec, next->tv_nsec, timespec_to_atm_round_down(*next)); } - gpr_log(GPR_DEBUG, - "TIMER CHECK BEGIN: now=%" PRId64 ".%09d [%" PRIdPTR - "] next=%s tls_min=%" PRIdPTR " glob_min=%" PRIdPTR, + gpr_log(GPR_DEBUG, "TIMER CHECK BEGIN: now=%" PRId64 ".%09d [%" PRIdPTR + "] next=%s tls_min=%" PRIdPTR " glob_min=%" PRIdPTR, now.tv_sec, now.tv_nsec, now_atm, next_str, gpr_tls_get(&g_last_seen_min_timer), gpr_atm_no_barrier_load(&g_shared_mutables.min_timer));