[StatsPlugin] Use lock-free list for global stats plugins list (#38060)

Closes #38060

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/38060 from yashykt:MetricsLockFree d5bdaea2b5
PiperOrigin-RevId: 696320956
pull/38120/head^2
Yash Tibrewal 2 weeks ago committed by Copybara-Service
parent 06b2452feb
commit c0f22d125f
  1. 27
      src/core/telemetry/metrics.cc
  2. 10
      src/core/telemetry/metrics.h
  3. 33
      test/core/telemetry/metrics_test.cc
  4. 10
      test/core/test_util/fake_stats_plugin.h

@ -120,27 +120,30 @@ void GlobalStatsPluginRegistry::StatsPluginGroup::AddServerCallTracers(
} }
} }
NoDestruct<Mutex> GlobalStatsPluginRegistry::mutex_; std::atomic<GlobalStatsPluginRegistry::GlobalStatsPluginNode*>
NoDestruct<std::vector<std::shared_ptr<StatsPlugin>>>
GlobalStatsPluginRegistry::plugins_; GlobalStatsPluginRegistry::plugins_;
void GlobalStatsPluginRegistry::RegisterStatsPlugin( void GlobalStatsPluginRegistry::RegisterStatsPlugin(
std::shared_ptr<StatsPlugin> plugin) { std::shared_ptr<StatsPlugin> plugin) {
MutexLock lock(&*mutex_); GlobalStatsPluginNode* node = new GlobalStatsPluginNode();
plugins_->push_back(std::move(plugin)); node->plugin = std::move(plugin);
node->next = plugins_.load(std::memory_order_relaxed);
while (!plugins_.compare_exchange_weak(
node->next, node, std::memory_order_acq_rel, std::memory_order_relaxed)) {
}
} }
GlobalStatsPluginRegistry::StatsPluginGroup GlobalStatsPluginRegistry::StatsPluginGroup
GlobalStatsPluginRegistry::GetStatsPluginsForChannel( GlobalStatsPluginRegistry::GetStatsPluginsForChannel(
const experimental::StatsPluginChannelScope& scope) { const experimental::StatsPluginChannelScope& scope) {
MutexLock lock(&*mutex_);
StatsPluginGroup group; StatsPluginGroup group;
for (const auto& plugin : *plugins_) { for (GlobalStatsPluginNode* node = plugins_.load(std::memory_order_acquire);
node != nullptr; node = node->next) {
bool is_enabled = false; bool is_enabled = false;
std::shared_ptr<StatsPlugin::ScopeConfig> config; std::shared_ptr<StatsPlugin::ScopeConfig> config;
std::tie(is_enabled, config) = plugin->IsEnabledForChannel(scope); std::tie(is_enabled, config) = node->plugin->IsEnabledForChannel(scope);
if (is_enabled) { if (is_enabled) {
group.AddStatsPlugin(plugin, std::move(config)); group.AddStatsPlugin(node->plugin, std::move(config));
} }
} }
return group; return group;
@ -148,14 +151,14 @@ GlobalStatsPluginRegistry::GetStatsPluginsForChannel(
GlobalStatsPluginRegistry::StatsPluginGroup GlobalStatsPluginRegistry::StatsPluginGroup
GlobalStatsPluginRegistry::GetStatsPluginsForServer(const ChannelArgs& args) { GlobalStatsPluginRegistry::GetStatsPluginsForServer(const ChannelArgs& args) {
MutexLock lock(&*mutex_);
StatsPluginGroup group; StatsPluginGroup group;
for (const auto& plugin : *plugins_) { for (GlobalStatsPluginNode* node = plugins_.load(std::memory_order_acquire);
node != nullptr; node = node->next) {
bool is_enabled = false; bool is_enabled = false;
std::shared_ptr<StatsPlugin::ScopeConfig> config; std::shared_ptr<StatsPlugin::ScopeConfig> config;
std::tie(is_enabled, config) = plugin->IsEnabledForServer(args); std::tie(is_enabled, config) = node->plugin->IsEnabledForServer(args);
if (is_enabled) { if (is_enabled) {
group.AddStatsPlugin(plugin, std::move(config)); group.AddStatsPlugin(node->plugin, std::move(config));
} }
} }
return group; return group;

@ -446,6 +446,8 @@ class GlobalStatsPluginRegistry {
return false; return false;
} }
size_t size() const { return plugins_state_.size(); }
// Registers a callback to be used to populate callback metrics. // Registers a callback to be used to populate callback metrics.
// The callback will update the specified metrics. The callback // The callback will update the specified metrics. The callback
// will be invoked no more often than min_interval. Multiple callbacks may // will be invoked no more often than min_interval. Multiple callbacks may
@ -508,13 +510,15 @@ class GlobalStatsPluginRegistry {
static StatsPluginGroup GetStatsPluginsForServer(const ChannelArgs& args); static StatsPluginGroup GetStatsPluginsForServer(const ChannelArgs& args);
private: private:
struct GlobalStatsPluginNode {
std::shared_ptr<StatsPlugin> plugin;
GlobalStatsPluginNode* next = nullptr;
};
friend class GlobalStatsPluginRegistryTestPeer; friend class GlobalStatsPluginRegistryTestPeer;
GlobalStatsPluginRegistry() = default; GlobalStatsPluginRegistry() = default;
static NoDestruct<Mutex> mutex_; static std::atomic<GlobalStatsPluginNode*> plugins_;
static NoDestruct<std::vector<std::shared_ptr<StatsPlugin>>> plugins_
ABSL_GUARDED_BY(mutex_);
}; };
// A metric callback that is registered with a stats plugin group. // A metric callback that is registered with a stats plugin group.

@ -15,6 +15,7 @@
#include "src/core/telemetry/metrics.h" #include "src/core/telemetry/metrics.h"
#include <memory> #include <memory>
#include <thread>
#include "absl/log/log.h" #include "absl/log/log.h"
#include "gmock/gmock.h" #include "gmock/gmock.h"
@ -648,6 +649,38 @@ TEST_F(MetricsTest, FindInstrumentByName) {
::testing::Eq(uint64_counter_handle.index)))); ::testing::Eq(uint64_counter_handle.index))));
} }
TEST_F(MetricsTest, ParallelStatsPluginRegistrationAndLookup) {
std::vector<std::thread> register_threads;
std::vector<std::thread> lookup_threads;
register_threads.reserve(100);
lookup_threads.reserve(100);
// 100 threads that register 100 stats plugins each
for (int i = 0; i < 100; ++i) {
register_threads.emplace_back([] {
for (int j = 0; j < 100; ++j) {
FakeStatsPluginBuilder().BuildAndRegister();
}
});
}
// 100 threads that keep looking up stats plugins till they see 10000 stats
// plugins
for (int i = 0; i < 100; ++i) {
lookup_threads.emplace_back([this] {
while (GlobalStatsPluginRegistry::GetStatsPluginsForChannel(
StatsPluginChannelScope("", "", endpoint_config_))
.size() < 10000) {
};
});
}
for (int i = 0; i < 100; ++i) {
register_threads[i].join();
lookup_threads[i].join();
}
EXPECT_THAT(GlobalStatsPluginRegistry::GetStatsPluginsForChannel(
StatsPluginChannelScope("", "", endpoint_config_)),
::testing::SizeIs(10000));
}
using MetricsDeathTest = MetricsTest; using MetricsDeathTest = MetricsTest;
TEST_F(MetricsDeathTest, RegisterTheSameMetricNameWouldCrash) { TEST_F(MetricsDeathTest, RegisterTheSameMetricNameWouldCrash) {

@ -696,8 +696,14 @@ class GlobalInstrumentsRegistryTestPeer {
class GlobalStatsPluginRegistryTestPeer { class GlobalStatsPluginRegistryTestPeer {
public: public:
static void ResetGlobalStatsPluginRegistry() { static void ResetGlobalStatsPluginRegistry() {
MutexLock lock(&*GlobalStatsPluginRegistry::mutex_); GlobalStatsPluginRegistry::GlobalStatsPluginNode* node =
GlobalStatsPluginRegistry::plugins_->clear(); GlobalStatsPluginRegistry::plugins_.exchange(nullptr,
std::memory_order_acq_rel);
while (node != nullptr) {
GlobalStatsPluginRegistry::GlobalStatsPluginNode* next = node->next;
delete node;
node = next;
}
} }
}; };

Loading…
Cancel
Save