Merge pull request #16055 from ncteisen/channelz-subchannels

Channelz Part 4: Add Subchannel Support
pull/16082/head^2
Noah Eisen 6 years ago committed by GitHub
commit 2edfddb66f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      grpc.def
  2. 4
      include/grpc/grpc.h
  3. 104
      src/core/ext/filters/client_channel/client_channel.cc
  4. 60
      src/core/ext/filters/client_channel/client_channel_channelz.cc
  5. 56
      src/core/ext/filters/client_channel/client_channel_channelz.h
  6. 2
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  7. 2
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  8. 38
      src/core/ext/filters/client_channel/subchannel.cc
  9. 11
      src/core/ext/filters/client_channel/subchannel.h
  10. 73
      src/core/lib/channel/channel_trace.cc
  11. 28
      src/core/lib/channel/channel_trace.h
  12. 119
      src/core/lib/channel/channelz.cc
  13. 154
      src/core/lib/channel/channelz.h
  14. 60
      src/core/lib/channel/channelz_registry.cc
  15. 50
      src/core/lib/channel/channelz_registry.h
  16. 5
      src/core/lib/surface/channel.cc
  17. 2
      src/ruby/ext/grpc/rb_grpc_imports.generated.c
  18. 3
      src/ruby/ext/grpc/rb_grpc_imports.generated.h
  19. 2
      test/core/channel/channel_stack_test.cc
  20. 50
      test/core/channel/channel_trace_test.cc
  21. 30
      test/core/channel/channelz_registry_test.cc
  22. 36
      test/core/channel/channelz_test.cc
  23. 1
      test/core/surface/public_headers_must_be_c89.c
  24. 4
      test/cpp/util/channel_trace_proto_helper.cc
  25. 1
      test/cpp/util/channel_trace_proto_helper.h

@ -75,6 +75,7 @@ EXPORTS
grpc_resource_quota_arg_vtable
grpc_channelz_get_top_channels
grpc_channelz_get_channel
grpc_channelz_get_subchannel
grpc_insecure_channel_create_from_fd
grpc_server_add_insecure_channel_from_fd
grpc_use_signal

@ -503,6 +503,10 @@ GRPCAPI char* grpc_channelz_get_top_channels(intptr_t start_channel_id);
is allocated and must be freed by the application. */
GRPCAPI char* grpc_channelz_get_channel(intptr_t channel_id);
/* Returns a single Subchannel, or else a NOT_FOUND code. The returned string
is allocated and must be freed by the application. */
GRPCAPI char* grpc_channelz_get_subchannel(intptr_t subchannel_id);
#ifdef __cplusplus
}
#endif

@ -933,6 +933,11 @@ typedef struct client_channel_call_data {
grpc_closure pick_closure;
grpc_closure pick_cancel_closure;
// state needed to support channelz interception of recv trailing metadata.
grpc_closure recv_trailing_metadata_ready_channelz;
grpc_closure* original_recv_trailing_metadata;
grpc_metadata_batch* recv_trailing_metadata;
grpc_polling_entity* pollent;
bool pollent_added_to_interested_parties;
@ -994,6 +999,8 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem);
static void on_complete(void* arg, grpc_error* error);
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored);
static void start_pick_locked(void* arg, grpc_error* ignored);
static void maybe_intercept_recv_trailing_metadata_for_channelz(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
//
// send op data caching
@ -1292,6 +1299,7 @@ static void pending_batches_resume(grpc_call_element* elem) {
pending_batch* pending = &calld->pending_batches[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
maybe_intercept_recv_trailing_metadata_for_channelz(elem, batch);
batch->handler_private.extra_arg = calld->subchannel_call;
GRPC_CLOSURE_INIT(&batch->handler_private.closure,
resume_pending_batch_in_call_combiner, batch,
@ -1777,23 +1785,22 @@ static void recv_message_ready(void* arg, grpc_error* error) {
// recv_trailing_metadata handling
//
// Sets *status and *server_pushback_md based on batch_data and error.
static void get_call_status(subchannel_batch_data* batch_data,
grpc_error* error, grpc_status_code* status,
// Sets *status and *server_pushback_md based on md_batch and error.
// Only sets *server_pushback_md if server_pushback_md != nullptr.
static void get_call_status(grpc_call_element* elem,
grpc_metadata_batch* md_batch, grpc_error* error,
grpc_status_code* status,
grpc_mdelem** server_pushback_md) {
grpc_call_element* elem = batch_data->elem;
call_data* calld = static_cast<call_data*>(elem->call_data);
if (error != GRPC_ERROR_NONE) {
grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr,
nullptr);
} else {
grpc_metadata_batch* md_batch =
batch_data->batch.payload->recv_trailing_metadata
.recv_trailing_metadata;
GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
*status =
grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
if (server_pushback_md != nullptr &&
md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
*server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
}
}
@ -1966,8 +1973,19 @@ static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
// Get the call's status and check for server pushback metadata.
grpc_status_code status = GRPC_STATUS_OK;
grpc_mdelem* server_pushback_md = nullptr;
get_call_status(batch_data, GRPC_ERROR_REF(error), &status,
grpc_metadata_batch* md_batch =
batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status,
&server_pushback_md);
grpc_core::channelz::SubchannelNode* channelz_subchannel =
calld->pick.connected_subchannel->channelz_subchannel();
if (channelz_subchannel != nullptr) {
if (status == GRPC_STATUS_OK) {
channelz_subchannel->RecordCallSucceeded();
} else {
channelz_subchannel->RecordCallFailed();
}
}
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
calld, grpc_status_code_to_string(status));
@ -2571,6 +2589,69 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
closures.RunClosures(calld->call_combiner);
}
//
// Channelz
//
static void recv_trailing_metadata_ready_channelz(void* arg,
grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, "
"error=%s",
chand, calld, grpc_error_string(error));
}
GPR_ASSERT(calld->recv_trailing_metadata != nullptr);
grpc_status_code status = GRPC_STATUS_OK;
grpc_metadata_batch* md_batch = calld->recv_trailing_metadata;
get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr);
grpc_core::channelz::SubchannelNode* channelz_subchannel =
calld->pick.connected_subchannel->channelz_subchannel();
GPR_ASSERT(channelz_subchannel != nullptr);
if (status == GRPC_STATUS_OK) {
channelz_subchannel->RecordCallSucceeded();
} else {
channelz_subchannel->RecordCallFailed();
}
calld->recv_trailing_metadata = nullptr;
GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error);
}
// If channelz is enabled, intercept recv_trailing so that we may check the
// status and associate it to a subchannel.
// Returns true if callback was intercepted, false otherwise.
static void maybe_intercept_recv_trailing_metadata_for_channelz(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
call_data* calld = static_cast<call_data*>(elem->call_data);
// only intercept payloads with recv trailing.
if (!batch->recv_trailing_metadata) {
return;
}
// only add interceptor is channelz is enabled.
if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) {
return;
}
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO,
"calld=%p batch=%p: intercepting recv trailing for channelz", calld,
batch);
}
GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz,
recv_trailing_metadata_ready_channelz, elem,
grpc_schedule_on_exec_ctx);
// save some state needed for the interception callback.
GPR_ASSERT(calld->recv_trailing_metadata == nullptr);
calld->recv_trailing_metadata =
batch->payload->recv_trailing_metadata.recv_trailing_metadata;
calld->original_recv_trailing_metadata =
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&calld->recv_trailing_metadata_ready_channelz;
}
//
// LB pick
//
@ -2600,6 +2681,11 @@ static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) {
new_error = grpc_error_add_child(new_error, error);
pending_batches_fail(elem, new_error, true /* yield_call_combiner */);
} else {
grpc_core::channelz::SubchannelNode* channelz_subchannel =
calld->pick.connected_subchannel->channelz_subchannel();
if (channelz_subchannel != nullptr) {
channelz_subchannel->RecordCallStarted();
}
if (parent_data_size > 0) {
subchannel_call_retry_state* retry_state =
static_cast<subchannel_call_retry_state*>(

@ -20,10 +20,13 @@
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
#include "src/core/lib/channel/channelz_registry.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include <grpc/support/string_util.h>
namespace grpc_core {
namespace channelz {
namespace {
@ -109,5 +112,62 @@ RefCountedPtr<ChannelNode> ClientChannelNode::MakeClientChannelNode(
is_top_level_channel);
}
SubchannelNode::SubchannelNode(grpc_subchannel* subchannel,
size_t channel_tracer_max_nodes)
: BaseNode(EntityType::kSubchannel),
subchannel_(subchannel),
target_(
UniquePtr<char>(gpr_strdup(grpc_subchannel_get_target(subchannel_)))),
trace_(channel_tracer_max_nodes) {}
SubchannelNode::~SubchannelNode() {}
void SubchannelNode::PopulateConnectivityState(grpc_json* json) {
grpc_connectivity_state state;
if (subchannel_ == nullptr) {
state = GRPC_CHANNEL_SHUTDOWN;
} else {
state = grpc_subchannel_check_connectivity(subchannel_, nullptr);
}
json = grpc_json_create_child(nullptr, json, "state", nullptr,
GRPC_JSON_OBJECT, false);
grpc_json_create_child(nullptr, json, "state",
grpc_connectivity_state_name(state), GRPC_JSON_STRING,
false);
}
grpc_json* SubchannelNode::RenderJson() {
grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
grpc_json* json = top_level_json;
grpc_json* json_iterator = nullptr;
json_iterator = grpc_json_create_child(json_iterator, json, "ref", nullptr,
GRPC_JSON_OBJECT, false);
json = json_iterator;
json_iterator = nullptr;
json_iterator = grpc_json_add_number_string_child(json, json_iterator,
"subchannelId", uuid());
// reset json iterators to top level object
json = top_level_json;
json_iterator = nullptr;
// create and fill the data child.
grpc_json* data = grpc_json_create_child(json_iterator, json, "data", nullptr,
GRPC_JSON_OBJECT, false);
json = data;
json_iterator = nullptr;
PopulateConnectivityState(json);
GPR_ASSERT(target_.get() != nullptr);
grpc_json_create_child(nullptr, json, "target", target_.get(),
GRPC_JSON_STRING, false);
// fill in the channel trace if applicable
grpc_json* trace_json = trace_.RenderJson();
if (trace_json != nullptr) {
trace_json->key = "trace"; // this object is named trace in channelz.proto
grpc_json_link_child(json, trace_json, nullptr);
}
// ask CallCountingHelper to populate trace and call count data.
call_counter_.PopulateCallCounts(json);
return top_level_json;
}
} // namespace channelz
} // namespace grpc_core

@ -23,9 +23,12 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_trace.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/gprpp/inlined_vector.h"
typedef struct grpc_subchannel grpc_subchannel;
namespace grpc_core {
// TODO(ncteisen), this only contains the uuids of the children for now,
@ -43,28 +46,59 @@ class ClientChannelNode : public ChannelNode {
grpc_channel* channel, size_t channel_tracer_max_nodes,
bool is_top_level_channel);
// Override this functionality since client_channels have a notion of
// channel connectivity.
void PopulateConnectivityState(grpc_json* json) override;
ClientChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
bool is_top_level_channel);
virtual ~ClientChannelNode() {}
// Override this functionality since client_channels have subchannels
// Overriding template methods from ChannelNode to render information that
// only ClientChannelNode knows about.
void PopulateConnectivityState(grpc_json* json) override;
void PopulateChildRefs(grpc_json* json) override;
// Helper to create a channel arg to ensure this type of ChannelNode is
// created.
static grpc_arg CreateChannelArg();
protected:
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
ClientChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
bool is_top_level_channel);
virtual ~ClientChannelNode() {}
private:
grpc_channel_element* client_channel_;
};
// Handles channelz bookkeeping for sockets
class SubchannelNode : public BaseNode {
public:
SubchannelNode(grpc_subchannel* subchannel, size_t channel_tracer_max_nodes);
~SubchannelNode() override;
void MarkSubchannelDestroyed() {
GPR_ASSERT(subchannel_ != nullptr);
subchannel_ = nullptr;
}
grpc_json* RenderJson() override;
// proxy methods to composed classes.
void AddTraceEvent(ChannelTrace::Severity severity, grpc_slice data) {
trace_.AddTraceEvent(severity, data);
}
void AddTraceEventWithReference(ChannelTrace::Severity severity,
grpc_slice data,
RefCountedPtr<BaseNode> referenced_channel) {
trace_.AddTraceEventWithReference(severity, data,
std::move(referenced_channel));
}
void RecordCallStarted() { call_counter_.RecordCallStarted(); }
void RecordCallFailed() { call_counter_.RecordCallFailed(); }
void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); }
private:
grpc_subchannel* subchannel_;
UniquePtr<char> target_;
CallCountingHelper call_counter_;
ChannelTrace trace_;
void PopulateConnectivityState(grpc_json* json);
};
} // namespace channelz
} // namespace grpc_core

@ -1265,7 +1265,7 @@ void GrpcLb::FillChildRefsForChannelz(ChildRefsList* child_subchannels,
grpc_core::channelz::ChannelNode* channel_node =
grpc_channel_get_channelz_node(lb_channel_);
if (channel_node != nullptr) {
child_channels->push_back(channel_node->channel_uuid());
child_channels->push_back(channel_node->uuid());
}
}
}

@ -200,7 +200,7 @@ class SubchannelList
grpc_core::channelz::SubchannelNode* subchannel_node =
grpc_subchannel_get_channelz_node(subchannels_[i].subchannel());
if (subchannel_node != nullptr) {
refs_list->push_back(subchannel_node->subchannel_uuid());
refs_list->push_back(subchannel_node->uuid());
}
}
}

@ -183,7 +183,13 @@ static void connection_destroy(void* arg, grpc_error* error) {
static void subchannel_destroy(void* arg, grpc_error* error) {
grpc_subchannel* c = static_cast<grpc_subchannel*>(arg);
c->channelz_subchannel.reset();
if (c->channelz_subchannel != nullptr) {
c->channelz_subchannel->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Subchannel destroyed"));
c->channelz_subchannel->MarkSubchannelDestroyed();
c->channelz_subchannel.reset();
}
gpr_free((void*)c->filters);
grpc_channel_args_destroy(c->args);
grpc_connectivity_state_destroy(&c->state_tracker);
@ -383,9 +389,18 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
const grpc_arg* arg =
grpc_channel_args_find(c->args, GRPC_ARG_ENABLE_CHANNELZ);
bool channelz_enabled = grpc_channel_arg_get_bool(arg, false);
arg = grpc_channel_args_find(c->args,
GRPC_ARG_MAX_CHANNEL_TRACE_EVENTS_PER_NODE);
const grpc_integer_options options = {0, 0, INT_MAX};
size_t channel_tracer_max_nodes =
(size_t)grpc_channel_arg_get_integer(arg, options);
if (channelz_enabled) {
c->channelz_subchannel =
grpc_core::MakeRefCounted<grpc_core::channelz::SubchannelNode>();
grpc_core::MakeRefCounted<grpc_core::channelz::SubchannelNode>(
c, channel_tracer_max_nodes);
c->channelz_subchannel->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Subchannel created"));
}
return grpc_subchannel_index_register(key, c);
@ -625,8 +640,8 @@ static bool publish_transport_locked(grpc_subchannel* c) {
}
/* publish */
c->connected_subchannel.reset(
grpc_core::New<grpc_core::ConnectedSubchannel>(stk));
c->connected_subchannel.reset(grpc_core::New<grpc_core::ConnectedSubchannel>(
stk, c->channelz_subchannel.get()));
gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p",
c->connected_subchannel.get(), c);
@ -770,6 +785,14 @@ void grpc_get_subchannel_address_arg(const grpc_channel_args* args,
}
}
const char* grpc_subchannel_get_target(grpc_subchannel* subchannel) {
const grpc_arg* addr_arg =
grpc_channel_args_find(subchannel->args, GRPC_ARG_SUBCHANNEL_ADDRESS);
const char* addr_str = grpc_channel_arg_get_string(addr_arg);
GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy.
return addr_str;
}
const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args) {
const grpc_arg* addr_arg =
grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS);
@ -786,9 +809,12 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) {
namespace grpc_core {
ConnectedSubchannel::ConnectedSubchannel(grpc_channel_stack* channel_stack)
ConnectedSubchannel::ConnectedSubchannel(
grpc_channel_stack* channel_stack,
channelz::SubchannelNode* channelz_subchannel)
: RefCountedWithTracing<ConnectedSubchannel>(&grpc_trace_stream_refcount),
channel_stack_(channel_stack) {}
channel_stack_(channel_stack),
channelz_subchannel_(channelz_subchannel) {}
ConnectedSubchannel::~ConnectedSubchannel() {
GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");

@ -85,7 +85,8 @@ class ConnectedSubchannel : public RefCountedWithTracing<ConnectedSubchannel> {
size_t parent_data_size;
};
explicit ConnectedSubchannel(grpc_channel_stack* channel_stack);
explicit ConnectedSubchannel(grpc_channel_stack* channel_stack,
channelz::SubchannelNode* channelz_subchannel);
~ConnectedSubchannel();
grpc_channel_stack* channel_stack() { return channel_stack_; }
@ -94,9 +95,15 @@ class ConnectedSubchannel : public RefCountedWithTracing<ConnectedSubchannel> {
grpc_closure* closure);
void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
grpc_error* CreateCall(const CallArgs& args, grpc_subchannel_call** call);
channelz::SubchannelNode* channelz_subchannel() {
return channelz_subchannel_;
}
private:
grpc_channel_stack* channel_stack_;
// backpointer to the channelz node in this connected subchannel's
// owning subchannel.
channelz::SubchannelNode* channelz_subchannel_;
};
} // namespace grpc_core
@ -184,6 +191,8 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
void grpc_get_subchannel_address_arg(const grpc_channel_args* args,
grpc_resolved_address* addr);
const char* grpc_subchannel_get_target(grpc_subchannel* subchannel);
/// Returns the URI string for the address to connect to.
const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args);

@ -41,16 +41,14 @@
namespace grpc_core {
namespace channelz {
ChannelTrace::TraceEvent::TraceEvent(
Severity severity, grpc_slice data,
RefCountedPtr<ChannelNode> referenced_channel, ReferencedType type)
ChannelTrace::TraceEvent::TraceEvent(Severity severity, grpc_slice data,
RefCountedPtr<BaseNode> referenced_entity)
: severity_(severity),
data_(data),
timestamp_(grpc_millis_to_timespec(grpc_core::ExecCtx::Get()->Now(),
GPR_CLOCK_REALTIME)),
next_(nullptr),
referenced_channel_(std::move(referenced_channel)),
referenced_type_(type) {}
referenced_entity_(std::move(referenced_entity)) {}
ChannelTrace::TraceEvent::TraceEvent(Severity severity, grpc_slice data)
: severity_(severity),
@ -110,23 +108,13 @@ void ChannelTrace::AddTraceEvent(Severity severity, grpc_slice data) {
AddTraceEventHelper(New<TraceEvent>(severity, data));
}
void ChannelTrace::AddTraceEventReferencingChannel(
Severity severity, grpc_slice data,
RefCountedPtr<ChannelNode> referenced_channel) {
if (max_list_size_ == 0) return; // tracing is disabled if max_events == 0
// create and fill up the new event
AddTraceEventHelper(New<TraceEvent>(
severity, data, std::move(referenced_channel), ReferencedType::Channel));
}
void ChannelTrace::AddTraceEventReferencingSubchannel(
void ChannelTrace::AddTraceEventWithReference(
Severity severity, grpc_slice data,
RefCountedPtr<ChannelNode> referenced_subchannel) {
RefCountedPtr<BaseNode> referenced_entity) {
if (max_list_size_ == 0) return; // tracing is disabled if max_events == 0
// create and fill up the new event
AddTraceEventHelper(New<TraceEvent>(severity, data,
std::move(referenced_subchannel),
ReferencedType::Subchannel));
AddTraceEventHelper(
New<TraceEvent>(severity, data, std::move(referenced_entity)));
}
namespace {
@ -157,19 +145,18 @@ void ChannelTrace::TraceEvent::RenderTraceEvent(grpc_json* json) const {
json_iterator = grpc_json_create_child(json_iterator, json, "timestamp",
gpr_format_timespec(timestamp_),
GRPC_JSON_STRING, true);
if (referenced_channel_ != nullptr) {
if (referenced_entity_ != nullptr) {
const bool is_channel =
(referenced_entity_->type() == BaseNode::EntityType::kTopLevelChannel ||
referenced_entity_->type() == BaseNode::EntityType::kInternalChannel);
char* uuid_str;
gpr_asprintf(&uuid_str, "%" PRIdPTR, referenced_channel_->channel_uuid());
gpr_asprintf(&uuid_str, "%" PRIdPTR, referenced_entity_->uuid());
grpc_json* child_ref = grpc_json_create_child(
json_iterator, json,
(referenced_type_ == ReferencedType::Channel) ? "channelRef"
: "subchannelRef",
json_iterator, json, is_channel ? "channelRef" : "subchannelRef",
nullptr, GRPC_JSON_OBJECT, false);
json_iterator = grpc_json_create_child(
nullptr, child_ref,
(referenced_type_ == ReferencedType::Channel) ? "channelId"
: "subchannelId",
uuid_str, GRPC_JSON_STRING, true);
nullptr, child_ref, is_channel ? "channelId" : "subchannelId", uuid_str,
GRPC_JSON_STRING, true);
json_iterator = child_ref;
}
}
@ -178,24 +165,26 @@ grpc_json* ChannelTrace::RenderJson() const {
if (!max_list_size_)
return nullptr; // tracing is disabled if max_events == 0
grpc_json* json = grpc_json_create(GRPC_JSON_OBJECT);
char* num_events_logged_str;
gpr_asprintf(&num_events_logged_str, "%" PRId64, num_events_logged_);
grpc_json* json_iterator = nullptr;
json_iterator =
grpc_json_create_child(json_iterator, json, "numEventsLogged",
num_events_logged_str, GRPC_JSON_STRING, true);
if (num_events_logged_ > 0) {
json_iterator = grpc_json_add_number_string_child(
json, json_iterator, "numEventsLogged", num_events_logged_);
}
json_iterator = grpc_json_create_child(
json_iterator, json, "creationTimestamp",
gpr_format_timespec(time_created_), GRPC_JSON_STRING, true);
grpc_json* events = grpc_json_create_child(json_iterator, json, "events",
nullptr, GRPC_JSON_ARRAY, false);
json_iterator = nullptr;
TraceEvent* it = head_trace_;
while (it != nullptr) {
json_iterator = grpc_json_create_child(json_iterator, events, nullptr,
nullptr, GRPC_JSON_OBJECT, false);
it->RenderTraceEvent(json_iterator);
it = it->next();
// only add in the event list if it is non-empty.
if (num_events_logged_ > 0) {
grpc_json* events = grpc_json_create_child(json_iterator, json, "events",
nullptr, GRPC_JSON_ARRAY, false);
json_iterator = nullptr;
TraceEvent* it = head_trace_;
while (it != nullptr) {
json_iterator = grpc_json_create_child(json_iterator, events, nullptr,
nullptr, GRPC_JSON_OBJECT, false);
it->RenderTraceEvent(json_iterator);
it = it->next();
}
}
return json;
}

@ -30,7 +30,7 @@
namespace grpc_core {
namespace channelz {
class ChannelNode;
class BaseNode;
// Object used to hold live data for a channel. This data is exposed via the
// channelz service:
@ -55,35 +55,28 @@ class ChannelTrace {
void AddTraceEvent(Severity severity, grpc_slice data);
// Adds a new trace event to the tracing object. This trace event refers to a
// an event on a child of the channel. For example, if this channel has
// created a new subchannel, then it would record that with a TraceEvent
// referencing the new subchannel.
// an event that concerns a different channelz entity. For example, if this
// channel has created a new subchannel, then it would record that with
// a TraceEvent referencing the new subchannel.
//
// TODO(ncteisen): as this call is used more and more throughout the gRPC
// stack, determine if it makes more sense to accept a char* instead of a
// slice.
void AddTraceEventReferencingChannel(
Severity severity, grpc_slice data,
RefCountedPtr<ChannelNode> referenced_channel);
void AddTraceEventReferencingSubchannel(
Severity severity, grpc_slice data,
RefCountedPtr<ChannelNode> referenced_subchannel);
void AddTraceEventWithReference(Severity severity, grpc_slice data,
RefCountedPtr<BaseNode> referenced_channel);
// Creates and returns the raw grpc_json object, so a parent channelz
// object may incorporate the json before rendering.
grpc_json* RenderJson() const;
private:
// Types of objects that can be references by trace events.
enum class ReferencedType { Channel, Subchannel };
// Private class to encapsulate all the data and bookkeeping needed for a
// a trace event.
class TraceEvent {
public:
// Constructor for a TraceEvent that references a different channel.
// Constructor for a TraceEvent that references a channel.
TraceEvent(Severity severity, grpc_slice data,
RefCountedPtr<ChannelNode> referenced_channel,
ReferencedType type);
RefCountedPtr<BaseNode> referenced_entity_);
// Constructor for a TraceEvent that does not reverence a different
// channel.
@ -105,10 +98,7 @@ class ChannelTrace {
gpr_timespec timestamp_;
TraceEvent* next_;
// the tracer object for the (sub)channel that this trace event refers to.
RefCountedPtr<ChannelNode> referenced_channel_;
// the type that the referenced tracer points to. Unused if this trace
// does not point to any channel or subchannel
ReferencedType referenced_type_;
RefCountedPtr<BaseNode> referenced_entity_;
}; // TraceEvent
// Internal helper to add and link in a trace event

@ -41,33 +41,61 @@
namespace grpc_core {
namespace channelz {
ChannelNode::ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
bool is_top_level_channel)
: channel_(channel),
target_(nullptr),
channel_uuid_(-1),
is_top_level_channel_(is_top_level_channel) {
trace_.Init(channel_tracer_max_nodes);
target_ = UniquePtr<char>(grpc_channel_get_target(channel_));
channel_uuid_ = ChannelzRegistry::RegisterChannelNode(this);
BaseNode::BaseNode(EntityType type)
: type_(type), uuid_(ChannelzRegistry::Register(this)) {}
BaseNode::~BaseNode() { ChannelzRegistry::Unregister(uuid_); }
char* BaseNode::RenderJsonString() {
grpc_json* json = RenderJson();
char* json_str = grpc_json_dump_to_string(json, 0);
grpc_json_destroy(json);
return json_str;
}
CallCountingHelper::CallCountingHelper() {
gpr_atm_no_barrier_store(&last_call_started_millis_,
(gpr_atm)ExecCtx::Get()->Now());
}
ChannelNode::~ChannelNode() {
trace_.Destroy();
ChannelzRegistry::UnregisterChannelNode(channel_uuid_);
}
CallCountingHelper::~CallCountingHelper() {}
void ChannelNode::RecordCallStarted() {
void CallCountingHelper::RecordCallStarted() {
gpr_atm_no_barrier_fetch_add(&calls_started_, (gpr_atm)1);
gpr_atm_no_barrier_store(&last_call_started_millis_,
(gpr_atm)ExecCtx::Get()->Now());
}
void ChannelNode::PopulateConnectivityState(grpc_json* json) {}
void CallCountingHelper::PopulateCallCounts(grpc_json* json) {
grpc_json* json_iterator = nullptr;
if (calls_started_ != 0) {
json_iterator = grpc_json_add_number_string_child(
json, json_iterator, "callsStarted", calls_started_);
}
if (calls_succeeded_ != 0) {
json_iterator = grpc_json_add_number_string_child(
json, json_iterator, "callsSucceeded", calls_succeeded_);
}
if (calls_failed_) {
json_iterator = grpc_json_add_number_string_child(
json, json_iterator, "callsFailed", calls_failed_);
}
gpr_timespec ts =
grpc_millis_to_timespec(last_call_started_millis_, GPR_CLOCK_REALTIME);
json_iterator =
grpc_json_create_child(json_iterator, json, "lastCallStartedTimestamp",
gpr_format_timespec(ts), GRPC_JSON_STRING, true);
}
void ChannelNode::PopulateChildRefs(grpc_json* json) {}
ChannelNode::ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
bool is_top_level_channel)
: BaseNode(is_top_level_channel ? EntityType::kTopLevelChannel
: EntityType::kInternalChannel),
channel_(channel),
target_(UniquePtr<char>(grpc_channel_get_target(channel_))),
trace_(channel_tracer_max_nodes) {}
ChannelNode::~ChannelNode() {}
grpc_json* ChannelNode::RenderJson() {
// We need to track these three json objects to build our object
@ -80,7 +108,7 @@ grpc_json* ChannelNode::RenderJson() {
json = json_iterator;
json_iterator = nullptr;
json_iterator = grpc_json_add_number_string_child(json, json_iterator,
"channelId", channel_uuid_);
"channelId", uuid());
// reset json iterators to top level object
json = top_level_json;
json_iterator = nullptr;
@ -89,51 +117,28 @@ grpc_json* ChannelNode::RenderJson() {
GRPC_JSON_OBJECT, false);
json = data;
json_iterator = nullptr;
// template method. Child classes may override this to add their specific
// functionality.
PopulateConnectivityState(json);
// populate the target.
GPR_ASSERT(target_.get() != nullptr);
json_iterator = grpc_json_create_child(
json_iterator, json, "target", target_.get(), GRPC_JSON_STRING, false);
grpc_json_create_child(nullptr, json, "target", target_.get(),
GRPC_JSON_STRING, false);
// fill in the channel trace if applicable
grpc_json* trace = trace_->RenderJson();
if (trace != nullptr) {
// we manually link up and fill the child since it was created for us in
// ChannelTrace::RenderJson
trace->key = "trace"; // this object is named trace in channelz.proto
json_iterator = grpc_json_link_child(json, trace, json_iterator);
}
// reset the parent to be the data object.
json = data;
json_iterator = nullptr;
if (calls_started_ != 0) {
json_iterator = grpc_json_add_number_string_child(
json, json_iterator, "callsStarted", calls_started_);
}
if (calls_succeeded_ != 0) {
json_iterator = grpc_json_add_number_string_child(
json, json_iterator, "callsSucceeded", calls_succeeded_);
}
if (calls_failed_) {
json_iterator = grpc_json_add_number_string_child(
json, json_iterator, "callsFailed", calls_failed_);
grpc_json* trace_json = trace_.RenderJson();
if (trace_json != nullptr) {
trace_json->key = "trace"; // this object is named trace in channelz.proto
grpc_json_link_child(json, trace_json, nullptr);
}
gpr_timespec ts =
grpc_millis_to_timespec(last_call_started_millis_, GPR_CLOCK_REALTIME);
json_iterator =
grpc_json_create_child(json_iterator, json, "lastCallStartedTimestamp",
gpr_format_timespec(ts), GRPC_JSON_STRING, true);
// ask CallCountingHelper to populate trace and call count data.
call_counter_.PopulateCallCounts(json);
json = top_level_json;
json_iterator = nullptr;
// template method. Child classes may override this to add their specific
// functionality.
PopulateChildRefs(json);
return top_level_json;
}
char* ChannelNode::RenderJsonString() {
grpc_json* json = RenderJson();
char* json_str = grpc_json_dump_to_string(json, 0);
grpc_json_destroy(json);
return json_str;
}
RefCountedPtr<ChannelNode> ChannelNode::MakeChannelNode(
grpc_channel* channel, size_t channel_tracer_max_nodes,
bool is_top_level_channel) {
@ -141,13 +146,5 @@ RefCountedPtr<ChannelNode> ChannelNode::MakeChannelNode(
channel, channel_tracer_max_nodes, is_top_level_channel);
}
SubchannelNode::SubchannelNode() {
subchannel_uuid_ = ChannelzRegistry::RegisterSubchannelNode(this);
}
SubchannelNode::~SubchannelNode() {
ChannelzRegistry::UnregisterSubchannelNode(subchannel_uuid_);
}
} // namespace channelz
} // namespace grpc_core

@ -43,14 +43,52 @@ namespace grpc_core {
namespace channelz {
namespace testing {
class CallCountingHelperPeer;
class ChannelNodePeer;
}
} // namespace testing
class ChannelNode : public RefCounted<ChannelNode> {
// base class for all channelz entities
class BaseNode : public RefCounted<BaseNode> {
public:
static RefCountedPtr<ChannelNode> MakeChannelNode(
grpc_channel* channel, size_t channel_tracer_max_nodes,
bool is_top_level_channel);
// There are only four high level channelz entities. However, to support
// GetTopChannelsRequest, we split the Channel entity into two different
// types. All children of BaseNode must be one of these types.
enum class EntityType {
kTopLevelChannel,
kInternalChannel,
kSubchannel,
kServer,
kSocket,
};
explicit BaseNode(EntityType type);
virtual ~BaseNode();
// All children must implement this function.
virtual grpc_json* RenderJson() GRPC_ABSTRACT;
// Renders the json and returns allocated string that must be freed by the
// caller.
char* RenderJsonString();
EntityType type() const { return type_; }
intptr_t uuid() const { return uuid_; }
private:
const EntityType type_;
const intptr_t uuid_;
};
// This class is a helper class for channelz entities that deal with Channels,
// Subchannels, and Servers, since those have similar proto definitions.
// This class has the ability to:
// - track calls_{started,succeeded,failed}
// - track last_call_started_timestamp
// - perform rendering of the above items
class CallCountingHelper {
public:
CallCountingHelper();
~CallCountingHelper();
void RecordCallStarted();
void RecordCallFailed() {
@ -60,17 +98,46 @@ class ChannelNode : public RefCounted<ChannelNode> {
gpr_atm_no_barrier_fetch_add(&calls_succeeded_, (gpr_atm(1)));
}
grpc_json* RenderJson();
char* RenderJsonString();
// Common rendering of the call count data and last_call_started_timestamp.
void PopulateCallCounts(grpc_json* json);
// helper for getting and populating connectivity state. It is virtual
// because it allows the client_channel specific code to live in ext/
// instead of lib/
virtual void PopulateConnectivityState(grpc_json* json);
private:
// testing peer friend.
friend class testing::CallCountingHelperPeer;
virtual void PopulateChildRefs(grpc_json* json);
gpr_atm calls_started_ = 0;
gpr_atm calls_succeeded_ = 0;
gpr_atm calls_failed_ = 0;
gpr_atm last_call_started_millis_ = 0;
};
ChannelTrace* trace() { return trace_.get(); }
// Handles channelz bookkeeping for channels
class ChannelNode : public BaseNode {
public:
static RefCountedPtr<ChannelNode> MakeChannelNode(
grpc_channel* channel, size_t channel_tracer_max_nodes,
bool is_top_level_channel);
ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
bool is_top_level_channel);
~ChannelNode() override;
grpc_json* RenderJson() override;
// template methods. RenderJSON uses these methods to render its JSON
// representation. These are virtual so that children classes may provide
// their specific mechanism for populating these parts of the channelz
// object.
//
// ChannelNode does not have a notion of connectivity state or child refs,
// so it leaves these implementations blank.
//
// This is utilizing the template method design pattern.
//
// TODO(ncteisen): remove these template methods in favor of manual traversal
// and mutation of the grpc_json object.
virtual void PopulateConnectivityState(grpc_json* json) {}
virtual void PopulateChildRefs(grpc_json* json) {}
void MarkChannelDestroyed() {
GPR_ASSERT(channel_ != nullptr);
@ -79,47 +146,44 @@ class ChannelNode : public RefCounted<ChannelNode> {
bool ChannelIsDestroyed() { return channel_ == nullptr; }
intptr_t channel_uuid() { return channel_uuid_; }
bool is_top_level_channel() { return is_top_level_channel_; }
protected:
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
bool is_top_level_channel);
virtual ~ChannelNode();
// proxy methods to composed classes.
void AddTraceEvent(ChannelTrace::Severity severity, grpc_slice data) {
trace_.AddTraceEvent(severity, data);
}
void AddTraceEventWithReference(ChannelTrace::Severity severity,
grpc_slice data,
RefCountedPtr<BaseNode> referenced_channel) {
trace_.AddTraceEventWithReference(severity, data,
std::move(referenced_channel));
}
void RecordCallStarted() { call_counter_.RecordCallStarted(); }
void RecordCallFailed() { call_counter_.RecordCallFailed(); }
void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); }
private:
// testing peer friend.
// to allow the channel trace test to access trace_.
friend class testing::ChannelNodePeer;
grpc_channel* channel_ = nullptr;
UniquePtr<char> target_;
gpr_atm calls_started_ = 0;
gpr_atm calls_succeeded_ = 0;
gpr_atm calls_failed_ = 0;
gpr_atm last_call_started_millis_ = 0;
intptr_t channel_uuid_;
bool is_top_level_channel_ = true;
ManualConstructor<ChannelTrace> trace_;
CallCountingHelper call_counter_;
ChannelTrace trace_;
};
// Placeholds channelz class for subchannels. All this can do now is track its
// uuid (this information is needed by the parent channelz class).
// TODO(ncteisen): build this out to support the GetSubchannel channelz request.
class SubchannelNode : public RefCounted<SubchannelNode> {
// Handles channelz bookkeeping for servers
// TODO(ncteisen): implement in subsequent PR.
class ServerNode : public BaseNode {
public:
SubchannelNode();
virtual ~SubchannelNode();
intptr_t subchannel_uuid() { return subchannel_uuid_; }
protected:
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
explicit ServerNode(size_t channel_tracer_max_nodes)
: BaseNode(EntityType::kServer) {}
~ServerNode() override {}
};
private:
intptr_t subchannel_uuid_;
// Handles channelz bookkeeping for sockets
// TODO(ncteisen): implement in subsequent PR.
class SocketNode : public BaseNode {
public:
SocketNode() : BaseNode(EntityType::kSocket) {}
~SocketNode() override {}
};
// Creation functions

@ -53,54 +53,46 @@ ChannelzRegistry::ChannelzRegistry() { gpr_mu_init(&mu_); }
ChannelzRegistry::~ChannelzRegistry() { gpr_mu_destroy(&mu_); }
intptr_t ChannelzRegistry::InternalRegisterEntry(const RegistryEntry& entry) {
intptr_t ChannelzRegistry::InternalRegister(BaseNode* node) {
MutexLock lock(&mu_);
entities_.push_back(entry);
entities_.push_back(node);
intptr_t uuid = entities_.size();
return uuid;
}
void ChannelzRegistry::InternalUnregisterEntry(intptr_t uuid, EntityType type) {
void ChannelzRegistry::InternalUnregister(intptr_t uuid) {
GPR_ASSERT(uuid >= 1);
MutexLock lock(&mu_);
GPR_ASSERT(static_cast<size_t>(uuid) <= entities_.size());
GPR_ASSERT(entities_[uuid - 1].type == type);
entities_[uuid - 1].object = nullptr;
entities_[uuid - 1].type = EntityType::kUnset;
entities_[uuid - 1] = nullptr;
}
void* ChannelzRegistry::InternalGetEntry(intptr_t uuid, EntityType type) {
BaseNode* ChannelzRegistry::InternalGet(intptr_t uuid) {
MutexLock lock(&mu_);
if (uuid < 1 || uuid > static_cast<intptr_t>(entities_.size())) {
return nullptr;
}
if (entities_[uuid - 1].type == type) {
return entities_[uuid - 1].object;
} else {
return nullptr;
}
return entities_[uuid - 1];
}
char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) {
grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
grpc_json* json = top_level_json;
grpc_json* json_iterator = nullptr;
InlinedVector<ChannelNode*, 10> top_level_channels;
InlinedVector<BaseNode*, 10> top_level_channels;
// uuids index into entities one-off (idx 0 is really uuid 1, since 0 is
// reserved). However, we want to support requests coming in with
// start_channel_id=0, which signifies "give me everything." Hence this
// funky looking line below.
size_t start_idx = start_channel_id == 0 ? 0 : start_channel_id - 1;
for (size_t i = start_idx; i < entities_.size(); ++i) {
if (entities_[i].type == EntityType::kChannelNode) {
ChannelNode* channel_node =
static_cast<ChannelNode*>(entities_[i].object);
if (channel_node->is_top_level_channel()) {
top_level_channels.push_back(channel_node);
}
if (entities_[i] != nullptr &&
entities_[i]->type() ==
grpc_core::channelz::BaseNode::EntityType::kTopLevelChannel) {
top_level_channels.push_back(entities_[i]);
}
}
if (top_level_channels.size() > 0) {
if (!top_level_channels.empty()) {
// create list of channels
grpc_json* array_parent = grpc_json_create_child(
nullptr, json, "channel", nullptr, GRPC_JSON_ARRAY, false);
@ -129,9 +121,13 @@ char* grpc_channelz_get_top_channels(intptr_t start_channel_id) {
}
char* grpc_channelz_get_channel(intptr_t channel_id) {
grpc_core::channelz::ChannelNode* channel_node =
grpc_core::channelz::ChannelzRegistry::GetChannelNode(channel_id);
if (channel_node == nullptr) {
grpc_core::channelz::BaseNode* channel_node =
grpc_core::channelz::ChannelzRegistry::Get(channel_id);
if (channel_node == nullptr ||
(channel_node->type() !=
grpc_core::channelz::BaseNode::EntityType::kTopLevelChannel &&
channel_node->type() !=
grpc_core::channelz::BaseNode::EntityType::kInternalChannel)) {
return nullptr;
}
grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
@ -143,3 +139,21 @@ char* grpc_channelz_get_channel(intptr_t channel_id) {
grpc_json_destroy(top_level_json);
return json_str;
}
char* grpc_channelz_get_subchannel(intptr_t subchannel_id) {
grpc_core::channelz::BaseNode* subchannel_node =
grpc_core::channelz::ChannelzRegistry::Get(subchannel_id);
if (subchannel_node == nullptr ||
subchannel_node->type() !=
grpc_core::channelz::BaseNode::EntityType::kSubchannel) {
return nullptr;
}
grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
grpc_json* json = top_level_json;
grpc_json* subchannel_json = subchannel_node->RenderJson();
subchannel_json->key = "subchannel";
grpc_json_link_child(json, subchannel_json, nullptr);
char* json_str = grpc_json_dump_to_string(top_level_json, 0);
grpc_json_destroy(top_level_json);
return json_str;
}

@ -40,32 +40,11 @@ class ChannelzRegistry {
// To be called in grpc_shutdown();
static void Shutdown();
// Register/Unregister/Get for ChannelNode
static intptr_t RegisterChannelNode(ChannelNode* channel_node) {
RegistryEntry entry(channel_node, EntityType::kChannelNode);
return Default()->InternalRegisterEntry(entry);
}
static void UnregisterChannelNode(intptr_t uuid) {
Default()->InternalUnregisterEntry(uuid, EntityType::kChannelNode);
}
static ChannelNode* GetChannelNode(intptr_t uuid) {
void* gotten = Default()->InternalGetEntry(uuid, EntityType::kChannelNode);
return gotten == nullptr ? nullptr : static_cast<ChannelNode*>(gotten);
}
// Register/Unregister/Get for SubchannelNode
static intptr_t RegisterSubchannelNode(SubchannelNode* channel_node) {
RegistryEntry entry(channel_node, EntityType::kSubchannelNode);
return Default()->InternalRegisterEntry(entry);
}
static void UnregisterSubchannelNode(intptr_t uuid) {
Default()->InternalUnregisterEntry(uuid, EntityType::kSubchannelNode);
}
static SubchannelNode* GetSubchannelNode(intptr_t uuid) {
void* gotten =
Default()->InternalGetEntry(uuid, EntityType::kSubchannelNode);
return gotten == nullptr ? nullptr : static_cast<SubchannelNode*>(gotten);
static intptr_t Register(BaseNode* node) {
return Default()->InternalRegister(node);
}
static void Unregister(intptr_t uuid) { Default()->InternalUnregister(uuid); }
static BaseNode* Get(intptr_t uuid) { return Default()->InternalGet(uuid); }
// Returns the allocated JSON string that represents the proto
// GetTopChannelsResponse as per channelz.proto.
@ -74,19 +53,6 @@ class ChannelzRegistry {
}
private:
enum class EntityType {
kChannelNode,
kSubchannelNode,
kUnset,
};
struct RegistryEntry {
RegistryEntry(void* object_in, EntityType type_in)
: object(object_in), type(type_in) {}
void* object;
EntityType type;
};
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
@ -97,21 +63,21 @@ class ChannelzRegistry {
static ChannelzRegistry* Default();
// globally registers an Entry. Returns its unique uuid
intptr_t InternalRegisterEntry(const RegistryEntry& entry);
intptr_t InternalRegister(BaseNode* node);
// globally unregisters the object that is associated to uuid. Also does
// sanity check that an object doesn't try to unregister the wrong type.
void InternalUnregisterEntry(intptr_t uuid, EntityType type);
void InternalUnregister(intptr_t uuid);
// if object with uuid has previously been registered as the correct type,
// returns the void* associated with that uuid. Else returns nullptr.
void* InternalGetEntry(intptr_t uuid, EntityType type);
BaseNode* InternalGet(intptr_t uuid);
char* InternalGetTopChannels(intptr_t start_channel_id);
// protects entities_ and uuid_
gpr_mu mu_;
InlinedVector<RegistryEntry, 20> entities_;
InlinedVector<BaseNode*, 20> entities_;
};
} // namespace channelz

@ -169,7 +169,7 @@ grpc_channel* grpc_channel_create_with_builder(
bool is_top_level_channel = channel->is_client && !internal_channel;
channel->channelz_channel = channel_node_create_func(
channel, channel_tracer_max_nodes, is_top_level_channel);
channel->channelz_channel->trace()->AddTraceEvent(
channel->channelz_channel->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Channel created"));
}
@ -427,6 +427,9 @@ void grpc_channel_internal_unref(grpc_channel* c REF_ARG) {
static void destroy_channel(void* arg, grpc_error* error) {
grpc_channel* channel = static_cast<grpc_channel*>(arg);
if (channel->channelz_channel != nullptr) {
channel->channelz_channel->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Channel destroyed"));
channel->channelz_channel->MarkChannelDestroyed();
channel->channelz_channel.reset();
}

@ -98,6 +98,7 @@ grpc_resource_quota_set_max_threads_type grpc_resource_quota_set_max_threads_imp
grpc_resource_quota_arg_vtable_type grpc_resource_quota_arg_vtable_import;
grpc_channelz_get_top_channels_type grpc_channelz_get_top_channels_import;
grpc_channelz_get_channel_type grpc_channelz_get_channel_import;
grpc_channelz_get_subchannel_type grpc_channelz_get_subchannel_import;
grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_from_fd_import;
grpc_server_add_insecure_channel_from_fd_type grpc_server_add_insecure_channel_from_fd_import;
grpc_use_signal_type grpc_use_signal_import;
@ -352,6 +353,7 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_resource_quota_arg_vtable_import = (grpc_resource_quota_arg_vtable_type) GetProcAddress(library, "grpc_resource_quota_arg_vtable");
grpc_channelz_get_top_channels_import = (grpc_channelz_get_top_channels_type) GetProcAddress(library, "grpc_channelz_get_top_channels");
grpc_channelz_get_channel_import = (grpc_channelz_get_channel_type) GetProcAddress(library, "grpc_channelz_get_channel");
grpc_channelz_get_subchannel_import = (grpc_channelz_get_subchannel_type) GetProcAddress(library, "grpc_channelz_get_subchannel");
grpc_insecure_channel_create_from_fd_import = (grpc_insecure_channel_create_from_fd_type) GetProcAddress(library, "grpc_insecure_channel_create_from_fd");
grpc_server_add_insecure_channel_from_fd_import = (grpc_server_add_insecure_channel_from_fd_type) GetProcAddress(library, "grpc_server_add_insecure_channel_from_fd");
grpc_use_signal_import = (grpc_use_signal_type) GetProcAddress(library, "grpc_use_signal");

@ -269,6 +269,9 @@ extern grpc_channelz_get_top_channels_type grpc_channelz_get_top_channels_import
typedef char*(*grpc_channelz_get_channel_type)(intptr_t channel_id);
extern grpc_channelz_get_channel_type grpc_channelz_get_channel_import;
#define grpc_channelz_get_channel grpc_channelz_get_channel_import
typedef char*(*grpc_channelz_get_subchannel_type)(intptr_t subchannel_id);
extern grpc_channelz_get_subchannel_type grpc_channelz_get_subchannel_import;
#define grpc_channelz_get_subchannel grpc_channelz_get_subchannel_import
typedef grpc_channel*(*grpc_insecure_channel_create_from_fd_type)(const char* target, int fd, const grpc_channel_args* args);
extern grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_from_fd_import;
#define grpc_insecure_channel_create_from_fd grpc_insecure_channel_create_from_fd_import

@ -124,7 +124,7 @@ static void test_create_channel_stack(void) {
gpr_now(GPR_CLOCK_MONOTONIC), /* start_time */
GRPC_MILLIS_INF_FUTURE, /* deadline */
nullptr, /* arena */
nullptr /* call_combiner */
nullptr, /* call_combiner */
};
grpc_error* error =
grpc_call_stack_init(channel_stack, 1, free_call, call_stack, &args);

@ -40,6 +40,17 @@
namespace grpc_core {
namespace channelz {
namespace testing {
// testing peer to access channel internals
class ChannelNodePeer {
public:
explicit ChannelNodePeer(ChannelNode* node) : node_(node) {}
ChannelTrace* trace() const { return &node_->trace_; }
private:
ChannelNode* node_;
};
namespace {
grpc_json* GetJsonChild(grpc_json* parent, const char* key) {
@ -156,28 +167,29 @@ TEST_P(ChannelTracerTest, ComplexTest) {
ChannelFixture channel1(GetParam());
RefCountedPtr<ChannelNode> sc1 =
MakeRefCounted<ChannelNode>(channel1.channel(), GetParam(), true);
tracer.AddTraceEventReferencingSubchannel(
ChannelNodePeer sc1_peer(sc1.get());
tracer.AddTraceEventWithReference(
ChannelTrace::Severity::Info,
grpc_slice_from_static_string("subchannel one created"), sc1);
ValidateChannelTrace(&tracer, 3, GetParam());
AddSimpleTrace(sc1->trace());
AddSimpleTrace(sc1->trace());
AddSimpleTrace(sc1->trace());
ValidateChannelTrace(sc1->trace(), 3, GetParam());
AddSimpleTrace(sc1->trace());
AddSimpleTrace(sc1->trace());
AddSimpleTrace(sc1->trace());
ValidateChannelTrace(sc1->trace(), 6, GetParam());
AddSimpleTrace(sc1_peer.trace());
AddSimpleTrace(sc1_peer.trace());
AddSimpleTrace(sc1_peer.trace());
ValidateChannelTrace(sc1_peer.trace(), 3, GetParam());
AddSimpleTrace(sc1_peer.trace());
AddSimpleTrace(sc1_peer.trace());
AddSimpleTrace(sc1_peer.trace());
ValidateChannelTrace(sc1_peer.trace(), 6, GetParam());
AddSimpleTrace(&tracer);
AddSimpleTrace(&tracer);
ValidateChannelTrace(&tracer, 5, GetParam());
ChannelFixture channel2(GetParam());
RefCountedPtr<ChannelNode> sc2 =
MakeRefCounted<ChannelNode>(channel2.channel(), GetParam(), true);
tracer.AddTraceEventReferencingChannel(
tracer.AddTraceEventWithReference(
ChannelTrace::Severity::Info,
grpc_slice_from_static_string("LB channel two created"), sc2);
tracer.AddTraceEventReferencingSubchannel(
tracer.AddTraceEventWithReference(
ChannelTrace::Severity::Warning,
grpc_slice_from_static_string("subchannel one inactive"), sc1);
ValidateChannelTrace(&tracer, 7, GetParam());
@ -203,33 +215,35 @@ TEST_P(ChannelTracerTest, TestNesting) {
ChannelFixture channel1(GetParam());
RefCountedPtr<ChannelNode> sc1 =
MakeRefCounted<ChannelNode>(channel1.channel(), GetParam(), true);
tracer.AddTraceEventReferencingChannel(
ChannelNodePeer sc1_peer(sc1.get());
tracer.AddTraceEventWithReference(
ChannelTrace::Severity::Info,
grpc_slice_from_static_string("subchannel one created"), sc1);
ValidateChannelTrace(&tracer, 3, GetParam());
AddSimpleTrace(sc1->trace());
AddSimpleTrace(sc1_peer.trace());
ChannelFixture channel2(GetParam());
RefCountedPtr<ChannelNode> conn1 =
MakeRefCounted<ChannelNode>(channel2.channel(), GetParam(), true);
ChannelNodePeer conn1_peer(conn1.get());
// nesting one level deeper.
sc1->trace()->AddTraceEventReferencingSubchannel(
sc1_peer.trace()->AddTraceEventWithReference(
ChannelTrace::Severity::Info,
grpc_slice_from_static_string("connection one created"), conn1);
ValidateChannelTrace(&tracer, 3, GetParam());
AddSimpleTrace(conn1->trace());
AddSimpleTrace(conn1_peer.trace());
AddSimpleTrace(&tracer);
AddSimpleTrace(&tracer);
ValidateChannelTrace(&tracer, 5, GetParam());
ValidateChannelTrace(conn1->trace(), 1, GetParam());
ValidateChannelTrace(conn1_peer.trace(), 1, GetParam());
ChannelFixture channel3(GetParam());
RefCountedPtr<ChannelNode> sc2 =
MakeRefCounted<ChannelNode>(channel3.channel(), GetParam(), true);
tracer.AddTraceEventReferencingSubchannel(
tracer.AddTraceEventWithReference(
ChannelTrace::Severity::Info,
grpc_slice_from_static_string("subchannel two created"), sc2);
// this trace should not get added to the parents children since it is already
// present in the tracer.
tracer.AddTraceEventReferencingChannel(
tracer.AddTraceEventWithReference(
ChannelTrace::Severity::Warning,
grpc_slice_from_static_string("subchannel one inactive"), sc1);
AddSimpleTrace(&tracer);

@ -44,22 +44,22 @@ namespace channelz {
namespace testing {
TEST(ChannelzRegistryTest, UuidStartsAboveZeroTest) {
ChannelNode* channelz_channel = nullptr;
intptr_t uuid = ChannelzRegistry::RegisterChannelNode(channelz_channel);
BaseNode* channelz_channel = nullptr;
intptr_t uuid = ChannelzRegistry::Register(channelz_channel);
EXPECT_GT(uuid, 0) << "First uuid chose must be greater than zero. Zero if "
"reserved according to "
"https://github.com/grpc/proposal/blob/master/"
"A14-channelz.md";
ChannelzRegistry::UnregisterChannelNode(uuid);
ChannelzRegistry::Unregister(uuid);
}
TEST(ChannelzRegistryTest, UuidsAreIncreasing) {
ChannelNode* channelz_channel = nullptr;
BaseNode* channelz_channel = nullptr;
std::vector<intptr_t> uuids;
uuids.reserve(10);
for (int i = 0; i < 10; ++i) {
// reregister the same object. It's ok since we are just testing uuids
uuids.push_back(ChannelzRegistry::RegisterChannelNode(channelz_channel));
uuids.push_back(ChannelzRegistry::Register(channelz_channel));
}
for (size_t i = 1; i < uuids.size(); ++i) {
EXPECT_LT(uuids[i - 1], uuids[i]) << "Uuids must always be increasing";
@ -68,30 +68,30 @@ TEST(ChannelzRegistryTest, UuidsAreIncreasing) {
TEST(ChannelzRegistryTest, RegisterGetTest) {
// we hackily jam an intptr_t into this pointer to check for equality later
ChannelNode* channelz_channel = (ChannelNode*)42;
intptr_t uuid = ChannelzRegistry::RegisterChannelNode(channelz_channel);
ChannelNode* retrieved = ChannelzRegistry::GetChannelNode(uuid);
BaseNode* channelz_channel = (BaseNode*)42;
intptr_t uuid = ChannelzRegistry::Register(channelz_channel);
BaseNode* retrieved = ChannelzRegistry::Get(uuid);
EXPECT_EQ(channelz_channel, retrieved);
}
TEST(ChannelzRegistryTest, RegisterManyItems) {
// we hackily jam an intptr_t into this pointer to check for equality later
ChannelNode* channelz_channel = (ChannelNode*)42;
BaseNode* channelz_channel = (BaseNode*)42;
for (int i = 0; i < 100; i++) {
intptr_t uuid = ChannelzRegistry::RegisterChannelNode(channelz_channel);
ChannelNode* retrieved = ChannelzRegistry::GetChannelNode(uuid);
intptr_t uuid = ChannelzRegistry::Register(channelz_channel);
BaseNode* retrieved = ChannelzRegistry::Get(uuid);
EXPECT_EQ(channelz_channel, retrieved);
}
}
TEST(ChannelzRegistryTest, NullIfNotPresentTest) {
// we hackily jam an intptr_t into this pointer to check for equality later
ChannelNode* channelz_channel = (ChannelNode*)42;
intptr_t uuid = ChannelzRegistry::RegisterChannelNode(channelz_channel);
BaseNode* channelz_channel = (BaseNode*)42;
intptr_t uuid = ChannelzRegistry::Register(channelz_channel);
// try to pull out a uuid that does not exist.
ChannelNode* nonexistant = ChannelzRegistry::GetChannelNode(uuid + 1);
BaseNode* nonexistant = ChannelzRegistry::Get(uuid + 1);
EXPECT_EQ(nonexistant, nullptr);
ChannelNode* retrieved = ChannelzRegistry::GetChannelNode(uuid);
BaseNode* retrieved = ChannelzRegistry::Get(uuid);
EXPECT_EQ(channelz_channel, retrieved);
}

@ -44,16 +44,16 @@ namespace channelz {
namespace testing {
// testing peer to access channel internals
class ChannelNodePeer {
class CallCountingHelperPeer {
public:
ChannelNodePeer(ChannelNode* channel) : channel_(channel) {}
grpc_millis last_call_started_millis() {
explicit CallCountingHelperPeer(CallCountingHelper* node) : node_(node) {}
grpc_millis last_call_started_millis() const {
return (grpc_millis)gpr_atm_no_barrier_load(
&channel_->last_call_started_millis_);
&node_->last_call_started_millis_);
}
private:
ChannelNode* channel_;
CallCountingHelper* node_;
};
namespace {
@ -157,14 +157,14 @@ void ValidateChannel(ChannelNode* channel, validate_channel_data_args args) {
ValidateCounters(json_str, args);
gpr_free(json_str);
// also check that the core API formats this the correct way
char* core_api_json_str = grpc_channelz_get_channel(channel->channel_uuid());
char* core_api_json_str = grpc_channelz_get_channel(channel->uuid());
grpc::testing::ValidateGetChannelResponseProtoJsonTranslation(
core_api_json_str);
gpr_free(core_api_json_str);
}
grpc_millis GetLastCallStartedMillis(ChannelNode* channel) {
ChannelNodePeer peer(channel);
grpc_millis GetLastCallStartedMillis(CallCountingHelper* channel) {
CallCountingHelperPeer peer(channel);
return peer.last_call_started_millis();
}
@ -215,27 +215,25 @@ TEST_P(ChannelzChannelTest, BasicChannelAPIFunctionality) {
TEST_P(ChannelzChannelTest, LastCallStartedMillis) {
grpc_core::ExecCtx exec_ctx;
ChannelFixture channel(GetParam());
ChannelNode* channelz_channel =
grpc_channel_get_channelz_node(channel.channel());
CallCountingHelper counter;
// start a call to set the last call started timestamp
channelz_channel->RecordCallStarted();
grpc_millis millis1 = GetLastCallStartedMillis(channelz_channel);
counter.RecordCallStarted();
grpc_millis millis1 = GetLastCallStartedMillis(&counter);
// time gone by should not affect the timestamp
ChannelzSleep(100);
grpc_millis millis2 = GetLastCallStartedMillis(channelz_channel);
grpc_millis millis2 = GetLastCallStartedMillis(&counter);
EXPECT_EQ(millis1, millis2);
// calls succeeded or failed should not affect the timestamp
ChannelzSleep(100);
channelz_channel->RecordCallFailed();
channelz_channel->RecordCallSucceeded();
grpc_millis millis3 = GetLastCallStartedMillis(channelz_channel);
counter.RecordCallFailed();
counter.RecordCallSucceeded();
grpc_millis millis3 = GetLastCallStartedMillis(&counter);
EXPECT_EQ(millis1, millis3);
// another call started should affect the timestamp
// sleep for extra long to avoid flakes (since we cache Now())
ChannelzSleep(5000);
channelz_channel->RecordCallStarted();
grpc_millis millis4 = GetLastCallStartedMillis(channelz_channel);
counter.RecordCallStarted();
grpc_millis millis4 = GetLastCallStartedMillis(&counter);
EXPECT_NE(millis1, millis4);
}

@ -137,6 +137,7 @@ int main(int argc, char **argv) {
printf("%lx", (unsigned long) grpc_resource_quota_arg_vtable);
printf("%lx", (unsigned long) grpc_channelz_get_top_channels);
printf("%lx", (unsigned long) grpc_channelz_get_channel);
printf("%lx", (unsigned long) grpc_channelz_get_subchannel);
printf("%lx", (unsigned long) grpc_auth_property_iterator_next);
printf("%lx", (unsigned long) grpc_auth_context_property_iterator);
printf("%lx", (unsigned long) grpc_auth_context_peer_identity);

@ -82,5 +82,9 @@ void ValidateGetChannelResponseProtoJsonTranslation(char* json_c_str) {
json_c_str);
}
void ValidateSubchannelProtoJsonTranslation(char* json_c_str) {
VaidateProtoJsonTranslation<grpc::channelz::v1::Subchannel>(json_c_str);
}
} // namespace testing
} // namespace grpc

@ -26,6 +26,7 @@ void ValidateChannelTraceProtoJsonTranslation(char* json_c_str);
void ValidateChannelProtoJsonTranslation(char* json_c_str);
void ValidateGetTopChannelsResponseProtoJsonTranslation(char* json_c_str);
void ValidateGetChannelResponseProtoJsonTranslation(char* json_c_str);
void ValidateSubchannelProtoJsonTranslation(char* json_c_str);
} // namespace testing
} // namespace grpc

Loading…
Cancel
Save