Integrate new resource quota, event engine (#27522)

* Integrate new resource quota, event engine

* Automated change: Fix sanity tests

* windows fix

* Automated change: Fix sanity tests

* Update memory_allocator.h

* Automated change: Fix sanity tests

* first round review feedback

* review feedback

* Automated change: Fix sanity tests

* Update memory_quota.h

* get re-export formatted right

* Automated change: Fix sanity tests

* Update memory_allocator.cc

* Automated change: Fix sanity tests

* MemoryOwner has-a MemoryAllocator

* using fix

* review feedback

* Automated change: Fix sanity tests

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/27643/head
Craig Tiller 3 years ago committed by GitHub
parent 3d83dd3776
commit 79ef60f079
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 24
      BUILD
  2. 11
      CMakeLists.txt
  3. 8
      Makefile
  4. 13
      build_autogenerated.yaml
  5. 1
      config.m4
  6. 1
      config.w32
  7. 2
      gRPC-C++.podspec
  8. 6
      gRPC-Core.podspec
  9. 5
      grpc.gemspec
  10. 2
      grpc.gyp
  11. 18
      include/grpc/event_engine/event_engine.h
  12. 98
      include/grpc/event_engine/internal/memory_allocator_impl.h
  13. 210
      include/grpc/event_engine/memory_allocator.h
  14. 71
      include/grpc/event_engine/slice_allocator.h
  15. 5
      package.xml
  16. 67
      src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc
  17. 74
      src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h
  18. 70
      src/core/lib/event_engine/memory_allocator.cc
  19. 167
      src/core/lib/resource_quota/memory_quota.cc
  20. 330
      src/core/lib/resource_quota/memory_quota.h
  21. 2
      src/core/lib/resource_quota/resource_quota.cc
  22. 10
      src/core/lib/resource_quota/resource_quota.h
  23. 1
      src/python/grpcio/grpc_core_dependencies.py
  24. 38
      test/core/resource_quota/memory_quota_fuzzer.cc
  25. 32
      test/core/resource_quota/memory_quota_stress_test.cc
  26. 104
      test/core/resource_quota/memory_quota_test.cc
  27. 3
      tools/doxygen/Doxyfile.c++
  28. 5
      tools/doxygen/Doxyfile.c++.internal
  29. 3
      tools/doxygen/Doxyfile.core
  30. 5
      tools/doxygen/Doxyfile.core.internal

24
BUILD

@ -161,7 +161,8 @@ GRPC_PUBLIC_EVENT_ENGINE_HDRS = [
"include/grpc/event_engine/endpoint_config.h",
"include/grpc/event_engine/event_engine.h",
"include/grpc/event_engine/port.h",
"include/grpc/event_engine/slice_allocator.h",
"include/grpc/event_engine/memory_allocator.h",
"include/grpc/event_engine/internal/memory_allocator_impl.h",
]
GRPC_SECURE_PUBLIC_HDRS = [
@ -1296,6 +1297,24 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "event_engine_memory_allocator",
srcs = [
"src/core/lib/event_engine/memory_allocator.cc",
],
hdrs = [
"include/grpc/event_engine/internal/memory_allocator_impl.h",
"include/grpc/event_engine/memory_allocator.h",
],
language = "c++",
deps = [
"gpr_platform",
"ref_counted",
"slice",
"slice_refcount",
],
)
grpc_cc_library(
name = "memory_quota",
srcs = [
@ -1307,6 +1326,7 @@ grpc_cc_library(
deps = [
"activity",
"dual_ref_counted",
"event_engine_memory_allocator",
"exec_ctx_wakeup_scheduler",
"gpr_base",
"loop",
@ -3172,7 +3192,6 @@ grpc_cc_library(
"src/core/ext/transport/chttp2/transport/bin_decoder.cc",
"src/core/ext/transport/chttp2/transport/bin_encoder.cc",
"src/core/ext/transport/chttp2/transport/chttp2_plugin.cc",
"src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc",
"src/core/ext/transport/chttp2/transport/chttp2_transport.cc",
"src/core/ext/transport/chttp2/transport/context_list.cc",
"src/core/ext/transport/chttp2/transport/flow_control.cc",
@ -3198,7 +3217,6 @@ grpc_cc_library(
hdrs = [
"src/core/ext/transport/chttp2/transport/bin_decoder.h",
"src/core/ext/transport/chttp2/transport/bin_encoder.h",
"src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h",
"src/core/ext/transport/chttp2/transport/chttp2_transport.h",
"src/core/ext/transport/chttp2/transport/context_list.h",
"src/core/ext/transport/chttp2/transport/flow_control.h",

11
CMakeLists.txt generated

@ -1575,7 +1575,6 @@ add_library(grpc
src/core/ext/transport/chttp2/transport/bin_decoder.cc
src/core/ext/transport/chttp2/transport/bin_encoder.cc
src/core/ext/transport/chttp2/transport/chttp2_plugin.cc
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc
src/core/ext/transport/chttp2/transport/chttp2_transport.cc
src/core/ext/transport/chttp2/transport/context_list.cc
src/core/ext/transport/chttp2/transport/flow_control.cc
@ -2115,8 +2114,9 @@ foreach(_hdr
include/grpc/compression.h
include/grpc/event_engine/endpoint_config.h
include/grpc/event_engine/event_engine.h
include/grpc/event_engine/internal/memory_allocator_impl.h
include/grpc/event_engine/memory_allocator.h
include/grpc/event_engine/port.h
include/grpc/event_engine/slice_allocator.h
include/grpc/fork.h
include/grpc/grpc.h
include/grpc/grpc_posix.h
@ -2406,7 +2406,6 @@ add_library(grpc_unsecure
src/core/ext/transport/chttp2/transport/bin_decoder.cc
src/core/ext/transport/chttp2/transport/bin_encoder.cc
src/core/ext/transport/chttp2/transport/chttp2_plugin.cc
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc
src/core/ext/transport/chttp2/transport/chttp2_transport.cc
src/core/ext/transport/chttp2/transport/context_list.cc
src/core/ext/transport/chttp2/transport/flow_control.cc
@ -2671,8 +2670,9 @@ foreach(_hdr
include/grpc/compression.h
include/grpc/event_engine/endpoint_config.h
include/grpc/event_engine/event_engine.h
include/grpc/event_engine/internal/memory_allocator_impl.h
include/grpc/event_engine/memory_allocator.h
include/grpc/event_engine/port.h
include/grpc/event_engine/slice_allocator.h
include/grpc/fork.h
include/grpc/grpc.h
include/grpc/grpc_posix.h
@ -6062,6 +6062,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
add_executable(memory_quota_stress_test
src/core/lib/debug/trace.cc
src/core/lib/event_engine/memory_allocator.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/error.cc
src/core/lib/iomgr/exec_ctx.cc
@ -12594,6 +12595,7 @@ if(gRPC_BUILD_TESTS)
add_executable(memory_quota_test
src/core/lib/debug/trace.cc
src/core/lib/event_engine/memory_allocator.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/error.cc
src/core/lib/iomgr/exec_ctx.cc
@ -15184,6 +15186,7 @@ if(gRPC_BUILD_TESTS)
add_executable(test_core_resource_quota_resource_quota_test
src/core/lib/debug/trace.cc
src/core/lib/event_engine/memory_allocator.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/error.cc
src/core/lib/iomgr/exec_ctx.cc

8
Makefile generated

@ -1132,7 +1132,6 @@ LIBGRPC_SRC = \
src/core/ext/transport/chttp2/transport/bin_decoder.cc \
src/core/ext/transport/chttp2/transport/bin_encoder.cc \
src/core/ext/transport/chttp2/transport/chttp2_plugin.cc \
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc \
src/core/ext/transport/chttp2/transport/chttp2_transport.cc \
src/core/ext/transport/chttp2/transport/context_list.cc \
src/core/ext/transport/chttp2/transport/flow_control.cc \
@ -1620,8 +1619,9 @@ PUBLIC_HEADERS_C += \
include/grpc/compression.h \
include/grpc/event_engine/endpoint_config.h \
include/grpc/event_engine/event_engine.h \
include/grpc/event_engine/internal/memory_allocator_impl.h \
include/grpc/event_engine/memory_allocator.h \
include/grpc/event_engine/port.h \
include/grpc/event_engine/slice_allocator.h \
include/grpc/fork.h \
include/grpc/grpc.h \
include/grpc/grpc_posix.h \
@ -1810,7 +1810,6 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/transport/chttp2/transport/bin_decoder.cc \
src/core/ext/transport/chttp2/transport/bin_encoder.cc \
src/core/ext/transport/chttp2/transport/chttp2_plugin.cc \
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc \
src/core/ext/transport/chttp2/transport/chttp2_transport.cc \
src/core/ext/transport/chttp2/transport/context_list.cc \
src/core/ext/transport/chttp2/transport/flow_control.cc \
@ -2024,8 +2023,9 @@ PUBLIC_HEADERS_C += \
include/grpc/compression.h \
include/grpc/event_engine/endpoint_config.h \
include/grpc/event_engine/event_engine.h \
include/grpc/event_engine/internal/memory_allocator_impl.h \
include/grpc/event_engine/memory_allocator.h \
include/grpc/event_engine/port.h \
include/grpc/event_engine/slice_allocator.h \
include/grpc/fork.h \
include/grpc/grpc.h \
include/grpc/grpc_posix.h \

@ -421,8 +421,9 @@ libs:
- include/grpc/compression.h
- include/grpc/event_engine/endpoint_config.h
- include/grpc/event_engine/event_engine.h
- include/grpc/event_engine/internal/memory_allocator_impl.h
- include/grpc/event_engine/memory_allocator.h
- include/grpc/event_engine/port.h
- include/grpc/event_engine/slice_allocator.h
- include/grpc/fork.h
- include/grpc/grpc.h
- include/grpc/grpc_posix.h
@ -500,7 +501,6 @@ libs:
- src/core/ext/transport/chttp2/server/chttp2_server.h
- src/core/ext/transport/chttp2/transport/bin_decoder.h
- src/core/ext/transport/chttp2/transport/bin_encoder.h
- src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h
- src/core/ext/transport/chttp2/transport/chttp2_transport.h
- src/core/ext/transport/chttp2/transport/context_list.h
- src/core/ext/transport/chttp2/transport/flow_control.h
@ -1046,7 +1046,6 @@ libs:
- src/core/ext/transport/chttp2/transport/bin_decoder.cc
- src/core/ext/transport/chttp2/transport/bin_encoder.cc
- src/core/ext/transport/chttp2/transport/chttp2_plugin.cc
- src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc
- src/core/ext/transport/chttp2/transport/chttp2_transport.cc
- src/core/ext/transport/chttp2/transport/context_list.cc
- src/core/ext/transport/chttp2/transport/flow_control.cc
@ -1669,8 +1668,9 @@ libs:
- include/grpc/compression.h
- include/grpc/event_engine/endpoint_config.h
- include/grpc/event_engine/event_engine.h
- include/grpc/event_engine/internal/memory_allocator_impl.h
- include/grpc/event_engine/memory_allocator.h
- include/grpc/event_engine/port.h
- include/grpc/event_engine/slice_allocator.h
- include/grpc/fork.h
- include/grpc/grpc.h
- include/grpc/grpc_posix.h
@ -1744,7 +1744,6 @@ libs:
- src/core/ext/transport/chttp2/server/chttp2_server.h
- src/core/ext/transport/chttp2/transport/bin_decoder.h
- src/core/ext/transport/chttp2/transport/bin_encoder.h
- src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h
- src/core/ext/transport/chttp2/transport/chttp2_transport.h
- src/core/ext/transport/chttp2/transport/context_list.h
- src/core/ext/transport/chttp2/transport/flow_control.h
@ -2015,7 +2014,6 @@ libs:
- src/core/ext/transport/chttp2/transport/bin_decoder.cc
- src/core/ext/transport/chttp2/transport/bin_encoder.cc
- src/core/ext/transport/chttp2/transport/chttp2_plugin.cc
- src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc
- src/core/ext/transport/chttp2/transport/chttp2_transport.cc
- src/core/ext/transport/chttp2/transport/context_list.cc
- src/core/ext/transport/chttp2/transport/flow_control.cc
@ -3687,6 +3685,7 @@ targets:
- src/core/lib/slice/static_slice.h
src:
- src/core/lib/debug/trace.cc
- src/core/lib/event_engine/memory_allocator.cc
- src/core/lib/iomgr/combiner.cc
- src/core/lib/iomgr/error.cc
- src/core/lib/iomgr/exec_ctx.cc
@ -6519,6 +6518,7 @@ targets:
- src/core/lib/slice/static_slice.h
src:
- src/core/lib/debug/trace.cc
- src/core/lib/event_engine/memory_allocator.cc
- src/core/lib/iomgr/combiner.cc
- src/core/lib/iomgr/error.cc
- src/core/lib/iomgr/exec_ctx.cc
@ -7613,6 +7613,7 @@ targets:
- src/core/lib/slice/static_slice.h
src:
- src/core/lib/debug/trace.cc
- src/core/lib/event_engine/memory_allocator.cc
- src/core/lib/iomgr/combiner.cc
- src/core/lib/iomgr/error.cc
- src/core/lib/iomgr/exec_ctx.cc

1
config.m4 generated

@ -127,7 +127,6 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/transport/chttp2/transport/bin_decoder.cc \
src/core/ext/transport/chttp2/transport/bin_encoder.cc \
src/core/ext/transport/chttp2/transport/chttp2_plugin.cc \
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc \
src/core/ext/transport/chttp2/transport/chttp2_transport.cc \
src/core/ext/transport/chttp2/transport/context_list.cc \
src/core/ext/transport/chttp2/transport/flow_control.cc \

1
config.w32 generated

@ -93,7 +93,6 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\transport\\chttp2\\transport\\bin_decoder.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\bin_encoder.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\chttp2_plugin.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\chttp2_slice_allocator.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\chttp2_transport.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\context_list.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\flow_control.cc " +

2
gRPC-C++.podspec generated

@ -271,7 +271,6 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/server/chttp2_server.h',
'src/core/ext/transport/chttp2/transport/bin_decoder.h',
'src/core/ext/transport/chttp2/transport/bin_encoder.h',
'src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h',
'src/core/ext/transport/chttp2/transport/chttp2_transport.h',
'src/core/ext/transport/chttp2/transport/context_list.h',
'src/core/ext/transport/chttp2/transport/flow_control.h',
@ -942,7 +941,6 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/server/chttp2_server.h',
'src/core/ext/transport/chttp2/transport/bin_decoder.h',
'src/core/ext/transport/chttp2/transport/bin_encoder.h',
'src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h',
'src/core/ext/transport/chttp2/transport/chttp2_transport.h',
'src/core/ext/transport/chttp2/transport/context_list.h',
'src/core/ext/transport/chttp2/transport/flow_control.h',

6
gRPC-Core.podspec generated

@ -119,8 +119,9 @@ Pod::Spec.new do |s|
'include/grpc/compression.h',
'include/grpc/event_engine/endpoint_config.h',
'include/grpc/event_engine/event_engine.h',
'include/grpc/event_engine/internal/memory_allocator_impl.h',
'include/grpc/event_engine/memory_allocator.h',
'include/grpc/event_engine/port.h',
'include/grpc/event_engine/slice_allocator.h',
'include/grpc/fork.h',
'include/grpc/grpc.h',
'include/grpc/grpc_posix.h',
@ -349,8 +350,6 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/transport/bin_encoder.cc',
'src/core/ext/transport/chttp2/transport/bin_encoder.h',
'src/core/ext/transport/chttp2/transport/chttp2_plugin.cc',
'src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc',
'src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h',
'src/core/ext/transport/chttp2/transport/chttp2_transport.cc',
'src/core/ext/transport/chttp2/transport/chttp2_transport.h',
'src/core/ext/transport/chttp2/transport/context_list.cc',
@ -1528,7 +1527,6 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/server/chttp2_server.h',
'src/core/ext/transport/chttp2/transport/bin_decoder.h',
'src/core/ext/transport/chttp2/transport/bin_encoder.h',
'src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h',
'src/core/ext/transport/chttp2/transport/chttp2_transport.h',
'src/core/ext/transport/chttp2/transport/context_list.h',
'src/core/ext/transport/chttp2/transport/flow_control.h',

5
grpc.gemspec generated

@ -55,8 +55,9 @@ Gem::Specification.new do |s|
s.files += %w( include/grpc/compression.h )
s.files += %w( include/grpc/event_engine/endpoint_config.h )
s.files += %w( include/grpc/event_engine/event_engine.h )
s.files += %w( include/grpc/event_engine/internal/memory_allocator_impl.h )
s.files += %w( include/grpc/event_engine/memory_allocator.h )
s.files += %w( include/grpc/event_engine/port.h )
s.files += %w( include/grpc/event_engine/slice_allocator.h )
s.files += %w( include/grpc/fork.h )
s.files += %w( include/grpc/grpc.h )
s.files += %w( include/grpc/grpc_posix.h )
@ -261,8 +262,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/transport/chttp2/transport/bin_encoder.cc )
s.files += %w( src/core/ext/transport/chttp2/transport/bin_encoder.h )
s.files += %w( src/core/ext/transport/chttp2/transport/chttp2_plugin.cc )
s.files += %w( src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc )
s.files += %w( src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h )
s.files += %w( src/core/ext/transport/chttp2/transport/chttp2_transport.cc )
s.files += %w( src/core/ext/transport/chttp2/transport/chttp2_transport.h )
s.files += %w( src/core/ext/transport/chttp2/transport/context_list.cc )

2
grpc.gyp generated

@ -574,7 +574,6 @@
'src/core/ext/transport/chttp2/transport/bin_decoder.cc',
'src/core/ext/transport/chttp2/transport/bin_encoder.cc',
'src/core/ext/transport/chttp2/transport/chttp2_plugin.cc',
'src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc',
'src/core/ext/transport/chttp2/transport/chttp2_transport.cc',
'src/core/ext/transport/chttp2/transport/context_list.cc',
'src/core/ext/transport/chttp2/transport/flow_control.cc',
@ -1228,7 +1227,6 @@
'src/core/ext/transport/chttp2/transport/bin_decoder.cc',
'src/core/ext/transport/chttp2/transport/bin_encoder.cc',
'src/core/ext/transport/chttp2/transport/chttp2_plugin.cc',
'src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc',
'src/core/ext/transport/chttp2/transport/chttp2_transport.cc',
'src/core/ext/transport/chttp2/transport/context_list.cc',
'src/core/ext/transport/chttp2/transport/flow_control.cc',

@ -24,8 +24,8 @@
#include "absl/time/time.h"
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/port.h>
#include <grpc/event_engine/slice_allocator.h>
// TODO(hork): Define the Endpoint::Write metrics collection system
namespace grpc_event_engine {
@ -123,7 +123,7 @@ class EventEngine {
/// created when connections are established, and Endpoint operations are
/// gRPC's primary means of communication.
///
/// Endpoints must use the provided SliceAllocator for all data buffer memory
/// Endpoints must use the provided MemoryAllocator for all data buffer memory
/// allocations. gRPC allows applications to set memory constraints per
/// Channel or Server, and the implementation depends on all dynamic memory
/// allocation being handled by the quota system.
@ -192,7 +192,7 @@ class EventEngine {
public:
/// Called when the listener has accepted a new client connection.
using AcceptCallback = std::function<void(
std::unique_ptr<Endpoint>, const SliceAllocator& slice_allocator)>;
std::unique_ptr<Endpoint>, MemoryAllocator memory_allocator)>;
virtual ~Listener() = default;
/// Bind an address/port to this Listener.
///
@ -215,13 +215,13 @@ class EventEngine {
/// exactly once, when the Listener is shut down. The status passed to it will
/// indicate if there was a problem during shutdown.
///
/// The provided \a SliceAllocatorFactory is used to create \a SliceAllocators
/// for Endpoint construction.
/// The provided \a MemoryAllocatorFactory is used to create \a
/// MemoryAllocators for Endpoint construction.
virtual absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
Listener::AcceptCallback on_accept,
std::function<void(absl::Status)> on_shutdown,
const EndpointConfig& config,
std::unique_ptr<SliceAllocatorFactory> slice_allocator_factory) = 0;
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) = 0;
/// Creates a client network connection to a remote network listener.
///
/// May return an error status immediately if there was a failure in the
@ -230,15 +230,15 @@ class EventEngine {
/// expected that the \a on_connect callback will be asynchronously executed
/// exactly once by the EventEngine.
///
/// Implementation Note: it is important that the \a slice_allocator be used
/// Implementation Note: it is important that the \a memory_allocator be used
/// for all read/write buffer allocations in the EventEngine implementation.
/// This allows gRPC's \a ResourceQuota system to monitor and control memory
/// usage with graceful degradation mechanisms. Please see the \a
/// SliceAllocator API for more information.
/// MemoryAllocator API for more information.
virtual absl::Status Connect(OnConnectCallback on_connect,
const ResolvedAddress& addr,
const EndpointConfig& args,
std::unique_ptr<SliceAllocator> slice_allocator,
MemoryAllocator memory_allocator,
absl::Time deadline) = 0;
/// Provides asynchronous resolution.

@ -0,0 +1,98 @@
// Copyright 2021 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_EVENT_ENGINE_INTERNAL_MEMORY_ALLOCATOR_IMPL_H
#define GRPC_EVENT_ENGINE_INTERNAL_MEMORY_ALLOCATOR_IMPL_H
#include <grpc/impl/codegen/port_platform.h>
#include <algorithm>
#include <memory>
#include <type_traits>
#include <vector>
#include <grpc/slice.h>
// forward-declaring an internal struct, not used publicly.
struct grpc_slice_buffer;
namespace grpc_event_engine {
namespace experimental {
/// Reservation request - how much memory do we want to allocate?
class MemoryRequest {
public:
/// Request a fixed amount of memory.
// NOLINTNEXTLINE(google-explicit-constructor)
MemoryRequest(size_t n) : min_(n), max_(n) {}
/// Request a range of memory.
/// Requires: \a min <= \a max.
/// Requires: \a max <= max_size()
MemoryRequest(size_t min, size_t max) : min_(min), max_(max) {}
/// Maximum allowable request size - hard coded to 1GB.
static constexpr size_t max_allowed_size() { return 1024 * 1024 * 1024; }
/// Increase the size by \a amount.
/// Undefined behavior if min() + amount or max() + amount overflows.
MemoryRequest Increase(size_t amount) const {
return MemoryRequest(min_ + amount, max_ + amount);
}
size_t min() const { return min_; }
size_t max() const { return max_; }
private:
size_t min_;
size_t max_;
};
namespace internal {
/// Underlying memory allocation interface.
/// This is an internal interface, not intended to be used by users.
/// Its interface is subject to change at any time.
class MemoryAllocatorImpl
: public std::enable_shared_from_this<MemoryAllocatorImpl> {
public:
MemoryAllocatorImpl() {}
virtual ~MemoryAllocatorImpl() {}
MemoryAllocatorImpl(const MemoryAllocatorImpl&) = delete;
MemoryAllocatorImpl& operator=(const MemoryAllocatorImpl&) = delete;
/// Reserve bytes from the quota.
/// If we enter overcommit, reclamation will begin concurrently.
/// Returns the number of bytes reserved.
/// If MemoryRequest is invalid, this function will abort.
/// If MemoryRequest is valid, this function is infallible, and will always
/// succeed at reserving the some number of bytes between request.min() and
/// request.max() inclusively.
virtual size_t Reserve(MemoryRequest request) = 0;
/// Release some bytes that were previously reserved.
/// If more bytes are released than were reserved, we will have undefined
/// behavior.
virtual void Release(size_t n) = 0;
/// Shutdown this allocator.
/// Further usage of Reserve() is undefined behavior.
virtual void Shutdown() = 0;
};
} // namespace internal
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_EVENT_ENGINE_INTERNAL_MEMORY_ALLOCATOR_IMPL_H

@ -0,0 +1,210 @@
// Copyright 2021 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_EVENT_ENGINE_MEMORY_ALLOCATOR_H
#define GRPC_EVENT_ENGINE_MEMORY_ALLOCATOR_H
#include <grpc/impl/codegen/port_platform.h>
#include <algorithm>
#include <memory>
#include <type_traits>
#include <vector>
#include <grpc/event_engine/internal/memory_allocator_impl.h>
#include <grpc/slice.h>
// forward-declaring an internal struct, not used publicly.
struct grpc_slice_buffer;
namespace grpc_event_engine {
namespace experimental {
// TODO(nnoble): needs implementation
class SliceBuffer {
public:
SliceBuffer() { abort(); }
explicit SliceBuffer(grpc_slice_buffer*) { abort(); }
grpc_slice_buffer* RawSliceBuffer() { return slice_buffer_; }
private:
grpc_slice_buffer* slice_buffer_;
};
class MemoryAllocator {
public:
/// Construct a MemoryAllocator given an internal::MemoryAllocatorImpl
/// implementation. The constructed MemoryAllocator will call
/// MemoryAllocatorImpl::Shutdown() upon destruction.
explicit MemoryAllocator(
std::shared_ptr<internal::MemoryAllocatorImpl> allocator)
: allocator_(std::move(allocator)) {}
~MemoryAllocator() {
if (allocator_ != nullptr) allocator_->Shutdown();
}
MemoryAllocator(const MemoryAllocator&) = delete;
MemoryAllocator& operator=(const MemoryAllocator&) = delete;
MemoryAllocator(MemoryAllocator&&) = default;
MemoryAllocator& operator=(MemoryAllocator&&) = default;
/// Reserve bytes from the quota.
/// If we enter overcommit, reclamation will begin concurrently.
/// Returns the number of bytes reserved.
size_t Reserve(MemoryRequest request) { return allocator_->Reserve(request); }
/// Release some bytes that were previously reserved.
void Release(size_t n) { return allocator_->Release(n); }
/// Return a pointer to the underlying implementation.
/// Note that the interface of said implementatoin is unstable and likely to
/// change at any time.
internal::MemoryAllocatorImpl* get_internal_impl_ptr() {
return allocator_.get();
}
//
// The remainder of this type are helper functions implemented in terms of
// Reserve/Release.
//
/// An automatic releasing reservation of memory.
class Reservation {
public:
Reservation() = default;
Reservation(const Reservation&) = delete;
Reservation& operator=(const Reservation&) = delete;
Reservation(Reservation&&) = default;
Reservation& operator=(Reservation&&) = default;
~Reservation() {
if (allocator_ != nullptr) allocator_->Release(size_);
}
private:
friend class MemoryAllocator;
Reservation(std::shared_ptr<internal::MemoryAllocatorImpl> allocator,
size_t size)
: allocator_(std::move(allocator)), size_(size) {}
std::shared_ptr<internal::MemoryAllocatorImpl> allocator_;
size_t size_ = 0;
};
/// Reserve bytes from the quota and automatically release them when
/// Reservation is destroyed.
Reservation MakeReservation(MemoryRequest request) {
return Reservation(allocator_, Reserve(request));
}
/// Allocate a new object of type T, with constructor arguments.
/// The returned type is wrapped, and upon destruction the reserved memory
/// will be released to the allocator automatically. As such, T must have a
/// virtual destructor so we can insert the necessary hook.
template <typename T, typename... Args>
typename std::enable_if<std::has_virtual_destructor<T>::value, T*>::type New(
Args&&... args) {
// Wrap T such that when it's destroyed, we can release memory back to the
// allocator.
class Wrapper final : public T {
public:
explicit Wrapper(std::shared_ptr<internal::MemoryAllocatorImpl> allocator,
Args&&... args)
: T(std::forward<Args>(args)...), allocator_(std::move(allocator)) {}
~Wrapper() override { allocator_->Release(sizeof(*this)); }
private:
const std::shared_ptr<internal::MemoryAllocatorImpl> allocator_;
};
Reserve(sizeof(Wrapper));
return new Wrapper(allocator_, std::forward<Args>(args)...);
}
/// Construct a unique_ptr immediately.
template <typename T, typename... Args>
std::unique_ptr<T> MakeUnique(Args&&... args) {
return std::unique_ptr<T>(New<T>(std::forward<Args>(args)...));
}
/// Allocate a slice, using MemoryRequest to size the number of returned
/// bytes. For a variable length request, check the returned slice length to
/// verify how much memory was allocated. Takes care of reserving memory for
/// any relevant control structures also.
grpc_slice MakeSlice(MemoryRequest request);
/// A C++ allocator for containers of T.
template <typename T>
class Container {
public:
using value_type = T;
/// Construct the allocator: \a underlying_allocator is borrowed, and must
/// outlive this object.
explicit Container(MemoryAllocator* underlying_allocator)
: underlying_allocator_(underlying_allocator) {}
template <typename U>
explicit Container(const Container<U>& other)
: underlying_allocator_(other.underlying_allocator()) {}
MemoryAllocator* underlying_allocator() const {
return underlying_allocator_;
}
T* allocate(size_t n) {
underlying_allocator_->Reserve(n * sizeof(T));
return static_cast<T*>(::operator new(n * sizeof(T)));
}
void deallocate(T* p, size_t n) {
::operator delete(p);
underlying_allocator_->Release(n * sizeof(T));
}
private:
MemoryAllocator* underlying_allocator_;
};
protected:
const std::shared_ptr<internal::MemoryAllocatorImpl>& allocator() {
return allocator_;
}
private:
std::shared_ptr<internal::MemoryAllocatorImpl> allocator_;
};
// Wrapper type around std::vector to make initialization against a
// MemoryAllocator based container allocator easy.
template <typename T>
class Vector : public std::vector<T, MemoryAllocator::Container<T>> {
public:
explicit Vector(MemoryAllocator* allocator)
: std::vector<T, MemoryAllocator::Container<T>>(
MemoryAllocator::Container<T>(allocator)) {}
};
class MemoryAllocatorFactory {
public:
virtual ~MemoryAllocatorFactory() = default;
/// On Endpoint creation, call \a CreateMemoryAllocator to create a new
/// allocator for the endpoint.
/// Typically we'll want to:
/// auto allocator = factory->CreateMemoryAllocator();
/// auto* endpoint = allocator->New<MyEndpoint>(std::move(allocator), ...);
virtual MemoryAllocator CreateMemoryAllocator() = 0;
};
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_EVENT_ENGINE_MEMORY_ALLOCATOR_H

@ -1,71 +0,0 @@
// Copyright 2021 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_EVENT_ENGINE_SLICE_ALLOCATOR_H
#define GRPC_EVENT_ENGINE_SLICE_ALLOCATOR_H
#include <grpc/support/port_platform.h>
#include <functional>
#include "absl/status/status.h"
// forward-declaring an internal struct, not used publicly.
struct grpc_resource_quota;
struct grpc_resource_user;
struct grpc_slice_buffer;
namespace grpc_event_engine {
namespace experimental {
// TODO(nnoble): needs implementation
class SliceBuffer {
public:
SliceBuffer() { abort(); }
explicit SliceBuffer(grpc_slice_buffer*) { abort(); }
grpc_slice_buffer* RawSliceBuffer() { return slice_buffer_; }
private:
grpc_slice_buffer* slice_buffer_;
};
class SliceAllocator {
public:
using AllocateCallback = std::function<void(absl::Status)>;
virtual ~SliceAllocator() = default;
/// Requests \a size bytes from gRPC, and populates \a dest with the allocated
/// slices. Ownership of the \a SliceBuffer is not transferred.
///
/// gRPC provides a ResourceQuota system to cap the amount of memory used by
/// the library. When a memory limit has been reached, slice allocation is
/// interrupted to attempt to reclaim memory from participating gRPC
/// internals. When there is sufficient memory available, slice allocation
/// proceeds as normal.
virtual absl::Status Allocate(size_t size, SliceBuffer* dest,
SliceAllocator::AllocateCallback cb) = 0;
};
class SliceAllocatorFactory {
public:
virtual ~SliceAllocatorFactory() = default;
/// On Endpoint creation, call \a CreateSliceAllocator with the name of the
/// endpoint peer (a URI string, most likely).
virtual std::unique_ptr<SliceAllocator> CreateSliceAllocator(
absl::string_view peer_name) = 0;
};
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_EVENT_ENGINE_SLICE_ALLOCATOR_H

5
package.xml generated

@ -35,8 +35,9 @@
<file baseinstalldir="/" name="include/grpc/compression.h" role="src" />
<file baseinstalldir="/" name="include/grpc/event_engine/endpoint_config.h" role="src" />
<file baseinstalldir="/" name="include/grpc/event_engine/event_engine.h" role="src" />
<file baseinstalldir="/" name="include/grpc/event_engine/internal/memory_allocator_impl.h" role="src" />
<file baseinstalldir="/" name="include/grpc/event_engine/memory_allocator.h" role="src" />
<file baseinstalldir="/" name="include/grpc/event_engine/port.h" role="src" />
<file baseinstalldir="/" name="include/grpc/event_engine/slice_allocator.h" role="src" />
<file baseinstalldir="/" name="include/grpc/fork.h" role="src" />
<file baseinstalldir="/" name="include/grpc/grpc.h" role="src" />
<file baseinstalldir="/" name="include/grpc/grpc_posix.h" role="src" />
@ -241,8 +242,6 @@
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/bin_encoder.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/bin_encoder.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/chttp2_plugin.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/chttp2_transport.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/chttp2_transport.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/context_list.cc" role="src" />

@ -1,67 +0,0 @@
// Copyright 2021 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h"
#include <functional>
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include <grpc/event_engine/slice_allocator.h>
#include "src/core/lib/iomgr/resource_quota.h"
namespace grpc_event_engine {
namespace experimental {
Chttp2SliceAllocator::Chttp2SliceAllocator(grpc_resource_user* user)
: resource_user_(user) {}
Chttp2SliceAllocator::~Chttp2SliceAllocator() {
if (resource_user_ != nullptr) {
grpc_resource_user_unref(resource_user_);
}
}
absl::Status Chttp2SliceAllocator::Allocate(
size_t size, SliceBuffer* dest, SliceAllocator::AllocateCallback cb) {
// TODO(hork): merge the implementation from the uv-ee branch.
(void)size;
(void)dest;
(void)cb;
return absl::OkStatus();
}
Chttp2SliceAllocatorFactory::Chttp2SliceAllocatorFactory(
grpc_resource_quota* quota)
: resource_quota_(quota) {
grpc_resource_quota_ref_internal(resource_quota_);
}
Chttp2SliceAllocatorFactory::~Chttp2SliceAllocatorFactory() {
if (resource_quota_ != nullptr) {
grpc_resource_quota_unref_internal(resource_quota_);
}
}
std::unique_ptr<SliceAllocator>
Chttp2SliceAllocatorFactory::CreateSliceAllocator(absl::string_view peer_name) {
return absl::make_unique<Chttp2SliceAllocator>(
grpc_resource_user_create(resource_quota_, peer_name));
}
} // namespace experimental
} // namespace grpc_event_engine

@ -1,74 +0,0 @@
// Copyright 2021 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CHTTP2_SLICE_ALLOCATOR_H
#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CHTTP2_SLICE_ALLOCATOR_H
#include <grpc/support/port_platform.h>
#include <functional>
#include "absl/status/status.h"
#include <grpc/event_engine/slice_allocator.h>
#include "src/core/lib/iomgr/resource_quota.h"
namespace grpc_event_engine {
namespace experimental {
class Chttp2SliceAllocator
: public grpc_event_engine::experimental::SliceAllocator {
public:
/// gRPC-internal constructor. Takes ownership of a resource_user ref from the
/// caller.
explicit Chttp2SliceAllocator(grpc_resource_user* user);
// Not copyable
Chttp2SliceAllocator(Chttp2SliceAllocator& other) = delete;
Chttp2SliceAllocator& operator=(const Chttp2SliceAllocator& other) = delete;
// Not Moveable
Chttp2SliceAllocator(Chttp2SliceAllocator&& other) = delete;
Chttp2SliceAllocator& operator=(Chttp2SliceAllocator&& other) = delete;
~Chttp2SliceAllocator() override;
absl::Status Allocate(size_t size, SliceBuffer* dest,
SliceAllocator::AllocateCallback cb) override;
private:
grpc_resource_user* resource_user_;
};
class Chttp2SliceAllocatorFactory
: public grpc_event_engine::experimental::SliceAllocatorFactory {
public:
// gRPC-internal constructor
explicit Chttp2SliceAllocatorFactory(grpc_resource_quota* quota);
// Not copyable
Chttp2SliceAllocatorFactory(Chttp2SliceAllocatorFactory& other) = delete;
Chttp2SliceAllocatorFactory& operator=(
const Chttp2SliceAllocatorFactory& other) = delete;
// Not Moveable
Chttp2SliceAllocatorFactory(Chttp2SliceAllocatorFactory&& other) = delete;
Chttp2SliceAllocatorFactory& operator=(Chttp2SliceAllocatorFactory&& other) =
delete;
~Chttp2SliceAllocatorFactory() override;
std::unique_ptr<SliceAllocator> CreateSliceAllocator(
absl::string_view peer_name) override;
private:
grpc_resource_quota* resource_quota_;
};
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CHTTP2_SLICE_ALLOCATOR_H

@ -0,0 +1,70 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include <grpc/event_engine/memory_allocator.h>
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/slice/slice_refcount.h"
namespace grpc_event_engine {
namespace experimental {
namespace {
// Reference count for a slice allocated by MemoryAllocator::MakeSlice.
// Takes care of releasing memory back when the slice is destroyed.
class SliceRefCount {
public:
static void Destroy(void* p) {
auto* rc = static_cast<SliceRefCount*>(p);
rc->~SliceRefCount();
gpr_free(rc);
}
SliceRefCount(std::shared_ptr<internal::MemoryAllocatorImpl> allocator,
size_t size)
: base_(grpc_slice_refcount::Type::REGULAR, &refs_, Destroy, this,
&base_),
allocator_(std::move(allocator)),
size_(size) {
// Nothing to do here.
}
~SliceRefCount() { allocator_->Release(size_); }
grpc_slice_refcount* base_refcount() { return &base_; }
private:
grpc_slice_refcount base_;
grpc_core::RefCount refs_;
std::shared_ptr<internal::MemoryAllocatorImpl> allocator_;
size_t size_;
};
} // namespace
grpc_slice MemoryAllocator::MakeSlice(MemoryRequest request) {
auto size = Reserve(request.Increase(sizeof(SliceRefCount)));
void* p = gpr_malloc(size);
new (p) SliceRefCount(allocator_, size);
grpc_slice slice;
slice.refcount = static_cast<SliceRefCount*>(p)->base_refcount();
slice.data.refcounted.bytes =
static_cast<uint8_t*>(p) + sizeof(SliceRefCount);
slice.data.refcounted.length = size - sizeof(SliceRefCount);
return slice;
}
} // namespace experimental
} // namespace grpc_event_engine

@ -21,7 +21,6 @@
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/race.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/slice/slice_refcount.h"
namespace grpc_core {
@ -48,8 +47,9 @@ ReclamationSweep::~ReclamationSweep() {
const ReclaimerQueue::Index ReclaimerQueue::kInvalidIndex;
void ReclaimerQueue::Insert(RefCountedPtr<MemoryAllocator> allocator,
ReclamationFunction reclaimer, Index* index) {
void ReclaimerQueue::Insert(
std::shared_ptr<EventEngineMemoryAllocatorImpl> allocator,
ReclamationFunction reclaimer, Index* index) {
MutexLock lock(&mu_);
if (*index < entries_.size() && entries_[*index].allocator == allocator) {
entries_[*index].reclaimer.swap(reclaimer);
@ -69,8 +69,8 @@ void ReclaimerQueue::Insert(RefCountedPtr<MemoryAllocator> allocator,
queue_.push(*index);
}
ReclamationFunction ReclaimerQueue::Cancel(Index index,
MemoryAllocator* allocator) {
ReclamationFunction ReclaimerQueue::Cancel(
Index index, EventEngineMemoryAllocatorImpl* allocator) {
MutexLock lock(&mu_);
if (index >= entries_.size()) return nullptr;
Entry& entry = entries_[index];
@ -98,34 +98,36 @@ Poll<ReclamationFunction> ReclaimerQueue::PollNext() {
}
//
// MemoryAllocator
// GrpcMemoryAllocatorImpl
//
MemoryAllocator::MemoryAllocator(RefCountedPtr<MemoryQuota> memory_quota)
: InternallyRefCounted("MemoryAllocator"), memory_quota_(memory_quota) {
Reserve(sizeof(MemoryQuota));
GrpcMemoryAllocatorImpl::GrpcMemoryAllocatorImpl(
std::shared_ptr<BasicMemoryQuota> memory_quota)
: memory_quota_(memory_quota) {
memory_quota_->Take(taken_bytes_);
}
MemoryAllocator::~MemoryAllocator() {
GrpcMemoryAllocatorImpl::~GrpcMemoryAllocatorImpl() {
GPR_ASSERT(free_bytes_.load(std::memory_order_acquire) +
sizeof(MemoryQuota) ==
sizeof(GrpcMemoryAllocatorImpl) ==
taken_bytes_);
memory_quota_->Return(taken_bytes_);
}
void MemoryAllocator::Orphan() {
void GrpcMemoryAllocatorImpl::Shutdown() {
ReclamationFunction old_reclaimers[kNumReclamationPasses];
{
MutexLock lock(&memory_quota_mu_);
for (size_t i = 0; i < kNumReclamationPasses; i++) {
old_reclaimers[i] =
memory_quota_->reclaimers_[i].Cancel(reclamation_indices_[i], this);
}
MutexLock lock(&memory_quota_mu_);
for (size_t i = 0; i < kNumReclamationPasses; i++) {
old_reclaimers[i] =
memory_quota_->CancelReclaimer(i, reclamation_indices_[i], this);
}
InternallyRefCounted<MemoryAllocator>::Unref();
}
size_t MemoryAllocator::Reserve(MemoryRequest request) {
size_t GrpcMemoryAllocatorImpl::Reserve(MemoryRequest request) {
// Validate request - performed here so we don't bloat the generated code with
// inlined asserts.
GPR_ASSERT(request.min() <= request.max());
GPR_ASSERT(request.max() <= MemoryRequest::max_allowed_size());
while (true) {
// Attempt to reserve memory from our pool.
auto reservation = TryReserve(request);
@ -135,7 +137,8 @@ size_t MemoryAllocator::Reserve(MemoryRequest request) {
}
}
absl::optional<size_t> MemoryAllocator::TryReserve(MemoryRequest request) {
absl::optional<size_t> GrpcMemoryAllocatorImpl::TryReserve(
MemoryRequest request) {
// How much memory should we request? (see the scaling below)
size_t scaled_size_over_min = request.max() - request.min();
// Scale the request down according to memory pressure if we have that
@ -175,13 +178,12 @@ absl::optional<size_t> MemoryAllocator::TryReserve(MemoryRequest request) {
}
}
void MemoryAllocator::Replenish() {
void GrpcMemoryAllocatorImpl::Replenish() {
MutexLock lock(&memory_quota_mu_);
// Attempt a fairly low rate exponential growth request size, bounded between
// some reasonable limits declared at top of file.
auto amount = Clamp(taken_bytes_ / 3, kMinReplenishBytes, kMaxReplenishBytes);
// Take the requested amount from the quota.
gpr_log(GPR_DEBUG, "%p: take %" PRIdMAX " bytes from quota", this, amount);
memory_quota_->Take(amount);
// Record that we've taken it.
taken_bytes_ += amount;
@ -191,37 +193,35 @@ void MemoryAllocator::Replenish() {
MaybeRegisterReclaimerLocked();
}
void MemoryAllocator::MaybeRegisterReclaimer() {
void GrpcMemoryAllocatorImpl::MaybeRegisterReclaimer() {
MutexLock lock(&memory_quota_mu_);
MaybeRegisterReclaimerLocked();
}
void MemoryAllocator::MaybeRegisterReclaimerLocked() {
void GrpcMemoryAllocatorImpl::MaybeRegisterReclaimerLocked() {
// If the reclaimer is already registered, then there's nothing to do.
if (reclamation_indices_[0] != ReclaimerQueue::kInvalidIndex) return;
// Grab references to the things we'll need
auto self = Ref(DEBUG_LOCATION, "reclaimer");
gpr_log(GPR_DEBUG, "%p: register reclaimer; idx=%" PRIdMAX, this,
reclamation_indices_[0]);
memory_quota_->reclaimers_[0].Insert(
self,
auto self = shared_from_this();
memory_quota_->InsertReclaimer(
0, self,
[self](ReclamationSweep) {
MutexLock lock(&self->memory_quota_mu_);
auto* p = static_cast<GrpcMemoryAllocatorImpl*>(self.get());
MutexLock lock(&p->memory_quota_mu_);
// Figure out how many bytes we can return to the quota.
size_t return_bytes =
self->free_bytes_.exchange(0, std::memory_order_acq_rel);
gpr_log(GPR_DEBUG, "%p: sweep reclaimer - return %" PRIdMAX, self.get(),
return_bytes);
p->free_bytes_.exchange(0, std::memory_order_acq_rel);
if (return_bytes == 0) return;
// Subtract that from our outstanding balance.
self->taken_bytes_ -= return_bytes;
p->taken_bytes_ -= return_bytes;
// And return them to the quota.
self->memory_quota_->Return(return_bytes);
p->memory_quota_->Return(return_bytes);
},
&reclamation_indices_[0]);
}
void MemoryAllocator::Rebind(RefCountedPtr<MemoryQuota> memory_quota) {
void GrpcMemoryAllocatorImpl::Rebind(
std::shared_ptr<BasicMemoryQuota> memory_quota) {
MutexLock lock(&memory_quota_mu_);
if (memory_quota_ == memory_quota) return;
// Return memory to the original memory quota.
@ -230,7 +230,7 @@ void MemoryAllocator::Rebind(RefCountedPtr<MemoryQuota> memory_quota) {
ReclamationFunction reclaimers[kNumReclamationPasses];
for (size_t i = 0; i < kNumReclamationPasses; i++) {
reclaimers[i] =
memory_quota_->reclaimers_[i].Cancel(reclamation_indices_[i], this);
memory_quota_->CancelReclaimer(i, reclamation_indices_[i], this);
}
// Switch to the new memory quota, leaving the old one in memory_quota so that
// when we unref it, we are outside of lock.
@ -243,73 +243,38 @@ void MemoryAllocator::Rebind(RefCountedPtr<MemoryQuota> memory_quota) {
// Reinsert active reclaimers.
for (size_t i = 0; i < kNumReclamationPasses; i++) {
if (reclaimers[i] == nullptr) continue;
memory_quota_->reclaimers_[i].Insert(Ref(DEBUG_LOCATION, "rebind"),
std::move(reclaimers[i]),
&reclamation_indices_[i]);
memory_quota_->InsertReclaimer(i, shared_from_this(),
std::move(reclaimers[i]),
&reclamation_indices_[i]);
}
}
void MemoryAllocator::PostReclaimer(ReclamationPass pass,
ReclamationFunction fn) {
void GrpcMemoryAllocatorImpl::PostReclaimer(ReclamationPass pass,
ReclamationFunction fn) {
MutexLock lock(&memory_quota_mu_);
auto pass_num = static_cast<int>(pass);
memory_quota_->reclaimers_[pass_num].Insert(
Ref(DEBUG_LOCATION, "post_reclaimer"), std::move(fn),
&reclamation_indices_[pass_num]);
auto pass_num = static_cast<size_t>(pass);
memory_quota_->InsertReclaimer(pass_num, shared_from_this(), std::move(fn),
&reclamation_indices_[pass_num]);
}
namespace {
// Reference count for a slice allocated by MemoryAllocator::MakeSlice.
// Takes care of releasing memory back when the slice is destroyed.
class SliceRefCount {
public:
static void Destroy(void* p) {
auto* rc = static_cast<SliceRefCount*>(p);
rc->~SliceRefCount();
gpr_free(rc);
}
SliceRefCount(RefCountedPtr<MemoryAllocator> allocator, size_t size)
: base_(grpc_slice_refcount::Type::REGULAR, &refs_, Destroy, this,
&base_),
allocator_(std::move(allocator)),
size_(size) {
// Nothing to do here.
}
~SliceRefCount() { allocator_->Release(size_); }
grpc_slice_refcount* base_refcount() { return &base_; }
private:
grpc_slice_refcount base_;
RefCount refs_;
RefCountedPtr<MemoryAllocator> allocator_;
size_t size_;
};
//
// MemoryOwner
//
} // namespace
grpc_slice MemoryAllocator::MakeSlice(MemoryRequest request) {
auto size = Reserve(request.Increase(sizeof(SliceRefCount)));
void* p = gpr_malloc(size);
new (p) SliceRefCount(Ref(DEBUG_LOCATION, "slice"), size);
grpc_slice slice;
slice.refcount = static_cast<SliceRefCount*>(p)->base_refcount();
slice.data.refcounted.bytes =
static_cast<uint8_t*>(p) + sizeof(SliceRefCount);
slice.data.refcounted.length = size - sizeof(SliceRefCount);
return slice;
void MemoryOwner::Rebind(MemoryQuota* quota) {
static_cast<GrpcMemoryAllocatorImpl*>(allocator_.get_internal_impl_ptr())
->Rebind(quota->memory_quota_);
}
//
// MemoryQuota
// BasicMemoryQuota
//
class MemoryQuota::WaitForSweepPromise {
class BasicMemoryQuota::WaitForSweepPromise {
public:
WaitForSweepPromise(WeakRefCountedPtr<MemoryQuota> memory_quota,
WaitForSweepPromise(std::shared_ptr<BasicMemoryQuota> memory_quota,
uint64_t token)
: memory_quota_(memory_quota), token_(token) {}
: memory_quota_(std::move(memory_quota)), token_(token) {}
struct Empty {};
Poll<Empty> operator()() {
@ -322,12 +287,12 @@ class MemoryQuota::WaitForSweepPromise {
}
private:
WeakRefCountedPtr<MemoryQuota> memory_quota_;
std::shared_ptr<BasicMemoryQuota> memory_quota_;
uint64_t token_;
};
MemoryQuota::MemoryQuota() : DualRefCounted("MemoryQuota") {
auto self = WeakRef();
void BasicMemoryQuota::Start() {
auto self = shared_from_this();
// Reclamation loop:
// basically, wait until we are in overcommit (free_bytes_ < 0), and then:
@ -373,7 +338,9 @@ MemoryQuota::MemoryQuota() : DualRefCounted("MemoryQuota") {
});
}
void MemoryQuota::SetSize(size_t new_size) {
void BasicMemoryQuota::Stop() { reclaimer_activity_.reset(); }
void BasicMemoryQuota::SetSize(size_t new_size) {
size_t old_size = quota_size_.exchange(new_size, std::memory_order_relaxed);
if (old_size < new_size) {
// We're growing the quota.
@ -384,7 +351,7 @@ void MemoryQuota::SetSize(size_t new_size) {
}
}
void MemoryQuota::Take(size_t amount) {
void BasicMemoryQuota::Take(size_t amount) {
// If there's a request for nothing, then do nothing!
if (amount == 0) return;
GPR_DEBUG_ASSERT(amount <= std::numeric_limits<intptr_t>::max());
@ -396,9 +363,7 @@ void MemoryQuota::Take(size_t amount) {
}
}
void MemoryQuota::Orphan() { reclaimer_activity_.reset(); }
void MemoryQuota::FinishReclamation(uint64_t token) {
void BasicMemoryQuota::FinishReclamation(uint64_t token) {
uint64_t current = reclamation_counter_.load(std::memory_order_relaxed);
if (current != token) return;
if (reclamation_counter_.compare_exchange_strong(current, current + 1,
@ -408,11 +373,11 @@ void MemoryQuota::FinishReclamation(uint64_t token) {
}
}
void MemoryQuota::Return(size_t amount) {
void BasicMemoryQuota::Return(size_t amount) {
free_bytes_.fetch_add(amount, std::memory_order_relaxed);
}
size_t MemoryQuota::InstantaneousPressure() const {
size_t BasicMemoryQuota::InstantaneousPressure() const {
double free = free_bytes_.load();
if (free < 0) free = 0;
double size = quota_size_.load();

@ -24,21 +24,28 @@
#include <queue>
#include <vector>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/slice.h>
#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/poll.h"
namespace grpc_core {
class Reclaimer;
class MemoryAllocator;
class BasicMemoryQuota;
class MemoryQuota;
using grpc_event_engine::experimental::MemoryRequest;
// Pull in impl under a different name to keep the gRPC/EventEngine separation
// clear.
using EventEngineMemoryAllocatorImpl =
grpc_event_engine::experimental::internal::MemoryAllocatorImpl;
using grpc_event_engine::experimental::MemoryAllocator;
template <typename T>
using Vector = grpc_event_engine::experimental::Vector<T>;
// Reclamation passes.
// When memory is tight, we start trying to claim some back from memory
// reclaimers. We do this in multiple passes: if there is a less destructive
@ -65,34 +72,12 @@ enum class ReclamationPass {
};
static constexpr size_t kNumReclamationPasses = 4;
// Reservation request - how much memory do we want to allocate?
class MemoryRequest {
public:
// Request a fixed amount of memory.
// NOLINTNEXTLINE(google-explicit-constructor)
MemoryRequest(size_t n) : min_(n), max_(n) {}
// Request a range of memory.
MemoryRequest(size_t min, size_t max) : min_(std::min(min, max)), max_(max) {}
// Increase the size by amount
MemoryRequest Increase(size_t amount) const {
return MemoryRequest(min_ + amount, max_ + amount);
}
size_t min() const { return min_; }
size_t max() const { return max_; }
private:
size_t min_;
size_t max_;
};
// For each reclamation function run we construct a ReclamationSweep.
// When this object is finally destroyed (it may be moved several times first),
// then that reclamation is complete and we may continue the reclamation loop.
class ReclamationSweep {
public:
ReclamationSweep(WeakRefCountedPtr<MemoryQuota> memory_quota,
ReclamationSweep(std::shared_ptr<BasicMemoryQuota> memory_quota,
uint64_t sweep_token)
: memory_quota_(std::move(memory_quota)), sweep_token_(sweep_token) {}
~ReclamationSweep();
@ -110,7 +95,7 @@ class ReclamationSweep {
bool IsSufficient() const;
private:
WeakRefCountedPtr<MemoryQuota> memory_quota_;
std::shared_ptr<BasicMemoryQuota> memory_quota_;
uint64_t sweep_token_;
};
@ -130,13 +115,14 @@ class ReclaimerQueue {
// then *index is set to the index of the newly queued entry.
// Associates the reclamation function with an allocator, and keeps that
// allocator alive, so that we can use the pointer as an ABA guard.
void Insert(RefCountedPtr<MemoryAllocator> allocator,
void Insert(std::shared_ptr<EventEngineMemoryAllocatorImpl> allocator,
ReclamationFunction reclaimer, Index* index)
ABSL_LOCKS_EXCLUDED(mu_);
// Cancel a reclamation function - returns the function if cancelled
// successfully, or nullptr if the reclamation was already begun and could not
// be cancelled. allocator must be the same as was passed to Insert.
ReclamationFunction Cancel(Index index, MemoryAllocator* allocator)
ReclamationFunction Cancel(Index index,
EventEngineMemoryAllocatorImpl* allocator)
ABSL_LOCKS_EXCLUDED(mu_);
// Poll to see if an entry is available: returns Pending if not, or the
// removed reclamation function if so.
@ -158,11 +144,11 @@ class ReclaimerQueue {
private:
// One entry in the reclaimer queue
struct Entry {
Entry(RefCountedPtr<MemoryAllocator> allocator,
Entry(std::shared_ptr<EventEngineMemoryAllocatorImpl> allocator,
ReclamationFunction reclaimer)
: allocator(allocator), reclaimer(reclaimer) {}
: allocator(std::move(allocator)), reclaimer(reclaimer) {}
// The allocator we'd be reclaiming for.
RefCountedPtr<MemoryAllocator> allocator;
std::shared_ptr<EventEngineMemoryAllocatorImpl> allocator;
// The reclamation function to call.
ReclamationFunction reclaimer;
};
@ -180,27 +166,89 @@ class ReclaimerQueue {
Waker waker_ ABSL_GUARDED_BY(mu_);
};
// MemoryAllocator grants the owner the ability to allocate memory from an
// underlying resource quota.
class MemoryAllocator final : public InternallyRefCounted<MemoryAllocator> {
class BasicMemoryQuota final
: public std::enable_shared_from_this<BasicMemoryQuota> {
public:
explicit MemoryAllocator(RefCountedPtr<MemoryQuota> memory_quota);
~MemoryAllocator() override;
// Start the reclamation activity.
void Start();
// Stop the reclamation activity.
// Until reclamation is stopped, it's possible that circular references to the
// BasicMemoryQuota remain. i.e. to guarantee deletion, a singular owning
// object should call BasicMemoryQuota::Stop().
void Stop();
// Resize the quota to new_size.
void SetSize(size_t new_size);
// Forcefully take some memory from the quota, potentially entering
// overcommit.
void Take(size_t amount);
// Finish reclamation pass.
void FinishReclamation(uint64_t token);
// Return some memory to the quota.
void Return(size_t amount);
// Instantaneous memory pressure approximation.
size_t InstantaneousPressure() const;
// Cancel a reclaimer
ReclamationFunction CancelReclaimer(
size_t reclaimer, typename ReclaimerQueue::Index index,
EventEngineMemoryAllocatorImpl* allocator) {
return reclaimers_[reclaimer].Cancel(index, allocator);
}
// Insert a reclaimer
void InsertReclaimer(
size_t reclaimer,
std::shared_ptr<EventEngineMemoryAllocatorImpl> allocator,
ReclamationFunction fn, ReclaimerQueue::Index* index) {
reclaimers_[reclaimer].Insert(std::move(allocator), std::move(fn), index);
}
private:
friend class ReclamationSweep;
class WaitForSweepPromise;
static constexpr intptr_t kInitialSize = std::numeric_limits<intptr_t>::max();
// The amount of memory that's free in this quota.
// We use intptr_t as a reasonable proxy for ssize_t that's portable.
// We allow arbitrary overcommit and so this must allow negative values.
std::atomic<intptr_t> free_bytes_{kInitialSize};
// The total number of bytes in this quota.
std::atomic<size_t> quota_size_{kInitialSize};
// Reclaimer queues.
ReclaimerQueue reclaimers_[kNumReclamationPasses];
// The reclaimer activity consumes reclaimers whenever we are in overcommit to
// try and get back under memory limits.
ActivityPtr reclaimer_activity_;
// Each time we do a reclamation sweep, we increment this counter and give it
// to the sweep in question. In this way, should we choose to cancel a sweep
// we can do so and not get confused when the sweep reports back that it's
// completed.
// We also increment this counter on completion of a sweep, as an indicator
// that the wait has ended.
std::atomic<uint64_t> reclamation_counter_{0};
};
void Orphan() override;
// MemoryAllocatorImpl grants the owner the ability to allocate memory from an
// underlying resource quota.
class GrpcMemoryAllocatorImpl final : public EventEngineMemoryAllocatorImpl {
public:
explicit GrpcMemoryAllocatorImpl(
std::shared_ptr<BasicMemoryQuota> memory_quota);
~GrpcMemoryAllocatorImpl() override;
// Rebind - Swaps the underlying quota for this allocator, taking care to
// Rebind - Swaps the underlying quota for this allocator, taking care to
// make sure memory allocated is moved to allocations against the new quota.
void Rebind(RefCountedPtr<MemoryQuota> memory_quota)
void Rebind(std::shared_ptr<BasicMemoryQuota> memory_quota)
ABSL_LOCKS_EXCLUDED(memory_quota_mu_);
// Reserve bytes from the quota.
// If we enter overcommit, reclamation will begin concurrently.
// Returns the number of bytes reserved.
size_t Reserve(MemoryRequest request) ABSL_LOCKS_EXCLUDED(memory_quota_mu_);
size_t Reserve(MemoryRequest request) override;
// Release some bytes that were previously reserved.
void Release(size_t n) ABSL_LOCKS_EXCLUDED(memory_quota_mu_) {
void Release(size_t n) override {
// Add the released memory to our free bytes counter... if this increases
// from 0 to non-zero, then we have more to do, otherwise, we're actually
// done.
@ -208,102 +256,12 @@ class MemoryAllocator final : public InternallyRefCounted<MemoryAllocator> {
MaybeRegisterReclaimer();
}
// An automatic releasing reservation of memory.
class Reservation {
public:
Reservation() = default;
Reservation(const Reservation&) = delete;
Reservation& operator=(const Reservation&) = delete;
Reservation(Reservation&&) = default;
Reservation& operator=(Reservation&&) = default;
~Reservation() {
if (allocator_ != nullptr) allocator_->Release(size_);
}
private:
friend class MemoryAllocator;
Reservation(RefCountedPtr<MemoryAllocator> allocator, size_t size)
: allocator_(allocator), size_(size) {}
RefCountedPtr<MemoryAllocator> allocator_ = nullptr;
size_t size_ = 0;
};
// Reserve bytes from the quota and automatically release them when
// Reservation is destroyed.
Reservation MakeReservation(MemoryRequest request) {
return Reservation(Ref(DEBUG_LOCATION, "Reservation"), Reserve(request));
}
// Post a reclaimer for some reclamation pass.
// Post a reclamation function.
void PostReclaimer(ReclamationPass pass,
std::function<void(ReclamationSweep)>);
// Allocate a new object of type T, with constructor arguments.
// The returned type is wrapped, and upon destruction the reserved memory
// will be released to the allocator automatically. As such, T must have a
// virtual destructor so we can insert the necessary hook.
template <typename T, typename... Args>
absl::enable_if_t<std::has_virtual_destructor<T>::value, T*> New(
Args&&... args) ABSL_LOCKS_EXCLUDED(memory_quota_mu_) {
// Wrap T such that when it's destroyed, we can release memory back to the
// allocator.
class Wrapper final : public T {
public:
explicit Wrapper(RefCountedPtr<MemoryAllocator> allocator, Args&&... args)
: T(std::forward<Args>(args)...), allocator_(std::move(allocator)) {}
~Wrapper() override { allocator_->Release(sizeof(*this)); }
private:
const RefCountedPtr<MemoryAllocator> allocator_;
};
Reserve(sizeof(Wrapper));
return new Wrapper(Ref(DEBUG_LOCATION, "Wrapper"),
std::forward<Args>(args)...);
}
// Construct a unique ptr immediately.
template <typename T, typename... Args>
std::unique_ptr<T> MakeUnique(Args&&... args)
ABSL_LOCKS_EXCLUDED(memory_quota_mu_) {
return std::unique_ptr<T>(New<T>(std::forward<Args>(args)...));
}
// Allocate a slice, using MemoryRequest to size the number of returned bytes.
// For a variable length request, check the returned slice length to verify
// how much memory was allocated.
// Takes care of reserving memory for any relevant control structures also.
grpc_slice MakeSlice(MemoryRequest request);
// A C++ allocator for containers of T.
template <typename T>
class Container {
public:
// Construct the allocator: \a underlying_allocator is borrowed, and must
// outlive this object.
explicit Container(MemoryAllocator* underlying_allocator)
: underlying_allocator_(underlying_allocator) {}
MemoryAllocator* underlying_allocator() const {
return underlying_allocator_;
}
using value_type = T;
template <typename U>
explicit Container(const Container<U>& other)
: underlying_allocator_(other.underlying_allocator()) {}
T* allocate(size_t n) {
underlying_allocator_->Reserve(n * sizeof(T));
return static_cast<T*>(::operator new(n * sizeof(T)));
}
void deallocate(T* p, size_t n) {
::operator delete(p);
underlying_allocator_->Release(n * sizeof(T));
}
private:
MemoryAllocator* underlying_allocator_;
};
// Shutdown the allocator.
void Shutdown() override;
private:
// Primitive reservation function.
@ -325,9 +283,11 @@ class MemoryAllocator final : public InternallyRefCounted<MemoryAllocator> {
// Mutex guarding the backing resource quota.
Mutex memory_quota_mu_;
// Backing resource quota.
RefCountedPtr<MemoryQuota> memory_quota_ ABSL_GUARDED_BY(memory_quota_mu_);
std::shared_ptr<BasicMemoryQuota> memory_quota_
ABSL_GUARDED_BY(memory_quota_mu_);
// Amount of memory taken from the quota by this allocator.
size_t taken_bytes_ ABSL_GUARDED_BY(memory_quota_mu_) = 0;
size_t taken_bytes_ ABSL_GUARDED_BY(memory_quota_mu_) =
sizeof(GrpcMemoryAllocatorImpl);
// Indices into the various reclaimer queues, used so that we can cancel
// reclamation should we shutdown or get rebound.
ReclaimerQueue::Index
@ -337,67 +297,61 @@ class MemoryAllocator final : public InternallyRefCounted<MemoryAllocator> {
ReclaimerQueue::kInvalidIndex, ReclaimerQueue::kInvalidIndex};
};
// Wrapper type around std::vector to make initialization against a
// MemoryAllocator based container allocator easy.
template <typename T>
class Vector : public std::vector<T, MemoryAllocator::Container<T>> {
// MemoryOwner is an enhanced MemoryAllocator that can also reclaim memory, and
// be rebound to a different memory quota.
class MemoryOwner {
public:
explicit Vector(MemoryAllocator* allocator)
: std::vector<T, MemoryAllocator::Container<T>>(
MemoryAllocator::Container<T>(allocator)) {}
};
explicit MemoryOwner(std::shared_ptr<GrpcMemoryAllocatorImpl> allocator)
: allocator_(std::move(allocator)) {}
// MemoryQuota tracks the amount of memory available as part of a ResourceQuota.
class MemoryQuota final : public DualRefCounted<MemoryQuota> {
public:
MemoryQuota();
MemoryAllocator* allocator() { return &allocator_; }
OrphanablePtr<MemoryAllocator> MakeMemoryAllocator() {
return MakeOrphanable<MemoryAllocator>(
Ref(DEBUG_LOCATION, "MakeMemoryAllocator"));
// Post a reclaimer for some reclamation pass.
void PostReclaimer(ReclamationPass pass,
std::function<void(ReclamationSweep)> fn) {
static_cast<GrpcMemoryAllocatorImpl*>(allocator_.get_internal_impl_ptr())
->PostReclaimer(pass, std::move(fn));
}
// Resize the quota to new_size.
void SetSize(size_t new_size);
// Rebind to a different quota.
void Rebind(MemoryQuota* quota);
private:
friend class MemoryAllocator;
friend class ReclamationSweep;
class WaitForSweepPromise;
MemoryAllocator allocator_;
};
void Orphan() override;
// MemoryQuota tracks the amount of memory available as part of a ResourceQuota.
class MemoryQuota final
: public grpc_event_engine::experimental::MemoryAllocatorFactory {
public:
MemoryQuota() : memory_quota_(std::make_shared<BasicMemoryQuota>()) {
memory_quota_->Start();
}
~MemoryQuota() override {
if (memory_quota_ != nullptr) memory_quota_->Stop();
}
// Forcefully take some memory from the quota, potentially entering
// overcommit.
void Take(size_t amount);
// Finish reclamation pass.
void FinishReclamation(uint64_t token);
// Return some memory to the quota.
void Return(size_t amount);
// Instantaneous memory pressure approximation.
size_t InstantaneousPressure() const;
MemoryQuota(const MemoryQuota&) = delete;
MemoryQuota& operator=(const MemoryQuota&) = delete;
MemoryQuota(MemoryQuota&&) = default;
MemoryQuota& operator=(MemoryQuota&&) = default;
static constexpr intptr_t kInitialSize = std::numeric_limits<intptr_t>::max();
MemoryAllocator CreateMemoryAllocator() override {
auto impl = std::make_shared<GrpcMemoryAllocatorImpl>(memory_quota_);
return MemoryAllocator(std::move(impl));
}
// The amount of memory that's free in this quota.
// We use intptr_t as a reasonable proxy for ssize_t that's portable.
// We allow arbitrary overcommit and so this must allow negative values.
std::atomic<intptr_t> free_bytes_{kInitialSize};
// The total number of bytes in this quota.
std::atomic<size_t> quota_size_{kInitialSize};
MemoryOwner CreateMemoryOwner() {
auto impl = std::make_shared<GrpcMemoryAllocatorImpl>(memory_quota_);
return MemoryOwner(std::move(impl));
}
// Reclaimer queues.
ReclaimerQueue reclaimers_[kNumReclamationPasses];
// The reclaimer activity consumes reclaimers whenever we are in overcommit to
// try and get back under memory limits.
ActivityPtr reclaimer_activity_;
// Each time we do a reclamation sweep, we increment this counter and give it
// to the sweep in question. In this way, should we choose to cancel a sweep
// we can do so and not get confused when the sweep reports back that it's
// completed.
// We also increment this counter on completion of a sweep, as an indicator
// that the wait has ended.
std::atomic<uint64_t> reclamation_counter_{0};
// Resize the quota to new_size.
void SetSize(size_t new_size) { memory_quota_->SetSize(new_size); }
private:
friend class MemoryOwner;
std::shared_ptr<BasicMemoryQuota> memory_quota_;
};
} // namespace grpc_core

@ -19,7 +19,7 @@
namespace grpc_core {
ResourceQuota::ResourceQuota()
: memory_quota_(MakeRefCounted<MemoryQuota>()),
: memory_quota_(std::make_shared<MemoryQuota>()),
thread_quota_(MakeRefCounted<ThreadQuota>()) {}
ResourceQuota::~ResourceQuota() = default;

@ -30,16 +30,12 @@ class ResourceQuota : public RefCounted<ResourceQuota> {
ResourceQuota(const ResourceQuota&) = delete;
ResourceQuota& operator=(const ResourceQuota&) = delete;
const RefCountedPtr<MemoryQuota>& memory_quota() const {
return memory_quota_;
}
std::shared_ptr<MemoryQuota> memory_quota() { return memory_quota_; }
const RefCountedPtr<ThreadQuota>& thread_quota() const {
return thread_quota_;
}
const RefCountedPtr<ThreadQuota>& thread_quota() { return thread_quota_; }
private:
RefCountedPtr<MemoryQuota> memory_quota_;
std::shared_ptr<MemoryQuota> memory_quota_;
RefCountedPtr<ThreadQuota> thread_quota_;
};

@ -102,7 +102,6 @@ CORE_SOURCE_FILES = [
'src/core/ext/transport/chttp2/transport/bin_decoder.cc',
'src/core/ext/transport/chttp2/transport/bin_encoder.cc',
'src/core/ext/transport/chttp2/transport/chttp2_plugin.cc',
'src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc',
'src/core/ext/transport/chttp2/transport/chttp2_transport.cc',
'src/core/ext/transport/chttp2/transport/context_list.cc',
'src/core/ext/transport/chttp2/transport/flow_control.cc',

@ -58,43 +58,41 @@ class Fuzzer {
ExecCtx::Get()->Flush();
break;
case memory_quota_fuzzer::Action::kCreateQuota:
memory_quotas_.emplace(action.quota(),
RefCountedPtr<MemoryQuota>(new MemoryQuota()));
memory_quotas_.emplace(action.quota(), MemoryQuota());
break;
case memory_quota_fuzzer::Action::kDeleteQuota:
memory_quotas_.erase(action.quota());
break;
case memory_quota_fuzzer::Action::kCreateAllocator:
WithQuota(action.quota(),
[this, action](RefCountedPtr<MemoryQuota> q) {
memory_allocators_.emplace(action.allocator(),
q->MakeMemoryAllocator());
});
WithQuota(action.quota(), [this, action](MemoryQuota* q) {
memory_allocators_.emplace(action.allocator(),
q->CreateMemoryOwner());
});
break;
case memory_quota_fuzzer::Action::kDeleteAllocator:
memory_allocators_.erase(action.allocator());
break;
case memory_quota_fuzzer::Action::kSetQuotaSize:
WithQuota(action.quota(), [action](RefCountedPtr<MemoryQuota> q) {
WithQuota(action.quota(), [action](MemoryQuota* q) {
q->SetSize(Clamp(action.set_quota_size(), uint64_t{0},
uint64_t{std::numeric_limits<ssize_t>::max()}));
});
break;
case memory_quota_fuzzer::Action::kRebindQuota:
WithQuota(action.quota(),
[this, action](RefCountedPtr<MemoryQuota> q) {
WithAllocator(action.allocator(),
[q](MemoryAllocator* a) { a->Rebind(q); });
});
WithQuota(action.quota(), [this, action](MemoryQuota* q) {
WithAllocator(action.allocator(),
[q](MemoryOwner* a) { a->Rebind(q); });
});
break;
case memory_quota_fuzzer::Action::kCreateAllocation: {
auto min = action.create_allocation().min();
auto max = action.create_allocation().max();
if (min > max) break;
if (max > MemoryRequest::max_allowed_size()) break;
MemoryRequest req(min, max);
WithAllocator(
action.allocator(), [this, action, req](MemoryAllocator* a) {
auto alloc = a->MakeReservation(req);
action.allocator(), [this, action, req](MemoryOwner* a) {
auto alloc = a->allocator()->MakeReservation(req);
allocations_.emplace(action.allocation(), std::move(alloc));
});
} break;
@ -125,7 +123,7 @@ class Fuzzer {
};
auto pass = MapReclamationPass(cfg.pass());
WithAllocator(action.allocator(),
[pass, reclaimer](MemoryAllocator* a) {
[pass, reclaimer](MemoryOwner* a) {
a->PostReclaimer(pass, reclaimer);
});
}
@ -140,18 +138,18 @@ class Fuzzer {
void WithQuota(int quota, F f) {
auto it = memory_quotas_.find(quota);
if (it == memory_quotas_.end()) return;
f(it->second);
f(&it->second);
}
template <typename F>
void WithAllocator(int allocator, F f) {
auto it = memory_allocators_.find(allocator);
if (it == memory_allocators_.end()) return;
f(it->second.get());
f(&it->second);
}
std::map<int, RefCountedPtr<MemoryQuota>> memory_quotas_;
std::map<int, OrphanablePtr<MemoryAllocator>> memory_allocators_;
std::map<int, MemoryQuota> memory_quotas_;
std::map<int, MemoryOwner> memory_allocators_;
std::map<int, MemoryAllocator::Reservation> allocations_;
};

@ -26,12 +26,12 @@ class StressTest {
// Create a stress test with some size.
StressTest(size_t num_quotas, size_t num_allocators) {
for (size_t i = 0; i < num_quotas; ++i) {
quotas_.emplace_back(new MemoryQuota());
quotas_.emplace_back();
}
std::random_device g;
std::uniform_int_distribution<size_t> dist(0, num_quotas - 1);
for (size_t i = 0; i < num_allocators; ++i) {
allocators_.emplace_back(quotas_[dist(g)]->MakeMemoryAllocator());
allocators_.emplace_back(quotas_[dist(g)].CreateMemoryOwner());
}
}
@ -40,7 +40,7 @@ class StressTest {
std::vector<std::thread> threads;
// A few threads constantly rebinding allocators to different quotas.
threads.reserve(2);
threads.reserve(2 + 2 + 3 * allocators_.size());
for (int i = 0; i < 2; i++) threads.push_back(Run(Rebinder));
// And another few threads constantly resizing quotas.
for (int i = 0; i < 2; i++) threads.push_back(Run(Resizer));
@ -49,13 +49,13 @@ class StressTest {
// that allocator. Whenever the first allocation is made, schedule a
// reclaimer for that pass.
for (size_t i = 0; i < allocators_.size(); i++) {
auto* allocator = allocators_[i].get();
auto* allocator = &allocators_[i];
for (ReclamationPass pass :
{ReclamationPass::kBenign, ReclamationPass::kIdle,
ReclamationPass::kDestructive}) {
threads.push_back(Run([allocator, pass](StatePtr st) mutable {
if (st->RememberReservation(
allocator->MakeReservation(st->RandomRequest()))) {
if (st->RememberReservation(allocator->allocator()->MakeReservation(
st->RandomRequest()))) {
allocator->PostReclaimer(
pass, [st](ReclamationSweep) { st->ForgetReservations(); });
}
@ -90,14 +90,14 @@ class StressTest {
// Choose a random quota, and return an owned pointer to it.
// Not thread-safe, only callable from the owning thread.
RefCountedPtr<MemoryQuota> RandomQuota() {
return test_->quotas_[quotas_distribution_(g_)];
MemoryQuota* RandomQuota() {
return &test_->quotas_[quotas_distribution_(g_)];
}
// Choose a random allocator, and return a borrowed pointer to it.
// Not thread-safe, only callable from the owning thread.
MemoryAllocator* RandomAllocator() {
return test_->allocators_[allocators_distribution_(g_)].get();
MemoryOwner* RandomAllocator() {
return &test_->allocators_[allocators_distribution_(g_)];
}
// Random memory request size - 1% of allocations are chosen to be variable
@ -164,14 +164,14 @@ class StressTest {
// Choose one allocator, one quota, rebind the allocator to the quota.
static void Rebinder(StatePtr st) {
MemoryAllocator* allocator = st->RandomAllocator();
RefCountedPtr<MemoryQuota> quota = st->RandomQuota();
allocator->Rebind(std::move(quota));
auto* allocator = st->RandomAllocator();
auto* quota = st->RandomQuota();
allocator->Rebind(quota);
}
// Choose one allocator, resize it to a randomly chosen size.
static void Resizer(StatePtr st) {
RefCountedPtr<MemoryQuota> quota = st->RandomQuota();
auto* quota = st->RandomQuota();
size_t size = st->RandomQuotaSize();
quota->SetSize(size);
}
@ -198,10 +198,10 @@ class StressTest {
// Memory quotas to test against. We build this up at construction time, but
// then don't resize, so we can load from it continuously from all of the
// threads.
std::vector<RefCountedPtr<MemoryQuota>> quotas_;
std::vector<MemoryQuota> quotas_;
// Memory allocators to test against. Similarly, built at construction time,
// and then the shape of this vector is not changed.
std::vector<OrphanablePtr<MemoryAllocator>> allocators_;
std::vector<MemoryOwner> allocators_;
};
} // namespace

@ -54,40 +54,38 @@ TEST(MemoryRequestTest, MinMax) {
// MemoryQuotaTest
//
TEST(MemoryQuotaTest, NoOp) {
RefCountedPtr<MemoryQuota> memory_quota = MakeRefCounted<MemoryQuota>();
}
TEST(MemoryQuotaTest, NoOp) { MemoryQuota(); }
TEST(MemoryQuotaTest, CreateAllocatorNoOp) {
RefCountedPtr<MemoryQuota> memory_quota = MakeRefCounted<MemoryQuota>();
auto memory_allocator = memory_quota->MakeMemoryAllocator();
MemoryQuota memory_quota;
auto memory_allocator = memory_quota.CreateMemoryAllocator();
}
TEST(MemoryQuotaTest, CreateObjectFromAllocator) {
RefCountedPtr<MemoryQuota> memory_quota = MakeRefCounted<MemoryQuota>();
auto memory_allocator = memory_quota->MakeMemoryAllocator();
auto object = memory_allocator->MakeUnique<Sized<4096>>();
MemoryQuota memory_quota;
auto memory_allocator = memory_quota.CreateMemoryAllocator();
auto object = memory_allocator.MakeUnique<Sized<4096>>();
}
TEST(MemoryQuotaTest, CreateSomeObjectsAndExpectReclamation) {
ExecCtx exec_ctx;
RefCountedPtr<MemoryQuota> memory_quota = MakeRefCounted<MemoryQuota>();
memory_quota->SetSize(4096);
auto memory_allocator = memory_quota->MakeMemoryAllocator();
auto object = memory_allocator->MakeUnique<Sized<2048>>();
MemoryQuota memory_quota;
memory_quota.SetSize(4096);
auto memory_allocator = memory_quota.CreateMemoryOwner();
auto object = memory_allocator.allocator()->MakeUnique<Sized<2048>>();
memory_allocator->PostReclaimer(
memory_allocator.PostReclaimer(
ReclamationPass::kDestructive,
[&object](ReclamationSweep) { object.reset(); });
auto object2 = memory_allocator->MakeUnique<Sized<2048>>();
auto object2 = memory_allocator.allocator()->MakeUnique<Sized<2048>>();
exec_ctx.Flush();
EXPECT_EQ(object.get(), nullptr);
memory_allocator->PostReclaimer(
memory_allocator.PostReclaimer(
ReclamationPass::kDestructive,
[&object2](ReclamationSweep) { object2.reset(); });
auto object3 = memory_allocator->MakeUnique<Sized<2048>>();
auto object3 = memory_allocator.allocator()->MakeUnique<Sized<2048>>();
exec_ctx.Flush();
EXPECT_EQ(object2.get(), nullptr);
}
@ -95,59 +93,59 @@ TEST(MemoryQuotaTest, CreateSomeObjectsAndExpectReclamation) {
TEST(MemoryQuotaTest, BasicRebind) {
ExecCtx exec_ctx;
RefCountedPtr<MemoryQuota> memory_quota = MakeRefCounted<MemoryQuota>();
memory_quota->SetSize(4096);
RefCountedPtr<MemoryQuota> memory_quota2 = MakeRefCounted<MemoryQuota>();
memory_quota2->SetSize(4096);
auto memory_allocator = memory_quota2->MakeMemoryAllocator();
auto object = memory_allocator->MakeUnique<Sized<2048>>();
memory_allocator->Rebind(memory_quota);
auto memory_allocator2 = memory_quota2->MakeMemoryAllocator();
memory_allocator2->PostReclaimer(ReclamationPass::kDestructive,
[](ReclamationSweep) {
// Taken memory should be reassigned to
// memory_quota, so this should never be
// reached.
abort();
});
memory_allocator->PostReclaimer(ReclamationPass::kDestructive,
[&object](ReclamationSweep) {
// The new memory allocator should reclaim
// the object allocated against the previous
// quota because that's now part of this
// quota.
object.reset();
MemoryQuota memory_quota;
memory_quota.SetSize(4096);
MemoryQuota memory_quota2;
memory_quota2.SetSize(4096);
auto memory_allocator = memory_quota2.CreateMemoryOwner();
auto object = memory_allocator.allocator()->MakeUnique<Sized<2048>>();
memory_allocator.Rebind(&memory_quota);
auto memory_allocator2 = memory_quota2.CreateMemoryOwner();
memory_allocator2.PostReclaimer(ReclamationPass::kDestructive,
[](ReclamationSweep) {
// Taken memory should be reassigned to
// memory_quota, so this should never be
// reached.
abort();
});
auto object2 = memory_allocator->MakeUnique<Sized<2048>>();
memory_allocator.PostReclaimer(ReclamationPass::kDestructive,
[&object](ReclamationSweep) {
// The new memory allocator should reclaim
// the object allocated against the previous
// quota because that's now part of this
// quota.
object.reset();
});
auto object2 = memory_allocator.allocator()->MakeUnique<Sized<2048>>();
exec_ctx.Flush();
EXPECT_EQ(object.get(), nullptr);
}
TEST(MemoryQuotaTest, ReserveRangeNoPressure) {
RefCountedPtr<MemoryQuota> memory_quota = MakeRefCounted<MemoryQuota>();
auto memory_allocator = memory_quota->MakeMemoryAllocator();
MemoryQuota memory_quota;
auto memory_allocator = memory_quota.CreateMemoryAllocator();
size_t total = 0;
for (int i = 0; i < 10000; i++) {
auto n = memory_allocator->Reserve(MemoryRequest(100, 40000));
auto n = memory_allocator.Reserve(MemoryRequest(100, 40000));
EXPECT_EQ(n, 40000);
total += n;
}
memory_allocator->Release(total);
memory_allocator.Release(total);
}
TEST(MemoryQuotaTest, MakeSlice) {
RefCountedPtr<MemoryQuota> memory_quota = MakeRefCounted<MemoryQuota>();
auto memory_allocator = memory_quota->MakeMemoryAllocator();
MemoryQuota memory_quota;
auto memory_allocator = memory_quota.CreateMemoryAllocator();
std::vector<grpc_slice> slices;
for (int i = 1; i < 1000; i++) {
int min = i;
int max = 10 * i - 9;
slices.push_back(memory_allocator->MakeSlice(MemoryRequest(min, max)));
slices.push_back(memory_allocator.MakeSlice(MemoryRequest(min, max)));
}
for (grpc_slice slice : slices) {
grpc_slice_unref_internal(slice);
@ -155,9 +153,9 @@ TEST(MemoryQuotaTest, MakeSlice) {
}
TEST(MemoryQuotaTest, ContainerAllocator) {
RefCountedPtr<MemoryQuota> memory_quota = MakeRefCounted<MemoryQuota>();
auto memory_allocator = memory_quota->MakeMemoryAllocator();
Vector<int> vec(memory_allocator.get());
MemoryQuota memory_quota;
auto memory_allocator = memory_quota.CreateMemoryAllocator();
Vector<int> vec(&memory_allocator);
for (int i = 0; i < 100000; i++) {
vec.push_back(i);
}

@ -883,8 +883,9 @@ include/grpc/census.h \
include/grpc/compression.h \
include/grpc/event_engine/endpoint_config.h \
include/grpc/event_engine/event_engine.h \
include/grpc/event_engine/internal/memory_allocator_impl.h \
include/grpc/event_engine/memory_allocator.h \
include/grpc/event_engine/port.h \
include/grpc/event_engine/slice_allocator.h \
include/grpc/fork.h \
include/grpc/grpc.h \
include/grpc/grpc_posix.h \

@ -883,8 +883,9 @@ include/grpc/census.h \
include/grpc/compression.h \
include/grpc/event_engine/endpoint_config.h \
include/grpc/event_engine/event_engine.h \
include/grpc/event_engine/internal/memory_allocator_impl.h \
include/grpc/event_engine/memory_allocator.h \
include/grpc/event_engine/port.h \
include/grpc/event_engine/slice_allocator.h \
include/grpc/fork.h \
include/grpc/grpc.h \
include/grpc/grpc_posix.h \
@ -1198,8 +1199,6 @@ src/core/ext/transport/chttp2/transport/bin_decoder.h \
src/core/ext/transport/chttp2/transport/bin_encoder.cc \
src/core/ext/transport/chttp2/transport/bin_encoder.h \
src/core/ext/transport/chttp2/transport/chttp2_plugin.cc \
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc \
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h \
src/core/ext/transport/chttp2/transport/chttp2_transport.cc \
src/core/ext/transport/chttp2/transport/chttp2_transport.h \
src/core/ext/transport/chttp2/transport/context_list.cc \

@ -813,8 +813,9 @@ include/grpc/census.h \
include/grpc/compression.h \
include/grpc/event_engine/endpoint_config.h \
include/grpc/event_engine/event_engine.h \
include/grpc/event_engine/internal/memory_allocator_impl.h \
include/grpc/event_engine/memory_allocator.h \
include/grpc/event_engine/port.h \
include/grpc/event_engine/slice_allocator.h \
include/grpc/fork.h \
include/grpc/grpc.h \
include/grpc/grpc_posix.h \

@ -813,8 +813,9 @@ include/grpc/census.h \
include/grpc/compression.h \
include/grpc/event_engine/endpoint_config.h \
include/grpc/event_engine/event_engine.h \
include/grpc/event_engine/internal/memory_allocator_impl.h \
include/grpc/event_engine/memory_allocator.h \
include/grpc/event_engine/port.h \
include/grpc/event_engine/slice_allocator.h \
include/grpc/fork.h \
include/grpc/grpc.h \
include/grpc/grpc_posix.h \
@ -1033,8 +1034,6 @@ src/core/ext/transport/chttp2/transport/bin_decoder.h \
src/core/ext/transport/chttp2/transport/bin_encoder.cc \
src/core/ext/transport/chttp2/transport/bin_encoder.h \
src/core/ext/transport/chttp2/transport/chttp2_plugin.cc \
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc \
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h \
src/core/ext/transport/chttp2/transport/chttp2_transport.cc \
src/core/ext/transport/chttp2/transport/chttp2_transport.h \
src/core/ext/transport/chttp2/transport/context_list.cc \

Loading…
Cancel
Save