Expose cycle counter and use it /channelz.

We are using clock real time (which is very expensive) on every
RPC to update channelz. We should simply use cycle clock.

This patch exposes cycle clock, enables RDTSC on intel,
and use it for channelz.

This patch also changes the implementation of time_precise_init
to that more accurately using the monotonic clock and
shorter by looping at most a few hundred milliseconds.

This is part of a larger performance series.
pull/18937/head
Soheil Hassas Yeganeh 6 years ago
parent e4406a8018
commit bdd3fdddb2
  1. 17
      include/grpc/impl/codegen/port_platform.h
  2. 110
      src/core/lib/channel/channelz.cc
  3. 38
      src/core/lib/channel/channelz.h
  4. 143
      src/core/lib/gpr/time_precise.cc
  5. 36
      src/core/lib/gpr/time_precise.h
  6. 20
      src/core/lib/gprpp/inlined_vector.h
  7. 3
      test/core/channel/channelz_test.cc

@ -463,6 +463,23 @@ typedef unsigned __int64 uint64_t;
#include <stdint.h>
#endif /* _MSC_VER */
/* Type of cycle clock implementation */
#ifdef GPR_LINUX
/* Disable cycle clock by default.
TODO(soheil): enable when we support fallback for unstable cycle clocks.
#if defined(__i386__)
#define GPR_CYCLE_COUNTER_RDTSC_32 1
#elif defined(__x86_64__) || defined(__amd64__)
#define GPR_CYCLE_COUNTER_RDTSC_64 1
#else
#define GPR_CYCLE_COUNTER_FALLBACK 1
#endif
*/
#define GPR_CYCLE_COUNTER_FALLBACK 1
#else
#define GPR_CYCLE_COUNTER_FALLBACK 1
#endif /* GPR_LINUX */
/* Cache line alignment */
#ifndef GPR_CACHELINE_SIZE_LOG
#if defined(__i386__) || defined(__x86_64__)

@ -107,51 +107,45 @@ char* BaseNode::RenderJsonString() {
CallCountingHelper::CallCountingHelper() {
num_cores_ = GPR_MAX(1, gpr_cpu_num_cores());
per_cpu_counter_data_storage_ = static_cast<AtomicCounterData*>(
gpr_zalloc(sizeof(AtomicCounterData) * num_cores_));
}
CallCountingHelper::~CallCountingHelper() {
gpr_free(per_cpu_counter_data_storage_);
per_cpu_counter_data_storage_.reserve(num_cores_);
for (size_t i = 0; i < num_cores_; ++i) {
per_cpu_counter_data_storage_.emplace_back();
}
}
void CallCountingHelper::RecordCallStarted() {
gpr_atm_no_barrier_fetch_add(
&per_cpu_counter_data_storage_[grpc_core::ExecCtx::Get()->starting_cpu()]
.calls_started,
static_cast<gpr_atm>(1));
gpr_atm_no_barrier_store(
&per_cpu_counter_data_storage_[grpc_core::ExecCtx::Get()->starting_cpu()]
.last_call_started_millis,
(gpr_atm)ExecCtx::Get()->Now());
AtomicCounterData& data =
per_cpu_counter_data_storage_[ExecCtx::Get()->starting_cpu()];
data.calls_started.FetchAdd(1, MemoryOrder::RELAXED);
data.last_call_started_cycle.Store(gpr_get_cycle_counter(),
MemoryOrder::RELAXED);
}
void CallCountingHelper::RecordCallFailed() {
gpr_atm_no_barrier_fetch_add(
&per_cpu_counter_data_storage_[grpc_core::ExecCtx::Get()->starting_cpu()]
.calls_failed,
static_cast<gpr_atm>(1));
per_cpu_counter_data_storage_[ExecCtx::Get()->starting_cpu()]
.calls_failed.FetchAdd(1, MemoryOrder::RELAXED);
}
void CallCountingHelper::RecordCallSucceeded() {
gpr_atm_no_barrier_fetch_add(
&per_cpu_counter_data_storage_[grpc_core::ExecCtx::Get()->starting_cpu()]
.calls_succeeded,
static_cast<gpr_atm>(1));
per_cpu_counter_data_storage_[ExecCtx::Get()->starting_cpu()]
.calls_succeeded.FetchAdd(1, MemoryOrder::RELAXED);
}
void CallCountingHelper::CollectData(CounterData* out) {
for (size_t core = 0; core < num_cores_; ++core) {
out->calls_started += gpr_atm_no_barrier_load(
&per_cpu_counter_data_storage_[core].calls_started);
out->calls_succeeded += gpr_atm_no_barrier_load(
&per_cpu_counter_data_storage_[core].calls_succeeded);
out->calls_failed += gpr_atm_no_barrier_load(
&per_cpu_counter_data_storage_[core].calls_failed);
gpr_atm last_call = gpr_atm_no_barrier_load(
&per_cpu_counter_data_storage_[core].last_call_started_millis);
if (last_call > out->last_call_started_millis) {
out->last_call_started_millis = last_call;
AtomicCounterData& data = per_cpu_counter_data_storage_[core];
out->calls_started += data.calls_started.Load(MemoryOrder::RELAXED);
out->calls_succeeded +=
per_cpu_counter_data_storage_[core].calls_succeeded.Load(
MemoryOrder::RELAXED);
out->calls_failed += per_cpu_counter_data_storage_[core].calls_failed.Load(
MemoryOrder::RELAXED);
const gpr_cycle_counter last_call =
per_cpu_counter_data_storage_[core].last_call_started_cycle.Load(
MemoryOrder::RELAXED);
if (last_call > out->last_call_started_cycle) {
out->last_call_started_cycle = last_call;
}
}
}
@ -173,8 +167,9 @@ void CallCountingHelper::PopulateCallCounts(grpc_json* json) {
json, json_iterator, "callsFailed", data.calls_failed);
}
if (data.calls_started != 0) {
gpr_timespec ts = grpc_millis_to_timespec(data.last_call_started_millis,
GPR_CLOCK_REALTIME);
gpr_timespec ts = gpr_convert_clock_type(
gpr_cycle_counter_to_time(data.last_call_started_cycle),
GPR_CLOCK_REALTIME);
json_iterator =
grpc_json_create_child(json_iterator, json, "lastCallStartedTimestamp",
gpr_format_timespec(ts), GRPC_JSON_STRING, true);
@ -493,26 +488,25 @@ SocketNode::SocketNode(UniquePtr<char> local, UniquePtr<char> remote,
void SocketNode::RecordStreamStartedFromLocal() {
gpr_atm_no_barrier_fetch_add(&streams_started_, static_cast<gpr_atm>(1));
gpr_atm_no_barrier_store(&last_local_stream_created_millis_,
(gpr_atm)ExecCtx::Get()->Now());
gpr_atm_no_barrier_store(&last_local_stream_created_cycle_,
gpr_get_cycle_counter());
}
void SocketNode::RecordStreamStartedFromRemote() {
gpr_atm_no_barrier_fetch_add(&streams_started_, static_cast<gpr_atm>(1));
gpr_atm_no_barrier_store(&last_remote_stream_created_millis_,
(gpr_atm)ExecCtx::Get()->Now());
gpr_atm_no_barrier_store(&last_remote_stream_created_cycle_,
gpr_get_cycle_counter());
}
void SocketNode::RecordMessagesSent(uint32_t num_sent) {
gpr_atm_no_barrier_fetch_add(&messages_sent_, static_cast<gpr_atm>(num_sent));
gpr_atm_no_barrier_store(&last_message_sent_millis_,
(gpr_atm)ExecCtx::Get()->Now());
gpr_atm_no_barrier_store(&last_message_sent_cycle_, gpr_get_cycle_counter());
}
void SocketNode::RecordMessageReceived() {
gpr_atm_no_barrier_fetch_add(&messages_received_, static_cast<gpr_atm>(1));
gpr_atm_no_barrier_store(&last_message_received_millis_,
(gpr_atm)ExecCtx::Get()->Now());
gpr_atm_no_barrier_store(&last_message_received_cycle_,
gpr_get_cycle_counter());
}
grpc_json* SocketNode::RenderJson() {
@ -545,20 +539,22 @@ grpc_json* SocketNode::RenderJson() {
if (streams_started != 0) {
json_iterator = grpc_json_add_number_string_child(
json, json_iterator, "streamsStarted", streams_started);
gpr_atm last_local_stream_created_millis =
gpr_atm_no_barrier_load(&last_local_stream_created_millis_);
if (last_local_stream_created_millis != 0) {
ts = grpc_millis_to_timespec(last_local_stream_created_millis,
GPR_CLOCK_REALTIME);
gpr_cycle_counter last_local_stream_created_cycle =
gpr_atm_no_barrier_load(&last_local_stream_created_cycle_);
if (last_local_stream_created_cycle != 0) {
ts = gpr_convert_clock_type(
gpr_cycle_counter_to_time(last_local_stream_created_cycle),
GPR_CLOCK_REALTIME);
json_iterator = grpc_json_create_child(
json_iterator, json, "lastLocalStreamCreatedTimestamp",
gpr_format_timespec(ts), GRPC_JSON_STRING, true);
}
gpr_atm last_remote_stream_created_millis =
gpr_atm_no_barrier_load(&last_remote_stream_created_millis_);
if (last_remote_stream_created_millis != 0) {
ts = grpc_millis_to_timespec(last_remote_stream_created_millis,
GPR_CLOCK_REALTIME);
gpr_cycle_counter last_remote_stream_created_cycle =
gpr_atm_no_barrier_load(&last_remote_stream_created_cycle_);
if (last_remote_stream_created_cycle != 0) {
ts = gpr_convert_clock_type(
gpr_cycle_counter_to_time(last_remote_stream_created_cycle),
GPR_CLOCK_REALTIME);
json_iterator = grpc_json_create_child(
json_iterator, json, "lastRemoteStreamCreatedTimestamp",
gpr_format_timespec(ts), GRPC_JSON_STRING, true);
@ -578,8 +574,9 @@ grpc_json* SocketNode::RenderJson() {
if (messages_sent != 0) {
json_iterator = grpc_json_add_number_string_child(
json, json_iterator, "messagesSent", messages_sent);
ts = grpc_millis_to_timespec(
gpr_atm_no_barrier_load(&last_message_sent_millis_),
ts = gpr_convert_clock_type(
gpr_cycle_counter_to_time(
gpr_atm_no_barrier_load(&last_message_sent_cycle_)),
GPR_CLOCK_REALTIME);
json_iterator =
grpc_json_create_child(json_iterator, json, "lastMessageSentTimestamp",
@ -589,8 +586,9 @@ grpc_json* SocketNode::RenderJson() {
if (messages_received != 0) {
json_iterator = grpc_json_add_number_string_child(
json, json_iterator, "messagesReceived", messages_received);
ts = grpc_millis_to_timespec(
gpr_atm_no_barrier_load(&last_message_received_millis_),
ts = gpr_convert_clock_type(
gpr_cycle_counter_to_time(
gpr_atm_no_barrier_load(&last_message_received_cycle_)),
GPR_CLOCK_REALTIME);
json_iterator = grpc_json_create_child(
json_iterator, json, "lastMessageReceivedTimestamp",

@ -24,6 +24,7 @@
#include <grpc/grpc.h>
#include "src/core/lib/channel/channel_trace.h"
#include "src/core/lib/gpr/time_precise.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/map.h"
@ -111,7 +112,6 @@ class BaseNode : public RefCounted<BaseNode> {
class CallCountingHelper {
public:
CallCountingHelper();
~CallCountingHelper();
void RecordCallStarted();
void RecordCallFailed();
@ -124,24 +124,38 @@ class CallCountingHelper {
// testing peer friend.
friend class testing::CallCountingHelperPeer;
// TODO(soheil): add a proper PerCPU helper and use it here.
struct AtomicCounterData {
gpr_atm calls_started = 0;
gpr_atm calls_succeeded = 0;
gpr_atm calls_failed = 0;
gpr_atm last_call_started_millis = 0;
};
// Define the ctors so that we can use this structure in InlinedVector.
AtomicCounterData() = default;
AtomicCounterData(const AtomicCounterData& that)
: calls_started(that.calls_started.Load(MemoryOrder::RELAXED)),
calls_succeeded(that.calls_succeeded.Load(MemoryOrder::RELAXED)),
calls_failed(that.calls_failed.Load(MemoryOrder::RELAXED)),
last_call_started_cycle(
that.last_call_started_cycle.Load(MemoryOrder::RELAXED)) {}
Atomic<intptr_t> calls_started{0};
Atomic<intptr_t> calls_succeeded{0};
Atomic<intptr_t> calls_failed{0};
Atomic<gpr_cycle_counter> last_call_started_cycle{0};
// Make sure the size is exactly one cache line.
uint8_t padding[GPR_CACHELINE_SIZE - 3 * sizeof(Atomic<intptr_t>) -
sizeof(Atomic<gpr_cycle_counter>)];
} GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE);
struct CounterData {
intptr_t calls_started = 0;
intptr_t calls_succeeded = 0;
intptr_t calls_failed = 0;
intptr_t last_call_started_millis = 0;
gpr_cycle_counter last_call_started_cycle = 0;
};
// collects the sharded data into one CounterData struct.
void CollectData(CounterData* out);
AtomicCounterData* per_cpu_counter_data_storage_ = nullptr;
// Really zero-sized, but 0-sized arrays are illegal on MSVC.
InlinedVector<AtomicCounterData, 1> per_cpu_counter_data_storage_;
size_t num_cores_ = 0;
};
@ -281,10 +295,10 @@ class SocketNode : public BaseNode {
gpr_atm messages_sent_ = 0;
gpr_atm messages_received_ = 0;
gpr_atm keepalives_sent_ = 0;
gpr_atm last_local_stream_created_millis_ = 0;
gpr_atm last_remote_stream_created_millis_ = 0;
gpr_atm last_message_sent_millis_ = 0;
gpr_atm last_message_received_millis_ = 0;
gpr_atm last_local_stream_created_cycle_ = 0;
gpr_atm last_remote_stream_created_cycle_ = 0;
gpr_atm last_message_sent_cycle_ = 0;
gpr_atm last_message_received_cycle_ = 0;
UniquePtr<char> local_;
UniquePtr<char> remote_;
};

@ -18,61 +18,132 @@
#include <grpc/support/port_platform.h>
#if GPR_LINUX
#include <fcntl.h>
#include <unistd.h>
#endif
#include <algorithm>
#include <grpc/impl/codegen/gpr_types.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <stdio.h>
#include "src/core/lib/gpr/time_precise.h"
#ifdef GRPC_TIMERS_RDTSC
#if defined(__i386__)
static void gpr_get_cycle_counter(int64_t int* clk) {
int64_t int ret;
__asm__ volatile("rdtsc" : "=A"(ret));
*clk = ret;
#if GPR_CYCLE_COUNTER_RDTSC_32 or GPR_CYCLE_COUNTER_RDTSC_64
#if GPR_LINUX
static bool read_freq_from_kernel(double* freq) {
// Google production kernel export the frequency for us in kHz.
int fd = open("/sys/devices/system/cpu/cpu0/tsc_freq_khz", O_RDONLY);
if (fd == -1) {
return false;
}
char line[1024] = {};
char* err;
bool ret = false;
int len = read(fd, line, sizeof(line) - 1);
if (len > 0) {
const long val = strtol(line, &err, 10);
if (line[0] != '\0' && (*err == '\n' || *err == '\0')) {
*freq = val * 1e3; // Value is kHz.
ret = true;
}
}
close(fd);
return ret;
}
#endif /* GPR_LINUX */
static double cycles_per_second = 0;
static gpr_cycle_counter start_cycle;
// ----------------------------------------------------------------
#elif defined(__x86_64__) || defined(__amd64__)
static void gpr_get_cycle_counter(int64_t* clk) {
uint64_t low, high;
__asm__ volatile("rdtsc" : "=a"(low), "=d"(high));
*clk = (int64_t)(high << 32) | (int64_t)low;
static bool is_fake_clock() {
gpr_timespec start = gpr_now(GPR_CLOCK_MONOTONIC);
int64_t sum = 0;
for (int i = 0; i < 8; ++i) {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec delta = gpr_time_sub(now, start);
sum += delta.tv_sec * GPR_NS_PER_SEC + delta.tv_nsec;
}
// If the clock doesn't move even a nano after 8 tries, it's a fake one.
return sum == 0;
}
#endif
static double cycles_per_second = 0;
static int64_t start_cycle;
void gpr_precise_clock_init(void) {
time_t start;
int64_t end_cycle;
gpr_log(GPR_DEBUG, "Calibrating timers");
start = time(NULL);
while (time(NULL) == start)
;
gpr_get_cycle_counter(&start_cycle);
while (time(NULL) <= start + 10)
;
gpr_get_cycle_counter(&end_cycle);
cycles_per_second = (double)(end_cycle - start_cycle) / 10.0;
#if GPR_LINUX
if (read_freq_from_kernel(&cycles_per_second)) {
start_cycle = gpr_get_cycle_counter();
return;
}
#endif /* GPR_LINUX */
if (is_fake_clock()) {
cycles_per_second = 1;
start_cycle = 0;
return;
}
// Start from a loop of 1ms, and gradually increase the loop duration
// until we either converge or we have passed 255ms (1ms+2ms+...+128ms).
int64_t measurement_ns = GPR_NS_PER_MS;
double last_freq = -1;
bool converged = false;
for (int i = 0; i < 8 && !converged; ++i, measurement_ns *= 2) {
start_cycle = gpr_get_cycle_counter();
int64_t loop_ns;
gpr_timespec start = gpr_now(GPR_CLOCK_MONOTONIC);
do {
// TODO(soheil): Maybe sleep instead of busy polling.
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec delta = gpr_time_sub(now, start);
loop_ns = delta.tv_sec * GPR_NS_PER_SEC + delta.tv_nsec;
} while (loop_ns < measurement_ns);
gpr_cycle_counter end_cycle = gpr_get_cycle_counter();
// Frequency should be in Hz.
const double freq =
static_cast<double>(end_cycle - start_cycle) / loop_ns * GPR_NS_PER_SEC;
converged =
last_freq != -1 && (freq * 0.99 < last_freq && last_freq < freq * 1.01);
last_freq = freq;
}
cycles_per_second = last_freq;
gpr_log(GPR_DEBUG, "... cycles_per_second = %f\n", cycles_per_second);
}
void gpr_precise_clock_now(gpr_timespec* clk) {
int64_t counter;
double secs;
gpr_get_cycle_counter(&counter);
secs = (double)(counter - start_cycle) / cycles_per_second;
clk->clock_type = GPR_CLOCK_PRECISE;
clk->tv_sec = (int64_t)secs;
clk->tv_nsec = (int32_t)(1e9 * (secs - (double)clk->tv_sec));
gpr_timespec gpr_cycle_counter_to_time(gpr_cycle_counter cycles) {
double secs = static_cast<double>(cycles - start_cycle) / cycles_per_second;
gpr_timespec ts;
ts.tv_sec = static_cast<int64_t>(secs);
ts.tv_nsec = static_cast<int32_t>(GPR_NS_PER_SEC *
(secs - static_cast<double>(ts.tv_sec)));
ts.clock_type = GPR_CLOCK_PRECISE;
return ts;
}
#else /* GRPC_TIMERS_RDTSC */
void gpr_precise_clock_now(gpr_timespec* clk) {
int64_t counter = gpr_get_cycle_counter();
*clk = gpr_cycle_counter_to_time(counter);
}
#elif GPR_CYCLE_COUNTER_FALLBACK
void gpr_precise_clock_init(void) {}
gpr_cycle_counter gpr_get_cycle_counter() {
gpr_timespec ts = gpr_now(GPR_CLOCK_REALTIME);
return gpr_timespec_to_micros(ts);
}
gpr_timespec gpr_cycle_counter_to_time(gpr_cycle_counter cycles) {
gpr_timespec ts;
ts.tv_sec = cycles / GPR_US_PER_SEC;
ts.tv_nsec = (cycles - ts.tv_sec * GPR_US_PER_SEC) * GPR_NS_PER_US;
ts.clock_type = GPR_CLOCK_PRECISE;
return ts;
}
void gpr_precise_clock_now(gpr_timespec* clk) {
*clk = gpr_now(GPR_CLOCK_REALTIME);
clk->clock_type = GPR_CLOCK_PRECISE;
}
#endif /* GRPC_TIMERS_RDTSC */
#endif /* GPR_CYCLE_COUNTER_FALLBACK */

@ -21,9 +21,45 @@
#include <grpc/support/port_platform.h>
#include <grpc/impl/codegen/gpr_types.h>
#include <grpc/support/time.h>
// Depending on the platform gpr_get_cycle_counter() can have a resolution as
// low as a usec. Use other clock sources or gpr_precise_clock_now(),
// where you need high resolution clocks.
//
// Using gpr_get_cycle_counter() is preferred to using ExecCtx::Get()->Now()
// whenever possible.
#if GPR_CYCLE_COUNTER_RDTSC_32
typedef int64_t gpr_cycle_counter;
inline gpr_cycle_counter gpr_get_cycle_counter() {
int64_t ret;
__asm__ volatile("rdtsc" : "=A"(ret));
return ret;
}
#elif GPR_CYCLE_COUNTER_RDTSC_64
typedef int64_t gpr_cycle_counter;
inline gpr_cycle_counter gpr_get_cycle_counter() {
uint64_t low, high;
__asm__ volatile("rdtsc" : "=a"(low), "=d"(high));
return (high << 32) | low;
}
#elif GPR_CYCLE_COUNTER_FALLBACK
// TODO(soheil): add support for mrs on Arm.
// Real time in micros.
typedef double gpr_cycle_counter;
gpr_cycle_counter gpr_get_cycle_counter();
#else
#error Must define exactly one of \
GPR_CYCLE_COUNTER_RDTSC_32, \
GPR_CYCLE_COUNTER_RDTSC_64, or \
GPR_CYCLE_COUNTER_FALLBACK
#endif
void gpr_precise_clock_init(void);
void gpr_precise_clock_now(gpr_timespec* clk);
gpr_timespec gpr_cycle_counter_to_time(gpr_cycle_counter cycles);
#endif /* GRPC_CORE_LIB_GPR_TIME_PRECISE_H */

@ -109,9 +109,13 @@ class InlinedVector {
void reserve(size_t capacity) {
if (capacity > capacity_) {
T* new_dynamic = static_cast<T*>(gpr_malloc(sizeof(T) * capacity));
T* new_dynamic =
std::alignment_of<T>::value == 0
? static_cast<T*>(gpr_malloc(sizeof(T) * capacity))
: static_cast<T*>(gpr_malloc_aligned(
sizeof(T) * capacity, std::alignment_of<T>::value));
move_elements(data(), new_dynamic, size_);
gpr_free(dynamic_);
free_dynamic();
dynamic_ = new_dynamic;
capacity_ = capacity;
}
@ -196,7 +200,17 @@ class InlinedVector {
T& value = data()[i];
value.~T();
}
gpr_free(dynamic_);
free_dynamic();
}
void free_dynamic() {
if (dynamic_ != nullptr) {
if (std::alignment_of<T>::value == 0) {
gpr_free(dynamic_);
} else {
gpr_free_aligned(dynamic_);
}
}
}
typename std::aligned_storage<sizeof(T)>::type inline_[N];

@ -51,7 +51,8 @@ class CallCountingHelperPeer {
grpc_millis last_call_started_millis() const {
CallCountingHelper::CounterData data;
node_->CollectData(&data);
return (grpc_millis)gpr_atm_no_barrier_load(&data.last_call_started_millis);
gpr_timespec ts = gpr_cycle_counter_to_time(data.last_call_started_cycle);
return grpc_timespec_to_millis_round_up(ts);
}
private:

Loading…
Cancel
Save