Rewrite memory quota management in C++ with Promises library (#27327)

* first pass new memory quota

* forget the ee stuff - it should be a wrapper on top

* beginning to finalize

* compiles

* basic tests pass

* fixes

* Automated change: Fix sanity tests

* merge

* Automated change: Fix sanity tests

* add rebind test

* flesh out the rest

* Automated change: Fix sanity tests

* add increase

* prog

* Automated change: Fix sanity tests

* allow cancellation during run

* fixes

* clang-format

* better allocation strategy

* better allocation strategy

* Automated change: Fix sanity tests

* Update memory_quota.cc

* format

* better comment

* remove block size - this is probably unnecessary complexity

* fmt

* cleanup

* size_t

* Automated change: Fix sanity tests

* fixes

* move makeslice into memoryallocator

* move makeslice into memoryallocator

* add container allocator, tests

* Automated change: Fix sanity tests

* fixes

* Automated change: Fix sanity tests

* add some docs

* Automated change: Fix sanity tests

* fix doc

* comment vector

* Automated change: Fix sanity tests

* fixes

* ditch the thread

* exec_ctx integration

* Automated change: Fix sanity tests

* progress!

* fuzzer

* initial_corpora

* Automated change: Fix sanity tests

* bigger_objects

* better-fuzzer

* add stress test

* Automated change: Fix sanity tests

* Remove unused header

* Iwyu

* Automated change: Fix sanity tests

* Portability fix, comment

* Fix unused arg

* Remove unused names

* Removed unused name

* Automated change: Fix sanity tests

* windows

* Automated change: Fix sanity tests

* cleanup

* fix-mac

* cleanup, eliminate atomicbarrier

* exclude some platforms

* Automated change: Fix sanity tests

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/27499/head
Craig Tiller 3 years ago committed by GitHub
parent bed585bdcb
commit 82c99362b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 54
      BUILD
  2. 242
      CMakeLists.txt
  3. 207
      build_autogenerated.yaml
  4. 6
      src/core/lib/promise/activity.h
  5. 426
      src/core/lib/resource_quota/memory_quota.cc
  6. 405
      src/core/lib/resource_quota/memory_quota.h
  7. 27
      src/core/lib/resource_quota/resource_quota.cc
  8. 48
      src/core/lib/resource_quota/resource_quota.h
  9. 43
      src/core/lib/resource_quota/thread_quota.cc
  10. 55
      src/core/lib/resource_quota/thread_quota.h
  11. 94
      test/core/resource_quota/BUILD
  12. 169
      test/core/resource_quota/memory_quota_fuzzer.cc
  13. 57
      test/core/resource_quota/memory_quota_fuzzer.proto
  14. 1
      test/core/resource_quota/memory_quota_fuzzer_corpus/0
  15. 210
      test/core/resource_quota/memory_quota_stress_test.cc
  16. 176
      test/core/resource_quota/memory_quota_test.cc
  17. 37
      test/core/resource_quota/resource_quota_test.cc
  18. 45
      test/core/resource_quota/thread_quota_test.cc
  19. 140
      tools/run_tests/generated/tests.json

54
BUILD

@ -1284,6 +1284,60 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "memory_quota",
srcs = [
"src/core/lib/resource_quota/memory_quota.cc",
],
hdrs = [
"src/core/lib/resource_quota/memory_quota.h",
],
deps = [
"activity",
"dual_ref_counted",
"exec_ctx_wakeup_scheduler",
"gpr_base",
"loop",
"orphanable",
"poll",
"race",
"ref_counted_ptr",
"seq",
"slice_refcount",
"useful",
],
)
grpc_cc_library(
name = "thread_quota",
srcs = [
"src/core/lib/resource_quota/thread_quota.cc",
],
hdrs = [
"src/core/lib/resource_quota/thread_quota.h",
],
deps = [
"gpr_base",
"ref_counted",
],
)
grpc_cc_library(
name = "resource_quota",
srcs = [
"src/core/lib/resource_quota/resource_quota.cc",
],
hdrs = [
"src/core/lib/resource_quota/resource_quota.h",
],
deps = [
"gpr_base",
"memory_quota",
"ref_counted",
"thread_quota",
],
)
grpc_cc_library(
name = "slice_refcount",
srcs = [

242
CMakeLists.txt generated

@ -645,6 +645,9 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_c lame_client_test)
add_dependencies(buildtests_c load_file_test)
add_dependencies(buildtests_c manual_constructor_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_c memory_quota_stress_test)
endif()
add_dependencies(buildtests_c message_compress_test)
add_dependencies(buildtests_c metadata_test)
add_dependencies(buildtests_c minimal_stack_is_minimal_test)
@ -671,7 +674,6 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_c resolve_address_using_native_resolver_posix_test)
endif()
add_dependencies(buildtests_c resolve_address_using_native_resolver_test)
add_dependencies(buildtests_c resource_quota_test)
add_dependencies(buildtests_c secure_channel_create_test)
add_dependencies(buildtests_c secure_endpoint_test)
add_dependencies(buildtests_c security_connector_test)
@ -709,6 +711,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_c tcp_server_posix_test)
endif()
add_dependencies(buildtests_c test_core_gpr_time_test)
add_dependencies(buildtests_c test_core_iomgr_resource_quota_test)
add_dependencies(buildtests_c test_core_security_credentials_test)
add_dependencies(buildtests_c test_core_slice_slice_test)
add_dependencies(buildtests_c thd_test)
@ -916,6 +919,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx loop_test)
add_dependencies(buildtests_cxx match_test)
add_dependencies(buildtests_cxx matchers_test)
add_dependencies(buildtests_cxx memory_quota_test)
add_dependencies(buildtests_cxx message_allocator_end2end_test)
add_dependencies(buildtests_cxx metadata_map_test)
add_dependencies(buildtests_cxx miscompile_with_no_unique_address_test)
@ -989,11 +993,13 @@ if(gRPC_BUILD_TESTS)
endif()
add_dependencies(buildtests_cxx string_ref_test)
add_dependencies(buildtests_cxx table_test)
add_dependencies(buildtests_cxx test_core_resource_quota_resource_quota_test)
add_dependencies(buildtests_cxx test_cpp_client_credentials_test)
add_dependencies(buildtests_cxx test_cpp_server_credentials_test)
add_dependencies(buildtests_cxx test_cpp_util_slice_test)
add_dependencies(buildtests_cxx test_cpp_util_time_test)
add_dependencies(buildtests_cxx thread_manager_test)
add_dependencies(buildtests_cxx thread_quota_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx thread_stress_test)
endif()
@ -6145,6 +6151,49 @@ target_link_libraries(manual_constructor_test
)
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
add_executable(memory_quota_stress_test
src/core/lib/debug/trace.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/error.cc
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/iomgr_internal.cc
src/core/lib/promise/activity.cc
src/core/lib/resource_quota/memory_quota.cc
src/core/lib/slice/slice.cc
src/core/lib/slice/slice_refcount.cc
src/core/lib/slice/slice_string_helpers.cc
src/core/lib/slice/static_slice.cc
test/core/resource_quota/memory_quota_stress_test.cc
)
target_include_directories(memory_quota_stress_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
)
target_link_libraries(memory_quota_stress_test
${_gRPC_ALLTARGETS_LIBRARIES}
absl::statusor
absl::variant
gpr
)
endif()
endif()
if(gRPC_BUILD_TESTS)
@ -6650,33 +6699,6 @@ target_link_libraries(resolve_address_using_native_resolver_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(resource_quota_test
test/core/iomgr/resource_quota_test.cc
)
target_include_directories(resource_quota_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
)
target_link_libraries(resource_quota_test
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif()
if(gRPC_BUILD_TESTS)
@ -7373,6 +7395,33 @@ target_link_libraries(test_core_gpr_time_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(test_core_iomgr_resource_quota_test
test/core/iomgr/resource_quota_test.cc
)
target_include_directories(test_core_iomgr_resource_quota_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
)
target_link_libraries(test_core_iomgr_resource_quota_test
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif()
if(gRPC_BUILD_TESTS)
@ -13280,6 +13329,55 @@ target_link_libraries(matchers_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(memory_quota_test
src/core/lib/debug/trace.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/error.cc
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/iomgr_internal.cc
src/core/lib/promise/activity.cc
src/core/lib/resource_quota/memory_quota.cc
src/core/lib/slice/slice.cc
src/core/lib/slice/slice_refcount.cc
src/core/lib/slice/slice_string_helpers.cc
src/core/lib/slice/static_slice.cc
test/core/resource_quota/memory_quota_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(memory_quota_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(memory_quota_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::statusor
absl::variant
gpr
)
endif()
if(gRPC_BUILD_TESTS)
@ -15857,6 +15955,57 @@ target_link_libraries(table_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(test_core_resource_quota_resource_quota_test
src/core/lib/debug/trace.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/error.cc
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/iomgr_internal.cc
src/core/lib/promise/activity.cc
src/core/lib/resource_quota/memory_quota.cc
src/core/lib/resource_quota/resource_quota.cc
src/core/lib/resource_quota/thread_quota.cc
src/core/lib/slice/slice.cc
src/core/lib/slice/slice_refcount.cc
src/core/lib/slice/slice_string_helpers.cc
src/core/lib/slice/static_slice.cc
test/core/resource_quota/resource_quota_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(test_core_resource_quota_resource_quota_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(test_core_resource_quota_resource_quota_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::statusor
absl::variant
gpr
)
endif()
if(gRPC_BUILD_TESTS)
@ -16035,6 +16184,43 @@ target_link_libraries(thread_manager_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(thread_quota_test
src/core/lib/debug/trace.cc
src/core/lib/resource_quota/thread_quota.cc
test/core/resource_quota/thread_quota_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(thread_quota_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(thread_quota_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
gpr
)
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)

@ -3668,6 +3668,64 @@ targets:
deps:
- grpc_test_util
uses_polling: false
- name: memory_quota_stress_test
build: test
language: c
headers:
- src/core/lib/debug/trace.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/dual_ref_counted.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h
- src/core/lib/iomgr/closure.h
- src/core/lib/iomgr/combiner.h
- src/core/lib/iomgr/error.h
- src/core/lib/iomgr/error_internal.h
- src/core/lib/iomgr/exec_ctx.h
- src/core/lib/iomgr/executor.h
- src/core/lib/iomgr/iomgr_internal.h
- src/core/lib/promise/activity.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_seq.h
- src/core/lib/promise/detail/promise_factory.h
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/detail/switch.h
- src/core/lib/promise/exec_ctx_wakeup_scheduler.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
- src/core/lib/resource_quota/memory_quota.h
- src/core/lib/slice/slice_internal.h
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_refcount_base.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/lib/slice/slice_utils.h
- src/core/lib/slice/static_slice.h
src:
- src/core/lib/debug/trace.cc
- src/core/lib/iomgr/combiner.cc
- src/core/lib/iomgr/error.cc
- src/core/lib/iomgr/exec_ctx.cc
- src/core/lib/iomgr/executor.cc
- src/core/lib/iomgr/iomgr_internal.cc
- src/core/lib/promise/activity.cc
- src/core/lib/resource_quota/memory_quota.cc
- src/core/lib/slice/slice.cc
- src/core/lib/slice/slice_refcount.cc
- src/core/lib/slice/slice_string_helpers.cc
- src/core/lib/slice/static_slice.cc
- test/core/resource_quota/memory_quota_stress_test.cc
deps:
- absl/status:statusor
- absl/types:variant
- gpr
platforms:
- linux
- posix
uses_polling: false
- name: message_compress_test
build: test
language: c
@ -3863,14 +3921,6 @@ targets:
- grpc_test_util
args:
- --resolver=native
- name: resource_quota_test
build: test
language: c
headers: []
src:
- test/core/iomgr/resource_quota_test.cc
deps:
- grpc_test_util
- name: secure_channel_create_test
build: test
language: c
@ -4129,6 +4179,14 @@ targets:
deps:
- grpc_test_util
uses_polling: false
- name: test_core_iomgr_resource_quota_test
build: test
language: c
headers: []
src:
- test/core/iomgr/resource_quota_test.cc
deps:
- grpc_test_util
- name: test_core_security_credentials_test
build: test
language: c
@ -6612,6 +6670,62 @@ targets:
- test/core/security/matchers_test.cc
deps:
- grpc_test_util
- name: memory_quota_test
gtest: true
build: test
language: c++
headers:
- src/core/lib/debug/trace.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/dual_ref_counted.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h
- src/core/lib/iomgr/closure.h
- src/core/lib/iomgr/combiner.h
- src/core/lib/iomgr/error.h
- src/core/lib/iomgr/error_internal.h
- src/core/lib/iomgr/exec_ctx.h
- src/core/lib/iomgr/executor.h
- src/core/lib/iomgr/iomgr_internal.h
- src/core/lib/promise/activity.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_seq.h
- src/core/lib/promise/detail/promise_factory.h
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/detail/switch.h
- src/core/lib/promise/exec_ctx_wakeup_scheduler.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
- src/core/lib/resource_quota/memory_quota.h
- src/core/lib/slice/slice_internal.h
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_refcount_base.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/lib/slice/slice_utils.h
- src/core/lib/slice/static_slice.h
src:
- src/core/lib/debug/trace.cc
- src/core/lib/iomgr/combiner.cc
- src/core/lib/iomgr/error.cc
- src/core/lib/iomgr/exec_ctx.cc
- src/core/lib/iomgr/executor.cc
- src/core/lib/iomgr/iomgr_internal.cc
- src/core/lib/promise/activity.cc
- src/core/lib/resource_quota/memory_quota.cc
- src/core/lib/slice/slice.cc
- src/core/lib/slice/slice_refcount.cc
- src/core/lib/slice/slice_string_helpers.cc
- src/core/lib/slice/static_slice.cc
- test/core/resource_quota/memory_quota_test.cc
deps:
- absl/status:statusor
- absl/types:variant
- gpr
uses_polling: false
- name: message_allocator_end2end_test
gtest: true
build: test
@ -7659,6 +7773,66 @@ targets:
- absl/types:optional
- absl/utility:utility
uses_polling: false
- name: test_core_resource_quota_resource_quota_test
gtest: true
build: test
language: c++
headers:
- src/core/lib/debug/trace.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/dual_ref_counted.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h
- src/core/lib/iomgr/closure.h
- src/core/lib/iomgr/combiner.h
- src/core/lib/iomgr/error.h
- src/core/lib/iomgr/error_internal.h
- src/core/lib/iomgr/exec_ctx.h
- src/core/lib/iomgr/executor.h
- src/core/lib/iomgr/iomgr_internal.h
- src/core/lib/promise/activity.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_seq.h
- src/core/lib/promise/detail/promise_factory.h
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/detail/switch.h
- src/core/lib/promise/exec_ctx_wakeup_scheduler.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
- src/core/lib/resource_quota/memory_quota.h
- src/core/lib/resource_quota/resource_quota.h
- src/core/lib/resource_quota/thread_quota.h
- src/core/lib/slice/slice_internal.h
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_refcount_base.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/lib/slice/slice_utils.h
- src/core/lib/slice/static_slice.h
src:
- src/core/lib/debug/trace.cc
- src/core/lib/iomgr/combiner.cc
- src/core/lib/iomgr/error.cc
- src/core/lib/iomgr/exec_ctx.cc
- src/core/lib/iomgr/executor.cc
- src/core/lib/iomgr/iomgr_internal.cc
- src/core/lib/promise/activity.cc
- src/core/lib/resource_quota/memory_quota.cc
- src/core/lib/resource_quota/resource_quota.cc
- src/core/lib/resource_quota/thread_quota.cc
- src/core/lib/slice/slice.cc
- src/core/lib/slice/slice_refcount.cc
- src/core/lib/slice/slice_string_helpers.cc
- src/core/lib/slice/static_slice.cc
- test/core/resource_quota/resource_quota_test.cc
deps:
- absl/status:statusor
- absl/types:variant
- gpr
uses_polling: false
- name: test_cpp_client_credentials_test
gtest: true
build: test
@ -7709,6 +7883,23 @@ targets:
deps:
- grpc++_test_config
- grpc++_test_util
- name: thread_quota_test
gtest: true
build: test
language: c++
headers:
- src/core/lib/debug/trace.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h
- src/core/lib/resource_quota/thread_quota.h
src:
- src/core/lib/debug/trace.cc
- src/core/lib/resource_quota/thread_quota.cc
- test/core/resource_quota/thread_quota_test.cc
deps:
- gpr
uses_polling: false
- name: thread_stress_test
gtest: true
build: test

@ -341,7 +341,7 @@ class PromiseActivity final
}
void RunScheduledWakeup() {
GPR_ASSERT(wakeup_scheduled_.exchange(false, std::memory_order_relaxed));
GPR_ASSERT(wakeup_scheduled_.exchange(false, std::memory_order_acq_rel));
Step();
WakeupComplete();
}
@ -360,9 +360,11 @@ class PromiseActivity final
WakeupComplete();
return;
}
if (!wakeup_scheduled_.exchange(true, std::memory_order_relaxed)) {
if (!wakeup_scheduled_.exchange(true, std::memory_order_acq_rel)) {
// Can't safely run, so ask to run later.
wakeup_scheduler_.ScheduleWakeup(this);
} else {
WakeupComplete();
}
}

@ -0,0 +1,426 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/race.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/slice/slice_refcount.h"
namespace grpc_core {
// Maximum number of bytes an allocator will request from a quota in one step.
// Larger allocations than this will require multiple allocation requests.
static constexpr size_t kMaxReplenishBytes = 1024 * 1024;
// Minimum number of bytes an allocator will request from a quota in one step.
static constexpr size_t kMinReplenishBytes = 4096;
//
// Reclaimer
//
ReclamationSweep::~ReclamationSweep() {
if (memory_quota_ != nullptr) {
memory_quota_->FinishReclamation(sweep_token_);
}
}
//
// ReclaimerQueue
//
const ReclaimerQueue::Index ReclaimerQueue::kInvalidIndex;
void ReclaimerQueue::Insert(RefCountedPtr<MemoryAllocator> allocator,
ReclamationFunction reclaimer, Index* index) {
MutexLock lock(&mu_);
if (*index < entries_.size() && entries_[*index].allocator == allocator) {
entries_[*index].reclaimer.swap(reclaimer);
return;
}
if (free_entries_.empty()) {
*index = entries_.size();
entries_.emplace_back(std::move(allocator), std::move(reclaimer));
} else {
*index = free_entries_.back();
free_entries_.pop_back();
Entry& entry = entries_[*index];
entry.allocator = std::move(allocator);
entry.reclaimer = std::move(reclaimer);
}
if (queue_.empty()) waker_.Wakeup();
queue_.push(*index);
}
ReclamationFunction ReclaimerQueue::Cancel(Index index,
MemoryAllocator* allocator) {
MutexLock lock(&mu_);
if (index >= entries_.size()) return nullptr;
Entry& entry = entries_[index];
if (entry.allocator.get() != allocator) return {};
entry.allocator.reset();
return std::move(entry.reclaimer);
}
Poll<ReclamationFunction> ReclaimerQueue::PollNext() {
MutexLock lock(&mu_);
while (true) {
if (queue_.empty()) {
waker_ = Activity::current()->MakeNonOwningWaker();
return Pending{};
}
Index index = queue_.front();
queue_.pop();
free_entries_.push_back(index);
Entry& entry = entries_[index];
if (entry.allocator != nullptr) {
entry.allocator.reset();
return std::move(entry.reclaimer);
}
}
}
//
// MemoryAllocator
//
MemoryAllocator::MemoryAllocator(RefCountedPtr<MemoryQuota> memory_quota)
: InternallyRefCounted("MemoryAllocator"), memory_quota_(memory_quota) {
Reserve(sizeof(MemoryQuota));
}
MemoryAllocator::~MemoryAllocator() {
GPR_ASSERT(free_bytes_.load(std::memory_order_acquire) +
sizeof(MemoryQuota) ==
taken_bytes_);
memory_quota_->Return(taken_bytes_);
}
void MemoryAllocator::Orphan() {
ReclamationFunction old_reclaimers[kNumReclamationPasses];
{
MutexLock lock(&memory_quota_mu_);
for (size_t i = 0; i < kNumReclamationPasses; i++) {
old_reclaimers[i] =
memory_quota_->reclaimers_[i].Cancel(reclamation_indices_[i], this);
}
}
InternallyRefCounted<MemoryAllocator>::Unref();
}
size_t MemoryAllocator::Reserve(MemoryRequest request) {
while (true) {
// Attempt to reserve memory from our pool.
auto reservation = TryReserve(request);
if (reservation.has_value()) return *reservation;
// If that failed, grab more from the quota and retry.
Replenish();
}
}
absl::optional<size_t> MemoryAllocator::TryReserve(MemoryRequest request) {
// How much memory should we request? (see the scaling below)
size_t scaled_size_over_min = request.max() - request.min();
// Scale the request down according to memory pressure if we have that
// flexibility.
if (scaled_size_over_min != 0) {
double pressure;
{
MutexLock lock(&memory_quota_mu_);
pressure = memory_quota_->InstantaneousPressure();
}
// Reduce allocation size proportional to the pressure > 80% usage.
if (pressure > 0.8) {
scaled_size_over_min =
std::min(scaled_size_over_min,
static_cast<size_t>((request.max() - request.min()) *
(1.0 - pressure) / 0.2));
}
}
// How much do we want to reserve?
const size_t reserve = request.min() + scaled_size_over_min;
// See how many bytes are available.
size_t available = free_bytes_.load(std::memory_order_acquire);
while (true) {
// Does the current free pool satisfy the request?
if (available < reserve) {
return {};
}
// Try to reserve the requested amount.
// If the amount of free memory changed through this loop, then available
// will be set to the new value and we'll repeat.
if (free_bytes_.compare_exchange_weak(available, available - reserve,
std::memory_order_acq_rel,
std::memory_order_acquire)) {
return reserve;
}
}
}
void MemoryAllocator::Replenish() {
MutexLock lock(&memory_quota_mu_);
// Attempt a fairly low rate exponential growth request size, bounded between
// some reasonable limits declared at top of file.
auto amount = Clamp(taken_bytes_ / 3, kMinReplenishBytes, kMaxReplenishBytes);
// Take the requested amount from the quota.
gpr_log(GPR_DEBUG, "%p: take %" PRIdMAX " bytes from quota", this, amount);
memory_quota_->Take(amount);
// Record that we've taken it.
taken_bytes_ += amount;
// Add the taken amount to the free pool.
free_bytes_.fetch_add(amount, std::memory_order_acq_rel);
// See if we can add ourselves as a reclaimer.
MaybeRegisterReclaimerLocked();
}
void MemoryAllocator::MaybeRegisterReclaimer() {
MutexLock lock(&memory_quota_mu_);
MaybeRegisterReclaimerLocked();
}
void MemoryAllocator::MaybeRegisterReclaimerLocked() {
// If the reclaimer is already registered, then there's nothing to do.
if (reclamation_indices_[0] != ReclaimerQueue::kInvalidIndex) return;
// Grab references to the things we'll need
auto self = Ref(DEBUG_LOCATION, "reclaimer");
gpr_log(GPR_DEBUG, "%p: register reclaimer; idx=%" PRIdMAX, this,
reclamation_indices_[0]);
memory_quota_->reclaimers_[0].Insert(
self,
[self](ReclamationSweep) {
MutexLock lock(&self->memory_quota_mu_);
// Figure out how many bytes we can return to the quota.
size_t return_bytes =
self->free_bytes_.exchange(0, std::memory_order_acq_rel);
gpr_log(GPR_DEBUG, "%p: sweep reclaimer - return %" PRIdMAX, self.get(),
return_bytes);
if (return_bytes == 0) return;
// Subtract that from our outstanding balance.
self->taken_bytes_ -= return_bytes;
// And return them to the quota.
self->memory_quota_->Return(return_bytes);
},
&reclamation_indices_[0]);
}
void MemoryAllocator::Rebind(RefCountedPtr<MemoryQuota> memory_quota) {
MutexLock lock(&memory_quota_mu_);
if (memory_quota_ == memory_quota) return;
// Return memory to the original memory quota.
memory_quota_->Return(taken_bytes_);
// Fetch back any reclaimers that are queued.
ReclamationFunction reclaimers[kNumReclamationPasses];
for (size_t i = 0; i < kNumReclamationPasses; i++) {
reclaimers[i] =
memory_quota_->reclaimers_[i].Cancel(reclamation_indices_[i], this);
}
// Switch to the new memory quota, leaving the old one in memory_quota so that
// when we unref it, we are outside of lock.
memory_quota_.swap(memory_quota);
// Drop our freed memory down to zero, to avoid needing to ask the new
// quota for memory we're not currently using.
taken_bytes_ -= free_bytes_.exchange(0, std::memory_order_acq_rel);
// And let the new quota know how much we're already using.
memory_quota_->Take(taken_bytes_);
// Reinsert active reclaimers.
for (size_t i = 0; i < kNumReclamationPasses; i++) {
if (reclaimers[i] == nullptr) continue;
memory_quota_->reclaimers_[i].Insert(Ref(DEBUG_LOCATION, "rebind"),
std::move(reclaimers[i]),
&reclamation_indices_[i]);
}
}
void MemoryAllocator::PostReclaimer(ReclamationPass pass,
ReclamationFunction fn) {
MutexLock lock(&memory_quota_mu_);
auto pass_num = static_cast<int>(pass);
memory_quota_->reclaimers_[pass_num].Insert(
Ref(DEBUG_LOCATION, "post_reclaimer"), std::move(fn),
&reclamation_indices_[pass_num]);
}
namespace {
// Reference count for a slice allocated by MemoryAllocator::MakeSlice.
// Takes care of releasing memory back when the slice is destroyed.
class SliceRefCount {
public:
static void Destroy(void* p) {
auto* rc = static_cast<SliceRefCount*>(p);
rc->~SliceRefCount();
gpr_free(rc);
}
SliceRefCount(RefCountedPtr<MemoryAllocator> allocator, size_t size)
: base_(grpc_slice_refcount::Type::REGULAR, &refs_, Destroy, this,
&base_),
allocator_(std::move(allocator)),
size_(size) {
// Nothing to do here.
}
~SliceRefCount() { allocator_->Release(size_); }
grpc_slice_refcount* base_refcount() { return &base_; }
private:
grpc_slice_refcount base_;
RefCount refs_;
RefCountedPtr<MemoryAllocator> allocator_;
size_t size_;
};
} // namespace
grpc_slice MemoryAllocator::MakeSlice(MemoryRequest request) {
auto size = Reserve(request.Increase(sizeof(SliceRefCount)));
void* p = gpr_malloc(size);
new (p) SliceRefCount(Ref(DEBUG_LOCATION, "slice"), size);
grpc_slice slice;
slice.refcount = static_cast<SliceRefCount*>(p)->base_refcount();
slice.data.refcounted.bytes =
static_cast<uint8_t*>(p) + sizeof(SliceRefCount);
slice.data.refcounted.length = size - sizeof(SliceRefCount);
return slice;
}
//
// MemoryQuota
//
class MemoryQuota::WaitForSweepPromise {
public:
WaitForSweepPromise(WeakRefCountedPtr<MemoryQuota> memory_quota,
uint64_t token)
: memory_quota_(memory_quota), token_(token) {}
struct Empty {};
Poll<Empty> operator()() {
if (memory_quota_->reclamation_counter_.load(std::memory_order_relaxed) !=
token_) {
return Empty{};
} else {
return Pending{};
}
}
private:
WeakRefCountedPtr<MemoryQuota> memory_quota_;
uint64_t token_;
};
MemoryQuota::MemoryQuota() : DualRefCounted("MemoryQuota") {
auto self = WeakRef();
// Reclamation loop:
// basically, wait until we are in overcommit (free_bytes_ < 0), and then:
// while (free_bytes_ < 0) reclaim_memory()
// ... and repeat
auto reclamation_loop = Loop(Seq(
[self]() -> Poll<int> {
// If there's free memory we no longer need to reclaim memory!
if (self->free_bytes_.load(std::memory_order_acquire) > 0) {
return Pending{};
}
return 0;
},
[self]() {
// Race biases to the first thing that completes... so this will
// choose the highest priority/least destructive thing to do that's
// available.
return Race(self->reclaimers_[0].Next(), self->reclaimers_[1].Next(),
self->reclaimers_[2].Next(), self->reclaimers_[3].Next());
},
[self](ReclamationFunction reclaimer) {
// One of the reclaimer queues gave us a way to get back memory.
// Call the reclaimer with a token that contains enough to wake us
// up again.
const uint64_t token =
self->reclamation_counter_.fetch_add(1, std::memory_order_relaxed) +
1;
reclaimer(ReclamationSweep(self, token));
// Return a promise that will wait for our barrier. This will be
// awoken by the token above being destroyed. So, once that token is
// destroyed, we'll be able to proceed.
return WaitForSweepPromise(self, token);
},
[]() -> LoopCtl<absl::Status> {
// Continue the loop!
return Continue{};
}));
reclaimer_activity_ =
MakeActivity(std::move(reclamation_loop), ExecCtxWakeupScheduler(),
[](absl::Status status) {
GPR_ASSERT(status.code() == absl::StatusCode::kCancelled);
});
}
void MemoryQuota::SetSize(size_t new_size) {
size_t old_size = quota_size_.exchange(new_size, std::memory_order_relaxed);
if (old_size < new_size) {
// We're growing the quota.
Return(new_size - old_size);
} else {
// We're shrinking the quota.
Take(old_size - new_size);
}
}
void MemoryQuota::Take(size_t amount) {
// If there's a request for nothing, then do nothing!
if (amount == 0) return;
GPR_DEBUG_ASSERT(amount <= std::numeric_limits<intptr_t>::max());
// Grab memory from the quota.
auto prior = free_bytes_.fetch_sub(amount, std::memory_order_acq_rel);
// If we push into overcommit, awake the reclaimer.
if (prior >= 0 && prior < static_cast<intptr_t>(amount)) {
reclaimer_activity_->ForceWakeup();
}
}
void MemoryQuota::Orphan() { reclaimer_activity_.reset(); }
void MemoryQuota::FinishReclamation(uint64_t token) {
uint64_t current = reclamation_counter_.load(std::memory_order_relaxed);
if (current != token) return;
if (reclamation_counter_.compare_exchange_strong(current, current + 1,
std::memory_order_relaxed,
std::memory_order_relaxed)) {
reclaimer_activity_->ForceWakeup();
}
}
void MemoryQuota::Return(size_t amount) {
free_bytes_.fetch_add(amount, std::memory_order_relaxed);
}
size_t MemoryQuota::InstantaneousPressure() const {
double free = free_bytes_.load();
if (free < 0) free = 0;
double size = quota_size_.load();
if (size < 1) return 1.0;
double pressure = (size - free) / size;
if (pressure < 0.0) pressure = 0.0;
if (pressure > 1.0) pressure = 1.0;
return pressure;
}
} // namespace grpc_core

@ -0,0 +1,405 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_RESOURCE_QUOTA_MEMORY_QUOTA_H
#define GRPC_CORE_LIB_RESOURCE_QUOTA_MEMORY_QUOTA_H
#include <grpc/support/port_platform.h>
#include <algorithm>
#include <cstddef>
#include <limits>
#include <memory>
#include <queue>
#include <vector>
#include <grpc/slice.h>
#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/poll.h"
namespace grpc_core {
class Reclaimer;
class MemoryAllocator;
class MemoryQuota;
// Reclamation passes.
// When memory is tight, we start trying to claim some back from memory
// reclaimers. We do this in multiple passes: if there is a less destructive
// operation available, we do that, otherwise we do something more destructive.
enum class ReclamationPass {
// Non-empty reclamation ought to take index 0, but to simplify API we don't
// expose that publicly (it's an internal detail), and hence index zero is
// here unnamed.
// Benign reclamation is intended for reclamation steps that are not
// observable outside of gRPC (besides maybe causing an increase in CPU
// usage).
// Examples of such reclamation would be resizing buffers to fit the current
// load needs, rather than whatever was the peak usage requirement.
kBenign = 1,
// Idle reclamation is intended for reclamation steps that are observable
// outside of gRPC, but do not cause application work to be lost.
// Examples of such reclamation would be dropping channels that are not being
// used.
kIdle = 2,
// Destructive reclamation is our last resort, and is these reclamations are
// allowed to drop work - such as cancelling in flight requests.
kDestructive = 3,
};
static constexpr size_t kNumReclamationPasses = 4;
// Reservation request - how much memory do we want to allocate?
class MemoryRequest {
public:
// Request a fixed amount of memory.
// NOLINTNEXTLINE(google-explicit-constructor)
MemoryRequest(size_t n) : min_(n), max_(n) {}
// Request a range of memory.
MemoryRequest(size_t min, size_t max) : min_(std::min(min, max)), max_(max) {}
// Increase the size by amount
MemoryRequest Increase(size_t amount) const {
return MemoryRequest(min_ + amount, max_ + amount);
}
size_t min() const { return min_; }
size_t max() const { return max_; }
private:
size_t min_;
size_t max_;
};
// For each reclamation function run we construct a ReclamationSweep.
// When this object is finally destroyed (it may be moved several times first),
// then that reclamation is complete and we may continue the reclamation loop.
class ReclamationSweep {
public:
ReclamationSweep(WeakRefCountedPtr<MemoryQuota> memory_quota,
uint64_t sweep_token)
: memory_quota_(std::move(memory_quota)), sweep_token_(sweep_token) {}
~ReclamationSweep();
ReclamationSweep(const ReclamationSweep&) = delete;
ReclamationSweep& operator=(const ReclamationSweep&) = delete;
ReclamationSweep(ReclamationSweep&&) = default;
ReclamationSweep& operator=(ReclamationSweep&&) = default;
// Has enough work been done that we would not be called upon again
// immediately to do reclamation work if we stopped and requeued. Reclaimers
// with a variable amount of work to do can use this to ascertain when they
// can stop more efficiently than going through the reclaimer queue once per
// work item.
bool IsSufficient() const;
private:
WeakRefCountedPtr<MemoryQuota> memory_quota_;
uint64_t sweep_token_;
};
using ReclamationFunction = std::function<void(ReclamationSweep)>;
class ReclaimerQueue {
public:
using Index = size_t;
// An invalid index usable as an empty value.
// This value will not be returned from Insert ever.
static constexpr Index kInvalidIndex = std::numeric_limits<Index>::max();
// Insert a new element at the back of the queue.
// If there is already an element from allocator at *index, then it is
// replaced with the new reclaimer and *index is unchanged. If there is not,
// then *index is set to the index of the newly queued entry.
// Associates the reclamation function with an allocator, and keeps that
// allocator alive, so that we can use the pointer as an ABA guard.
void Insert(RefCountedPtr<MemoryAllocator> allocator,
ReclamationFunction reclaimer, Index* index)
ABSL_LOCKS_EXCLUDED(mu_);
// Cancel a reclamation function - returns the function if cancelled
// successfully, or nullptr if the reclamation was already begun and could not
// be cancelled. allocator must be the same as was passed to Insert.
ReclamationFunction Cancel(Index index, MemoryAllocator* allocator)
ABSL_LOCKS_EXCLUDED(mu_);
// Poll to see if an entry is available: returns Pending if not, or the
// removed reclamation function if so.
Poll<ReclamationFunction> PollNext() ABSL_LOCKS_EXCLUDED(mu_);
// This callable is the promise backing Next - it resolves when there is an
// entry available. This really just redirects to calling PollNext().
class NextPromise {
public:
explicit NextPromise(ReclaimerQueue* queue) : queue_(queue) {}
Poll<ReclamationFunction> operator()() { return queue_->PollNext(); }
private:
// Borrowed ReclaimerQueue backing this promise.
ReclaimerQueue* queue_;
};
NextPromise Next() { return NextPromise(this); }
private:
// One entry in the reclaimer queue
struct Entry {
Entry(RefCountedPtr<MemoryAllocator> allocator,
ReclamationFunction reclaimer)
: allocator(allocator), reclaimer(reclaimer) {}
// The allocator we'd be reclaiming for.
RefCountedPtr<MemoryAllocator> allocator;
// The reclamation function to call.
ReclamationFunction reclaimer;
};
// Guarding mutex.
Mutex mu_;
// Entries in the queue (or empty entries waiting to be queued).
// We actually queue indices into this vector - and do this so that
// we can use the memory allocator pointer as an ABA protection.
std::vector<Entry> entries_ ABSL_GUARDED_BY(mu_);
// Which entries in entries_ are not allocated right now.
std::vector<size_t> free_entries_ ABSL_GUARDED_BY(mu_);
// Allocated entries waiting to be consumed.
std::queue<Index> queue_ ABSL_GUARDED_BY(mu_);
// Potentially one activity can be waiting for new entries on the queue.
Waker waker_ ABSL_GUARDED_BY(mu_);
};
// MemoryAllocator grants the owner the ability to allocate memory from an
// underlying resource quota.
class MemoryAllocator final : public InternallyRefCounted<MemoryAllocator> {
public:
explicit MemoryAllocator(RefCountedPtr<MemoryQuota> memory_quota);
~MemoryAllocator() override;
void Orphan() override;
// Rebind - Swaps the underlying quota for this allocator, taking care to
// make sure memory allocated is moved to allocations against the new quota.
void Rebind(RefCountedPtr<MemoryQuota> memory_quota)
ABSL_LOCKS_EXCLUDED(memory_quota_mu_);
// Reserve bytes from the quota.
// If we enter overcommit, reclamation will begin concurrently.
// Returns the number of bytes reserved.
size_t Reserve(MemoryRequest request) ABSL_LOCKS_EXCLUDED(memory_quota_mu_);
// Release some bytes that were previously reserved.
void Release(size_t n) ABSL_LOCKS_EXCLUDED(memory_quota_mu_) {
// Add the released memory to our free bytes counter... if this increases
// from 0 to non-zero, then we have more to do, otherwise, we're actually
// done.
if (free_bytes_.fetch_add(n, std::memory_order_release) != 0) return;
MaybeRegisterReclaimer();
}
// An automatic releasing reservation of memory.
class Reservation {
public:
Reservation() = default;
Reservation(const Reservation&) = delete;
Reservation& operator=(const Reservation&) = delete;
Reservation(Reservation&&) = default;
Reservation& operator=(Reservation&&) = default;
~Reservation() {
if (allocator_ != nullptr) allocator_->Release(size_);
}
private:
friend class MemoryAllocator;
Reservation(RefCountedPtr<MemoryAllocator> allocator, size_t size)
: allocator_(allocator), size_(size) {}
RefCountedPtr<MemoryAllocator> allocator_ = nullptr;
size_t size_ = 0;
};
// Reserve bytes from the quota and automatically release them when
// Reservation is destroyed.
Reservation MakeReservation(MemoryRequest request) {
return Reservation(Ref(DEBUG_LOCATION, "Reservation"), Reserve(request));
}
// Post a reclaimer for some reclamation pass.
void PostReclaimer(ReclamationPass pass,
std::function<void(ReclamationSweep)>);
// Allocate a new object of type T, with constructor arguments.
// The returned type is wrapped, and upon destruction the reserved memory
// will be released to the allocator automatically. As such, T must have a
// virtual destructor so we can insert the necessary hook.
template <typename T, typename... Args>
absl::enable_if_t<std::has_virtual_destructor<T>::value, T*> New(
Args&&... args) ABSL_LOCKS_EXCLUDED(memory_quota_mu_) {
// Wrap T such that when it's destroyed, we can release memory back to the
// allocator.
class Wrapper final : public T {
public:
explicit Wrapper(RefCountedPtr<MemoryAllocator> allocator, Args&&... args)
: T(std::forward<Args>(args)...), allocator_(std::move(allocator)) {}
~Wrapper() override { allocator_->Release(sizeof(*this)); }
private:
const RefCountedPtr<MemoryAllocator> allocator_;
};
Reserve(sizeof(Wrapper));
return new Wrapper(Ref(DEBUG_LOCATION, "Wrapper"),
std::forward<Args>(args)...);
}
// Construct a unique ptr immediately.
template <typename T, typename... Args>
std::unique_ptr<T> MakeUnique(Args&&... args)
ABSL_LOCKS_EXCLUDED(memory_quota_mu_) {
return std::unique_ptr<T>(New<T>(std::forward<Args>(args)...));
}
// Allocate a slice, using MemoryRequest to size the number of returned bytes.
// For a variable length request, check the returned slice length to verify
// how much memory was allocated.
// Takes care of reserving memory for any relevant control structures also.
grpc_slice MakeSlice(MemoryRequest request);
// A C++ allocator for containers of T.
template <typename T>
class Container {
public:
// Construct the allocator: \a underlying_allocator is borrowed, and must
// outlive this object.
explicit Container(MemoryAllocator* underlying_allocator)
: underlying_allocator_(underlying_allocator) {}
MemoryAllocator* underlying_allocator() const {
return underlying_allocator_;
}
using value_type = T;
template <typename U>
explicit Container(const Container<U>& other)
: underlying_allocator_(other.underlying_allocator()) {}
T* allocate(size_t n) {
underlying_allocator_->Reserve(n * sizeof(T));
return static_cast<T*>(::operator new(n * sizeof(T)));
}
void deallocate(T* p, size_t n) {
::operator delete(p);
underlying_allocator_->Release(n * sizeof(T));
}
private:
MemoryAllocator* underlying_allocator_;
};
private:
// Primitive reservation function.
absl::optional<size_t> TryReserve(MemoryRequest request) GRPC_MUST_USE_RESULT;
// Replenish bytes from the quota, without blocking, possibly entering
// overcommit.
void Replenish() ABSL_LOCKS_EXCLUDED(memory_quota_mu_);
// If we have not already, register a reclamation function against the quota
// to sweep any free memory back to that quota.
void MaybeRegisterReclaimer() ABSL_LOCKS_EXCLUDED(memory_quota_mu_);
void MaybeRegisterReclaimerLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(memory_quota_mu_);
// Amount of memory this allocator has cached for its own use: to avoid quota
// contention, each MemoryAllocator can keep some memory in addition to what
// it is immediately using, and the quota can pull it back under memory
// pressure.
std::atomic<size_t> free_bytes_{0};
// Mutex guarding the backing resource quota.
Mutex memory_quota_mu_;
// Backing resource quota.
RefCountedPtr<MemoryQuota> memory_quota_ ABSL_GUARDED_BY(memory_quota_mu_);
// Amount of memory taken from the quota by this allocator.
size_t taken_bytes_ ABSL_GUARDED_BY(memory_quota_mu_) = 0;
// Indices into the various reclaimer queues, used so that we can cancel
// reclamation should we shutdown or get rebound.
ReclaimerQueue::Index
reclamation_indices_[kNumReclamationPasses] ABSL_GUARDED_BY(
memory_quota_mu_) = {
ReclaimerQueue::kInvalidIndex, ReclaimerQueue::kInvalidIndex,
ReclaimerQueue::kInvalidIndex, ReclaimerQueue::kInvalidIndex};
};
// Wrapper type around std::vector to make initialization against a
// MemoryAllocator based container allocator easy.
template <typename T>
class Vector : public std::vector<T, MemoryAllocator::Container<T>> {
public:
explicit Vector(MemoryAllocator* allocator)
: std::vector<T, MemoryAllocator::Container<T>>(
MemoryAllocator::Container<T>(allocator)) {}
};
// MemoryQuota tracks the amount of memory available as part of a ResourceQuota.
class MemoryQuota final : public DualRefCounted<MemoryQuota> {
public:
MemoryQuota();
OrphanablePtr<MemoryAllocator> MakeMemoryAllocator() {
return MakeOrphanable<MemoryAllocator>(
Ref(DEBUG_LOCATION, "MakeMemoryAllocator"));
}
// Resize the quota to new_size.
void SetSize(size_t new_size);
private:
friend class MemoryAllocator;
friend class ReclamationSweep;
class WaitForSweepPromise;
void Orphan() override;
// Forcefully take some memory from the quota, potentially entering
// overcommit.
void Take(size_t amount);
// Finish reclamation pass.
void FinishReclamation(uint64_t token);
// Return some memory to the quota.
void Return(size_t amount);
// Instantaneous memory pressure approximation.
size_t InstantaneousPressure() const;
static constexpr intptr_t kInitialSize = std::numeric_limits<intptr_t>::max();
// The amount of memory that's free in this quota.
// We use intptr_t as a reasonable proxy for ssize_t that's portable.
// We allow arbitrary overcommit and so this must allow negative values.
std::atomic<intptr_t> free_bytes_{kInitialSize};
// The total number of bytes in this quota.
std::atomic<size_t> quota_size_{kInitialSize};
// Reclaimer queues.
ReclaimerQueue reclaimers_[kNumReclamationPasses];
// The reclaimer activity consumes reclaimers whenever we are in overcommit to
// try and get back under memory limits.
ActivityPtr reclaimer_activity_;
// Each time we do a reclamation sweep, we increment this counter and give it
// to the sweep in question. In this way, should we choose to cancel a sweep
// we can do so and not get confused when the sweep reports back that it's
// completed.
// We also increment this counter on completion of a sweep, as an indicator
// that the wait has ended.
std::atomic<uint64_t> reclamation_counter_{0};
};
} // namespace grpc_core
#endif // GRPC_CORE_LIB_RESOURCE_QUOTA_MEMORY_QUOTA_H

@ -0,0 +1,27 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/resource_quota/resource_quota.h"
namespace grpc_core {
ResourceQuota::ResourceQuota()
: memory_quota_(MakeRefCounted<MemoryQuota>()),
thread_quota_(MakeRefCounted<ThreadQuota>()) {}
ResourceQuota::~ResourceQuota() = default;
} // namespace grpc_core

@ -0,0 +1,48 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_RESOURCE_QUOTA_RESOURCE_QUOTA_H
#define GRPC_CORE_LIB_RESOURCE_QUOTA_RESOURCE_QUOTA_H
#include <grpc/support/port_platform.h>
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/resource_quota/thread_quota.h"
namespace grpc_core {
class ResourceQuota : public RefCounted<ResourceQuota> {
public:
ResourceQuota();
~ResourceQuota() override;
ResourceQuota(const ResourceQuota&) = delete;
ResourceQuota& operator=(const ResourceQuota&) = delete;
const RefCountedPtr<MemoryQuota>& memory_quota() const {
return memory_quota_;
}
const RefCountedPtr<ThreadQuota>& thread_quota() const {
return thread_quota_;
}
private:
RefCountedPtr<MemoryQuota> memory_quota_;
RefCountedPtr<ThreadQuota> thread_quota_;
};
} // namespace grpc_core
#endif // GRPC_CORE_LIB_RESOURCE_QUOTA_RESOURCE_QUOTA_H

@ -0,0 +1,43 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/resource_quota/thread_quota.h"
namespace grpc_core {
ThreadQuota::ThreadQuota() = default;
ThreadQuota::~ThreadQuota() = default;
void ThreadQuota::SetMax(size_t new_max) {
MutexLock lock(&mu_);
max_ = new_max;
}
bool ThreadQuota::Reserve(size_t num_threads) {
MutexLock lock(&mu_);
if (allocated_ + num_threads > max_) return false;
allocated_ += num_threads;
return true;
}
void ThreadQuota::Release(size_t num_threads) {
MutexLock lock(&mu_);
GPR_ASSERT(num_threads <= allocated_);
allocated_ -= num_threads;
}
} // namespace grpc_core

@ -0,0 +1,55 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_RESOURCE_QUOTA_THREAD_QUOTA_H
#define GRPC_CORE_LIB_RESOURCE_QUOTA_THREAD_QUOTA_H
#include <grpc/support/port_platform.h>
#include <cstddef>
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/sync.h"
namespace grpc_core {
// Tracks the amount of threads in a resource quota.
class ThreadQuota : public RefCounted<ThreadQuota> {
public:
ThreadQuota();
~ThreadQuota() override;
ThreadQuota(const ThreadQuota&) = delete;
ThreadQuota& operator=(const ThreadQuota&) = delete;
// Set the maximum number of threads that can be used by this quota.
// If there are more, new reservations will fail until the quota is available.
void SetMax(size_t new_max);
// Try to allocate some number of threads.
// Returns true if the allocation succeeded, false otherwise.
bool Reserve(size_t num_threads);
// Release some number of threads.
void Release(size_t num_threads);
private:
Mutex mu_;
size_t allocated_ ABSL_GUARDED_BY(mu_) = 0;
size_t max_ ABSL_GUARDED_BY(mu_) = std::numeric_limits<size_t>::max();
};
} // namespace grpc_core
#endif // GRPC_CORE_LIB_RESOURCE_QUOTA_THREAD_QUOTA_H

@ -0,0 +1,94 @@
# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package")
licenses(["notice"])
grpc_package(name = "test/core/promise")
load("//test/core/util:grpc_fuzzer.bzl", "grpc_proto_fuzzer")
grpc_cc_test(
name = "memory_quota_test",
srcs = ["memory_quota_test.cc"],
external_deps = [
"gtest",
],
language = "c++",
uses_polling = False,
deps = [
"//:memory_quota",
"//test/core/util:grpc_suppressions",
],
)
grpc_cc_test(
name = "thread_quota_test",
srcs = ["thread_quota_test.cc"],
external_deps = [
"gtest",
],
language = "c++",
uses_polling = False,
deps = [
"//:thread_quota",
"//test/core/util:grpc_suppressions",
],
)
grpc_cc_test(
name = "resource_quota_test",
srcs = ["resource_quota_test.cc"],
external_deps = [
"gtest",
],
language = "c++",
uses_polling = False,
deps = [
"//:resource_quota",
"//test/core/util:grpc_suppressions",
],
)
grpc_cc_test(
name = "memory_quota_stress_test",
srcs = ["memory_quota_stress_test.cc"],
language = "c++",
# We only run this test under Linux, and really only care about the
# TSAN results.
tags = [
"no_mac",
"no_windows",
],
uses_polling = False,
deps = [
"//:memory_quota",
"//test/core/util:grpc_suppressions",
],
)
grpc_proto_fuzzer(
name = "memory_quota_fuzzer",
srcs = ["memory_quota_fuzzer.cc"],
corpus = "memory_quota_fuzzer_corpus",
language = "C++",
proto = "memory_quota_fuzzer.proto",
tags = ["no_windows"],
uses_polling = False,
deps = [
"//:memory_quota",
"//test/core/util:grpc_test_util",
],
)

@ -0,0 +1,169 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <map>
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/libfuzzer/libfuzzer_macro.h"
#include "test/core/resource_quota/memory_quota_fuzzer.pb.h"
bool squelch = true;
bool leak_check = true;
namespace grpc_core {
namespace {
ReclamationPass MapReclamationPass(memory_quota_fuzzer::Reclaimer::Pass pass) {
switch (pass) {
case memory_quota_fuzzer::Reclaimer::BENIGN:
return ReclamationPass::kBenign;
case memory_quota_fuzzer::Reclaimer::IDLE:
return ReclamationPass::kIdle;
case memory_quota_fuzzer::Reclaimer::DESTRUCTIVE:
return ReclamationPass::kDestructive;
default:
return ReclamationPass::kBenign;
}
}
class Fuzzer {
public:
void Run(const memory_quota_fuzzer::Msg& msg) {
grpc_core::ExecCtx exec_ctx;
RunMsg(msg);
memory_quotas_.clear();
memory_allocators_.clear();
allocations_.clear();
}
private:
void RunMsg(const memory_quota_fuzzer::Msg& msg) {
for (int i = 0; i < msg.actions_size(); ++i) {
const auto& action = msg.actions(i);
switch (action.action_type_case()) {
case memory_quota_fuzzer::Action::kFlushExecCtx:
ExecCtx::Get()->Flush();
break;
case memory_quota_fuzzer::Action::kCreateQuota:
memory_quotas_.emplace(action.quota(),
RefCountedPtr<MemoryQuota>(new MemoryQuota()));
break;
case memory_quota_fuzzer::Action::kDeleteQuota:
memory_quotas_.erase(action.quota());
break;
case memory_quota_fuzzer::Action::kCreateAllocator:
WithQuota(action.quota(),
[this, action](RefCountedPtr<MemoryQuota> q) {
memory_allocators_.emplace(action.allocator(),
q->MakeMemoryAllocator());
});
break;
case memory_quota_fuzzer::Action::kDeleteAllocator:
memory_allocators_.erase(action.allocator());
break;
case memory_quota_fuzzer::Action::kSetQuotaSize:
WithQuota(action.quota(), [action](RefCountedPtr<MemoryQuota> q) {
q->SetSize(Clamp(action.set_quota_size(), uint64_t{0},
uint64_t{std::numeric_limits<ssize_t>::max()}));
});
break;
case memory_quota_fuzzer::Action::kRebindQuota:
WithQuota(action.quota(),
[this, action](RefCountedPtr<MemoryQuota> q) {
WithAllocator(action.allocator(),
[q](MemoryAllocator* a) { a->Rebind(q); });
});
break;
case memory_quota_fuzzer::Action::kCreateAllocation: {
auto min = action.create_allocation().min();
auto max = action.create_allocation().max();
if (min > max) break;
MemoryRequest req(min, max);
WithAllocator(
action.allocator(), [this, action, req](MemoryAllocator* a) {
auto alloc = a->MakeReservation(req);
allocations_.emplace(action.allocation(), std::move(alloc));
});
} break;
case memory_quota_fuzzer::Action::kDeleteAllocation:
allocations_.erase(action.allocation());
break;
case memory_quota_fuzzer::Action::kPostReclaimer: {
std::function<void(ReclamationSweep)> reclaimer;
auto cfg = action.post_reclaimer();
if (cfg.synchronous()) {
reclaimer = [this, cfg](ReclamationSweep) { RunMsg(cfg.msg()); };
} else {
reclaimer = [cfg, this](ReclamationSweep sweep) {
struct Args {
ReclamationSweep sweep;
memory_quota_fuzzer::Msg msg;
Fuzzer* fuzzer;
};
auto* args = new Args{std::move(sweep), cfg.msg(), this};
auto* closure = GRPC_CLOSURE_CREATE(
[](void* arg, grpc_error*) {
auto* args = static_cast<Args*>(arg);
args->fuzzer->RunMsg(args->msg);
delete args;
},
args, nullptr);
ExecCtx::Get()->Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
};
auto pass = MapReclamationPass(cfg.pass());
WithAllocator(action.allocator(),
[pass, reclaimer](MemoryAllocator* a) {
a->PostReclaimer(pass, reclaimer);
});
}
} break;
case memory_quota_fuzzer::Action::ACTION_TYPE_NOT_SET:
break;
}
}
}
template <typename F>
void WithQuota(int quota, F f) {
auto it = memory_quotas_.find(quota);
if (it == memory_quotas_.end()) return;
f(it->second);
}
template <typename F>
void WithAllocator(int allocator, F f) {
auto it = memory_allocators_.find(allocator);
if (it == memory_allocators_.end()) return;
f(it->second.get());
}
std::map<int, RefCountedPtr<MemoryQuota>> memory_quotas_;
std::map<int, OrphanablePtr<MemoryAllocator>> memory_allocators_;
std::map<int, MemoryAllocator::Reservation> allocations_;
};
} // namespace
} // namespace grpc_core
static void dont_log(gpr_log_func_args* /*args*/) {}
DEFINE_PROTO_FUZZER(const memory_quota_fuzzer::Msg& msg) {
if (squelch) gpr_set_log_function(dont_log);
gpr_log_verbosity_init();
grpc_tracer_init();
grpc_core::Fuzzer().Run(msg);
}

@ -0,0 +1,57 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package memory_quota_fuzzer;
message Empty {}
message Reclaimer {
enum Pass {
BENIGN = 0;
IDLE = 1;
DESTRUCTIVE = 2;
}
bool synchronous = 1;
Pass pass = 2;
Msg msg = 3;
}
message AllocationRequest {
uint32 min = 1;
uint32 max = 2;
}
message Action {
int32 quota = 1;
int32 allocator = 2;
int32 allocation = 3;
oneof action_type {
Empty flush_exec_ctx = 7;
Empty create_quota = 10;
Empty delete_quota = 11;
Empty create_allocator = 12;
Empty delete_allocator = 13;
uint64 set_quota_size = 14;
Empty rebind_quota = 15;
AllocationRequest create_allocation = 16;
Empty delete_allocation = 17;
Reclaimer post_reclaimer = 18;
}
}
message Msg {
repeated Action actions = 2;
}

@ -0,0 +1,210 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <random>
#include <thread>
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/resource_quota/memory_quota.h"
namespace grpc_core {
namespace {
class StressTest {
public:
// Create a stress test with some size.
StressTest(size_t num_quotas, size_t num_allocators) {
for (size_t i = 0; i < num_quotas; ++i) {
quotas_.emplace_back(new MemoryQuota());
}
std::random_device g;
std::uniform_int_distribution<size_t> dist(0, num_quotas - 1);
for (size_t i = 0; i < num_allocators; ++i) {
allocators_.emplace_back(quotas_[dist(g)]->MakeMemoryAllocator());
}
}
// Run the thread for some period of time.
void Run(int seconds) {
std::vector<std::thread> threads;
// A few threads constantly rebinding allocators to different quotas.
threads.reserve(2);
for (int i = 0; i < 2; i++) threads.push_back(Run(Rebinder));
// And another few threads constantly resizing quotas.
for (int i = 0; i < 2; i++) threads.push_back(Run(Resizer));
// For each (allocator, pass), start a thread continuously allocating from
// that allocator. Whenever the first allocation is made, schedule a
// reclaimer for that pass.
for (size_t i = 0; i < allocators_.size(); i++) {
auto* allocator = allocators_[i].get();
for (ReclamationPass pass :
{ReclamationPass::kBenign, ReclamationPass::kIdle,
ReclamationPass::kDestructive}) {
threads.push_back(Run([allocator, pass](StatePtr st) mutable {
if (st->RememberReservation(
allocator->MakeReservation(st->RandomRequest()))) {
allocator->PostReclaimer(
pass, [st](ReclamationSweep) { st->ForgetReservations(); });
}
}));
}
}
// All threads started, wait for the alloted time.
std::this_thread::sleep_for(std::chrono::seconds(seconds));
// Toggle the completion bit, and then wait for the threads.
done_.store(true, std::memory_order_relaxed);
while (!threads.empty()) {
threads.back().join();
threads.pop_back();
}
}
private:
// Per-thread state.
// Not everything is used on every thread, but it's not terrible having the
// extra state around and it does simplify things somewhat.
class State {
public:
explicit State(StressTest* test)
: test_(test),
quotas_distribution_(0, test_->quotas_.size() - 1),
allocators_distribution_(0, test_->allocators_.size() - 1),
size_distribution_(1, 4 * 1024 * 1024),
quota_size_distribution_(1024 * 1024, size_t(8) * 1024 * 1024 * 1024),
choose_variable_size_(1, 100) {}
// Choose a random quota, and return an owned pointer to it.
// Not thread-safe, only callable from the owning thread.
RefCountedPtr<MemoryQuota> RandomQuota() {
return test_->quotas_[quotas_distribution_(g_)];
}
// Choose a random allocator, and return a borrowed pointer to it.
// Not thread-safe, only callable from the owning thread.
MemoryAllocator* RandomAllocator() {
return test_->allocators_[allocators_distribution_(g_)].get();
}
// Random memory request size - 1% of allocations are chosen to be variable
// sized - the rest are fixed (since variable sized create some contention
// problems between allocator threads of different passes on the same
// allocator).
// Not thread-safe, only callable from the owning thread.
MemoryRequest RandomRequest() {
size_t a = size_distribution_(g_);
if (choose_variable_size_(g_) == 1) {
size_t b = size_distribution_(g_);
return MemoryRequest(std::min(a, b), std::max(a, b));
}
return MemoryRequest(a);
}
// Choose a new size for a backing quota.
// Not thread-safe, only callable from the owning thread.
size_t RandomQuotaSize() { return quota_size_distribution_(g_); }
// Remember a reservation, return true if it's the first remembered since
// the last reclamation.
// Thread-safe.
bool RememberReservation(MemoryAllocator::Reservation reservation)
ABSL_LOCKS_EXCLUDED(mu_) {
MutexLock lock(&mu_);
bool was_empty = reservations_.empty();
reservations_.emplace_back(std::move(reservation));
return was_empty;
}
// Return all reservations made until this moment, so that they can be
// dropped.
std::vector<MemoryAllocator::Reservation> ForgetReservations()
ABSL_LOCKS_EXCLUDED(mu_) {
MutexLock lock(&mu_);
return std::move(reservations_);
}
private:
// Owning test.
StressTest* const test_;
// Random number generator.
std::mt19937 g_{std::random_device()()};
// Distribution to choose a quota.
std::uniform_int_distribution<size_t> quotas_distribution_;
// Distribution to choose an allocator.
std::uniform_int_distribution<size_t> allocators_distribution_;
// Distribution to choose an allocation size.
std::uniform_int_distribution<size_t> size_distribution_;
// Distribution to choose a quota size.
std::uniform_int_distribution<size_t> quota_size_distribution_;
// Distribution to choose whether to make a variable-sized allocation.
std::uniform_int_distribution<size_t> choose_variable_size_;
// Mutex to protect the reservation list.
Mutex mu_;
// Reservations remembered by this thread.
std::vector<MemoryAllocator::Reservation> reservations_
ABSL_GUARDED_BY(mu_);
};
// Type alias since we always pass around these shared pointers.
using StatePtr = std::shared_ptr<State>;
// Choose one allocator, one quota, rebind the allocator to the quota.
static void Rebinder(StatePtr st) {
MemoryAllocator* allocator = st->RandomAllocator();
RefCountedPtr<MemoryQuota> quota = st->RandomQuota();
allocator->Rebind(std::move(quota));
}
// Choose one allocator, resize it to a randomly chosen size.
static void Resizer(StatePtr st) {
RefCountedPtr<MemoryQuota> quota = st->RandomQuota();
size_t size = st->RandomQuotaSize();
quota->SetSize(size);
}
// Create a thread that repeatedly runs a function until the test is done.
// We create one instance of State that we pass as a StatePtr to said
// function as the current overall state for this thread.
// Monitors done_ to see when we should stop.
// Ensures there's an ExecCtx for each iteration of the loop.
template <typename Fn>
std::thread Run(Fn fn) {
return std::thread([this, fn]() mutable {
auto state = std::make_shared<State>(this);
while (!done_.load(std::memory_order_relaxed)) {
ExecCtx exec_ctx;
fn(state);
}
});
}
// Flag for when the test is completed.
std::atomic<bool> done_{false};
// Memory quotas to test against. We build this up at construction time, but
// then don't resize, so we can load from it continuously from all of the
// threads.
std::vector<RefCountedPtr<MemoryQuota>> quotas_;
// Memory allocators to test against. Similarly, built at construction time,
// and then the shape of this vector is not changed.
std::vector<OrphanablePtr<MemoryAllocator>> allocators_;
};
} // namespace
} // namespace grpc_core
int main(int, char**) { grpc_core::StressTest(16, 64).Run(8); }

@ -0,0 +1,176 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/resource_quota/memory_quota.h"
#include <gtest/gtest.h>
#include "absl/synchronization/notification.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/slice/slice_refcount.h"
namespace grpc_core {
namespace testing {
//
// Helpers
//
template <size_t kSize>
struct Sized {
char blah[kSize];
virtual ~Sized() {}
};
//
// MemoryRequestTest
//
TEST(MemoryRequestTest, ConversionFromSize) {
MemoryRequest request = 3;
EXPECT_EQ(request.min(), 3);
EXPECT_EQ(request.max(), 3);
}
TEST(MemoryRequestTest, MinMax) {
MemoryRequest request(3, 7);
EXPECT_EQ(request.min(), 3);
EXPECT_EQ(request.max(), 7);
}
//
// MemoryQuotaTest
//
TEST(MemoryQuotaTest, NoOp) {
RefCountedPtr<MemoryQuota> memory_quota = MakeRefCounted<MemoryQuota>();
}
TEST(MemoryQuotaTest, CreateAllocatorNoOp) {
RefCountedPtr<MemoryQuota> memory_quota = MakeRefCounted<MemoryQuota>();
auto memory_allocator = memory_quota->MakeMemoryAllocator();
}
TEST(MemoryQuotaTest, CreateObjectFromAllocator) {
RefCountedPtr<MemoryQuota> memory_quota = MakeRefCounted<MemoryQuota>();
auto memory_allocator = memory_quota->MakeMemoryAllocator();
auto object = memory_allocator->MakeUnique<Sized<4096>>();
}
TEST(MemoryQuotaTest, CreateSomeObjectsAndExpectReclamation) {
ExecCtx exec_ctx;
RefCountedPtr<MemoryQuota> memory_quota = MakeRefCounted<MemoryQuota>();
memory_quota->SetSize(4096);
auto memory_allocator = memory_quota->MakeMemoryAllocator();
auto object = memory_allocator->MakeUnique<Sized<2048>>();
memory_allocator->PostReclaimer(
ReclamationPass::kDestructive,
[&object](ReclamationSweep) { object.reset(); });
auto object2 = memory_allocator->MakeUnique<Sized<2048>>();
exec_ctx.Flush();
EXPECT_EQ(object.get(), nullptr);
memory_allocator->PostReclaimer(
ReclamationPass::kDestructive,
[&object2](ReclamationSweep) { object2.reset(); });
auto object3 = memory_allocator->MakeUnique<Sized<2048>>();
exec_ctx.Flush();
EXPECT_EQ(object2.get(), nullptr);
}
TEST(MemoryQuotaTest, BasicRebind) {
ExecCtx exec_ctx;
RefCountedPtr<MemoryQuota> memory_quota = MakeRefCounted<MemoryQuota>();
memory_quota->SetSize(4096);
RefCountedPtr<MemoryQuota> memory_quota2 = MakeRefCounted<MemoryQuota>();
memory_quota2->SetSize(4096);
auto memory_allocator = memory_quota2->MakeMemoryAllocator();
auto object = memory_allocator->MakeUnique<Sized<2048>>();
memory_allocator->Rebind(memory_quota);
auto memory_allocator2 = memory_quota2->MakeMemoryAllocator();
memory_allocator2->PostReclaimer(ReclamationPass::kDestructive,
[](ReclamationSweep) {
// Taken memory should be reassigned to
// memory_quota, so this should never be
// reached.
abort();
});
memory_allocator->PostReclaimer(ReclamationPass::kDestructive,
[&object](ReclamationSweep) {
// The new memory allocator should reclaim
// the object allocated against the previous
// quota because that's now part of this
// quota.
object.reset();
});
auto object2 = memory_allocator->MakeUnique<Sized<2048>>();
exec_ctx.Flush();
EXPECT_EQ(object.get(), nullptr);
}
TEST(MemoryQuotaTest, ReserveRangeNoPressure) {
RefCountedPtr<MemoryQuota> memory_quota = MakeRefCounted<MemoryQuota>();
auto memory_allocator = memory_quota->MakeMemoryAllocator();
size_t total = 0;
for (int i = 0; i < 10000; i++) {
auto n = memory_allocator->Reserve(MemoryRequest(100, 40000));
EXPECT_EQ(n, 40000);
total += n;
}
memory_allocator->Release(total);
}
TEST(MemoryQuotaTest, MakeSlice) {
RefCountedPtr<MemoryQuota> memory_quota = MakeRefCounted<MemoryQuota>();
auto memory_allocator = memory_quota->MakeMemoryAllocator();
std::vector<grpc_slice> slices;
for (int i = 1; i < 1000; i++) {
int min = i;
int max = 10 * i - 9;
slices.push_back(memory_allocator->MakeSlice(MemoryRequest(min, max)));
}
for (grpc_slice slice : slices) {
grpc_slice_unref_internal(slice);
}
}
TEST(MemoryQuotaTest, ContainerAllocator) {
RefCountedPtr<MemoryQuota> memory_quota = MakeRefCounted<MemoryQuota>();
auto memory_allocator = memory_quota->MakeMemoryAllocator();
Vector<int> vec(memory_allocator.get());
for (int i = 0; i < 100000; i++) {
vec.push_back(i);
}
}
} // namespace testing
} // namespace grpc_core
// Hook needed to run ExecCtx outside of iomgr.
void grpc_set_default_iomgr_platform() {}
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
gpr_log_verbosity_init();
return RUN_ALL_TESTS();
}

@ -0,0 +1,37 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/resource_quota/resource_quota.h"
#include <gtest/gtest.h>
namespace grpc_core {
namespace testing {
TEST(ResourceQuotaTest, Works) {
auto q = MakeRefCounted<ResourceQuota>();
EXPECT_NE(q->thread_quota(), nullptr);
EXPECT_NE(q->memory_quota(), nullptr);
}
} // namespace testing
} // namespace grpc_core
// Hook needed to run ExecCtx outside of iomgr.
void grpc_set_default_iomgr_platform() {}
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -0,0 +1,45 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/resource_quota/thread_quota.h"
#include <gtest/gtest.h>
namespace grpc_core {
namespace testing {
TEST(ThreadQuotaTest, Works) {
auto q = MakeRefCounted<ThreadQuota>();
EXPECT_TRUE(q->Reserve(128));
q->SetMax(10);
EXPECT_FALSE(q->Reserve(128));
EXPECT_FALSE(q->Reserve(1));
q->Release(118);
EXPECT_FALSE(q->Reserve(1));
q->Release(1);
EXPECT_TRUE(q->Reserve(1));
EXPECT_FALSE(q->Reserve(1));
q->Release(10);
}
} // namespace testing
} // namespace grpc_core
// Hook needed to run ExecCtx outside of iomgr.
void grpc_set_default_iomgr_platform() {}
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -1777,6 +1777,26 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": false,
"language": "c",
"name": "memory_quota_stress_test",
"platforms": [
"linux",
"posix"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
@ -2209,30 +2229,6 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": false,
"language": "c",
"name": "resource_quota_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
@ -2795,6 +2791,30 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": false,
"language": "c",
"name": "test_core_iomgr_resource_quota_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
@ -5657,6 +5677,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "memory_quota_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
@ -6987,6 +7031,30 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "test_core_resource_quota_resource_quota_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
@ -7107,6 +7175,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "thread_quota_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save