[channelz] Save some memory per channel (#32996)

Whilst the per-CPU counters probably help with single-channel contention, we
think it's likely that they're a pessimization when taken fleetwide.

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/32998/head
Craig Tiller 2 years ago committed by GitHub
parent b22d81889b
commit 79e46a6022
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 113
      src/core/lib/channel/channelz.cc
  3. 58
      src/core/lib/channel/channelz.h
  4. 1
      test/core/channel/channelz_registry_test.cc
  5. 7
      test/core/channel/channelz_test.cc

@ -1538,6 +1538,7 @@ grpc_cc_library(
"//src/core:no_destruct",
"//src/core:notification",
"//src/core:packed_table",
"//src/core:per_cpu",
"//src/core:pipe",
"//src/core:poll",
"//src/core:pollset_set",

@ -22,13 +22,13 @@
#include <algorithm>
#include <atomic>
#include <cstdint>
#include "absl/status/statusor.h"
#include "absl/strings/escaping.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/strip.h"
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
@ -38,7 +38,6 @@
#include "src/core/lib/channel/channelz_registry.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/json/json_writer.h"
#include "src/core/lib/transport/connectivity_state.h"
@ -68,69 +67,87 @@ std::string BaseNode::RenderJsonString() {
// CallCountingHelper
//
CallCountingHelper::CallCountingHelper() {
num_cores_ = std::max(1u, gpr_cpu_num_cores());
per_cpu_counter_data_storage_.reserve(num_cores_);
for (size_t i = 0; i < num_cores_; ++i) {
per_cpu_counter_data_storage_.emplace_back();
// Records the start of a call: adds one to calls_started_ and overwrites
// last_call_started_cycle_ with the current gpr_get_cycle_counter() reading.
// Both atomics are updated with relaxed ordering, and as two separate
// operations (the counter and the timestamp are not updated together).
void CallCountingHelper::RecordCallStarted() {
calls_started_.fetch_add(1, std::memory_order_relaxed);
last_call_started_cycle_.store(gpr_get_cycle_counter(),
std::memory_order_relaxed);
}
// Records a failed call by adding one to calls_failed_ (relaxed ordering).
void CallCountingHelper::RecordCallFailed() {
calls_failed_.fetch_add(1, std::memory_order_relaxed);
}
// Records a successful call by adding one to calls_succeeded_ (relaxed
// ordering).
void CallCountingHelper::RecordCallSucceeded() {
calls_succeeded_.fetch_add(1, std::memory_order_relaxed);
}
// Writes the call counters into *json.
//
// Keys written (each only when its counter is non-zero):
//   "callsStarted"             - calls_started_ rendered as a string
//   "lastCallStartedTimestamp" - written together with "callsStarted"; the
//                                stored cycle counter converted to
//                                GPR_CLOCK_REALTIME and formatted with
//                                gpr_format_timespec()
//   "callsSucceeded"           - calls_succeeded_ rendered as a string
//   "callsFailed"              - calls_failed_ rendered as a string
//
// `json` is dereferenced unconditionally, so it must be non-null.
void CallCountingHelper::PopulateCallCounts(Json::Object* json) {
// Each value is read with its own relaxed load; the four loads are
// independent of one another.
auto calls_started = calls_started_.load(std::memory_order_relaxed);
auto calls_succeeded = calls_succeeded_.load(std::memory_order_relaxed);
auto calls_failed = calls_failed_.load(std::memory_order_relaxed);
auto last_call_started_cycle =
last_call_started_cycle_.load(std::memory_order_relaxed);
if (calls_started != 0) {
(*json)["callsStarted"] = Json::FromString(absl::StrCat(calls_started));
// The timestamp is only emitted alongside a non-zero started count.
gpr_timespec ts = gpr_convert_clock_type(
gpr_cycle_counter_to_time(last_call_started_cycle), GPR_CLOCK_REALTIME);
(*json)["lastCallStartedTimestamp"] =
Json::FromString(gpr_format_timespec(ts));
}
if (calls_succeeded != 0) {
(*json)["callsSucceeded"] = Json::FromString(absl::StrCat(calls_succeeded));
}
if (calls_failed != 0) {
(*json)["callsFailed"] = Json::FromString(absl::StrCat(calls_failed));
}
}
void CallCountingHelper::RecordCallStarted() {
AtomicCounterData& data =
per_cpu_counter_data_storage_[ExecCtx::Get()->starting_cpu()];
//
// PerCpuCallCountingHelper
//
// Records the start of a call on the slot returned by
// per_cpu_data_.this_cpu(): adds one to that slot's calls_started and
// overwrites its last_call_started_cycle with the current
// gpr_get_cycle_counter() reading, both with relaxed ordering.
// NOTE(review): this_cpu() presumably selects the shard for the CPU the
// caller is running on -- confirm against PerCpu in per_cpu.h.
void PerCpuCallCountingHelper::RecordCallStarted() {
auto& data = per_cpu_data_.this_cpu();
data.calls_started.fetch_add(1, std::memory_order_relaxed);
data.last_call_started_cycle.store(gpr_get_cycle_counter(),
std::memory_order_relaxed);
}
void CallCountingHelper::RecordCallFailed() {
per_cpu_counter_data_storage_[ExecCtx::Get()->starting_cpu()]
.calls_failed.fetch_add(1, std::memory_order_relaxed);
// Records a failed call by adding one (relaxed ordering) to calls_failed on
// the slot returned by per_cpu_data_.this_cpu().
void PerCpuCallCountingHelper::RecordCallFailed() {
per_cpu_data_.this_cpu().calls_failed.fetch_add(1, std::memory_order_relaxed);
}
void CallCountingHelper::RecordCallSucceeded() {
per_cpu_counter_data_storage_[ExecCtx::Get()->starting_cpu()]
.calls_succeeded.fetch_add(1, std::memory_order_relaxed);
}
void CallCountingHelper::CollectData(CounterData* out) {
for (size_t core = 0; core < num_cores_; ++core) {
AtomicCounterData& data = per_cpu_counter_data_storage_[core];
out->calls_started += data.calls_started.load(std::memory_order_relaxed);
out->calls_succeeded +=
per_cpu_counter_data_storage_[core].calls_succeeded.load(
std::memory_order_relaxed);
out->calls_failed += per_cpu_counter_data_storage_[core].calls_failed.load(
std::memory_order_relaxed);
const gpr_cycle_counter last_call =
per_cpu_counter_data_storage_[core].last_call_started_cycle.load(
std::memory_order_relaxed);
if (last_call > out->last_call_started_cycle) {
out->last_call_started_cycle = last_call;
}
}
// Records a successful call by adding one (relaxed ordering) to
// calls_succeeded on the slot returned by per_cpu_data_.this_cpu().
void PerCpuCallCountingHelper::RecordCallSucceeded() {
per_cpu_data_.this_cpu().calls_succeeded.fetch_add(1,
std::memory_order_relaxed);
}
void CallCountingHelper::PopulateCallCounts(Json::Object* json) {
CounterData data;
CollectData(&data);
if (data.calls_started != 0) {
(*json)["callsStarted"] =
Json::FromString(absl::StrCat(data.calls_started));
void PerCpuCallCountingHelper::PopulateCallCounts(Json::Object* json) {
int64_t calls_started = 0;
int64_t calls_succeeded = 0;
int64_t calls_failed = 0;
gpr_cycle_counter last_call_started_cycle = 0;
for (const auto& cpu : per_cpu_data_) {
calls_started += cpu.calls_started.load(std::memory_order_relaxed);
calls_succeeded += cpu.calls_succeeded.load(std::memory_order_relaxed);
calls_failed += cpu.calls_failed.load(std::memory_order_relaxed);
last_call_started_cycle =
std::max(last_call_started_cycle,
cpu.last_call_started_cycle.load(std::memory_order_relaxed));
}
if (calls_started != 0) {
(*json)["callsStarted"] = Json::FromString(absl::StrCat(calls_started));
gpr_timespec ts = gpr_convert_clock_type(
gpr_cycle_counter_to_time(data.last_call_started_cycle),
GPR_CLOCK_REALTIME);
gpr_cycle_counter_to_time(last_call_started_cycle), GPR_CLOCK_REALTIME);
(*json)["lastCallStartedTimestamp"] =
Json::FromString(gpr_format_timespec(ts));
}
if (data.calls_succeeded != 0) {
(*json)["callsSucceeded"] =
Json::FromString(absl::StrCat(data.calls_succeeded));
if (calls_succeeded != 0) {
(*json)["callsSucceeded"] = Json::FromString(absl::StrCat(calls_succeeded));
}
if (data.calls_failed != 0) {
(*json)["callsFailed"] = Json::FromString(absl::StrCat(data.calls_failed));
if (calls_failed != 0) {
(*json)["callsFailed"] = Json::FromString(absl::StrCat(calls_failed));
}
}

@ -29,7 +29,6 @@
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
@ -41,6 +40,7 @@
#include "src/core/lib/channel/channel_trace.h"
#include "src/core/lib/gpr/time_precise.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/per_cpu.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
@ -122,8 +122,6 @@ class BaseNode : public RefCounted<BaseNode> {
// - perform rendering of the above items
class CallCountingHelper {
public:
CallCountingHelper();
void RecordCallStarted();
void RecordCallFailed();
void RecordCallSucceeded();
@ -135,44 +133,32 @@ class CallCountingHelper {
// testing peer friend.
friend class testing::CallCountingHelperPeer;
// TODO(soheil): add a proper PerCPU helper and use it here.
struct AtomicCounterData {
// Define the ctors so that we can use this structure in InlinedVector.
AtomicCounterData() = default;
AtomicCounterData(const AtomicCounterData& that)
: calls_started(that.calls_started.load(std::memory_order_relaxed)),
calls_succeeded(that.calls_succeeded.load(std::memory_order_relaxed)),
calls_failed(that.calls_failed.load(std::memory_order_relaxed)),
last_call_started_cycle(
that.last_call_started_cycle.load(std::memory_order_relaxed)) {}
std::atomic<int64_t> calls_started_{0};
std::atomic<int64_t> calls_succeeded_{0};
std::atomic<int64_t> calls_failed_{0};
std::atomic<gpr_cycle_counter> last_call_started_cycle_{0};
};
class PerCpuCallCountingHelper {
public:
void RecordCallStarted();
void RecordCallFailed();
void RecordCallSucceeded();
// Common rendering of the call count data and last_call_started_timestamp.
void PopulateCallCounts(Json::Object* json);
private:
// testing peer friend.
friend class testing::CallCountingHelperPeer;
struct alignas(GPR_CACHELINE_SIZE) PerCpuData {
std::atomic<int64_t> calls_started{0};
std::atomic<int64_t> calls_succeeded{0};
std::atomic<int64_t> calls_failed{0};
std::atomic<gpr_cycle_counter> last_call_started_cycle{0};
// Make sure the size is exactly one cache line.
uint8_t padding[GPR_CACHELINE_SIZE - 3 * sizeof(std::atomic<intptr_t>) -
sizeof(std::atomic<gpr_cycle_counter>)];
};
// TODO(soheilhy,veblush): Revisit this after abseil integration.
// This has a problem when using abseil inlined_vector because it
// carries an alignment attribute properly but our allocator doesn't
// respect this. To avoid UBSAN errors, this should be removed with
// abseil inlined_vector.
// GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE);
struct CounterData {
int64_t calls_started = 0;
int64_t calls_succeeded = 0;
int64_t calls_failed = 0;
gpr_cycle_counter last_call_started_cycle = 0;
};
// collects the sharded data into one CounterData struct.
void CollectData(CounterData* out);
std::vector<AtomicCounterData> per_cpu_counter_data_storage_;
size_t num_cores_ = 0;
PerCpu<PerCpuData> per_cpu_data_;
};
// Handles channelz bookkeeping for channels
@ -271,7 +257,7 @@ class ServerNode : public BaseNode {
void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); }
private:
CallCountingHelper call_counter_;
PerCpuCallCountingHelper call_counter_;
ChannelTrace trace_;
Mutex child_mu_; // Guards child maps below.
std::map<intptr_t, RefCountedPtr<SocketNode>> child_sockets_;

@ -20,6 +20,7 @@
#include <stdlib.h>
#include <algorithm>
#include <vector>
#include "gtest/gtest.h"

@ -21,7 +21,9 @@
#include <stdlib.h>
#include <algorithm>
#include <atomic>
#include <memory>
#include <vector>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
@ -53,9 +55,8 @@ class CallCountingHelperPeer {
explicit CallCountingHelperPeer(CallCountingHelper* node) : node_(node) {}
gpr_timespec last_call_started_time() const {
CallCountingHelper::CounterData data;
node_->CollectData(&data);
return gpr_cycle_counter_to_time(data.last_call_started_cycle);
return gpr_cycle_counter_to_time(
node_->last_call_started_cycle_.load(std::memory_order_relaxed));
}
private:

Loading…
Cancel
Save