Revert "[resource-quota] Remove rebind (#29851)" (#29896)

This reverts commit ba308b46ed.
Branch: pull/29899/head
Author: Craig Tiller (committed by GitHub, 3 years ago)
Parent: 02af4c6b71
Commit: ad4a387403
Changed files (6):
  1. src/core/lib/resource_quota/memory_quota.cc (78 changed lines)
  2. src/core/lib/resource_quota/memory_quota.h (36 changed lines)
  3. test/core/resource_quota/memory_quota_fuzzer.cc (6 changed lines)
  4. test/core/resource_quota/memory_quota_fuzzer.proto (2 changed lines)
  5. test/core/resource_quota/memory_quota_stress_test.cc (11 changed lines)
  6. test/core/resource_quota/memory_quota_test.cc (42 changed lines)

src/core/lib/resource_quota/memory_quota.cc

@@ -165,7 +165,7 @@ GrpcMemoryAllocatorImpl::GrpcMemoryAllocatorImpl(
 GrpcMemoryAllocatorImpl::~GrpcMemoryAllocatorImpl() {
   GPR_ASSERT(free_bytes_.load(std::memory_order_acquire) +
                  sizeof(GrpcMemoryAllocatorImpl) ==
-             taken_bytes_.load(std::memory_order_relaxed));
+             taken_bytes_);
   memory_quota_->Return(taken_bytes_);
 }
@@ -174,7 +174,7 @@ void GrpcMemoryAllocatorImpl::Shutdown() {
   OrphanablePtr<ReclaimerQueue::Handle>
       reclamation_handles[kNumReclamationPasses];
   {
-    MutexLock lock(&reclaimer_mu_);
+    MutexLock lock(&memory_quota_mu_);
     GPR_ASSERT(!shutdown_);
     shutdown_ = true;
     memory_quota = memory_quota_;
@@ -207,11 +207,16 @@ absl::optional<size_t> GrpcMemoryAllocatorImpl::TryReserve(
   // Scale the request down according to memory pressure if we have that
   // flexibility.
   if (scaled_size_over_min != 0) {
-    const auto pressure_and_max_recommended_allocation_size =
-        memory_quota_->InstantaneousPressureAndMaxRecommendedAllocationSize();
-    double pressure = pressure_and_max_recommended_allocation_size.first;
-    size_t max_recommended_allocation_size =
-        pressure_and_max_recommended_allocation_size.second;
+    double pressure;
+    size_t max_recommended_allocation_size;
+    {
+      MutexLock lock(&memory_quota_mu_);
+      const auto pressure_and_max_recommended_allocation_size =
+          memory_quota_->InstantaneousPressureAndMaxRecommendedAllocationSize();
+      pressure = pressure_and_max_recommended_allocation_size.first;
+      max_recommended_allocation_size =
+          pressure_and_max_recommended_allocation_size.second;
+    }
     // Reduce allocation size proportional to the pressure > 80% usage.
     if (pressure > 0.8) {
       scaled_size_over_min =
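(For intuition on the scale-down in that trailing context: if the allowance over the requested minimum shrinks linearly from full at 80% pressure to zero at 100% — which is what the comment and the truncated assignment suggest, though the full expression is cut off here — then at 90% pressure an allocator keeps (1.0 - 0.9) / 0.2 = 50% of its size-over-minimum.)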
@@ -260,7 +265,9 @@ void GrpcMemoryAllocatorImpl::MaybeDonateBack() {
       gpr_log(GPR_INFO, "[%p|%s] Early return %" PRIdPTR " bytes", this,
               name_.c_str(), ret);
     }
-    GPR_ASSERT(taken_bytes_.fetch_sub(ret, std::memory_order_relaxed) >= ret);
+    MutexLock lock(&memory_quota_mu_);
+    GPR_ASSERT(taken_bytes_ >= ret);
+    taken_bytes_ -= ret;
     memory_quota_->Return(ret);
     return;
   }
@@ -268,26 +275,29 @@ void GrpcMemoryAllocatorImpl::MaybeDonateBack() {
 }
 
 void GrpcMemoryAllocatorImpl::Replenish() {
+  MutexLock lock(&memory_quota_mu_);
+  GPR_ASSERT(!shutdown_);
   // Attempt a fairly low rate exponential growth request size, bounded between
   // some reasonable limits declared at top of file.
-  auto amount = Clamp(taken_bytes_.load(std::memory_order_relaxed) / 3,
-                      kMinReplenishBytes, kMaxReplenishBytes);
+  auto amount = Clamp(taken_bytes_ / 3, kMinReplenishBytes, kMaxReplenishBytes);
   // Take the requested amount from the quota.
   memory_quota_->Take(amount);
   // Record that we've taken it.
-  taken_bytes_.fetch_add(amount, std::memory_order_relaxed);
+  taken_bytes_ += amount;
   // Add the taken amount to the free pool.
   free_bytes_.fetch_add(amount, std::memory_order_acq_rel);
   // See if we can add ourselves as a reclaimer.
-  MaybeRegisterReclaimer();
+  MaybeRegisterReclaimerLocked();
 }
 
 void GrpcMemoryAllocatorImpl::MaybeRegisterReclaimer() {
+  MutexLock lock(&memory_quota_mu_);
+  MaybeRegisterReclaimerLocked();
+}
+
+void GrpcMemoryAllocatorImpl::MaybeRegisterReclaimerLocked() {
   // If the reclaimer is already registered, then there's nothing to do.
-  if (registered_reclaimer_.exchange(true, std::memory_order_relaxed)) {
-    return;
-  }
-  MutexLock lock(&reclaimer_mu_);
+  if (registered_reclaimer_) return;
   if (shutdown_) return;
   // Grab references to the things we'll need
   auto self = shared_from_this();
@@ -298,17 +308,49 @@ void GrpcMemoryAllocatorImpl::MaybeRegisterReclaimer() {
         auto self = self_weak.lock();
         if (self == nullptr) return;
         auto* p = static_cast<GrpcMemoryAllocatorImpl*>(self.get());
-        p->registered_reclaimer_.store(false, std::memory_order_relaxed);
+        MutexLock lock(&p->memory_quota_mu_);
+        p->registered_reclaimer_ = false;
        // Figure out how many bytes we can return to the quota.
         size_t return_bytes =
             p->free_bytes_.exchange(0, std::memory_order_acq_rel);
         if (return_bytes == 0) return;
         // Subtract that from our outstanding balance.
-        p->taken_bytes_.fetch_sub(return_bytes);
+        p->taken_bytes_ -= return_bytes;
         // And return them to the quota.
         p->memory_quota_->Return(return_bytes);
       });
 }
+
+void GrpcMemoryAllocatorImpl::Rebind(
+    std::shared_ptr<BasicMemoryQuota> memory_quota) {
+  MutexLock lock(&memory_quota_mu_);
+  GPR_ASSERT(!shutdown_);
+  if (memory_quota_ == memory_quota) return;
+  // Return memory to the original memory quota.
+  memory_quota_->Return(taken_bytes_);
+  // Reassign any queued reclaimers
+  for (size_t i = 0; i < kNumReclamationPasses; i++) {
+    if (reclamation_handles_[i] != nullptr) {
+      reclamation_handles_[i]->Requeue(memory_quota->reclaimer_queue(i));
+    }
+  }
+  // Switch to the new memory quota, leaving the old one in memory_quota so
+  // that when we unref it, we are outside of lock.
+  memory_quota_.swap(memory_quota);
+  // Drop our freed memory down to zero, to avoid needing to ask the new
+  // quota for memory we're not currently using.
+  taken_bytes_ -= free_bytes_.exchange(0, std::memory_order_acq_rel);
+  // And let the new quota know how much we're already using.
+  memory_quota_->Take(taken_bytes_);
+}
+
+//
+// MemoryOwner
+//
+
+void MemoryOwner::Rebind(MemoryQuota* quota) {
+  impl()->Rebind(quota->memory_quota_);
+}
 
 //
 // BasicMemoryQuota
 //
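Taken together, the restored Rebind runs a fixed sequence under memory_quota_mu_: return taken_bytes_ to the old quota, requeue pending reclaimers onto the new quota's queues, swap the quota pointers, zero out cached free_bytes_ (deducting them from taken_bytes_), then Take the outstanding balance from the new quota. A minimal usage sketch, mirroring the BasicRebind test added below (the names here are hypothetical):

    MemoryQuota quota_a("a");
    MemoryQuota quota_b("b");
    auto owner = quota_b.CreateMemoryOwner("owner");
    auto obj = owner.MakeUnique<Sized<2048>>();  // charged against quota_b
    owner.Rebind(&quota_a);  // obj's bytes now count against quota_a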

src/core/lib/resource_quota/memory_quota.h

@@ -288,6 +288,11 @@ class GrpcMemoryAllocatorImpl final : public EventEngineMemoryAllocatorImpl {
       std::shared_ptr<BasicMemoryQuota> memory_quota, std::string name);
   ~GrpcMemoryAllocatorImpl() override;
 
+  // Rebind - Swaps the underlying quota for this allocator, taking care to
+  // make sure memory allocated is moved to allocations against the new quota.
+  void Rebind(std::shared_ptr<BasicMemoryQuota> memory_quota)
+      ABSL_LOCKS_EXCLUDED(memory_quota_mu_);
+
   // Reserve bytes from the quota.
   // If we enter overcommit, reclamation will begin concurrently.
   // Returns the number of bytes reserved.
@@ -310,7 +315,7 @@ class GrpcMemoryAllocatorImpl final : public EventEngineMemoryAllocatorImpl {
   // Post a reclamation function.
   template <typename F>
   void PostReclaimer(ReclamationPass pass, F fn) {
-    MutexLock lock(&reclaimer_mu_);
+    MutexLock lock(&memory_quota_mu_);
     GPR_ASSERT(!shutdown_);
     InsertReclaimer(static_cast<size_t>(pass), std::move(fn));
   }
@@ -320,6 +325,7 @@ class GrpcMemoryAllocatorImpl final : public EventEngineMemoryAllocatorImpl {
 
   // Read the instantaneous memory pressure
   double InstantaneousPressure() const {
+    MutexLock lock(&memory_quota_mu_);
     return memory_quota_->InstantaneousPressureAndMaxRecommendedAllocationSize()
         .first;
   }
@@ -338,34 +344,39 @@ class GrpcMemoryAllocatorImpl final : public EventEngineMemoryAllocatorImpl {
   void MaybeDonateBack();
 
   // Replenish bytes from the quota, without blocking, possibly entering
   // overcommit.
-  void Replenish();
+  void Replenish() ABSL_LOCKS_EXCLUDED(memory_quota_mu_);
   // If we have not already, register a reclamation function against the quota
   // to sweep any free memory back to that quota.
-  void MaybeRegisterReclaimer() ABSL_LOCKS_EXCLUDED(reclaimer_mu_);
+  void MaybeRegisterReclaimer() ABSL_LOCKS_EXCLUDED(memory_quota_mu_);
+  void MaybeRegisterReclaimerLocked()
+      ABSL_EXCLUSIVE_LOCKS_REQUIRED(memory_quota_mu_);
 
   template <typename F>
   void InsertReclaimer(size_t pass, F fn)
-      ABSL_EXCLUSIVE_LOCKS_REQUIRED(reclaimer_mu_) {
+      ABSL_EXCLUSIVE_LOCKS_REQUIRED(memory_quota_mu_) {
     reclamation_handles_[pass] =
         memory_quota_->reclaimer_queue(pass)->Insert(std::move(fn));
   }
 
-  // Backing resource quota.
-  const std::shared_ptr<BasicMemoryQuota> memory_quota_;
-
   // Amount of memory this allocator has cached for its own use: to avoid quota
   // contention, each MemoryAllocator can keep some memory in addition to what
   // it is immediately using, and the quota can pull it back under memory
   // pressure.
   std::atomic<size_t> free_bytes_{0};
+
+  // Mutex guarding the backing resource quota.
+  mutable Mutex memory_quota_mu_;
+  // Backing resource quota.
+  std::shared_ptr<BasicMemoryQuota> memory_quota_
+      ABSL_GUARDED_BY(memory_quota_mu_);
   // Amount of memory taken from the quota by this allocator.
-  std::atomic<size_t> taken_bytes_{sizeof(GrpcMemoryAllocatorImpl)};
-  std::atomic<bool> registered_reclaimer_;
-  Mutex reclaimer_mu_;
-  bool shutdown_ ABSL_GUARDED_BY(reclaimer_mu_) = false;
+  size_t taken_bytes_ ABSL_GUARDED_BY(memory_quota_mu_) =
+      sizeof(GrpcMemoryAllocatorImpl);
+  bool shutdown_ ABSL_GUARDED_BY(memory_quota_mu_) = false;
+  bool registered_reclaimer_ ABSL_GUARDED_BY(memory_quota_mu_) = false;
   // Indices into the various reclaimer queues, used so that we can cancel
   // reclamation should we shutdown or get rebound.
   OrphanablePtr<ReclaimerQueue::Handle>
       reclamation_handles_[kNumReclamationPasses] ABSL_GUARDED_BY(
-          reclaimer_mu_);
+          memory_quota_mu_);
   // Name of this allocator.
   std::string name_;
 };
@@ -391,6 +402,9 @@ class MemoryOwner final : public MemoryAllocator {
     impl()->PostReclaimer(pass, std::move(fn));
   }
 
+  // Rebind to a different quota.
+  void Rebind(MemoryQuota* quota);
+
   // Instantaneous memory pressure in the underlying quota.
   double InstantaneousPressure() const {
     return impl()->InstantaneousPressure();

test/core/resource_quota/memory_quota_fuzzer.cc

@@ -85,6 +85,12 @@ class Fuzzer {
                              uint64_t{std::numeric_limits<ssize_t>::max()}));
         });
         break;
+      case memory_quota_fuzzer::Action::kRebindQuota:
+        WithQuota(action.quota(), [this, action](MemoryQuota* q) {
+          WithAllocator(action.allocator(),
+                        [q](MemoryOwner* a) { a->Rebind(q); });
+        });
+        break;
       case memory_quota_fuzzer::Action::kCreateAllocation: {
         auto min = action.create_allocation().min();
         auto max = action.create_allocation().max();

test/core/resource_quota/memory_quota_fuzzer.proto

@@ -35,7 +35,6 @@ message AllocationRequest {
 }
 
 message Action {
-  reserved 15;
   int32 quota = 1;
   int32 allocator = 2;
   int32 allocation = 3;
@@ -46,6 +45,7 @@ message Action {
     Empty create_allocator = 12;
     Empty delete_allocator = 13;
     uint64 set_quota_size = 14;
+    Empty rebind_quota = 15;
     AllocationRequest create_allocation = 16;
     Empty delete_allocation = 17;
     Reclaimer post_reclaimer = 18;
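With field 15 back in the oneof, fuzzer corpora can exercise rebinding again. A sketch of one such input in textproto form (the top-level repeated-actions wrapper is an assumption here; it is not shown in this hunk):

    actions {
      quota: 1
      allocator: 2
      rebind_quota {}
    }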

test/core/resource_quota/memory_quota_stress_test.cc

@@ -40,8 +40,10 @@ class StressTest {
   void Run(int seconds) {
     std::vector<std::thread> threads;
 
-    // A few threads constantly resizing quotas.
-    threads.reserve(2 + allocators_.size());
+    // A few threads constantly rebinding allocators to different quotas.
+    threads.reserve(2 + 2 + 3 * allocators_.size());
+    for (int i = 0; i < 2; i++) threads.push_back(Run(Rebinder));
+    // And another few threads constantly resizing quotas.
     for (int i = 0; i < 2; i++) threads.push_back(Run(Resizer));
     // For each (allocator, pass), start a thread continuously allocating from
@@ -164,6 +166,13 @@ class StressTest {
   // Type alias since we always pass around these shared pointers.
   using StatePtr = std::shared_ptr<State>;
 
+  // Choose one allocator, one quota, rebind the allocator to the quota.
+  static void Rebinder(StatePtr st) {
+    auto* allocator = st->RandomAllocator();
+    auto* quota = st->RandomQuota();
+    allocator->Rebind(quota);
+  }
+
   // Choose one allocator, resize it to a randomly chosen size.
   static void Resizer(StatePtr st) {
     auto* quota = st->RandomQuota();
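Run(Rebinder) — presumably a helper that loops the passed function on its own thread until the deadline, as with Resizer — keeps rebinds racing against resizes, reservations, and reclamation, which is exactly the contention the consolidated memory_quota_mu_ has to tolerate.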

test/core/resource_quota/memory_quota_test.cc

@@ -101,6 +101,48 @@ TEST(MemoryQuotaTest, CreateSomeObjectsAndExpectReclamation) {
   EXPECT_EQ(object2.get(), nullptr);
 }
 
+TEST(MemoryQuotaTest, BasicRebind) {
+  ExecCtx exec_ctx;
+  MemoryQuota memory_quota("foo");
+  memory_quota.SetSize(4096);
+  MemoryQuota memory_quota2("foo2");
+  memory_quota2.SetSize(4096);
+
+  auto memory_allocator = memory_quota2.CreateMemoryOwner("bar");
+  auto object = memory_allocator.MakeUnique<Sized<2048>>();
+
+  memory_allocator.Rebind(&memory_quota);
+  auto memory_allocator2 = memory_quota2.CreateMemoryOwner("bar2");
+
+  auto checker1 = CallChecker::Make();
+  memory_allocator2.PostReclaimer(
+      ReclamationPass::kDestructive,
+      [checker1](absl::optional<ReclamationSweep> sweep) {
+        checker1->Called();
+        // Taken memory should be reassigned to memory_quota, so this
+        // reclaimer should be cancelled.
+        EXPECT_FALSE(sweep.has_value());
+      });
+
+  auto checker2 = CallChecker::Make();
+  memory_allocator.PostReclaimer(
+      ReclamationPass::kDestructive,
+      [&object, checker2](absl::optional<ReclamationSweep> sweep) {
+        checker2->Called();
+        EXPECT_TRUE(sweep.has_value());
+        // The new memory allocator should reclaim the object allocated
+        // against the previous quota because that's now part of this quota.
+        object.reset();
+      });
+
+  auto object2 = memory_allocator.MakeUnique<Sized<2048>>();
+  exec_ctx.Flush();
+
+  EXPECT_EQ(object.get(), nullptr);
+}
+
 TEST(MemoryQuotaTest, ReserveRangeNoPressure) {
   MemoryQuota memory_quota("foo");
   auto memory_allocator = memory_quota.CreateMemoryAllocator("bar");