Add comments to timer_generic.c file

pull/11716/head
Sree Kuchibhotla 8 years ago
parent 9796c8a31a
commit bd3ea98caa
  1. 68
      src/core/lib/iomgr/timer_generic.c

@ -44,41 +44,63 @@
grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false); grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false);
grpc_tracer_flag grpc_timer_check_trace = GRPC_TRACER_INITIALIZER(false); grpc_tracer_flag grpc_timer_check_trace = GRPC_TRACER_INITIALIZER(false);
/* A "timer shard". Contains a 'heap' and a 'list' of timers. All timers with
* deadlines earlier than 'queue_deadline" cap are maintained in the heap and
* others are maintained in the list (unordered). This helps to keep the number
* of elements in the heap low.
*
* The 'queue_deadline_cap' gets recomputed periodically based on the timer
* stats maintained in 'stats' and the relevant timers are then moved from the
* 'list' to 'heap'
*/
typedef struct { typedef struct {
gpr_mu mu; gpr_mu mu;
grpc_time_averaged_stats stats; grpc_time_averaged_stats stats;
/* All and only timers with deadlines <= this will be in the heap. */ /* All and only timers with deadlines <= this will be in the heap. */
gpr_atm queue_deadline_cap; gpr_atm queue_deadline_cap;
/* The deadline of the next timer due in this shard */
gpr_atm min_deadline; gpr_atm min_deadline;
/* Index in the g_shard_queue */ /* Index of this timer_shard in the g_shard_queue */
uint32_t shard_queue_index; uint32_t shard_queue_index;
/* This holds all timers with deadlines < queue_deadline_cap. Timers in this /* This holds all timers with deadlines < queue_deadline_cap. Timers in this
list have the top bit of their deadline set to 0. */ list have the top bit of their deadline set to 0. */
grpc_timer_heap heap; grpc_timer_heap heap;
/* This holds timers whose deadline is >= queue_deadline_cap. */ /* This holds timers whose deadline is >= queue_deadline_cap. */
grpc_timer list; grpc_timer list;
} shard_type; } timer_shard;
/* Array of timer shards. Whenever a timer (grpc_timer *) is added, its address
* is hashed to select the timer shard to add the timer to */
static timer_shard g_shards[NUM_SHARDS];
/* Maintains a sorted list of timer shards (sorted by their min_deadline, i.e
* the deadline of the next timer in each shard).
* Access to this is protected by g_shared_mutables.mu */
static timer_shard *g_shard_queue[NUM_SHARDS];
/* Thread local variable that stores the deadline of the next timer the thread
* has last-seen. This is an optimization to prevent the thread from checking
* shared_mutables.min_timer (which requires acquiring shared_mutables.mu lock,
* an expensive operation) */
GPR_TLS_DECL(g_last_seen_min_timer);
struct shared_mutables { struct shared_mutables {
/* The deadline of the next timer due across all timer shards */
gpr_atm min_timer; gpr_atm min_timer;
/* Allow only one run_some_expired_timers at once */ /* Allow only one run_some_expired_timers at once */
gpr_spinlock checker_mu; gpr_spinlock checker_mu;
bool initialized; bool initialized;
/* Protects g_shard_queue */ /* Protects g_shard_queue (and the shared_mutables struct itself) */
gpr_mu mu; gpr_mu mu;
} GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE); } GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE);
static struct shared_mutables g_shared_mutables = { static struct shared_mutables g_shared_mutables = {
.checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER, .initialized = false, .checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER, .initialized = false,
}; };
static gpr_clock_type g_clock_type; static gpr_clock_type g_clock_type;
static shard_type g_shards[NUM_SHARDS];
/* Protected by g_shared_mutables.mu */
static shard_type *g_shard_queue[NUM_SHARDS];
static gpr_timespec g_start_time; static gpr_timespec g_start_time;
GPR_TLS_DECL(g_last_seen_min_timer);
static gpr_atm saturating_add(gpr_atm a, gpr_atm b) { static gpr_atm saturating_add(gpr_atm a, gpr_atm b) {
if (a > GPR_ATM_MAX - b) { if (a > GPR_ATM_MAX - b) {
return GPR_ATM_MAX; return GPR_ATM_MAX;
@ -122,7 +144,7 @@ static gpr_timespec atm_to_timespec(gpr_atm x) {
return gpr_time_add(g_start_time, dbl_to_ts((double)x / 1000.0)); return gpr_time_add(g_start_time, dbl_to_ts((double)x / 1000.0));
} }
static gpr_atm compute_min_deadline(shard_type *shard) { static gpr_atm compute_min_deadline(timer_shard *shard) {
return grpc_timer_heap_is_empty(&shard->heap) return grpc_timer_heap_is_empty(&shard->heap)
? saturating_add(shard->queue_deadline_cap, 1) ? saturating_add(shard->queue_deadline_cap, 1)
: grpc_timer_heap_top(&shard->heap)->deadline; : grpc_timer_heap_top(&shard->heap)->deadline;
@ -142,7 +164,7 @@ void grpc_timer_list_init(gpr_timespec now) {
grpc_register_tracer("timer_check", &grpc_timer_check_trace); grpc_register_tracer("timer_check", &grpc_timer_check_trace);
for (i = 0; i < NUM_SHARDS; i++) { for (i = 0; i < NUM_SHARDS; i++) {
shard_type *shard = &g_shards[i]; timer_shard *shard = &g_shards[i];
gpr_mu_init(&shard->mu); gpr_mu_init(&shard->mu);
grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1, grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1,
0.5); 0.5);
@ -161,7 +183,7 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {
exec_ctx, GPR_ATM_MAX, NULL, exec_ctx, GPR_ATM_MAX, NULL,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown")); GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown"));
for (i = 0; i < NUM_SHARDS; i++) { for (i = 0; i < NUM_SHARDS; i++) {
shard_type *shard = &g_shards[i]; timer_shard *shard = &g_shards[i];
gpr_mu_destroy(&shard->mu); gpr_mu_destroy(&shard->mu);
grpc_timer_heap_destroy(&shard->heap); grpc_timer_heap_destroy(&shard->heap);
} }
@ -187,7 +209,7 @@ static void list_remove(grpc_timer *timer) {
} }
static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) { static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) {
shard_type *temp; timer_shard *temp;
temp = g_shard_queue[first_shard_queue_index]; temp = g_shard_queue[first_shard_queue_index];
g_shard_queue[first_shard_queue_index] = g_shard_queue[first_shard_queue_index] =
g_shard_queue[first_shard_queue_index + 1]; g_shard_queue[first_shard_queue_index + 1];
@ -198,7 +220,7 @@ static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) {
first_shard_queue_index + 1; first_shard_queue_index + 1;
} }
static void note_deadline_change(shard_type *shard) { static void note_deadline_change(timer_shard *shard) {
while (shard->shard_queue_index > 0 && while (shard->shard_queue_index > 0 &&
shard->min_deadline < shard->min_deadline <
g_shard_queue[shard->shard_queue_index - 1]->min_deadline) { g_shard_queue[shard->shard_queue_index - 1]->min_deadline) {
@ -215,7 +237,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
gpr_timespec deadline, grpc_closure *closure, gpr_timespec deadline, grpc_closure *closure,
gpr_timespec now) { gpr_timespec now) {
int is_first_timer = 0; int is_first_timer = 0;
shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
GPR_ASSERT(deadline.clock_type == g_clock_type); GPR_ASSERT(deadline.clock_type == g_clock_type);
GPR_ASSERT(now.clock_type == g_clock_type); GPR_ASSERT(now.clock_type == g_clock_type);
timer->closure = closure; timer->closure = closure;
@ -303,7 +325,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
return; return;
} }
shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
gpr_mu_lock(&shard->mu); gpr_mu_lock(&shard->mu);
if (GRPC_TRACER_ON(grpc_timer_trace)) { if (GRPC_TRACER_ON(grpc_timer_trace)) {
gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer, gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer,
@ -321,12 +343,12 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
gpr_mu_unlock(&shard->mu); gpr_mu_unlock(&shard->mu);
} }
/* This is called when the queue is empty and "now" has reached the /* Rebalances the timer shard by computing a new 'queue_deadline_cap' and moving
queue_deadline_cap. We compute a new queue deadline and then scan the map all relevant timers in shard->list (i.e timers with deadlines earlier than
for timers that fall at or under it. Returns true if the queue is no 'queue_deadline_cap') into into shard->heap.
longer empty. Returns 'true' if shard->heap has atleast ONE element
REQUIRES: shard->mu locked */ REQUIRES: shard->mu locked */
static int refill_queue(shard_type *shard, gpr_atm now) { static int refill_heap(timer_shard *shard, gpr_atm now) {
/* Compute the new queue window width and bound by the limits: */ /* Compute the new queue window width and bound by the limits: */
double computed_deadline_delta = double computed_deadline_delta =
grpc_time_averaged_stats_update_average(&shard->stats) * grpc_time_averaged_stats_update_average(&shard->stats) *
@ -363,7 +385,7 @@ static int refill_queue(shard_type *shard, gpr_atm now) {
/* This pops the next non-cancelled timer with deadline <= now from the /* This pops the next non-cancelled timer with deadline <= now from the
queue, or returns NULL if there isn't one. queue, or returns NULL if there isn't one.
REQUIRES: shard->mu locked */ REQUIRES: shard->mu locked */
static grpc_timer *pop_one(shard_type *shard, gpr_atm now) { static grpc_timer *pop_one(timer_shard *shard, gpr_atm now) {
grpc_timer *timer; grpc_timer *timer;
for (;;) { for (;;) {
if (GRPC_TRACER_ON(grpc_timer_check_trace)) { if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
@ -373,7 +395,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_atm now) {
} }
if (grpc_timer_heap_is_empty(&shard->heap)) { if (grpc_timer_heap_is_empty(&shard->heap)) {
if (now < shard->queue_deadline_cap) return NULL; if (now < shard->queue_deadline_cap) return NULL;
if (!refill_queue(shard, now)) return NULL; if (!refill_heap(shard, now)) return NULL;
} }
timer = grpc_timer_heap_top(&shard->heap); timer = grpc_timer_heap_top(&shard->heap);
if (GRPC_TRACER_ON(grpc_timer_check_trace)) { if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
@ -393,7 +415,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_atm now) {
} }
/* REQUIRES: shard->mu unlocked */ /* REQUIRES: shard->mu unlocked */
static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, static size_t pop_timers(grpc_exec_ctx *exec_ctx, timer_shard *shard,
gpr_atm now, gpr_atm *new_min_deadline, gpr_atm now, gpr_atm *new_min_deadline,
grpc_error *error) { grpc_error *error) {
size_t n = 0; size_t n = 0;

Loading…
Cancel
Save