Merge pull request #19250 from markdroth/channelz_map.alt

Convert channelz registry to use Map<> instead of InlinedVector<>.
reviewable/pr19042/r6^2
Mark D. Roth authored 6 years ago; committed by GitHub
commit b7e1b6f3ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. src/core/lib/channel/channelz_registry.cc (110 lines changed)
  2. src/core/lib/channel/channelz_registry.h (32 lines changed)
  3. src/core/lib/gprpp/map.h (10 lines changed)
  4. test/core/channel/channelz_registry_test.cc (63 lines changed)
  5. test/core/gprpp/map_test.cc (18 lines changed)
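At a high level, this change replaces a uuid-sorted InlinedVector (maintained with a hand-rolled binary search plus periodic compaction of emptied slots) with a map keyed by uuid. A minimal standalone sketch of that shape, using std::map and hypothetical names rather than grpc_core::Map and the real registry types, looks like this:

#include <cstdint>
#include <map>
#include <mutex>

// Hypothetical, simplified registry: uuids are handed out monotonically and
// the map keeps nodes ordered by uuid, so range queries can start at any id.
class SimpleRegistry {
 public:
  intptr_t Register(void* node) {
    std::lock_guard<std::mutex> lock(mu_);
    const intptr_t uuid = ++uuid_generator_;
    node_map_[uuid] = node;
    return uuid;
  }
  void Unregister(intptr_t uuid) {
    std::lock_guard<std::mutex> lock(mu_);
    node_map_.erase(uuid);
  }
  void* Get(intptr_t uuid) {
    std::lock_guard<std::mutex> lock(mu_);
    auto it = node_map_.find(uuid);
    return it == node_map_.end() ? nullptr : it->second;
  }

 private:
  std::mutex mu_;
  std::map<intptr_t, void*> node_map_;
  intptr_t uuid_generator_ = 0;
};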

@@ -18,6 +18,9 @@
#include <grpc/impl/codegen/port_platform.h>
#include <algorithm>
#include <cstring>
#include "src/core/lib/channel/channel_trace.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/channel/channelz_registry.h"
@@ -29,8 +32,6 @@
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <cstring>
namespace grpc_core {
namespace channelz {
namespace {
@@ -51,70 +52,17 @@ ChannelzRegistry* ChannelzRegistry::Default() {
return g_channelz_registry;
}
ChannelzRegistry::ChannelzRegistry() { gpr_mu_init(&mu_); }
ChannelzRegistry::~ChannelzRegistry() { gpr_mu_destroy(&mu_); }
void ChannelzRegistry::InternalRegister(BaseNode* node) {
MutexLock lock(&mu_);
entities_.push_back(node);
node->uuid_ = ++uuid_generator_;
}
void ChannelzRegistry::MaybePerformCompactionLocked() {
constexpr double kEmptinessTheshold = 1. / 3;
double emptiness_ratio =
double(num_empty_slots_) / double(entities_.capacity());
if (emptiness_ratio > kEmptinessTheshold) {
int front = 0;
for (size_t i = 0; i < entities_.size(); ++i) {
if (entities_[i] != nullptr) {
entities_[front++] = entities_[i];
}
}
for (int i = 0; i < num_empty_slots_; ++i) {
entities_.pop_back();
}
num_empty_slots_ = 0;
}
}
int ChannelzRegistry::FindByUuidLocked(intptr_t target_uuid,
bool direct_hit_needed) {
int left = 0;
int right = int(entities_.size() - 1);
while (left <= right) {
int true_middle = left + (right - left) / 2;
int first_non_null = true_middle;
while (first_non_null < right && entities_[first_non_null] == nullptr) {
first_non_null++;
}
if (entities_[first_non_null] == nullptr) {
right = true_middle - 1;
continue;
}
intptr_t uuid = entities_[first_non_null]->uuid();
if (uuid == target_uuid) {
return int(first_non_null);
}
if (uuid < target_uuid) {
left = first_non_null + 1;
} else {
right = true_middle - 1;
}
}
return direct_hit_needed ? -1 : left;
node_map_[node->uuid_] = node;
}
void ChannelzRegistry::InternalUnregister(intptr_t uuid) {
GPR_ASSERT(uuid >= 1);
MutexLock lock(&mu_);
GPR_ASSERT(uuid <= uuid_generator_);
int idx = FindByUuidLocked(uuid, true);
GPR_ASSERT(idx >= 0);
entities_[idx] = nullptr;
num_empty_slots_++;
MaybePerformCompactionLocked();
node_map_.erase(uuid);
}
RefCountedPtr<BaseNode> ChannelzRegistry::InternalGet(intptr_t uuid) {
@@ -122,12 +70,13 @@ RefCountedPtr<BaseNode> ChannelzRegistry::InternalGet(intptr_t uuid) {
if (uuid < 1 || uuid > uuid_generator_) {
return nullptr;
}
int idx = FindByUuidLocked(uuid, true);
if (idx < 0 || entities_[idx] == nullptr) return nullptr;
auto it = node_map_.find(uuid);
if (it == node_map_.end()) return nullptr;
// Found node. Return only if its refcount is not zero (i.e., when we
// know that there is no other thread about to destroy it).
if (!entities_[idx]->RefIfNonZero()) return nullptr;
return RefCountedPtr<BaseNode>(entities_[idx]);
BaseNode* node = it->second;
if (!node->RefIfNonZero()) return nullptr;
return RefCountedPtr<BaseNode>(node);
}
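The lookup path still relies on RefIfNonZero() so that a reader never hands out a node whose last ref is already being dropped on another thread. A minimal sketch of that idiom under assumed semantics (the real implementation lives in grpc_core::RefCounted and is not part of this PR):

#include <atomic>
#include <cstdint>

class RefIfNonZeroSketch {
 public:
  // Atomically take a ref only if the count is still non-zero, so a
  // concurrent reader cannot revive an object that is being destroyed.
  bool RefIfNonZero() {
    intptr_t count = refs_.load(std::memory_order_acquire);
    while (count != 0) {
      if (refs_.compare_exchange_weak(count, count + 1,
                                      std::memory_order_acq_rel,
                                      std::memory_order_acquire)) {
        return true;
      }
      // On failure, compare_exchange_weak reloads `count`; loop and retry.
    }
    return false;
  }

 private:
  std::atomic<intptr_t> refs_{1};
};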
char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) {
@@ -138,13 +87,11 @@ char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) {
RefCountedPtr<BaseNode> node_after_pagination_limit;
{
MutexLock lock(&mu_);
const int start_idx = GPR_MAX(FindByUuidLocked(start_channel_id, false), 0);
for (size_t i = start_idx; i < entities_.size(); ++i) {
if (entities_[i] != nullptr &&
entities_[i]->type() ==
grpc_core::channelz::BaseNode::EntityType::kTopLevelChannel &&
entities_[i]->uuid() >= start_channel_id &&
entities_[i]->RefIfNonZero()) {
for (auto it = node_map_.lower_bound(start_channel_id);
it != node_map_.end(); ++it) {
BaseNode* node = it->second;
if (node->type() == BaseNode::EntityType::kTopLevelChannel &&
node->RefIfNonZero()) {
// Check if we are over pagination limit to determine if we need to set
// the "end" element. If we don't go through this block, we know that
// when the loop terminates, we have <= to kPaginationLimit.
@@ -152,10 +99,10 @@ char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) {
// refcount, we need to decrease it, but we can't unref while
// holding the lock, because this may lead to a deadlock.
if (top_level_channels.size() == kPaginationLimit) {
node_after_pagination_limit.reset(entities_[i]);
node_after_pagination_limit.reset(node);
break;
}
top_level_channels.emplace_back(entities_[i]);
top_level_channels.emplace_back(node);
}
}
}
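The pagination loops now start at node_map_.lower_bound(start_id) instead of clamping a binary-search result, so iteration naturally begins at the first uuid that is >= the requested start id. A standalone sketch of the pattern, with std::map standing in for grpc_core::Map and a made-up page size:

#include <cstdint>
#include <map>
#include <vector>

// Collect up to kPageSize uuids that are >= start_id, in ascending order.
// Illustrative only: the real code also filters by entity type and takes a
// ref under the lock before touching each node outside it.
std::vector<intptr_t> PageOfUuids(const std::map<intptr_t, int>& node_map,
                                  intptr_t start_id) {
  constexpr size_t kPageSize = 100;
  std::vector<intptr_t> page;
  for (auto it = node_map.lower_bound(start_id); it != node_map.end(); ++it) {
    if (page.size() == kPageSize) break;
    page.push_back(it->first);
  }
  return page;
}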
@@ -186,13 +133,11 @@ char* ChannelzRegistry::InternalGetServers(intptr_t start_server_id) {
RefCountedPtr<BaseNode> node_after_pagination_limit;
{
MutexLock lock(&mu_);
const int start_idx = GPR_MAX(FindByUuidLocked(start_server_id, false), 0);
for (size_t i = start_idx; i < entities_.size(); ++i) {
if (entities_[i] != nullptr &&
entities_[i]->type() ==
grpc_core::channelz::BaseNode::EntityType::kServer &&
entities_[i]->uuid() >= start_server_id &&
entities_[i]->RefIfNonZero()) {
for (auto it = node_map_.lower_bound(start_server_id);
it != node_map_.end(); ++it) {
BaseNode* node = it->second;
if (node->type() == BaseNode::EntityType::kServer &&
node->RefIfNonZero()) {
// Check if we are over pagination limit to determine if we need to set
// the "end" element. If we don't go through this block, we know that
// when the loop terminates, we have <= to kPaginationLimit.
@@ -200,10 +145,10 @@ char* ChannelzRegistry::InternalGetServers(intptr_t start_server_id) {
// refcount, we need to decrease it, but we can't unref while
// holding the lock, because this may lead to a deadlock.
if (servers.size() == kPaginationLimit) {
node_after_pagination_limit.reset(entities_[i]);
node_after_pagination_limit.reset(node);
break;
}
servers.emplace_back(entities_[i]);
servers.emplace_back(node);
}
}
}
@@ -230,9 +175,10 @@ void ChannelzRegistry::InternalLogAllEntities() {
InlinedVector<RefCountedPtr<BaseNode>, 10> nodes;
{
MutexLock lock(&mu_);
for (size_t i = 0; i < entities_.size(); ++i) {
if (entities_[i] != nullptr && entities_[i]->RefIfNonZero()) {
nodes.emplace_back(entities_[i]);
for (auto& p : node_map_) {
BaseNode* node = p.second;
if (node->RefIfNonZero()) {
nodes.emplace_back(node);
}
}
}
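A pattern repeated in these functions is to take strong refs while holding the registry lock and then do all rendering, and the eventual unrefs, only after the lock is released, because dropping the last ref can re-enter the registry and deadlock. A schematic of that pattern using generic standard-library types, not the literal gRPC code:

#include <memory>
#include <mutex>
#include <vector>

struct Node { /* payload omitted */ };

// Hypothetical globals standing in for the registry's lock and node map.
std::mutex g_mu;
std::vector<std::shared_ptr<Node>> g_nodes;

void LogAllNodes() {
  std::vector<std::shared_ptr<Node>> snapshot;
  {
    std::lock_guard<std::mutex> lock(g_mu);
    // Copy refs out while locked so the nodes stay alive without the lock
    // being held during the work below.
    snapshot = g_nodes;
  }
  // Render/log each node here, outside the lock; any final refs are dropped
  // when `snapshot` goes out of scope, also outside the lock.
}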

@@ -21,19 +21,16 @@
#include <grpc/impl/codegen/port_platform.h>
#include <stdint.h>
#include "src/core/lib/channel/channel_trace.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include <stdint.h>
#include "src/core/lib/gprpp/map.h"
#include "src/core/lib/gprpp/sync.h"
namespace grpc_core {
namespace channelz {
namespace testing {
class ChannelzRegistryPeer;
}
// singleton registry object to track all objects that are needed to support
// channelz bookkeeping. All objects share globally distributed uuids.
class ChannelzRegistry {
@@ -69,13 +66,6 @@ class ChannelzRegistry {
static void LogAllEntities() { Default()->InternalLogAllEntities(); }
private:
GRPC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
GRPC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
friend class testing::ChannelzRegistryPeer;
ChannelzRegistry();
~ChannelzRegistry();
// Returned the singleton instance of ChannelzRegistry;
static ChannelzRegistry* Default();
@@ -93,22 +83,12 @@ class ChannelzRegistry {
char* InternalGetTopChannels(intptr_t start_channel_id);
char* InternalGetServers(intptr_t start_server_id);
// If entities_ has over a certain threshold of empty slots, it will
// compact the vector and move all used slots to the front.
void MaybePerformCompactionLocked();
// Performs binary search on entities_ to find the index with that uuid.
// If direct_hit_needed, then will return -1 in case of absence.
// Else, will return idx of the first uuid higher than the target.
int FindByUuidLocked(intptr_t uuid, bool direct_hit_needed);
void InternalLogAllEntities();
// protects members
gpr_mu mu_;
InlinedVector<BaseNode*, 20> entities_;
Mutex mu_;
Map<intptr_t, BaseNode*> node_map_;
intptr_t uuid_generator_ = 0;
int num_empty_slots_ = 0;
};
} // namespace channelz

@@ -22,8 +22,11 @@
#include <grpc/support/port_platform.h>
#include <string.h>
#include <algorithm>
#include <functional>
#include <iterator>
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/pair.h"
@@ -88,6 +91,13 @@ class Map {
iterator end() { return iterator(this, nullptr); }
iterator lower_bound(const Key& k) {
key_compare compare;
return std::find_if(begin(), end(), [&k, &compare](const value_type& v) {
return !compare(v.first, k);
});
}
private:
friend class testing::MapTest;
struct Entry {
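Note that this lower_bound() is a linear std::find_if over the in-order iteration rather than a descent of the underlying tree, so it is O(n); that is presumably acceptable here because the registry's callers immediately iterate forward from the returned position anyway. The contract mirrors std::map::lower_bound: return an iterator to the first element whose key is not less than k, or end() if there is none. A hypothetical usage sketch (not part of the diff), assuming the same Map API exercised by the LowerBound test added further below:

#include <grpc/support/log.h>
#include "src/core/lib/gprpp/map.h"

void LowerBoundSketch() {
  grpc_core::Map<int, int> m;  // hypothetical instantiation, int -> int
  m.emplace(2, 20);
  m.emplace(4, 40);
  GPR_ASSERT(m.lower_bound(3)->first == 4);  // first key >= 3
  GPR_ASSERT(m.lower_bound(5) == m.end());   // no key >= 5
}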

@@ -43,16 +43,6 @@ namespace grpc_core {
namespace channelz {
namespace testing {
class ChannelzRegistryPeer {
public:
const InlinedVector<BaseNode*, 20>* entities() {
return &ChannelzRegistry::Default()->entities_;
}
int num_empty_slots() {
return ChannelzRegistry::Default()->num_empty_slots_;
}
};
class ChannelzRegistryTest : public ::testing::Test {
protected:
// ensure we always have a fresh registry for tests.
@@ -115,38 +105,15 @@ TEST_F(ChannelzRegistryTest, NullIfNotPresentTest) {
EXPECT_EQ(channelz_channel, retrieved);
}
TEST_F(ChannelzRegistryTest, TestCompaction) {
const int kLoopIterations = 300;
// These channels that will stay in the registry for the duration of the test.
std::vector<RefCountedPtr<BaseNode>> even_channels;
even_channels.reserve(kLoopIterations);
{
// The channels will unregister themselves at the end of the for block.
std::vector<RefCountedPtr<BaseNode>> odd_channels;
odd_channels.reserve(kLoopIterations);
for (int i = 0; i < kLoopIterations; i++) {
even_channels.push_back(
MakeRefCounted<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
odd_channels.push_back(
MakeRefCounted<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
}
}
// without compaction, there would be exactly kLoopIterations empty slots at
// this point. However, one of the unregisters should have triggered
// compaction.
ChannelzRegistryPeer peer;
EXPECT_LT(peer.num_empty_slots(), kLoopIterations);
}
TEST_F(ChannelzRegistryTest, TestGetAfterCompaction) {
TEST_F(ChannelzRegistryTest, TestUnregistration) {
const int kLoopIterations = 100;
// These channels that will stay in the registry for the duration of the test.
// These channels will stay in the registry for the duration of the test.
std::vector<RefCountedPtr<BaseNode>> even_channels;
even_channels.reserve(kLoopIterations);
std::vector<intptr_t> odd_uuids;
odd_uuids.reserve(kLoopIterations);
{
// The channels will unregister themselves at the end of the for block.
// These channels will unregister themselves at the end of this block.
std::vector<RefCountedPtr<BaseNode>> odd_channels;
odd_channels.reserve(kLoopIterations);
for (int i = 0; i < kLoopIterations; i++) {
@@ -157,6 +124,7 @@ TEST_F(ChannelzRegistryTest, TestGetAfterCompaction) {
odd_uuids.push_back(odd_channels[i]->uuid());
}
}
// Check that the even channels are present and the odd channels are not.
for (int i = 0; i < kLoopIterations; i++) {
RefCountedPtr<BaseNode> retrieved =
ChannelzRegistry::Get(even_channels[i]->uuid());
@@ -164,27 +132,8 @@ TEST_F(ChannelzRegistryTest, TestGetAfterCompaction) {
retrieved = ChannelzRegistry::Get(odd_uuids[i]);
EXPECT_EQ(retrieved, nullptr);
}
}
TEST_F(ChannelzRegistryTest, TestAddAfterCompaction) {
const int kLoopIterations = 100;
// These channels that will stay in the registry for the duration of the test.
std::vector<RefCountedPtr<BaseNode>> even_channels;
even_channels.reserve(kLoopIterations);
std::vector<intptr_t> odd_uuids;
odd_uuids.reserve(kLoopIterations);
{
// The channels will unregister themselves at the end of the for block.
std::vector<RefCountedPtr<BaseNode>> odd_channels;
odd_channels.reserve(kLoopIterations);
for (int i = 0; i < kLoopIterations; i++) {
even_channels.push_back(
MakeRefCounted<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
odd_channels.push_back(
MakeRefCounted<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
odd_uuids.push_back(odd_channels[i]->uuid());
}
}
// Add more channels and verify that they get added correctly, to make
// sure that the unregistration didn't leave the registry in a weird state.
std::vector<RefCountedPtr<BaseNode>> more_channels;
more_channels.reserve(kLoopIterations);
for (int i = 0; i < kLoopIterations; i++) {

@@ -419,6 +419,24 @@ TEST_F(MapTest, RandomOpsWithIntKey) {
EXPECT_TRUE(test_map.empty());
}
// Tests lower_bound().
TEST_F(MapTest, LowerBound) {
Map<int, Payload> test_map;
for (int i = 0; i < 10; i += 2) {
test_map.emplace(i, Payload(i));
}
auto it = test_map.lower_bound(-1);
EXPECT_EQ(it, test_map.begin());
it = test_map.lower_bound(0);
EXPECT_EQ(it, test_map.begin());
it = test_map.lower_bound(2);
EXPECT_EQ(it->first, 2);
it = test_map.lower_bound(3);
EXPECT_EQ(it->first, 4);
it = test_map.lower_bound(9);
EXPECT_EQ(it, test_map.end());
}
} // namespace testing
} // namespace grpc_core
