diff --git a/src/core/lib/resource_quota/memory_quota.cc b/src/core/lib/resource_quota/memory_quota.cc index 2c0792d16ab..710ff1820e5 100644 --- a/src/core/lib/resource_quota/memory_quota.cc +++ b/src/core/lib/resource_quota/memory_quota.cc @@ -165,7 +165,7 @@ GrpcMemoryAllocatorImpl::GrpcMemoryAllocatorImpl( GrpcMemoryAllocatorImpl::~GrpcMemoryAllocatorImpl() { GPR_ASSERT(free_bytes_.load(std::memory_order_acquire) + sizeof(GrpcMemoryAllocatorImpl) == - taken_bytes_.load(std::memory_order_relaxed)); + taken_bytes_); memory_quota_->Return(taken_bytes_); } @@ -174,7 +174,7 @@ void GrpcMemoryAllocatorImpl::Shutdown() { OrphanablePtr<ReclaimerQueue::Handle> reclamation_handles[kNumReclamationPasses]; { - MutexLock lock(&reclaimer_mu_); + MutexLock lock(&memory_quota_mu_); GPR_ASSERT(!shutdown_); shutdown_ = true; memory_quota = memory_quota_; @@ -207,11 +207,16 @@ absl::optional<size_t> GrpcMemoryAllocatorImpl::TryReserve( // Scale the request down according to memory pressure if we have that // flexibility. if (scaled_size_over_min != 0) { - const auto pressure_and_max_recommended_allocation_size = - memory_quota_->InstantaneousPressureAndMaxRecommendedAllocationSize(); - double pressure = pressure_and_max_recommended_allocation_size.first; - size_t max_recommended_allocation_size = - pressure_and_max_recommended_allocation_size.second; + double pressure; + size_t max_recommended_allocation_size; + { + MutexLock lock(&memory_quota_mu_); + const auto pressure_and_max_recommended_allocation_size = + memory_quota_->InstantaneousPressureAndMaxRecommendedAllocationSize(); + pressure = pressure_and_max_recommended_allocation_size.first; + max_recommended_allocation_size = + pressure_and_max_recommended_allocation_size.second; + } // Reduce allocation size proportional to the pressure > 80% usage. 
if (pressure > 0.8) { scaled_size_over_min = @@ -260,7 +265,9 @@ void GrpcMemoryAllocatorImpl::MaybeDonateBack() { gpr_log(GPR_INFO, "[%p|%s] Early return %" PRIdPTR " bytes", this, name_.c_str(), ret); } - GPR_ASSERT(taken_bytes_.fetch_sub(ret, std::memory_order_relaxed) >= ret); + MutexLock lock(&memory_quota_mu_); + GPR_ASSERT(taken_bytes_ >= ret); + taken_bytes_ -= ret; memory_quota_->Return(ret); return; } @@ -268,26 +275,29 @@ void GrpcMemoryAllocatorImpl::MaybeDonateBack() { } void GrpcMemoryAllocatorImpl::Replenish() { + MutexLock lock(&memory_quota_mu_); + GPR_ASSERT(!shutdown_); // Attempt a fairly low rate exponential growth request size, bounded between // some reasonable limits declared at top of file. - auto amount = Clamp(taken_bytes_.load(std::memory_order_relaxed) / 3, - kMinReplenishBytes, kMaxReplenishBytes); + auto amount = Clamp(taken_bytes_ / 3, kMinReplenishBytes, kMaxReplenishBytes); // Take the requested amount from the quota. memory_quota_->Take(amount); // Record that we've taken it. - taken_bytes_.fetch_add(amount, std::memory_order_relaxed); + taken_bytes_ += amount; // Add the taken amount to the free pool. free_bytes_.fetch_add(amount, std::memory_order_acq_rel); // See if we can add ourselves as a reclaimer. - MaybeRegisterReclaimer(); + MaybeRegisterReclaimerLocked(); } void GrpcMemoryAllocatorImpl::MaybeRegisterReclaimer() { + MutexLock lock(&memory_quota_mu_); + MaybeRegisterReclaimerLocked(); +} + +void GrpcMemoryAllocatorImpl::MaybeRegisterReclaimerLocked() { // If the reclaimer is already registered, then there's nothing to do. 
- if (registered_reclaimer_.exchange(true, std::memory_order_relaxed)) { - return; - } - MutexLock lock(&reclaimer_mu_); + if (registered_reclaimer_) return; if (shutdown_) return; // Grab references to the things we'll need auto self = shared_from_this(); @@ -298,17 +308,49 @@ void GrpcMemoryAllocatorImpl::MaybeRegisterReclaimer() { auto self = self_weak.lock(); if (self == nullptr) return; auto* p = static_cast<GrpcMemoryAllocatorImpl*>(self.get()); - p->registered_reclaimer_.store(false, std::memory_order_relaxed); + MutexLock lock(&p->memory_quota_mu_); + p->registered_reclaimer_ = false; // Figure out how many bytes we can return to the quota. size_t return_bytes = p->free_bytes_.exchange(0, std::memory_order_acq_rel); if (return_bytes == 0) return; // Subtract that from our outstanding balance. - p->taken_bytes_.fetch_sub(return_bytes); + p->taken_bytes_ -= return_bytes; // And return them to the quota. p->memory_quota_->Return(return_bytes); }); } +void GrpcMemoryAllocatorImpl::Rebind( + std::shared_ptr<BasicMemoryQuota> memory_quota) { + MutexLock lock(&memory_quota_mu_); + GPR_ASSERT(!shutdown_); + if (memory_quota_ == memory_quota) return; + // Return memory to the original memory quota. + memory_quota_->Return(taken_bytes_); + // Reassign any queued reclaimers + for (size_t i = 0; i < kNumReclamationPasses; i++) { + if (reclamation_handles_[i] != nullptr) { + reclamation_handles_[i]->Requeue(memory_quota->reclaimer_queue(i)); + } + } + // Switch to the new memory quota, leaving the old one in memory_quota so that + // when we unref it, we are outside of lock. + memory_quota_.swap(memory_quota); + // Drop our freed memory down to zero, to avoid needing to ask the new + // quota for memory we're not currently using. + taken_bytes_ -= free_bytes_.exchange(0, std::memory_order_acq_rel); + // And let the new quota know how much we're already using. 
+ memory_quota_->Take(taken_bytes_); +} + +// +// MemoryOwner +// + +void MemoryOwner::Rebind(MemoryQuota* quota) { + impl()->Rebind(quota->memory_quota_); +} + // // BasicMemoryQuota // diff --git a/src/core/lib/resource_quota/memory_quota.h b/src/core/lib/resource_quota/memory_quota.h index 06741a0d803..a74b813c26c 100644 --- a/src/core/lib/resource_quota/memory_quota.h +++ b/src/core/lib/resource_quota/memory_quota.h @@ -288,6 +288,11 @@ class GrpcMemoryAllocatorImpl final : public EventEngineMemoryAllocatorImpl { std::shared_ptr<BasicMemoryQuota> memory_quota, std::string name); ~GrpcMemoryAllocatorImpl() override; + // Rebind - Swaps the underlying quota for this allocator, taking care to + // make sure memory allocated is moved to allocations against the new quota. + void Rebind(std::shared_ptr<BasicMemoryQuota> memory_quota) + ABSL_LOCKS_EXCLUDED(memory_quota_mu_); + // Reserve bytes from the quota. // If we enter overcommit, reclamation will begin concurrently. // Returns the number of bytes reserved. @@ -310,7 +315,7 @@ class GrpcMemoryAllocatorImpl final : public EventEngineMemoryAllocatorImpl { // Post a reclamation function. template <typename F> void PostReclaimer(ReclamationPass pass, F fn) { - MutexLock lock(&reclaimer_mu_); + MutexLock lock(&memory_quota_mu_); GPR_ASSERT(!shutdown_); InsertReclaimer(static_cast<size_t>(pass), std::move(fn)); } @@ -320,6 +325,7 @@ class GrpcMemoryAllocatorImpl final : public EventEngineMemoryAllocatorImpl { // Read the instantaneous memory pressure double InstantaneousPressure() const { + MutexLock lock(&memory_quota_mu_); return memory_quota_->InstantaneousPressureAndMaxRecommendedAllocationSize() .first; } @@ -338,34 +344,39 @@ class GrpcMemoryAllocatorImpl final : public EventEngineMemoryAllocatorImpl { void MaybeDonateBack(); // Replenish bytes from the quota, without blocking, possibly entering // overcommit. 
- void Replenish(); + void Replenish() ABSL_LOCKS_EXCLUDED(memory_quota_mu_); // If we have not already, register a reclamation function against the quota // to sweep any free memory back to that quota. - void MaybeRegisterReclaimer() ABSL_LOCKS_EXCLUDED(reclaimer_mu_); + void MaybeRegisterReclaimer() ABSL_LOCKS_EXCLUDED(memory_quota_mu_); + void MaybeRegisterReclaimerLocked() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(memory_quota_mu_); template <typename F> void InsertReclaimer(size_t pass, F fn) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(reclaimer_mu_) { + ABSL_EXCLUSIVE_LOCKS_REQUIRED(memory_quota_mu_) { reclamation_handles_[pass] = memory_quota_->reclaimer_queue(pass)->Insert(std::move(fn)); } - // Backing resource quota. - const std::shared_ptr<BasicMemoryQuota> memory_quota_; // Amount of memory this allocator has cached for its own use: to avoid quota // contention, each MemoryAllocator can keep some memory in addition to what // it is immediately using, and the quota can pull it back under memory // pressure. std::atomic<size_t> free_bytes_{0}; + // Mutex guarding the backing resource quota. + mutable Mutex memory_quota_mu_; + // Backing resource quota. + std::shared_ptr<BasicMemoryQuota> memory_quota_ + ABSL_GUARDED_BY(memory_quota_mu_); // Amount of memory taken from the quota by this allocator. - std::atomic<size_t> taken_bytes_{sizeof(GrpcMemoryAllocatorImpl)}; - std::atomic<bool> registered_reclaimer_; - Mutex reclaimer_mu_; - bool shutdown_ ABSL_GUARDED_BY(reclaimer_mu_) = false; + size_t taken_bytes_ ABSL_GUARDED_BY(memory_quota_mu_) = + sizeof(GrpcMemoryAllocatorImpl); + bool shutdown_ ABSL_GUARDED_BY(memory_quota_mu_) = false; + bool registered_reclaimer_ ABSL_GUARDED_BY(memory_quota_mu_) = false; // Indices into the various reclaimer queues, used so that we can cancel // reclamation should we shutdown or get rebound. OrphanablePtr<ReclaimerQueue::Handle> reclamation_handles_[kNumReclamationPasses] ABSL_GUARDED_BY( - reclaimer_mu_); + memory_quota_mu_); // Name of this allocator. 
std::string name_; }; @@ -391,6 +402,9 @@ class MemoryOwner final : public MemoryAllocator { impl()->PostReclaimer(pass, std::move(fn)); } + // Rebind to a different quota. + void Rebind(MemoryQuota* quota); + // Instantaneous memory pressure in the underlying quota. double InstantaneousPressure() const { return impl()->InstantaneousPressure(); diff --git a/test/core/resource_quota/memory_quota_fuzzer.cc b/test/core/resource_quota/memory_quota_fuzzer.cc index b155aa1bd1b..c4600d9288c 100644 --- a/test/core/resource_quota/memory_quota_fuzzer.cc +++ b/test/core/resource_quota/memory_quota_fuzzer.cc @@ -85,6 +85,12 @@ class Fuzzer { uint64_t{std::numeric_limits::max()})); }); break; + case memory_quota_fuzzer::Action::kRebindQuota: + WithQuota(action.quota(), [this, action](MemoryQuota* q) { + WithAllocator(action.allocator(), + [q](MemoryOwner* a) { a->Rebind(q); }); + }); + break; case memory_quota_fuzzer::Action::kCreateAllocation: { auto min = action.create_allocation().min(); auto max = action.create_allocation().max(); diff --git a/test/core/resource_quota/memory_quota_fuzzer.proto b/test/core/resource_quota/memory_quota_fuzzer.proto index b71fc4094df..63d8a2e08e2 100644 --- a/test/core/resource_quota/memory_quota_fuzzer.proto +++ b/test/core/resource_quota/memory_quota_fuzzer.proto @@ -35,7 +35,6 @@ message AllocationRequest { } message Action { - reserved 15; int32 quota = 1; int32 allocator = 2; int32 allocation = 3; @@ -46,6 +45,7 @@ message Action { Empty create_allocator = 12; Empty delete_allocator = 13; uint64 set_quota_size = 14; + Empty rebind_quota = 15; AllocationRequest create_allocation = 16; Empty delete_allocation = 17; Reclaimer post_reclaimer = 18; diff --git a/test/core/resource_quota/memory_quota_stress_test.cc b/test/core/resource_quota/memory_quota_stress_test.cc index 2ec52d8a49f..0c8b3075091 100644 --- a/test/core/resource_quota/memory_quota_stress_test.cc +++ b/test/core/resource_quota/memory_quota_stress_test.cc @@ -40,8 +40,10 @@ class 
StressTest { void Run(int seconds) { std::vector<std::thread> threads; + // A few threads constantly rebinding allocators to different quotas. + threads.reserve(2 + 2 + 3 * allocators_.size()); + for (int i = 0; i < 2; i++) threads.push_back(Run(Rebinder)); // And another few threads constantly resizing quotas. - threads.reserve(2 + allocators_.size()); for (int i = 0; i < 2; i++) threads.push_back(Run(Resizer)); // For each (allocator, pass), start a thread continuously allocating from @@ -164,6 +166,13 @@ class StressTest { // Type alias since we always pass around these shared pointers. using StatePtr = std::shared_ptr<State>; + // Choose one allocator, one quota, rebind the allocator to the quota. + static void Rebinder(StatePtr st) { + auto* allocator = st->RandomAllocator(); + auto* quota = st->RandomQuota(); + allocator->Rebind(quota); + } + // Choose one allocator, resize it to a randomly chosen size. static void Resizer(StatePtr st) { auto* quota = st->RandomQuota(); diff --git a/test/core/resource_quota/memory_quota_test.cc b/test/core/resource_quota/memory_quota_test.cc index 7614d67b2f9..2c25b968cd5 100644 --- a/test/core/resource_quota/memory_quota_test.cc +++ b/test/core/resource_quota/memory_quota_test.cc @@ -101,6 +101,48 @@ TEST(MemoryQuotaTest, CreateSomeObjectsAndExpectReclamation) { EXPECT_EQ(object2.get(), nullptr); } +TEST(MemoryQuotaTest, BasicRebind) { + ExecCtx exec_ctx; + + MemoryQuota memory_quota("foo"); + memory_quota.SetSize(4096); + MemoryQuota memory_quota2("foo2"); + memory_quota2.SetSize(4096); + + auto memory_allocator = memory_quota2.CreateMemoryOwner("bar"); + auto object = memory_allocator.MakeUnique<Sized<4096>>(); + + memory_allocator.Rebind(&memory_quota); + auto memory_allocator2 = memory_quota2.CreateMemoryOwner("bar2"); + + auto checker1 = CallChecker::Make(); + memory_allocator2.PostReclaimer( + ReclamationPass::kDestructive, + [checker1](absl::optional<ReclamationSweep> sweep) { + checker1->Called(); + // Taken memory should be reassigned to + // memory_quota, so this 
should be cancelled + EXPECT_FALSE(sweep.has_value()); + }); + + auto checker2 = CallChecker::Make(); + memory_allocator.PostReclaimer( + ReclamationPass::kDestructive, + [&object, checker2](absl::optional<ReclamationSweep> sweep) { + checker2->Called(); + EXPECT_TRUE(sweep.has_value()); + // The new memory allocator should reclaim + // the object allocated against the previous + // quota because that's now part of this + // quota. + object.reset(); + }); + + auto object2 = memory_allocator.MakeUnique<Sized<4096>>(); + exec_ctx.Flush(); + EXPECT_EQ(object.get(), nullptr); +} + TEST(MemoryQuotaTest, ReserveRangeNoPressure) { MemoryQuota memory_quota("foo"); auto memory_allocator = memory_quota.CreateMemoryAllocator("bar");