From ba308b46ed89f910c7a66ee52c9336b5507741d3 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 2 Jun 2022 16:22:48 -0700 Subject: [PATCH] [resource-quota] Remove rebind (#29851) * [resource-quota] Remove rebind * fix --- src/core/lib/resource_quota/memory_quota.cc | 78 +++++-------------- src/core/lib/resource_quota/memory_quota.h | 36 +++------ .../resource_quota/memory_quota_fuzzer.cc | 6 -- .../resource_quota/memory_quota_fuzzer.proto | 2 +- .../memory_quota_stress_test.cc | 11 +-- test/core/resource_quota/memory_quota_test.cc | 42 ---------- 6 files changed, 31 insertions(+), 144 deletions(-) diff --git a/src/core/lib/resource_quota/memory_quota.cc b/src/core/lib/resource_quota/memory_quota.cc index 710ff1820e5..2c0792d16ab 100644 --- a/src/core/lib/resource_quota/memory_quota.cc +++ b/src/core/lib/resource_quota/memory_quota.cc @@ -165,7 +165,7 @@ GrpcMemoryAllocatorImpl::GrpcMemoryAllocatorImpl( GrpcMemoryAllocatorImpl::~GrpcMemoryAllocatorImpl() { GPR_ASSERT(free_bytes_.load(std::memory_order_acquire) + sizeof(GrpcMemoryAllocatorImpl) == - taken_bytes_); + taken_bytes_.load(std::memory_order_relaxed)); memory_quota_->Return(taken_bytes_); } @@ -174,7 +174,7 @@ void GrpcMemoryAllocatorImpl::Shutdown() { OrphanablePtr reclamation_handles[kNumReclamationPasses]; { - MutexLock lock(&memory_quota_mu_); + MutexLock lock(&reclaimer_mu_); GPR_ASSERT(!shutdown_); shutdown_ = true; memory_quota = memory_quota_; @@ -207,16 +207,11 @@ absl::optional GrpcMemoryAllocatorImpl::TryReserve( // Scale the request down according to memory pressure if we have that // flexibility. if (scaled_size_over_min != 0) { - double pressure; - size_t max_recommended_allocation_size; - { - MutexLock lock(&memory_quota_mu_); - const auto pressure_and_max_recommended_allocation_size = - memory_quota_->InstantaneousPressureAndMaxRecommendedAllocationSize(); - pressure = pressure_and_max_recommended_allocation_size.first; - max_recommended_allocation_size = - pressure_and_max_recommended_allocation_size.second; - } + const auto pressure_and_max_recommended_allocation_size = + memory_quota_->InstantaneousPressureAndMaxRecommendedAllocationSize(); + double pressure = pressure_and_max_recommended_allocation_size.first; + size_t max_recommended_allocation_size = + pressure_and_max_recommended_allocation_size.second; // Reduce allocation size proportional to the pressure > 80% usage. if (pressure > 0.8) { scaled_size_over_min = @@ -265,9 +260,7 @@ void GrpcMemoryAllocatorImpl::MaybeDonateBack() { gpr_log(GPR_INFO, "[%p|%s] Early return %" PRIdPTR " bytes", this, name_.c_str(), ret); } - MutexLock lock(&memory_quota_mu_); - GPR_ASSERT(taken_bytes_ >= ret); - taken_bytes_ -= ret; + GPR_ASSERT(taken_bytes_.fetch_sub(ret, std::memory_order_relaxed) >= ret); memory_quota_->Return(ret); return; } @@ -275,29 +268,26 @@ void GrpcMemoryAllocatorImpl::MaybeDonateBack() { } void GrpcMemoryAllocatorImpl::Replenish() { - MutexLock lock(&memory_quota_mu_); - GPR_ASSERT(!shutdown_); // Attempt a fairly low rate exponential growth request size, bounded between // some reasonable limits declared at top of file. - auto amount = Clamp(taken_bytes_ / 3, kMinReplenishBytes, kMaxReplenishBytes); + auto amount = Clamp(taken_bytes_.load(std::memory_order_relaxed) / 3, + kMinReplenishBytes, kMaxReplenishBytes); // Take the requested amount from the quota. memory_quota_->Take(amount); // Record that we've taken it. - taken_bytes_ += amount; + taken_bytes_.fetch_add(amount, std::memory_order_relaxed); // Add the taken amount to the free pool. free_bytes_.fetch_add(amount, std::memory_order_acq_rel); // See if we can add ourselves as a reclaimer. - MaybeRegisterReclaimerLocked(); + MaybeRegisterReclaimer(); } void GrpcMemoryAllocatorImpl::MaybeRegisterReclaimer() { - MutexLock lock(&memory_quota_mu_); - MaybeRegisterReclaimerLocked(); -} - -void GrpcMemoryAllocatorImpl::MaybeRegisterReclaimerLocked() { // If the reclaimer is already registered, then there's nothing to do. - if (registered_reclaimer_) return; + if (registered_reclaimer_.exchange(true, std::memory_order_relaxed)) { + return; + } + MutexLock lock(&reclaimer_mu_); if (shutdown_) return; // Grab references to the things we'll need auto self = shared_from_this(); @@ -308,49 +298,17 @@ void GrpcMemoryAllocatorImpl::MaybeRegisterReclaimerLocked() { auto self = self_weak.lock(); if (self == nullptr) return; auto* p = static_cast(self.get()); - MutexLock lock(&p->memory_quota_mu_); - p->registered_reclaimer_ = false; + p->registered_reclaimer_.store(false, std::memory_order_relaxed); // Figure out how many bytes we can return to the quota. size_t return_bytes = p->free_bytes_.exchange(0, std::memory_order_acq_rel); if (return_bytes == 0) return; // Subtract that from our outstanding balance. - p->taken_bytes_ -= return_bytes; + p->taken_bytes_.fetch_sub(return_bytes); // And return them to the quota. p->memory_quota_->Return(return_bytes); }); } -void GrpcMemoryAllocatorImpl::Rebind( - std::shared_ptr memory_quota) { - MutexLock lock(&memory_quota_mu_); - GPR_ASSERT(!shutdown_); - if (memory_quota_ == memory_quota) return; - // Return memory to the original memory quota. - memory_quota_->Return(taken_bytes_); - // Reassign any queued reclaimers - for (size_t i = 0; i < kNumReclamationPasses; i++) { - if (reclamation_handles_[i] != nullptr) { - reclamation_handles_[i]->Requeue(memory_quota->reclaimer_queue(i)); - } - } - // Switch to the new memory quota, leaving the old one in memory_quota so that - // when we unref it, we are outside of lock. - memory_quota_.swap(memory_quota); - // Drop our freed memory down to zero, to avoid needing to ask the new - // quota for memory we're not currently using. - taken_bytes_ -= free_bytes_.exchange(0, std::memory_order_acq_rel); - // And let the new quota know how much we're already using. - memory_quota_->Take(taken_bytes_); -} - -// -// MemoryOwner -// - -void MemoryOwner::Rebind(MemoryQuota* quota) { - impl()->Rebind(quota->memory_quota_); -} - // // BasicMemoryQuota // diff --git a/src/core/lib/resource_quota/memory_quota.h b/src/core/lib/resource_quota/memory_quota.h index a74b813c26c..06741a0d803 100644 --- a/src/core/lib/resource_quota/memory_quota.h +++ b/src/core/lib/resource_quota/memory_quota.h @@ -288,11 +288,6 @@ class GrpcMemoryAllocatorImpl final : public EventEngineMemoryAllocatorImpl { std::shared_ptr memory_quota, std::string name); ~GrpcMemoryAllocatorImpl() override; - // Rebind - Swaps the underlying quota for this allocator, taking care to - // make sure memory allocated is moved to allocations against the new quota. - void Rebind(std::shared_ptr memory_quota) - ABSL_LOCKS_EXCLUDED(memory_quota_mu_); - // Reserve bytes from the quota. // If we enter overcommit, reclamation will begin concurrently. // Returns the number of bytes reserved. @@ -315,7 +310,7 @@ class GrpcMemoryAllocatorImpl final : public EventEngineMemoryAllocatorImpl { // Post a reclamation function. template void PostReclaimer(ReclamationPass pass, F fn) { - MutexLock lock(&memory_quota_mu_); + MutexLock lock(&reclaimer_mu_); GPR_ASSERT(!shutdown_); InsertReclaimer(static_cast(pass), std::move(fn)); } @@ -325,7 +320,6 @@ class GrpcMemoryAllocatorImpl final : public EventEngineMemoryAllocatorImpl { // Read the instantaneous memory pressure double InstantaneousPressure() const { - MutexLock lock(&memory_quota_mu_); return memory_quota_->InstantaneousPressureAndMaxRecommendedAllocationSize() .first; } @@ -344,39 +338,34 @@ class GrpcMemoryAllocatorImpl final : public EventEngineMemoryAllocatorImpl { void MaybeDonateBack(); // Replenish bytes from the quota, without blocking, possibly entering // overcommit. - void Replenish() ABSL_LOCKS_EXCLUDED(memory_quota_mu_); + void Replenish(); // If we have not already, register a reclamation function against the quota // to sweep any free memory back to that quota. - void MaybeRegisterReclaimer() ABSL_LOCKS_EXCLUDED(memory_quota_mu_); - void MaybeRegisterReclaimerLocked() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(memory_quota_mu_); + void MaybeRegisterReclaimer() ABSL_LOCKS_EXCLUDED(reclaimer_mu_); template void InsertReclaimer(size_t pass, F fn) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(memory_quota_mu_) { + ABSL_EXCLUSIVE_LOCKS_REQUIRED(reclaimer_mu_) { reclamation_handles_[pass] = memory_quota_->reclaimer_queue(pass)->Insert(std::move(fn)); } + // Backing resource quota. + const std::shared_ptr memory_quota_; // Amount of memory this allocator has cached for its own use: to avoid quota // contention, each MemoryAllocator can keep some memory in addition to what // it is immediately using, and the quota can pull it back under memory // pressure. std::atomic free_bytes_{0}; - // Mutex guarding the backing resource quota. - mutable Mutex memory_quota_mu_; - // Backing resource quota. - std::shared_ptr memory_quota_ - ABSL_GUARDED_BY(memory_quota_mu_); // Amount of memory taken from the quota by this allocator. - size_t taken_bytes_ ABSL_GUARDED_BY(memory_quota_mu_) = - sizeof(GrpcMemoryAllocatorImpl); - bool shutdown_ ABSL_GUARDED_BY(memory_quota_mu_) = false; - bool registered_reclaimer_ ABSL_GUARDED_BY(memory_quota_mu_) = false; + std::atomic taken_bytes_{sizeof(GrpcMemoryAllocatorImpl)}; + std::atomic registered_reclaimer_; + Mutex reclaimer_mu_; + bool shutdown_ ABSL_GUARDED_BY(reclaimer_mu_) = false; // Indices into the various reclaimer queues, used so that we can cancel // reclamation should we shutdown or get rebound. OrphanablePtr reclamation_handles_[kNumReclamationPasses] ABSL_GUARDED_BY( - memory_quota_mu_); + reclaimer_mu_); // Name of this allocator. std::string name_; }; @@ -402,9 +391,6 @@ class MemoryOwner final : public MemoryAllocator { impl()->PostReclaimer(pass, std::move(fn)); } - // Rebind to a different quota. - void Rebind(MemoryQuota* quota); - // Instantaneous memory pressure in the underlying quota. double InstantaneousPressure() const { return impl()->InstantaneousPressure(); diff --git a/test/core/resource_quota/memory_quota_fuzzer.cc b/test/core/resource_quota/memory_quota_fuzzer.cc index c4600d9288c..b155aa1bd1b 100644 --- a/test/core/resource_quota/memory_quota_fuzzer.cc +++ b/test/core/resource_quota/memory_quota_fuzzer.cc @@ -85,12 +85,6 @@ class Fuzzer { uint64_t{std::numeric_limits::max()})); }); break; - case memory_quota_fuzzer::Action::kRebindQuota: - WithQuota(action.quota(), [this, action](MemoryQuota* q) { - WithAllocator(action.allocator(), - [q](MemoryOwner* a) { a->Rebind(q); }); - }); - break; case memory_quota_fuzzer::Action::kCreateAllocation: { auto min = action.create_allocation().min(); auto max = action.create_allocation().max(); diff --git a/test/core/resource_quota/memory_quota_fuzzer.proto b/test/core/resource_quota/memory_quota_fuzzer.proto index 63d8a2e08e2..b71fc4094df 100644 --- a/test/core/resource_quota/memory_quota_fuzzer.proto +++ b/test/core/resource_quota/memory_quota_fuzzer.proto @@ -35,6 +35,7 @@ message AllocationRequest { } message Action { + reserved 15; int32 quota = 1; int32 allocator = 2; int32 allocation = 3; @@ -45,7 +46,6 @@ message Action { Empty create_allocator = 12; Empty delete_allocator = 13; uint64 set_quota_size = 14; - Empty rebind_quota = 15; AllocationRequest create_allocation = 16; Empty delete_allocation = 17; Reclaimer post_reclaimer = 18; diff --git a/test/core/resource_quota/memory_quota_stress_test.cc b/test/core/resource_quota/memory_quota_stress_test.cc index 0c8b3075091..2ec52d8a49f 100644 --- a/test/core/resource_quota/memory_quota_stress_test.cc +++ b/test/core/resource_quota/memory_quota_stress_test.cc @@ -40,10 +40,8 @@ class StressTest { void Run(int seconds) { std::vector threads; - // A few threads constantly rebinding allocators to different quotas. - threads.reserve(2 + 2 + 3 * allocators_.size()); - for (int i = 0; i < 2; i++) threads.push_back(Run(Rebinder)); // And another few threads constantly resizing quotas. + threads.reserve(2 + allocators_.size()); for (int i = 0; i < 2; i++) threads.push_back(Run(Resizer)); // For each (allocator, pass), start a thread continuously allocating from @@ -166,13 +164,6 @@ class StressTest { // Type alias since we always pass around these shared pointers. using StatePtr = std::shared_ptr; - // Choose one allocator, one quota, rebind the allocator to the quota. - static void Rebinder(StatePtr st) { - auto* allocator = st->RandomAllocator(); - auto* quota = st->RandomQuota(); - allocator->Rebind(quota); - } - // Choose one allocator, resize it to a randomly chosen size. static void Resizer(StatePtr st) { auto* quota = st->RandomQuota(); diff --git a/test/core/resource_quota/memory_quota_test.cc b/test/core/resource_quota/memory_quota_test.cc index 2c25b968cd5..7614d67b2f9 100644 --- a/test/core/resource_quota/memory_quota_test.cc +++ b/test/core/resource_quota/memory_quota_test.cc @@ -101,48 +101,6 @@ TEST(MemoryQuotaTest, CreateSomeObjectsAndExpectReclamation) { EXPECT_EQ(object2.get(), nullptr); } -TEST(MemoryQuotaTest, BasicRebind) { - ExecCtx exec_ctx; - - MemoryQuota memory_quota("foo"); - memory_quota.SetSize(4096); - MemoryQuota memory_quota2("foo2"); - memory_quota2.SetSize(4096); - - auto memory_allocator = memory_quota2.CreateMemoryOwner("bar"); - auto object = memory_allocator.MakeUnique>(); - - memory_allocator.Rebind(&memory_quota); - auto memory_allocator2 = memory_quota2.CreateMemoryOwner("bar2"); - - auto checker1 = CallChecker::Make(); - memory_allocator2.PostReclaimer( - ReclamationPass::kDestructive, - [checker1](absl::optional sweep) { - checker1->Called(); - // Taken memory should be reassigned to - // memory_quota, so this should be cancelled - EXPECT_FALSE(sweep.has_value()); - }); - - auto checker2 = CallChecker::Make(); - memory_allocator.PostReclaimer( - ReclamationPass::kDestructive, - [&object, checker2](absl::optional sweep) { - checker2->Called(); - EXPECT_TRUE(sweep.has_value()); - // The new memory allocator should reclaim - // the object allocated against the previous - // quota because that's now part of this - // quota. - object.reset(); - }); - - auto object2 = memory_allocator.MakeUnique>(); - exec_ctx.Flush(); - EXPECT_EQ(object.get(), nullptr); -} - TEST(MemoryQuotaTest, ReserveRangeNoPressure) { MemoryQuota memory_quota("foo"); auto memory_allocator = memory_quota.CreateMemoryAllocator("bar");