From 3261d69ae4bd974333b6d269b7225540086b0ab5 Mon Sep 17 00:00:00 2001 From: Soheil Hassas Yeganeh Date: Mon, 11 Nov 2019 10:37:16 -0500 Subject: [PATCH] Use 64-bit atomic variables instead of gpr_atm in channelz. There is a possibility of overflow for both timestamps and counters. Use Atomic<int64_t> and Atomic<gpr_cycle_counter> to make sure there is no overflow. Prior to bdd3fdddb26, we were using grpc_millis. So the overflow was hidden due to the fact that the deltas were correct up to 2^32 milliseconds (i.e., 49 days). --- src/core/lib/channel/channelz.cc | 43 ++++++++++++++++---------------- src/core/lib/channel/channelz.h | 39 +++++++++++++++-------------- 2 files changed, 42 insertions(+), 40 deletions(-) diff --git a/src/core/lib/channel/channelz.cc b/src/core/lib/channel/channelz.cc index 73ba8f1b974..d66d05cf1b7 100644 --- a/src/core/lib/channel/channelz.cc +++ b/src/core/lib/channel/channelz.cc @@ -32,6 +32,7 @@ #include "src/core/lib/channel/status_util.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/atomic.h" #include "src/core/lib/gprpp/host_port.h" #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/iomgr/error.h" @@ -486,26 +487,26 @@ SocketNode::SocketNode(std::string local, std::string remote, std::string name) remote_(std::move(remote)) {} void SocketNode::RecordStreamStartedFromLocal() { - gpr_atm_no_barrier_fetch_add(&streams_started_, static_cast(1)); - gpr_atm_no_barrier_store(&last_local_stream_created_cycle_, - gpr_get_cycle_counter()); + streams_started_.FetchAdd(1, MemoryOrder::RELAXED); + last_local_stream_created_cycle_.Store(gpr_get_cycle_counter(), + MemoryOrder::RELAXED); } void SocketNode::RecordStreamStartedFromRemote() { - gpr_atm_no_barrier_fetch_add(&streams_started_, static_cast(1)); - gpr_atm_no_barrier_store(&last_remote_stream_created_cycle_, - gpr_get_cycle_counter()); + streams_started_.FetchAdd(1, MemoryOrder::RELAXED); + 
last_remote_stream_created_cycle_.Store(gpr_get_cycle_counter(), + MemoryOrder::RELAXED); } void SocketNode::RecordMessagesSent(uint32_t num_sent) { - gpr_atm_no_barrier_fetch_add(&messages_sent_, static_cast(num_sent)); - gpr_atm_no_barrier_store(&last_message_sent_cycle_, gpr_get_cycle_counter()); + messages_sent_.FetchAdd(num_sent, MemoryOrder::RELAXED); + last_message_sent_cycle_.Store(gpr_get_cycle_counter(), MemoryOrder::RELAXED); } void SocketNode::RecordMessageReceived() { - gpr_atm_no_barrier_fetch_add(&messages_received_, static_cast(1)); - gpr_atm_no_barrier_store(&last_message_received_cycle_, - gpr_get_cycle_counter()); + messages_received_.FetchAdd(1, MemoryOrder::RELAXED); + last_message_received_cycle_.Store(gpr_get_cycle_counter(), + MemoryOrder::RELAXED); } grpc_json* SocketNode::RenderJson() { @@ -534,12 +535,12 @@ grpc_json* SocketNode::RenderJson() { json = data; json_iterator = nullptr; gpr_timespec ts; - gpr_atm streams_started = gpr_atm_no_barrier_load(&streams_started_); + int64_t streams_started = streams_started_.Load(MemoryOrder::RELAXED); if (streams_started != 0) { json_iterator = grpc_json_add_number_string_child( json, json_iterator, "streamsStarted", streams_started); gpr_cycle_counter last_local_stream_created_cycle = - gpr_atm_no_barrier_load(&last_local_stream_created_cycle_); + last_local_stream_created_cycle_.Load(MemoryOrder::RELAXED); if (last_local_stream_created_cycle != 0) { ts = gpr_convert_clock_type( gpr_cycle_counter_to_time(last_local_stream_created_cycle), @@ -549,7 +550,7 @@ grpc_json* SocketNode::RenderJson() { gpr_format_timespec(ts), GRPC_JSON_STRING, true); } gpr_cycle_counter last_remote_stream_created_cycle = - gpr_atm_no_barrier_load(&last_remote_stream_created_cycle_); + last_remote_stream_created_cycle_.Load(MemoryOrder::RELAXED); if (last_remote_stream_created_cycle != 0) { ts = gpr_convert_clock_type( gpr_cycle_counter_to_time(last_remote_stream_created_cycle), @@ -559,41 +560,41 @@ grpc_json* 
SocketNode::RenderJson() { gpr_format_timespec(ts), GRPC_JSON_STRING, true); } } - gpr_atm streams_succeeded = gpr_atm_no_barrier_load(&streams_succeeded_); + int64_t streams_succeeded = streams_succeeded_.Load(MemoryOrder::RELAXED); if (streams_succeeded != 0) { json_iterator = grpc_json_add_number_string_child( json, json_iterator, "streamsSucceeded", streams_succeeded); } - gpr_atm streams_failed = gpr_atm_no_barrier_load(&streams_failed_); + int64_t streams_failed = streams_failed_.Load(MemoryOrder::RELAXED); if (streams_failed) { json_iterator = grpc_json_add_number_string_child( json, json_iterator, "streamsFailed", streams_failed); } - gpr_atm messages_sent = gpr_atm_no_barrier_load(&messages_sent_); + int64_t messages_sent = messages_sent_.Load(MemoryOrder::RELAXED); if (messages_sent != 0) { json_iterator = grpc_json_add_number_string_child( json, json_iterator, "messagesSent", messages_sent); ts = gpr_convert_clock_type( gpr_cycle_counter_to_time( - gpr_atm_no_barrier_load(&last_message_sent_cycle_)), + last_message_sent_cycle_.Load(MemoryOrder::RELAXED)), GPR_CLOCK_REALTIME); json_iterator = grpc_json_create_child(json_iterator, json, "lastMessageSentTimestamp", gpr_format_timespec(ts), GRPC_JSON_STRING, true); } - gpr_atm messages_received = gpr_atm_no_barrier_load(&messages_received_); + int64_t messages_received = messages_received_.Load(MemoryOrder::RELAXED); if (messages_received != 0) { json_iterator = grpc_json_add_number_string_child( json, json_iterator, "messagesReceived", messages_received); ts = gpr_convert_clock_type( gpr_cycle_counter_to_time( - gpr_atm_no_barrier_load(&last_message_received_cycle_)), + last_message_received_cycle_.Load(MemoryOrder::RELAXED)), GPR_CLOCK_REALTIME); json_iterator = grpc_json_create_child( json_iterator, json, "lastMessageReceivedTimestamp", gpr_format_timespec(ts), GRPC_JSON_STRING, true); } - gpr_atm keepalives_sent = gpr_atm_no_barrier_load(&keepalives_sent_); + int64_t keepalives_sent = 
keepalives_sent_.Load(MemoryOrder::RELAXED); if (keepalives_sent != 0) { json_iterator = grpc_json_add_number_string_child( json, json_iterator, "keepAlivesSent", keepalives_sent); diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h index 8702cbe9e95..238b48873fa 100644 --- a/src/core/lib/channel/channelz.h +++ b/src/core/lib/channel/channelz.h @@ -27,6 +27,7 @@ #include "src/core/lib/channel/channel_trace.h" #include "src/core/lib/gpr/time_precise.h" +#include "src/core/lib/gprpp/atomic.h" #include "src/core/lib/gprpp/inlined_vector.h" #include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/gprpp/map.h" @@ -140,9 +141,9 @@ class CallCountingHelper { last_call_started_cycle( that.last_call_started_cycle.Load(MemoryOrder::RELAXED)) {} - Atomic calls_started{0}; - Atomic calls_succeeded{0}; - Atomic calls_failed{0}; + Atomic calls_started{0}; + Atomic calls_succeeded{0}; + Atomic calls_failed{0}; Atomic last_call_started_cycle{0}; // Make sure the size is exactly one cache line. 
uint8_t padding[GPR_CACHELINE_SIZE - 3 * sizeof(Atomic) - @@ -150,9 +151,9 @@ class CallCountingHelper { } GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE); struct CounterData { - intptr_t calls_started = 0; - intptr_t calls_succeeded = 0; - intptr_t calls_failed = 0; + int64_t calls_started = 0; + int64_t calls_succeeded = 0; + int64_t calls_failed = 0; gpr_cycle_counter last_call_started_cycle = 0; }; @@ -279,30 +280,30 @@ class SocketNode : public BaseNode { void RecordStreamStartedFromLocal(); void RecordStreamStartedFromRemote(); void RecordStreamSucceeded() { - gpr_atm_no_barrier_fetch_add(&streams_succeeded_, static_cast(1)); + streams_succeeded_.FetchAdd(1, MemoryOrder::RELAXED); } void RecordStreamFailed() { - gpr_atm_no_barrier_fetch_add(&streams_failed_, static_cast(1)); + streams_failed_.FetchAdd(1, MemoryOrder::RELAXED); } void RecordMessagesSent(uint32_t num_sent); void RecordMessageReceived(); void RecordKeepaliveSent() { - gpr_atm_no_barrier_fetch_add(&keepalives_sent_, static_cast(1)); + keepalives_sent_.FetchAdd(1, MemoryOrder::RELAXED); } const std::string& remote() { return remote_; } private: - gpr_atm streams_started_ = 0; - gpr_atm streams_succeeded_ = 0; - gpr_atm streams_failed_ = 0; - gpr_atm messages_sent_ = 0; - gpr_atm messages_received_ = 0; - gpr_atm keepalives_sent_ = 0; - gpr_atm last_local_stream_created_cycle_ = 0; - gpr_atm last_remote_stream_created_cycle_ = 0; - gpr_atm last_message_sent_cycle_ = 0; - gpr_atm last_message_received_cycle_ = 0; + Atomic streams_started_{0}; + Atomic streams_succeeded_{0}; + Atomic streams_failed_{0}; + Atomic messages_sent_{0}; + Atomic messages_received_{0}; + Atomic keepalives_sent_{0}; + Atomic last_local_stream_created_cycle_{0}; + Atomic last_remote_stream_created_cycle_{0}; + Atomic last_message_sent_cycle_{0}; + Atomic last_message_received_cycle_{0}; std::string local_; std::string remote_; };