Use 64-bit atomic variales instead of gpr_atm in channelz.

There is possibility of overflow for both timestamps and counters.
Use Atomic<int64_t> and Atomic<gpr_cycle_counter> to make sure
there is no overflow.

Prior to bdd3fdddb2, we were using grpc_millis.  So the overflow
was hidden due to the fact that the deltas were correct upto 2^32
milliseconds (i.e., 49 days).
pull/21000/head
Soheil Hassas Yeganeh 5 years ago
parent af1eaa0d05
commit 3261d69ae4
  1. 43
      src/core/lib/channel/channelz.cc
  2. 39
      src/core/lib/channel/channelz.h

@ -32,6 +32,7 @@
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/error.h"
@ -486,26 +487,26 @@ SocketNode::SocketNode(std::string local, std::string remote, std::string name)
remote_(std::move(remote)) {}
void SocketNode::RecordStreamStartedFromLocal() {
gpr_atm_no_barrier_fetch_add(&streams_started_, static_cast<gpr_atm>(1));
gpr_atm_no_barrier_store(&last_local_stream_created_cycle_,
gpr_get_cycle_counter());
streams_started_.FetchAdd(1, MemoryOrder::RELAXED);
last_local_stream_created_cycle_.Store(gpr_get_cycle_counter(),
MemoryOrder::RELAXED);
}
void SocketNode::RecordStreamStartedFromRemote() {
gpr_atm_no_barrier_fetch_add(&streams_started_, static_cast<gpr_atm>(1));
gpr_atm_no_barrier_store(&last_remote_stream_created_cycle_,
gpr_get_cycle_counter());
streams_started_.FetchAdd(1, MemoryOrder::RELAXED);
last_remote_stream_created_cycle_.Store(gpr_get_cycle_counter(),
MemoryOrder::RELAXED);
}
void SocketNode::RecordMessagesSent(uint32_t num_sent) {
gpr_atm_no_barrier_fetch_add(&messages_sent_, static_cast<gpr_atm>(num_sent));
gpr_atm_no_barrier_store(&last_message_sent_cycle_, gpr_get_cycle_counter());
messages_sent_.FetchAdd(num_sent, MemoryOrder::RELAXED);
last_message_sent_cycle_.Store(gpr_get_cycle_counter(), MemoryOrder::RELAXED);
}
void SocketNode::RecordMessageReceived() {
gpr_atm_no_barrier_fetch_add(&messages_received_, static_cast<gpr_atm>(1));
gpr_atm_no_barrier_store(&last_message_received_cycle_,
gpr_get_cycle_counter());
messages_received_.FetchAdd(1, MemoryOrder::RELAXED);
last_message_received_cycle_.Store(gpr_get_cycle_counter(),
MemoryOrder::RELAXED);
}
grpc_json* SocketNode::RenderJson() {
@ -534,12 +535,12 @@ grpc_json* SocketNode::RenderJson() {
json = data;
json_iterator = nullptr;
gpr_timespec ts;
gpr_atm streams_started = gpr_atm_no_barrier_load(&streams_started_);
int64_t streams_started = streams_started_.Load(MemoryOrder::RELAXED);
if (streams_started != 0) {
json_iterator = grpc_json_add_number_string_child(
json, json_iterator, "streamsStarted", streams_started);
gpr_cycle_counter last_local_stream_created_cycle =
gpr_atm_no_barrier_load(&last_local_stream_created_cycle_);
last_local_stream_created_cycle_.Load(MemoryOrder::RELAXED);
if (last_local_stream_created_cycle != 0) {
ts = gpr_convert_clock_type(
gpr_cycle_counter_to_time(last_local_stream_created_cycle),
@ -549,7 +550,7 @@ grpc_json* SocketNode::RenderJson() {
gpr_format_timespec(ts), GRPC_JSON_STRING, true);
}
gpr_cycle_counter last_remote_stream_created_cycle =
gpr_atm_no_barrier_load(&last_remote_stream_created_cycle_);
last_remote_stream_created_cycle_.Load(MemoryOrder::RELAXED);
if (last_remote_stream_created_cycle != 0) {
ts = gpr_convert_clock_type(
gpr_cycle_counter_to_time(last_remote_stream_created_cycle),
@ -559,41 +560,41 @@ grpc_json* SocketNode::RenderJson() {
gpr_format_timespec(ts), GRPC_JSON_STRING, true);
}
}
gpr_atm streams_succeeded = gpr_atm_no_barrier_load(&streams_succeeded_);
int64_t streams_succeeded = streams_succeeded_.Load(MemoryOrder::RELAXED);
if (streams_succeeded != 0) {
json_iterator = grpc_json_add_number_string_child(
json, json_iterator, "streamsSucceeded", streams_succeeded);
}
gpr_atm streams_failed = gpr_atm_no_barrier_load(&streams_failed_);
int64_t streams_failed = streams_failed_.Load(MemoryOrder::RELAXED);
if (streams_failed) {
json_iterator = grpc_json_add_number_string_child(
json, json_iterator, "streamsFailed", streams_failed);
}
gpr_atm messages_sent = gpr_atm_no_barrier_load(&messages_sent_);
int64_t messages_sent = messages_sent_.Load(MemoryOrder::RELAXED);
if (messages_sent != 0) {
json_iterator = grpc_json_add_number_string_child(
json, json_iterator, "messagesSent", messages_sent);
ts = gpr_convert_clock_type(
gpr_cycle_counter_to_time(
gpr_atm_no_barrier_load(&last_message_sent_cycle_)),
last_message_sent_cycle_.Load(MemoryOrder::RELAXED)),
GPR_CLOCK_REALTIME);
json_iterator =
grpc_json_create_child(json_iterator, json, "lastMessageSentTimestamp",
gpr_format_timespec(ts), GRPC_JSON_STRING, true);
}
gpr_atm messages_received = gpr_atm_no_barrier_load(&messages_received_);
int64_t messages_received = messages_received_.Load(MemoryOrder::RELAXED);
if (messages_received != 0) {
json_iterator = grpc_json_add_number_string_child(
json, json_iterator, "messagesReceived", messages_received);
ts = gpr_convert_clock_type(
gpr_cycle_counter_to_time(
gpr_atm_no_barrier_load(&last_message_received_cycle_)),
last_message_received_cycle_.Load(MemoryOrder::RELAXED)),
GPR_CLOCK_REALTIME);
json_iterator = grpc_json_create_child(
json_iterator, json, "lastMessageReceivedTimestamp",
gpr_format_timespec(ts), GRPC_JSON_STRING, true);
}
gpr_atm keepalives_sent = gpr_atm_no_barrier_load(&keepalives_sent_);
int64_t keepalives_sent = keepalives_sent_.Load(MemoryOrder::RELAXED);
if (keepalives_sent != 0) {
json_iterator = grpc_json_add_number_string_child(
json, json_iterator, "keepAlivesSent", keepalives_sent);

@ -27,6 +27,7 @@
#include "src/core/lib/channel/channel_trace.h"
#include "src/core/lib/gpr/time_precise.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/map.h"
@ -140,9 +141,9 @@ class CallCountingHelper {
last_call_started_cycle(
that.last_call_started_cycle.Load(MemoryOrder::RELAXED)) {}
Atomic<intptr_t> calls_started{0};
Atomic<intptr_t> calls_succeeded{0};
Atomic<intptr_t> calls_failed{0};
Atomic<int64_t> calls_started{0};
Atomic<int64_t> calls_succeeded{0};
Atomic<int64_t> calls_failed{0};
Atomic<gpr_cycle_counter> last_call_started_cycle{0};
// Make sure the size is exactly one cache line.
uint8_t padding[GPR_CACHELINE_SIZE - 3 * sizeof(Atomic<intptr_t>) -
@ -150,9 +151,9 @@ class CallCountingHelper {
} GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE);
struct CounterData {
intptr_t calls_started = 0;
intptr_t calls_succeeded = 0;
intptr_t calls_failed = 0;
int64_t calls_started = 0;
int64_t calls_succeeded = 0;
int64_t calls_failed = 0;
gpr_cycle_counter last_call_started_cycle = 0;
};
@ -279,30 +280,30 @@ class SocketNode : public BaseNode {
void RecordStreamStartedFromLocal();
void RecordStreamStartedFromRemote();
void RecordStreamSucceeded() {
gpr_atm_no_barrier_fetch_add(&streams_succeeded_, static_cast<gpr_atm>(1));
streams_succeeded_.FetchAdd(1, MemoryOrder::RELAXED);
}
void RecordStreamFailed() {
gpr_atm_no_barrier_fetch_add(&streams_failed_, static_cast<gpr_atm>(1));
streams_failed_.FetchAdd(1, MemoryOrder::RELAXED);
}
void RecordMessagesSent(uint32_t num_sent);
void RecordMessageReceived();
void RecordKeepaliveSent() {
gpr_atm_no_barrier_fetch_add(&keepalives_sent_, static_cast<gpr_atm>(1));
keepalives_sent_.FetchAdd(1, MemoryOrder::RELAXED);
}
const std::string& remote() { return remote_; }
private:
gpr_atm streams_started_ = 0;
gpr_atm streams_succeeded_ = 0;
gpr_atm streams_failed_ = 0;
gpr_atm messages_sent_ = 0;
gpr_atm messages_received_ = 0;
gpr_atm keepalives_sent_ = 0;
gpr_atm last_local_stream_created_cycle_ = 0;
gpr_atm last_remote_stream_created_cycle_ = 0;
gpr_atm last_message_sent_cycle_ = 0;
gpr_atm last_message_received_cycle_ = 0;
Atomic<int64_t> streams_started_{0};
Atomic<int64_t> streams_succeeded_{0};
Atomic<int64_t> streams_failed_{0};
Atomic<int64_t> messages_sent_{0};
Atomic<int64_t> messages_received_{0};
Atomic<int64_t> keepalives_sent_{0};
Atomic<gpr_cycle_counter> last_local_stream_created_cycle_{0};
Atomic<gpr_cycle_counter> last_remote_stream_created_cycle_{0};
Atomic<gpr_cycle_counter> last_message_sent_cycle_{0};
Atomic<gpr_cycle_counter> last_message_received_cycle_{0};
std::string local_;
std::string remote_;
};

Loading…
Cancel
Save