Reviewer feedback

reviewable/pr15343/r6
ncteisen 7 years ago
parent a871704351
commit c845ba66f3
  1. 12
      src/core/lib/channel/channel_trace.cc
  2. 13
      src/core/lib/channel/channel_trace.h
  3. 26
      src/core/lib/channel/channelz.cc
  4. 27
      src/core/lib/channel/channelz.h
  5. 6
      src/core/lib/surface/call.cc
  6. 7
      src/core/lib/surface/channel.cc
  7. 35
      test/core/channel/channel_trace_test.cc
  8. 68
      test/core/channel/channelz_test.cc

@ -121,11 +121,11 @@ void ChannelTrace::AddTraceEventReferencingChannel(
void ChannelTrace::AddTraceEventReferencingSubchannel(
Severity severity, grpc_slice data,
RefCountedPtr<Channel> referenced_channel) {
RefCountedPtr<Channel> referenced_subchannel) {
if (max_list_size_ == 0) return; // tracing is disabled if max_events == 0
// create and fill up the new event
AddTraceEventHelper(New<TraceEvent>(severity, data,
std::move(referenced_channel),
std::move(referenced_subchannel),
ReferencedType::Subchannel));
}
@ -232,13 +232,5 @@ grpc_json* ChannelTrace::RenderJSON() const {
return json;
}
char* ChannelTrace::RenderTrace() const {
grpc_json* json = RenderJSON();
if (json == nullptr) return nullptr;
char* json_str = grpc_json_dump_to_string(json, 0);
grpc_json_destroy(json);
return json_str;
}
} // namespace channelz
} // namespace grpc_core

@ -22,11 +22,13 @@
#include <grpc/impl/codegen/port_platform.h>
#include <grpc/grpc.h>
// #include "src/core/lib/channel/channelz.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/json/json.h"
namespace grpc_core {
namespace channelz {
@ -64,20 +66,15 @@ class ChannelTrace {
// slice.
void AddTraceEventReferencingChannel(
Severity severity, grpc_slice data,
RefCountedPtr<Channel> referenced_tracer);
RefCountedPtr<Channel> referenced_channel);
void AddTraceEventReferencingSubchannel(
Severity severity, grpc_slice data,
RefCountedPtr<Channel> referenced_tracer);
RefCountedPtr<Channel> referenced_subchannel);
// Creates and returns the raw grpc_json object, so a parent channelz
// object may incorporate the json before rendering.
grpc_json* RenderJSON() const;
// Returns the tracing data rendered as a grpc json string. The string
// is owned by the caller and must be freed. This is used for testing only
// so that we may unit test ChannelTrace in isolation.
char* RenderTrace() const;
private:
// Types of objects that can be references by trace events.
enum class ReferencedType { Channel, Subchannel };
@ -87,7 +84,7 @@ class ChannelTrace {
public:
// Constructor for a TraceEvent that references a different channel.
TraceEvent(Severity severity, grpc_slice data,
RefCountedPtr<Channel> referenced_tracer, ReferencedType type);
RefCountedPtr<Channel> referenced_channel, ReferencedType type);
// Constructor for a TraceEvent that does not reverence a different
// channel.

@ -101,28 +101,9 @@ Channel::~Channel() {
ChannelzRegistry::Unregister(channel_uuid_);
}
Channel::AtomicTimespec::AtomicTimespec() {
gpr_atm_no_barrier_store(&tv_sec_, (gpr_atm)0);
gpr_atm_no_barrier_store(&tv_nsec_, (gpr_atm)0);
}
void Channel::AtomicTimespec::Update(gpr_timespec ts) {
gpr_atm_no_barrier_store(&tv_sec_, (gpr_atm)ts.tv_sec);
gpr_atm_no_barrier_store(&tv_nsec_, (gpr_atm)ts.tv_nsec);
}
gpr_timespec Channel::AtomicTimespec::Get() {
gpr_timespec out;
out.clock_type = GPR_CLOCK_REALTIME;
out.tv_sec = static_cast<int64_t>(gpr_atm_no_barrier_load(&tv_sec_));
out.tv_nsec = static_cast<int32_t>(gpr_atm_no_barrier_load(&tv_nsec_));
return out;
}
void Channel::CallStarted() {
void Channel::RecordCallStarted() {
gpr_atm_no_barrier_fetch_add(&calls_started_, (gpr_atm)1);
last_call_started_timestamp_.Update(
grpc_millis_to_timespec(ExecCtx::Get()->Now(), GPR_CLOCK_REALTIME));
gpr_atm_no_barrier_store(&last_call_started_millis_, (gpr_atm)ExecCtx::Get()->Now());
}
grpc_connectivity_state Channel::GetConnectivityState() {
@ -195,9 +176,10 @@ char* Channel::RenderJSON() {
calls_succeeded_ ? calls_succeeded_ : -1);
json_iterator = add_num_str(json, json_iterator, "callsFailed",
calls_failed_ ? calls_failed_ : -1);
gpr_timespec ts = grpc_millis_to_timespec(last_call_started_millis_, GPR_CLOCK_REALTIME);
json_iterator = grpc_json_create_child(
json_iterator, json, "lastCallStartedTimestamp",
fmt_time(last_call_started_timestamp_.Get()), GRPC_JSON_STRING, true);
fmt_time(ts), GRPC_JSON_STRING, true);
// render and return the over json object
char* json_str = grpc_json_dump_to_string(top_level_json, 0);

@ -28,27 +28,32 @@
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/json/json.h"
namespace grpc_core {
namespace channelz {
namespace testing {
class ChannelPeer;
}
class Channel : public RefCounted<Channel> {
public:
Channel(grpc_channel* channel, size_t channel_tracer_max_nodes);
~Channel();
void CallStarted();
void CallFailed() {
void RecordCallStarted();
void RecordCallFailed() {
gpr_atm_no_barrier_fetch_add(&calls_failed_, (gpr_atm(1)));
}
void CallSucceeded() {
void RecordCallSucceeded() {
gpr_atm_no_barrier_fetch_add(&calls_succeeded_, (gpr_atm(1)));
}
char* RenderJSON();
ChannelTrace* Trace() { return trace_.get(); }
ChannelTrace* trace() { return trace_.get(); }
void set_channel_destroyed() {
GPR_ASSERT(!channel_destroyed_);
@ -58,16 +63,8 @@ class Channel : public RefCounted<Channel> {
intptr_t channel_uuid() { return channel_uuid_; }
private:
class AtomicTimespec {
public:
AtomicTimespec();
void Update(gpr_timespec ts);
gpr_timespec Get();
private:
gpr_atm tv_sec_;
gpr_atm tv_nsec_;
};
// testing peer friend.
friend class testing::ChannelPeer;
bool channel_destroyed_ = false;
grpc_channel* channel_;
@ -75,7 +72,7 @@ class Channel : public RefCounted<Channel> {
gpr_atm calls_started_ = 0;
gpr_atm calls_succeeded_ = 0;
gpr_atm calls_failed_ = 0;
AtomicTimespec last_call_started_timestamp_;
gpr_atm last_call_started_millis_;
intptr_t channel_uuid_;
ManualConstructor<ChannelTrace> trace_;

@ -1084,11 +1084,11 @@ static void recv_trailing_filter(void* args, grpc_metadata_batch* b) {
: nullptr;
if (status_code == GRPC_STATUS_OK) {
if (channelz_channel != nullptr) {
channelz_channel->CallSucceeded();
channelz_channel->RecordCallSucceeded();
}
} else {
if (channelz_channel != nullptr) {
channelz_channel->CallFailed();
channelz_channel->RecordCallFailed();
}
error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error received from peer"),
@ -1677,7 +1677,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
}
grpc_core::channelz::Channel* channelz_channel =
grpc_channel_get_channelz_channel(call->channel);
channelz_channel->CallStarted();
channelz_channel->RecordCallStarted();
break;
}
case GRPC_OP_SEND_MESSAGE: {

@ -149,7 +149,7 @@ grpc_channel* grpc_channel_create_with_builder(
channel->channelz_channel =
grpc_core::MakeRefCounted<grpc_core::channelz::Channel>(
channel, channel_tracer_max_nodes);
channel->channelz_channel->Trace()->AddTraceEvent(
channel->channelz_channel->trace()->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Channel created"));
return channel;
@ -187,7 +187,10 @@ static grpc_channel_args* build_channel_args(
}
char* grpc_channel_get_trace(grpc_channel* channel) {
return channel->channelz_channel->Trace()->RenderTrace();
grpc_json* json = channel->channelz_channel->trace()->RenderJSON();
char* json_str = grpc_json_dump_to_string(json, 0);
grpc_json_destroy(json);
return json_str;
}
char* grpc_channel_render_channelz(grpc_channel* channel) {

@ -88,12 +88,15 @@ void AddSimpleTrace(ChannelTrace* tracer) {
void ValidateChannelTrace(ChannelTrace* tracer,
size_t expected_num_event_logged, size_t max_nodes) {
if (!max_nodes) return;
char* json_str = tracer->RenderTrace();
grpc_json* json = tracer->RenderJSON();
EXPECT_NE(json, nullptr);
char* json_str = grpc_json_dump_to_string(json, 0);
grpc_json_destroy(json);
grpc::testing::ValidateChannelTraceProtoJsonTranslation(json_str);
grpc_json* json = grpc_json_parse_string(json_str);
ValidateChannelTraceData(json, expected_num_event_logged,
grpc_json* parsed_json = grpc_json_parse_string(json_str);
ValidateChannelTraceData(parsed_json, expected_num_event_logged,
GPR_MIN(expected_num_event_logged, max_nodes));
grpc_json_destroy(json);
grpc_json_destroy(parsed_json);
gpr_free(json_str);
}
@ -159,14 +162,14 @@ TEST_P(ChannelTracerTest, ComplexTest) {
ChannelTrace::Severity::Info,
grpc_slice_from_static_string("subchannel one created"), sc1);
ValidateChannelTrace(&tracer, 3, GetParam());
AddSimpleTrace(sc1->Trace());
AddSimpleTrace(sc1->Trace());
AddSimpleTrace(sc1->Trace());
ValidateChannelTrace(sc1->Trace(), 3, GetParam());
AddSimpleTrace(sc1->Trace());
AddSimpleTrace(sc1->Trace());
AddSimpleTrace(sc1->Trace());
ValidateChannelTrace(sc1->Trace(), 6, GetParam());
AddSimpleTrace(sc1->trace());
AddSimpleTrace(sc1->trace());
AddSimpleTrace(sc1->trace());
ValidateChannelTrace(sc1->trace(), 3, GetParam());
AddSimpleTrace(sc1->trace());
AddSimpleTrace(sc1->trace());
AddSimpleTrace(sc1->trace());
ValidateChannelTrace(sc1->trace(), 6, GetParam());
AddSimpleTrace(&tracer);
AddSimpleTrace(&tracer);
ValidateChannelTrace(&tracer, 5, GetParam());
@ -206,20 +209,20 @@ TEST_P(ChannelTracerTest, TestNesting) {
ChannelTrace::Severity::Info,
grpc_slice_from_static_string("subchannel one created"), sc1);
ValidateChannelTrace(&tracer, 3, GetParam());
AddSimpleTrace(sc1->Trace());
AddSimpleTrace(sc1->trace());
ChannelFixture channel2(GetParam());
RefCountedPtr<Channel> conn1 =
MakeRefCounted<Channel>(channel2.channel(), GetParam());
// nesting one level deeper.
sc1->Trace()->AddTraceEventReferencingSubchannel(
sc1->trace()->AddTraceEventReferencingSubchannel(
ChannelTrace::Severity::Info,
grpc_slice_from_static_string("connection one created"), conn1);
ValidateChannelTrace(&tracer, 3, GetParam());
AddSimpleTrace(conn1->Trace());
AddSimpleTrace(conn1->trace());
AddSimpleTrace(&tracer);
AddSimpleTrace(&tracer);
ValidateChannelTrace(&tracer, 5, GetParam());
ValidateChannelTrace(conn1->Trace(), 1, GetParam());
ValidateChannelTrace(conn1->trace(), 1, GetParam());
ChannelFixture channel3(GetParam());
RefCountedPtr<Channel> sc2 =
MakeRefCounted<Channel>(channel3.channel(), GetParam());

@ -42,6 +42,16 @@
namespace grpc_core {
namespace channelz {
namespace testing {
// testing peer to access channel internals
class ChannelPeer {
public:
ChannelPeer (Channel* channel) : channel_(channel) {}
grpc_millis last_call_started_millis() { return (grpc_millis)gpr_atm_no_barrier_load(&channel_->last_call_started_millis_); }
private:
Channel* channel_;
};
namespace {
grpc_json* GetJsonChild(grpc_json* parent, const char* key) {
@ -100,15 +110,9 @@ void ValidateChannel(Channel* channel, validate_channel_data_args args) {
gpr_free(json_str);
}
char* GetLastCallStartedTimestamp(Channel* channel) {
char* json_str = channel->RenderJSON();
grpc_json* json = grpc_json_parse_string(json_str);
grpc_json* data = GetJsonChild(json, "data");
grpc_json* timestamp = GetJsonChild(data, "lastCallStartedTimestamp");
char* ts_str = grpc_json_dump_to_string(timestamp, 0);
grpc_json_destroy(json);
gpr_free(json_str);
return ts_str;
grpc_millis GetLastCallStartedMillis(Channel* channel) {
ChannelPeer peer(channel);
return peer.last_call_started_millis();
}
void ChannelzSleep(int64_t sleep_us) {
@ -134,16 +138,16 @@ TEST_P(ChannelzChannelTest, BasicChannelAPIFunctionality) {
ChannelFixture channel(GetParam());
intptr_t uuid = grpc_channel_get_uuid(channel.channel());
Channel* channelz_channel = ChannelzRegistry::Get<Channel>(uuid);
channelz_channel->CallStarted();
channelz_channel->CallFailed();
channelz_channel->CallSucceeded();
channelz_channel->RecordCallStarted();
channelz_channel->RecordCallFailed();
channelz_channel->RecordCallSucceeded();
ValidateChannel(channelz_channel, {1, 1, 1});
channelz_channel->CallStarted();
channelz_channel->CallFailed();
channelz_channel->CallSucceeded();
channelz_channel->CallStarted();
channelz_channel->CallFailed();
channelz_channel->CallSucceeded();
channelz_channel->RecordCallStarted();
channelz_channel->RecordCallFailed();
channelz_channel->RecordCallSucceeded();
channelz_channel->RecordCallStarted();
channelz_channel->RecordCallFailed();
channelz_channel->RecordCallSucceeded();
ValidateChannel(channelz_channel, {3, 3, 3});
}
@ -154,34 +158,28 @@ TEST_P(ChannelzChannelTest, LastCallStartedTimestamp) {
Channel* channelz_channel = ChannelzRegistry::Get<Channel>(uuid);
// start a call to set the last call started timestamp
channelz_channel->CallStarted();
char* ts1 = GetLastCallStartedTimestamp(channelz_channel);
channelz_channel->RecordCallStarted();
grpc_millis millis1 = GetLastCallStartedMillis(channelz_channel);
// time gone by should not affect the timestamp
ChannelzSleep(100);
char* ts2 = GetLastCallStartedTimestamp(channelz_channel);
EXPECT_STREQ(ts1, ts2);
grpc_millis millis2 = GetLastCallStartedMillis(channelz_channel);
EXPECT_EQ(millis1, millis2);
// calls succeeded or failed should not affect the timestamp
ChannelzSleep(100);
channelz_channel->CallFailed();
channelz_channel->CallSucceeded();
char* ts3 = GetLastCallStartedTimestamp(channelz_channel);
EXPECT_STREQ(ts1, ts3);
channelz_channel->RecordCallFailed();
channelz_channel->RecordCallSucceeded();
grpc_millis millis3 = GetLastCallStartedMillis(channelz_channel);
EXPECT_EQ(millis1, millis3);
// another call started should affect the timestamp
// sleep for extra long to avoid flakes (since we cache Now())
ChannelzSleep(5000);
grpc_core::ExecCtx::Get()->InvalidateNow();
channelz_channel->CallStarted();
char* ts4 = GetLastCallStartedTimestamp(channelz_channel);
EXPECT_STRNE(ts1, ts4);
// clean up
gpr_free(ts1);
gpr_free(ts2);
gpr_free(ts3);
gpr_free(ts4);
channelz_channel->RecordCallStarted();
grpc_millis millis4 = GetLastCallStartedMillis(channelz_channel);
EXPECT_NE(millis1, millis4);
}
INSTANTIATE_TEST_CASE_P(ChannelzChannelTestSweep, ChannelzChannelTest,

Loading…
Cancel
Save