Port [] alarm management to GRPC.

This change implements a platform-independent alarm manager in alarm.c.
It's integrated with iomgr, and some tests are cleaned up.

The alarm implementation itself is a fairly direct port of LazyAlarmList from eventmanager.
SpinLock has been replaced for now with gpr_mu, and other atomic operations have been dropped (again, for now).
A majority of tests have been ported.
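
For orientation, a minimal usage sketch of the new public API, based on the
declarations added to src/core/iomgr/alarm.h in this change; the callback, the
100 ms deadline, and the log message are purely illustrative, and the alarm
list is assumed to have been initialized already (e.g. via grpc_iomgr_init):

#include "src/core/iomgr/alarm.h"
#include <grpc/support/log.h>
#include <grpc/support/time.h>

/* Called exactly once per alarm: SUCCESS if the deadline fired,
   CANCELLED if grpc_alarm_cancel (or shutdown) got there first. */
static void on_deadline(void *arg, grpc_iomgr_cb_status status) {
  if (status == GRPC_CALLBACK_SUCCESS) {
    gpr_log(GPR_INFO, "deadline expired for %p", arg);
  }
}

static void schedule_example(void) {
  grpc_alarm alarm;
  gpr_timespec now = gpr_now();
  /* arm a 100 ms deadline alarm */
  grpc_alarm_init(&alarm, gpr_time_add(now, gpr_time_from_millis(100)),
                  on_deadline, NULL, now);
  /* ... if the guarded work finishes early, cancel; the callback then runs
     inline with GRPC_CALLBACK_CANCELLED */
  grpc_alarm_cancel(&alarm);
}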
	Change on 2014/12/19 by ctiller <ctiller@google.com>
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=82551363
Ref: pull/1/merge
Authored by ctiller, committed by Jan Tattermusch (10 years ago)
Parent: 1a809c0ebb
Commit: 3bf466fb6c
Files changed:
  1. Makefile (80 changed lines)
  2. build.json (24 changed lines)
  3. include/grpc/support/time.h (3 changed lines)
  4. src/core/channel/client_setup.c (6 changed lines)
  5. src/core/iomgr/alarm.c (353 changed lines)
  6. src/core/iomgr/alarm.h (32 changed lines)
  7. src/core/iomgr/alarm_heap.c (148 changed lines)
  8. src/core/iomgr/alarm_heap.h (57 changed lines)
  9. src/core/iomgr/alarm_internal.h (50 changed lines)
  10. src/core/iomgr/iomgr_libevent.c (115 changed lines)
  11. src/core/iomgr/iomgr_libevent.h (8 changed lines)
  12. src/core/support/time.c (8 changed lines)
  13. test/core/end2end/no_server_test.c (2 changed lines)
  14. test/core/iomgr/alarm_heap_test.c (277 changed lines)
  15. test/core/iomgr/alarm_list_test.c (144 changed lines)
  16. test/core/iomgr/alarm_test.c (29 changed lines)

(Makefile diff suppressed because one or more lines are too long)

build.json
@@ -1325,6 +1325,30 @@
"gpr"
]
},
{
"name": "alarm_list_test",
"build": "test",
"src": [
"test/core/iomgr/alarm_list_test.c"
],
"deps": [
"grpc_test_util",
"grpc",
"gpr"
]
},
{
"name": "alarm_heap_test",
"build": "test",
"src": [
"test/core/iomgr/alarm_heap_test.c"
],
"deps": [
"grpc_test_util",
"grpc",
"gpr"
]
},
{
"name": "time_test",
"build": "test",

include/grpc/support/time.h
@@ -78,6 +78,9 @@ gpr_timespec gpr_now(void);
respectively. */
int gpr_time_cmp(gpr_timespec a, gpr_timespec b);
gpr_timespec gpr_time_max(gpr_timespec a, gpr_timespec b);
gpr_timespec gpr_time_min(gpr_timespec a, gpr_timespec b);
/* Add and subtract times. Calculations saturate at infinities. */
gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b);
gpr_timespec gpr_time_sub(gpr_timespec a, gpr_timespec b);

src/core/channel/client_setup.c
@@ -108,6 +108,7 @@ static void setup_initiate(grpc_transport_setup *sp) {
not to initiate again) */
static void setup_cancel(grpc_transport_setup *sp) {
grpc_client_setup *s = (grpc_client_setup *)sp;
int cancel_alarm = 0;
gpr_mu_lock(&s->mu);
@@ -115,7 +116,7 @@ static void setup_cancel(grpc_transport_setup *sp) {
/* effectively cancels the current request (if any) */
s->active_request = NULL;
if (s->in_alarm) {
grpc_alarm_cancel(&s->backoff_alarm);
cancel_alarm = 1;
}
if (--s->refs == 0) {
gpr_mu_unlock(&s->mu);
@@ -123,6 +124,9 @@ static void setup_cancel(grpc_transport_setup *sp) {
} else {
gpr_mu_unlock(&s->mu);
}
if (cancel_alarm) {
grpc_alarm_cancel(&s->backoff_alarm);
}
}
/* vtable for transport setup */
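
The reshuffle above defers grpc_alarm_cancel() until s->mu has been released:
with the new implementation the cancellation callback can run inline from the
cancel call (see the note in alarm.h later in this change), so holding a lock
the callback might also need across the cancel would be risky. A sketch of the
pattern, using a hypothetical guarded_alarm type in place of grpc_client_setup:

#include "src/core/iomgr/alarm.h"
#include <grpc/support/sync.h>

typedef struct {
  gpr_mu mu;
  int in_alarm;
  grpc_alarm alarm;
} guarded_alarm; /* hypothetical type, for illustration only */

static void cancel_safely(guarded_alarm *g) {
  int do_cancel = 0;
  gpr_mu_lock(&g->mu);
  if (g->in_alarm) do_cancel = 1; /* decide under the lock */
  gpr_mu_unlock(&g->mu);
  if (do_cancel) {
    grpc_alarm_cancel(&g->alarm); /* may invoke the alarm callback inline */
  }
}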

src/core/iomgr/alarm.c (new file)
@@ -0,0 +1,353 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/iomgr/alarm.h"
#include "src/core/iomgr/alarm_heap.h"
#include "src/core/iomgr/alarm_internal.h"
#include "src/core/iomgr/time_averaged_stats.h"
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
#define INVALID_HEAP_INDEX 0xffffffffu
#define LOG2_NUM_SHARDS 5
#define NUM_SHARDS (1 << LOG2_NUM_SHARDS)
#define MAX_ALARMS_PER_CHECK 128
#define ADD_DEADLINE_SCALE 0.33
#define MIN_QUEUE_WINDOW_DURATION 0.01
#define MAX_QUEUE_WINDOW_DURATION 1
typedef struct {
gpr_mu mu;
grpc_time_averaged_stats stats;
/* All and only alarms with deadlines <= this will be in the heap. */
gpr_timespec queue_deadline_cap;
gpr_timespec min_deadline;
/* Index in the g_shard_queue */
gpr_uint32 shard_queue_index;
/* This holds all alarms with deadlines < queue_deadline_cap. Alarms in this
list have the top bit of their deadline set to 0. */
grpc_alarm_heap heap;
/* This holds alarms whose deadline is >= queue_deadline_cap. */
grpc_alarm list;
} shard_type;
/* Protects g_shard_queue */
static gpr_mu g_mu;
/* Allow only one run_some_expired_alarms at once */
static gpr_mu g_checker_mu;
static shard_type g_shards[NUM_SHARDS];
/* Protected by g_mu */
static shard_type *g_shard_queue[NUM_SHARDS];
static int run_some_expired_alarms(gpr_timespec now,
grpc_iomgr_cb_status status);
static gpr_timespec compute_min_deadline(shard_type *shard) {
return grpc_alarm_heap_is_empty(&shard->heap)
? shard->queue_deadline_cap
: grpc_alarm_heap_top(&shard->heap)->deadline;
}
void grpc_alarm_list_init(gpr_timespec now) {
int i;
gpr_mu_init(&g_mu);
gpr_mu_init(&g_checker_mu);
for (i = 0; i < NUM_SHARDS; i++) {
shard_type *shard = &g_shards[i];
gpr_mu_init(&shard->mu);
grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1,
0.5);
shard->queue_deadline_cap = now;
shard->shard_queue_index = i;
grpc_alarm_heap_init(&shard->heap);
shard->list.next = shard->list.prev = &shard->list;
shard->min_deadline = compute_min_deadline(shard);
g_shard_queue[i] = shard;
}
}
void grpc_alarm_list_shutdown() {
int i;
while (run_some_expired_alarms(gpr_inf_future, GRPC_CALLBACK_CANCELLED))
;
for (i = 0; i < NUM_SHARDS; i++) {
shard_type *shard = &g_shards[i];
gpr_mu_destroy(&shard->mu);
grpc_alarm_heap_destroy(&shard->heap);
}
gpr_mu_destroy(&g_mu);
gpr_mu_destroy(&g_checker_mu);
}
/* This is a cheap, but good enough, pointer hash for sharding the tasks: */
static size_t shard_idx(const grpc_alarm *info) {
size_t x = (size_t)info;
return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) & (NUM_SHARDS - 1);
}
static double ts_to_dbl(gpr_timespec ts) {
return ts.tv_sec + 1e-9 * ts.tv_nsec;
}
static gpr_timespec dbl_to_ts(double d) {
gpr_timespec ts;
ts.tv_sec = d;
ts.tv_nsec = 1e9 * (d - ts.tv_sec);
return ts;
}
static void list_join(grpc_alarm *head, grpc_alarm *alarm) {
alarm->next = head;
alarm->prev = head->prev;
alarm->next->prev = alarm->prev->next = alarm;
}
static void list_remove(grpc_alarm *alarm) {
alarm->next->prev = alarm->prev;
alarm->prev->next = alarm->next;
}
static void swap_adjacent_shards_in_queue(size_t first_shard_queue_index) {
shard_type *temp;
temp = g_shard_queue[first_shard_queue_index];
g_shard_queue[first_shard_queue_index] =
g_shard_queue[first_shard_queue_index + 1];
g_shard_queue[first_shard_queue_index + 1] = temp;
g_shard_queue[first_shard_queue_index]->shard_queue_index =
first_shard_queue_index;
g_shard_queue[first_shard_queue_index + 1]->shard_queue_index =
first_shard_queue_index + 1;
}
static void note_deadline_change(shard_type *shard) {
while (shard->shard_queue_index > 0 &&
gpr_time_cmp(
shard->min_deadline,
g_shard_queue[shard->shard_queue_index - 1]->min_deadline) < 0) {
swap_adjacent_shards_in_queue(shard->shard_queue_index - 1);
}
while (shard->shard_queue_index < NUM_SHARDS - 1 &&
gpr_time_cmp(
shard->min_deadline,
g_shard_queue[shard->shard_queue_index + 1]->min_deadline) > 0) {
swap_adjacent_shards_in_queue(shard->shard_queue_index);
}
}
void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline,
grpc_iomgr_cb_func alarm_cb, void *alarm_cb_arg,
gpr_timespec now) {
int is_first_alarm = 0;
shard_type *shard = &g_shards[shard_idx(alarm)];
alarm->cb = alarm_cb;
alarm->cb_arg = alarm_cb_arg;
alarm->deadline = deadline;
alarm->triggered = 0;
/* TODO(ctiller): check deadline expired */
gpr_mu_lock(&shard->mu);
grpc_time_averaged_stats_add_sample(&shard->stats,
ts_to_dbl(gpr_time_sub(deadline, now)));
if (gpr_time_cmp(deadline, shard->queue_deadline_cap) < 0) {
is_first_alarm = grpc_alarm_heap_add(&shard->heap, alarm);
} else {
alarm->heap_index = INVALID_HEAP_INDEX;
list_join(&shard->list, alarm);
}
gpr_mu_unlock(&shard->mu);
/* Deadline may have decreased, we need to adjust the master queue. Note
that there is a potential racy unlocked region here. There could be a
reordering of multiple grpc_alarm_init calls, at this point, but the < test
below should ensure that we err on the side of caution. There could
also be a race with grpc_alarm_check, which might beat us to the lock. In
that case, it is possible that the alarm that we added will have already
run by the time we hold the lock, but that too is a safe error.
Finally, it's possible that the grpc_alarm_check that intervened failed to
trigger the new alarm because the min_deadline hadn't yet been reduced.
In that case, the alarm will simply have to wait for the next
grpc_alarm_check. */
if (is_first_alarm) {
gpr_mu_lock(&g_mu);
if (gpr_time_cmp(deadline, shard->min_deadline) < 0) {
gpr_timespec old_min_deadline = g_shard_queue[0]->min_deadline;
shard->min_deadline = deadline;
note_deadline_change(shard);
if (shard->shard_queue_index == 0 &&
gpr_time_cmp(deadline, old_min_deadline) < 0) {
grpc_kick_poller();
}
}
gpr_mu_unlock(&g_mu);
}
}
void grpc_alarm_cancel(grpc_alarm *alarm) {
shard_type *shard = &g_shards[shard_idx(alarm)];
int triggered = 0;
gpr_mu_lock(&shard->mu);
if (!alarm->triggered) {
triggered = 1;
alarm->triggered = 1;
if (alarm->heap_index == INVALID_HEAP_INDEX) {
list_remove(alarm);
} else {
grpc_alarm_heap_remove(&shard->heap, alarm);
}
}
gpr_mu_unlock(&shard->mu);
if (triggered) {
alarm->cb(alarm->cb_arg, GRPC_CALLBACK_CANCELLED);
}
}
/* This is called when the queue is empty and "now" has reached the
queue_deadline_cap. We compute a new queue deadline and then scan the map
for alarms that fall at or under it. Returns true if the queue is no
longer empty.
REQUIRES: shard->mu locked */
static int refill_queue(shard_type *shard, gpr_timespec now) {
/* Compute the new queue window width and bound by the limits: */
double computed_deadline_delta =
grpc_time_averaged_stats_update_average(&shard->stats) *
ADD_DEADLINE_SCALE;
double deadline_delta =
GPR_CLAMP(computed_deadline_delta, MIN_QUEUE_WINDOW_DURATION,
MAX_QUEUE_WINDOW_DURATION);
grpc_alarm *alarm, *next;
/* Compute the new cap and put all alarms under it into the queue: */
shard->queue_deadline_cap = gpr_time_add(
gpr_time_max(now, shard->queue_deadline_cap), dbl_to_ts(deadline_delta));
for (alarm = shard->list.next; alarm != &shard->list; alarm = next) {
next = alarm->next;
if (gpr_time_cmp(alarm->deadline, shard->queue_deadline_cap) < 0) {
list_remove(alarm);
grpc_alarm_heap_add(&shard->heap, alarm);
}
}
return !grpc_alarm_heap_is_empty(&shard->heap);
}
/* This pops the next non-cancelled alarm with deadline <= now from the queue,
or returns NULL if there isn't one.
REQUIRES: shard->mu locked */
static grpc_alarm *pop_one(shard_type *shard, gpr_timespec now) {
grpc_alarm *alarm;
for (;;) {
if (grpc_alarm_heap_is_empty(&shard->heap)) {
if (gpr_time_cmp(now, shard->queue_deadline_cap) < 0) return NULL;
if (!refill_queue(shard, now)) return NULL;
}
alarm = grpc_alarm_heap_top(&shard->heap);
if (gpr_time_cmp(alarm->deadline, now) > 0) return NULL;
alarm->triggered = 1;
grpc_alarm_heap_pop(&shard->heap);
return alarm;
}
}
/* REQUIRES: shard->mu unlocked */
static size_t pop_alarms(shard_type *shard, gpr_timespec now,
grpc_alarm **alarms, size_t max_alarms,
gpr_timespec *new_min_deadline) {
size_t n = 0;
grpc_alarm *alarm;
gpr_mu_lock(&shard->mu);
while (n < max_alarms && (alarm = pop_one(shard, now))) {
alarms[n++] = alarm;
}
*new_min_deadline = compute_min_deadline(shard);
gpr_mu_unlock(&shard->mu);
return n;
}
static int run_some_expired_alarms(gpr_timespec now,
grpc_iomgr_cb_status status) {
size_t n = 0;
size_t i;
grpc_alarm *alarms[MAX_ALARMS_PER_CHECK];
/* TODO(ctiller): verify that there are any alarms (atomically) here */
if (gpr_mu_trylock(&g_checker_mu)) {
gpr_mu_lock(&g_mu);
while (n < MAX_ALARMS_PER_CHECK &&
gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) {
gpr_timespec new_min_deadline;
/* For efficiency, we pop as many available alarms as we can from the
shard. This may violate perfect alarm deadline ordering, but that
shouldn't be a big deal because we don't make ordering guarantees. */
n += pop_alarms(g_shard_queue[0], now, alarms + n,
MAX_ALARMS_PER_CHECK - n, &new_min_deadline);
/* An grpc_alarm_init() on the shard could intervene here, adding a new
alarm that is earlier than new_min_deadline. However,
grpc_alarm_init() will block on the master_lock before it can call
set_min_deadline, so this one will complete first and then the AddAlarm
will reduce the min_deadline (perhaps unnecessarily). */
g_shard_queue[0]->min_deadline = new_min_deadline;
note_deadline_change(g_shard_queue[0]);
}
gpr_mu_unlock(&g_mu);
gpr_mu_unlock(&g_checker_mu);
}
for (i = 0; i < n; i++) {
alarms[i]->cb(alarms[i]->cb_arg, status);
}
return n;
}
int grpc_alarm_check(gpr_timespec now) {
return run_some_expired_alarms(now, GRPC_CALLBACK_SUCCESS);
}
gpr_timespec grpc_alarm_list_next_timeout() {
gpr_timespec out;
gpr_mu_lock(&g_mu);
out = g_shard_queue[0]->min_deadline;
gpr_mu_unlock(&g_mu);
return out;
}
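
As an aside, the pointer hash used by shard_idx() above spreads alarms across
NUM_SHARDS independently locked shards so that unrelated alarms rarely contend
on the same mutex. A standalone sketch (not part of this change) that shows the
resulting distribution over a batch of heap-allocated objects:

#include <stdio.h>
#include <stdlib.h>

#define LOG2_NUM_SHARDS 5
#define NUM_SHARDS (1 << LOG2_NUM_SHARDS)

/* same hash as shard_idx() in alarm.c */
static size_t shard_of(const void *p) {
  size_t x = (size_t)p;
  return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) & (NUM_SHARDS - 1);
}

int main(void) {
  int counts[NUM_SHARDS] = {0};
  void *ptrs[1024];
  int i;
  for (i = 0; i < 1024; i++) {
    ptrs[i] = malloc(64);
    counts[shard_of(ptrs[i])]++;
  }
  for (i = 0; i < NUM_SHARDS; i++) {
    printf("shard %2d: %d objects\n", i, counts[i]);
  }
  for (i = 0; i < 1024; i++) free(ptrs[i]);
  return 0;
}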

src/core/iomgr/alarm.h
@@ -38,23 +38,25 @@
#include <grpc/support/port_platform.h>
#include <grpc/support/time.h>
typedef struct grpc_alarm grpc_alarm;
/* One of the following headers should provide struct grpc_alarm */
#ifdef GPR_LIBEVENT
#include "src/core/iomgr/iomgr_libevent.h"
#endif
typedef struct grpc_alarm {
gpr_timespec deadline;
gpr_uint32 heap_index; /* INVALID_HEAP_INDEX if not in heap */
struct grpc_alarm *next;
struct grpc_alarm *prev;
int triggered;
grpc_iomgr_cb_func cb;
void *cb_arg;
} grpc_alarm;
/* Initialize *alarm. When expired or canceled, alarm_cb will be called with
*alarm_cb_arg and status to indicate if it expired (SUCCESS) or was
canceled (CANCELLED). alarm_cb is guaranteed to be called exactly once,
and application code should check the status to determine how it was
invoked. The application callback is also responsible for maintaining
information about when to free up any user-level state.
Returns 1 on success, 0 on failure. */
int grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline,
grpc_iomgr_cb_func alarm_cb, void *alarm_cb_arg,
gpr_timespec now);
information about when to free up any user-level state. */
void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline,
grpc_iomgr_cb_func alarm_cb, void *alarm_cb_arg,
gpr_timespec now);
/* Note that there is no alarm destroy function. This is because the
alarm is a one-time occurrence with a guarantee that the callback will
@@ -75,7 +77,13 @@ int grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline,
exactly once from either the cancellation (with status CANCELLED)
or from the activation (with status SUCCESS)
Note carefully that the callback function MAY occur in the same callstack
as grpc_alarm_cancel. It's expected that most alarms will be cancelled (their
primary use is to implement deadlines), and so this code is optimized such
that cancellation costs as little as possible. Making callbacks run inline
matches this aim.
Requires: cancel() must happen after add() on a given alarm */
int grpc_alarm_cancel(grpc_alarm *alarm);
void grpc_alarm_cancel(grpc_alarm *alarm);
#endif /* __GRPC_INTERNAL_IOMGR_ALARM_H__ */

src/core/iomgr/alarm_heap.c (new file)
@@ -0,0 +1,148 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/iomgr/alarm_heap.h"
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/useful.h>
/* Adjusts a heap so as to move a hole at position i closer to the root,
until a suitable position is found for element t. Then, copies t into that
position. This functor is called each time immediately after modifying a
value in the underlying container, with the offset of the modified element as
its argument. */
static void adjust_upwards(grpc_alarm **first, int i, grpc_alarm *t) {
while (i > 0) {
int parent = (i - 1) / 2;
if (gpr_time_cmp(first[parent]->deadline, t->deadline) >= 0) break;
first[i] = first[parent];
first[i]->heap_index = i;
i = parent;
}
first[i] = t;
t->heap_index = i;
}
/* Adjusts a heap so as to move a hole at position i farther away from the root,
until a suitable position is found for element t. Then, copies t into that
position. */
static void adjust_downwards(grpc_alarm **first, int i, int length,
grpc_alarm *t) {
for (;;) {
int left_child = 1 + 2 * i;
int right_child;
int next_i;
if (left_child >= length) break;
right_child = left_child + 1;
next_i = right_child < length &&
gpr_time_cmp(first[left_child]->deadline,
first[right_child]->deadline) < 0
? right_child
: left_child;
if (gpr_time_cmp(t->deadline, first[next_i]->deadline) >= 0) break;
first[i] = first[next_i];
first[i]->heap_index = i;
i = next_i;
}
first[i] = t;
t->heap_index = i;
}
#define SHRINK_MIN_ELEMS 8
#define SHRINK_FULLNESS_FACTOR 2
static void maybe_shrink(grpc_alarm_heap *heap) {
if (heap->alarm_count >= 8 &&
heap->alarm_count <= heap->alarm_capacity / SHRINK_FULLNESS_FACTOR / 2) {
heap->alarm_capacity = heap->alarm_count * SHRINK_FULLNESS_FACTOR;
heap->alarms =
gpr_realloc(heap->alarms, heap->alarm_capacity * sizeof(grpc_alarm *));
}
}
static void note_changed_priority(grpc_alarm_heap *heap, grpc_alarm *alarm) {
int i = alarm->heap_index;
int parent = (i - 1) / 2;
if (gpr_time_cmp(heap->alarms[parent]->deadline, alarm->deadline) < 0) {
adjust_upwards(heap->alarms, i, alarm);
} else {
adjust_downwards(heap->alarms, i, heap->alarm_count, alarm);
}
}
void grpc_alarm_heap_init(grpc_alarm_heap *heap) {
memset(heap, 0, sizeof(*heap));
}
void grpc_alarm_heap_destroy(grpc_alarm_heap *heap) { gpr_free(heap->alarms); }
int grpc_alarm_heap_add(grpc_alarm_heap *heap, grpc_alarm *alarm) {
if (heap->alarm_count == heap->alarm_capacity) {
heap->alarm_capacity =
GPR_MAX(heap->alarm_capacity + 1, heap->alarm_capacity * 3 / 2);
heap->alarms =
gpr_realloc(heap->alarms, heap->alarm_capacity * sizeof(grpc_alarm *));
}
alarm->heap_index = heap->alarm_count;
adjust_upwards(heap->alarms, heap->alarm_count, alarm);
heap->alarm_count++;
return alarm->heap_index == 0;
}
void grpc_alarm_heap_remove(grpc_alarm_heap *heap, grpc_alarm *alarm) {
int i = alarm->heap_index;
if (i == heap->alarm_count - 1) {
heap->alarm_count--;
maybe_shrink(heap);
return;
}
heap->alarms[i] = heap->alarms[heap->alarm_count - 1];
heap->alarms[i]->heap_index = i;
heap->alarm_count--;
maybe_shrink(heap);
note_changed_priority(heap, heap->alarms[i]);
}
int grpc_alarm_heap_is_empty(grpc_alarm_heap *heap) {
return heap->alarm_count == 0;
}
grpc_alarm *grpc_alarm_heap_top(grpc_alarm_heap *heap) {
return heap->alarms[0];
}
void grpc_alarm_heap_pop(grpc_alarm_heap *heap) {
grpc_alarm_heap_remove(heap, grpc_alarm_heap_top(heap));
}

src/core/iomgr/alarm_heap.h (new file)
@@ -0,0 +1,57 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef __GRPC_INTERNAL_IOMGR_ALARM_HEAP_H_
#define __GRPC_INTERNAL_IOMGR_ALARM_HEAP_H_
#include "src/core/iomgr/alarm.h"
typedef struct {
grpc_alarm **alarms;
int alarm_count;
int alarm_capacity;
} grpc_alarm_heap;
/* return 1 if the new alarm is the first alarm in the heap */
int grpc_alarm_heap_add(grpc_alarm_heap *heap, grpc_alarm *alarm);
void grpc_alarm_heap_init(grpc_alarm_heap *heap);
void grpc_alarm_heap_destroy(grpc_alarm_heap *heap);
void grpc_alarm_heap_remove(grpc_alarm_heap *heap, grpc_alarm *alarm);
grpc_alarm *grpc_alarm_heap_top(grpc_alarm_heap *heap);
void grpc_alarm_heap_pop(grpc_alarm_heap *heap);
int grpc_alarm_heap_is_empty(grpc_alarm_heap *heap);
#endif /* __GRPC_INTERNAL_IOMGR_ALARM_HEAP_H_ */
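
A minimal, self-contained sketch of the heap API above (illustrative only; the
deadline value is arbitrary). The return value of grpc_alarm_heap_add() is what
lets alarm.c notice that the shard's earliest deadline may have changed:

#include "src/core/iomgr/alarm_heap.h"
#include <grpc/support/log.h>
#include <grpc/support/time.h>

static void heap_api_sketch(void) {
  grpc_alarm a;
  grpc_alarm_heap heap;
  grpc_alarm_heap_init(&heap);
  a.deadline = gpr_time_from_seconds(5);
  GPR_ASSERT(grpc_alarm_heap_add(&heap, &a)); /* first element becomes the top */
  GPR_ASSERT(!grpc_alarm_heap_is_empty(&heap));
  GPR_ASSERT(grpc_alarm_heap_top(&heap) == &a);
  grpc_alarm_heap_pop(&heap); /* removes the top element */
  GPR_ASSERT(grpc_alarm_heap_is_empty(&heap));
  grpc_alarm_heap_destroy(&heap);
}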

src/core/iomgr/alarm_internal.h (new file)
@@ -0,0 +1,50 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef __GRPC_INTERNAL_IOMGR_ALARM_INTERNAL_H_
#define __GRPC_INTERNAL_IOMGR_ALARM_INTERNAL_H_
/* iomgr internal api for dealing with alarms */
int grpc_alarm_check(gpr_timespec now);
void grpc_alarm_list_init(gpr_timespec now);
void grpc_alarm_list_shutdown();
gpr_timespec grpc_alarm_list_next_timeout();
/* the following must be implemented by each iomgr implementation */
void grpc_kick_poller();
#endif /* __GRPC_INTERNAL_IOMGR_ALARM_INTERNAL_H_ */
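
A sketch (not part of this change) of how an iomgr implementation is expected
to drive this API from its poll loop; the libevent iomgr below does essentially
this in maybe_do_alarm_work() and grpc_iomgr_work(). wait_for_io_until() is a
hypothetical stand-in for the platform's actual polling call:

#include <grpc/support/time.h>
#include "src/core/iomgr/alarm_internal.h"

/* hypothetical: block on I/O readiness until the given deadline */
static void wait_for_io_until(gpr_timespec deadline) { (void)deadline; }

static void poll_once_sketch(gpr_timespec caller_deadline) {
  gpr_timespec now = gpr_now();
  gpr_timespec next = grpc_alarm_list_next_timeout();
  if (gpr_time_cmp(next, now) < 0) {
    grpc_alarm_check(now); /* some alarm is due: run its callbacks now */
  } else {
    /* Sleep until the earlier of the caller's deadline and the next alarm.
       A concurrent grpc_alarm_init() with an earlier deadline calls
       grpc_kick_poller() to break the poller out of this wait early. */
    wait_for_io_until(gpr_time_min(caller_deadline, next));
  }
}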

src/core/iomgr/iomgr_libevent.c
@@ -37,6 +37,7 @@
#include <fcntl.h>
#include "src/core/iomgr/alarm.h"
#include "src/core/iomgr/alarm_internal.h"
#include <grpc/support/atm.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -131,6 +132,10 @@ static void maybe_free_fds() {
}
}
/* TODO(ctiller): this is racy. In non-libevent implementations, use a pipe
or eventfd */
void grpc_kick_poller() { event_base_loopbreak(g_event_base); }
/* Spend some time doing polling and libevent maintenance work if no other
thread is. This includes both polling for events and destroying/closing file
descriptor objects.
@@ -162,8 +167,20 @@ static int maybe_do_polling_work(struct timeval delay) {
return 1;
}
static int maybe_do_alarm_work(gpr_timespec now, gpr_timespec next) {
int r = 0;
if (gpr_time_cmp(next, now) < 0) {
gpr_mu_unlock(&grpc_iomgr_mu);
r = grpc_alarm_check(now);
gpr_mu_lock(&grpc_iomgr_mu);
}
return r;
}
int grpc_iomgr_work(gpr_timespec deadline) {
gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now());
gpr_timespec now = gpr_now();
gpr_timespec next = grpc_alarm_list_next_timeout();
gpr_timespec delay_timespec = gpr_time_sub(deadline, now);
/* poll for no longer than one second */
gpr_timespec max_delay = {1, 0};
struct timeval delay;
@@ -178,7 +195,8 @@ int grpc_iomgr_work(gpr_timespec deadline) {
delay = gpr_timeval_from_timespec(delay_timespec);
if (maybe_do_queue_work() || maybe_do_polling_work(delay)) {
if (maybe_do_queue_work() || maybe_do_alarm_work(now, next) ||
maybe_do_polling_work(delay)) {
g_last_poll_completed = gpr_now();
return 1;
}
@@ -189,7 +207,7 @@ int grpc_iomgr_work(gpr_timespec deadline) {
static void backup_poller_thread(void *p) {
int backup_poller_engaged = 0;
/* allow no pollers for 100 milliseconds, then engage backup polling */
gpr_timespec allow_no_pollers = gpr_time_from_micros(100 * 1000);
gpr_timespec allow_no_pollers = gpr_time_from_millis(100);
gpr_mu_lock(&grpc_iomgr_mu);
while (!g_shutdown_backup_poller) {
@@ -203,8 +221,13 @@ static void backup_poller_thread(void *p) {
backup_poller_engaged = 1;
}
if (!maybe_do_queue_work()) {
struct timeval tv = {1, 0};
maybe_do_polling_work(tv);
gpr_timespec next = grpc_alarm_list_next_timeout();
if (!maybe_do_alarm_work(now, next)) {
gpr_timespec deadline =
gpr_time_min(next, gpr_time_add(now, gpr_time_from_seconds(1)));
maybe_do_polling_work(
gpr_timeval_from_timespec(gpr_time_sub(deadline, now)));
}
}
} else {
if (backup_poller_engaged) {
@@ -236,6 +259,8 @@ void grpc_iomgr_init() {
abort();
}
grpc_alarm_list_init(gpr_now());
gpr_mu_init(&grpc_iomgr_mu);
gpr_cv_init(&grpc_iomgr_cv);
g_activation_queue = NULL;
@@ -295,6 +320,8 @@ void grpc_iomgr_shutdown() {
gpr_event_wait(&g_backup_poller_done, gpr_inf_future);
grpc_alarm_list_shutdown();
/* drain pending work */
gpr_mu_lock(&grpc_iomgr_mu);
while (maybe_do_queue_work())
@@ -331,84 +358,6 @@ static void add_task(grpc_libevent_activation_data *adata) {
gpr_mu_unlock(&grpc_iomgr_mu);
}
/* ===============grpc_alarm implementation==================== */
/* The following function frees up the alarm's libevent structure and
should always be invoked just before calling the alarm's callback */
static void alarm_ev_destroy(grpc_alarm *alarm) {
grpc_libevent_activation_data *adata =
&alarm->task.activation[GRPC_EM_TA_ONLY];
if (adata->ev != NULL) {
/* TODO(klempner): Is this safe to do when we're cancelling? */
event_free(adata->ev);
adata->ev = NULL;
}
}
/* Proxy callback triggered by alarm->ev to call alarm->cb */
static void libevent_alarm_cb(int fd, short what, void *arg /*=alarm*/) {
grpc_alarm *alarm = arg;
grpc_libevent_activation_data *adata =
&alarm->task.activation[GRPC_EM_TA_ONLY];
int trigger_old;
/* First check if this alarm has been canceled, atomically */
trigger_old =
gpr_atm_full_fetch_add(&alarm->triggered, ALARM_TRIGGER_INCREMENT);
if (trigger_old == ALARM_TRIGGER_INIT) {
/* Before invoking user callback, destroy the libevent structure */
alarm_ev_destroy(alarm);
adata->status = GRPC_CALLBACK_SUCCESS;
add_task(adata);
}
}
int grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline,
grpc_iomgr_cb_func alarm_cb, void *alarm_cb_arg,
gpr_timespec now) {
grpc_libevent_activation_data *adata =
&alarm->task.activation[GRPC_EM_TA_ONLY];
gpr_timespec delay_timespec = gpr_time_sub(deadline, now);
struct timeval delay = gpr_timeval_from_timespec(delay_timespec);
alarm->task.type = GRPC_EM_TASK_ALARM;
gpr_atm_rel_store(&alarm->triggered, ALARM_TRIGGER_INIT);
adata->cb = alarm_cb;
adata->arg = alarm_cb_arg;
adata->prev = NULL;
adata->next = NULL;
adata->ev = evtimer_new(g_event_base, libevent_alarm_cb, alarm);
/* Set the trigger field to untriggered. Do this as the last store since
it is a release of previous stores. */
gpr_atm_rel_store(&alarm->triggered, ALARM_TRIGGER_INIT);
return adata->ev != NULL && evtimer_add(adata->ev, &delay) == 0;
}
int grpc_alarm_cancel(grpc_alarm *alarm) {
grpc_libevent_activation_data *adata =
&alarm->task.activation[GRPC_EM_TA_ONLY];
int trigger_old;
/* First check if this alarm has been triggered, atomically */
trigger_old =
gpr_atm_full_fetch_add(&alarm->triggered, ALARM_TRIGGER_INCREMENT);
if (trigger_old == ALARM_TRIGGER_INIT) {
/* We need to make sure that we only invoke the callback if it hasn't
already been invoked */
/* First remove this event from libevent. This returns success even if the
event has gone active or invoked its callback. */
if (evtimer_del(adata->ev) != 0) {
/* The delete was unsuccessful for some reason. */
gpr_log(GPR_ERROR, "Attempt to delete alarm event was unsuccessful");
return 0;
}
/* Free up the event structure before invoking callback */
alarm_ev_destroy(alarm);
adata->status = GRPC_CALLBACK_CANCELLED;
add_task(adata);
}
return 1;
}
static void grpc_fd_impl_destroy(grpc_fd *impl) {
grpc_em_task_activity_type type;
grpc_libevent_activation_data *adata;

src/core/iomgr/iomgr_libevent.h
@@ -201,14 +201,6 @@ struct grpc_fd {
void *on_done_user_data;
};
/* gRPC alarm handle.
The handle is used to add an alarm which expires after specified timeout. */
struct grpc_alarm {
grpc_libevent_task task; /* Include the base class */
gpr_atm triggered; /* To be used atomically if alarm triggered */
};
void grpc_iomgr_ref_address_resolution(int delta);
#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ */

src/core/support/time.c
@@ -47,6 +47,14 @@ int gpr_time_cmp(gpr_timespec a, gpr_timespec b) {
return cmp;
}
gpr_timespec gpr_time_min(gpr_timespec a, gpr_timespec b) {
return gpr_time_cmp(a, b) < 0 ? a : b;
}
gpr_timespec gpr_time_max(gpr_timespec a, gpr_timespec b) {
return gpr_time_cmp(a, b) > 0 ? a : b;
}
/* There's no standard TIME_T_MIN and TIME_T_MAX, so we construct them. The
following assumes that signed types are two's-complement and that bytes are
8 bits. */

test/core/end2end/no_server_test.c
@@ -41,7 +41,7 @@ static void *tag(gpr_intptr i) { return (void *)i; }
int main(int argc, char **argv) {
grpc_channel *chan;
grpc_call *call;
gpr_timespec timeout = gpr_time_from_micros(4000000);
gpr_timespec timeout = gpr_time_from_seconds(4);
gpr_timespec deadline = gpr_time_add(gpr_now(), timeout);
grpc_completion_queue *cq;
cq_verifier *cqv;

test/core/iomgr/alarm_heap_test.c (new file)
@@ -0,0 +1,277 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/iomgr/alarm_heap.h"
#include <stdlib.h>
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "test/core/util/test_config.h"
static gpr_timespec random_deadline() {
gpr_timespec ts;
ts.tv_sec = rand();
ts.tv_nsec = rand();
return ts;
}
static grpc_alarm *create_test_elements(int num_elements) {
grpc_alarm *elems = gpr_malloc(num_elements * sizeof(grpc_alarm));
int i;
for (i = 0; i < num_elements; i++) {
elems[i].deadline = random_deadline();
}
return elems;
}
static int cmp_elem(const void *a, const void *b) {
int i = *(const int *)a;
int j = *(const int *)b;
return i - j;
}
static int *all_top(grpc_alarm_heap *pq, int *n) {
int *vec = NULL;
int *need_to_check_children;
int num_need_to_check_children = 0;
*n = 0;
if (pq->alarm_count == 0) return vec;
need_to_check_children = gpr_malloc(pq->alarm_count * sizeof(int));
need_to_check_children[num_need_to_check_children++] = 0;
vec = gpr_malloc(pq->alarm_count * sizeof(int));
while (num_need_to_check_children > 0) {
int ind = need_to_check_children[0];
int leftchild, rightchild;
num_need_to_check_children--;
memmove(need_to_check_children, need_to_check_children + 1,
num_need_to_check_children * sizeof(int));
vec[(*n)++] = ind;
leftchild = 1 + 2 * ind;
if (leftchild < pq->alarm_count) {
if (gpr_time_cmp(pq->alarms[leftchild]->deadline,
pq->alarms[ind]->deadline) >= 0) {
need_to_check_children[num_need_to_check_children++] = leftchild;
}
rightchild = leftchild + 1;
if (rightchild < pq->alarm_count &&
gpr_time_cmp(pq->alarms[rightchild]->deadline,
pq->alarms[ind]->deadline) >= 0) {
need_to_check_children[num_need_to_check_children++] = rightchild;
}
}
}
gpr_free(need_to_check_children);
return vec;
}
static void check_pq_top(grpc_alarm *elements, grpc_alarm_heap *pq,
gpr_uint8 *inpq, int num_elements) {
gpr_timespec max_deadline = gpr_inf_past;
int *max_deadline_indices = gpr_malloc(num_elements * sizeof(int));
int *top_elements;
int num_max_deadline_indices = 0;
int num_top_elements;
int i;
for (i = 0; i < num_elements; ++i) {
if (inpq[i] && gpr_time_cmp(elements[i].deadline, max_deadline) >= 0) {
if (gpr_time_cmp(elements[i].deadline, max_deadline) > 0) {
num_max_deadline_indices = 0;
max_deadline = elements[i].deadline;
}
max_deadline_indices[num_max_deadline_indices++] = elements[i].heap_index;
}
}
qsort(max_deadline_indices, num_max_deadline_indices, sizeof(int), cmp_elem);
top_elements = all_top(pq, &num_top_elements);
GPR_ASSERT(num_top_elements == num_max_deadline_indices);
for (i = 0; i < num_top_elements; i++) {
GPR_ASSERT(max_deadline_indices[i] == top_elements[i]);
}
gpr_free(max_deadline_indices);
gpr_free(top_elements);
}
static int contains(grpc_alarm_heap *pq, grpc_alarm *el) {
int i;
for (i = 0; i < pq->alarm_count; i++) {
if (pq->alarms[i] == el) return 1;
}
return 0;
}
static void check_valid(grpc_alarm_heap *pq) {
int i;
for (i = 0; i < pq->alarm_count; ++i) {
int left_child = 1 + 2 * i;
int right_child = left_child + 1;
if (left_child < pq->alarm_count) {
GPR_ASSERT(gpr_time_cmp(pq->alarms[i]->deadline,
pq->alarms[left_child]->deadline) >= 0);
}
if (right_child < pq->alarm_count) {
GPR_ASSERT(gpr_time_cmp(pq->alarms[i]->deadline,
pq->alarms[right_child]->deadline) >= 0);
}
}
}
static void test1() {
grpc_alarm_heap pq;
const int num_test_elements = 200;
const int num_test_operations = 10000;
int i;
grpc_alarm *test_elements = create_test_elements(num_test_elements);
gpr_uint8 *inpq = gpr_malloc(num_test_elements);
grpc_alarm_heap_init(&pq);
memset(inpq, 0, num_test_elements);
GPR_ASSERT(grpc_alarm_heap_is_empty(&pq));
check_valid(&pq);
for (i = 0; i < num_test_elements; ++i) {
GPR_ASSERT(!contains(&pq, &test_elements[i]));
grpc_alarm_heap_add(&pq, &test_elements[i]);
check_valid(&pq);
GPR_ASSERT(contains(&pq, &test_elements[i]));
inpq[i] = 1;
check_pq_top(test_elements, &pq, inpq, num_test_elements);
}
for (i = 0; i < num_test_elements; ++i) {
/* Test that check still succeeds even for element that wasn't just
inserted. */
GPR_ASSERT(contains(&pq, &test_elements[i]));
}
GPR_ASSERT(pq.alarm_count == num_test_elements);
check_pq_top(test_elements, &pq, inpq, num_test_elements);
for (i = 0; i < num_test_operations; ++i) {
int elem_num = rand() % num_test_elements;
grpc_alarm *el = &test_elements[elem_num];
if (!inpq[elem_num]) { /* not in pq */
GPR_ASSERT(!contains(&pq, el));
el->deadline = random_deadline();
grpc_alarm_heap_add(&pq, el);
GPR_ASSERT(contains(&pq, el));
inpq[elem_num] = 1;
check_pq_top(test_elements, &pq, inpq, num_test_elements);
check_valid(&pq);
} else {
GPR_ASSERT(contains(&pq, el));
grpc_alarm_heap_remove(&pq, el);
GPR_ASSERT(!contains(&pq, el));
inpq[elem_num] = 0;
check_pq_top(test_elements, &pq, inpq, num_test_elements);
check_valid(&pq);
}
}
grpc_alarm_heap_destroy(&pq);
gpr_free(test_elements);
gpr_free(inpq);
}
static void shrink_test() {
grpc_alarm_heap pq;
int i;
int expected_size;
/* A large random number to allow for multiple shrinkages, at least 512. */
const int num_elements = rand() % 2000 + 512;
grpc_alarm_heap_init(&pq);
/* Create a priority queue with many elements. Make sure the Size() is
correct. */
for (i = 0; i < num_elements; ++i) {
GPR_ASSERT(i == pq.alarm_count);
grpc_alarm_heap_add(&pq, create_test_elements(1));
}
GPR_ASSERT(num_elements == pq.alarm_count);
/* Remove elements until the Size is 1/4 the original size. */
while (pq.alarm_count > num_elements / 4) {
grpc_alarm *const te = pq.alarms[pq.alarm_count - 1];
grpc_alarm_heap_remove(&pq, te);
gpr_free(te);
}
GPR_ASSERT(num_elements / 4 == pq.alarm_count);
/* Expect that Capacity is in the right range:
Size * 2 <= Capacity <= Size * 4 */
GPR_ASSERT(pq.alarm_count * 2 <= pq.alarm_capacity);
GPR_ASSERT(pq.alarm_capacity <= pq.alarm_count * 4);
check_valid(&pq);
/* Remove the rest of the elements. Check that the Capacity is not more than
4 times the Size and not less than 2 times, but never goes below 16. */
expected_size = pq.alarm_count;
while (pq.alarm_count > 0) {
const int which = rand() % pq.alarm_count;
grpc_alarm *te = pq.alarms[which];
grpc_alarm_heap_remove(&pq, te);
gpr_free(te);
expected_size--;
GPR_ASSERT(expected_size == pq.alarm_count);
GPR_ASSERT(pq.alarm_count * 2 <= pq.alarm_capacity);
if (pq.alarm_count >= 8) {
GPR_ASSERT(pq.alarm_capacity <= pq.alarm_count * 4);
} else {
GPR_ASSERT(16 <= pq.alarm_capacity);
}
check_valid(&pq);
}
GPR_ASSERT(0 == pq.alarm_count);
GPR_ASSERT(pq.alarm_capacity >= 16 && pq.alarm_capacity < 32);
grpc_alarm_heap_destroy(&pq);
}
int main(int argc, char **argv) {
int i;
grpc_test_init(argc, argv);
for (i = 0; i < 5; i++) {
test1();
shrink_test();
}
return 0;
}

test/core/iomgr/alarm_list_test.c (new file)
@@ -0,0 +1,144 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/iomgr/alarm.h"
#include <string.h>
#include "src/core/iomgr/alarm_internal.h"
#include <grpc/support/log.h>
#include "test/core/util/test_config.h"
#define MAX_CB 30
static int cb_called[MAX_CB][GRPC_CALLBACK_DO_NOT_USE];
static int kicks;
void grpc_kick_poller() { ++kicks; }
static void cb(void *arg, grpc_iomgr_cb_status status) {
cb_called[(gpr_intptr)arg][status]++;
}
static void add_test() {
gpr_timespec start = gpr_now();
int i;
grpc_alarm alarms[20];
grpc_alarm_list_init(start);
memset(cb_called, 0, sizeof(cb_called));
/* 10 ms alarms. will expire in the current epoch */
for (i = 0; i < 10; i++) {
grpc_alarm_init(&alarms[i], gpr_time_add(start, gpr_time_from_millis(10)),
cb, (void *)(gpr_intptr)i, start);
}
/* 1010 ms alarms. will expire in the next epoch */
for (i = 10; i < 20; i++) {
grpc_alarm_init(&alarms[i], gpr_time_add(start, gpr_time_from_millis(1010)),
cb, (void *)(gpr_intptr)i, start);
}
/* collect alarms. Only the first batch should be ready. */
GPR_ASSERT(10 ==
grpc_alarm_check(gpr_time_add(start, gpr_time_from_millis(500))));
for (i = 0; i < 20; i++) {
GPR_ASSERT(cb_called[i][GRPC_CALLBACK_SUCCESS] == (i < 10));
GPR_ASSERT(cb_called[i][GRPC_CALLBACK_CANCELLED] == 0);
GPR_ASSERT(cb_called[i][GRPC_CALLBACK_TIMED_OUT] == 0);
}
GPR_ASSERT(0 ==
grpc_alarm_check(gpr_time_add(start, gpr_time_from_millis(600))));
for (i = 0; i < 30; i++) {
GPR_ASSERT(cb_called[i][GRPC_CALLBACK_SUCCESS] == (i < 10));
GPR_ASSERT(cb_called[i][GRPC_CALLBACK_CANCELLED] == 0);
GPR_ASSERT(cb_called[i][GRPC_CALLBACK_TIMED_OUT] == 0);
}
/* collect the rest of the alarms */
GPR_ASSERT(10 ==
grpc_alarm_check(gpr_time_add(start, gpr_time_from_millis(1500))));
for (i = 0; i < 30; i++) {
GPR_ASSERT(cb_called[i][GRPC_CALLBACK_SUCCESS] == (i < 20));
GPR_ASSERT(cb_called[i][GRPC_CALLBACK_CANCELLED] == 0);
GPR_ASSERT(cb_called[i][GRPC_CALLBACK_TIMED_OUT] == 0);
}
GPR_ASSERT(0 ==
grpc_alarm_check(gpr_time_add(start, gpr_time_from_millis(1600))));
for (i = 0; i < 30; i++) {
GPR_ASSERT(cb_called[i][GRPC_CALLBACK_SUCCESS] == (i < 20));
GPR_ASSERT(cb_called[i][GRPC_CALLBACK_CANCELLED] == 0);
GPR_ASSERT(cb_called[i][GRPC_CALLBACK_TIMED_OUT] == 0);
}
grpc_alarm_list_shutdown();
}
/* Cleaning up a list with pending alarms. */
void destruction_test() {
grpc_alarm alarms[5];
grpc_alarm_list_init(gpr_time_0);
memset(cb_called, 0, sizeof(cb_called));
grpc_alarm_init(&alarms[0], gpr_time_from_millis(100), cb,
(void *)(gpr_intptr)0, gpr_time_0);
grpc_alarm_init(&alarms[1], gpr_time_from_millis(3), cb,
(void *)(gpr_intptr)1, gpr_time_0);
grpc_alarm_init(&alarms[2], gpr_time_from_millis(100), cb,
(void *)(gpr_intptr)2, gpr_time_0);
grpc_alarm_init(&alarms[3], gpr_time_from_millis(3), cb,
(void *)(gpr_intptr)3, gpr_time_0);
grpc_alarm_init(&alarms[4], gpr_time_from_millis(1), cb,
(void *)(gpr_intptr)4, gpr_time_0);
GPR_ASSERT(1 == grpc_alarm_check(gpr_time_from_millis(2)));
GPR_ASSERT(1 == cb_called[4][GRPC_CALLBACK_SUCCESS]);
grpc_alarm_cancel(&alarms[0]);
grpc_alarm_cancel(&alarms[3]);
GPR_ASSERT(1 == cb_called[0][GRPC_CALLBACK_CANCELLED]);
GPR_ASSERT(1 == cb_called[3][GRPC_CALLBACK_CANCELLED]);
grpc_alarm_list_shutdown();
GPR_ASSERT(1 == cb_called[1][GRPC_CALLBACK_CANCELLED]);
GPR_ASSERT(1 == cb_called[2][GRPC_CALLBACK_CANCELLED]);
}
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
add_test();
destruction_test();
return 0;
}

test/core/iomgr/alarm_test.c
@@ -92,12 +92,9 @@ static void alarm_cb(void *arg /* alarm_arg */, grpc_iomgr_cb_status status) {
static void test_grpc_alarm() {
grpc_alarm alarm;
grpc_alarm alarm_to_cancel;
gpr_timespec tv0 = {0, 1};
/* Timeout on the alarm cond. var, so make big enough to absorb time
deviations. Otherwise, operations after wait will not be properly ordered
*/
gpr_timespec tv1 = gpr_time_from_micros(200000);
gpr_timespec tv2 = {0, 1};
gpr_timespec alarm_deadline;
gpr_timespec followup_deadline;
@@ -116,17 +113,20 @@ static void test_grpc_alarm() {
gpr_cv_init(&arg.cv);
gpr_event_init(&arg.fcb_arg);
grpc_alarm_init(&alarm, gpr_time_add(tv0, gpr_now()), alarm_cb, &arg,
gpr_now());
grpc_alarm_init(&alarm, gpr_time_add(gpr_time_from_millis(100), gpr_now()),
alarm_cb, &arg, gpr_now());
alarm_deadline = gpr_time_add(gpr_now(), tv1);
alarm_deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(1));
gpr_mu_lock(&arg.mu);
while (arg.done == 0) {
gpr_cv_wait(&arg.cv, &arg.mu, alarm_deadline);
if (gpr_cv_wait(&arg.cv, &arg.mu, alarm_deadline)) {
gpr_log(GPR_ERROR, "alarm deadline exceeded");
break;
}
}
gpr_mu_unlock(&arg.mu);
followup_deadline = gpr_time_add(gpr_now(), tv1);
followup_deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(5));
fdone = gpr_event_wait(&arg.fcb_arg, followup_deadline);
if (arg.counter != 1) {
@@ -162,18 +162,21 @@ static void test_grpc_alarm() {
gpr_cv_init(&arg2.cv);
gpr_event_init(&arg2.fcb_arg);
grpc_alarm_init(&alarm_to_cancel, gpr_time_add(tv2, gpr_now()), alarm_cb,
grpc_alarm_init(&alarm_to_cancel,
gpr_time_add(gpr_time_from_millis(100), gpr_now()), alarm_cb,
&arg2, gpr_now());
grpc_alarm_cancel(&alarm_to_cancel);
alarm_deadline = gpr_time_add(gpr_now(), tv1);
alarm_deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(1));
gpr_mu_lock(&arg2.mu);
while (arg2.done == 0) {
gpr_cv_wait(&arg2.cv, &arg2.mu, alarm_deadline);
}
gpr_mu_unlock(&arg2.mu);
followup_deadline = gpr_time_add(gpr_now(), tv1);
gpr_log(GPR_INFO, "alarm done = %d", arg2.done);
followup_deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(5));
fdone = gpr_event_wait(&arg2.fcb_arg, followup_deadline);
if (arg2.counter != arg2.done_success_ctr) {
@@ -191,11 +194,11 @@ static void test_grpc_alarm() {
} else if (arg2.done_success_ctr) {
gpr_log(GPR_INFO, "Alarm callback executed before cancel");
gpr_log(GPR_INFO, "Current value of triggered is %d\n",
(int)alarm_to_cancel.triggered);
alarm_to_cancel.triggered);
} else if (arg2.done_cancel_ctr) {
gpr_log(GPR_INFO, "Alarm callback canceled");
gpr_log(GPR_INFO, "Current value of triggered is %d\n",
(int)alarm_to_cancel.triggered);
alarm_to_cancel.triggered);
} else {
gpr_log(GPR_ERROR, "Alarm cancel test should not be here");
GPR_ASSERT(0);
