Shard channelz stats by CPU

pull/16765/head
ncteisen 6 years ago
parent 242d6a58dc
commit f906185200
  1. 71
      src/core/lib/channel/channelz.cc
  2. 16
      src/core/lib/channel/channelz.h
  3. 7
      src/core/lib/iomgr/exec_ctx.h
  4. 3
      test/core/channel/channelz_test.cc

@ -34,6 +34,7 @@
#include "src/core/lib/gpr/useful.h" #include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/error_utils.h" #include "src/core/lib/transport/error_utils.h"
@ -55,35 +56,77 @@ char* BaseNode::RenderJsonString() {
} }
CallCountingHelper::CallCountingHelper() { CallCountingHelper::CallCountingHelper() {
gpr_atm_no_barrier_store(&last_call_started_millis_, num_cores_ = GPR_MAX(1, gpr_cpu_num_cores());
(gpr_atm)ExecCtx::Get()->Now()); per_cpu_counter_data_storage_ =
static_cast<CounterData*>(gpr_zalloc(sizeof(CounterData) * num_cores_));
} }
CallCountingHelper::~CallCountingHelper() {} CallCountingHelper::~CallCountingHelper() {
gpr_free(per_cpu_counter_data_storage_);
}
void CallCountingHelper::RecordCallStarted() { void CallCountingHelper::RecordCallStarted() {
gpr_atm_no_barrier_fetch_add(&calls_started_, static_cast<gpr_atm>(1)); gpr_atm_no_barrier_fetch_add(
gpr_atm_no_barrier_store(&last_call_started_millis_, &per_cpu_counter_data_storage_[grpc_core::ExecCtx::Get()->starting_cpu()]
.calls_started_,
static_cast<gpr_atm>(1));
gpr_atm_no_barrier_store(
&per_cpu_counter_data_storage_[grpc_core::ExecCtx::Get()->starting_cpu()]
.last_call_started_millis_,
(gpr_atm)ExecCtx::Get()->Now()); (gpr_atm)ExecCtx::Get()->Now());
} }
void CallCountingHelper::RecordCallFailed() {
gpr_atm_no_barrier_fetch_add(
&per_cpu_counter_data_storage_[grpc_core::ExecCtx::Get()->starting_cpu()]
.calls_failed_,
static_cast<gpr_atm>(1));
}
void CallCountingHelper::RecordCallSucceeded() {
gpr_atm_no_barrier_fetch_add(
&per_cpu_counter_data_storage_[grpc_core::ExecCtx::Get()->starting_cpu()]
.calls_succeeded_,
static_cast<gpr_atm>(1));
}
CallCountingHelper::CounterData CallCountingHelper::Collect() {
CounterData out;
memset(&out, 0, sizeof(out));
for (size_t core = 0; core < num_cores_; ++core) {
out.calls_started_ += gpr_atm_no_barrier_load(
&per_cpu_counter_data_storage_[core].calls_started_);
out.calls_succeeded_ += gpr_atm_no_barrier_load(
&per_cpu_counter_data_storage_[core].calls_succeeded_);
out.calls_failed_ += gpr_atm_no_barrier_load(
&per_cpu_counter_data_storage_[core].calls_failed_);
gpr_atm last_call = gpr_atm_no_barrier_load(
&per_cpu_counter_data_storage_[core].last_call_started_millis_);
if (last_call > out.last_call_started_millis_) {
out.last_call_started_millis_ = last_call;
}
}
return out;
}
void CallCountingHelper::PopulateCallCounts(grpc_json* json) { void CallCountingHelper::PopulateCallCounts(grpc_json* json) {
grpc_json* json_iterator = nullptr; grpc_json* json_iterator = nullptr;
if (calls_started_ != 0) { CounterData data = Collect();
if (data.calls_started_ != 0) {
json_iterator = grpc_json_add_number_string_child( json_iterator = grpc_json_add_number_string_child(
json, json_iterator, "callsStarted", calls_started_); json, json_iterator, "callsStarted", data.calls_started_);
} }
if (calls_succeeded_ != 0) { if (data.calls_succeeded_ != 0) {
json_iterator = grpc_json_add_number_string_child( json_iterator = grpc_json_add_number_string_child(
json, json_iterator, "callsSucceeded", calls_succeeded_); json, json_iterator, "callsSucceeded", data.calls_succeeded_);
} }
if (calls_failed_) { if (data.calls_failed_) {
json_iterator = grpc_json_add_number_string_child( json_iterator = grpc_json_add_number_string_child(
json, json_iterator, "callsFailed", calls_failed_); json, json_iterator, "callsFailed", data.calls_failed_);
} }
if (calls_started_ != 0) { if (data.calls_started_ != 0) {
gpr_timespec ts = gpr_timespec ts = grpc_millis_to_timespec(data.last_call_started_millis_,
grpc_millis_to_timespec(last_call_started_millis_, GPR_CLOCK_REALTIME); GPR_CLOCK_REALTIME);
json_iterator = json_iterator =
grpc_json_create_child(json_iterator, json, "lastCallStartedTimestamp", grpc_json_create_child(json_iterator, json, "lastCallStartedTimestamp",
gpr_format_timespec(ts), GRPC_JSON_STRING, true); gpr_format_timespec(ts), GRPC_JSON_STRING, true);

@ -91,12 +91,8 @@ class CallCountingHelper {
~CallCountingHelper(); ~CallCountingHelper();
void RecordCallStarted(); void RecordCallStarted();
void RecordCallFailed() { void RecordCallFailed();
gpr_atm_no_barrier_fetch_add(&calls_failed_, static_cast<gpr_atm>(1)); void RecordCallSucceeded();
}
void RecordCallSucceeded() {
gpr_atm_no_barrier_fetch_add(&calls_succeeded_, static_cast<gpr_atm>(1));
}
// Common rendering of the call count data and last_call_started_timestamp. // Common rendering of the call count data and last_call_started_timestamp.
void PopulateCallCounts(grpc_json* json); void PopulateCallCounts(grpc_json* json);
@ -105,12 +101,20 @@ class CallCountingHelper {
// testing peer friend. // testing peer friend.
friend class testing::CallCountingHelperPeer; friend class testing::CallCountingHelperPeer;
struct CounterData {
gpr_atm calls_started_ = 0; gpr_atm calls_started_ = 0;
gpr_atm calls_succeeded_ = 0; gpr_atm calls_succeeded_ = 0;
gpr_atm calls_failed_ = 0; gpr_atm calls_failed_ = 0;
gpr_atm last_call_started_millis_ = 0; gpr_atm last_call_started_millis_ = 0;
}; };
// collects the sharded data into one CounterData struct.
CounterData Collect();
CounterData* per_cpu_counter_data_storage_ = nullptr;
size_t num_cores_ = 0;
};
// Handles channelz bookkeeping for channels // Handles channelz bookkeeping for channels
class ChannelNode : public BaseNode { class ChannelNode : public BaseNode {
public: public:

@ -116,12 +116,7 @@ class ExecCtx {
ExecCtx(const ExecCtx&) = delete; ExecCtx(const ExecCtx&) = delete;
ExecCtx& operator=(const ExecCtx&) = delete; ExecCtx& operator=(const ExecCtx&) = delete;
/** Return starting_cpu. This is only required for stats collection and is
* hence only defined if GRPC_COLLECT_STATS is enabled.
*/
#if defined(GRPC_COLLECT_STATS) || !defined(NDEBUG)
unsigned starting_cpu() const { return starting_cpu_; } unsigned starting_cpu() const { return starting_cpu_; }
#endif /* defined(GRPC_COLLECT_STATS) || !defined(NDEBUG) */
struct CombinerData { struct CombinerData {
/* currently active combiner: updated only via combiner.c */ /* currently active combiner: updated only via combiner.c */
@ -223,9 +218,7 @@ class ExecCtx {
CombinerData combiner_data_ = {nullptr, nullptr}; CombinerData combiner_data_ = {nullptr, nullptr};
uintptr_t flags_; uintptr_t flags_;
#if defined(GRPC_COLLECT_STATS) || !defined(NDEBUG)
unsigned starting_cpu_ = gpr_cpu_current_cpu(); unsigned starting_cpu_ = gpr_cpu_current_cpu();
#endif /* defined(GRPC_COLLECT_STATS) || !defined(NDEBUG) */
bool now_is_valid_ = false; bool now_is_valid_ = false;
grpc_millis now_ = 0; grpc_millis now_ = 0;

@ -49,8 +49,9 @@ class CallCountingHelperPeer {
public: public:
explicit CallCountingHelperPeer(CallCountingHelper* node) : node_(node) {} explicit CallCountingHelperPeer(CallCountingHelper* node) : node_(node) {}
grpc_millis last_call_started_millis() const { grpc_millis last_call_started_millis() const {
CallCountingHelper::CounterData data = node_->Collect();
return (grpc_millis)gpr_atm_no_barrier_load( return (grpc_millis)gpr_atm_no_barrier_load(
&node_->last_call_started_millis_); &data.last_call_started_millis_);
} }
private: private:

Loading…
Cancel
Save