[channelz] add synchronization for channel traces (#36434)

Fixes #36409.

Also do a bit of code cleanup.

Closes #36434

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36434 from markdroth:channelz_crash_fix c9ea4ecdaa
PiperOrigin-RevId: 627771370
pull/36402/head
Mark D. Roth 11 months ago committed by Copybara-Service
parent eb505095c4
commit 459abbec5a
  1. 111
      src/core/channelz/channel_trace.cc
  2. 31
      src/core/channelz/channel_trace.h
  3. 24
      test/core/channelz/channel_trace_test.cc

@ -36,37 +36,68 @@
namespace grpc_core {
namespace channelz {
//
// ChannelTrace::TraceEvent
//
ChannelTrace::TraceEvent::TraceEvent(Severity severity, const grpc_slice& data,
RefCountedPtr<BaseNode> referenced_entity)
: severity_(severity),
: timestamp_(Timestamp::Now().as_timespec(GPR_CLOCK_REALTIME)),
severity_(severity),
data_(data),
timestamp_(Timestamp::Now().as_timespec(GPR_CLOCK_REALTIME)),
next_(nullptr),
referenced_entity_(std::move(referenced_entity)),
memory_usage_(sizeof(TraceEvent) + grpc_slice_memory_usage(data)) {}
memory_usage_(sizeof(TraceEvent) + grpc_slice_memory_usage(data)),
referenced_entity_(std::move(referenced_entity)) {}
ChannelTrace::TraceEvent::TraceEvent(Severity severity, const grpc_slice& data)
: severity_(severity),
data_(data),
timestamp_(Timestamp::Now().as_timespec(GPR_CLOCK_REALTIME)),
next_(nullptr),
memory_usage_(sizeof(TraceEvent) + grpc_slice_memory_usage(data)) {}
: TraceEvent(severity, data, nullptr) {}
ChannelTrace::TraceEvent::~TraceEvent() { CSliceUnref(data_); }
ChannelTrace::ChannelTrace(size_t max_event_memory)
: num_events_logged_(0),
event_list_memory_usage_(0),
max_event_memory_(max_event_memory),
head_trace_(nullptr),
tail_trace_(nullptr) {
if (max_event_memory_ == 0) {
return; // tracing is disabled if max_event_memory_ == 0
namespace {
const char* SeverityString(ChannelTrace::Severity severity) {
switch (severity) {
case ChannelTrace::Severity::Info:
return "CT_INFO";
case ChannelTrace::Severity::Warning:
return "CT_WARNING";
case ChannelTrace::Severity::Error:
return "CT_ERROR";
default:
GPR_UNREACHABLE_CODE(return "CT_UNKNOWN");
}
}
} // anonymous namespace
Json ChannelTrace::TraceEvent::RenderTraceEvent() const {
char* description = grpc_slice_to_c_string(data_);
Json::Object object = {
{"description", Json::FromString(description)},
{"severity", Json::FromString(SeverityString(severity_))},
{"timestamp", Json::FromString(gpr_format_timespec(timestamp_))},
};
gpr_free(description);
if (referenced_entity_ != nullptr) {
const bool is_channel =
(referenced_entity_->type() == BaseNode::EntityType::kTopLevelChannel ||
referenced_entity_->type() == BaseNode::EntityType::kInternalChannel);
object[is_channel ? "channelRef" : "subchannelRef"] = Json::FromObject({
{(is_channel ? "channelId" : "subchannelId"),
Json::FromString(absl::StrCat(referenced_entity_->uuid()))},
});
}
gpr_mu_init(&tracer_mu_);
time_created_ = Timestamp::Now().as_timespec(GPR_CLOCK_REALTIME);
return Json::FromObject(std::move(object));
}
//
// ChannelTrace
//
ChannelTrace::ChannelTrace(size_t max_event_memory)
: max_event_memory_(max_event_memory),
time_created_(Timestamp::Now().as_timespec(GPR_CLOCK_REALTIME)) {}
ChannelTrace::~ChannelTrace() {
if (max_event_memory_ == 0) {
return; // tracing is disabled if max_event_memory_ == 0
@ -77,10 +108,10 @@ ChannelTrace::~ChannelTrace() {
it = it->next();
delete to_free;
}
gpr_mu_destroy(&tracer_mu_);
}
void ChannelTrace::AddTraceEventHelper(TraceEvent* new_trace_event) {
MutexLock lock(&mu_);
++num_events_logged_;
// first event case
if (head_trace_ == nullptr) {
@ -121,43 +152,6 @@ void ChannelTrace::AddTraceEventWithReference(
new TraceEvent(severity, data, std::move(referenced_entity)));
}
namespace {
const char* severity_string(ChannelTrace::Severity severity) {
switch (severity) {
case ChannelTrace::Severity::Info:
return "CT_INFO";
case ChannelTrace::Severity::Warning:
return "CT_WARNING";
case ChannelTrace::Severity::Error:
return "CT_ERROR";
default:
GPR_UNREACHABLE_CODE(return "CT_UNKNOWN");
}
}
} // anonymous namespace
Json ChannelTrace::TraceEvent::RenderTraceEvent() const {
char* description = grpc_slice_to_c_string(data_);
Json::Object object = {
{"description", Json::FromString(description)},
{"severity", Json::FromString(severity_string(severity_))},
{"timestamp", Json::FromString(gpr_format_timespec(timestamp_))},
};
gpr_free(description);
if (referenced_entity_ != nullptr) {
const bool is_channel =
(referenced_entity_->type() == BaseNode::EntityType::kTopLevelChannel ||
referenced_entity_->type() == BaseNode::EntityType::kInternalChannel);
object[is_channel ? "channelRef" : "subchannelRef"] = Json::FromObject({
{(is_channel ? "channelId" : "subchannelId"),
Json::FromString(absl::StrCat(referenced_entity_->uuid()))},
});
}
return Json::FromObject(std::move(object));
}
Json ChannelTrace::RenderJson() const {
// Tracing is disabled if max_event_memory_ == 0.
if (max_event_memory_ == 0) {
@ -167,6 +161,7 @@ Json ChannelTrace::RenderJson() const {
{"creationTimestamp",
Json::FromString(gpr_format_timespec(time_created_))},
};
MutexLock lock(&mu_);
if (num_events_logged_ > 0) {
object["numEventsLogged"] =
Json::FromString(absl::StrCat(num_events_logged_));

@ -22,12 +22,14 @@
#include <stddef.h>
#include <stdint.h>
#include "absl/base/thread_annotations.h"
#include <grpc/slice.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/json/json.h"
namespace grpc_core {
@ -110,25 +112,26 @@ class ChannelTrace {
size_t memory_usage() const { return memory_usage_; }
private:
Severity severity_;
grpc_slice data_;
gpr_timespec timestamp_;
TraceEvent* next_;
const gpr_timespec timestamp_;
const Severity severity_;
const grpc_slice data_;
const size_t memory_usage_;
// the tracer object for the (sub)channel that this trace event refers to.
RefCountedPtr<BaseNode> referenced_entity_;
size_t memory_usage_;
const RefCountedPtr<BaseNode> referenced_entity_;
TraceEvent* next_ = nullptr;
}; // TraceEvent
// Internal helper to add and link in a trace event
void AddTraceEventHelper(TraceEvent* new_trace_event);
gpr_mu tracer_mu_;
uint64_t num_events_logged_;
size_t event_list_memory_usage_;
size_t max_event_memory_;
TraceEvent* head_trace_;
TraceEvent* tail_trace_;
gpr_timespec time_created_;
const size_t max_event_memory_;
const gpr_timespec time_created_;
mutable Mutex mu_;
uint64_t num_events_logged_ ABSL_GUARDED_BY(mu_) = 0;
size_t event_list_memory_usage_ ABSL_GUARDED_BY(mu_) = 0;
TraceEvent* head_trace_ ABSL_GUARDED_BY(mu_) = nullptr;
TraceEvent* tail_trace_ ABSL_GUARDED_BY(mu_) = nullptr;
};
} // namespace channelz

@ -21,7 +21,9 @@
#include <stdlib.h>
#include <string>
#include <thread>
#include "absl/synchronization/notification.h"
#include "gtest/gtest.h"
#include <grpc/credentials.h>
@ -317,6 +319,28 @@ TEST(ChannelTracerTest, TestTotalEviction) {
ValidateChannelTraceCustom(&tracer, kNumEvents + 1, 0);
}
// Tests that the code is thread-safe.
TEST(ChannelTracerTest, ThreadSafety) {
ExecCtx exec_ctx;
ChannelTrace tracer(kEventListMemoryLimit);
absl::Notification done;
std::vector<std::unique_ptr<std::thread>> threads;
for (size_t i = 0; i < 10; ++i) {
threads.push_back(std::make_unique<std::thread>([&]() {
do {
AddSimpleTrace(&tracer);
} while (!done.HasBeenNotified());
}));
}
for (size_t i = 0; i < 10; ++i) {
tracer.RenderJson();
}
done.Notify();
for (const auto& thd : threads) {
thd->join();
}
}
} // namespace testing
} // namespace channelz
} // namespace grpc_core

Loading…
Cancel
Save