diff --git a/include/grpc/impl/codegen/port_platform.h b/include/grpc/impl/codegen/port_platform.h index c0ca9f43716..dd49a66fe25 100644 --- a/include/grpc/impl/codegen/port_platform.h +++ b/include/grpc/impl/codegen/port_platform.h @@ -463,6 +463,23 @@ typedef unsigned __int64 uint64_t; #include #endif /* _MSC_VER */ +/* Type of cycle clock implementation */ +#ifdef GPR_LINUX +/* Disable cycle clock by default. + TODO(soheil): enable when we support fallback for unstable cycle clocks. +#if defined(__i386__) +#define GPR_CYCLE_COUNTER_RDTSC_32 1 +#elif defined(__x86_64__) || defined(__amd64__) +#define GPR_CYCLE_COUNTER_RDTSC_64 1 +#else +#define GPR_CYCLE_COUNTER_FALLBACK 1 +#endif +*/ +#define GPR_CYCLE_COUNTER_FALLBACK 1 +#else +#define GPR_CYCLE_COUNTER_FALLBACK 1 +#endif /* GPR_LINUX */ + /* Cache line alignment */ #ifndef GPR_CACHELINE_SIZE_LOG #if defined(__i386__) || defined(__x86_64__) diff --git a/src/core/lib/channel/channelz.cc b/src/core/lib/channel/channelz.cc index 325131b8439..fb913beb0f0 100644 --- a/src/core/lib/channel/channelz.cc +++ b/src/core/lib/channel/channelz.cc @@ -107,51 +107,45 @@ char* BaseNode::RenderJsonString() { CallCountingHelper::CallCountingHelper() { num_cores_ = GPR_MAX(1, gpr_cpu_num_cores()); - per_cpu_counter_data_storage_ = static_cast( - gpr_zalloc(sizeof(AtomicCounterData) * num_cores_)); -} - -CallCountingHelper::~CallCountingHelper() { - gpr_free(per_cpu_counter_data_storage_); + per_cpu_counter_data_storage_.reserve(num_cores_); + for (size_t i = 0; i < num_cores_; ++i) { + per_cpu_counter_data_storage_.emplace_back(); + } } void CallCountingHelper::RecordCallStarted() { - gpr_atm_no_barrier_fetch_add( - &per_cpu_counter_data_storage_[grpc_core::ExecCtx::Get()->starting_cpu()] - .calls_started, - static_cast(1)); - gpr_atm_no_barrier_store( - &per_cpu_counter_data_storage_[grpc_core::ExecCtx::Get()->starting_cpu()] - .last_call_started_millis, - (gpr_atm)ExecCtx::Get()->Now()); + AtomicCounterData& data = + per_cpu_counter_data_storage_[ExecCtx::Get()->starting_cpu()]; + data.calls_started.FetchAdd(1, MemoryOrder::RELAXED); + data.last_call_started_cycle.Store(gpr_get_cycle_counter(), + MemoryOrder::RELAXED); } void CallCountingHelper::RecordCallFailed() { - gpr_atm_no_barrier_fetch_add( - &per_cpu_counter_data_storage_[grpc_core::ExecCtx::Get()->starting_cpu()] - .calls_failed, - static_cast(1)); + per_cpu_counter_data_storage_[ExecCtx::Get()->starting_cpu()] + .calls_failed.FetchAdd(1, MemoryOrder::RELAXED); } void CallCountingHelper::RecordCallSucceeded() { - gpr_atm_no_barrier_fetch_add( - &per_cpu_counter_data_storage_[grpc_core::ExecCtx::Get()->starting_cpu()] - .calls_succeeded, - static_cast(1)); + per_cpu_counter_data_storage_[ExecCtx::Get()->starting_cpu()] + .calls_succeeded.FetchAdd(1, MemoryOrder::RELAXED); } void CallCountingHelper::CollectData(CounterData* out) { for (size_t core = 0; core < num_cores_; ++core) { - out->calls_started += gpr_atm_no_barrier_load( - &per_cpu_counter_data_storage_[core].calls_started); - out->calls_succeeded += gpr_atm_no_barrier_load( - &per_cpu_counter_data_storage_[core].calls_succeeded); - out->calls_failed += gpr_atm_no_barrier_load( - &per_cpu_counter_data_storage_[core].calls_failed); - gpr_atm last_call = gpr_atm_no_barrier_load( - &per_cpu_counter_data_storage_[core].last_call_started_millis); - if (last_call > out->last_call_started_millis) { - out->last_call_started_millis = last_call; + AtomicCounterData& data = per_cpu_counter_data_storage_[core]; + + out->calls_started += data.calls_started.Load(MemoryOrder::RELAXED); + out->calls_succeeded += + per_cpu_counter_data_storage_[core].calls_succeeded.Load( + MemoryOrder::RELAXED); + out->calls_failed += per_cpu_counter_data_storage_[core].calls_failed.Load( + MemoryOrder::RELAXED); + const gpr_cycle_counter last_call = + per_cpu_counter_data_storage_[core].last_call_started_cycle.Load( + MemoryOrder::RELAXED); + if (last_call > out->last_call_started_cycle) { + out->last_call_started_cycle = last_call; } } } @@ -173,8 +167,9 @@ void CallCountingHelper::PopulateCallCounts(grpc_json* json) { json, json_iterator, "callsFailed", data.calls_failed); } if (data.calls_started != 0) { - gpr_timespec ts = grpc_millis_to_timespec(data.last_call_started_millis, - GPR_CLOCK_REALTIME); + gpr_timespec ts = gpr_convert_clock_type( + gpr_cycle_counter_to_time(data.last_call_started_cycle), + GPR_CLOCK_REALTIME); json_iterator = grpc_json_create_child(json_iterator, json, "lastCallStartedTimestamp", gpr_format_timespec(ts), GRPC_JSON_STRING, true); @@ -493,26 +488,25 @@ SocketNode::SocketNode(UniquePtr local, UniquePtr remote, void SocketNode::RecordStreamStartedFromLocal() { gpr_atm_no_barrier_fetch_add(&streams_started_, static_cast(1)); - gpr_atm_no_barrier_store(&last_local_stream_created_millis_, - (gpr_atm)ExecCtx::Get()->Now()); + gpr_atm_no_barrier_store(&last_local_stream_created_cycle_, + gpr_get_cycle_counter()); } void SocketNode::RecordStreamStartedFromRemote() { gpr_atm_no_barrier_fetch_add(&streams_started_, static_cast(1)); - gpr_atm_no_barrier_store(&last_remote_stream_created_millis_, - (gpr_atm)ExecCtx::Get()->Now()); + gpr_atm_no_barrier_store(&last_remote_stream_created_cycle_, + gpr_get_cycle_counter()); } void SocketNode::RecordMessagesSent(uint32_t num_sent) { gpr_atm_no_barrier_fetch_add(&messages_sent_, static_cast(num_sent)); - gpr_atm_no_barrier_store(&last_message_sent_millis_, - (gpr_atm)ExecCtx::Get()->Now()); + gpr_atm_no_barrier_store(&last_message_sent_cycle_, gpr_get_cycle_counter()); } void SocketNode::RecordMessageReceived() { gpr_atm_no_barrier_fetch_add(&messages_received_, static_cast(1)); - gpr_atm_no_barrier_store(&last_message_received_millis_, - (gpr_atm)ExecCtx::Get()->Now()); + gpr_atm_no_barrier_store(&last_message_received_cycle_, + gpr_get_cycle_counter()); } grpc_json* SocketNode::RenderJson() { @@ -545,20 +539,22 @@ grpc_json* SocketNode::RenderJson() { if (streams_started != 0) { json_iterator = grpc_json_add_number_string_child( json, json_iterator, "streamsStarted", streams_started); - gpr_atm last_local_stream_created_millis = - gpr_atm_no_barrier_load(&last_local_stream_created_millis_); - if (last_local_stream_created_millis != 0) { - ts = grpc_millis_to_timespec(last_local_stream_created_millis, - GPR_CLOCK_REALTIME); + gpr_cycle_counter last_local_stream_created_cycle = + gpr_atm_no_barrier_load(&last_local_stream_created_cycle_); + if (last_local_stream_created_cycle != 0) { + ts = gpr_convert_clock_type( + gpr_cycle_counter_to_time(last_local_stream_created_cycle), + GPR_CLOCK_REALTIME); json_iterator = grpc_json_create_child( json_iterator, json, "lastLocalStreamCreatedTimestamp", gpr_format_timespec(ts), GRPC_JSON_STRING, true); } - gpr_atm last_remote_stream_created_millis = - gpr_atm_no_barrier_load(&last_remote_stream_created_millis_); - if (last_remote_stream_created_millis != 0) { - ts = grpc_millis_to_timespec(last_remote_stream_created_millis, - GPR_CLOCK_REALTIME); + gpr_cycle_counter last_remote_stream_created_cycle = + gpr_atm_no_barrier_load(&last_remote_stream_created_cycle_); + if (last_remote_stream_created_cycle != 0) { + ts = gpr_convert_clock_type( + gpr_cycle_counter_to_time(last_remote_stream_created_cycle), + GPR_CLOCK_REALTIME); json_iterator = grpc_json_create_child( json_iterator, json, "lastRemoteStreamCreatedTimestamp", gpr_format_timespec(ts), GRPC_JSON_STRING, true); @@ -578,8 +574,9 @@ grpc_json* SocketNode::RenderJson() { if (messages_sent != 0) { json_iterator = grpc_json_add_number_string_child( json, json_iterator, "messagesSent", messages_sent); - ts = grpc_millis_to_timespec( - gpr_atm_no_barrier_load(&last_message_sent_millis_), + ts = gpr_convert_clock_type( + gpr_cycle_counter_to_time( + gpr_atm_no_barrier_load(&last_message_sent_cycle_)), GPR_CLOCK_REALTIME); json_iterator = grpc_json_create_child(json_iterator, json, "lastMessageSentTimestamp", @@ -589,8 +586,9 @@ grpc_json* SocketNode::RenderJson() { if (messages_received != 0) { json_iterator = grpc_json_add_number_string_child( json, json_iterator, "messagesReceived", messages_received); - ts = grpc_millis_to_timespec( - gpr_atm_no_barrier_load(&last_message_received_millis_), + ts = gpr_convert_clock_type( + gpr_cycle_counter_to_time( + gpr_atm_no_barrier_load(&last_message_received_cycle_)), GPR_CLOCK_REALTIME); json_iterator = grpc_json_create_child( json_iterator, json, "lastMessageReceivedTimestamp", diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h index e1af701834d..2561bff807e 100644 --- a/src/core/lib/channel/channelz.h +++ b/src/core/lib/channel/channelz.h @@ -24,6 +24,7 @@ #include #include "src/core/lib/channel/channel_trace.h" +#include "src/core/lib/gpr/time_precise.h" #include "src/core/lib/gprpp/inlined_vector.h" #include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/gprpp/map.h" @@ -111,7 +112,6 @@ class BaseNode : public RefCounted { class CallCountingHelper { public: CallCountingHelper(); - ~CallCountingHelper(); void RecordCallStarted(); void RecordCallFailed(); @@ -124,24 +124,38 @@ class CallCountingHelper { // testing peer friend. friend class testing::CallCountingHelperPeer; + // TODO(soheil): add a proper PerCPU helper and use it here. struct AtomicCounterData { - gpr_atm calls_started = 0; - gpr_atm calls_succeeded = 0; - gpr_atm calls_failed = 0; - gpr_atm last_call_started_millis = 0; - }; + // Define the ctors so that we can use this structure in InlinedVector. + AtomicCounterData() = default; + AtomicCounterData(const AtomicCounterData& that) + : calls_started(that.calls_started.Load(MemoryOrder::RELAXED)), + calls_succeeded(that.calls_succeeded.Load(MemoryOrder::RELAXED)), + calls_failed(that.calls_failed.Load(MemoryOrder::RELAXED)), + last_call_started_cycle( + that.last_call_started_cycle.Load(MemoryOrder::RELAXED)) {} + + Atomic calls_started{0}; + Atomic calls_succeeded{0}; + Atomic calls_failed{0}; + Atomic last_call_started_cycle{0}; + // Make sure the size is exactly one cache line. + uint8_t padding[GPR_CACHELINE_SIZE - 3 * sizeof(Atomic) - + sizeof(Atomic)]; + } GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE); struct CounterData { intptr_t calls_started = 0; intptr_t calls_succeeded = 0; intptr_t calls_failed = 0; - intptr_t last_call_started_millis = 0; + gpr_cycle_counter last_call_started_cycle = 0; }; // collects the sharded data into one CounterData struct. void CollectData(CounterData* out); - AtomicCounterData* per_cpu_counter_data_storage_ = nullptr; + // Really zero-sized, but 0-sized arrays are illegal on MSVC. + InlinedVector per_cpu_counter_data_storage_; size_t num_cores_ = 0; }; @@ -281,10 +295,10 @@ class SocketNode : public BaseNode { gpr_atm messages_sent_ = 0; gpr_atm messages_received_ = 0; gpr_atm keepalives_sent_ = 0; - gpr_atm last_local_stream_created_millis_ = 0; - gpr_atm last_remote_stream_created_millis_ = 0; - gpr_atm last_message_sent_millis_ = 0; - gpr_atm last_message_received_millis_ = 0; + gpr_atm last_local_stream_created_cycle_ = 0; + gpr_atm last_remote_stream_created_cycle_ = 0; + gpr_atm last_message_sent_cycle_ = 0; + gpr_atm last_message_received_cycle_ = 0; UniquePtr local_; UniquePtr remote_; }; diff --git a/src/core/lib/gpr/time_precise.cc b/src/core/lib/gpr/time_precise.cc index 1b34fd7eb1b..9d11c66831c 100644 --- a/src/core/lib/gpr/time_precise.cc +++ b/src/core/lib/gpr/time_precise.cc @@ -18,61 +18,132 @@ #include +#if GPR_LINUX +#include +#include +#endif + +#include + +#include #include #include -#include #include "src/core/lib/gpr/time_precise.h" -#ifdef GRPC_TIMERS_RDTSC -#if defined(__i386__) -static void gpr_get_cycle_counter(int64_t int* clk) { - int64_t int ret; - __asm__ volatile("rdtsc" : "=A"(ret)); - *clk = ret; +#if GPR_CYCLE_COUNTER_RDTSC_32 or GPR_CYCLE_COUNTER_RDTSC_64 +#if GPR_LINUX +static bool read_freq_from_kernel(double* freq) { + // Google production kernel export the frequency for us in kHz. + int fd = open("/sys/devices/system/cpu/cpu0/tsc_freq_khz", O_RDONLY); + if (fd == -1) { + return false; + } + char line[1024] = {}; + char* err; + bool ret = false; + int len = read(fd, line, sizeof(line) - 1); + if (len > 0) { + const long val = strtol(line, &err, 10); + if (line[0] != '\0' && (*err == '\n' || *err == '\0')) { + *freq = val * 1e3; // Value is kHz. + ret = true; + } + } + close(fd); + return ret; } +#endif /* GPR_LINUX */ + +static double cycles_per_second = 0; +static gpr_cycle_counter start_cycle; -// ---------------------------------------------------------------- -#elif defined(__x86_64__) || defined(__amd64__) -static void gpr_get_cycle_counter(int64_t* clk) { - uint64_t low, high; - __asm__ volatile("rdtsc" : "=a"(low), "=d"(high)); - *clk = (int64_t)(high << 32) | (int64_t)low; +static bool is_fake_clock() { + gpr_timespec start = gpr_now(GPR_CLOCK_MONOTONIC); + int64_t sum = 0; + for (int i = 0; i < 8; ++i) { + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec delta = gpr_time_sub(now, start); + sum += delta.tv_sec * GPR_NS_PER_SEC + delta.tv_nsec; + } + // If the clock doesn't move even a nano after 8 tries, it's a fake one. + return sum == 0; } -#endif -static double cycles_per_second = 0; -static int64_t start_cycle; void gpr_precise_clock_init(void) { - time_t start; - int64_t end_cycle; gpr_log(GPR_DEBUG, "Calibrating timers"); - start = time(NULL); - while (time(NULL) == start) - ; - gpr_get_cycle_counter(&start_cycle); - while (time(NULL) <= start + 10) - ; - gpr_get_cycle_counter(&end_cycle); - cycles_per_second = (double)(end_cycle - start_cycle) / 10.0; + +#if GPR_LINUX + if (read_freq_from_kernel(&cycles_per_second)) { + start_cycle = gpr_get_cycle_counter(); + return; + } +#endif /* GPR_LINUX */ + + if (is_fake_clock()) { + cycles_per_second = 1; + start_cycle = 0; + return; + } + // Start from a loop of 1ms, and gradually increase the loop duration + // until we either converge or we have passed 255ms (1ms+2ms+...+128ms). + int64_t measurement_ns = GPR_NS_PER_MS; + double last_freq = -1; + bool converged = false; + for (int i = 0; i < 8 && !converged; ++i, measurement_ns *= 2) { + start_cycle = gpr_get_cycle_counter(); + int64_t loop_ns; + gpr_timespec start = gpr_now(GPR_CLOCK_MONOTONIC); + do { + // TODO(soheil): Maybe sleep instead of busy polling. + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec delta = gpr_time_sub(now, start); + loop_ns = delta.tv_sec * GPR_NS_PER_SEC + delta.tv_nsec; + } while (loop_ns < measurement_ns); + gpr_cycle_counter end_cycle = gpr_get_cycle_counter(); + // Frequency should be in Hz. + const double freq = + static_cast(end_cycle - start_cycle) / loop_ns * GPR_NS_PER_SEC; + converged = + last_freq != -1 && (freq * 0.99 < last_freq && last_freq < freq * 1.01); + last_freq = freq; + } + cycles_per_second = last_freq; gpr_log(GPR_DEBUG, "... cycles_per_second = %f\n", cycles_per_second); } -void gpr_precise_clock_now(gpr_timespec* clk) { - int64_t counter; - double secs; - gpr_get_cycle_counter(&counter); - secs = (double)(counter - start_cycle) / cycles_per_second; - clk->clock_type = GPR_CLOCK_PRECISE; - clk->tv_sec = (int64_t)secs; - clk->tv_nsec = (int32_t)(1e9 * (secs - (double)clk->tv_sec)); +gpr_timespec gpr_cycle_counter_to_time(gpr_cycle_counter cycles) { + double secs = static_cast(cycles - start_cycle) / cycles_per_second; + gpr_timespec ts; + ts.tv_sec = static_cast(secs); + ts.tv_nsec = static_cast(GPR_NS_PER_SEC * + (secs - static_cast(ts.tv_sec))); + ts.clock_type = GPR_CLOCK_PRECISE; + return ts; } -#else /* GRPC_TIMERS_RDTSC */ +void gpr_precise_clock_now(gpr_timespec* clk) { + int64_t counter = gpr_get_cycle_counter(); + *clk = gpr_cycle_counter_to_time(counter); +} +#elif GPR_CYCLE_COUNTER_FALLBACK void gpr_precise_clock_init(void) {} +gpr_cycle_counter gpr_get_cycle_counter() { + gpr_timespec ts = gpr_now(GPR_CLOCK_REALTIME); + return gpr_timespec_to_micros(ts); +} + +gpr_timespec gpr_cycle_counter_to_time(gpr_cycle_counter cycles) { + gpr_timespec ts; + ts.tv_sec = cycles / GPR_US_PER_SEC; + ts.tv_nsec = (cycles - ts.tv_sec * GPR_US_PER_SEC) * GPR_NS_PER_US; + ts.clock_type = GPR_CLOCK_PRECISE; + return ts; +} + void gpr_precise_clock_now(gpr_timespec* clk) { *clk = gpr_now(GPR_CLOCK_REALTIME); clk->clock_type = GPR_CLOCK_PRECISE; } -#endif /* GRPC_TIMERS_RDTSC */ +#endif /* GPR_CYCLE_COUNTER_FALLBACK */ diff --git a/src/core/lib/gpr/time_precise.h b/src/core/lib/gpr/time_precise.h index a63ea9dc689..ce16bafab2d 100644 --- a/src/core/lib/gpr/time_precise.h +++ b/src/core/lib/gpr/time_precise.h @@ -21,9 +21,45 @@ #include +#include #include +// Depending on the platform gpr_get_cycle_counter() can have a resolution as +// low as a usec. Use other clock sources or gpr_precise_clock_now(), +// where you need high resolution clocks. +// +// Using gpr_get_cycle_counter() is preferred to using ExecCtx::Get()->Now() +// whenever possible. + +#if GPR_CYCLE_COUNTER_RDTSC_32 +typedef int64_t gpr_cycle_counter; +inline gpr_cycle_counter gpr_get_cycle_counter() { + int64_t ret; + __asm__ volatile("rdtsc" : "=A"(ret)); + return ret; +} +#elif GPR_CYCLE_COUNTER_RDTSC_64 +typedef int64_t gpr_cycle_counter; +inline gpr_cycle_counter gpr_get_cycle_counter() { + uint64_t low, high; + __asm__ volatile("rdtsc" : "=a"(low), "=d"(high)); + return (high << 32) | low; +} +#elif GPR_CYCLE_COUNTER_FALLBACK +// TODO(soheil): add support for mrs on Arm. + +// Real time in micros. +typedef double gpr_cycle_counter; +gpr_cycle_counter gpr_get_cycle_counter(); +#else +#error Must define exactly one of \ + GPR_CYCLE_COUNTER_RDTSC_32, \ + GPR_CYCLE_COUNTER_RDTSC_64, or \ + GPR_CYCLE_COUNTER_FALLBACK +#endif + void gpr_precise_clock_init(void); void gpr_precise_clock_now(gpr_timespec* clk); +gpr_timespec gpr_cycle_counter_to_time(gpr_cycle_counter cycles); #endif /* GRPC_CORE_LIB_GPR_TIME_PRECISE_H */ diff --git a/src/core/lib/gprpp/inlined_vector.h b/src/core/lib/gprpp/inlined_vector.h index f511d520f50..c5ae0e8e65d 100644 --- a/src/core/lib/gprpp/inlined_vector.h +++ b/src/core/lib/gprpp/inlined_vector.h @@ -109,9 +109,13 @@ class InlinedVector { void reserve(size_t capacity) { if (capacity > capacity_) { - T* new_dynamic = static_cast(gpr_malloc(sizeof(T) * capacity)); + T* new_dynamic = + std::alignment_of::value == 0 + ? static_cast(gpr_malloc(sizeof(T) * capacity)) + : static_cast(gpr_malloc_aligned( + sizeof(T) * capacity, std::alignment_of::value)); move_elements(data(), new_dynamic, size_); - gpr_free(dynamic_); + free_dynamic(); dynamic_ = new_dynamic; capacity_ = capacity; } @@ -196,7 +200,17 @@ class InlinedVector { T& value = data()[i]; value.~T(); } - gpr_free(dynamic_); + free_dynamic(); + } + + void free_dynamic() { + if (dynamic_ != nullptr) { + if (std::alignment_of::value == 0) { + gpr_free(dynamic_); + } else { + gpr_free_aligned(dynamic_); + } + } } typename std::aligned_storage::type inline_[N]; diff --git a/test/core/channel/channelz_test.cc b/test/core/channel/channelz_test.cc index 7c55f9ffe0f..08ed4dd4fa5 100644 --- a/test/core/channel/channelz_test.cc +++ b/test/core/channel/channelz_test.cc @@ -51,7 +51,8 @@ class CallCountingHelperPeer { grpc_millis last_call_started_millis() const { CallCountingHelper::CounterData data; node_->CollectData(&data); - return (grpc_millis)gpr_atm_no_barrier_load(&data.last_call_started_millis); + gpr_timespec ts = gpr_cycle_counter_to_time(data.last_call_started_cycle); + return grpc_timespec_to_millis_round_up(ts); } private: