diff --git a/CMakeLists.txt b/CMakeLists.txt
index d16019a0c9e..a8c1e5d7532 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -8078,6 +8078,7 @@ target_link_libraries(chunked_vector_test
   ${_gRPC_PROTOBUF_LIBRARIES}
   ${_gRPC_ZLIB_LIBRARIES}
   ${_gRPC_ALLTARGETS_LIBRARIES}
+  absl::flat_hash_set
   absl::any_invocable
   absl::function_ref
   absl::hash
@@ -10804,6 +10805,7 @@ target_link_libraries(flow_control_test
   ${_gRPC_PROTOBUF_LIBRARIES}
   ${_gRPC_ZLIB_LIBRARIES}
   ${_gRPC_ALLTARGETS_LIBRARIES}
+  absl::flat_hash_set
   absl::any_invocable
   absl::function_ref
   absl::hash
@@ -10872,6 +10874,7 @@ target_link_libraries(for_each_test
   ${_gRPC_PROTOBUF_LIBRARIES}
   ${_gRPC_ZLIB_LIBRARIES}
   ${_gRPC_ALLTARGETS_LIBRARIES}
+  absl::flat_hash_set
   absl::any_invocable
   absl::function_ref
   absl::hash
@@ -14388,6 +14391,7 @@ target_link_libraries(map_pipe_test
   ${_gRPC_PROTOBUF_LIBRARIES}
   ${_gRPC_ZLIB_LIBRARIES}
   ${_gRPC_ALLTARGETS_LIBRARIES}
+  absl::flat_hash_set
   absl::any_invocable
   absl::function_ref
   absl::hash
@@ -15858,6 +15862,7 @@ target_link_libraries(pipe_test
   ${_gRPC_PROTOBUF_LIBRARIES}
   ${_gRPC_ZLIB_LIBRARIES}
   ${_gRPC_ALLTARGETS_LIBRARIES}
+  absl::flat_hash_set
   absl::any_invocable
   absl::function_ref
   absl::hash
@@ -21056,6 +21061,7 @@ target_link_libraries(try_concurrently_test
   ${_gRPC_PROTOBUF_LIBRARIES}
   ${_gRPC_ZLIB_LIBRARIES}
   ${_gRPC_ALLTARGETS_LIBRARIES}
+  absl::flat_hash_set
   absl::any_invocable
   absl::function_ref
   absl::hash
diff --git a/bazel/experiments.bzl b/bazel/experiments.bzl
index dfa7625a720..22e1ff130a6 100644
--- a/bazel/experiments.bzl
+++ b/bazel/experiments.bzl
@@ -51,6 +51,7 @@ EXPERIMENTS = {
         "promise_based_client_call",
     ],
     "resource_quota_test": [
+        "free_large_allocator",
         "memory_pressure_controller",
         "unconstrained_max_quota_buffer_size",
     ],
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index 5aa3b5fa327..2664932c549 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -5622,6 +5622,7 @@ targets:
   - src/core/lib/slice/slice_string_helpers.cc
   - test/core/gprpp/chunked_vector_test.cc
   deps:
+  - absl/container:flat_hash_set
   - absl/functional:any_invocable
   - absl/functional:function_ref
   - absl/hash:hash
@@ -6895,6 +6896,7 @@ targets:
   - src/core/lib/transport/pid_controller.cc
   - test/core/transport/chttp2/flow_control_test.cc
   deps:
+  - absl/container:flat_hash_set
   - absl/functional:any_invocable
   - absl/functional:function_ref
   - absl/hash:hash
@@ -6989,6 +6991,7 @@ targets:
   - src/core/lib/slice/slice_string_helpers.cc
   - test/core/promise/for_each_test.cc
   deps:
+  - absl/container:flat_hash_set
   - absl/functional:any_invocable
   - absl/functional:function_ref
   - absl/hash:hash
@@ -8927,6 +8930,7 @@ targets:
   - src/core/lib/slice/slice_string_helpers.cc
   - test/core/promise/map_pipe_test.cc
   deps:
+  - absl/container:flat_hash_set
   - absl/functional:any_invocable
   - absl/functional:function_ref
   - absl/hash:hash
@@ -9584,6 +9588,7 @@ targets:
   - src/core/lib/slice/slice_string_helpers.cc
   - test/core/promise/pipe_test.cc
   deps:
+  - absl/container:flat_hash_set
   - absl/functional:any_invocable
   - absl/functional:function_ref
   - absl/hash:hash
@@ -11906,6 +11911,7 @@ targets:
   - src/core/lib/slice/slice_string_helpers.cc
   - test/core/promise/try_concurrently_test.cc
   deps:
+  - absl/container:flat_hash_set
   - absl/functional:any_invocable
   - absl/functional:function_ref
   - absl/hash:hash
diff --git a/src/core/BUILD b/src/core/BUILD
index 8be251fc146..f0308588773 100644
--- a/src/core/BUILD
+++ b/src/core/BUILD
@@ -946,6 +946,7 @@ grpc_cc_library(
     ],
     external_deps = [
         "absl/base:core_headers",
+        "absl/container:flat_hash_set",
"absl/container:flat_hash_set", "absl/status", "absl/strings", "absl/types:optional", diff --git a/src/core/lib/experiments/experiments.cc b/src/core/lib/experiments/experiments.cc index 840c5f2fffa..69cad8fdd92 100644 --- a/src/core/lib/experiments/experiments.cc +++ b/src/core/lib/experiments/experiments.cc @@ -51,6 +51,8 @@ const char* const description_promise_based_client_call = "(ie when all filters in a stack are promise based)"; const char* const description_posix_event_engine_enable_polling = "If set, enables polling on the default posix event engine."; +const char* const description_free_large_allocator = + "If set, return all free bytes from a \042big\042 allocator"; #ifdef NDEBUG const bool kDefaultForDebugOnly = false; #else @@ -78,6 +80,7 @@ const ExperimentMetadata g_experiment_metadata[] = { {"promise_based_client_call", description_promise_based_client_call, false}, {"posix_event_engine_enable_polling", description_posix_event_engine_enable_polling, kDefaultForDebugOnly}, + {"free_large_allocator", description_free_large_allocator, false}, }; } // namespace grpc_core diff --git a/src/core/lib/experiments/experiments.h b/src/core/lib/experiments/experiments.h index b3a60bf966d..238c0c474ce 100644 --- a/src/core/lib/experiments/experiments.h +++ b/src/core/lib/experiments/experiments.h @@ -45,6 +45,7 @@ inline bool IsPromiseBasedClientCallEnabled() { inline bool IsPosixEventEngineEnablePollingEnabled() { return IsExperimentEnabled(11); } +inline bool IsFreeLargeAllocatorEnabled() { return IsExperimentEnabled(12); } struct ExperimentMetadata { const char* name; @@ -52,7 +53,7 @@ struct ExperimentMetadata { bool default_value; }; -constexpr const size_t kNumExperiments = 12; +constexpr const size_t kNumExperiments = 13; extern const ExperimentMetadata g_experiment_metadata[kNumExperiments]; } // namespace grpc_core diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index 0f47aad073f..3505de38229 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -127,3 +127,11 @@ expiry: 2023/01/01 owner: vigneshbabu@google.com test_tags: ["event_engine_client_test"] +- name: free_large_allocator + description: + If set, return all free bytes from a "big" allocator + default: false + expiry: 2023/04/01 + owner: alishananda@google.com + test_tags: [resource_quota_test] + diff --git a/src/core/lib/resource_quota/memory_quota.cc b/src/core/lib/resource_quota/memory_quota.cc index ee718b5e029..c7ab2956820 100644 --- a/src/core/lib/resource_quota/memory_quota.cc +++ b/src/core/lib/resource_quota/memory_quota.cc @@ -159,6 +159,7 @@ GrpcMemoryAllocatorImpl::GrpcMemoryAllocatorImpl( std::shared_ptr memory_quota, std::string name) : memory_quota_(memory_quota), name_(std::move(name)) { memory_quota_->Take(taken_bytes_); + memory_quota_->AddNewAllocator(this); } GrpcMemoryAllocatorImpl::~GrpcMemoryAllocatorImpl() { @@ -166,6 +167,7 @@ GrpcMemoryAllocatorImpl::~GrpcMemoryAllocatorImpl() { sizeof(GrpcMemoryAllocatorImpl) == taken_bytes_.load(std::memory_order_relaxed)); memory_quota_->Return(taken_bytes_); + memory_quota_->RemoveAllocator(this); } void GrpcMemoryAllocatorImpl::Shutdown() { @@ -188,12 +190,17 @@ size_t GrpcMemoryAllocatorImpl::Reserve(MemoryRequest request) { // inlined asserts. 
   GPR_ASSERT(request.min() <= request.max());
   GPR_ASSERT(request.max() <= MemoryRequest::max_allowed_size());
+  size_t old_free = free_bytes_.load(std::memory_order_relaxed);
+
   while (true) {
     // Attempt to reserve memory from our pool.
     auto reservation = TryReserve(request);
     if (reservation.has_value()) {
+      size_t new_free = free_bytes_.load(std::memory_order_relaxed);
+      memory_quota_->MaybeMoveAllocator(this, old_free, new_free);
       return *reservation;
     }
+
     // If that failed, grab more from the quota and retry.
     Replenish();
   }
@@ -304,6 +311,8 @@ void GrpcMemoryAllocatorImpl::MaybeRegisterReclaimer() {
         // Figure out how many bytes we can return to the quota.
         size_t return_bytes = p->free_bytes_.exchange(0, std::memory_order_acq_rel);
         if (return_bytes == 0) return;
+        p->memory_quota_->MaybeMoveAllocator(p, /*old_free_bytes=*/return_bytes,
+                                             /*new_free_bytes=*/0);
         // Subtract that from our outstanding balance.
         p->taken_bytes_.fetch_sub(return_bytes);
         // And return them to the quota.
@@ -424,6 +433,21 @@ void BasicMemoryQuota::Take(size_t amount) {
   if (prior >= 0 && prior < static_cast<intptr_t>(amount)) {
     if (reclaimer_activity_ != nullptr) reclaimer_activity_->ForceWakeup();
   }
+
+  if (IsFreeLargeAllocatorEnabled()) {
+    GrpcMemoryAllocatorImpl* chosen_allocator = nullptr;
+    int shard_idx = rand() % big_allocators_.shards.size();
+    auto& shard = big_allocators_.shards[shard_idx];
+
+    if (shard.shard_mu.TryLock()) {
+      if (!shard.allocators.empty()) {
+        chosen_allocator = *shard.allocators.begin();
+      }
+      shard.shard_mu.Unlock();
+    }
+
+    if (chosen_allocator != nullptr) chosen_allocator->ReturnFree();
+  }
 }
 
 void BasicMemoryQuota::FinishReclamation(uint64_t token, Waker waker) {
@@ -448,6 +472,107 @@ void BasicMemoryQuota::Return(size_t amount) {
   free_bytes_.fetch_add(amount, std::memory_order_relaxed);
 }
 
+void BasicMemoryQuota::AddNewAllocator(GrpcMemoryAllocatorImpl* allocator) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
+    gpr_log(GPR_INFO, "Adding allocator %p", allocator);
+  }
+
+  AllocatorBucket::Shard& shard = small_allocators_.SelectShard(allocator);
+
+  {
+    absl::MutexLock l(&shard.shard_mu);
+    shard.allocators.emplace(allocator);
+  }
+}
+
+void BasicMemoryQuota::RemoveAllocator(GrpcMemoryAllocatorImpl* allocator) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
+    gpr_log(GPR_INFO, "Removing allocator %p", allocator);
+  }
+
+  AllocatorBucket::Shard& small_shard =
+      small_allocators_.SelectShard(allocator);
+
+  {
+    absl::MutexLock l(&small_shard.shard_mu);
+    if (small_shard.allocators.erase(allocator) == 1) {
+      return;
+    }
+  }
+
+  AllocatorBucket::Shard& big_shard = big_allocators_.SelectShard(allocator);
+
+  {
+    absl::MutexLock l(&big_shard.shard_mu);
+    big_shard.allocators.erase(allocator);
+  }
+}
+
+void BasicMemoryQuota::MaybeMoveAllocator(GrpcMemoryAllocatorImpl* allocator,
+                                          size_t old_free_bytes,
+                                          size_t new_free_bytes) {
+  while (true) {
+    if (new_free_bytes < kSmallAllocatorThreshold) {
+      // Still in small bucket. No move.
+      if (old_free_bytes < kSmallAllocatorThreshold) return;
+      MaybeMoveAllocatorBigToSmall(allocator);
+    } else if (new_free_bytes > kBigAllocatorThreshold) {
+      // Still in big bucket. No move.
+      if (old_free_bytes > kBigAllocatorThreshold) return;
+      MaybeMoveAllocatorSmallToBig(allocator);
+    } else {
+      // Somewhere between thresholds. No move.
+      return;
+    }
+
+    // Loop to make sure move is eventually stable.
+    old_free_bytes = new_free_bytes;
+    new_free_bytes = allocator->GetFreeBytes();
+  }
+}
+
+void BasicMemoryQuota::MaybeMoveAllocatorBigToSmall(
+    GrpcMemoryAllocatorImpl* allocator) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
+    gpr_log(GPR_INFO, "Moving allocator %p to small", allocator);
+  }
+
+  AllocatorBucket::Shard& old_shard = big_allocators_.SelectShard(allocator);
+
+  {
+    absl::MutexLock l(&old_shard.shard_mu);
+    if (old_shard.allocators.erase(allocator) == 0) return;
+  }
+
+  AllocatorBucket::Shard& new_shard = small_allocators_.SelectShard(allocator);
+
+  {
+    absl::MutexLock l(&new_shard.shard_mu);
+    new_shard.allocators.emplace(allocator);
+  }
+}
+
+void BasicMemoryQuota::MaybeMoveAllocatorSmallToBig(
+    GrpcMemoryAllocatorImpl* allocator) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
+    gpr_log(GPR_INFO, "Moving allocator %p to big", allocator);
+  }
+
+  AllocatorBucket::Shard& old_shard = small_allocators_.SelectShard(allocator);
+
+  {
+    absl::MutexLock l(&old_shard.shard_mu);
+    if (old_shard.allocators.erase(allocator) == 0) return;
+  }
+
+  AllocatorBucket::Shard& new_shard = big_allocators_.SelectShard(allocator);
+
+  {
+    absl::MutexLock l(&new_shard.shard_mu);
+    new_shard.allocators.emplace(allocator);
+  }
+}
+
 BasicMemoryQuota::PressureInfo BasicMemoryQuota::GetPressureInfo() {
   double free = free_bytes_.load();
   if (free < 0) free = 0;
diff --git a/src/core/lib/resource_quota/memory_quota.h b/src/core/lib/resource_quota/memory_quota.h
index c0da890ed58..2a17d1bba04 100644
--- a/src/core/lib/resource_quota/memory_quota.h
+++ b/src/core/lib/resource_quota/memory_quota.h
@@ -19,6 +19,7 @@
 #include
 
+#include <array>
 #include
 #include
 #include
@@ -27,6 +28,7 @@
 #include
 
 #include "absl/base/thread_annotations.h"
+#include "absl/container/flat_hash_set.h"
 #include "absl/strings/string_view.h"
 #include "absl/types/optional.h"
@@ -35,6 +37,7 @@
 #include
 
 #include "src/core/lib/experiments/experiments.h"
+#include "src/core/lib/gpr/useful.h"
 #include "src/core/lib/gprpp/orphanable.h"
 #include "src/core/lib/gprpp/ref_counted_ptr.h"
 #include "src/core/lib/gprpp/sync.h"
@@ -42,11 +45,13 @@
 #include "src/core/lib/promise/activity.h"
 #include "src/core/lib/promise/poll.h"
 #include "src/core/lib/resource_quota/periodic_update.h"
+#include "src/core/lib/resource_quota/trace.h"
 
 namespace grpc_core {
 
 class BasicMemoryQuota;
 class MemoryQuota;
+class GrpcMemoryAllocatorImpl;
 
 using grpc_event_engine::experimental::MemoryRequest;
@@ -274,6 +279,12 @@ class PressureTracker {
 };
 }  // namespace memory_quota_detail
 
+// Minimum number of free bytes in order for allocator to move to big bucket.
+static constexpr size_t kBigAllocatorThreshold = 0.5 * 1024 * 1024;
+// Maximum number of free bytes in order for allocator to move to small
+// bucket.
+static constexpr size_t kSmallAllocatorThreshold = 0.1 * 1024 * 1024;
+
 class BasicMemoryQuota final
     : public std::enable_shared_from_this<BasicMemoryQuota> {
  public:
@@ -307,6 +318,13 @@
   void FinishReclamation(uint64_t token, Waker waker);
   // Return some memory to the quota.
   void Return(size_t amount);
+  // Add allocator to list of allocators in small bucket.
+  void AddNewAllocator(GrpcMemoryAllocatorImpl* allocator);
+  // Remove allocator from list of allocators.
+  void RemoveAllocator(GrpcMemoryAllocatorImpl* allocator);
+  // Determine whether to move allocator to a different bucket and, if so, move.
+  void MaybeMoveAllocator(GrpcMemoryAllocatorImpl* allocator,
+                          size_t old_free_bytes, size_t new_free_bytes);
   // Instantaneous memory pressure approximation.
   PressureInfo GetPressureInfo();
   // Get a reclamation queue
@@ -319,8 +337,29 @@
   friend class ReclamationSweep;
   class WaitForSweepPromise;
 
+  class AllocatorBucket {
+   public:
+    struct Shard {
+      absl::flat_hash_set<GrpcMemoryAllocatorImpl*> allocators
+          ABSL_GUARDED_BY(shard_mu);
+      absl::Mutex shard_mu;
+    };
+
+    Shard& SelectShard(void* key) {
+      const size_t hash = HashPointer(key, shards.size());
+      return shards[hash % shards.size()];
+    }
+
+    std::array<Shard, 16> shards;
+  };
+
   static constexpr intptr_t kInitialSize = std::numeric_limits<intptr_t>::max();
+
+  // Move allocator from big bucket to small bucket.
+  void MaybeMoveAllocatorBigToSmall(GrpcMemoryAllocatorImpl* allocator);
+  // Move allocator from small bucket to big bucket.
+  void MaybeMoveAllocatorSmallToBig(GrpcMemoryAllocatorImpl* allocator);
+
   // The amount of memory that's free in this quota.
   // We use intptr_t as a reasonable proxy for ssize_t that's portable.
   // We allow arbitrary overcommit and so this must allow negative values.
@@ -330,6 +369,10 @@
   // Reclaimer queues.
   ReclaimerQueue reclaimers_[kNumReclamationPasses];
+  // List of all allocators sorted into 2 buckets, small (<100 KB free bytes)
+  // and large (>500 KB free bytes).
+  AllocatorBucket small_allocators_;
+  AllocatorBucket big_allocators_;
   // The reclaimer activity consumes reclaimers whenever we are in overcommit to
   // try and get back under memory limits.
   ActivityPtr reclaimer_activity_;
@@ -371,10 +414,25 @@ class GrpcMemoryAllocatorImpl final : public EventEngineMemoryAllocatorImpl {
       // Try to immediately return some free'ed memory back to the total quota.
       MaybeDonateBack();
     }
+    size_t new_free = free_bytes_.load(std::memory_order_relaxed);
+    memory_quota_->MaybeMoveAllocator(this, prev_free, new_free);
     if (prev_free != 0) return;
     MaybeRegisterReclaimer();
   }
 
+  // Return all free bytes to quota.
+  void ReturnFree() {
+    size_t ret = free_bytes_.exchange(0, std::memory_order_acq_rel);
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
+      gpr_log(GPR_INFO, "Allocator %p returning %zu bytes to quota", this, ret);
+    }
+    if (ret == 0) return;
+    memory_quota_->MaybeMoveAllocator(this, /*old_free_bytes=*/ret,
+                                      /*new_free_bytes=*/0);
+    taken_bytes_.fetch_sub(ret, std::memory_order_relaxed);
+    memory_quota_->Return(ret);
+  }
+
   // Post a reclamation function.
   template <typename F>
   void PostReclaimer(ReclamationPass pass, F fn) {
@@ -394,8 +452,13 @@ class GrpcMemoryAllocatorImpl final : public EventEngineMemoryAllocatorImpl {
   // Name of this allocator
   absl::string_view name() const { return name_; }
 
+  size_t GetFreeBytes() const {
+    return free_bytes_.load(std::memory_order_relaxed);
+  }
+
  private:
   static constexpr size_t kMaxQuotaBufferSize = 1024 * 1024;
+
   // Primitive reservation function.
   absl::optional<size_t> TryReserve(MemoryRequest request) GRPC_MUST_USE_RESULT;
   // This function may be invoked during a memory release operation.
@@ -433,6 +496,7 @@ class GrpcMemoryAllocatorImpl final : public EventEngineMemoryAllocatorImpl {
   OrphanablePtr<ReclaimerQueue::Handle>
       reclamation_handles_[kNumReclamationPasses] ABSL_GUARDED_BY(
           reclaimer_mu_);
+  // Name of this allocator.
   std::string name_;
 };
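
Note on the bucketing logic (not part of the patch): MaybeMoveAllocator only moves an allocator once its free-byte count actually crosses a threshold, and anything sitting between kSmallAllocatorThreshold (~100 KB) and kBigAllocatorThreshold (~500 KB) stays where it is, which gives the small/big split some hysteresis. Below is a minimal standalone sketch of that decision, assuming the thresholds from the patch; Bucket and NextBucket are hypothetical names used only for illustration and are not symbols from the gRPC sources.

// Illustration only (not from the gRPC tree): the same threshold checks that
// BasicMemoryQuota::MaybeMoveAllocator applies, shown as a standalone function.
#include <cstddef>
#include <cstdio>

constexpr size_t kSmallAllocatorThreshold = 0.1 * 1024 * 1024;  // ~100 KB
constexpr size_t kBigAllocatorThreshold = 0.5 * 1024 * 1024;    // ~500 KB

enum class Bucket { kSmall, kBig };  // hypothetical name, for illustration

// Returns the bucket an allocator should sit in after its free-byte count
// changes from old_free to new_free; between the two thresholds nothing moves.
Bucket NextBucket(Bucket current, size_t old_free, size_t new_free) {
  if (new_free < kSmallAllocatorThreshold) {
    // Only move if we were previously at or above the small threshold.
    return old_free < kSmallAllocatorThreshold ? current : Bucket::kSmall;
  }
  if (new_free > kBigAllocatorThreshold) {
    // Only move if we were previously at or below the big threshold.
    return old_free > kBigAllocatorThreshold ? current : Bucket::kBig;
  }
  return current;  // somewhere between thresholds: no move
}

int main() {
  Bucket b = Bucket::kSmall;
  b = NextBucket(b, 0, 600 * 1024);           // crosses 500 KB: moves to big
  b = NextBucket(b, 600 * 1024, 300 * 1024);  // between thresholds: stays big
  b = NextBucket(b, 300 * 1024, 50 * 1024);   // drops below 100 KB: moves to small
  std::printf("final bucket: %s\n", b == Bucket::kSmall ? "small" : "big");
  return 0;
}

The real implementation additionally re-reads the allocator's free bytes in a loop after each move ("Loop to make sure move is eventually stable"), so the placement settles even if the count changes concurrently while the move is in flight.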