Merge pull request #11333 from ctiller/faster_timer_pool

Get rid of zero-length poll case now that timer pool exists
reviewable/pr11336/r7^2
Craig Tiller 8 years ago committed by GitHub
commit 6b19d3b408
  1. 3
      src/core/lib/iomgr/iomgr.c
  2. 10
      src/core/lib/iomgr/timer.h
  3. 58
      src/core/lib/iomgr/timer_generic.c
  4. 87
      src/core/lib/iomgr/timer_manager.c
  5. 6
      src/core/lib/iomgr/timer_uv.c
  6. 6
      src/core/lib/support/log.c
  7. 13
      test/core/iomgr/tcp_client_posix_test.c
  8. 13
      test/core/iomgr/tcp_client_uv_test.c
  9. 26
      test/core/iomgr/timer_list_test.c

@ -107,7 +107,8 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx) {
}
last_warning_time = gpr_now(GPR_CLOCK_REALTIME);
}
if (grpc_timer_check(exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) {
if (grpc_timer_check(exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL) ==
GRPC_TIMERS_FIRED) {
gpr_mu_unlock(&g_mu);
grpc_exec_ctx_flush(exec_ctx);
grpc_iomgr_platform_flush();

@ -89,6 +89,12 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer);
/* iomgr internal api for dealing with timers */
typedef enum {
GRPC_TIMERS_NOT_CHECKED,
GRPC_TIMERS_CHECKED_AND_EMPTY,
GRPC_TIMERS_FIRED,
} grpc_timer_check_result;
/* Check for timers to be run, and run them.
Return true if timer callbacks were executed.
If next is non-null, TRY to update *next with the next running timer
@ -96,8 +102,8 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer);
*next is never guaranteed to be updated on any given execution; however,
with high probability at least one thread in the system will see an update
at any time slice. */
bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_timespec *next);
grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx,
gpr_timespec now, gpr_timespec *next);
void grpc_timer_list_init(gpr_timespec now);
void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx);

@ -101,8 +101,10 @@ static gpr_atm saturating_add(gpr_atm a, gpr_atm b) {
return a + b;
}
static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
gpr_atm *next, grpc_error *error);
static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx,
gpr_atm now,
gpr_atm *next,
grpc_error *error);
static gpr_timespec dbl_to_ts(double d) {
gpr_timespec ts;
@ -421,19 +423,22 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard,
return n;
}
static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
gpr_atm *next, grpc_error *error) {
size_t n = 0;
static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx,
gpr_atm now,
gpr_atm *next,
grpc_error *error) {
grpc_timer_check_result result = GRPC_TIMERS_NOT_CHECKED;
gpr_atm min_timer = gpr_atm_no_barrier_load(&g_shared_mutables.min_timer);
gpr_tls_set(&g_last_seen_min_timer, min_timer);
if (now < min_timer) {
if (next != NULL) *next = GPR_MIN(*next, min_timer);
return 0;
return GRPC_TIMERS_CHECKED_AND_EMPTY;
}
if (gpr_spinlock_trylock(&g_shared_mutables.checker_mu)) {
gpr_mu_lock(&g_shared_mutables.mu);
result = GRPC_TIMERS_CHECKED_AND_EMPTY;
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, " .. shard[%d]->min_deadline = %" PRIdPTR,
@ -448,14 +453,17 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
/* For efficiency, we pop as many available timers as we can from the
shard. This may violate perfect timer deadline ordering, but that
shouldn't be a big deal because we don't make ordering guarantees. */
n +=
pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline, error);
if (pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline,
error) > 0) {
result = GRPC_TIMERS_FIRED;
}
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, " .. popped --> %" PRIdPTR
", shard[%d]->min_deadline %" PRIdPTR
" --> %" PRIdPTR ", now=%" PRIdPTR,
n, (int)(g_shard_queue[0] - g_shards),
gpr_log(GPR_DEBUG,
" .. result --> %d"
", shard[%d]->min_deadline %" PRIdPTR " --> %" PRIdPTR
", now=%" PRIdPTR,
result, (int)(g_shard_queue[0] - g_shards),
g_shard_queue[0]->min_deadline, new_min_deadline, now);
}
@ -476,26 +484,15 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
g_shard_queue[0]->min_deadline);
gpr_mu_unlock(&g_shared_mutables.mu);
gpr_spinlock_unlock(&g_shared_mutables.checker_mu);
} else if (next != NULL) {
/* TODO(ctiller): this forces calling code to do an short poll, and
then retry the timer check (because this time through the timer list was
contended).
We could reduce the cost here dramatically by keeping a count of how
many currently active pollers got through the uncontended case above
successfully, and waking up other pollers IFF that count drops to zero.
Once that count is in place, this entire else branch could disappear. */
*next = GPR_MIN(*next, now + 1);
}
GRPC_ERROR_UNREF(error);
return (int)n;
return result;
}
bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_timespec *next) {
grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx,
gpr_timespec now, gpr_timespec *next) {
// prelude
GPR_ASSERT(now.clock_type == g_clock_type);
gpr_atm now_atm = timespec_to_atm_round_down(now);
@ -513,7 +510,7 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
"TIMER CHECK SKIP: now_atm=%" PRIdPTR " min_timer=%" PRIdPTR,
now_atm, min_timer);
}
return 0;
return GRPC_TIMERS_CHECKED_AND_EMPTY;
}
grpc_error *shutdown_error =
@ -538,7 +535,7 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_free(next_str);
}
// actual code
bool r;
grpc_timer_check_result r;
gpr_atm next_atm;
if (next == NULL) {
r = run_some_expired_timers(exec_ctx, now_atm, NULL, shutdown_error);
@ -556,11 +553,10 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec,
next->tv_nsec, next_atm);
}
gpr_log(GPR_DEBUG, "TIMER CHECK END: %d timers triggered; next=%s", r,
next_str);
gpr_log(GPR_DEBUG, "TIMER CHECK END: r=%d; next=%s", r, next_str);
gpr_free(next_str);
}
return r > 0;
return r;
}
#endif /* GRPC_TIMER_USE_GENERIC */

@ -107,17 +107,7 @@ void grpc_timer_manager_tick() {
grpc_exec_ctx_finish(&exec_ctx);
}
static void timer_thread(void *unused) {
// this threads exec_ctx: we try to run things through to completion here
// since it's easy to spin up new threads
grpc_exec_ctx exec_ctx =
GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC);
for (;;) {
gpr_timespec next = inf_future;
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
// check timer state, updates next to the next time to run a check
if (grpc_timer_check(&exec_ctx, now, &next)) {
static void run_some_timers(grpc_exec_ctx *exec_ctx) {
// if there's something to execute...
gpr_mu_lock(&g_mu);
// remove a waiter from the pool, and start another thread if necessary
@ -125,7 +115,8 @@ static void timer_thread(void *unused) {
if (g_waiter_count == 0 && g_threaded) {
start_timer_thread_and_unlock();
} else {
// if there's no thread waiting with a timeout, kick an existing waiter
// if there's no thread waiting with a timeout, kick an existing
// waiter
// so that the next deadline is not missed
if (!g_has_timed_waiter) {
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
@ -136,29 +127,40 @@ static void timer_thread(void *unused) {
gpr_mu_unlock(&g_mu);
}
// without our lock, flush the exec_ctx
grpc_exec_ctx_flush(&exec_ctx);
grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&g_mu);
// garbage collect any threads hanging out that are dead
gc_completed_threads();
// get ready to wait again
++g_waiter_count;
gpr_mu_unlock(&g_mu);
} else {
}
// wait until 'next' (or forever if there is already a timed waiter in the pool)
// returns true if the thread should continue executing (false if it should
// shutdown)
static bool wait_until(gpr_timespec next) {
const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC);
gpr_mu_lock(&g_mu);
// if we're not threaded anymore, leave
if (!g_threaded) break;
if (!g_threaded) {
gpr_mu_unlock(&g_mu);
return false;
}
// if there's no timed waiter, we should become one: that waiter waits
// only until the next timer should expire
// all other timers wait forever
uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1;
if (!g_has_timed_waiter) {
if (!g_has_timed_waiter && gpr_time_cmp(next, inf_future) != 0) {
g_has_timed_waiter = true;
// we use a generation counter to track the timed waiter so we can
// cancel an existing one quickly (and when it actually times out it'll
// figure stuff out instead of incurring a wakeup)
my_timed_waiter_generation = ++g_timed_waiter_generation;
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "sleep for a while");
gpr_timespec wait_time = gpr_time_sub(next, gpr_now(GPR_CLOCK_MONOTONIC));
gpr_log(GPR_DEBUG, "sleep for a %" PRId64 ".%09d seconds",
wait_time.tv_sec, wait_time.tv_nsec);
}
} else {
next = inf_future;
@ -169,8 +171,7 @@ static void timer_thread(void *unused) {
gpr_cv_wait(&g_cv_wait, &g_mu, next);
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d",
my_timed_waiter_generation == g_timed_waiter_generation,
g_kicked);
my_timed_waiter_generation == g_timed_waiter_generation, g_kicked);
}
// if this was the timed waiter, then we need to check timers, and flag
// that there's now no timed waiter... we'll look for a replacement if
@ -185,8 +186,45 @@ static void timer_thread(void *unused) {
g_kicked = false;
}
gpr_mu_unlock(&g_mu);
return true;
}
static void timer_main_loop(grpc_exec_ctx *exec_ctx) {
const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC);
for (;;) {
gpr_timespec next = inf_future;
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
// check timer state, updates next to the next time to run a check
switch (grpc_timer_check(exec_ctx, now, &next)) {
case GRPC_TIMERS_FIRED:
run_some_timers(exec_ctx);
break;
case GRPC_TIMERS_NOT_CHECKED:
/* This case only happens under contention, meaning more than one timer
manager thread checked timers concurrently.
If that happens, we're guaranteed that some other thread has just
checked timers, and this will avalanche into some other thread seeing
empty timers and doing a timed sleep.
Consequently, we can just sleep forever here and be happy at some
saved wakeup cycles. */
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "timers not checked: expect another thread to");
}
next = inf_future;
/* fall through */
case GRPC_TIMERS_CHECKED_AND_EMPTY:
if (!wait_until(next)) {
return;
}
break;
}
}
}
static void timer_thread_cleanup(void) {
gpr_mu_lock(&g_mu);
// terminate the thread: drop the waiter count, thread count, and let whomever
// stopped the threading stuff know that we're done
--g_waiter_count;
@ -199,12 +237,21 @@ static void timer_thread(void *unused) {
ct->next = g_completed_threads;
g_completed_threads = ct;
gpr_mu_unlock(&g_mu);
grpc_exec_ctx_finish(&exec_ctx);
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "End timer thread");
}
}
static void timer_thread(void *unused) {
// this threads exec_ctx: we try to run things through to completion here
// since it's easy to spin up new threads
grpc_exec_ctx exec_ctx =
GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
timer_main_loop(&exec_ctx);
grpc_exec_ctx_finish(&exec_ctx);
timer_thread_cleanup();
}
static void start_threads(void) {
gpr_mu_lock(&g_mu);
if (!g_threaded) {

@ -96,9 +96,9 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
}
}
bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_timespec *next) {
return false;
grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx,
gpr_timespec now, gpr_timespec *next) {
return GRPC_TIMERS_NOT_CHECKED;
}
void grpc_timer_list_init(gpr_timespec now) {}

@ -43,7 +43,7 @@
#include <string.h>
extern void gpr_default_log(gpr_log_func_args *args);
static gpr_log_func g_log_func = gpr_default_log;
static gpr_atm g_log_func = (gpr_atm)gpr_default_log;
static gpr_atm g_min_severity_to_print = GPR_LOG_VERBOSITY_UNSET;
const char *gpr_log_severity_string(gpr_log_severity severity) {
@ -70,7 +70,7 @@ void gpr_log_message(const char *file, int line, gpr_log_severity severity,
lfargs.line = line;
lfargs.severity = severity;
lfargs.message = message;
g_log_func(&lfargs);
((gpr_log_func)gpr_atm_no_barrier_load(&g_log_func))(&lfargs);
}
void gpr_set_log_verbosity(gpr_log_severity min_severity_to_print) {
@ -99,5 +99,5 @@ void gpr_log_verbosity_init() {
}
void gpr_set_log_function(gpr_log_func f) {
g_log_func = f ? f : gpr_default_log;
gpr_atm_no_barrier_store(&g_log_func, (gpr_atm)(f ? f : gpr_default_log));
}

@ -181,10 +181,17 @@ void test_fails(void) {
grpc_pollset_worker *worker = NULL;
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec polling_deadline = test_deadline();
if (!grpc_timer_check(&exec_ctx, now, &polling_deadline)) {
switch (grpc_timer_check(&exec_ctx, now, &polling_deadline)) {
case GRPC_TIMERS_FIRED:
break;
case GRPC_TIMERS_NOT_CHECKED:
polling_deadline = now;
/* fall through */
case GRPC_TIMERS_CHECKED_AND_EMPTY:
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker, now,
polling_deadline)));
"pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker,
now, polling_deadline)));
break;
}
gpr_mu_unlock(g_mu);
grpc_exec_ctx_flush(&exec_ctx);

@ -178,10 +178,17 @@ void test_fails(void) {
grpc_pollset_worker *worker = NULL;
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec polling_deadline = test_deadline();
if (!grpc_timer_check(&exec_ctx, now, &polling_deadline)) {
switch (grpc_timer_check(&exec_ctx, now, &polling_deadline)) {
case GRPC_TIMERS_FIRED:
break;
case GRPC_TIMERS_NOT_CHECKED:
polling_deadline = now;
/* fall through */
case GRPC_TIMERS_CHECKED_AND_EMPTY:
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker, now,
polling_deadline)));
"pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker,
now, polling_deadline)));
break;
}
gpr_mu_unlock(g_mu);
grpc_exec_ctx_flush(&exec_ctx);

@ -88,17 +88,19 @@ static void add_test(void) {
/* collect timers. Only the first batch should be ready. */
GPR_ASSERT(grpc_timer_check(
&exec_ctx, gpr_time_add(start, gpr_time_from_millis(500, GPR_TIMESPAN)),
NULL));
&exec_ctx,
gpr_time_add(start, gpr_time_from_millis(500, GPR_TIMESPAN)),
NULL) == GRPC_TIMERS_FIRED);
grpc_exec_ctx_finish(&exec_ctx);
for (i = 0; i < 20; i++) {
GPR_ASSERT(cb_called[i][1] == (i < 10));
GPR_ASSERT(cb_called[i][0] == 0);
}
GPR_ASSERT(!grpc_timer_check(
&exec_ctx, gpr_time_add(start, gpr_time_from_millis(600, GPR_TIMESPAN)),
NULL));
GPR_ASSERT(grpc_timer_check(
&exec_ctx,
gpr_time_add(start, gpr_time_from_millis(600, GPR_TIMESPAN)),
NULL) == GRPC_TIMERS_CHECKED_AND_EMPTY);
grpc_exec_ctx_finish(&exec_ctx);
for (i = 0; i < 30; i++) {
GPR_ASSERT(cb_called[i][1] == (i < 10));
@ -107,17 +109,19 @@ static void add_test(void) {
/* collect the rest of the timers */
GPR_ASSERT(grpc_timer_check(
&exec_ctx, gpr_time_add(start, gpr_time_from_millis(1500, GPR_TIMESPAN)),
NULL));
&exec_ctx,
gpr_time_add(start, gpr_time_from_millis(1500, GPR_TIMESPAN)),
NULL) == GRPC_TIMERS_FIRED);
grpc_exec_ctx_finish(&exec_ctx);
for (i = 0; i < 30; i++) {
GPR_ASSERT(cb_called[i][1] == (i < 20));
GPR_ASSERT(cb_called[i][0] == 0);
}
GPR_ASSERT(!grpc_timer_check(
&exec_ctx, gpr_time_add(start, gpr_time_from_millis(1600, GPR_TIMESPAN)),
NULL));
GPR_ASSERT(grpc_timer_check(
&exec_ctx,
gpr_time_add(start, gpr_time_from_millis(1600, GPR_TIMESPAN)),
NULL) == GRPC_TIMERS_CHECKED_AND_EMPTY);
for (i = 0; i < 30; i++) {
GPR_ASSERT(cb_called[i][1] == (i < 20));
GPR_ASSERT(cb_called[i][0] == 0);
@ -163,7 +167,7 @@ void destruction_test(void) {
&exec_ctx, &timers[4], tfm(1),
grpc_closure_create(cb, (void *)(intptr_t)4, grpc_schedule_on_exec_ctx),
gpr_time_0(GPR_CLOCK_REALTIME));
GPR_ASSERT(1 == grpc_timer_check(&exec_ctx, tfm(2), NULL));
GPR_ASSERT(grpc_timer_check(&exec_ctx, tfm(2), NULL) == GRPC_TIMERS_FIRED);
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(1 == cb_called[4][1]);
grpc_timer_cancel(&exec_ctx, &timers[0]);

Loading…
Cancel
Save