Add global structure of allocators + experiment (#31807)

* Global allocator experiment

* Fix formatting and deps

* Update experiments files formatting

* Small fixes

* Change uint formatting

* Automated change: Fix sanity tests

* Fix formatting

* Address review comments

* Clean up logging

* Automated change: Fix sanity tests

* Fix clangtidy

* remove duplicate header

* address review

* Add more info to error

* Fix tests

* Global allocator experiment

* Fix formatting and deps

* Update experiments files formatting

* Small fixes

* Change uint formatting

* Fix formatting

* Address review comments

* Clean up logging

* Automated change: Fix sanity tests

* Fix clangtidy

* remove duplicate header

* address review

* Updated experiments

* update formatting

* resolve conflict

* Address review comments

* Remove use of rand

* Automated change: Fix sanity tests

* Fix test errors

Co-authored-by: ananda1066 <ananda1066@users.noreply.github.com>
pull/31728/head^2
Alisha Nanda 2 years ago committed by GitHub
parent 0356ff3a72
commit 221d6b58d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      CMakeLists.txt
  2. 1
      bazel/experiments.bzl
  3. 6
      build_autogenerated.yaml
  4. 1
      src/core/BUILD
  5. 3
      src/core/lib/experiments/experiments.cc
  6. 3
      src/core/lib/experiments/experiments.h
  7. 8
      src/core/lib/experiments/experiments.yaml
  8. 143
      src/core/lib/resource_quota/memory_quota.cc
  9. 72
      src/core/lib/resource_quota/memory_quota.h

6
CMakeLists.txt generated

@ -8105,6 +8105,7 @@ target_link_libraries(chunked_vector_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::flat_hash_set
absl::any_invocable
absl::function_ref
absl::hash
@ -10831,6 +10832,7 @@ target_link_libraries(flow_control_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::flat_hash_set
absl::any_invocable
absl::function_ref
absl::hash
@ -10899,6 +10901,7 @@ target_link_libraries(for_each_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::flat_hash_set
absl::any_invocable
absl::function_ref
absl::hash
@ -14416,6 +14419,7 @@ target_link_libraries(map_pipe_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::flat_hash_set
absl::any_invocable
absl::function_ref
absl::hash
@ -15886,6 +15890,7 @@ target_link_libraries(pipe_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::flat_hash_set
absl::any_invocable
absl::function_ref
absl::hash
@ -21160,6 +21165,7 @@ target_link_libraries(try_concurrently_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::flat_hash_set
absl::any_invocable
absl::function_ref
absl::hash

@ -45,6 +45,7 @@ EXPERIMENTS = {
"promise_based_client_call",
],
"resource_quota_test": [
"free_large_allocator",
"memory_pressure_controller",
"unconstrained_max_quota_buffer_size",
],

@ -5642,6 +5642,7 @@ targets:
- src/core/lib/slice/slice_string_helpers.cc
- test/core/gprpp/chunked_vector_test.cc
deps:
- absl/container:flat_hash_set
- absl/functional:any_invocable
- absl/functional:function_ref
- absl/hash:hash
@ -6915,6 +6916,7 @@ targets:
- src/core/lib/transport/pid_controller.cc
- test/core/transport/chttp2/flow_control_test.cc
deps:
- absl/container:flat_hash_set
- absl/functional:any_invocable
- absl/functional:function_ref
- absl/hash:hash
@ -7009,6 +7011,7 @@ targets:
- src/core/lib/slice/slice_string_helpers.cc
- test/core/promise/for_each_test.cc
deps:
- absl/container:flat_hash_set
- absl/functional:any_invocable
- absl/functional:function_ref
- absl/hash:hash
@ -8949,6 +8952,7 @@ targets:
- src/core/lib/slice/slice_string_helpers.cc
- test/core/promise/map_pipe_test.cc
deps:
- absl/container:flat_hash_set
- absl/functional:any_invocable
- absl/functional:function_ref
- absl/hash:hash
@ -9606,6 +9610,7 @@ targets:
- src/core/lib/slice/slice_string_helpers.cc
- test/core/promise/pipe_test.cc
deps:
- absl/container:flat_hash_set
- absl/functional:any_invocable
- absl/functional:function_ref
- absl/hash:hash
@ -11951,6 +11956,7 @@ targets:
- src/core/lib/slice/slice_string_helpers.cc
- test/core/promise/try_concurrently_test.cc
deps:
- absl/container:flat_hash_set
- absl/functional:any_invocable
- absl/functional:function_ref
- absl/hash:hash

@ -946,6 +946,7 @@ grpc_cc_library(
],
external_deps = [
"absl/base:core_headers",
"absl/container:flat_hash_set",
"absl/status",
"absl/strings",
"absl/types:optional",

@ -51,6 +51,8 @@ const char* const description_promise_based_client_call =
"(ie when all filters in a stack are promise based)";
const char* const description_posix_event_engine_enable_polling =
"If set, enables polling on the default posix event engine.";
const char* const description_free_large_allocator =
"If set, return all free bytes from a \042big\042 allocator";
#ifdef NDEBUG
const bool kDefaultForDebugOnly = false;
#else
@ -77,6 +79,7 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"promise_based_client_call", description_promise_based_client_call, false},
{"posix_event_engine_enable_polling",
description_posix_event_engine_enable_polling, true},
{"free_large_allocator", description_free_large_allocator, false},
};
} // namespace grpc_core

@ -45,6 +45,7 @@ inline bool IsPromiseBasedClientCallEnabled() {
inline bool IsPosixEventEngineEnablePollingEnabled() {
return IsExperimentEnabled(11);
}
inline bool IsFreeLargeAllocatorEnabled() { return IsExperimentEnabled(12); }
struct ExperimentMetadata {
const char* name;
@ -52,7 +53,7 @@ struct ExperimentMetadata {
bool default_value;
};
constexpr const size_t kNumExperiments = 12;
constexpr const size_t kNumExperiments = 13;
extern const ExperimentMetadata g_experiment_metadata[kNumExperiments];
} // namespace grpc_core

@ -127,3 +127,11 @@
expiry: 2023/01/01
owner: vigneshbabu@google.com
test_tags: ["event_engine_client_test"]
- name: free_large_allocator
description:
If set, return all free bytes from a "big" allocator
default: false
expiry: 2023/04/01
owner: alishananda@google.com
test_tags: [resource_quota_test]

@ -158,7 +158,9 @@ Poll<RefCountedPtr<ReclaimerQueue::Handle>> ReclaimerQueue::PollNext() {
GrpcMemoryAllocatorImpl::GrpcMemoryAllocatorImpl(
std::shared_ptr<BasicMemoryQuota> memory_quota, std::string name)
: memory_quota_(memory_quota), name_(std::move(name)) {
memory_quota_->Take(taken_bytes_);
memory_quota_->Take(
/*allocator=*/this, taken_bytes_);
memory_quota_->AddNewAllocator(this);
}
GrpcMemoryAllocatorImpl::~GrpcMemoryAllocatorImpl() {
@ -166,6 +168,7 @@ GrpcMemoryAllocatorImpl::~GrpcMemoryAllocatorImpl() {
sizeof(GrpcMemoryAllocatorImpl) ==
taken_bytes_.load(std::memory_order_relaxed));
memory_quota_->Return(taken_bytes_);
memory_quota_->RemoveAllocator(this);
}
void GrpcMemoryAllocatorImpl::Shutdown() {
@ -188,12 +191,17 @@ size_t GrpcMemoryAllocatorImpl::Reserve(MemoryRequest request) {
// inlined asserts.
GPR_ASSERT(request.min() <= request.max());
GPR_ASSERT(request.max() <= MemoryRequest::max_allowed_size());
size_t old_free = free_bytes_.load(std::memory_order_relaxed);
while (true) {
// Attempt to reserve memory from our pool.
auto reservation = TryReserve(request);
if (reservation.has_value()) {
size_t new_free = free_bytes_.load(std::memory_order_relaxed);
memory_quota_->MaybeMoveAllocator(this, old_free, new_free);
return *reservation;
}
// If that failed, grab more from the quota and retry.
Replenish();
}
@ -275,7 +283,8 @@ void GrpcMemoryAllocatorImpl::Replenish() {
auto amount = Clamp(taken_bytes_.load(std::memory_order_relaxed) / 3,
kMinReplenishBytes, kMaxReplenishBytes);
// Take the requested amount from the quota.
memory_quota_->Take(amount);
memory_quota_->Take(
/*allocator=*/this, amount);
// Record that we've taken it.
taken_bytes_.fetch_add(amount, std::memory_order_relaxed);
// Add the taken amount to the free pool.
@ -304,6 +313,8 @@ void GrpcMemoryAllocatorImpl::MaybeRegisterReclaimer() {
// Figure out how many bytes we can return to the quota.
size_t return_bytes = p->free_bytes_.exchange(0, std::memory_order_acq_rel);
if (return_bytes == 0) return;
p->memory_quota_->MaybeMoveAllocator(p, /*old_free_bytes=*/return_bytes,
/*new_free_bytes=*/0);
// Subtract that from our outstanding balance.
p->taken_bytes_.fetch_sub(return_bytes);
// And return them to the quota.
@ -410,11 +421,11 @@ void BasicMemoryQuota::SetSize(size_t new_size) {
Return(new_size - old_size);
} else {
// We're shrinking the quota.
Take(old_size - new_size);
Take(/*allocator=*/nullptr, old_size - new_size);
}
}
void BasicMemoryQuota::Take(size_t amount) {
void BasicMemoryQuota::Take(GrpcMemoryAllocatorImpl* allocator, size_t amount) {
// If there's a request for nothing, then do nothing!
if (amount == 0) return;
GPR_DEBUG_ASSERT(amount <= std::numeric_limits<intptr_t>::max());
@ -424,6 +435,29 @@ void BasicMemoryQuota::Take(size_t amount) {
if (prior >= 0 && prior < static_cast<intptr_t>(amount)) {
if (reclaimer_activity_ != nullptr) reclaimer_activity_->ForceWakeup();
}
if (IsFreeLargeAllocatorEnabled()) {
if (allocator == nullptr) return;
GrpcMemoryAllocatorImpl* chosen_allocator = nullptr;
// Use calling allocator's shard index to choose shard.
auto& shard = big_allocators_.shards[allocator->IncrementShardIndex() %
big_allocators_.shards.size()];
if (shard.shard_mu.TryLock()) {
if (!shard.allocators.empty()) {
chosen_allocator = *shard.allocators.begin();
}
shard.shard_mu.Unlock();
}
if (chosen_allocator != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
gpr_log(GPR_INFO, "Returning bytes from big allocator %p",
chosen_allocator);
}
chosen_allocator->ReturnFree();
}
}
}
void BasicMemoryQuota::FinishReclamation(uint64_t token, Waker waker) {
@ -448,6 +482,107 @@ void BasicMemoryQuota::Return(size_t amount) {
free_bytes_.fetch_add(amount, std::memory_order_relaxed);
}
// Registers a newly created allocator with this quota. Every allocator
// starts out in the small bucket; MaybeMoveAllocator() promotes it to the
// big bucket later if it accumulates enough free bytes.
void BasicMemoryQuota::AddNewAllocator(GrpcMemoryAllocatorImpl* allocator) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "Adding allocator %p", allocator);
  }
  AllocatorBucket::Shard& target = small_allocators_.SelectShard(allocator);
  absl::MutexLock lock(&target.shard_mu);
  target.allocators.emplace(allocator);
}
// Unregisters `allocator` from whichever bucket currently holds it.
// The small bucket is checked first; if the allocator was not found there
// we fall back to erasing it from the big bucket.
void BasicMemoryQuota::RemoveAllocator(GrpcMemoryAllocatorImpl* allocator) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "Removing allocator %p", allocator);
  }
  {
    AllocatorBucket::Shard& shard = small_allocators_.SelectShard(allocator);
    absl::MutexLock lock(&shard.shard_mu);
    // erase() returns the number of elements removed; 1 means it lived in
    // the small bucket and we are done.
    if (shard.allocators.erase(allocator) == 1) return;
  }
  {
    AllocatorBucket::Shard& shard = big_allocators_.SelectShard(allocator);
    absl::MutexLock lock(&shard.shard_mu);
    shard.allocators.erase(allocator);
  }
}
// Re-buckets `allocator` after its free-byte count changed from
// `old_free_bytes` to `new_free_bytes`. Allocators with few free bytes
// live in the small bucket, allocators with many in the big bucket; the
// band between kSmallAllocatorThreshold and kBigAllocatorThreshold acts
// as hysteresis where no move happens. Because free_bytes_ can change
// concurrently, we re-read it after each move and loop until the bucket
// assignment is stable.
void BasicMemoryQuota::MaybeMoveAllocator(GrpcMemoryAllocatorImpl* allocator,
                                          size_t old_free_bytes,
                                          size_t new_free_bytes) {
  while (true) {
    if (new_free_bytes < kSmallAllocatorThreshold) {
      // Still in small bucket. No move.
      if (old_free_bytes < kSmallAllocatorThreshold) return;
      MaybeMoveAllocatorBigToSmall(allocator);
    } else if (new_free_bytes > kBigAllocatorThreshold) {
      // Still in big bucket. No move.
      if (old_free_bytes > kBigAllocatorThreshold) return;
      MaybeMoveAllocatorSmallToBig(allocator);
    } else {
      // Somewhere between thresholds. No move.
      return;
    }
    // Loop to make sure move is eventually stable: another thread may have
    // changed free_bytes_ while we were moving.
    old_free_bytes = new_free_bytes;
    new_free_bytes = allocator->GetFreeBytes();
  }
}
// Moves `allocator` from the big bucket to the small bucket, if it is
// currently present in the big bucket. No-op when it is not found there
// (e.g. a concurrent caller already moved or removed it).
void BasicMemoryQuota::MaybeMoveAllocatorBigToSmall(
    GrpcMemoryAllocatorImpl* allocator) {
  AllocatorBucket::Shard& old_shard = big_allocators_.SelectShard(allocator);
  {
    absl::MutexLock l(&old_shard.shard_mu);
    // Only proceed if we actually removed the allocator; otherwise another
    // thread won the race and there is nothing to move.
    if (old_shard.allocators.erase(allocator) == 0) return;
  }
  // Log only once we know a move is really happening, so the trace does not
  // report moves that were skipped.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "Moving allocator %p to small", allocator);
  }
  AllocatorBucket::Shard& new_shard = small_allocators_.SelectShard(allocator);
  {
    absl::MutexLock l(&new_shard.shard_mu);
    new_shard.allocators.emplace(allocator);
  }
}
// Moves `allocator` from the small bucket to the big bucket, if it is
// currently present in the small bucket. No-op when it is not found there
// (e.g. a concurrent caller already moved or removed it).
void BasicMemoryQuota::MaybeMoveAllocatorSmallToBig(
    GrpcMemoryAllocatorImpl* allocator) {
  AllocatorBucket::Shard& old_shard = small_allocators_.SelectShard(allocator);
  {
    absl::MutexLock l(&old_shard.shard_mu);
    // Only proceed if we actually removed the allocator; otherwise another
    // thread won the race and there is nothing to move.
    if (old_shard.allocators.erase(allocator) == 0) return;
  }
  // Log only once we know a move is really happening, so the trace does not
  // report moves that were skipped.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "Moving allocator %p to big", allocator);
  }
  AllocatorBucket::Shard& new_shard = big_allocators_.SelectShard(allocator);
  {
    absl::MutexLock l(&new_shard.shard_mu);
    new_shard.allocators.emplace(allocator);
  }
}
BasicMemoryQuota::PressureInfo BasicMemoryQuota::GetPressureInfo() {
double free = free_bytes_.load();
if (free < 0) free = 0;

@ -19,6 +19,7 @@
#include <stdint.h>
#include <array>
#include <atomic>
#include <cstddef>
#include <limits>
@ -27,6 +28,7 @@
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_set.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
@ -35,6 +37,7 @@
#include <grpc/support/log.h>
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
@ -42,11 +45,13 @@
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/resource_quota/periodic_update.h"
#include "src/core/lib/resource_quota/trace.h"
namespace grpc_core {
class BasicMemoryQuota;
class MemoryQuota;
class GrpcMemoryAllocatorImpl;
using grpc_event_engine::experimental::MemoryRequest;
@ -274,6 +279,12 @@ class PressureTracker {
};
} // namespace memory_quota_detail
// Minimum number of free bytes in order for allocator to move to big bucket.
static constexpr size_t kBigAllocatorThreshold = 0.5 * 1024 * 1024;
// Maximum number of free bytes in order for allocator to move to small
// bucket.
static constexpr size_t kSmallAllocatorThreshold = 0.1 * 1024 * 1024;
class BasicMemoryQuota final
: public std::enable_shared_from_this<BasicMemoryQuota> {
public:
@ -302,11 +313,18 @@ class BasicMemoryQuota final
void SetSize(size_t new_size);
// Forcefully take some memory from the quota, potentially entering
// overcommit.
void Take(size_t amount);
void Take(GrpcMemoryAllocatorImpl* allocator, size_t amount);
// Finish reclamation pass.
void FinishReclamation(uint64_t token, Waker waker);
// Return some memory to the quota.
void Return(size_t amount);
// Add allocator to list of allocators in small bucket. Returns allocator id.
void AddNewAllocator(GrpcMemoryAllocatorImpl* allocator);
// Remove allocator from list of allocators.
void RemoveAllocator(GrpcMemoryAllocatorImpl* allocator);
// Determine whether to move allocator to different bucket and if so, move.
void MaybeMoveAllocator(GrpcMemoryAllocatorImpl* allocator,
size_t old_free_bytes, size_t new_free_bytes);
// Instantaneous memory pressure approximation.
PressureInfo GetPressureInfo();
// Get a reclamation queue
@ -319,8 +337,29 @@ class BasicMemoryQuota final
friend class ReclamationSweep;
class WaitForSweepPromise;
// A set of allocators, sharded 16 ways to reduce mutex contention when
// many threads register, remove, or re-bucket allocators concurrently.
class AllocatorBucket {
 public:
  struct Shard {
    // Allocators assigned to this shard.
    absl::flat_hash_set<GrpcMemoryAllocatorImpl*> allocators
        ABSL_GUARDED_BY(shard_mu);
    // Protects `allocators`.
    absl::Mutex shard_mu;
  };

  // Picks the shard for `key` by hashing the pointer value: a given
  // allocator always maps to the same shard.
  Shard& SelectShard(void* key) {
    const size_t hash = HashPointer(key, shards.size());
    return shards[hash % shards.size()];
  }

  std::array<Shard, 16> shards;
};
static constexpr intptr_t kInitialSize = std::numeric_limits<intptr_t>::max();
// Move allocator from big bucket to small bucket.
void MaybeMoveAllocatorBigToSmall(GrpcMemoryAllocatorImpl* allocator);
// Move allocator from small bucket to big bucket.
void MaybeMoveAllocatorSmallToBig(GrpcMemoryAllocatorImpl* allocator);
// The amount of memory that's free in this quota.
// We use intptr_t as a reasonable proxy for ssize_t that's portable.
// We allow arbitrary overcommit and so this must allow negative values.
@ -330,6 +369,10 @@ class BasicMemoryQuota final
// Reclaimer queues.
ReclaimerQueue reclaimers_[kNumReclamationPasses];
// List of all allocators sorted into 2 buckets, small (<100 KB free bytes)
// and large (>500 KB free bytes).
AllocatorBucket small_allocators_;
AllocatorBucket big_allocators_;
// The reclaimer activity consumes reclaimers whenever we are in overcommit to
// try and get back under memory limits.
ActivityPtr reclaimer_activity_;
@ -371,10 +414,25 @@ class GrpcMemoryAllocatorImpl final : public EventEngineMemoryAllocatorImpl {
// Try to immediately return some free'ed memory back to the total quota.
MaybeDonateBack();
}
size_t new_free = free_bytes_.load(std::memory_order_relaxed);
memory_quota_->MaybeMoveAllocator(this, prev_free, new_free);
if (prev_free != 0) return;
MaybeRegisterReclaimer();
}
  // Return all free bytes to quota. Called (under the free_large_allocator
  // experiment) when the quota needs memory back from a "big" allocator.
  void ReturnFree() {
    // Atomically claim every free byte this allocator currently holds;
    // acq_rel pairs with other accesses to free_bytes_.
    size_t ret = free_bytes_.exchange(0, std::memory_order_acq_rel);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
      gpr_log(GPR_INFO, "Allocator %p returning %zu bytes to quota", this, ret);
    }
    if (ret == 0) return;
    // Free bytes just dropped to zero, so this allocator may need to move
    // back to the small bucket.
    memory_quota_->MaybeMoveAllocator(this, /*old_free_bytes=*/ret,
                                      /*new_free_bytes=*/0);
    // Shrink our outstanding balance, then hand the bytes back to the quota.
    taken_bytes_.fetch_sub(ret, std::memory_order_relaxed);
    memory_quota_->Return(ret);
  }
// Post a reclamation function.
template <typename F>
void PostReclaimer(ReclamationPass pass, F fn) {
@ -394,8 +452,17 @@ class GrpcMemoryAllocatorImpl final : public EventEngineMemoryAllocatorImpl {
// Name of this allocator
absl::string_view name() const { return name_; }
  // Number of free (reserved-from-quota but currently unused) bytes held by
  // this allocator.
  size_t GetFreeBytes() const {
    return free_bytes_.load(std::memory_order_relaxed);
  }

  // Advances the shard-selection counter and returns its PREVIOUS value
  // (fetch_add semantics); used to spread big-bucket scans across shards.
  size_t IncrementShardIndex() {
    return chosen_shard_idx_.fetch_add(1, std::memory_order_relaxed);
  }
private:
static constexpr size_t kMaxQuotaBufferSize = 1024 * 1024;
// Primitive reservation function.
absl::optional<size_t> TryReserve(MemoryRequest request) GRPC_MUST_USE_RESULT;
// This function may be invoked during a memory release operation.
@ -423,6 +490,8 @@ class GrpcMemoryAllocatorImpl final : public EventEngineMemoryAllocatorImpl {
std::atomic<size_t> free_bytes_{0};
// Amount of memory taken from the quota by this allocator.
std::atomic<size_t> taken_bytes_{sizeof(GrpcMemoryAllocatorImpl)};
// Index used to randomly choose shard to return bytes from.
std::atomic<size_t> chosen_shard_idx_{0};
std::atomic<bool> registered_reclaimer_{false};
// We try to donate back some memory periodically to the central quota.
PeriodicUpdate donate_back_{Duration::Seconds(10)};
@ -433,6 +502,7 @@ class GrpcMemoryAllocatorImpl final : public EventEngineMemoryAllocatorImpl {
OrphanablePtr<ReclaimerQueue::Handle>
reclamation_handles_[kNumReclamationPasses] ABSL_GUARDED_BY(
reclaimer_mu_);
// Name of this allocator.
std::string name_;
};

Loading…
Cancel
Save