diff --git a/src/core/lib/channel/channelz_registry.cc b/src/core/lib/channel/channelz_registry.cc index 0c033a14c60..553e1fe97b5 100644 --- a/src/core/lib/channel/channelz_registry.cc +++ b/src/core/lib/channel/channelz_registry.cc @@ -18,6 +18,9 @@ #include +#include +#include + #include "src/core/lib/channel/channel_trace.h" #include "src/core/lib/channel/channelz.h" #include "src/core/lib/channel/channelz_registry.h" @@ -29,8 +32,6 @@ #include #include -#include - namespace grpc_core { namespace channelz { namespace { @@ -51,70 +52,17 @@ ChannelzRegistry* ChannelzRegistry::Default() { return g_channelz_registry; } -ChannelzRegistry::ChannelzRegistry() { gpr_mu_init(&mu_); } - -ChannelzRegistry::~ChannelzRegistry() { gpr_mu_destroy(&mu_); } - void ChannelzRegistry::InternalRegister(BaseNode* node) { MutexLock lock(&mu_); - entities_.push_back(node); node->uuid_ = ++uuid_generator_; -} - -void ChannelzRegistry::MaybePerformCompactionLocked() { - constexpr double kEmptinessTheshold = 1. / 3; - double emptiness_ratio = - double(num_empty_slots_) / double(entities_.capacity()); - if (emptiness_ratio > kEmptinessTheshold) { - int front = 0; - for (size_t i = 0; i < entities_.size(); ++i) { - if (entities_[i] != nullptr) { - entities_[front++] = entities_[i]; - } - } - for (int i = 0; i < num_empty_slots_; ++i) { - entities_.pop_back(); - } - num_empty_slots_ = 0; - } -} - -int ChannelzRegistry::FindByUuidLocked(intptr_t target_uuid, - bool direct_hit_needed) { - int left = 0; - int right = int(entities_.size() - 1); - while (left <= right) { - int true_middle = left + (right - left) / 2; - int first_non_null = true_middle; - while (first_non_null < right && entities_[first_non_null] == nullptr) { - first_non_null++; - } - if (entities_[first_non_null] == nullptr) { - right = true_middle - 1; - continue; - } - intptr_t uuid = entities_[first_non_null]->uuid(); - if (uuid == target_uuid) { - return int(first_non_null); - } - if (uuid < target_uuid) { - left = first_non_null + 1; - } else { - right = true_middle - 1; - } - } - return direct_hit_needed ? -1 : left; + node_map_[node->uuid_] = node; } void ChannelzRegistry::InternalUnregister(intptr_t uuid) { GPR_ASSERT(uuid >= 1); MutexLock lock(&mu_); GPR_ASSERT(uuid <= uuid_generator_); - int idx = FindByUuidLocked(uuid, true); - GPR_ASSERT(idx >= 0); - entities_[idx] = nullptr; - num_empty_slots_++; - MaybePerformCompactionLocked(); + node_map_.erase(uuid); } RefCountedPtr ChannelzRegistry::InternalGet(intptr_t uuid) { @@ -122,12 +70,13 @@ RefCountedPtr ChannelzRegistry::InternalGet(intptr_t uuid) { if (uuid < 1 || uuid > uuid_generator_) { return nullptr; } - int idx = FindByUuidLocked(uuid, true); - if (idx < 0 || entities_[idx] == nullptr) return nullptr; + auto it = node_map_.find(uuid); + if (it == node_map_.end()) return nullptr; // Found node. Return only if its refcount is not zero (i.e., when we // know that there is no other thread about to destroy it). - if (!entities_[idx]->RefIfNonZero()) return nullptr; - return RefCountedPtr(entities_[idx]); + BaseNode* node = it->second; + if (!node->RefIfNonZero()) return nullptr; + return RefCountedPtr(node); } char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) { @@ -138,13 +87,11 @@ char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) { RefCountedPtr node_after_pagination_limit; { MutexLock lock(&mu_); - const int start_idx = GPR_MAX(FindByUuidLocked(start_channel_id, false), 0); - for (size_t i = start_idx; i < entities_.size(); ++i) { - if (entities_[i] != nullptr && - entities_[i]->type() == - grpc_core::channelz::BaseNode::EntityType::kTopLevelChannel && - entities_[i]->uuid() >= start_channel_id && - entities_[i]->RefIfNonZero()) { + for (auto it = node_map_.lower_bound(start_channel_id); + it != node_map_.end(); ++it) { + BaseNode* node = it->second; + if (node->type() == BaseNode::EntityType::kTopLevelChannel && + node->RefIfNonZero()) { // Check if we are over pagination limit to determine if we need to set // the "end" element. If we don't go through this block, we know that // when the loop terminates, we have <= to kPaginationLimit. @@ -152,10 +99,10 @@ char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) { // refcount, we need to decrease it, but we can't unref while // holding the lock, because this may lead to a deadlock. if (top_level_channels.size() == kPaginationLimit) { - node_after_pagination_limit.reset(entities_[i]); + node_after_pagination_limit.reset(node); break; } - top_level_channels.emplace_back(entities_[i]); + top_level_channels.emplace_back(node); } } } @@ -186,13 +133,11 @@ char* ChannelzRegistry::InternalGetServers(intptr_t start_server_id) { RefCountedPtr node_after_pagination_limit; { MutexLock lock(&mu_); - const int start_idx = GPR_MAX(FindByUuidLocked(start_server_id, false), 0); - for (size_t i = start_idx; i < entities_.size(); ++i) { - if (entities_[i] != nullptr && - entities_[i]->type() == - grpc_core::channelz::BaseNode::EntityType::kServer && - entities_[i]->uuid() >= start_server_id && - entities_[i]->RefIfNonZero()) { + for (auto it = node_map_.lower_bound(start_server_id); + it != node_map_.end(); ++it) { + BaseNode* node = it->second; + if (node->type() == BaseNode::EntityType::kServer && + node->RefIfNonZero()) { // Check if we are over pagination limit to determine if we need to set // the "end" element. If we don't go through this block, we know that // when the loop terminates, we have <= to kPaginationLimit. @@ -200,10 +145,10 @@ char* ChannelzRegistry::InternalGetServers(intptr_t start_server_id) { // refcount, we need to decrease it, but we can't unref while // holding the lock, because this may lead to a deadlock. if (servers.size() == kPaginationLimit) { - node_after_pagination_limit.reset(entities_[i]); + node_after_pagination_limit.reset(node); break; } - servers.emplace_back(entities_[i]); + servers.emplace_back(node); } } } @@ -230,9 +175,10 @@ void ChannelzRegistry::InternalLogAllEntities() { InlinedVector, 10> nodes; { MutexLock lock(&mu_); - for (size_t i = 0; i < entities_.size(); ++i) { - if (entities_[i] != nullptr && entities_[i]->RefIfNonZero()) { - nodes.emplace_back(entities_[i]); + for (auto& p : node_map_) { + BaseNode* node = p.second; + if (node->RefIfNonZero()) { + nodes.emplace_back(node); } } } diff --git a/src/core/lib/channel/channelz_registry.h b/src/core/lib/channel/channelz_registry.h index b9d42ecf4d6..e04d7c44888 100644 --- a/src/core/lib/channel/channelz_registry.h +++ b/src/core/lib/channel/channelz_registry.h @@ -21,19 +21,16 @@ #include +#include + #include "src/core/lib/channel/channel_trace.h" #include "src/core/lib/channel/channelz.h" -#include "src/core/lib/gprpp/inlined_vector.h" - -#include +#include "src/core/lib/gprpp/map.h" +#include "src/core/lib/gprpp/sync.h" namespace grpc_core { namespace channelz { -namespace testing { -class ChannelzRegistryPeer; -} - // singleton registry object to track all objects that are needed to support // channelz bookkeeping. All objects share globally distributed uuids. class ChannelzRegistry { @@ -69,13 +66,6 @@ class ChannelzRegistry { static void LogAllEntities() { Default()->InternalLogAllEntities(); } private: - GRPC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW - GRPC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE - friend class testing::ChannelzRegistryPeer; - - ChannelzRegistry(); - ~ChannelzRegistry(); - // Returned the singleton instance of ChannelzRegistry; static ChannelzRegistry* Default(); @@ -93,22 +83,12 @@ class ChannelzRegistry { char* InternalGetTopChannels(intptr_t start_channel_id); char* InternalGetServers(intptr_t start_server_id); - // If entities_ has over a certain threshold of empty slots, it will - // compact the vector and move all used slots to the front. - void MaybePerformCompactionLocked(); - - // Performs binary search on entities_ to find the index with that uuid. - // If direct_hit_needed, then will return -1 in case of absence. - // Else, will return idx of the first uuid higher than the target. - int FindByUuidLocked(intptr_t uuid, bool direct_hit_needed); - void InternalLogAllEntities(); // protects members - gpr_mu mu_; - InlinedVector entities_; + Mutex mu_; + Map node_map_; intptr_t uuid_generator_ = 0; - int num_empty_slots_ = 0; }; } // namespace channelz diff --git a/src/core/lib/gprpp/map.h b/src/core/lib/gprpp/map.h index 525c25347f4..36e32d60c07 100644 --- a/src/core/lib/gprpp/map.h +++ b/src/core/lib/gprpp/map.h @@ -22,8 +22,11 @@ #include #include + +#include #include #include + #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/pair.h" @@ -88,6 +91,13 @@ class Map { iterator end() { return iterator(this, nullptr); } + iterator lower_bound(const Key& k) { + key_compare compare; + return std::find_if(begin(), end(), [&k, &compare](const value_type& v) { + return !compare(v.first, k); + }); + } + private: friend class testing::MapTest; struct Entry { diff --git a/test/core/channel/channelz_registry_test.cc b/test/core/channel/channelz_registry_test.cc index 030d52fd548..deb85d85624 100644 --- a/test/core/channel/channelz_registry_test.cc +++ b/test/core/channel/channelz_registry_test.cc @@ -43,16 +43,6 @@ namespace grpc_core { namespace channelz { namespace testing { -class ChannelzRegistryPeer { - public: - const InlinedVector* entities() { - return &ChannelzRegistry::Default()->entities_; - } - int num_empty_slots() { - return ChannelzRegistry::Default()->num_empty_slots_; - } -}; - class ChannelzRegistryTest : public ::testing::Test { protected: // ensure we always have a fresh registry for tests. @@ -115,38 +105,15 @@ TEST_F(ChannelzRegistryTest, NullIfNotPresentTest) { EXPECT_EQ(channelz_channel, retrieved); } -TEST_F(ChannelzRegistryTest, TestCompaction) { - const int kLoopIterations = 300; - // These channels that will stay in the registry for the duration of the test. - std::vector> even_channels; - even_channels.reserve(kLoopIterations); - { - // The channels will unregister themselves at the end of the for block. - std::vector> odd_channels; - odd_channels.reserve(kLoopIterations); - for (int i = 0; i < kLoopIterations; i++) { - even_channels.push_back( - MakeRefCounted(BaseNode::EntityType::kTopLevelChannel)); - odd_channels.push_back( - MakeRefCounted(BaseNode::EntityType::kTopLevelChannel)); - } - } - // without compaction, there would be exactly kLoopIterations empty slots at - // this point. However, one of the unregisters should have triggered - // compaction. - ChannelzRegistryPeer peer; - EXPECT_LT(peer.num_empty_slots(), kLoopIterations); -} - -TEST_F(ChannelzRegistryTest, TestGetAfterCompaction) { +TEST_F(ChannelzRegistryTest, TestUnregistration) { const int kLoopIterations = 100; - // These channels that will stay in the registry for the duration of the test. + // These channels will stay in the registry for the duration of the test. std::vector> even_channels; even_channels.reserve(kLoopIterations); std::vector odd_uuids; odd_uuids.reserve(kLoopIterations); { - // The channels will unregister themselves at the end of the for block. + // These channels will unregister themselves at the end of this block. std::vector> odd_channels; odd_channels.reserve(kLoopIterations); for (int i = 0; i < kLoopIterations; i++) { @@ -157,6 +124,7 @@ TEST_F(ChannelzRegistryTest, TestGetAfterCompaction) { odd_uuids.push_back(odd_channels[i]->uuid()); } } + // Check that the even channels are present and the odd channels are not. for (int i = 0; i < kLoopIterations; i++) { RefCountedPtr retrieved = ChannelzRegistry::Get(even_channels[i]->uuid()); @@ -164,27 +132,8 @@ TEST_F(ChannelzRegistryTest, TestGetAfterCompaction) { retrieved = ChannelzRegistry::Get(odd_uuids[i]); EXPECT_EQ(retrieved, nullptr); } -} - -TEST_F(ChannelzRegistryTest, TestAddAfterCompaction) { - const int kLoopIterations = 100; - // These channels that will stay in the registry for the duration of the test. - std::vector> even_channels; - even_channels.reserve(kLoopIterations); - std::vector odd_uuids; - odd_uuids.reserve(kLoopIterations); - { - // The channels will unregister themselves at the end of the for block. - std::vector> odd_channels; - odd_channels.reserve(kLoopIterations); - for (int i = 0; i < kLoopIterations; i++) { - even_channels.push_back( - MakeRefCounted(BaseNode::EntityType::kTopLevelChannel)); - odd_channels.push_back( - MakeRefCounted(BaseNode::EntityType::kTopLevelChannel)); - odd_uuids.push_back(odd_channels[i]->uuid()); - } - } + // Add more channels and verify that they get added correctly, to make + // sure that the unregistration didn't leave the registry in a weird state. std::vector> more_channels; more_channels.reserve(kLoopIterations); for (int i = 0; i < kLoopIterations; i++) { diff --git a/test/core/gprpp/map_test.cc b/test/core/gprpp/map_test.cc index 6e70a2a2ac2..30d9eb0b207 100644 --- a/test/core/gprpp/map_test.cc +++ b/test/core/gprpp/map_test.cc @@ -419,6 +419,24 @@ TEST_F(MapTest, RandomOpsWithIntKey) { EXPECT_TRUE(test_map.empty()); } +// Tests lower_bound(). +TEST_F(MapTest, LowerBound) { + Map test_map; + for (int i = 0; i < 10; i += 2) { + test_map.emplace(i, Payload(i)); + } + auto it = test_map.lower_bound(-1); + EXPECT_EQ(it, test_map.begin()); + it = test_map.lower_bound(0); + EXPECT_EQ(it, test_map.begin()); + it = test_map.lower_bound(2); + EXPECT_EQ(it->first, 2); + it = test_map.lower_bound(3); + EXPECT_EQ(it->first, 4); + it = test_map.lower_bound(9); + EXPECT_EQ(it, test_map.end()); +} + } // namespace testing } // namespace grpc_core