64-bit atomic operations

pull/14894/head
Sree Kuchibhotla 7 years ago
parent 4c8d6fb8f5
commit 9c142c9dc9
  1. 71
      include/grpc/impl/codegen/atm64.h
  2. 42
      include/grpc/impl/codegen/atm64_gcc_atomic.h
  3. 43
      include/grpc/impl/codegen/atm64_gcc_sync.h
  4. 50
      include/grpc/impl/codegen/atm64_windows.h
  5. 26
      include/grpc/support/atm64.h
  6. 7
      src/core/lib/iomgr/exec_ctx.h
  7. 65
      src/core/lib/iomgr/timer_generic.cc

@ -0,0 +1,71 @@
/*
 *
 * Copyright 2018 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
#ifndef GRPC_IMPL_CODEGEN_ATM64_H
#define GRPC_IMPL_CODEGEN_ATM64_H

/** This interface provides atomic operations and barriers for 64-bit integer
   data types (instead of intptr_t), so that the same width is used on both
   32-bit and 64-bit systems.

   It is internal to gpr support code and should not be used outside it.

   If an operation with acquire semantics precedes another memory access by the
   same thread, the operation will precede that other access as seen by other
   threads.

   If an operation with release semantics follows another memory access by the
   same thread, the operation will follow that other access as seen by other
   threads.

   Routines with "acq" or "full" in the name have acquire semantics.  Routines
   with "rel" or "full" in the name have release semantics.  Routines with
   "no_barrier" in the name have neither acquire nor release semantics.

   The routines may be implemented as macros.

   // Atomic operations act on an integral type gpr_atm64 that is 64 bits wide
   typedef int64_t gpr_atm64;

   // Atomically load *p, with no ordering guarantees.
   gpr_atm64 gpr_atm64_no_barrier_load(gpr_atm64 *p);
   // Atomically set *p = value, with no ordering guarantees.
   void gpr_atm64_no_barrier_store(gpr_atm64 *p, gpr_atm64 value);
 */

#include <grpc/impl/codegen/port_platform.h>

/* Dispatch to the platform-specific implementation; port_platform.h defines
   exactly one of these feature macros per platform. */
#if defined(GPR_GCC_ATOMIC)
#include <grpc/impl/codegen/atm64_gcc_atomic.h>
#elif defined(GPR_GCC_SYNC)
#include <grpc/impl/codegen/atm64_gcc_sync.h>
#elif defined(GPR_WINDOWS_ATOMIC)
#include <grpc/impl/codegen/atm64_windows.h>
#else
#error could not determine platform for atm
#endif

/* NOTE(review): this extern "C" block is empty — the platform headers above
   declare everything; kept as-is to avoid changing the header's tokens. */
#ifdef __cplusplus
extern "C" {
#endif
#ifdef __cplusplus
}
#endif

#endif /* GRPC_IMPL_CODEGEN_ATM64_H */

@ -0,0 +1,42 @@
/*
 *
 * Copyright 2018 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
#ifndef GRPC_IMPL_CODEGEN_ATM64_GCC_ATOMIC_H
#define GRPC_IMPL_CODEGEN_ATM64_GCC_ATOMIC_H

/* 64-bit variant of atm_platform.h for gcc and gcc-like compilers with the
   __atomic_* interface. */
#include <grpc/impl/codegen/port_platform.h>

#ifdef __cplusplus
extern "C" {
#endif

typedef int64_t gpr_atm64;
#define GPR_ATM64_MAX INT64_MAX
#define GPR_ATM64_MIN INT64_MIN

/* Atomically load *p with relaxed (no-barrier) ordering.
   Fix: these were named gpr_atm_no_barrier_load/store, which collides with
   the intptr_t-wide macros in atm_gcc_atomic.h and is inconsistent with the
   gpr_atm64_* names used by the gcc_sync and windows variants — callers on
   those platforms would silently bind to the 32-bit-wide operations. */
#define gpr_atm64_no_barrier_load(p) (__atomic_load_n((p), __ATOMIC_RELAXED))

/* Atomically store *p = value with relaxed (no-barrier) ordering. */
#define gpr_atm64_no_barrier_store(p, value) \
  (__atomic_store_n((p), (int64_t)(value), __ATOMIC_RELAXED))

#ifdef __cplusplus
}
#endif
#endif /* GRPC_IMPL_CODEGEN_ATM64_GCC_ATOMIC_H */

@ -0,0 +1,43 @@
/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
#ifndef GRPC_IMPL_CODEGEN_ATM64_GCC_SYNC_H
#define GRPC_IMPL_CODEGEN_ATM64_GCC_SYNC_H
/* Guard renamed from ..._ATM64_GCC_SYNC64_H to match the file name
   (atm64_gcc_sync.h), per the repo's include-guard convention. */

/* 64-bit variant of atm_platform.h for gcc and gcc-like compilers with the
   __sync_* interface. */
#include <grpc/impl/codegen/port_platform.h>

typedef int64_t gpr_atm64;
#define GPR_ATM64_MAX INT64_MAX
#define GPR_ATM64_MIN INT64_MIN

/* Compiler-only barrier: stops the compiler from reordering memory accesses
   across it; emits no CPU fence instruction. */
#define GPR_ATM64_COMPILE_BARRIER_() __asm__ __volatile__("" : : : "memory")

/* Load *p with no CPU-level ordering; the compile barrier after the read
   keeps the compiler from sinking later accesses above it.
   NOTE(review): a plain 64-bit load/store is not guaranteed single-copy
   atomic on 32-bit targets (it may tear) — confirm this header is only
   selected on platforms where it is. */
static __inline gpr_atm64 gpr_atm64_no_barrier_load(const gpr_atm64* p) {
  gpr_atm64 value = *p;
  GPR_ATM64_COMPILE_BARRIER_();
  return value;
}

/* Store *p = value with no CPU-level ordering; the compile barrier before
   the write keeps the compiler from hoisting earlier accesses below it. */
static __inline void gpr_atm64_no_barrier_store(gpr_atm64* p, gpr_atm64 value) {
  GPR_ATM64_COMPILE_BARRIER_();
  *p = value;
}

#endif /* GRPC_IMPL_CODEGEN_ATM64_GCC_SYNC_H */

@ -0,0 +1,50 @@
/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
#ifndef GRPC_IMPL_CODEGEN_ATM64_WINDOWS_H
#define GRPC_IMPL_CODEGEN_ATM64_WINDOWS_H

/** Win32 variant of atm_platform.h — 64-bit atomic load/store built on the
    Win32 MemoryBarrier() full fence.
    NOTE(review): MemoryBarrier is presumably brought in via windows.h through
    port_platform.h — confirm; this header does not include it directly. */
#include <grpc/impl/codegen/port_platform.h>

typedef int64_t gpr_atm64;
#define GPR_ATM64_MAX INT64_MAX
#define GPR_ATM64_MIN INT64_MIN

/* Full (two-way) hardware memory fence. */
#define gpr_atm64_full_barrier MemoryBarrier

/* Load with acquire semantics: the fence after the read prevents later
   accesses from being reordered before it. */
static __inline gpr_atm64 gpr_atm64_acq_load(const gpr_atm64* p) {
  gpr_atm64 result = *p;
  gpr_atm64_full_barrier();
  return result;
}

/* No-barrier load — implemented as an acquire load, which is stronger than
   required but still correct.
   NOTE(review): the plain 64-bit read may tear on 32-bit Windows targets —
   confirm GPR_WINDOWS_ATOMIC is only defined where 64-bit accesses are
   single-copy atomic. */
static __inline gpr_atm64 gpr_atm64_no_barrier_load(const gpr_atm64* p) {
  return gpr_atm64_acq_load(p);
}

/* Store with release semantics: the fence before the write prevents earlier
   accesses from being reordered after it. */
static __inline void gpr_atm64_rel_store(gpr_atm64* p, gpr_atm64 value) {
  gpr_atm64_full_barrier();
  *p = value;
}

/* No-barrier store — implemented as a release store (stronger than required
   but correct). */
static __inline void gpr_atm64_no_barrier_store(gpr_atm64* p, gpr_atm64 value) {
  gpr_atm64_rel_store(p, value);
}

#endif /* GRPC_IMPL_CODEGEN_ATM64_WINDOWS_H */

@ -0,0 +1,26 @@
/*
 *
 * Copyright 2018 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
#ifndef GRPC_SUPPORT_ATM64_H
#define GRPC_SUPPORT_ATM64_H

/** Public entry point for 64-bit atomic operations: pulls in the platform
    detection macros, then the codegen header that selects the per-platform
    gpr_atm64 implementation.  port_platform.h must come first. */
#include <grpc/support/port_platform.h>

#include <grpc/impl/codegen/atm64.h>

#endif /* GRPC_SUPPORT_ATM64_H */

@ -22,16 +22,17 @@
#include <grpc/support/port_platform.h>
#include <grpc/support/atm.h>
#include <grpc/support/atm64.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/iomgr/closure.h"
typedef int64_t grpc_millis;
typedef gpr_atm64 grpc_millis;
#define GRPC_MILLIS_INF_FUTURE INT64_MAX
#define GRPC_MILLIS_INF_PAST INT64_MIN
#define GRPC_MILLIS_INF_FUTURE GPR_ATM64_MAX
#define GRPC_MILLIS_INF_PAST GPR_ATM64_MIN
/** A workqueue represents a list of work to be executed asynchronously.
Forward declared here to avoid a circular dependency with workqueue.h. */

@ -30,11 +30,11 @@
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/spinlock.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/time_averaged_stats.h"
#include "src/core/lib/iomgr/timer_heap.h"
@ -60,9 +60,9 @@ typedef struct {
gpr_mu mu;
grpc_time_averaged_stats stats;
/* All and only timers with deadlines <= this will be in the heap. */
gpr_atm queue_deadline_cap;
grpc_millis queue_deadline_cap;
/* The deadline of the next timer due in this shard */
gpr_atm min_deadline;
grpc_millis min_deadline;
/* Index of this timer_shard in the g_shard_queue */
uint32_t shard_queue_index;
/* This holds all timers with deadlines < queue_deadline_cap. Timers in this
@ -210,7 +210,7 @@ GPR_TLS_DECL(g_last_seen_min_timer);
struct shared_mutables {
/* The deadline of the next timer due across all timer shards */
gpr_atm min_timer;
grpc_millis min_timer;
/* Allow only one run_some_expired_timers at once */
gpr_spinlock checker_mu;
bool initialized;
@ -220,18 +220,18 @@ struct shared_mutables {
static struct shared_mutables g_shared_mutables;
static gpr_atm saturating_add(gpr_atm a, gpr_atm b) {
if (a > GPR_ATM_MAX - b) {
return GPR_ATM_MAX;
/* Returns a + b, clamping to GRPC_MILLIS_INF_FUTURE / GRPC_MILLIS_INF_PAST
   instead of overflowing.  Guards are conditioned on the sign of b: the
   previous form evaluated GRPC_MILLIS_INF_FUTURE - b unconditionally, which
   is signed-integer overflow (undefined behavior) whenever b < 0. */
static grpc_millis saturating_add(grpc_millis a, grpc_millis b) {
  if (b > 0 && a > GRPC_MILLIS_INF_FUTURE - b) {
    return GRPC_MILLIS_INF_FUTURE;
  }
  if (b < 0 && a < GRPC_MILLIS_INF_PAST - b) {
    return GRPC_MILLIS_INF_PAST;
  }
  return a + b;
}
static grpc_timer_check_result run_some_expired_timers(gpr_atm now,
gpr_atm* next,
static grpc_timer_check_result run_some_expired_timers(grpc_millis now,
grpc_millis* next,
grpc_error* error);
static gpr_atm compute_min_deadline(timer_shard* shard) {
static grpc_millis compute_min_deadline(timer_shard* shard) {
return grpc_timer_heap_is_empty(&shard->heap)
? saturating_add(shard->queue_deadline_cap, 1)
: grpc_timer_heap_top(&shard->heap)->deadline;
@ -337,9 +337,9 @@ static void timer_init(grpc_timer* timer, grpc_millis deadline,
#endif
if (grpc_timer_trace.enabled()) {
gpr_log(GPR_DEBUG,
"TIMER %p: SET %" PRIdPTR " now %" PRId64 " call %p[%p]", timer,
deadline, grpc_core::ExecCtx::Get()->Now(), closure, closure->cb);
gpr_log(GPR_DEBUG, "TIMER %p: SET %" PRIdPTR " now %" PRId64 " call %p[%p]",
timer, deadline, grpc_core::ExecCtx::Get()->Now(), closure,
closure->cb);
}
if (!g_shared_mutables.initialized) {
@ -374,7 +374,7 @@ static void timer_init(grpc_timer* timer, grpc_millis deadline,
}
if (grpc_timer_trace.enabled()) {
gpr_log(GPR_DEBUG,
" .. add to shard %d with queue_deadline_cap=%" PRIdPTR
" .. add to shard %d with queue_deadline_cap=%" PRId64
" => is_first_timer=%s",
static_cast<int>(shard - g_shards), shard->queue_deadline_cap,
is_first_timer ? "true" : "false");
@ -395,11 +395,11 @@ static void timer_init(grpc_timer* timer, grpc_millis deadline,
if (is_first_timer) {
gpr_mu_lock(&g_shared_mutables.mu);
if (grpc_timer_trace.enabled()) {
gpr_log(GPR_DEBUG, " .. old shard min_deadline=%" PRIdPTR,
gpr_log(GPR_DEBUG, " .. old shard min_deadline=%" PRId64,
shard->min_deadline);
}
if (deadline < shard->min_deadline) {
gpr_atm old_min_deadline = g_shard_queue[0]->min_deadline;
grpc_millis old_min_deadline = g_shard_queue[0]->min_deadline;
shard->min_deadline = deadline;
note_deadline_change(shard);
if (shard->shard_queue_index == 0 && deadline < old_min_deadline) {
@ -450,7 +450,7 @@ static void timer_cancel(grpc_timer* timer) {
'queue_deadline_cap') into into shard->heap.
Returns 'true' if shard->heap has atleast ONE element
REQUIRES: shard->mu locked */
static int refill_heap(timer_shard* shard, gpr_atm now) {
static int refill_heap(timer_shard* shard, grpc_millis now) {
/* Compute the new queue window width and bound by the limits: */
double computed_deadline_delta =
grpc_time_averaged_stats_update_average(&shard->stats) *
@ -463,10 +463,10 @@ static int refill_heap(timer_shard* shard, gpr_atm now) {
/* Compute the new cap and put all timers under it into the queue: */
shard->queue_deadline_cap =
saturating_add(GPR_MAX(now, shard->queue_deadline_cap),
static_cast<gpr_atm>(deadline_delta * 1000.0));
static_cast<grpc_millis>(deadline_delta * 1000.0));
if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_DEBUG, " .. shard[%d]->queue_deadline_cap --> %" PRIdPTR,
gpr_log(GPR_DEBUG, " .. shard[%d]->queue_deadline_cap --> %" PRId64,
static_cast<int>(shard - g_shards), shard->queue_deadline_cap);
}
for (timer = shard->list.next; timer != &shard->list; timer = next) {
@ -474,7 +474,7 @@ static int refill_heap(timer_shard* shard, gpr_atm now) {
if (timer->deadline < shard->queue_deadline_cap) {
if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_DEBUG, " .. add timer with deadline %" PRIdPTR " to heap",
gpr_log(GPR_DEBUG, " .. add timer with deadline %" PRId64 " to heap",
timer->deadline);
}
list_remove(timer);
@ -487,7 +487,7 @@ static int refill_heap(timer_shard* shard, gpr_atm now) {
/* This pops the next non-cancelled timer with deadline <= now from the
queue, or returns NULL if there isn't one.
REQUIRES: shard->mu locked */
static grpc_timer* pop_one(timer_shard* shard, gpr_atm now) {
static grpc_timer* pop_one(timer_shard* shard, grpc_millis now) {
grpc_timer* timer;
for (;;) {
if (grpc_timer_check_trace.enabled()) {
@ -518,8 +518,8 @@ static grpc_timer* pop_one(timer_shard* shard, gpr_atm now) {
}
/* REQUIRES: shard->mu unlocked */
static size_t pop_timers(timer_shard* shard, gpr_atm now,
gpr_atm* new_min_deadline, grpc_error* error) {
static size_t pop_timers(timer_shard* shard, grpc_millis now,
grpc_millis* new_min_deadline, grpc_error* error) {
size_t n = 0;
grpc_timer* timer;
gpr_mu_lock(&shard->mu);
@ -537,12 +537,12 @@ static size_t pop_timers(timer_shard* shard, gpr_atm now,
return n;
}
static grpc_timer_check_result run_some_expired_timers(gpr_atm now,
gpr_atm* next,
static grpc_timer_check_result run_some_expired_timers(grpc_millis now,
grpc_millis* next,
grpc_error* error) {
grpc_timer_check_result result = GRPC_TIMERS_NOT_CHECKED;
gpr_atm min_timer = gpr_atm_no_barrier_load(&g_shared_mutables.min_timer);
grpc_millis min_timer = gpr_atm_no_barrier_load(&g_shared_mutables.min_timer);
gpr_tls_set(&g_last_seen_min_timer, min_timer);
if (now < min_timer) {
if (next != nullptr) *next = GPR_MIN(*next, min_timer);
@ -554,14 +554,15 @@ static grpc_timer_check_result run_some_expired_timers(gpr_atm now,
result = GRPC_TIMERS_CHECKED_AND_EMPTY;
if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_DEBUG, " .. shard[%d]->min_deadline = %" PRIdPTR,
gpr_log(GPR_DEBUG, " .. shard[%d]->min_deadline = %" PRId64,
static_cast<int>(g_shard_queue[0] - g_shards),
g_shard_queue[0]->min_deadline);
}
while (g_shard_queue[0]->min_deadline < now ||
(now != GPR_ATM_MAX && g_shard_queue[0]->min_deadline == now)) {
gpr_atm new_min_deadline;
(now != GRPC_MILLIS_INF_FUTURE &&
g_shard_queue[0]->min_deadline == now)) {
grpc_millis new_min_deadline;
/* For efficiency, we pop as many available timers as we can from the
shard. This may violate perfect timer deadline ordering, but that
@ -573,8 +574,8 @@ static grpc_timer_check_result run_some_expired_timers(gpr_atm now,
if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_DEBUG,
" .. result --> %d"
", shard[%d]->min_deadline %" PRIdPTR " --> %" PRIdPTR
", now=%" PRIdPTR,
", shard[%d]->min_deadline %" PRId64 " --> %" PRId64
", now=%" PRId64,
result, static_cast<int>(g_shard_queue[0] - g_shards),
g_shard_queue[0]->min_deadline, new_min_deadline, now);
}
@ -616,7 +617,7 @@ static grpc_timer_check_result timer_check(grpc_millis* next) {
}
if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_DEBUG,
"TIMER CHECK SKIP: now=%" PRIdPTR " min_timer=%" PRIdPTR, now,
"TIMER CHECK SKIP: now=%" PRId64" min_timer=%" PRId64, now,
min_timer);
}
return GRPC_TIMERS_CHECKED_AND_EMPTY;

Loading…
Cancel
Save