Fix rounding, reduce contention on global shared state

pull/10194/head
Craig Tiller 8 years ago
parent 7b2dd93362
commit 185f6c9e04
  1. include/grpc/impl/codegen/port_platform.h (2 changes)
  2. include/grpc/support/tls.h (2 changes)
  3. src/core/lib/iomgr/ev_epoll_linux.c (2 changes)
  4. src/core/lib/iomgr/ev_poll_posix.c (2 changes)
  5. src/core/lib/iomgr/timer.h (3 changes)
  6. src/core/lib/iomgr/timer_generic.c (104 changes)

include/grpc/impl/codegen/port_platform.h
@@ -367,8 +367,10 @@ typedef unsigned __int64 uint64_t;
#ifndef GRPC_MUST_USE_RESULT
#if defined(__GNUC__) && !defined(__MINGW32__)
#define GRPC_MUST_USE_RESULT __attribute__((warn_unused_result))
#define GPR_ALIGN_STRUCT(n) __attribute__((aligned(n)))
#else
#define GRPC_MUST_USE_RESULT
#define GPR_ALIGN_STRUCT(n)
#endif
#endif
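
For context, a rough sketch of what the new macro is for (illustrative only; GPR_CACHELINE_SIZE is assumed to be defined elsewhere in port_platform.h, and the struct name is made up). timer_generic.c below applies exactly this to its shared_mutables struct:

    /* On GCC/Clang this aligns (and pads) the struct to a cache-line boundary,
       so frequently written shared data does not share a line with unrelated
       data; on toolchains without the attribute the macro expands to nothing. */
    struct hot_shared_state {
      long counter;
    } GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE);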

include/grpc/support/tls.h
@@ -58,7 +58,7 @@
gpr_tls_set(&foo, new_value);
Accessing a thread local:
current_value = gpr_tls_get(&foo, value);
current_value = gpr_tls_get(&foo);
ALL functions here may be implemented as macros. */
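
The corrected documentation above matches how the rest of this commit uses thread-local storage. A minimal sketch of the full gpr_tls life-cycle (the names below are illustrative, not part of the change):

    #include <grpc/support/tls.h>

    GPR_TLS_DECL(g_example_tls); /* declare a thread-local slot */

    void example_init(void) { gpr_tls_init(&g_example_tls); }
    void example_shutdown(void) { gpr_tls_destroy(&g_example_tls); }

    void example_per_thread_use(void) {
      gpr_tls_set(&g_example_tls, 42);          /* store a value for this thread */
      intptr_t v = gpr_tls_get(&g_example_tls); /* read it back; each thread sees
                                                   only its own value */
      (void)v;
    }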

src/core/lib/iomgr/ev_epoll_linux.c
@@ -56,6 +56,7 @@
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
@@ -1669,6 +1670,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
for (int i = 0; i < ep_rv; ++i) {
void *data_ptr = ep_ev[i].data.ptr;
if (data_ptr == &global_wakeup_fd) {
grpc_timer_consume_kick();
append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
err_desc);
} else if (data_ptr == &pi->workqueue_wakeup_fd) {

src/core/lib/iomgr/ev_poll_posix.c
@@ -52,6 +52,7 @@
#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_cv.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
@@ -1004,6 +1005,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
} else {
if (pfds[0].revents & POLLIN_CHECK) {
grpc_timer_consume_kick();
work_combine_error(&error,
grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd));
}

src/core/lib/iomgr/timer.h
@@ -101,6 +101,9 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
void grpc_timer_list_init(gpr_timespec now);
void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx);
/* Consume a kick issued by grpc_kick_poller */
void grpc_timer_consume_kick(void);
/* the following must be implemented by each iomgr implementation */
void grpc_kick_poller(void);

src/core/lib/iomgr/timer_generic.c
@@ -39,6 +39,7 @@
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/time_averaged_stats.h"
#include "src/core/lib/iomgr/timer_heap.h"
@@ -67,17 +68,25 @@ typedef struct {
grpc_timer list;
} shard_type;
/* Protects g_shard_queue */
static gpr_mu g_mu;
/* Allow only one run_some_expired_timers at once */
static gpr_spinlock g_checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER;
struct shared_mutables {
gpr_atm min_timer;
/* Allow only one run_some_expired_timers at once */
gpr_spinlock checker_mu;
bool initialized;
/* Protects g_shard_queue */
gpr_mu mu;
} GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE);
static struct shared_mutables g_shared_mutables = {
.checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER, .initialized = false,
};
static gpr_clock_type g_clock_type;
static shard_type g_shards[NUM_SHARDS];
/* Protected by g_mu */
/* Protected by g_shared_mutables.mu */
static shard_type *g_shard_queue[NUM_SHARDS];
static bool g_initialized = false;
static gpr_timespec g_start_time;
static gpr_atm g_min_timer;
GPR_TLS_DECL(g_last_seen_min_timer);
static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
gpr_atm *next, grpc_error *error);
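
The point of the reshuffle above: every piece of globally mutated timer state now lives in a single cache-line-aligned struct, and each thread additionally caches the last minimum deadline it observed in g_last_seen_min_timer, so the shared cache line is only touched when that cached value might be stale. A sketch of the resulting read pattern (the helper name is made up; the real logic appears inline in run_some_expired_timers further down):

    /* Sketch, not part of the diff: fast-path test for "no timer can be due yet". */
    static bool no_timer_due_yet(gpr_atm now) {
      /* 1) consult the thread-local cache: no shared-memory traffic */
      gpr_atm cached = gpr_tls_get(&g_last_seen_min_timer);
      if (now < cached) return true;
      /* 2) the cache may be stale: re-read the shared atomic and refresh it */
      cached = gpr_atm_no_barrier_load(&g_shared_mutables.min_timer);
      gpr_tls_set(&g_last_seen_min_timer, cached);
      return now < cached;
    }
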
@@ -90,8 +99,17 @@ static gpr_timespec dbl_to_ts(double d) {
return ts;
}
static gpr_atm timespec_to_atm(gpr_timespec ts) {
double x = gpr_timespec_to_micros(gpr_time_sub(ts, g_start_time)) / 1000.0;
static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) {
double x = GPR_MS_PER_SEC * (double)ts.tv_sec +
(double)ts.tv_nsec / GPR_NS_PER_MS + 1.0;
if (x < 0) return 0;
if (x > GPR_ATM_MAX) return GPR_ATM_MAX;
return (gpr_atm)x;
}
static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) {
double x =
GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS;
if (x < 0) return 0;
if (x > GPR_ATM_MAX) return GPR_ATM_MAX;
return (gpr_atm)x;
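
The split into round-up and round-down variants is the rounding fix named in the commit title: deadlines are converted with timespec_to_atm_round_up (see grpc_timer_init below) and the current time with timespec_to_atm_round_down (see grpc_timer_check below), so millisecond truncation can only delay a timer, never fire it early. Working the formulas as written (GPR_MS_PER_SEC = 1000, GPR_NS_PER_MS = 1e6) for a hypothetical ts = {tv_sec = 1, tv_nsec = 250000}:

    round_up:   1000 * 1 + 250000 / 1e6 + 1.0 = 1001.25  ->  (gpr_atm)1001
    round_down: 1000 * 1 + 250000 / 1e6       = 1000.25  ->  (gpr_atm)1000
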
@@ -110,18 +128,19 @@ static gpr_atm compute_min_deadline(shard_type *shard) {
void grpc_timer_list_init(gpr_timespec now) {
uint32_t i;
g_initialized = true;
gpr_mu_init(&g_mu);
g_shared_mutables.initialized = true;
gpr_mu_init(&g_shared_mutables.mu);
g_clock_type = now.clock_type;
g_start_time = now;
g_min_timer = timespec_to_atm(now);
g_shared_mutables.min_timer = timespec_to_atm_round_down(now);
gpr_tls_init(&g_last_seen_min_timer);
for (i = 0; i < NUM_SHARDS; i++) {
shard_type *shard = &g_shards[i];
gpr_mu_init(&shard->mu);
grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1,
0.5);
shard->queue_deadline_cap = timespec_to_atm(now);
shard->queue_deadline_cap = g_shared_mutables.min_timer;
shard->shard_queue_index = i;
grpc_timer_heap_init(&shard->heap);
shard->list.next = shard->list.prev = &shard->list;
@@ -139,8 +158,9 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {
gpr_mu_destroy(&shard->mu);
grpc_timer_heap_destroy(&shard->heap);
}
gpr_mu_destroy(&g_mu);
g_initialized = false;
gpr_mu_destroy(&g_shared_mutables.mu);
gpr_tls_destroy(&g_last_seen_min_timer);
g_shared_mutables.initialized = false;
}
static double ts_to_dbl(gpr_timespec ts) {
@@ -191,9 +211,9 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
GPR_ASSERT(deadline.clock_type == g_clock_type);
GPR_ASSERT(now.clock_type == g_clock_type);
timer->closure = closure;
timer->deadline = timespec_to_atm(deadline);
timer->deadline = timespec_to_atm_round_up(deadline);
if (!g_initialized) {
if (!g_shared_mutables.initialized) {
timer->pending = false;
grpc_closure_sched(
exec_ctx, timer->closure,
@@ -233,22 +253,27 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
In that case, the timer will simply have to wait for the next
grpc_timer_check. */
if (is_first_timer) {
gpr_mu_lock(&g_mu);
gpr_mu_lock(&g_shared_mutables.mu);
if (timer->deadline < shard->min_deadline) {
gpr_atm old_min_deadline = g_shard_queue[0]->min_deadline;
shard->min_deadline = timer->deadline;
note_deadline_change(shard);
if (shard->shard_queue_index == 0 && timer->deadline < old_min_deadline) {
gpr_atm_no_barrier_store(&g_min_timer, timer->deadline);
gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, timer->deadline);
grpc_kick_poller();
}
}
gpr_mu_unlock(&g_mu);
gpr_mu_unlock(&g_shared_mutables.mu);
}
}
void grpc_timer_consume_kick(void) {
/* force re-evaluation of last seen min */
gpr_tls_set(&g_last_seen_min_timer, 0);
}
void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
if (!g_initialized) {
if (!g_shared_mutables.initialized) {
/* must have already been cancelled, also the shard mutex is invalid */
return;
}
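
The thread-local cache is only safe because the writer side can invalidate it: when grpc_timer_init installs a new globally earliest deadline it stores it into g_shared_mutables.min_timer and calls grpc_kick_poller(), and the poller that absorbs the kick (see the ev_epoll_linux.c and ev_poll_posix.c hunks above) calls grpc_timer_consume_kick(), whose whole body is the gpr_tls_set shown here. A condensed sketch of that handshake, with made-up helper names and locking elided:

    /* writer: a deadline earlier than anything known so far was just added
       (simplified from the grpc_timer_init path above) */
    static void publish_earlier_deadline(gpr_atm deadline) {
      gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, deadline);
      grpc_kick_poller(); /* wake a poller so the new deadline gets noticed */
    }

    /* reader: the kicked poller, on seeing the global wakeup fd fire */
    static void on_timer_kick(void) {
      grpc_timer_consume_kick(); /* zeroes this thread's g_last_seen_min_timer, so
                                    its next grpc_timer_check re-reads the shared
                                    minimum instead of trusting the stale cache */
    }
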
@@ -334,12 +359,23 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
gpr_atm *next, grpc_error *error) {
size_t n = 0;
if (now < gpr_atm_no_barrier_load(&g_min_timer)) {
/* fetch from a thread-local first: this avoids contention on a globally
mutable cacheline in the common case */
gpr_atm min_timer = gpr_tls_get(&g_last_seen_min_timer);
if (now < min_timer) {
if (next != NULL) *next = GPR_MIN(*next, min_timer);
return 0;
}
if (gpr_spinlock_trylock(&g_checker_mu)) {
gpr_mu_lock(&g_mu);
min_timer = gpr_atm_no_barrier_load(&g_shared_mutables.min_timer);
gpr_tls_set(&g_last_seen_min_timer, min_timer);
if (now < min_timer) {
if (next != NULL) *next = GPR_MIN(*next, min_timer);
return 0;
}
if (gpr_spinlock_trylock(&g_shared_mutables.checker_mu)) {
gpr_mu_lock(&g_shared_mutables.mu);
while (g_shard_queue[0]->min_deadline < now) {
gpr_atm new_min_deadline;
@@ -363,20 +399,10 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
*next = GPR_MIN(*next, g_shard_queue[0]->min_deadline);
}
gpr_atm_no_barrier_store(&g_min_timer, g_shard_queue[0]->min_deadline);
gpr_mu_unlock(&g_mu);
gpr_spinlock_unlock(&g_checker_mu);
} else if (next != NULL) {
/* TODO(ctiller): this forces calling code to do a short poll, and
then retry the timer check (because this time through the timer list was
contended).
We could reduce the cost here dramatically by keeping a count of how many
currently active pollers got through the uncontended case above
successfully, and waking up other pollers IFF that count drops to zero.
Once that count is in place, this entire else branch could disappear. */
*next = GPR_MIN(*next, now + 1);
gpr_atm_no_barrier_store(&g_shared_mutables.min_timer,
g_shard_queue[0]->min_deadline);
gpr_mu_unlock(&g_shared_mutables.mu);
gpr_spinlock_unlock(&g_shared_mutables.checker_mu);
}
GRPC_ERROR_UNREF(error);
@@ -387,7 +413,7 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_timespec *next) {
GPR_ASSERT(now.clock_type == g_clock_type);
gpr_atm now_atm = timespec_to_atm(now);
gpr_atm now_atm = timespec_to_atm_round_down(now);
gpr_atm next_atm;
bool r = run_some_expired_timers(
exec_ctx, now_atm, &next_atm,
