@@ -159,6 +159,7 @@ GrpcMemoryAllocatorImpl::GrpcMemoryAllocatorImpl(
    std::shared_ptr<BasicMemoryQuota> memory_quota, std::string name)
    : memory_quota_(memory_quota), name_(std::move(name)) {
  memory_quota_->Take(taken_bytes_);
  memory_quota_->AddNewAllocator(this);
}
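
For orientation, a minimal usage sketch of the path this constructor sits on, assuming the public wrappers declared in memory_quota.h (MemoryQuota::SetSize / CreateMemoryAllocator, MemoryAllocator::Reserve / Release); this is illustration, not part of the diff:

#include "src/core/lib/resource_quota/memory_quota.h"

void AllocatorLifecycleSketch() {
  grpc_core::MemoryQuota quota("sketch_quota");
  quota.SetSize(1024 * 1024);  // 1 MiB shared budget
  // CreateMemoryAllocator ends up in the constructor above: the allocator
  // charges its own footprint to the quota (Take) and registers itself
  // (AddNewAllocator) before handing out any bytes.
  auto allocator = quota.CreateMemoryAllocator("sketch_allocator");
  // Ask for 1KiB..4KiB; the amount granted depends on quota pressure.
  size_t got = allocator.Reserve(grpc_core::MemoryRequest(1024, 4096));
  allocator.Release(got);  // hand the bytes back to the allocator's cache
}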

GrpcMemoryAllocatorImpl::~GrpcMemoryAllocatorImpl() {
@@ -166,6 +167,7 @@ GrpcMemoryAllocatorImpl::~GrpcMemoryAllocatorImpl() {
                 sizeof(GrpcMemoryAllocatorImpl) ==
             taken_bytes_.load(std::memory_order_relaxed));
  memory_quota_->Return(taken_bytes_);
  memory_quota_->RemoveAllocator(this);
}
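
The assertion (whose opening line falls just above this hunk) encodes the allocator's accounting invariant: taken_bytes_ starts at sizeof(GrpcMemoryAllocatorImpl) because the allocator charges its own footprint at construction, so by destruction every byte taken beyond that footprint must have drained back into free_bytes_. A toy model of that bookkeeping, not from this diff:

#include <atomic>
#include <cassert>
#include <cstddef>

struct ToyAllocator {
  // Starts charged with its own footprint, like taken_bytes_ above.
  std::atomic<size_t> taken_bytes{sizeof(ToyAllocator)};
  std::atomic<size_t> free_bytes{0};

  void TakeFromQuota(size_t n) { taken_bytes.fetch_add(n); }   // ~Replenish
  void ReleaseToCache(size_t n) { free_bytes.fetch_add(n); }   // ~Release

  ~ToyAllocator() {
    // Mirrors the GPR_ASSERT above: all bytes taken beyond our own
    // footprint must be sitting in the local free cache by now.
    assert(free_bytes.load() + sizeof(ToyAllocator) == taken_bytes.load());
  }
};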

void GrpcMemoryAllocatorImpl::Shutdown() {
@@ -188,12 +190,17 @@ size_t GrpcMemoryAllocatorImpl::Reserve(MemoryRequest request) {
  // inlined asserts.
  GPR_ASSERT(request.min() <= request.max());
  GPR_ASSERT(request.max() <= MemoryRequest::max_allowed_size());
  size_t old_free = free_bytes_.load(std::memory_order_relaxed);

  while (true) {
    // Attempt to reserve memory from our pool.
    auto reservation = TryReserve(request);
    if (reservation.has_value()) {
      size_t new_free = free_bytes_.load(std::memory_order_relaxed);
      memory_quota_->MaybeMoveAllocator(this, old_free, new_free);
      return *reservation;
    }
    // If that failed, grab more from the quota and retry.
    Replenish();
  }
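
The loop above is an instance of a common "local cache over a shared pool" pattern: try the cheap local reservation first, and only on failure pull a fresh tranche from the shared quota. A standalone sketch of that shape, where TryTake and RefillFromPool are hypothetical stand-ins for TryReserve and Replenish, not the gRPC implementation:

#include <atomic>
#include <cstddef>
#include <optional>

class LocalCache {
 public:
  // Mirrors the shape of Reserve() above: cheap local attempt first,
  // refill from the shared pool only when the local cache runs dry.
  size_t Take(size_t want) {
    while (true) {
      if (std::optional<size_t> got = TryTake(want)) return *got;
      RefillFromPool(want);
    }
  }

 private:
  std::optional<size_t> TryTake(size_t want) {
    size_t free = free_.load(std::memory_order_relaxed);
    // CAS loop: succeed only while the cache still holds enough bytes.
    while (free >= want) {
      if (free_.compare_exchange_weak(free, free - want,
                                      std::memory_order_acq_rel)) {
        return want;
      }
    }
    return std::nullopt;
  }

  void RefillFromPool(size_t want) {
    // Stand-in for Replenish(): the real code takes from BasicMemoryQuota,
    // possibly driving the quota balance negative and waking reclaimers.
    free_.fetch_add(want, std::memory_order_acq_rel);
  }

  std::atomic<size_t> free_{0};
};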

@@ -304,6 +311,8 @@ void GrpcMemoryAllocatorImpl::MaybeRegisterReclaimer() {
    // Figure out how many bytes we can return to the quota.
    size_t return_bytes =
        p->free_bytes_.exchange(0, std::memory_order_acq_rel);
    if (return_bytes == 0) return;
    p->memory_quota_->MaybeMoveAllocator(p, /*old_free_bytes=*/return_bytes,
                                         /*new_free_bytes=*/0);
    // Subtract that from our outstanding balance.
    p->taken_bytes_.fetch_sub(return_bytes);
    // And return them to the quota.

@@ -424,6 +433,21 @@ void BasicMemoryQuota::Take(size_t amount) {
  if (prior >= 0 && prior < static_cast<intptr_t>(amount)) {
    if (reclaimer_activity_ != nullptr) reclaimer_activity_->ForceWakeup();
  }

  if (IsFreeLargeAllocatorEnabled()) {
    GrpcMemoryAllocatorImpl* chosen_allocator = nullptr;
    int shard_idx = rand() % big_allocators_.shards.size();
    auto& shard = big_allocators_.shards[shard_idx];

    if (shard.shard_mu.TryLock()) {
      if (!shard.allocators.empty()) {
        chosen_allocator = *shard.allocators.begin();
      }
      shard.shard_mu.Unlock();
    }

    if (chosen_allocator != nullptr) chosen_allocator->ReturnFree();
  }
}
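
Take() sits on the allocation hot path, so the scavenging pass added above must never block: it probes one random shard and only scavenges if that shard's lock happens to be free, otherwise it simply skips the pass. A standalone sketch of that opportunistic probe (names are illustrative, not gRPC's):

#include <cstdlib>
#include <set>
#include <vector>

#include "absl/synchronization/mutex.h"

struct Allocator;  // stand-in for GrpcMemoryAllocatorImpl

struct Shard {
  absl::Mutex mu;
  std::set<Allocator*> members;
};

// Returns one allocator from a randomly chosen shard, or nullptr if that
// shard's lock is contended or the shard is empty. Never blocks.
Allocator* ProbeOneShard(std::vector<Shard>& shards) {
  Shard& shard = shards[rand() % shards.size()];
  Allocator* chosen = nullptr;
  if (shard.mu.TryLock()) {
    if (!shard.members.empty()) chosen = *shard.members.begin();
    shard.mu.Unlock();
  }
  return chosen;  // nullptr is fine: Take() just skips the scavenge
}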

void BasicMemoryQuota::FinishReclamation(uint64_t token, Waker waker) {
@@ -448,6 +472,107 @@ void BasicMemoryQuota::Return(size_t amount) {
  free_bytes_.fetch_add(amount, std::memory_order_relaxed);
}

void BasicMemoryQuota::AddNewAllocator(GrpcMemoryAllocatorImpl* allocator) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "Adding allocator %p", allocator);
  }

  AllocatorBucket::Shard& shard = small_allocators_.SelectShard(allocator);

  {
    absl::MutexLock l(&shard.shard_mu);
    shard.allocators.emplace(allocator);
  }
}
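
New allocators always start in the small bucket and are promoted later by MaybeMoveAllocator once their free cache grows. SelectShard presumably maps the allocator pointer onto one of a fixed number of mutex-protected shards so that unrelated allocators rarely contend on the same lock; a sketch of that idea under that assumption (AllocatorBucket's real layout is not shown in this diff):

#include <cstddef>
#include <cstdint>
#include <set>

#include "absl/synchronization/mutex.h"

template <typename T, size_t kNumShards = 16>
class ShardedSet {
 public:
  struct Shard {
    absl::Mutex mu;
    std::set<T*> members;
  };

  // Hash the pointer to pick a shard. Low pointer bits are usually zero
  // because of alignment, so shift them away before taking the modulus.
  Shard& SelectShard(T* p) {
    return shards_[(reinterpret_cast<uintptr_t>(p) >> 4) % kNumShards];
  }

  void Add(T* p) {
    Shard& shard = SelectShard(p);
    absl::MutexLock lock(&shard.mu);
    shard.members.emplace(p);
  }

 private:
  Shard shards_[kNumShards];
};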

void BasicMemoryQuota::RemoveAllocator(GrpcMemoryAllocatorImpl* allocator) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "Removing allocator %p", allocator);
  }

  AllocatorBucket::Shard& small_shard =
      small_allocators_.SelectShard(allocator);

  {
    absl::MutexLock l(&small_shard.shard_mu);
    if (small_shard.allocators.erase(allocator) == 1) {
      return;
    }
  }

  AllocatorBucket::Shard& big_shard = big_allocators_.SelectShard(allocator);

  {
    absl::MutexLock l(&big_shard.shard_mu);
    big_shard.allocators.erase(allocator);
  }
}

void BasicMemoryQuota::MaybeMoveAllocator(GrpcMemoryAllocatorImpl* allocator,
                                          size_t old_free_bytes,
                                          size_t new_free_bytes) {
  while (true) {
    if (new_free_bytes < kSmallAllocatorThreshold) {
      // Still in small bucket. No move.
      if (old_free_bytes < kSmallAllocatorThreshold) return;
      MaybeMoveAllocatorBigToSmall(allocator);
    } else if (new_free_bytes > kBigAllocatorThreshold) {
      // Still in big bucket. No move.
      if (old_free_bytes > kBigAllocatorThreshold) return;
      MaybeMoveAllocatorSmallToBig(allocator);
    } else {
      // Somewhere between thresholds. No move.
      return;
    }

    // Loop to make sure move is eventually stable.
    old_free_bytes = new_free_bytes;
    new_free_bytes = allocator->GetFreeBytes();
  }
}
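
The two thresholds form a dead band: an allocator must climb above kBigAllocatorThreshold to be promoted and fall below kSmallAllocatorThreshold to be demoted, so jitter around a single cutoff cannot make it ping-pong between buckets, and the trailing loop re-reads GetFreeBytes() until the bucket agrees with a fresh observation. The decision table as a pure function, with made-up threshold values (the real constants are defined elsewhere in this file and are not shown in this diff):

#include <cstddef>

enum class Move { kNone, kSmallToBig, kBigToSmall };

constexpr size_t kSmall = 64 * 1024;  // hypothetical value
constexpr size_t kBig = 512 * 1024;   // hypothetical value

Move Classify(size_t old_free, size_t new_free) {
  if (new_free < kSmall) {
    // Below the small threshold: move only if we used to be big.
    return old_free < kSmall ? Move::kNone : Move::kBigToSmall;
  }
  if (new_free > kBig) {
    // Above the big threshold: move only if we used to be small.
    return old_free > kBig ? Move::kNone : Move::kSmallToBig;
  }
  return Move::kNone;  // inside the dead band: never move
}

// e.g. Classify(32 * 1024, 600 * 1024) == Move::kSmallToBig, while anything
// landing between 64KiB and 512KiB leaves the allocator where it is.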

void BasicMemoryQuota::MaybeMoveAllocatorBigToSmall(
    GrpcMemoryAllocatorImpl* allocator) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "Moving allocator %p to small", allocator);
  }

  AllocatorBucket::Shard& old_shard = big_allocators_.SelectShard(allocator);

  {
    absl::MutexLock l(&old_shard.shard_mu);
    if (old_shard.allocators.erase(allocator) == 0) return;
  }

  AllocatorBucket::Shard& new_shard = small_allocators_.SelectShard(allocator);

  {
    absl::MutexLock l(&new_shard.shard_mu);
    new_shard.allocators.emplace(allocator);
  }
}

void BasicMemoryQuota::MaybeMoveAllocatorSmallToBig(
    GrpcMemoryAllocatorImpl* allocator) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "Moving allocator %p to big", allocator);
  }

  AllocatorBucket::Shard& old_shard = small_allocators_.SelectShard(allocator);

  {
    absl::MutexLock l(&old_shard.shard_mu);
    if (old_shard.allocators.erase(allocator) == 0) return;
  }

  AllocatorBucket::Shard& new_shard = big_allocators_.SelectShard(allocator);

  {
    absl::MutexLock l(&new_shard.shard_mu);
    new_shard.allocators.emplace(allocator);
  }
}

BasicMemoryQuota::PressureInfo BasicMemoryQuota::GetPressureInfo() {
  double free = free_bytes_.load();
  if (free < 0) free = 0;