Restructure heirarchy

reviewable/pr16055/r5
ncteisen 7 years ago
parent a8d5c21b88
commit bfdfe9fefb
  1. 1
      src/core/ext/filters/client_channel/client_channel.cc
  2. 16
      src/core/ext/filters/client_channel/client_channel_channelz.cc
  3. 7
      src/core/ext/filters/client_channel/client_channel_channelz.h
  4. 9
      src/core/ext/filters/client_channel/subchannel.cc
  5. 1
      src/core/lib/channel/channel_stack.h
  6. 16
      src/core/lib/channel/channelz.cc
  7. 21
      src/core/lib/channel/channelz.h
  8. 27
      src/core/lib/surface/call.cc
  9. 13
      src/core/lib/surface/call.h
  10. 4
      src/core/lib/surface/channel.cc
  11. 1
      test/core/channel/channel_stack_test.cc
  12. 112
      test/core/channel/channel_trace_test.cc
  13. 38
      test/core/channel/channelz_test.cc

@ -3094,7 +3094,6 @@ static grpc_error* cc_init_call_elem(grpc_call_element* elem,
calld->arena = args->arena;
calld->owning_call = args->call_stack;
calld->call_combiner = args->call_combiner;
calld->call = args->call;
if (GPR_LIKELY(chand->deadline_checking_enabled)) {
grpc_deadline_state_init(elem, args->call_stack, args->call_combiner,
calld->deadline);

@ -124,8 +124,8 @@ grpc_json* ClientChannelNode::RenderJson() {
grpc_json_create_child(nullptr, json, "target", target_view(),
GRPC_JSON_STRING, false);
// as CallCountingAndTracingNode to populate trace and call count data.
PopulateTrace(json);
PopulateCallData(json);
counter_and_tracer()->PopulateTrace(json);
counter_and_tracer()->PopulateCallData(json);
// reset to the top level
json = top_level_json;
PopulateChildRefs(json);
@ -148,11 +148,11 @@ RefCountedPtr<ChannelNode> ClientChannelNode::MakeClientChannelNode(
SubchannelNode::SubchannelNode(grpc_subchannel* subchannel,
size_t channel_tracer_max_nodes)
: CallCountingAndTracingNode(EntityType::kSubchannel,
channel_tracer_max_nodes),
: BaseNode(EntityType::kSubchannel),
subchannel_(subchannel),
target_(UniquePtr<char>(
gpr_strdup(grpc_subchannel_get_target(subchannel_)))) {}
target_(
UniquePtr<char>(gpr_strdup(grpc_subchannel_get_target(subchannel_)))),
counter_and_tracer_(channel_tracer_max_nodes) {}
SubchannelNode::~SubchannelNode() {}
@ -192,8 +192,8 @@ grpc_json* SubchannelNode::RenderJson() {
GPR_ASSERT(target_.get() != nullptr);
grpc_json_create_child(nullptr, json, "target", target_.get(),
GRPC_JSON_STRING, false);
PopulateTrace(json);
PopulateCallData(json);
counter_and_tracer_.PopulateTrace(json);
counter_and_tracer_.PopulateCallData(json);
return top_level_json;
}

@ -64,7 +64,7 @@ class ClientChannelNode : public ChannelNode {
};
// Handles channelz bookkeeping for sockets
class SubchannelNode : public CallCountingAndTracingNode {
class SubchannelNode : public BaseNode {
public:
SubchannelNode(grpc_subchannel* subchannel, size_t channel_tracer_max_nodes);
~SubchannelNode() override;
@ -76,9 +76,14 @@ class SubchannelNode : public CallCountingAndTracingNode {
grpc_json* RenderJson() override;
CallCountingAndTracingNode* counter_and_tracer() {
return &counter_and_tracer_;
}
private:
grpc_subchannel* subchannel_;
UniquePtr<char> target_;
CallCountingAndTracingNode counter_and_tracer_;
void PopulateConnectivityState(grpc_json* json);
};

@ -183,7 +183,7 @@ static void connection_destroy(void* arg, grpc_error* error) {
static void subchannel_destroy(void* arg, grpc_error* error) {
grpc_subchannel* c = static_cast<grpc_subchannel*>(arg);
if (c->channelz_subchannel != nullptr) {
c->channelz_subchannel->trace()->AddTraceEvent(
c->channelz_subchannel->counter_and_tracer()->trace()->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Subchannel destroyed"));
c->channelz_subchannel->MarkSubchannelDestroyed();
@ -397,7 +397,7 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
c->channelz_subchannel =
grpc_core::MakeRefCounted<grpc_core::channelz::SubchannelNode>(
c, channel_tracer_max_nodes);
c->channelz_subchannel->trace()->AddTraceEvent(
c->channelz_subchannel->counter_and_tracer()->trace()->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Subchannel created"));
}
@ -857,7 +857,6 @@ grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args,
args.deadline, /* deadline */
args.arena, /* arena */
args.call_combiner, /* call_combiner */
args.call /* call */
};
grpc_error* error = grpc_call_stack_init(
channel_stack_, 1, subchannel_call_destroy, *call, &call_args);
@ -866,10 +865,6 @@ grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args,
gpr_log(GPR_ERROR, "error: %s", error_string);
return error;
}
if (channelz_subchannel_ != nullptr) {
channelz_subchannel_->RecordCallStarted();
grpc_call_set_channelz_subchannel(args.call, channelz_subchannel_);
}
grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent);
return GRPC_ERROR_NONE;
}

@ -71,7 +71,6 @@ typedef struct {
grpc_millis deadline;
gpr_arena* arena;
grpc_call_combiner* call_combiner;
grpc_call* call;
} grpc_call_element_args;
typedef struct {

@ -54,8 +54,7 @@ char* BaseNode::RenderJsonString() {
}
CallCountingAndTracingNode::CallCountingAndTracingNode(
EntityType type, size_t channel_tracer_max_nodes)
: BaseNode(type) {
size_t channel_tracer_max_nodes) {
trace_.Init(channel_tracer_max_nodes);
gpr_atm_no_barrier_store(&last_call_started_millis_,
(gpr_atm)ExecCtx::Get()->Now());
@ -103,12 +102,11 @@ void CallCountingAndTracingNode::PopulateCallData(grpc_json* json) {
ChannelNode::ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
bool is_top_level_channel)
: CallCountingAndTracingNode(is_top_level_channel
? EntityType::kTopLevelChannel
: EntityType::kInternalChannel,
channel_tracer_max_nodes),
: BaseNode(is_top_level_channel ? EntityType::kTopLevelChannel
: EntityType::kInternalChannel),
channel_(channel),
target_(UniquePtr<char>(grpc_channel_get_target(channel_))) {}
target_(UniquePtr<char>(grpc_channel_get_target(channel_))),
counter_and_tracer_(channel_tracer_max_nodes) {}
ChannelNode::~ChannelNode() {}
@ -137,8 +135,8 @@ grpc_json* ChannelNode::RenderJson() {
grpc_json_create_child(nullptr, json, "target", target_.get(),
GRPC_JSON_STRING, false);
// as CallCountingAndTracingNode to populate trace and call count data.
PopulateTrace(json);
PopulateCallData(json);
counter_and_tracer_.PopulateTrace(json);
counter_and_tracer_.PopulateCallData(json);
return top_level_json;
}

@ -86,10 +86,10 @@ class BaseNode : public RefCounted<BaseNode> {
// - track last_call_started_timestamp
// - hold the channel trace.
// - perform common rendering.
class CallCountingAndTracingNode : public BaseNode {
class CallCountingAndTracingNode {
public:
CallCountingAndTracingNode(EntityType type, size_t channel_tracer_max_nodes);
~CallCountingAndTracingNode() override;
CallCountingAndTracingNode(size_t channel_tracer_max_nodes);
~CallCountingAndTracingNode();
void RecordCallStarted();
void RecordCallFailed() {
@ -118,7 +118,7 @@ class CallCountingAndTracingNode : public BaseNode {
};
// Handles channelz bookkeeping for channels
class ChannelNode : public CallCountingAndTracingNode {
class ChannelNode : public BaseNode {
public:
static RefCountedPtr<ChannelNode> MakeChannelNode(
grpc_channel* channel, size_t channel_tracer_max_nodes,
@ -137,6 +137,10 @@ class ChannelNode : public CallCountingAndTracingNode {
bool ChannelIsDestroyed() { return channel_ == nullptr; }
CallCountingAndTracingNode* counter_and_tracer() {
return &counter_and_tracer_;
}
protected:
// provides view of target for child.
char* target_view() { return target_.get(); }
@ -144,15 +148,14 @@ class ChannelNode : public CallCountingAndTracingNode {
private:
grpc_channel* channel_ = nullptr;
UniquePtr<char> target_;
CallCountingAndTracingNode counter_and_tracer_;
};
// Handles channelz bookkeeping for servers
// TODO(ncteisen): implement in subsequent PR.
class ServerNode : public CallCountingAndTracingNode {
class ServerNode : public BaseNode {
public:
ServerNode(size_t channel_tracer_max_nodes)
: CallCountingAndTracingNode(EntityType::kServer,
channel_tracer_max_nodes) {}
ServerNode(size_t channel_tracer_max_nodes) : BaseNode(EntityType::kServer) {}
~ServerNode() override {}
};
@ -162,8 +165,6 @@ class SocketNode : public BaseNode {
public:
SocketNode() : BaseNode(EntityType::kSocket) {}
~SocketNode() override {}
private:
};
// Creation functions

@ -170,11 +170,6 @@ struct grpc_call {
/* parent_call* */ gpr_atm parent_call_atm;
child_call* child;
// the call holds onto this so that once the call knows if the RPC was
// a success or failure, it can update the channelz bookkeeping for the
// subchannel that sent it.
grpc_core::channelz::CallCountingAndTracingNode* channelz_subchannel_;
/* client or server call */
bool is_client;
/** has grpc_call_unref been called */
@ -274,11 +269,6 @@ struct grpc_call {
gpr_atm recv_state;
};
void grpc_call_set_channelz_subchannel(
grpc_call* call, grpc_core::channelz::CallCountingAndTracingNode* node) {
call->channelz_subchannel_ = node;
}
grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
grpc_core::TraceFlag grpc_compression_trace(false, "compression");
@ -454,8 +444,7 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
call->start_time,
send_deadline,
call->arena,
&call->call_combiner,
call};
&call->call_combiner};
add_init_error(&error, grpc_call_stack_init(channel_stack, 1, destroy_call,
call, &call_args));
// Publish this call to parent only after the call stack has been initialized.
@ -500,7 +489,7 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
grpc_core::channelz::ChannelNode* channelz_channel =
grpc_channel_get_channelz_node(call->channel);
if (channelz_channel != nullptr) {
channelz_channel->RecordCallStarted();
channelz_channel->counter_and_tracer()->RecordCallStarted();
}
grpc_slice_unref_internal(path);
@ -1279,17 +1268,9 @@ static void post_batch_completion(batch_control* bctl) {
grpc_channel_get_channelz_node(call->channel);
if (channelz_channel != nullptr) {
if (*call->final_op.client.status != GRPC_STATUS_OK) {
channelz_channel->RecordCallFailed();
} else {
channelz_channel->RecordCallSucceeded();
}
}
// Record channelz data for the subchannel.
if (call->channelz_subchannel_ != nullptr) {
if (*call->final_op.client.status != GRPC_STATUS_OK) {
call->channelz_subchannel_->RecordCallFailed();
channelz_channel->counter_and_tracer()->RecordCallFailed();
} else {
call->channelz_subchannel_->RecordCallSucceeded();
channelz_channel->counter_and_tracer()->RecordCallSucceeded();
}
}
GRPC_ERROR_UNREF(error);

@ -110,19 +110,6 @@ size_t grpc_call_get_initial_size_estimate();
grpc_compression_algorithm grpc_call_compression_for_level(
grpc_call* call, grpc_compression_level level);
namespace grpc_core {
namespace channelz {
class CallCountingAndTracingNode;
} // namespace channelz
} // namespace grpc_core
// We need this so that a subchannel selected for a call can add itself to
// the call's data structure. This allows the call to trigger the correct
// channelz bookkeeping on the subchannel once the call knows if the RPC was
// successful or not.
void grpc_call_set_channelz_subchannel(
grpc_call* call, grpc_core::channelz::CallCountingAndTracingNode* node);
extern grpc_core::TraceFlag grpc_call_error_trace;
extern grpc_core::TraceFlag grpc_compression_trace;

@ -170,7 +170,7 @@ grpc_channel* grpc_channel_create_with_builder(
bool is_top_level_channel = channel->is_client && !internal_channel;
channel->channelz_channel = channel_node_create_func(
channel, channel_tracer_max_nodes, is_top_level_channel);
channel->channelz_channel->trace()->AddTraceEvent(
channel->channelz_channel->counter_and_tracer()->trace()->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Channel created"));
}
@ -417,7 +417,7 @@ void grpc_channel_internal_unref(grpc_channel* c REF_ARG) {
static void destroy_channel(void* arg, grpc_error* error) {
grpc_channel* channel = static_cast<grpc_channel*>(arg);
if (channel->channelz_channel != nullptr) {
channel->channelz_channel->trace()->AddTraceEvent(
channel->channelz_channel->counter_and_tracer()->trace()->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Channel destroyed"));
channel->channelz_channel->MarkChannelDestroyed();

@ -125,7 +125,6 @@ static void test_create_channel_stack(void) {
GRPC_MILLIS_INF_FUTURE, /* deadline */
nullptr, /* arena */
nullptr, /* call_combiner */
nullptr /* call */
};
grpc_error* error =
grpc_call_stack_init(channel_stack, 1, free_call, call_stack, &args);

@ -160,14 +160,14 @@ TEST_P(ChannelTracerTest, ComplexTest) {
ChannelTrace::Severity::Info,
grpc_slice_from_static_string("subchannel one created"), sc1);
ValidateChannelTrace(&tracer, 3, GetParam());
AddSimpleTrace(sc1->trace());
AddSimpleTrace(sc1->trace());
AddSimpleTrace(sc1->trace());
ValidateChannelTrace(sc1->trace(), 3, GetParam());
AddSimpleTrace(sc1->trace());
AddSimpleTrace(sc1->trace());
AddSimpleTrace(sc1->trace());
ValidateChannelTrace(sc1->trace(), 6, GetParam());
AddSimpleTrace(sc1->counter_and_tracer()->trace());
AddSimpleTrace(sc1->counter_and_tracer()->trace());
AddSimpleTrace(sc1->counter_and_tracer()->trace());
ValidateChannelTrace(sc1->counter_and_tracer()->trace(), 3, GetParam());
AddSimpleTrace(sc1->counter_and_tracer()->trace());
AddSimpleTrace(sc1->counter_and_tracer()->trace());
AddSimpleTrace(sc1->counter_and_tracer()->trace());
ValidateChannelTrace(sc1->counter_and_tracer()->trace(), 6, GetParam());
AddSimpleTrace(&tracer);
AddSimpleTrace(&tracer);
ValidateChannelTrace(&tracer, 5, GetParam());
@ -191,55 +191,53 @@ TEST_P(ChannelTracerTest, ComplexTest) {
sc2.reset(nullptr);
}
// // Test a case in which the parent channel has subchannels and the
// subchannels
// // have connections. Ensures that everything lives as long as it should then
// // gets deleted.
// TEST_P(ChannelTracerTest, TestNesting) {
// grpc_core::ExecCtx exec_ctx;
// ChannelTrace tracer(GetParam());
// AddSimpleTrace(&tracer);
// AddSimpleTrace(&tracer);
// ValidateChannelTrace(&tracer, 2, GetParam());
// ChannelFixture channel1(GetParam());
// RefCountedPtr<ChannelNode> sc1 =
// MakeRefCounted<ChannelNode>(channel1.channel(), GetParam(), true);
// tracer.AddTraceEventReferencingChannel(
// ChannelTrace::Severity::Info,
// grpc_slice_from_static_string("subchannel one created"), sc1);
// ValidateChannelTrace(&tracer, 3, GetParam());
// AddSimpleTrace(sc1->trace());
// ChannelFixture channel2(GetParam());
// RefCountedPtr<ChannelNode> conn1 =
// MakeRefCounted<ChannelNode>(channel2.channel(), GetParam(), true);
// // nesting one level deeper.
// sc1->trace()->AddTraceEventReferencingChannel(
// ChannelTrace::Severity::Info,
// grpc_slice_from_static_string("connection one created"), conn1);
// ValidateChannelTrace(&tracer, 3, GetParam());
// AddSimpleTrace(conn1->trace());
// AddSimpleTrace(&tracer);
// AddSimpleTrace(&tracer);
// ValidateChannelTrace(&tracer, 5, GetParam());
// ValidateChannelTrace(conn1->trace(), 1, GetParam());
// ChannelFixture channel3(GetParam());
// RefCountedPtr<ChannelNode> sc2 =
// MakeRefCounted<ChannelNode>(channel3.channel(), GetParam(), true);
// tracer.AddTraceEventReferencingChannel(
// ChannelTrace::Severity::Info,
// grpc_slice_from_static_string("subchannel two created"), sc2);
// // this trace should not get added to the parents children since it is
// already
// // present in the tracer.
// tracer.AddTraceEventReferencingChannel(
// ChannelTrace::Severity::Warning,
// grpc_slice_from_static_string("subchannel one inactive"), sc1);
// AddSimpleTrace(&tracer);
// ValidateChannelTrace(&tracer, 8, GetParam());
// sc1.reset(nullptr);
// sc2.reset(nullptr);
// conn1.reset(nullptr);
// }
// Test a case in which the parent channel has subchannels and the subchannels
// have connections. Ensures that everything lives as long as it should then
// gets deleted.
TEST_P(ChannelTracerTest, TestNesting) {
grpc_core::ExecCtx exec_ctx;
ChannelTrace tracer(GetParam());
AddSimpleTrace(&tracer);
AddSimpleTrace(&tracer);
ValidateChannelTrace(&tracer, 2, GetParam());
ChannelFixture channel1(GetParam());
RefCountedPtr<ChannelNode> sc1 =
MakeRefCounted<ChannelNode>(channel1.channel(), GetParam(), true);
tracer.AddTraceEventReferencingChannel(
ChannelTrace::Severity::Info,
grpc_slice_from_static_string("subchannel one created"), sc1);
ValidateChannelTrace(&tracer, 3, GetParam());
AddSimpleTrace(sc1->counter_and_tracer()->trace());
ChannelFixture channel2(GetParam());
RefCountedPtr<ChannelNode> conn1 =
MakeRefCounted<ChannelNode>(channel2.channel(), GetParam(), true);
// nesting one level deeper.
sc1->counter_and_tracer()->trace()->AddTraceEventReferencingChannel(
ChannelTrace::Severity::Info,
grpc_slice_from_static_string("connection one created"), conn1);
ValidateChannelTrace(&tracer, 3, GetParam());
AddSimpleTrace(conn1->counter_and_tracer()->trace());
AddSimpleTrace(&tracer);
AddSimpleTrace(&tracer);
ValidateChannelTrace(&tracer, 5, GetParam());
ValidateChannelTrace(conn1->counter_and_tracer()->trace(), 1, GetParam());
ChannelFixture channel3(GetParam());
RefCountedPtr<ChannelNode> sc2 =
MakeRefCounted<ChannelNode>(channel3.channel(), GetParam(), true);
tracer.AddTraceEventReferencingChannel(
ChannelTrace::Severity::Info,
grpc_slice_from_static_string("subchannel two created"), sc2);
// this trace should not get added to the parents children since it is already
// present in the tracer.
tracer.AddTraceEventReferencingChannel(
ChannelTrace::Severity::Warning,
grpc_slice_from_static_string("subchannel one inactive"), sc1);
AddSimpleTrace(&tracer);
ValidateChannelTrace(&tracer, 8, GetParam());
sc1.reset(nullptr);
sc2.reset(nullptr);
conn1.reset(nullptr);
}
INSTANTIATE_TEST_CASE_P(ChannelTracerTestSweep, ChannelTracerTest,
::testing::Values(0, 1, 2, 6, 10, 15));

@ -201,16 +201,16 @@ TEST_P(ChannelzChannelTest, BasicChannelAPIFunctionality) {
ChannelFixture channel(GetParam());
ChannelNode* channelz_channel =
grpc_channel_get_channelz_node(channel.channel());
channelz_channel->RecordCallStarted();
channelz_channel->RecordCallFailed();
channelz_channel->RecordCallSucceeded();
channelz_channel->counter_and_tracer()->RecordCallStarted();
channelz_channel->counter_and_tracer()->RecordCallFailed();
channelz_channel->counter_and_tracer()->RecordCallSucceeded();
ValidateChannel(channelz_channel, {1, 1, 1});
channelz_channel->RecordCallStarted();
channelz_channel->RecordCallFailed();
channelz_channel->RecordCallSucceeded();
channelz_channel->RecordCallStarted();
channelz_channel->RecordCallFailed();
channelz_channel->RecordCallSucceeded();
channelz_channel->counter_and_tracer()->RecordCallStarted();
channelz_channel->counter_and_tracer()->RecordCallFailed();
channelz_channel->counter_and_tracer()->RecordCallSucceeded();
channelz_channel->counter_and_tracer()->RecordCallStarted();
channelz_channel->counter_and_tracer()->RecordCallFailed();
channelz_channel->counter_and_tracer()->RecordCallSucceeded();
ValidateChannel(channelz_channel, {3, 3, 3});
}
@ -220,23 +220,27 @@ TEST_P(ChannelzChannelTest, LastCallStartedMillis) {
ChannelNode* channelz_channel =
grpc_channel_get_channelz_node(channel.channel());
// start a call to set the last call started timestamp
channelz_channel->RecordCallStarted();
grpc_millis millis1 = GetLastCallStartedMillis(channelz_channel);
channelz_channel->counter_and_tracer()->RecordCallStarted();
grpc_millis millis1 =
GetLastCallStartedMillis(channelz_channel->counter_and_tracer());
// time gone by should not affect the timestamp
ChannelzSleep(100);
grpc_millis millis2 = GetLastCallStartedMillis(channelz_channel);
grpc_millis millis2 =
GetLastCallStartedMillis(channelz_channel->counter_and_tracer());
EXPECT_EQ(millis1, millis2);
// calls succeeded or failed should not affect the timestamp
ChannelzSleep(100);
channelz_channel->RecordCallFailed();
channelz_channel->RecordCallSucceeded();
grpc_millis millis3 = GetLastCallStartedMillis(channelz_channel);
channelz_channel->counter_and_tracer()->RecordCallFailed();
channelz_channel->counter_and_tracer()->RecordCallSucceeded();
grpc_millis millis3 =
GetLastCallStartedMillis(channelz_channel->counter_and_tracer());
EXPECT_EQ(millis1, millis3);
// another call started should affect the timestamp
// sleep for extra long to avoid flakes (since we cache Now())
ChannelzSleep(5000);
channelz_channel->RecordCallStarted();
grpc_millis millis4 = GetLastCallStartedMillis(channelz_channel);
channelz_channel->counter_and_tracer()->RecordCallStarted();
grpc_millis millis4 =
GetLastCallStartedMillis(channelz_channel->counter_and_tracer());
EXPECT_NE(millis1, millis4);
}

Loading…
Cancel
Save