Merge pull request #24199 from markdroth/channelz_linkage

Decouple channel creation from channelz child linkage.
pull/24201/head
Mark D. Roth 4 years ago committed by GitHub
commit f6f2594fc1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 47
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  2. 54
      src/core/ext/xds/xds_client.cc
  3. 2
      src/core/ext/xds/xds_client.h
  4. 55
      src/core/lib/channel/channelz.cc
  5. 30
      src/core/lib/channel/channelz.h
  6. 3
      src/core/lib/channel/channelz_registry.h
  7. 40
      src/core/lib/surface/channel.cc
  8. 10
      test/core/channel/channelz_test.cc

@ -427,6 +427,8 @@ class GrpcLb : public LoadBalancingPolicy {
StateWatcher* watcher_ = nullptr;
// Response generator to inject address updates into lb_channel_.
RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
// Parent channelz node.
RefCountedPtr<channelz::ChannelNode> parent_channelz_node_;
// The data associated with the current LB call. It holds a ref to this LB
// policy. It's initialized every time we query for backends. It's reset to
@ -1280,25 +1282,18 @@ grpc_channel_args* BuildBalancerChannelArgs(
GRPC_ARG_CHANNELZ_CHANNEL_NODE,
};
// Channel args to add.
absl::InlinedVector<grpc_arg, 3> args_to_add;
// The fake resolver response generator, which we use to inject
// address updates into the LB channel.
args_to_add.emplace_back(
absl::InlinedVector<grpc_arg, 3> args_to_add = {
// The fake resolver response generator, which we use to inject
// address updates into the LB channel.
grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
response_generator));
// A channel arg indicating the target is a grpclb load balancer.
args_to_add.emplace_back(grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER), 1));
// The parent channel's channelz uuid.
channelz::ChannelNode* channelz_node = nullptr;
const grpc_arg* arg =
grpc_channel_args_find(args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
if (arg != nullptr && arg->type == GRPC_ARG_POINTER &&
arg->value.pointer.p != nullptr) {
channelz_node = static_cast<channelz::ChannelNode*>(arg->value.pointer.p);
args_to_add.emplace_back(
channelz::MakeParentUuidArg(channelz_node->uuid()));
}
response_generator),
// A channel arg indicating the target is a grpclb load balancer.
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER), 1),
// Tells channelz that this is an internal channel.
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
};
// Construct channel args.
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add.data(),
@ -1375,6 +1370,12 @@ void GrpcLb::ShutdownLocked() {
// OnBalancerChannelConnectivityChangedLocked(), and we need to be
// alive when that callback is invoked.
if (lb_channel_ != nullptr) {
if (parent_channelz_node_ != nullptr) {
channelz::ChannelNode* child_channelz_node =
grpc_channel_get_channelz_node(lb_channel_);
GPR_ASSERT(child_channelz_node != nullptr);
parent_channelz_node_->RemoveChildChannel(child_channelz_node->uuid());
}
grpc_channel_destroy(lb_channel_);
lb_channel_ = nullptr;
}
@ -1461,6 +1462,16 @@ void GrpcLb::ProcessAddressesAndChannelArgsLocked(
lb_channel_ =
CreateGrpclbBalancerChannel(uri_str.c_str(), *lb_channel_args);
GPR_ASSERT(lb_channel_ != nullptr);
// Set up channelz linkage.
channelz::ChannelNode* child_channelz_node =
grpc_channel_get_channelz_node(lb_channel_);
channelz::ChannelNode* parent_channelz_node =
grpc_channel_args_find_pointer<channelz::ChannelNode>(
&args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
if (child_channelz_node != nullptr && parent_channelz_node != nullptr) {
parent_channelz_node->AddChildChannel(child_channelz_node->uuid());
parent_channelz_node_ = parent_channelz_node->Ref();
}
}
// Propagate updates to the LB channel (pick_first) through the fake
// resolver.

@ -453,26 +453,21 @@ grpc_channel_args* BuildXdsChannelArgs(const grpc_channel_args& args) {
GRPC_ARG_KEEPALIVE_TIME_MS,
};
// Channel args to add.
absl::InlinedVector<grpc_arg, 3> args_to_add;
// Keepalive interval.
args_to_add.emplace_back(grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), 5 * 60 * GPR_MS_PER_SEC));
// A channel arg indicating that the target is an xds server.
// TODO(roth): Once we figure out our fallback and credentials story, decide
// whether this is actually needed. Note that it's currently used by the
// fake security connector as well.
args_to_add.emplace_back(grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ADDRESS_IS_XDS_SERVER), 1));
// The parent channel's channelz uuid.
channelz::ChannelNode* channelz_node = nullptr;
const grpc_arg* arg =
grpc_channel_args_find(&args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
if (arg != nullptr && arg->type == GRPC_ARG_POINTER &&
arg->value.pointer.p != nullptr) {
channelz_node = static_cast<channelz::ChannelNode*>(arg->value.pointer.p);
args_to_add.emplace_back(
channelz::MakeParentUuidArg(channelz_node->uuid()));
}
absl::InlinedVector<grpc_arg, 3> args_to_add = {
// Keepalive interval.
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS),
5 * 60 * GPR_MS_PER_SEC),
// Tell channelz this is an internal channel.
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
// A channel arg indicating that the target is an xds server.
// TODO(roth): Once we figure out our fallback and credentials story,
// decide whether this is actually needed. Note that it's currently
// used by the fake security connector as well.
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ADDRESS_IS_XDS_SERVER), 1),
};
// Construct channel args.
return grpc_channel_args_copy_and_add_and_remove(
&args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add.data(),
@ -1802,6 +1797,17 @@ XdsClient::XdsClient(const grpc_channel_args& channel_args, grpc_error** error)
grpc_error_string(*error));
return;
}
// Add channelz linkage.
channelz::ChannelNode* xds_channelz_node =
grpc_channel_get_channelz_node(channel);
channelz::ChannelNode* parent_channelz_node =
grpc_channel_args_find_pointer<channelz::ChannelNode>(
&channel_args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
if (xds_channelz_node != nullptr && parent_channelz_node != nullptr) {
parent_channelz_node->AddChildChannel(xds_channelz_node->uuid());
parent_channelz_node_ = parent_channelz_node->Ref();
}
// Create ChannelState object.
chand_ = MakeOrphanable<ChannelState>(
Ref(DEBUG_LOCATION, "XdsClient+ChannelState"), channel);
}
@ -1820,6 +1826,14 @@ void XdsClient::Orphan() {
{
MutexLock lock(&mu_);
shutting_down_ = true;
// Remove channelz linkage.
if (parent_channelz_node_ != nullptr) {
channelz::ChannelNode* xds_channelz_node =
grpc_channel_get_channelz_node(chand_->channel());
GPR_ASSERT(xds_channelz_node != nullptr);
parent_channelz_node_->RemoveChildChannel(xds_channelz_node->uuid());
}
// Orphan ChannelState object.
chand_.reset();
// We do not clear cluster_map_ and endpoint_map_ if the xds client was
// created by the XdsResolver because the maps contain refs for watchers

@ -28,6 +28,7 @@
#include "src/core/ext/xds/xds_api.h"
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_client_stats.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/gprpp/map.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/orphanable.h"
@ -307,6 +308,7 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
// The channel for communicating with the xds server.
OrphanablePtr<ChannelState> chand_;
RefCountedPtr<channelz::ChannelNode> parent_channelz_node_;
// One entry for each watched LDS resource.
std::map<std::string /*listener_name*/, ListenerState> listener_map_;

@ -48,40 +48,6 @@
namespace grpc_core {
namespace channelz {
//
// channel arg code
//
namespace {
void* parent_uuid_copy(void* p) { return p; }
void parent_uuid_destroy(void* /*p*/) {}
int parent_uuid_cmp(void* p1, void* p2) { return GPR_ICMP(p1, p2); }
const grpc_arg_pointer_vtable parent_uuid_vtable = {
parent_uuid_copy, parent_uuid_destroy, parent_uuid_cmp};
} // namespace
grpc_arg MakeParentUuidArg(intptr_t parent_uuid) {
// We would ideally like to store the uuid in an integer argument.
// Unfortunately, that won't work, because intptr_t (the type used for
// uuids) doesn't fit in an int (the type used for integer args).
// So instead, we use a hack to store it as a pointer, because
// intptr_t should be the same size as void*.
static_assert(sizeof(intptr_t) <= sizeof(void*),
"can't fit intptr_t inside of void*");
return grpc_channel_arg_pointer_create(
const_cast<char*>(GRPC_ARG_CHANNELZ_PARENT_UUID),
reinterpret_cast<void*>(parent_uuid), &parent_uuid_vtable);
}
intptr_t GetParentUuidFromArgs(const grpc_channel_args& args) {
const grpc_arg* arg =
grpc_channel_args_find(&args, GRPC_ARG_CHANNELZ_PARENT_UUID);
if (arg == nullptr || arg->type != GRPC_ARG_POINTER) return 0;
return reinterpret_cast<intptr_t>(arg->value.pointer.p);
}
//
// BaseNode
//
@ -171,13 +137,12 @@ void CallCountingHelper::PopulateCallCounts(Json::Object* object) {
//
ChannelNode::ChannelNode(std::string target, size_t channel_tracer_max_nodes,
intptr_t parent_uuid)
: BaseNode(parent_uuid == 0 ? EntityType::kTopLevelChannel
: EntityType::kInternalChannel,
bool is_internal_channel)
: BaseNode(is_internal_channel ? EntityType::kInternalChannel
: EntityType::kTopLevelChannel,
target),
target_(std::move(target)),
trace_(channel_tracer_max_nodes),
parent_uuid_(parent_uuid) {}
trace_(channel_tracer_max_nodes) {}
const char* ChannelNode::GetChannelConnectivityStateChangeString(
grpc_connectivity_state state) {
@ -235,18 +200,18 @@ void ChannelNode::PopulateChildRefs(Json::Object* json) {
MutexLock lock(&child_mu_);
if (!child_subchannels_.empty()) {
Json::Array array;
for (const auto& p : child_subchannels_) {
for (intptr_t subchannel_uuid : child_subchannels_) {
array.emplace_back(Json::Object{
{"subchannelId", std::to_string(p.first)},
{"subchannelId", std::to_string(subchannel_uuid)},
});
}
(*json)["subchannelRef"] = std::move(array);
}
if (!child_channels_.empty()) {
Json::Array array;
for (const auto& p : child_channels_) {
for (intptr_t channel_uuid : child_channels_) {
array.emplace_back(Json::Object{
{"channelId", std::to_string(p.first)},
{"channelId", std::to_string(channel_uuid)},
});
}
(*json)["channelRef"] = std::move(array);
@ -261,7 +226,7 @@ void ChannelNode::SetConnectivityState(grpc_connectivity_state state) {
void ChannelNode::AddChildChannel(intptr_t child_uuid) {
MutexLock lock(&child_mu_);
child_channels_.insert(std::make_pair(child_uuid, true));
child_channels_.insert(child_uuid);
}
void ChannelNode::RemoveChildChannel(intptr_t child_uuid) {
@ -271,7 +236,7 @@ void ChannelNode::RemoveChildChannel(intptr_t child_uuid) {
void ChannelNode::AddChildSubchannel(intptr_t child_uuid) {
MutexLock lock(&child_mu_);
child_subchannels_.insert(std::make_pair(child_uuid, true));
child_subchannels_.insert(child_uuid);
}
void ChannelNode::RemoveChildSubchannel(intptr_t child_uuid) {

@ -23,6 +23,7 @@
#include <grpc/grpc.h>
#include <set>
#include <string>
#include "absl/container/inlined_vector.h"
@ -42,8 +43,9 @@
// Channel arg key for channelz node.
#define GRPC_ARG_CHANNELZ_CHANNEL_NODE "grpc.channelz_channel_node"
// Channel arg key to encode the channelz uuid of the channel's parent.
#define GRPC_ARG_CHANNELZ_PARENT_UUID "grpc.channelz_parent_uuid"
// Channel arg key for indicating an internal channel.
#define GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL \
"grpc.channelz_is_internal_channel"
/** This is the default value for whether or not to enable channelz. If
* GRPC_ARG_ENABLE_CHANNELZ is set, it will override this default value. */
@ -59,10 +61,6 @@ namespace grpc_core {
namespace channelz {
// Helpers for getting and setting GRPC_ARG_CHANNELZ_PARENT_UUID.
grpc_arg MakeParentUuidArg(intptr_t parent_uuid);
intptr_t GetParentUuidFromArgs(const grpc_channel_args& args);
class SocketNode;
class ListenSocketNode;
@ -176,14 +174,12 @@ class CallCountingHelper {
class ChannelNode : public BaseNode {
public:
ChannelNode(std::string target, size_t channel_tracer_max_nodes,
intptr_t parent_uuid);
bool is_internal_channel);
// Returns the string description of the given connectivity state.
static const char* GetChannelConnectivityStateChangeString(
grpc_connectivity_state state);
intptr_t parent_uuid() const { return parent_uuid_; }
Json RenderJson() override;
// proxy methods to composed classes.
@ -213,26 +209,22 @@ class ChannelNode : public BaseNode {
void RemoveChildSubchannel(intptr_t child_uuid);
private:
void PopulateChildRefs(Json::Object* json);
// to allow the channel trace test to access trace_.
// Allows the channel trace test to access trace_.
friend class testing::ChannelNodePeer;
void PopulateChildRefs(Json::Object* json);
std::string target_;
CallCountingHelper call_counter_;
ChannelTrace trace_;
const intptr_t parent_uuid_;
// Least significant bit indicates whether the value is set. Remaining
// bits are a grpc_connectivity_state value.
Atomic<int> connectivity_state_{0};
Mutex child_mu_; // Guards child maps below.
// TODO(roth): We don't actually use the values here, only the keys, so
// these should be sets instead of maps, but we don't currently have a set
// implementation. Change this if/when we have one.
std::map<intptr_t, bool> child_channels_;
std::map<intptr_t, bool> child_subchannels_;
Mutex child_mu_; // Guards sets below.
std::set<intptr_t> child_channels_;
std::set<intptr_t> child_subchannels_;
};
// Handles channelz bookkeeping for servers

@ -23,6 +23,9 @@
#include <stdint.h>
#include <map>
#include <string>
#include "src/core/lib/channel/channel_trace.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/gprpp/map.h"

@ -183,42 +183,30 @@ void CreateChannelzNode(grpc_channel_stack_builder* builder) {
const grpc_channel_args* args =
grpc_channel_stack_builder_get_channel_arguments(builder);
// Check whether channelz is enabled.
const bool channelz_enabled = grpc_channel_arg_get_bool(
grpc_channel_args_find(args, GRPC_ARG_ENABLE_CHANNELZ),
GRPC_ENABLE_CHANNELZ_DEFAULT);
const bool channelz_enabled = grpc_channel_args_find_bool(
args, GRPC_ARG_ENABLE_CHANNELZ, GRPC_ENABLE_CHANNELZ_DEFAULT);
if (!channelz_enabled) return;
// Get parameters needed to create the channelz node.
const size_t channel_tracer_max_memory = grpc_channel_arg_get_integer(
grpc_channel_args_find(args,
GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE),
const size_t channel_tracer_max_memory = grpc_channel_args_find_integer(
args, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE,
{GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX});
const intptr_t channelz_parent_uuid =
grpc_core::channelz::GetParentUuidFromArgs(*args);
const bool is_internal_channel = grpc_channel_args_find_bool(
args, GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL, false);
// Create the channelz node.
const char* target = grpc_channel_stack_builder_get_target(builder);
grpc_core::RefCountedPtr<grpc_core::channelz::ChannelNode> channelz_node =
grpc_core::MakeRefCounted<grpc_core::channelz::ChannelNode>(
target != nullptr ? target : "", channel_tracer_max_memory,
channelz_parent_uuid);
is_internal_channel);
channelz_node->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Channel created"));
// Update parent channel node, if any.
if (channelz_parent_uuid > 0) {
grpc_core::RefCountedPtr<grpc_core::channelz::BaseNode> parent_node =
grpc_core::channelz::ChannelzRegistry::Get(channelz_parent_uuid);
if (parent_node != nullptr) {
grpc_core::channelz::ChannelNode* parent =
static_cast<grpc_core::channelz::ChannelNode*>(parent_node.get());
parent->AddChildChannel(channelz_node->uuid());
}
}
// Add channelz node to channel args.
// We remove the arg for the parent uuid, since we no longer need it.
// We remove the is_internal_channel arg, since we no longer need it.
grpc_arg new_arg = grpc_channel_arg_pointer_create(
const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_NODE), channelz_node.get(),
&channelz_node_arg_vtable);
const char* args_to_remove[] = {GRPC_ARG_CHANNELZ_PARENT_UUID};
const char* args_to_remove[] = {GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL};
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
grpc_channel_stack_builder_set_channel_arguments(builder, new_args);
@ -506,16 +494,6 @@ grpc_call* grpc_channel_create_registered_call(
static void destroy_channel(void* arg, grpc_error* /*error*/) {
grpc_channel* channel = static_cast<grpc_channel*>(arg);
if (channel->channelz_node != nullptr) {
if (channel->channelz_node->parent_uuid() > 0) {
grpc_core::RefCountedPtr<grpc_core::channelz::BaseNode> parent_node =
grpc_core::channelz::ChannelzRegistry::Get(
channel->channelz_node->parent_uuid());
if (parent_node != nullptr) {
grpc_core::channelz::ChannelNode* parent =
static_cast<grpc_core::channelz::ChannelNode*>(parent_node.get());
parent->RemoveChildChannel(channel->channelz_node->uuid());
}
}
channel->channelz_node->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Channel destroyed"));

@ -504,10 +504,12 @@ TEST_F(ChannelzRegistryBasedTest, InternalChannelTest) {
ChannelFixture channels[10];
(void)channels; // suppress unused variable error
// create an internal channel
grpc_arg client_a[2];
client_a[0] = grpc_core::channelz::MakeParentUuidArg(1);
client_a[1] = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ENABLE_CHANNELZ), true);
grpc_arg client_a[] = {
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ENABLE_CHANNELZ), true),
};
grpc_channel_args client_args = {GPR_ARRAY_SIZE(client_a), client_a};
grpc_channel* internal_channel =
grpc_insecure_channel_create("fake_target", &client_args, nullptr);

Loading…
Cancel
Save