[resource_quota] Add a mechanism to query all of the memory quotas in the system (#34169)

Pre-req for adding observability for this stuff
pull/34211/head
Craig Tiller 2 years ago committed by GitHub
parent 79a983472c
commit be22006bc1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 60
      src/core/lib/resource_quota/memory_quota.cc
  2. 5
      src/core/lib/resource_quota/memory_quota.h
  3. 18
      test/core/resource_quota/memory_quota_test.cc

@ -37,12 +37,60 @@
namespace grpc_core {
namespace {
// Maximum number of bytes an allocator will request from a quota in one step.
// Larger allocations than this will require multiple allocation requests.
static constexpr size_t kMaxReplenishBytes = 1024 * 1024;
constexpr size_t kMaxReplenishBytes = 1024 * 1024;
// Minimum number of bytes an allocator will request from a quota in one step.
static constexpr size_t kMinReplenishBytes = 4096;
constexpr size_t kMinReplenishBytes = 4096;
class MemoryQuotaTracker {
public:
static MemoryQuotaTracker& Get() {
static MemoryQuotaTracker* tracker = new MemoryQuotaTracker();
return *tracker;
}
void Add(std::shared_ptr<BasicMemoryQuota> quota) {
MutexLock lock(&mu_);
// Common usage is that we only create a few (one or two) quotas.
// We'd like to ensure that we don't OOM if more are added - and
// using a weak_ptr here, whilst nicely braindead, does run that
// risk.
// If usage patterns change sufficiently we'll likely want to
// change this class to have a more sophisticated data structure
// and probably a Remove() method.
GatherAndGarbageCollect();
quotas_.push_back(quota);
}
std::vector<std::shared_ptr<BasicMemoryQuota>> All() {
MutexLock lock(&mu_);
return GatherAndGarbageCollect();
}
private:
MemoryQuotaTracker() {}
std::vector<std::shared_ptr<BasicMemoryQuota>> GatherAndGarbageCollect()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
std::vector<std::weak_ptr<BasicMemoryQuota>> new_quotas;
std::vector<std::shared_ptr<BasicMemoryQuota>> all_quotas;
for (const auto& quota : quotas_) {
auto p = quota.lock();
if (p == nullptr) continue;
new_quotas.push_back(quota);
all_quotas.push_back(p);
}
quotas_.swap(new_quotas);
return all_quotas;
}
Mutex mu_;
std::vector<std::weak_ptr<BasicMemoryQuota>> quotas_ ABSL_GUARDED_BY(mu_);
};
} // namespace
//
// Reclaimer
@ -314,9 +362,13 @@ class BasicMemoryQuota::WaitForSweepPromise {
uint64_t token_;
};
BasicMemoryQuota::BasicMemoryQuota(std::string name) : name_(std::move(name)) {}
void BasicMemoryQuota::Start() {
auto self = shared_from_this();
MemoryQuotaTracker::Get().Add(self);
// Reclamation loop:
// basically, wait until we are in overcommit (free_bytes_ < 0), and then:
// while (free_bytes_ < 0) reclaim_memory()
@ -695,4 +747,8 @@ MemoryOwner MemoryQuota::CreateMemoryOwner(absl::string_view name) {
return MemoryOwner(std::move(impl));
}
std::vector<std::shared_ptr<BasicMemoryQuota>> AllMemoryQuotas() {
return MemoryQuotaTracker::Get().All();
}
} // namespace grpc_core

@ -26,6 +26,7 @@
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_set.h"
@ -296,7 +297,7 @@ class BasicMemoryQuota final
size_t max_recommended_allocation_size = 0;
};
explicit BasicMemoryQuota(std::string name) : name_(std::move(name)) {}
explicit BasicMemoryQuota(std::string name);
// Start the reclamation activity.
void Start();
@ -586,6 +587,8 @@ inline MemoryQuotaRefPtr MakeMemoryQuota(std::string name) {
return std::make_shared<MemoryQuota>(std::move(name));
}
std::vector<std::shared_ptr<BasicMemoryQuota>> AllMemoryQuotas();
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_RESOURCE_QUOTA_MEMORY_QUOTA_H

@ -18,6 +18,7 @@
#include <atomic>
#include <chrono>
#include <random>
#include <set>
#include <thread>
#include <vector>
@ -169,6 +170,23 @@ TEST(MemoryQuotaTest, NoBunchingIfIdle) {
EXPECT_GE(count_reclaimers_called.load(std::memory_order_relaxed), 8000);
}
TEST(MemoryQuotaTest, AllMemoryQuotas) {
auto gather = []() {
std::set<std::string> all_names;
for (const auto& q : AllMemoryQuotas()) {
all_names.emplace(q->name());
}
return all_names;
};
auto m1 = MakeMemoryQuota("m1");
auto m2 = MakeMemoryQuota("m2");
EXPECT_EQ(gather(), std::set<std::string>({"m1", "m2"}));
m1.reset();
EXPECT_EQ(gather(), std::set<std::string>({"m2"}));
}
} // namespace testing
namespace memory_quota_detail {

Loading…
Cancel
Save