Merge pull request #19244 from markdroth/channelz_fixes.alt

Change ChannelzRegistry::Get() to return a RefCountedPtr<>.
reviewable/pr19066/r6^2
Mark D. Roth 6 years ago committed by GitHub
commit 37afb4745f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 118
      src/core/lib/channel/channelz_registry.cc
  2. 6
      src/core/lib/channel/channelz_registry.h
  3. 5
      src/core/lib/gprpp/ref_counted.h
  4. 76
      test/core/channel/channelz_registry_test.cc

@ -117,36 +117,46 @@ void ChannelzRegistry::InternalUnregister(intptr_t uuid) {
MaybePerformCompactionLocked();
}
BaseNode* ChannelzRegistry::InternalGet(intptr_t uuid) {
RefCountedPtr<BaseNode> ChannelzRegistry::InternalGet(intptr_t uuid) {
MutexLock lock(&mu_);
if (uuid < 1 || uuid > uuid_generator_) {
return nullptr;
}
int idx = FindByUuidLocked(uuid, true);
return idx < 0 ? nullptr : entities_[idx];
if (idx < 0 || entities_[idx] == nullptr) return nullptr;
// Found node. Return only if its refcount is not zero (i.e., when we
// know that there is no other thread about to destroy it).
if (!entities_[idx]->RefIfNonZero()) return nullptr;
return RefCountedPtr<BaseNode>(entities_[idx]);
}
char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) {
MutexLock lock(&mu_);
grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
grpc_json* json = top_level_json;
grpc_json* json_iterator = nullptr;
InlinedVector<BaseNode*, 10> top_level_channels;
bool reached_pagination_limit = false;
int start_idx = GPR_MAX(FindByUuidLocked(start_channel_id, false), 0);
for (size_t i = start_idx; i < entities_.size(); ++i) {
if (entities_[i] != nullptr &&
entities_[i]->type() ==
grpc_core::channelz::BaseNode::EntityType::kTopLevelChannel &&
entities_[i]->uuid() >= start_channel_id) {
// check if we are over pagination limit to determine if we need to set
// the "end" element. If we don't go through this block, we know that
// when the loop terminates, we have <= to kPaginationLimit.
if (top_level_channels.size() == kPaginationLimit) {
reached_pagination_limit = true;
break;
InlinedVector<RefCountedPtr<BaseNode>, 10> top_level_channels;
RefCountedPtr<BaseNode> node_after_pagination_limit;
{
MutexLock lock(&mu_);
const int start_idx = GPR_MAX(FindByUuidLocked(start_channel_id, false), 0);
for (size_t i = start_idx; i < entities_.size(); ++i) {
if (entities_[i] != nullptr &&
entities_[i]->type() ==
grpc_core::channelz::BaseNode::EntityType::kTopLevelChannel &&
entities_[i]->uuid() >= start_channel_id &&
entities_[i]->RefIfNonZero()) {
// Check if we are over pagination limit to determine if we need to set
// the "end" element. If we don't go through this block, we know that
// when the loop terminates, we have <= to kPaginationLimit.
// Note that because we have already increased this node's
// refcount, we need to decrease it, but we can't unref while
// holding the lock, because this may lead to a deadlock.
if (top_level_channels.size() == kPaginationLimit) {
node_after_pagination_limit.reset(entities_[i]);
break;
}
top_level_channels.emplace_back(entities_[i]);
}
top_level_channels.push_back(entities_[i]);
}
}
if (!top_level_channels.empty()) {
@ -159,7 +169,7 @@ char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) {
grpc_json_link_child(array_parent, channel_json, json_iterator);
}
}
if (!reached_pagination_limit) {
if (node_after_pagination_limit == nullptr) {
grpc_json_create_child(nullptr, json, "end", nullptr, GRPC_JSON_TRUE,
false);
}
@ -169,26 +179,32 @@ char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) {
}
char* ChannelzRegistry::InternalGetServers(intptr_t start_server_id) {
MutexLock lock(&mu_);
grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
grpc_json* json = top_level_json;
grpc_json* json_iterator = nullptr;
InlinedVector<BaseNode*, 10> servers;
bool reached_pagination_limit = false;
int start_idx = GPR_MAX(FindByUuidLocked(start_server_id, false), 0);
for (size_t i = start_idx; i < entities_.size(); ++i) {
if (entities_[i] != nullptr &&
entities_[i]->type() ==
grpc_core::channelz::BaseNode::EntityType::kServer &&
entities_[i]->uuid() >= start_server_id) {
// check if we are over pagination limit to determine if we need to set
// the "end" element. If we don't go through this block, we know that
// when the loop terminates, we have <= to kPaginationLimit.
if (servers.size() == kPaginationLimit) {
reached_pagination_limit = true;
break;
InlinedVector<RefCountedPtr<BaseNode>, 10> servers;
RefCountedPtr<BaseNode> node_after_pagination_limit;
{
MutexLock lock(&mu_);
const int start_idx = GPR_MAX(FindByUuidLocked(start_server_id, false), 0);
for (size_t i = start_idx; i < entities_.size(); ++i) {
if (entities_[i] != nullptr &&
entities_[i]->type() ==
grpc_core::channelz::BaseNode::EntityType::kServer &&
entities_[i]->uuid() >= start_server_id &&
entities_[i]->RefIfNonZero()) {
// Check if we are over pagination limit to determine if we need to set
// the "end" element. If we don't go through this block, we know that
// when the loop terminates, we have <= to kPaginationLimit.
// Note that because we have already increased this node's
// refcount, we need to decrease it, but we can't unref while
// holding the lock, because this may lead to a deadlock.
if (servers.size() == kPaginationLimit) {
node_after_pagination_limit.reset(entities_[i]);
break;
}
servers.emplace_back(entities_[i]);
}
servers.push_back(entities_[i]);
}
}
if (!servers.empty()) {
@ -201,7 +217,7 @@ char* ChannelzRegistry::InternalGetServers(intptr_t start_server_id) {
grpc_json_link_child(array_parent, server_json, json_iterator);
}
}
if (!reached_pagination_limit) {
if (node_after_pagination_limit == nullptr) {
grpc_json_create_child(nullptr, json, "end", nullptr, GRPC_JSON_TRUE,
false);
}
@ -211,14 +227,20 @@ char* ChannelzRegistry::InternalGetServers(intptr_t start_server_id) {
}
void ChannelzRegistry::InternalLogAllEntities() {
MutexLock lock(&mu_);
for (size_t i = 0; i < entities_.size(); ++i) {
if (entities_[i] != nullptr) {
char* json = entities_[i]->RenderJsonString();
gpr_log(GPR_INFO, "%s", json);
gpr_free(json);
InlinedVector<RefCountedPtr<BaseNode>, 10> nodes;
{
MutexLock lock(&mu_);
for (size_t i = 0; i < entities_.size(); ++i) {
if (entities_[i] != nullptr && entities_[i]->RefIfNonZero()) {
nodes.emplace_back(entities_[i]);
}
}
}
for (size_t i = 0; i < nodes.size(); ++i) {
char* json = nodes[i]->RenderJsonString();
gpr_log(GPR_INFO, "%s", json);
gpr_free(json);
}
}
} // namespace channelz
@ -234,7 +256,7 @@ char* grpc_channelz_get_servers(intptr_t start_server_id) {
}
char* grpc_channelz_get_server(intptr_t server_id) {
grpc_core::channelz::BaseNode* server_node =
grpc_core::RefCountedPtr<grpc_core::channelz::BaseNode> server_node =
grpc_core::channelz::ChannelzRegistry::Get(server_id);
if (server_node == nullptr ||
server_node->type() !=
@ -254,7 +276,7 @@ char* grpc_channelz_get_server(intptr_t server_id) {
char* grpc_channelz_get_server_sockets(intptr_t server_id,
intptr_t start_socket_id,
intptr_t max_results) {
grpc_core::channelz::BaseNode* base_node =
grpc_core::RefCountedPtr<grpc_core::channelz::BaseNode> base_node =
grpc_core::channelz::ChannelzRegistry::Get(server_id);
if (base_node == nullptr ||
base_node->type() != grpc_core::channelz::BaseNode::EntityType::kServer) {
@ -263,12 +285,12 @@ char* grpc_channelz_get_server_sockets(intptr_t server_id,
// This cast is ok since we have just checked to make sure base_node is
// actually a server node
grpc_core::channelz::ServerNode* server_node =
static_cast<grpc_core::channelz::ServerNode*>(base_node);
static_cast<grpc_core::channelz::ServerNode*>(base_node.get());
return server_node->RenderServerSockets(start_socket_id, max_results);
}
char* grpc_channelz_get_channel(intptr_t channel_id) {
grpc_core::channelz::BaseNode* channel_node =
grpc_core::RefCountedPtr<grpc_core::channelz::BaseNode> channel_node =
grpc_core::channelz::ChannelzRegistry::Get(channel_id);
if (channel_node == nullptr ||
(channel_node->type() !=
@ -288,7 +310,7 @@ char* grpc_channelz_get_channel(intptr_t channel_id) {
}
char* grpc_channelz_get_subchannel(intptr_t subchannel_id) {
grpc_core::channelz::BaseNode* subchannel_node =
grpc_core::RefCountedPtr<grpc_core::channelz::BaseNode> subchannel_node =
grpc_core::channelz::ChannelzRegistry::Get(subchannel_id);
if (subchannel_node == nullptr ||
subchannel_node->type() !=
@ -306,7 +328,7 @@ char* grpc_channelz_get_subchannel(intptr_t subchannel_id) {
}
char* grpc_channelz_get_socket(intptr_t socket_id) {
grpc_core::channelz::BaseNode* socket_node =
grpc_core::RefCountedPtr<grpc_core::channelz::BaseNode> socket_node =
grpc_core::channelz::ChannelzRegistry::Get(socket_id);
if (socket_node == nullptr ||
socket_node->type() !=

@ -48,7 +48,9 @@ class ChannelzRegistry {
return Default()->InternalRegister(node);
}
static void Unregister(intptr_t uuid) { Default()->InternalUnregister(uuid); }
static BaseNode* Get(intptr_t uuid) { return Default()->InternalGet(uuid); }
static RefCountedPtr<BaseNode> Get(intptr_t uuid) {
return Default()->InternalGet(uuid);
}
// Returns the allocated JSON string that represents the proto
// GetTopChannelsResponse as per channelz.proto.
@ -86,7 +88,7 @@ class ChannelzRegistry {
// if object with uuid has previously been registered as the correct type,
// returns the void* associated with that uuid. Else returns nullptr.
BaseNode* InternalGet(intptr_t uuid);
RefCountedPtr<BaseNode> InternalGet(intptr_t uuid);
char* InternalGetTopChannels(intptr_t start_channel_id);
char* InternalGetServers(intptr_t start_server_id);

@ -221,6 +221,11 @@ class RefCounted : public Impl {
}
}
bool RefIfNonZero() { return refs_.RefIfNonZero(); }
bool RefIfNonZero(const DebugLocation& location, const char* reason) {
return refs_.RefIfNonZero(location, reason);
}
// Not copyable nor movable.
RefCounted(const RefCounted&) = delete;
RefCounted& operator=(const RefCounted&) = delete;

@ -62,8 +62,8 @@ class ChannelzRegistryTest : public ::testing::Test {
};
TEST_F(ChannelzRegistryTest, UuidStartsAboveZeroTest) {
UniquePtr<BaseNode> channelz_channel =
MakeUnique<BaseNode>(BaseNode::EntityType::kTopLevelChannel);
RefCountedPtr<BaseNode> channelz_channel =
MakeRefCounted<BaseNode>(BaseNode::EntityType::kTopLevelChannel);
intptr_t uuid = channelz_channel->uuid();
EXPECT_GT(uuid, 0) << "First uuid chose must be greater than zero. Zero if "
"reserved according to "
@ -72,11 +72,11 @@ TEST_F(ChannelzRegistryTest, UuidStartsAboveZeroTest) {
}
TEST_F(ChannelzRegistryTest, UuidsAreIncreasing) {
std::vector<UniquePtr<BaseNode>> channelz_channels;
std::vector<RefCountedPtr<BaseNode>> channelz_channels;
channelz_channels.reserve(10);
for (int i = 0; i < 10; ++i) {
channelz_channels.push_back(
MakeUnique<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
MakeRefCounted<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
}
for (size_t i = 1; i < channelz_channels.size(); ++i) {
EXPECT_LT(channelz_channels[i - 1]->uuid(), channelz_channels[i]->uuid())
@ -85,46 +85,50 @@ TEST_F(ChannelzRegistryTest, UuidsAreIncreasing) {
}
TEST_F(ChannelzRegistryTest, RegisterGetTest) {
UniquePtr<BaseNode> channelz_channel =
MakeUnique<BaseNode>(BaseNode::EntityType::kTopLevelChannel);
BaseNode* retrieved = ChannelzRegistry::Get(channelz_channel->uuid());
EXPECT_EQ(channelz_channel.get(), retrieved);
RefCountedPtr<BaseNode> channelz_channel =
MakeRefCounted<BaseNode>(BaseNode::EntityType::kTopLevelChannel);
RefCountedPtr<BaseNode> retrieved =
ChannelzRegistry::Get(channelz_channel->uuid());
EXPECT_EQ(channelz_channel, retrieved);
}
TEST_F(ChannelzRegistryTest, RegisterManyItems) {
std::vector<UniquePtr<BaseNode>> channelz_channels;
std::vector<RefCountedPtr<BaseNode>> channelz_channels;
for (int i = 0; i < 100; i++) {
channelz_channels.push_back(
MakeUnique<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
BaseNode* retrieved = ChannelzRegistry::Get(channelz_channels[i]->uuid());
EXPECT_EQ(channelz_channels[i].get(), retrieved);
MakeRefCounted<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
RefCountedPtr<BaseNode> retrieved =
ChannelzRegistry::Get(channelz_channels[i]->uuid());
EXPECT_EQ(channelz_channels[i], retrieved);
}
}
TEST_F(ChannelzRegistryTest, NullIfNotPresentTest) {
UniquePtr<BaseNode> channelz_channel =
MakeUnique<BaseNode>(BaseNode::EntityType::kTopLevelChannel);
RefCountedPtr<BaseNode> channelz_channel =
MakeRefCounted<BaseNode>(BaseNode::EntityType::kTopLevelChannel);
// try to pull out a uuid that does not exist.
BaseNode* nonexistant = ChannelzRegistry::Get(channelz_channel->uuid() + 1);
RefCountedPtr<BaseNode> nonexistant =
ChannelzRegistry::Get(channelz_channel->uuid() + 1);
EXPECT_EQ(nonexistant, nullptr);
BaseNode* retrieved = ChannelzRegistry::Get(channelz_channel->uuid());
EXPECT_EQ(channelz_channel.get(), retrieved);
RefCountedPtr<BaseNode> retrieved =
ChannelzRegistry::Get(channelz_channel->uuid());
EXPECT_EQ(channelz_channel, retrieved);
}
TEST_F(ChannelzRegistryTest, TestCompaction) {
const int kLoopIterations = 300;
// These channels that will stay in the registry for the duration of the test.
std::vector<UniquePtr<BaseNode>> even_channels;
std::vector<RefCountedPtr<BaseNode>> even_channels;
even_channels.reserve(kLoopIterations);
{
// The channels will unregister themselves at the end of the for block.
std::vector<UniquePtr<BaseNode>> odd_channels;
std::vector<RefCountedPtr<BaseNode>> odd_channels;
odd_channels.reserve(kLoopIterations);
for (int i = 0; i < kLoopIterations; i++) {
even_channels.push_back(
MakeUnique<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
MakeRefCounted<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
odd_channels.push_back(
MakeUnique<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
MakeRefCounted<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
}
}
// without compaction, there would be exactly kLoopIterations empty slots at
@ -137,25 +141,26 @@ TEST_F(ChannelzRegistryTest, TestCompaction) {
TEST_F(ChannelzRegistryTest, TestGetAfterCompaction) {
const int kLoopIterations = 100;
// These channels that will stay in the registry for the duration of the test.
std::vector<UniquePtr<BaseNode>> even_channels;
std::vector<RefCountedPtr<BaseNode>> even_channels;
even_channels.reserve(kLoopIterations);
std::vector<intptr_t> odd_uuids;
odd_uuids.reserve(kLoopIterations);
{
// The channels will unregister themselves at the end of the for block.
std::vector<UniquePtr<BaseNode>> odd_channels;
std::vector<RefCountedPtr<BaseNode>> odd_channels;
odd_channels.reserve(kLoopIterations);
for (int i = 0; i < kLoopIterations; i++) {
even_channels.push_back(
MakeUnique<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
MakeRefCounted<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
odd_channels.push_back(
MakeUnique<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
MakeRefCounted<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
odd_uuids.push_back(odd_channels[i]->uuid());
}
}
for (int i = 0; i < kLoopIterations; i++) {
BaseNode* retrieved = ChannelzRegistry::Get(even_channels[i]->uuid());
EXPECT_EQ(even_channels[i].get(), retrieved);
RefCountedPtr<BaseNode> retrieved =
ChannelzRegistry::Get(even_channels[i]->uuid());
EXPECT_EQ(even_channels[i], retrieved);
retrieved = ChannelzRegistry::Get(odd_uuids[i]);
EXPECT_EQ(retrieved, nullptr);
}
@ -164,29 +169,30 @@ TEST_F(ChannelzRegistryTest, TestGetAfterCompaction) {
TEST_F(ChannelzRegistryTest, TestAddAfterCompaction) {
const int kLoopIterations = 100;
// These channels that will stay in the registry for the duration of the test.
std::vector<UniquePtr<BaseNode>> even_channels;
std::vector<RefCountedPtr<BaseNode>> even_channels;
even_channels.reserve(kLoopIterations);
std::vector<intptr_t> odd_uuids;
odd_uuids.reserve(kLoopIterations);
{
// The channels will unregister themselves at the end of the for block.
std::vector<UniquePtr<BaseNode>> odd_channels;
std::vector<RefCountedPtr<BaseNode>> odd_channels;
odd_channels.reserve(kLoopIterations);
for (int i = 0; i < kLoopIterations; i++) {
even_channels.push_back(
MakeUnique<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
MakeRefCounted<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
odd_channels.push_back(
MakeUnique<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
MakeRefCounted<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
odd_uuids.push_back(odd_channels[i]->uuid());
}
}
std::vector<UniquePtr<BaseNode>> more_channels;
std::vector<RefCountedPtr<BaseNode>> more_channels;
more_channels.reserve(kLoopIterations);
for (int i = 0; i < kLoopIterations; i++) {
more_channels.push_back(
MakeUnique<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
BaseNode* retrieved = ChannelzRegistry::Get(more_channels[i]->uuid());
EXPECT_EQ(more_channels[i].get(), retrieved);
MakeRefCounted<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
RefCountedPtr<BaseNode> retrieved =
ChannelzRegistry::Get(more_channels[i]->uuid());
EXPECT_EQ(more_channels[i], retrieved);
}
}

Loading…
Cancel
Save