Merge pull request #18937 from soheilhy/cycle-clock

Expose cycle counter and use it in /channelz.
pull/19879/head
Soheil Hassas Yeganeh 6 years ago committed by GitHub
commit 069403c1f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      include/grpc/impl/codegen/port_platform.h
  2. 110
      src/core/lib/channel/channelz.cc
  3. 38
      src/core/lib/channel/channelz.h
  4. 143
      src/core/lib/gpr/time_precise.cc
  5. 36
      src/core/lib/gpr/time_precise.h
  6. 20
      src/core/lib/gprpp/inlined_vector.h
  7. 3
      test/core/channel/channelz_test.cc

@ -463,6 +463,23 @@ typedef unsigned __int64 uint64_t;
#include <stdint.h>
#endif /* _MSC_VER */
/* Type of cycle clock implementation */
#ifdef GPR_LINUX
/* Disable cycle clock by default.
TODO(soheil): enable when we support fallback for unstable cycle clocks.
#if defined(__i386__)
#define GPR_CYCLE_COUNTER_RDTSC_32 1
#elif defined(__x86_64__) || defined(__amd64__)
#define GPR_CYCLE_COUNTER_RDTSC_64 1
#else
#define GPR_CYCLE_COUNTER_FALLBACK 1
#endif
*/
#define GPR_CYCLE_COUNTER_FALLBACK 1
#else
#define GPR_CYCLE_COUNTER_FALLBACK 1
#endif /* GPR_LINUX */
/* Cache line alignment */
#ifndef GPR_CACHELINE_SIZE_LOG
#if defined(__i386__) || defined(__x86_64__)

@ -107,51 +107,45 @@ char* BaseNode::RenderJsonString() {
CallCountingHelper::CallCountingHelper() {
num_cores_ = GPR_MAX(1, gpr_cpu_num_cores());
per_cpu_counter_data_storage_ = static_cast<AtomicCounterData*>(
gpr_zalloc(sizeof(AtomicCounterData) * num_cores_));
}
CallCountingHelper::~CallCountingHelper() {
gpr_free(per_cpu_counter_data_storage_);
per_cpu_counter_data_storage_.reserve(num_cores_);
for (size_t i = 0; i < num_cores_; ++i) {
per_cpu_counter_data_storage_.emplace_back();
}
}
void CallCountingHelper::RecordCallStarted() {
gpr_atm_no_barrier_fetch_add(
&per_cpu_counter_data_storage_[grpc_core::ExecCtx::Get()->starting_cpu()]
.calls_started,
static_cast<gpr_atm>(1));
gpr_atm_no_barrier_store(
&per_cpu_counter_data_storage_[grpc_core::ExecCtx::Get()->starting_cpu()]
.last_call_started_millis,
(gpr_atm)ExecCtx::Get()->Now());
AtomicCounterData& data =
per_cpu_counter_data_storage_[ExecCtx::Get()->starting_cpu()];
data.calls_started.FetchAdd(1, MemoryOrder::RELAXED);
data.last_call_started_cycle.Store(gpr_get_cycle_counter(),
MemoryOrder::RELAXED);
}
void CallCountingHelper::RecordCallFailed() {
gpr_atm_no_barrier_fetch_add(
&per_cpu_counter_data_storage_[grpc_core::ExecCtx::Get()->starting_cpu()]
.calls_failed,
static_cast<gpr_atm>(1));
per_cpu_counter_data_storage_[ExecCtx::Get()->starting_cpu()]
.calls_failed.FetchAdd(1, MemoryOrder::RELAXED);
}
void CallCountingHelper::RecordCallSucceeded() {
gpr_atm_no_barrier_fetch_add(
&per_cpu_counter_data_storage_[grpc_core::ExecCtx::Get()->starting_cpu()]
.calls_succeeded,
static_cast<gpr_atm>(1));
per_cpu_counter_data_storage_[ExecCtx::Get()->starting_cpu()]
.calls_succeeded.FetchAdd(1, MemoryOrder::RELAXED);
}
void CallCountingHelper::CollectData(CounterData* out) {
for (size_t core = 0; core < num_cores_; ++core) {
out->calls_started += gpr_atm_no_barrier_load(
&per_cpu_counter_data_storage_[core].calls_started);
out->calls_succeeded += gpr_atm_no_barrier_load(
&per_cpu_counter_data_storage_[core].calls_succeeded);
out->calls_failed += gpr_atm_no_barrier_load(
&per_cpu_counter_data_storage_[core].calls_failed);
gpr_atm last_call = gpr_atm_no_barrier_load(
&per_cpu_counter_data_storage_[core].last_call_started_millis);
if (last_call > out->last_call_started_millis) {
out->last_call_started_millis = last_call;
AtomicCounterData& data = per_cpu_counter_data_storage_[core];
out->calls_started += data.calls_started.Load(MemoryOrder::RELAXED);
out->calls_succeeded +=
per_cpu_counter_data_storage_[core].calls_succeeded.Load(
MemoryOrder::RELAXED);
out->calls_failed += per_cpu_counter_data_storage_[core].calls_failed.Load(
MemoryOrder::RELAXED);
const gpr_cycle_counter last_call =
per_cpu_counter_data_storage_[core].last_call_started_cycle.Load(
MemoryOrder::RELAXED);
if (last_call > out->last_call_started_cycle) {
out->last_call_started_cycle = last_call;
}
}
}
@ -173,8 +167,9 @@ void CallCountingHelper::PopulateCallCounts(grpc_json* json) {
json, json_iterator, "callsFailed", data.calls_failed);
}
if (data.calls_started != 0) {
gpr_timespec ts = grpc_millis_to_timespec(data.last_call_started_millis,
GPR_CLOCK_REALTIME);
gpr_timespec ts = gpr_convert_clock_type(
gpr_cycle_counter_to_time(data.last_call_started_cycle),
GPR_CLOCK_REALTIME);
json_iterator =
grpc_json_create_child(json_iterator, json, "lastCallStartedTimestamp",
gpr_format_timespec(ts), GRPC_JSON_STRING, true);
@ -493,26 +488,25 @@ SocketNode::SocketNode(UniquePtr<char> local, UniquePtr<char> remote,
void SocketNode::RecordStreamStartedFromLocal() {
gpr_atm_no_barrier_fetch_add(&streams_started_, static_cast<gpr_atm>(1));
gpr_atm_no_barrier_store(&last_local_stream_created_millis_,
(gpr_atm)ExecCtx::Get()->Now());
gpr_atm_no_barrier_store(&last_local_stream_created_cycle_,
gpr_get_cycle_counter());
}
void SocketNode::RecordStreamStartedFromRemote() {
gpr_atm_no_barrier_fetch_add(&streams_started_, static_cast<gpr_atm>(1));
gpr_atm_no_barrier_store(&last_remote_stream_created_millis_,
(gpr_atm)ExecCtx::Get()->Now());
gpr_atm_no_barrier_store(&last_remote_stream_created_cycle_,
gpr_get_cycle_counter());
}
void SocketNode::RecordMessagesSent(uint32_t num_sent) {
gpr_atm_no_barrier_fetch_add(&messages_sent_, static_cast<gpr_atm>(num_sent));
gpr_atm_no_barrier_store(&last_message_sent_millis_,
(gpr_atm)ExecCtx::Get()->Now());
gpr_atm_no_barrier_store(&last_message_sent_cycle_, gpr_get_cycle_counter());
}
void SocketNode::RecordMessageReceived() {
gpr_atm_no_barrier_fetch_add(&messages_received_, static_cast<gpr_atm>(1));
gpr_atm_no_barrier_store(&last_message_received_millis_,
(gpr_atm)ExecCtx::Get()->Now());
gpr_atm_no_barrier_store(&last_message_received_cycle_,
gpr_get_cycle_counter());
}
grpc_json* SocketNode::RenderJson() {
@ -545,20 +539,22 @@ grpc_json* SocketNode::RenderJson() {
if (streams_started != 0) {
json_iterator = grpc_json_add_number_string_child(
json, json_iterator, "streamsStarted", streams_started);
gpr_atm last_local_stream_created_millis =
gpr_atm_no_barrier_load(&last_local_stream_created_millis_);
if (last_local_stream_created_millis != 0) {
ts = grpc_millis_to_timespec(last_local_stream_created_millis,
GPR_CLOCK_REALTIME);
gpr_cycle_counter last_local_stream_created_cycle =
gpr_atm_no_barrier_load(&last_local_stream_created_cycle_);
if (last_local_stream_created_cycle != 0) {
ts = gpr_convert_clock_type(
gpr_cycle_counter_to_time(last_local_stream_created_cycle),
GPR_CLOCK_REALTIME);
json_iterator = grpc_json_create_child(
json_iterator, json, "lastLocalStreamCreatedTimestamp",
gpr_format_timespec(ts), GRPC_JSON_STRING, true);
}
gpr_atm last_remote_stream_created_millis =
gpr_atm_no_barrier_load(&last_remote_stream_created_millis_);
if (last_remote_stream_created_millis != 0) {
ts = grpc_millis_to_timespec(last_remote_stream_created_millis,
GPR_CLOCK_REALTIME);
gpr_cycle_counter last_remote_stream_created_cycle =
gpr_atm_no_barrier_load(&last_remote_stream_created_cycle_);
if (last_remote_stream_created_cycle != 0) {
ts = gpr_convert_clock_type(
gpr_cycle_counter_to_time(last_remote_stream_created_cycle),
GPR_CLOCK_REALTIME);
json_iterator = grpc_json_create_child(
json_iterator, json, "lastRemoteStreamCreatedTimestamp",
gpr_format_timespec(ts), GRPC_JSON_STRING, true);
@ -578,8 +574,9 @@ grpc_json* SocketNode::RenderJson() {
if (messages_sent != 0) {
json_iterator = grpc_json_add_number_string_child(
json, json_iterator, "messagesSent", messages_sent);
ts = grpc_millis_to_timespec(
gpr_atm_no_barrier_load(&last_message_sent_millis_),
ts = gpr_convert_clock_type(
gpr_cycle_counter_to_time(
gpr_atm_no_barrier_load(&last_message_sent_cycle_)),
GPR_CLOCK_REALTIME);
json_iterator =
grpc_json_create_child(json_iterator, json, "lastMessageSentTimestamp",
@ -589,8 +586,9 @@ grpc_json* SocketNode::RenderJson() {
if (messages_received != 0) {
json_iterator = grpc_json_add_number_string_child(
json, json_iterator, "messagesReceived", messages_received);
ts = grpc_millis_to_timespec(
gpr_atm_no_barrier_load(&last_message_received_millis_),
ts = gpr_convert_clock_type(
gpr_cycle_counter_to_time(
gpr_atm_no_barrier_load(&last_message_received_cycle_)),
GPR_CLOCK_REALTIME);
json_iterator = grpc_json_create_child(
json_iterator, json, "lastMessageReceivedTimestamp",

@ -24,6 +24,7 @@
#include <grpc/grpc.h>
#include "src/core/lib/channel/channel_trace.h"
#include "src/core/lib/gpr/time_precise.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/map.h"
@ -111,7 +112,6 @@ class BaseNode : public RefCounted<BaseNode> {
class CallCountingHelper {
public:
CallCountingHelper();
~CallCountingHelper();
void RecordCallStarted();
void RecordCallFailed();
@ -124,24 +124,38 @@ class CallCountingHelper {
// testing peer friend.
friend class testing::CallCountingHelperPeer;
// TODO(soheil): add a proper PerCPU helper and use it here.
struct AtomicCounterData {
gpr_atm calls_started = 0;
gpr_atm calls_succeeded = 0;
gpr_atm calls_failed = 0;
gpr_atm last_call_started_millis = 0;
};
// Define the ctors so that we can use this structure in InlinedVector.
AtomicCounterData() = default;
AtomicCounterData(const AtomicCounterData& that)
: calls_started(that.calls_started.Load(MemoryOrder::RELAXED)),
calls_succeeded(that.calls_succeeded.Load(MemoryOrder::RELAXED)),
calls_failed(that.calls_failed.Load(MemoryOrder::RELAXED)),
last_call_started_cycle(
that.last_call_started_cycle.Load(MemoryOrder::RELAXED)) {}
Atomic<intptr_t> calls_started{0};
Atomic<intptr_t> calls_succeeded{0};
Atomic<intptr_t> calls_failed{0};
Atomic<gpr_cycle_counter> last_call_started_cycle{0};
// Make sure the size is exactly one cache line.
uint8_t padding[GPR_CACHELINE_SIZE - 3 * sizeof(Atomic<intptr_t>) -
sizeof(Atomic<gpr_cycle_counter>)];
} GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE);
struct CounterData {
intptr_t calls_started = 0;
intptr_t calls_succeeded = 0;
intptr_t calls_failed = 0;
intptr_t last_call_started_millis = 0;
gpr_cycle_counter last_call_started_cycle = 0;
};
// collects the sharded data into one CounterData struct.
void CollectData(CounterData* out);
AtomicCounterData* per_cpu_counter_data_storage_ = nullptr;
// Really zero-sized, but 0-sized arrays are illegal on MSVC.
InlinedVector<AtomicCounterData, 1> per_cpu_counter_data_storage_;
size_t num_cores_ = 0;
};
@ -281,10 +295,10 @@ class SocketNode : public BaseNode {
gpr_atm messages_sent_ = 0;
gpr_atm messages_received_ = 0;
gpr_atm keepalives_sent_ = 0;
gpr_atm last_local_stream_created_millis_ = 0;
gpr_atm last_remote_stream_created_millis_ = 0;
gpr_atm last_message_sent_millis_ = 0;
gpr_atm last_message_received_millis_ = 0;
gpr_atm last_local_stream_created_cycle_ = 0;
gpr_atm last_remote_stream_created_cycle_ = 0;
gpr_atm last_message_sent_cycle_ = 0;
gpr_atm last_message_received_cycle_ = 0;
UniquePtr<char> local_;
UniquePtr<char> remote_;
};

@ -18,61 +18,132 @@
#include <grpc/support/port_platform.h>
#if GPR_LINUX
#include <fcntl.h>
#include <unistd.h>
#endif
#include <algorithm>
#include <grpc/impl/codegen/gpr_types.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <stdio.h>
#include "src/core/lib/gpr/time_precise.h"
#ifdef GRPC_TIMERS_RDTSC
#if defined(__i386__)
static void gpr_get_cycle_counter(int64_t int* clk) {
int64_t int ret;
__asm__ volatile("rdtsc" : "=A"(ret));
*clk = ret;
#if GPR_CYCLE_COUNTER_RDTSC_32 or GPR_CYCLE_COUNTER_RDTSC_64
#if GPR_LINUX
static bool read_freq_from_kernel(double* freq) {
// Google production kernel export the frequency for us in kHz.
int fd = open("/sys/devices/system/cpu/cpu0/tsc_freq_khz", O_RDONLY);
if (fd == -1) {
return false;
}
char line[1024] = {};
char* err;
bool ret = false;
int len = read(fd, line, sizeof(line) - 1);
if (len > 0) {
const long val = strtol(line, &err, 10);
if (line[0] != '\0' && (*err == '\n' || *err == '\0')) {
*freq = val * 1e3; // Value is kHz.
ret = true;
}
}
close(fd);
return ret;
}
#endif /* GPR_LINUX */
static double cycles_per_second = 0;
static gpr_cycle_counter start_cycle;
// ----------------------------------------------------------------
#elif defined(__x86_64__) || defined(__amd64__)
static void gpr_get_cycle_counter(int64_t* clk) {
uint64_t low, high;
__asm__ volatile("rdtsc" : "=a"(low), "=d"(high));
*clk = (int64_t)(high << 32) | (int64_t)low;
static bool is_fake_clock() {
gpr_timespec start = gpr_now(GPR_CLOCK_MONOTONIC);
int64_t sum = 0;
for (int i = 0; i < 8; ++i) {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec delta = gpr_time_sub(now, start);
sum += delta.tv_sec * GPR_NS_PER_SEC + delta.tv_nsec;
}
// If the clock doesn't move even a nano after 8 tries, it's a fake one.
return sum == 0;
}
#endif
static double cycles_per_second = 0;
static int64_t start_cycle;
void gpr_precise_clock_init(void) {
time_t start;
int64_t end_cycle;
gpr_log(GPR_DEBUG, "Calibrating timers");
start = time(NULL);
while (time(NULL) == start)
;
gpr_get_cycle_counter(&start_cycle);
while (time(NULL) <= start + 10)
;
gpr_get_cycle_counter(&end_cycle);
cycles_per_second = (double)(end_cycle - start_cycle) / 10.0;
#if GPR_LINUX
if (read_freq_from_kernel(&cycles_per_second)) {
start_cycle = gpr_get_cycle_counter();
return;
}
#endif /* GPR_LINUX */
if (is_fake_clock()) {
cycles_per_second = 1;
start_cycle = 0;
return;
}
// Start from a loop of 1ms, and gradually increase the loop duration
// until we either converge or we have passed 255ms (1ms+2ms+...+128ms).
int64_t measurement_ns = GPR_NS_PER_MS;
double last_freq = -1;
bool converged = false;
for (int i = 0; i < 8 && !converged; ++i, measurement_ns *= 2) {
start_cycle = gpr_get_cycle_counter();
int64_t loop_ns;
gpr_timespec start = gpr_now(GPR_CLOCK_MONOTONIC);
do {
// TODO(soheil): Maybe sleep instead of busy polling.
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec delta = gpr_time_sub(now, start);
loop_ns = delta.tv_sec * GPR_NS_PER_SEC + delta.tv_nsec;
} while (loop_ns < measurement_ns);
gpr_cycle_counter end_cycle = gpr_get_cycle_counter();
// Frequency should be in Hz.
const double freq =
static_cast<double>(end_cycle - start_cycle) / loop_ns * GPR_NS_PER_SEC;
converged =
last_freq != -1 && (freq * 0.99 < last_freq && last_freq < freq * 1.01);
last_freq = freq;
}
cycles_per_second = last_freq;
gpr_log(GPR_DEBUG, "... cycles_per_second = %f\n", cycles_per_second);
}
void gpr_precise_clock_now(gpr_timespec* clk) {
int64_t counter;
double secs;
gpr_get_cycle_counter(&counter);
secs = (double)(counter - start_cycle) / cycles_per_second;
clk->clock_type = GPR_CLOCK_PRECISE;
clk->tv_sec = (int64_t)secs;
clk->tv_nsec = (int32_t)(1e9 * (secs - (double)clk->tv_sec));
gpr_timespec gpr_cycle_counter_to_time(gpr_cycle_counter cycles) {
double secs = static_cast<double>(cycles - start_cycle) / cycles_per_second;
gpr_timespec ts;
ts.tv_sec = static_cast<int64_t>(secs);
ts.tv_nsec = static_cast<int32_t>(GPR_NS_PER_SEC *
(secs - static_cast<double>(ts.tv_sec)));
ts.clock_type = GPR_CLOCK_PRECISE;
return ts;
}
#else /* GRPC_TIMERS_RDTSC */
void gpr_precise_clock_now(gpr_timespec* clk) {
int64_t counter = gpr_get_cycle_counter();
*clk = gpr_cycle_counter_to_time(counter);
}
#elif GPR_CYCLE_COUNTER_FALLBACK
void gpr_precise_clock_init(void) {}
gpr_cycle_counter gpr_get_cycle_counter() {
gpr_timespec ts = gpr_now(GPR_CLOCK_REALTIME);
return gpr_timespec_to_micros(ts);
}
gpr_timespec gpr_cycle_counter_to_time(gpr_cycle_counter cycles) {
gpr_timespec ts;
ts.tv_sec = cycles / GPR_US_PER_SEC;
ts.tv_nsec = (cycles - ts.tv_sec * GPR_US_PER_SEC) * GPR_NS_PER_US;
ts.clock_type = GPR_CLOCK_PRECISE;
return ts;
}
void gpr_precise_clock_now(gpr_timespec* clk) {
*clk = gpr_now(GPR_CLOCK_REALTIME);
clk->clock_type = GPR_CLOCK_PRECISE;
}
#endif /* GRPC_TIMERS_RDTSC */
#endif /* GPR_CYCLE_COUNTER_FALLBACK */

@ -21,9 +21,45 @@
#include <grpc/support/port_platform.h>
#include <grpc/impl/codegen/gpr_types.h>
#include <grpc/support/time.h>
// Depending on the platform gpr_get_cycle_counter() can have a resolution as
// low as a usec. Use other clock sources or gpr_precise_clock_now(),
// where you need high resolution clocks.
//
// Using gpr_get_cycle_counter() is preferred to using ExecCtx::Get()->Now()
// whenever possible.
#if GPR_CYCLE_COUNTER_RDTSC_32
typedef int64_t gpr_cycle_counter;
inline gpr_cycle_counter gpr_get_cycle_counter() {
int64_t ret;
__asm__ volatile("rdtsc" : "=A"(ret));
return ret;
}
#elif GPR_CYCLE_COUNTER_RDTSC_64
typedef int64_t gpr_cycle_counter;
inline gpr_cycle_counter gpr_get_cycle_counter() {
uint64_t low, high;
__asm__ volatile("rdtsc" : "=a"(low), "=d"(high));
return (high << 32) | low;
}
#elif GPR_CYCLE_COUNTER_FALLBACK
// TODO(soheil): add support for mrs on Arm.
// Real time in micros.
typedef double gpr_cycle_counter;
gpr_cycle_counter gpr_get_cycle_counter();
#else
#error Must define exactly one of \
GPR_CYCLE_COUNTER_RDTSC_32, \
GPR_CYCLE_COUNTER_RDTSC_64, or \
GPR_CYCLE_COUNTER_FALLBACK
#endif
void gpr_precise_clock_init(void);
void gpr_precise_clock_now(gpr_timespec* clk);
gpr_timespec gpr_cycle_counter_to_time(gpr_cycle_counter cycles);
#endif /* GRPC_CORE_LIB_GPR_TIME_PRECISE_H */

@ -109,9 +109,13 @@ class InlinedVector {
void reserve(size_t capacity) {
if (capacity > capacity_) {
T* new_dynamic = static_cast<T*>(gpr_malloc(sizeof(T) * capacity));
T* new_dynamic =
std::alignment_of<T>::value == 0
? static_cast<T*>(gpr_malloc(sizeof(T) * capacity))
: static_cast<T*>(gpr_malloc_aligned(
sizeof(T) * capacity, std::alignment_of<T>::value));
move_elements(data(), new_dynamic, size_);
gpr_free(dynamic_);
free_dynamic();
dynamic_ = new_dynamic;
capacity_ = capacity;
}
@ -196,7 +200,17 @@ class InlinedVector {
T& value = data()[i];
value.~T();
}
gpr_free(dynamic_);
free_dynamic();
}
void free_dynamic() {
if (dynamic_ != nullptr) {
if (std::alignment_of<T>::value == 0) {
gpr_free(dynamic_);
} else {
gpr_free_aligned(dynamic_);
}
}
}
typename std::aligned_storage<sizeof(T)>::type inline_[N];

@ -51,7 +51,8 @@ class CallCountingHelperPeer {
grpc_millis last_call_started_millis() const {
CallCountingHelper::CounterData data;
node_->CollectData(&data);
return (grpc_millis)gpr_atm_no_barrier_load(&data.last_call_started_millis);
gpr_timespec ts = gpr_cycle_counter_to_time(data.last_call_started_cycle);
return grpc_timespec_to_millis_round_up(ts);
}
private:

Loading…
Cancel
Save