diff --git a/BUILD b/BUILD index 530904c7a5f..8b753cdee12 100644 --- a/BUILD +++ b/BUILD @@ -1284,6 +1284,60 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "memory_quota", + srcs = [ + "src/core/lib/resource_quota/memory_quota.cc", + ], + hdrs = [ + "src/core/lib/resource_quota/memory_quota.h", + ], + deps = [ + "activity", + "dual_ref_counted", + "exec_ctx_wakeup_scheduler", + "gpr_base", + "loop", + "orphanable", + "poll", + "race", + "ref_counted_ptr", + "seq", + "slice_refcount", + "useful", + ], +) + +grpc_cc_library( + name = "thread_quota", + srcs = [ + "src/core/lib/resource_quota/thread_quota.cc", + ], + hdrs = [ + "src/core/lib/resource_quota/thread_quota.h", + ], + deps = [ + "gpr_base", + "ref_counted", + ], +) + +grpc_cc_library( + name = "resource_quota", + srcs = [ + "src/core/lib/resource_quota/resource_quota.cc", + ], + hdrs = [ + "src/core/lib/resource_quota/resource_quota.h", + ], + deps = [ + "gpr_base", + "memory_quota", + "ref_counted", + "thread_quota", + ], +) + grpc_cc_library( name = "slice_refcount", srcs = [ diff --git a/CMakeLists.txt b/CMakeLists.txt index 14f6e9903f2..1a2f9e6cc11 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -645,6 +645,9 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_c lame_client_test) add_dependencies(buildtests_c load_file_test) add_dependencies(buildtests_c manual_constructor_test) + if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX) + add_dependencies(buildtests_c memory_quota_stress_test) + endif() add_dependencies(buildtests_c message_compress_test) add_dependencies(buildtests_c metadata_test) add_dependencies(buildtests_c minimal_stack_is_minimal_test) @@ -671,7 +674,6 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_c resolve_address_using_native_resolver_posix_test) endif() add_dependencies(buildtests_c resolve_address_using_native_resolver_test) - add_dependencies(buildtests_c resource_quota_test) add_dependencies(buildtests_c secure_channel_create_test) add_dependencies(buildtests_c secure_endpoint_test) add_dependencies(buildtests_c security_connector_test) @@ -709,6 +711,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_c tcp_server_posix_test) endif() add_dependencies(buildtests_c test_core_gpr_time_test) + add_dependencies(buildtests_c test_core_iomgr_resource_quota_test) add_dependencies(buildtests_c test_core_security_credentials_test) add_dependencies(buildtests_c test_core_slice_slice_test) add_dependencies(buildtests_c thd_test) @@ -916,6 +919,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx loop_test) add_dependencies(buildtests_cxx match_test) add_dependencies(buildtests_cxx matchers_test) + add_dependencies(buildtests_cxx memory_quota_test) add_dependencies(buildtests_cxx message_allocator_end2end_test) add_dependencies(buildtests_cxx metadata_map_test) add_dependencies(buildtests_cxx miscompile_with_no_unique_address_test) @@ -989,11 +993,13 @@ if(gRPC_BUILD_TESTS) endif() add_dependencies(buildtests_cxx string_ref_test) add_dependencies(buildtests_cxx table_test) + add_dependencies(buildtests_cxx test_core_resource_quota_resource_quota_test) add_dependencies(buildtests_cxx test_cpp_client_credentials_test) add_dependencies(buildtests_cxx test_cpp_server_credentials_test) add_dependencies(buildtests_cxx test_cpp_util_slice_test) add_dependencies(buildtests_cxx test_cpp_util_time_test) add_dependencies(buildtests_cxx thread_manager_test) + add_dependencies(buildtests_cxx thread_quota_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx thread_stress_test) endif() @@ -6145,6 +6151,49 @@ target_link_libraries(manual_constructor_test ) +endif() +if(gRPC_BUILD_TESTS) +if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX) + + add_executable(memory_quota_stress_test + src/core/lib/debug/trace.cc + src/core/lib/iomgr/combiner.cc + src/core/lib/iomgr/error.cc + src/core/lib/iomgr/exec_ctx.cc + src/core/lib/iomgr/executor.cc + src/core/lib/iomgr/iomgr_internal.cc + src/core/lib/promise/activity.cc + src/core/lib/resource_quota/memory_quota.cc + src/core/lib/slice/slice.cc + src/core/lib/slice/slice_refcount.cc + src/core/lib/slice/slice_string_helpers.cc + src/core/lib/slice/static_slice.cc + test/core/resource_quota/memory_quota_stress_test.cc + ) + + target_include_directories(memory_quota_stress_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + ) + + target_link_libraries(memory_quota_stress_test + ${_gRPC_ALLTARGETS_LIBRARIES} + absl::statusor + absl::variant + gpr + ) + + +endif() endif() if(gRPC_BUILD_TESTS) @@ -6650,33 +6699,6 @@ target_link_libraries(resolve_address_using_native_resolver_test ) -endif() -if(gRPC_BUILD_TESTS) - -add_executable(resource_quota_test - test/core/iomgr/resource_quota_test.cc -) - -target_include_directories(resource_quota_test - PRIVATE - ${CMAKE_CURRENT_SOURCE_DIR} - ${CMAKE_CURRENT_SOURCE_DIR}/include - ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} - ${_gRPC_RE2_INCLUDE_DIR} - ${_gRPC_SSL_INCLUDE_DIR} - ${_gRPC_UPB_GENERATED_DIR} - ${_gRPC_UPB_GRPC_GENERATED_DIR} - ${_gRPC_UPB_INCLUDE_DIR} - ${_gRPC_XXHASH_INCLUDE_DIR} - ${_gRPC_ZLIB_INCLUDE_DIR} -) - -target_link_libraries(resource_quota_test - ${_gRPC_ALLTARGETS_LIBRARIES} - grpc_test_util -) - - endif() if(gRPC_BUILD_TESTS) @@ -7373,6 +7395,33 @@ target_link_libraries(test_core_gpr_time_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(test_core_iomgr_resource_quota_test + test/core/iomgr/resource_quota_test.cc +) + +target_include_directories(test_core_iomgr_resource_quota_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} +) + +target_link_libraries(test_core_iomgr_resource_quota_test + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util +) + + endif() if(gRPC_BUILD_TESTS) @@ -13280,6 +13329,55 @@ target_link_libraries(matchers_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(memory_quota_test + src/core/lib/debug/trace.cc + src/core/lib/iomgr/combiner.cc + src/core/lib/iomgr/error.cc + src/core/lib/iomgr/exec_ctx.cc + src/core/lib/iomgr/executor.cc + src/core/lib/iomgr/iomgr_internal.cc + src/core/lib/promise/activity.cc + src/core/lib/resource_quota/memory_quota.cc + src/core/lib/slice/slice.cc + src/core/lib/slice/slice_refcount.cc + src/core/lib/slice/slice_string_helpers.cc + src/core/lib/slice/static_slice.cc + test/core/resource_quota/memory_quota_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(memory_quota_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(memory_quota_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + absl::statusor + absl::variant + gpr +) + + endif() if(gRPC_BUILD_TESTS) @@ -15857,6 +15955,57 @@ target_link_libraries(table_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(test_core_resource_quota_resource_quota_test + src/core/lib/debug/trace.cc + src/core/lib/iomgr/combiner.cc + src/core/lib/iomgr/error.cc + src/core/lib/iomgr/exec_ctx.cc + src/core/lib/iomgr/executor.cc + src/core/lib/iomgr/iomgr_internal.cc + src/core/lib/promise/activity.cc + src/core/lib/resource_quota/memory_quota.cc + src/core/lib/resource_quota/resource_quota.cc + src/core/lib/resource_quota/thread_quota.cc + src/core/lib/slice/slice.cc + src/core/lib/slice/slice_refcount.cc + src/core/lib/slice/slice_string_helpers.cc + src/core/lib/slice/static_slice.cc + test/core/resource_quota/resource_quota_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(test_core_resource_quota_resource_quota_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(test_core_resource_quota_resource_quota_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + absl::statusor + absl::variant + gpr +) + + endif() if(gRPC_BUILD_TESTS) @@ -16035,6 +16184,43 @@ target_link_libraries(thread_manager_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(thread_quota_test + src/core/lib/debug/trace.cc + src/core/lib/resource_quota/thread_quota.cc + test/core/resource_quota/thread_quota_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(thread_quota_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(thread_quota_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + gpr +) + + endif() if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 03c2643d5cf..497f6ee7d12 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -3668,6 +3668,64 @@ targets: deps: - grpc_test_util uses_polling: false +- name: memory_quota_stress_test + build: test + language: c + headers: + - src/core/lib/debug/trace.h + - src/core/lib/gprpp/atomic_utils.h + - src/core/lib/gprpp/dual_ref_counted.h + - src/core/lib/gprpp/orphanable.h + - src/core/lib/gprpp/ref_counted.h + - src/core/lib/gprpp/ref_counted_ptr.h + - src/core/lib/iomgr/closure.h + - src/core/lib/iomgr/combiner.h + - src/core/lib/iomgr/error.h + - src/core/lib/iomgr/error_internal.h + - src/core/lib/iomgr/exec_ctx.h + - src/core/lib/iomgr/executor.h + - src/core/lib/iomgr/iomgr_internal.h + - src/core/lib/promise/activity.h + - src/core/lib/promise/context.h + - src/core/lib/promise/detail/basic_seq.h + - src/core/lib/promise/detail/promise_factory.h + - src/core/lib/promise/detail/promise_like.h + - src/core/lib/promise/detail/status.h + - src/core/lib/promise/detail/switch.h + - src/core/lib/promise/exec_ctx_wakeup_scheduler.h + - src/core/lib/promise/loop.h + - src/core/lib/promise/poll.h + - src/core/lib/promise/race.h + - src/core/lib/promise/seq.h + - src/core/lib/resource_quota/memory_quota.h + - src/core/lib/slice/slice_internal.h + - src/core/lib/slice/slice_refcount.h + - src/core/lib/slice/slice_refcount_base.h + - src/core/lib/slice/slice_string_helpers.h + - src/core/lib/slice/slice_utils.h + - src/core/lib/slice/static_slice.h + src: + - src/core/lib/debug/trace.cc + - src/core/lib/iomgr/combiner.cc + - src/core/lib/iomgr/error.cc + - src/core/lib/iomgr/exec_ctx.cc + - src/core/lib/iomgr/executor.cc + - src/core/lib/iomgr/iomgr_internal.cc + - src/core/lib/promise/activity.cc + - src/core/lib/resource_quota/memory_quota.cc + - src/core/lib/slice/slice.cc + - src/core/lib/slice/slice_refcount.cc + - src/core/lib/slice/slice_string_helpers.cc + - src/core/lib/slice/static_slice.cc + - test/core/resource_quota/memory_quota_stress_test.cc + deps: + - absl/status:statusor + - absl/types:variant + - gpr + platforms: + - linux + - posix + uses_polling: false - name: message_compress_test build: test language: c @@ -3863,14 +3921,6 @@ targets: - grpc_test_util args: - --resolver=native -- name: resource_quota_test - build: test - language: c - headers: [] - src: - - test/core/iomgr/resource_quota_test.cc - deps: - - grpc_test_util - name: secure_channel_create_test build: test language: c @@ -4129,6 +4179,14 @@ targets: deps: - grpc_test_util uses_polling: false +- name: test_core_iomgr_resource_quota_test + build: test + language: c + headers: [] + src: + - test/core/iomgr/resource_quota_test.cc + deps: + - grpc_test_util - name: test_core_security_credentials_test build: test language: c @@ -6612,6 +6670,62 @@ targets: - test/core/security/matchers_test.cc deps: - grpc_test_util +- name: memory_quota_test + gtest: true + build: test + language: c++ + headers: + - src/core/lib/debug/trace.h + - src/core/lib/gprpp/atomic_utils.h + - src/core/lib/gprpp/dual_ref_counted.h + - src/core/lib/gprpp/orphanable.h + - src/core/lib/gprpp/ref_counted.h + - src/core/lib/gprpp/ref_counted_ptr.h + - src/core/lib/iomgr/closure.h + - src/core/lib/iomgr/combiner.h + - src/core/lib/iomgr/error.h + - src/core/lib/iomgr/error_internal.h + - src/core/lib/iomgr/exec_ctx.h + - src/core/lib/iomgr/executor.h + - src/core/lib/iomgr/iomgr_internal.h + - src/core/lib/promise/activity.h + - src/core/lib/promise/context.h + - src/core/lib/promise/detail/basic_seq.h + - src/core/lib/promise/detail/promise_factory.h + - src/core/lib/promise/detail/promise_like.h + - src/core/lib/promise/detail/status.h + - src/core/lib/promise/detail/switch.h + - src/core/lib/promise/exec_ctx_wakeup_scheduler.h + - src/core/lib/promise/loop.h + - src/core/lib/promise/poll.h + - src/core/lib/promise/race.h + - src/core/lib/promise/seq.h + - src/core/lib/resource_quota/memory_quota.h + - src/core/lib/slice/slice_internal.h + - src/core/lib/slice/slice_refcount.h + - src/core/lib/slice/slice_refcount_base.h + - src/core/lib/slice/slice_string_helpers.h + - src/core/lib/slice/slice_utils.h + - src/core/lib/slice/static_slice.h + src: + - src/core/lib/debug/trace.cc + - src/core/lib/iomgr/combiner.cc + - src/core/lib/iomgr/error.cc + - src/core/lib/iomgr/exec_ctx.cc + - src/core/lib/iomgr/executor.cc + - src/core/lib/iomgr/iomgr_internal.cc + - src/core/lib/promise/activity.cc + - src/core/lib/resource_quota/memory_quota.cc + - src/core/lib/slice/slice.cc + - src/core/lib/slice/slice_refcount.cc + - src/core/lib/slice/slice_string_helpers.cc + - src/core/lib/slice/static_slice.cc + - test/core/resource_quota/memory_quota_test.cc + deps: + - absl/status:statusor + - absl/types:variant + - gpr + uses_polling: false - name: message_allocator_end2end_test gtest: true build: test @@ -7659,6 +7773,66 @@ targets: - absl/types:optional - absl/utility:utility uses_polling: false +- name: test_core_resource_quota_resource_quota_test + gtest: true + build: test + language: c++ + headers: + - src/core/lib/debug/trace.h + - src/core/lib/gprpp/atomic_utils.h + - src/core/lib/gprpp/dual_ref_counted.h + - src/core/lib/gprpp/orphanable.h + - src/core/lib/gprpp/ref_counted.h + - src/core/lib/gprpp/ref_counted_ptr.h + - src/core/lib/iomgr/closure.h + - src/core/lib/iomgr/combiner.h + - src/core/lib/iomgr/error.h + - src/core/lib/iomgr/error_internal.h + - src/core/lib/iomgr/exec_ctx.h + - src/core/lib/iomgr/executor.h + - src/core/lib/iomgr/iomgr_internal.h + - src/core/lib/promise/activity.h + - src/core/lib/promise/context.h + - src/core/lib/promise/detail/basic_seq.h + - src/core/lib/promise/detail/promise_factory.h + - src/core/lib/promise/detail/promise_like.h + - src/core/lib/promise/detail/status.h + - src/core/lib/promise/detail/switch.h + - src/core/lib/promise/exec_ctx_wakeup_scheduler.h + - src/core/lib/promise/loop.h + - src/core/lib/promise/poll.h + - src/core/lib/promise/race.h + - src/core/lib/promise/seq.h + - src/core/lib/resource_quota/memory_quota.h + - src/core/lib/resource_quota/resource_quota.h + - src/core/lib/resource_quota/thread_quota.h + - src/core/lib/slice/slice_internal.h + - src/core/lib/slice/slice_refcount.h + - src/core/lib/slice/slice_refcount_base.h + - src/core/lib/slice/slice_string_helpers.h + - src/core/lib/slice/slice_utils.h + - src/core/lib/slice/static_slice.h + src: + - src/core/lib/debug/trace.cc + - src/core/lib/iomgr/combiner.cc + - src/core/lib/iomgr/error.cc + - src/core/lib/iomgr/exec_ctx.cc + - src/core/lib/iomgr/executor.cc + - src/core/lib/iomgr/iomgr_internal.cc + - src/core/lib/promise/activity.cc + - src/core/lib/resource_quota/memory_quota.cc + - src/core/lib/resource_quota/resource_quota.cc + - src/core/lib/resource_quota/thread_quota.cc + - src/core/lib/slice/slice.cc + - src/core/lib/slice/slice_refcount.cc + - src/core/lib/slice/slice_string_helpers.cc + - src/core/lib/slice/static_slice.cc + - test/core/resource_quota/resource_quota_test.cc + deps: + - absl/status:statusor + - absl/types:variant + - gpr + uses_polling: false - name: test_cpp_client_credentials_test gtest: true build: test @@ -7709,6 +7883,23 @@ targets: deps: - grpc++_test_config - grpc++_test_util +- name: thread_quota_test + gtest: true + build: test + language: c++ + headers: + - src/core/lib/debug/trace.h + - src/core/lib/gprpp/atomic_utils.h + - src/core/lib/gprpp/ref_counted.h + - src/core/lib/gprpp/ref_counted_ptr.h + - src/core/lib/resource_quota/thread_quota.h + src: + - src/core/lib/debug/trace.cc + - src/core/lib/resource_quota/thread_quota.cc + - test/core/resource_quota/thread_quota_test.cc + deps: + - gpr + uses_polling: false - name: thread_stress_test gtest: true build: test diff --git a/src/core/lib/promise/activity.h b/src/core/lib/promise/activity.h index f034c087f08..b0d7f8294f5 100644 --- a/src/core/lib/promise/activity.h +++ b/src/core/lib/promise/activity.h @@ -341,7 +341,7 @@ class PromiseActivity final } void RunScheduledWakeup() { - GPR_ASSERT(wakeup_scheduled_.exchange(false, std::memory_order_relaxed)); + GPR_ASSERT(wakeup_scheduled_.exchange(false, std::memory_order_acq_rel)); Step(); WakeupComplete(); } @@ -360,9 +360,11 @@ class PromiseActivity final WakeupComplete(); return; } - if (!wakeup_scheduled_.exchange(true, std::memory_order_relaxed)) { + if (!wakeup_scheduled_.exchange(true, std::memory_order_acq_rel)) { // Can't safely run, so ask to run later. wakeup_scheduler_.ScheduleWakeup(this); + } else { + WakeupComplete(); } } diff --git a/src/core/lib/resource_quota/memory_quota.cc b/src/core/lib/resource_quota/memory_quota.cc new file mode 100644 index 00000000000..8353c9db65c --- /dev/null +++ b/src/core/lib/resource_quota/memory_quota.cc @@ -0,0 +1,426 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "src/core/lib/resource_quota/memory_quota.h" + +#include "src/core/lib/gpr/useful.h" +#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h" +#include "src/core/lib/promise/loop.h" +#include "src/core/lib/promise/race.h" +#include "src/core/lib/promise/seq.h" +#include "src/core/lib/slice/slice_refcount.h" + +namespace grpc_core { + +// Maximum number of bytes an allocator will request from a quota in one step. +// Larger allocations than this will require multiple allocation requests. +static constexpr size_t kMaxReplenishBytes = 1024 * 1024; + +// Minimum number of bytes an allocator will request from a quota in one step. +static constexpr size_t kMinReplenishBytes = 4096; + +// +// Reclaimer +// + +ReclamationSweep::~ReclamationSweep() { + if (memory_quota_ != nullptr) { + memory_quota_->FinishReclamation(sweep_token_); + } +} + +// +// ReclaimerQueue +// + +const ReclaimerQueue::Index ReclaimerQueue::kInvalidIndex; + +void ReclaimerQueue::Insert(RefCountedPtr allocator, + ReclamationFunction reclaimer, Index* index) { + MutexLock lock(&mu_); + if (*index < entries_.size() && entries_[*index].allocator == allocator) { + entries_[*index].reclaimer.swap(reclaimer); + return; + } + if (free_entries_.empty()) { + *index = entries_.size(); + entries_.emplace_back(std::move(allocator), std::move(reclaimer)); + } else { + *index = free_entries_.back(); + free_entries_.pop_back(); + Entry& entry = entries_[*index]; + entry.allocator = std::move(allocator); + entry.reclaimer = std::move(reclaimer); + } + if (queue_.empty()) waker_.Wakeup(); + queue_.push(*index); +} + +ReclamationFunction ReclaimerQueue::Cancel(Index index, + MemoryAllocator* allocator) { + MutexLock lock(&mu_); + if (index >= entries_.size()) return nullptr; + Entry& entry = entries_[index]; + if (entry.allocator.get() != allocator) return {}; + entry.allocator.reset(); + return std::move(entry.reclaimer); +} + +Poll ReclaimerQueue::PollNext() { + MutexLock lock(&mu_); + while (true) { + if (queue_.empty()) { + waker_ = Activity::current()->MakeNonOwningWaker(); + return Pending{}; + } + Index index = queue_.front(); + queue_.pop(); + free_entries_.push_back(index); + Entry& entry = entries_[index]; + if (entry.allocator != nullptr) { + entry.allocator.reset(); + return std::move(entry.reclaimer); + } + } +} + +// +// MemoryAllocator +// + +MemoryAllocator::MemoryAllocator(RefCountedPtr memory_quota) + : InternallyRefCounted("MemoryAllocator"), memory_quota_(memory_quota) { + Reserve(sizeof(MemoryQuota)); +} + +MemoryAllocator::~MemoryAllocator() { + GPR_ASSERT(free_bytes_.load(std::memory_order_acquire) + + sizeof(MemoryQuota) == + taken_bytes_); + memory_quota_->Return(taken_bytes_); +} + +void MemoryAllocator::Orphan() { + ReclamationFunction old_reclaimers[kNumReclamationPasses]; + { + MutexLock lock(&memory_quota_mu_); + for (size_t i = 0; i < kNumReclamationPasses; i++) { + old_reclaimers[i] = + memory_quota_->reclaimers_[i].Cancel(reclamation_indices_[i], this); + } + } + InternallyRefCounted::Unref(); +} + +size_t MemoryAllocator::Reserve(MemoryRequest request) { + while (true) { + // Attempt to reserve memory from our pool. + auto reservation = TryReserve(request); + if (reservation.has_value()) return *reservation; + // If that failed, grab more from the quota and retry. + Replenish(); + } +} + +absl::optional MemoryAllocator::TryReserve(MemoryRequest request) { + // How much memory should we request? (see the scaling below) + size_t scaled_size_over_min = request.max() - request.min(); + // Scale the request down according to memory pressure if we have that + // flexibility. + if (scaled_size_over_min != 0) { + double pressure; + { + MutexLock lock(&memory_quota_mu_); + pressure = memory_quota_->InstantaneousPressure(); + } + // Reduce allocation size proportional to the pressure > 80% usage. + if (pressure > 0.8) { + scaled_size_over_min = + std::min(scaled_size_over_min, + static_cast((request.max() - request.min()) * + (1.0 - pressure) / 0.2)); + } + } + + // How much do we want to reserve? + const size_t reserve = request.min() + scaled_size_over_min; + // See how many bytes are available. + size_t available = free_bytes_.load(std::memory_order_acquire); + while (true) { + // Does the current free pool satisfy the request? + if (available < reserve) { + return {}; + } + // Try to reserve the requested amount. + // If the amount of free memory changed through this loop, then available + // will be set to the new value and we'll repeat. + if (free_bytes_.compare_exchange_weak(available, available - reserve, + std::memory_order_acq_rel, + std::memory_order_acquire)) { + return reserve; + } + } +} + +void MemoryAllocator::Replenish() { + MutexLock lock(&memory_quota_mu_); + // Attempt a fairly low rate exponential growth request size, bounded between + // some reasonable limits declared at top of file. + auto amount = Clamp(taken_bytes_ / 3, kMinReplenishBytes, kMaxReplenishBytes); + // Take the requested amount from the quota. + gpr_log(GPR_DEBUG, "%p: take %" PRIdMAX " bytes from quota", this, amount); + memory_quota_->Take(amount); + // Record that we've taken it. + taken_bytes_ += amount; + // Add the taken amount to the free pool. + free_bytes_.fetch_add(amount, std::memory_order_acq_rel); + // See if we can add ourselves as a reclaimer. + MaybeRegisterReclaimerLocked(); +} + +void MemoryAllocator::MaybeRegisterReclaimer() { + MutexLock lock(&memory_quota_mu_); + MaybeRegisterReclaimerLocked(); +} + +void MemoryAllocator::MaybeRegisterReclaimerLocked() { + // If the reclaimer is already registered, then there's nothing to do. + if (reclamation_indices_[0] != ReclaimerQueue::kInvalidIndex) return; + // Grab references to the things we'll need + auto self = Ref(DEBUG_LOCATION, "reclaimer"); + gpr_log(GPR_DEBUG, "%p: register reclaimer; idx=%" PRIdMAX, this, + reclamation_indices_[0]); + memory_quota_->reclaimers_[0].Insert( + self, + [self](ReclamationSweep) { + MutexLock lock(&self->memory_quota_mu_); + // Figure out how many bytes we can return to the quota. + size_t return_bytes = + self->free_bytes_.exchange(0, std::memory_order_acq_rel); + gpr_log(GPR_DEBUG, "%p: sweep reclaimer - return %" PRIdMAX, self.get(), + return_bytes); + if (return_bytes == 0) return; + // Subtract that from our outstanding balance. + self->taken_bytes_ -= return_bytes; + // And return them to the quota. + self->memory_quota_->Return(return_bytes); + }, + &reclamation_indices_[0]); +} + +void MemoryAllocator::Rebind(RefCountedPtr memory_quota) { + MutexLock lock(&memory_quota_mu_); + if (memory_quota_ == memory_quota) return; + // Return memory to the original memory quota. + memory_quota_->Return(taken_bytes_); + // Fetch back any reclaimers that are queued. + ReclamationFunction reclaimers[kNumReclamationPasses]; + for (size_t i = 0; i < kNumReclamationPasses; i++) { + reclaimers[i] = + memory_quota_->reclaimers_[i].Cancel(reclamation_indices_[i], this); + } + // Switch to the new memory quota, leaving the old one in memory_quota so that + // when we unref it, we are outside of lock. + memory_quota_.swap(memory_quota); + // Drop our freed memory down to zero, to avoid needing to ask the new + // quota for memory we're not currently using. + taken_bytes_ -= free_bytes_.exchange(0, std::memory_order_acq_rel); + // And let the new quota know how much we're already using. + memory_quota_->Take(taken_bytes_); + // Reinsert active reclaimers. + for (size_t i = 0; i < kNumReclamationPasses; i++) { + if (reclaimers[i] == nullptr) continue; + memory_quota_->reclaimers_[i].Insert(Ref(DEBUG_LOCATION, "rebind"), + std::move(reclaimers[i]), + &reclamation_indices_[i]); + } +} + +void MemoryAllocator::PostReclaimer(ReclamationPass pass, + ReclamationFunction fn) { + MutexLock lock(&memory_quota_mu_); + auto pass_num = static_cast(pass); + memory_quota_->reclaimers_[pass_num].Insert( + Ref(DEBUG_LOCATION, "post_reclaimer"), std::move(fn), + &reclamation_indices_[pass_num]); +} + +namespace { + +// Reference count for a slice allocated by MemoryAllocator::MakeSlice. +// Takes care of releasing memory back when the slice is destroyed. +class SliceRefCount { + public: + static void Destroy(void* p) { + auto* rc = static_cast(p); + rc->~SliceRefCount(); + gpr_free(rc); + } + SliceRefCount(RefCountedPtr allocator, size_t size) + : base_(grpc_slice_refcount::Type::REGULAR, &refs_, Destroy, this, + &base_), + allocator_(std::move(allocator)), + size_(size) { + // Nothing to do here. + } + ~SliceRefCount() { allocator_->Release(size_); } + + grpc_slice_refcount* base_refcount() { return &base_; } + + private: + grpc_slice_refcount base_; + RefCount refs_; + RefCountedPtr allocator_; + size_t size_; +}; + +} // namespace + +grpc_slice MemoryAllocator::MakeSlice(MemoryRequest request) { + auto size = Reserve(request.Increase(sizeof(SliceRefCount))); + void* p = gpr_malloc(size); + new (p) SliceRefCount(Ref(DEBUG_LOCATION, "slice"), size); + grpc_slice slice; + slice.refcount = static_cast(p)->base_refcount(); + slice.data.refcounted.bytes = + static_cast(p) + sizeof(SliceRefCount); + slice.data.refcounted.length = size - sizeof(SliceRefCount); + return slice; +} + +// +// MemoryQuota +// + +class MemoryQuota::WaitForSweepPromise { + public: + WaitForSweepPromise(WeakRefCountedPtr memory_quota, + uint64_t token) + : memory_quota_(memory_quota), token_(token) {} + + struct Empty {}; + Poll operator()() { + if (memory_quota_->reclamation_counter_.load(std::memory_order_relaxed) != + token_) { + return Empty{}; + } else { + return Pending{}; + } + } + + private: + WeakRefCountedPtr memory_quota_; + uint64_t token_; +}; + +MemoryQuota::MemoryQuota() : DualRefCounted("MemoryQuota") { + auto self = WeakRef(); + + // Reclamation loop: + // basically, wait until we are in overcommit (free_bytes_ < 0), and then: + // while (free_bytes_ < 0) reclaim_memory() + // ... and repeat + auto reclamation_loop = Loop(Seq( + [self]() -> Poll { + // If there's free memory we no longer need to reclaim memory! + if (self->free_bytes_.load(std::memory_order_acquire) > 0) { + return Pending{}; + } + return 0; + }, + [self]() { + // Race biases to the first thing that completes... so this will + // choose the highest priority/least destructive thing to do that's + // available. + return Race(self->reclaimers_[0].Next(), self->reclaimers_[1].Next(), + self->reclaimers_[2].Next(), self->reclaimers_[3].Next()); + }, + [self](ReclamationFunction reclaimer) { + // One of the reclaimer queues gave us a way to get back memory. + // Call the reclaimer with a token that contains enough to wake us + // up again. + const uint64_t token = + self->reclamation_counter_.fetch_add(1, std::memory_order_relaxed) + + 1; + reclaimer(ReclamationSweep(self, token)); + // Return a promise that will wait for our barrier. This will be + // awoken by the token above being destroyed. So, once that token is + // destroyed, we'll be able to proceed. + return WaitForSweepPromise(self, token); + }, + []() -> LoopCtl { + // Continue the loop! + return Continue{}; + })); + + reclaimer_activity_ = + MakeActivity(std::move(reclamation_loop), ExecCtxWakeupScheduler(), + [](absl::Status status) { + GPR_ASSERT(status.code() == absl::StatusCode::kCancelled); + }); +} + +void MemoryQuota::SetSize(size_t new_size) { + size_t old_size = quota_size_.exchange(new_size, std::memory_order_relaxed); + if (old_size < new_size) { + // We're growing the quota. + Return(new_size - old_size); + } else { + // We're shrinking the quota. + Take(old_size - new_size); + } +} + +void MemoryQuota::Take(size_t amount) { + // If there's a request for nothing, then do nothing! + if (amount == 0) return; + GPR_DEBUG_ASSERT(amount <= std::numeric_limits::max()); + // Grab memory from the quota. + auto prior = free_bytes_.fetch_sub(amount, std::memory_order_acq_rel); + // If we push into overcommit, awake the reclaimer. + if (prior >= 0 && prior < static_cast(amount)) { + reclaimer_activity_->ForceWakeup(); + } +} + +void MemoryQuota::Orphan() { reclaimer_activity_.reset(); } + +void MemoryQuota::FinishReclamation(uint64_t token) { + uint64_t current = reclamation_counter_.load(std::memory_order_relaxed); + if (current != token) return; + if (reclamation_counter_.compare_exchange_strong(current, current + 1, + std::memory_order_relaxed, + std::memory_order_relaxed)) { + reclaimer_activity_->ForceWakeup(); + } +} + +void MemoryQuota::Return(size_t amount) { + free_bytes_.fetch_add(amount, std::memory_order_relaxed); +} + +size_t MemoryQuota::InstantaneousPressure() const { + double free = free_bytes_.load(); + if (free < 0) free = 0; + double size = quota_size_.load(); + if (size < 1) return 1.0; + double pressure = (size - free) / size; + if (pressure < 0.0) pressure = 0.0; + if (pressure > 1.0) pressure = 1.0; + return pressure; +} + +} // namespace grpc_core diff --git a/src/core/lib/resource_quota/memory_quota.h b/src/core/lib/resource_quota/memory_quota.h new file mode 100644 index 00000000000..ed985270571 --- /dev/null +++ b/src/core/lib/resource_quota/memory_quota.h @@ -0,0 +1,405 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_CORE_LIB_RESOURCE_QUOTA_MEMORY_QUOTA_H +#define GRPC_CORE_LIB_RESOURCE_QUOTA_MEMORY_QUOTA_H + +#include + +#include +#include +#include +#include +#include +#include + +#include + +#include "src/core/lib/gprpp/dual_ref_counted.h" +#include "src/core/lib/gprpp/orphanable.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/promise/activity.h" +#include "src/core/lib/promise/poll.h" + +namespace grpc_core { + +class Reclaimer; +class MemoryAllocator; +class MemoryQuota; + +// Reclamation passes. +// When memory is tight, we start trying to claim some back from memory +// reclaimers. We do this in multiple passes: if there is a less destructive +// operation available, we do that, otherwise we do something more destructive. +enum class ReclamationPass { + // Non-empty reclamation ought to take index 0, but to simplify API we don't + // expose that publicly (it's an internal detail), and hence index zero is + // here unnamed. + + // Benign reclamation is intended for reclamation steps that are not + // observable outside of gRPC (besides maybe causing an increase in CPU + // usage). + // Examples of such reclamation would be resizing buffers to fit the current + // load needs, rather than whatever was the peak usage requirement. + kBenign = 1, + // Idle reclamation is intended for reclamation steps that are observable + // outside of gRPC, but do not cause application work to be lost. + // Examples of such reclamation would be dropping channels that are not being + // used. + kIdle = 2, + // Destructive reclamation is our last resort, and is these reclamations are + // allowed to drop work - such as cancelling in flight requests. + kDestructive = 3, +}; +static constexpr size_t kNumReclamationPasses = 4; + +// Reservation request - how much memory do we want to allocate? +class MemoryRequest { + public: + // Request a fixed amount of memory. + // NOLINTNEXTLINE(google-explicit-constructor) + MemoryRequest(size_t n) : min_(n), max_(n) {} + // Request a range of memory. + MemoryRequest(size_t min, size_t max) : min_(std::min(min, max)), max_(max) {} + + // Increase the size by amount + MemoryRequest Increase(size_t amount) const { + return MemoryRequest(min_ + amount, max_ + amount); + } + + size_t min() const { return min_; } + size_t max() const { return max_; } + + private: + size_t min_; + size_t max_; +}; + +// For each reclamation function run we construct a ReclamationSweep. +// When this object is finally destroyed (it may be moved several times first), +// then that reclamation is complete and we may continue the reclamation loop. +class ReclamationSweep { + public: + ReclamationSweep(WeakRefCountedPtr memory_quota, + uint64_t sweep_token) + : memory_quota_(std::move(memory_quota)), sweep_token_(sweep_token) {} + ~ReclamationSweep(); + + ReclamationSweep(const ReclamationSweep&) = delete; + ReclamationSweep& operator=(const ReclamationSweep&) = delete; + ReclamationSweep(ReclamationSweep&&) = default; + ReclamationSweep& operator=(ReclamationSweep&&) = default; + + // Has enough work been done that we would not be called upon again + // immediately to do reclamation work if we stopped and requeued. Reclaimers + // with a variable amount of work to do can use this to ascertain when they + // can stop more efficiently than going through the reclaimer queue once per + // work item. + bool IsSufficient() const; + + private: + WeakRefCountedPtr memory_quota_; + uint64_t sweep_token_; +}; + +using ReclamationFunction = std::function; + +class ReclaimerQueue { + public: + using Index = size_t; + + // An invalid index usable as an empty value. + // This value will not be returned from Insert ever. + static constexpr Index kInvalidIndex = std::numeric_limits::max(); + + // Insert a new element at the back of the queue. + // If there is already an element from allocator at *index, then it is + // replaced with the new reclaimer and *index is unchanged. If there is not, + // then *index is set to the index of the newly queued entry. + // Associates the reclamation function with an allocator, and keeps that + // allocator alive, so that we can use the pointer as an ABA guard. + void Insert(RefCountedPtr allocator, + ReclamationFunction reclaimer, Index* index) + ABSL_LOCKS_EXCLUDED(mu_); + // Cancel a reclamation function - returns the function if cancelled + // successfully, or nullptr if the reclamation was already begun and could not + // be cancelled. allocator must be the same as was passed to Insert. + ReclamationFunction Cancel(Index index, MemoryAllocator* allocator) + ABSL_LOCKS_EXCLUDED(mu_); + // Poll to see if an entry is available: returns Pending if not, or the + // removed reclamation function if so. + Poll PollNext() ABSL_LOCKS_EXCLUDED(mu_); + + // This callable is the promise backing Next - it resolves when there is an + // entry available. This really just redirects to calling PollNext(). + class NextPromise { + public: + explicit NextPromise(ReclaimerQueue* queue) : queue_(queue) {} + Poll operator()() { return queue_->PollNext(); } + + private: + // Borrowed ReclaimerQueue backing this promise. + ReclaimerQueue* queue_; + }; + NextPromise Next() { return NextPromise(this); } + + private: + // One entry in the reclaimer queue + struct Entry { + Entry(RefCountedPtr allocator, + ReclamationFunction reclaimer) + : allocator(allocator), reclaimer(reclaimer) {} + // The allocator we'd be reclaiming for. + RefCountedPtr allocator; + // The reclamation function to call. + ReclamationFunction reclaimer; + }; + // Guarding mutex. + Mutex mu_; + // Entries in the queue (or empty entries waiting to be queued). + // We actually queue indices into this vector - and do this so that + // we can use the memory allocator pointer as an ABA protection. + std::vector entries_ ABSL_GUARDED_BY(mu_); + // Which entries in entries_ are not allocated right now. + std::vector free_entries_ ABSL_GUARDED_BY(mu_); + // Allocated entries waiting to be consumed. + std::queue queue_ ABSL_GUARDED_BY(mu_); + // Potentially one activity can be waiting for new entries on the queue. + Waker waker_ ABSL_GUARDED_BY(mu_); +}; + +// MemoryAllocator grants the owner the ability to allocate memory from an +// underlying resource quota. +class MemoryAllocator final : public InternallyRefCounted { + public: + explicit MemoryAllocator(RefCountedPtr memory_quota); + ~MemoryAllocator() override; + + void Orphan() override; + + // Rebind - Swaps the underlying quota for this allocator, taking care to + // make sure memory allocated is moved to allocations against the new quota. + void Rebind(RefCountedPtr memory_quota) + ABSL_LOCKS_EXCLUDED(memory_quota_mu_); + + // Reserve bytes from the quota. + // If we enter overcommit, reclamation will begin concurrently. + // Returns the number of bytes reserved. + size_t Reserve(MemoryRequest request) ABSL_LOCKS_EXCLUDED(memory_quota_mu_); + + // Release some bytes that were previously reserved. + void Release(size_t n) ABSL_LOCKS_EXCLUDED(memory_quota_mu_) { + // Add the released memory to our free bytes counter... if this increases + // from 0 to non-zero, then we have more to do, otherwise, we're actually + // done. + if (free_bytes_.fetch_add(n, std::memory_order_release) != 0) return; + MaybeRegisterReclaimer(); + } + + // An automatic releasing reservation of memory. + class Reservation { + public: + Reservation() = default; + Reservation(const Reservation&) = delete; + Reservation& operator=(const Reservation&) = delete; + Reservation(Reservation&&) = default; + Reservation& operator=(Reservation&&) = default; + ~Reservation() { + if (allocator_ != nullptr) allocator_->Release(size_); + } + + private: + friend class MemoryAllocator; + Reservation(RefCountedPtr allocator, size_t size) + : allocator_(allocator), size_(size) {} + + RefCountedPtr allocator_ = nullptr; + size_t size_ = 0; + }; + + // Reserve bytes from the quota and automatically release them when + // Reservation is destroyed. + Reservation MakeReservation(MemoryRequest request) { + return Reservation(Ref(DEBUG_LOCATION, "Reservation"), Reserve(request)); + } + + // Post a reclaimer for some reclamation pass. + void PostReclaimer(ReclamationPass pass, + std::function); + + // Allocate a new object of type T, with constructor arguments. + // The returned type is wrapped, and upon destruction the reserved memory + // will be released to the allocator automatically. As such, T must have a + // virtual destructor so we can insert the necessary hook. + template + absl::enable_if_t::value, T*> New( + Args&&... args) ABSL_LOCKS_EXCLUDED(memory_quota_mu_) { + // Wrap T such that when it's destroyed, we can release memory back to the + // allocator. + class Wrapper final : public T { + public: + explicit Wrapper(RefCountedPtr allocator, Args&&... args) + : T(std::forward(args)...), allocator_(std::move(allocator)) {} + ~Wrapper() override { allocator_->Release(sizeof(*this)); } + + private: + const RefCountedPtr allocator_; + }; + Reserve(sizeof(Wrapper)); + return new Wrapper(Ref(DEBUG_LOCATION, "Wrapper"), + std::forward(args)...); + } + + // Construct a unique ptr immediately. + template + std::unique_ptr MakeUnique(Args&&... args) + ABSL_LOCKS_EXCLUDED(memory_quota_mu_) { + return std::unique_ptr(New(std::forward(args)...)); + } + + // Allocate a slice, using MemoryRequest to size the number of returned bytes. + // For a variable length request, check the returned slice length to verify + // how much memory was allocated. + // Takes care of reserving memory for any relevant control structures also. + grpc_slice MakeSlice(MemoryRequest request); + + // A C++ allocator for containers of T. + template + class Container { + public: + // Construct the allocator: \a underlying_allocator is borrowed, and must + // outlive this object. + explicit Container(MemoryAllocator* underlying_allocator) + : underlying_allocator_(underlying_allocator) {} + + MemoryAllocator* underlying_allocator() const { + return underlying_allocator_; + } + + using value_type = T; + template + explicit Container(const Container& other) + : underlying_allocator_(other.underlying_allocator()) {} + T* allocate(size_t n) { + underlying_allocator_->Reserve(n * sizeof(T)); + return static_cast(::operator new(n * sizeof(T))); + } + void deallocate(T* p, size_t n) { + ::operator delete(p); + underlying_allocator_->Release(n * sizeof(T)); + } + + private: + MemoryAllocator* underlying_allocator_; + }; + + private: + // Primitive reservation function. + absl::optional TryReserve(MemoryRequest request) GRPC_MUST_USE_RESULT; + // Replenish bytes from the quota, without blocking, possibly entering + // overcommit. + void Replenish() ABSL_LOCKS_EXCLUDED(memory_quota_mu_); + // If we have not already, register a reclamation function against the quota + // to sweep any free memory back to that quota. + void MaybeRegisterReclaimer() ABSL_LOCKS_EXCLUDED(memory_quota_mu_); + void MaybeRegisterReclaimerLocked() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(memory_quota_mu_); + + // Amount of memory this allocator has cached for its own use: to avoid quota + // contention, each MemoryAllocator can keep some memory in addition to what + // it is immediately using, and the quota can pull it back under memory + // pressure. + std::atomic free_bytes_{0}; + // Mutex guarding the backing resource quota. + Mutex memory_quota_mu_; + // Backing resource quota. + RefCountedPtr memory_quota_ ABSL_GUARDED_BY(memory_quota_mu_); + // Amount of memory taken from the quota by this allocator. + size_t taken_bytes_ ABSL_GUARDED_BY(memory_quota_mu_) = 0; + // Indices into the various reclaimer queues, used so that we can cancel + // reclamation should we shutdown or get rebound. + ReclaimerQueue::Index + reclamation_indices_[kNumReclamationPasses] ABSL_GUARDED_BY( + memory_quota_mu_) = { + ReclaimerQueue::kInvalidIndex, ReclaimerQueue::kInvalidIndex, + ReclaimerQueue::kInvalidIndex, ReclaimerQueue::kInvalidIndex}; +}; + +// Wrapper type around std::vector to make initialization against a +// MemoryAllocator based container allocator easy. +template +class Vector : public std::vector> { + public: + explicit Vector(MemoryAllocator* allocator) + : std::vector>( + MemoryAllocator::Container(allocator)) {} +}; + +// MemoryQuota tracks the amount of memory available as part of a ResourceQuota. +class MemoryQuota final : public DualRefCounted { + public: + MemoryQuota(); + + OrphanablePtr MakeMemoryAllocator() { + return MakeOrphanable( + Ref(DEBUG_LOCATION, "MakeMemoryAllocator")); + } + + // Resize the quota to new_size. + void SetSize(size_t new_size); + + private: + friend class MemoryAllocator; + friend class ReclamationSweep; + class WaitForSweepPromise; + + void Orphan() override; + + // Forcefully take some memory from the quota, potentially entering + // overcommit. + void Take(size_t amount); + // Finish reclamation pass. + void FinishReclamation(uint64_t token); + // Return some memory to the quota. + void Return(size_t amount); + // Instantaneous memory pressure approximation. + size_t InstantaneousPressure() const; + + static constexpr intptr_t kInitialSize = std::numeric_limits::max(); + + // The amount of memory that's free in this quota. + // We use intptr_t as a reasonable proxy for ssize_t that's portable. + // We allow arbitrary overcommit and so this must allow negative values. + std::atomic free_bytes_{kInitialSize}; + // The total number of bytes in this quota. + std::atomic quota_size_{kInitialSize}; + + // Reclaimer queues. + ReclaimerQueue reclaimers_[kNumReclamationPasses]; + // The reclaimer activity consumes reclaimers whenever we are in overcommit to + // try and get back under memory limits. + ActivityPtr reclaimer_activity_; + // Each time we do a reclamation sweep, we increment this counter and give it + // to the sweep in question. In this way, should we choose to cancel a sweep + // we can do so and not get confused when the sweep reports back that it's + // completed. + // We also increment this counter on completion of a sweep, as an indicator + // that the wait has ended. + std::atomic reclamation_counter_{0}; +}; + +} // namespace grpc_core + +#endif // GRPC_CORE_LIB_RESOURCE_QUOTA_MEMORY_QUOTA_H diff --git a/src/core/lib/resource_quota/resource_quota.cc b/src/core/lib/resource_quota/resource_quota.cc new file mode 100644 index 00000000000..5654f2da2a5 --- /dev/null +++ b/src/core/lib/resource_quota/resource_quota.cc @@ -0,0 +1,27 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "src/core/lib/resource_quota/resource_quota.h" + +namespace grpc_core { + +ResourceQuota::ResourceQuota() + : memory_quota_(MakeRefCounted()), + thread_quota_(MakeRefCounted()) {} + +ResourceQuota::~ResourceQuota() = default; + +} // namespace grpc_core diff --git a/src/core/lib/resource_quota/resource_quota.h b/src/core/lib/resource_quota/resource_quota.h new file mode 100644 index 00000000000..361ceeb7a37 --- /dev/null +++ b/src/core/lib/resource_quota/resource_quota.h @@ -0,0 +1,48 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_CORE_LIB_RESOURCE_QUOTA_RESOURCE_QUOTA_H +#define GRPC_CORE_LIB_RESOURCE_QUOTA_RESOURCE_QUOTA_H + +#include + +#include "src/core/lib/resource_quota/memory_quota.h" +#include "src/core/lib/resource_quota/thread_quota.h" + +namespace grpc_core { + +class ResourceQuota : public RefCounted { + public: + ResourceQuota(); + ~ResourceQuota() override; + + ResourceQuota(const ResourceQuota&) = delete; + ResourceQuota& operator=(const ResourceQuota&) = delete; + + const RefCountedPtr& memory_quota() const { + return memory_quota_; + } + + const RefCountedPtr& thread_quota() const { + return thread_quota_; + } + + private: + RefCountedPtr memory_quota_; + RefCountedPtr thread_quota_; +}; + +} // namespace grpc_core + +#endif // GRPC_CORE_LIB_RESOURCE_QUOTA_RESOURCE_QUOTA_H diff --git a/src/core/lib/resource_quota/thread_quota.cc b/src/core/lib/resource_quota/thread_quota.cc new file mode 100644 index 00000000000..c935be0d7f2 --- /dev/null +++ b/src/core/lib/resource_quota/thread_quota.cc @@ -0,0 +1,43 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "src/core/lib/resource_quota/thread_quota.h" + +namespace grpc_core { + +ThreadQuota::ThreadQuota() = default; + +ThreadQuota::~ThreadQuota() = default; + +void ThreadQuota::SetMax(size_t new_max) { + MutexLock lock(&mu_); + max_ = new_max; +} + +bool ThreadQuota::Reserve(size_t num_threads) { + MutexLock lock(&mu_); + if (allocated_ + num_threads > max_) return false; + allocated_ += num_threads; + return true; +} + +void ThreadQuota::Release(size_t num_threads) { + MutexLock lock(&mu_); + GPR_ASSERT(num_threads <= allocated_); + allocated_ -= num_threads; +} + +} // namespace grpc_core diff --git a/src/core/lib/resource_quota/thread_quota.h b/src/core/lib/resource_quota/thread_quota.h new file mode 100644 index 00000000000..0f236dd7afd --- /dev/null +++ b/src/core/lib/resource_quota/thread_quota.h @@ -0,0 +1,55 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_CORE_LIB_RESOURCE_QUOTA_THREAD_QUOTA_H +#define GRPC_CORE_LIB_RESOURCE_QUOTA_THREAD_QUOTA_H + +#include + +#include + +#include "src/core/lib/gprpp/ref_counted.h" +#include "src/core/lib/gprpp/sync.h" + +namespace grpc_core { + +// Tracks the amount of threads in a resource quota. +class ThreadQuota : public RefCounted { + public: + ThreadQuota(); + ~ThreadQuota() override; + + ThreadQuota(const ThreadQuota&) = delete; + ThreadQuota& operator=(const ThreadQuota&) = delete; + + // Set the maximum number of threads that can be used by this quota. + // If there are more, new reservations will fail until the quota is available. + void SetMax(size_t new_max); + + // Try to allocate some number of threads. + // Returns true if the allocation succeeded, false otherwise. + bool Reserve(size_t num_threads); + + // Release some number of threads. + void Release(size_t num_threads); + + private: + Mutex mu_; + size_t allocated_ ABSL_GUARDED_BY(mu_) = 0; + size_t max_ ABSL_GUARDED_BY(mu_) = std::numeric_limits::max(); +}; + +} // namespace grpc_core + +#endif // GRPC_CORE_LIB_RESOURCE_QUOTA_THREAD_QUOTA_H diff --git a/test/core/resource_quota/BUILD b/test/core/resource_quota/BUILD new file mode 100644 index 00000000000..9139eac904e --- /dev/null +++ b/test/core/resource_quota/BUILD @@ -0,0 +1,94 @@ +# Copyright 2021 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package") + +licenses(["notice"]) + +grpc_package(name = "test/core/promise") + +load("//test/core/util:grpc_fuzzer.bzl", "grpc_proto_fuzzer") + +grpc_cc_test( + name = "memory_quota_test", + srcs = ["memory_quota_test.cc"], + external_deps = [ + "gtest", + ], + language = "c++", + uses_polling = False, + deps = [ + "//:memory_quota", + "//test/core/util:grpc_suppressions", + ], +) + +grpc_cc_test( + name = "thread_quota_test", + srcs = ["thread_quota_test.cc"], + external_deps = [ + "gtest", + ], + language = "c++", + uses_polling = False, + deps = [ + "//:thread_quota", + "//test/core/util:grpc_suppressions", + ], +) + +grpc_cc_test( + name = "resource_quota_test", + srcs = ["resource_quota_test.cc"], + external_deps = [ + "gtest", + ], + language = "c++", + uses_polling = False, + deps = [ + "//:resource_quota", + "//test/core/util:grpc_suppressions", + ], +) + +grpc_cc_test( + name = "memory_quota_stress_test", + srcs = ["memory_quota_stress_test.cc"], + language = "c++", + # We only run this test under Linux, and really only care about the + # TSAN results. + tags = [ + "no_mac", + "no_windows", + ], + uses_polling = False, + deps = [ + "//:memory_quota", + "//test/core/util:grpc_suppressions", + ], +) + +grpc_proto_fuzzer( + name = "memory_quota_fuzzer", + srcs = ["memory_quota_fuzzer.cc"], + corpus = "memory_quota_fuzzer_corpus", + language = "C++", + proto = "memory_quota_fuzzer.proto", + tags = ["no_windows"], + uses_polling = False, + deps = [ + "//:memory_quota", + "//test/core/util:grpc_test_util", + ], +) diff --git a/test/core/resource_quota/memory_quota_fuzzer.cc b/test/core/resource_quota/memory_quota_fuzzer.cc new file mode 100644 index 00000000000..be703c399af --- /dev/null +++ b/test/core/resource_quota/memory_quota_fuzzer.cc @@ -0,0 +1,169 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "src/core/lib/gpr/useful.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/resource_quota/memory_quota.h" +#include "src/libfuzzer/libfuzzer_macro.h" +#include "test/core/resource_quota/memory_quota_fuzzer.pb.h" + +bool squelch = true; +bool leak_check = true; + +namespace grpc_core { + +namespace { +ReclamationPass MapReclamationPass(memory_quota_fuzzer::Reclaimer::Pass pass) { + switch (pass) { + case memory_quota_fuzzer::Reclaimer::BENIGN: + return ReclamationPass::kBenign; + case memory_quota_fuzzer::Reclaimer::IDLE: + return ReclamationPass::kIdle; + case memory_quota_fuzzer::Reclaimer::DESTRUCTIVE: + return ReclamationPass::kDestructive; + default: + return ReclamationPass::kBenign; + } +} + +class Fuzzer { + public: + void Run(const memory_quota_fuzzer::Msg& msg) { + grpc_core::ExecCtx exec_ctx; + RunMsg(msg); + memory_quotas_.clear(); + memory_allocators_.clear(); + allocations_.clear(); + } + + private: + void RunMsg(const memory_quota_fuzzer::Msg& msg) { + for (int i = 0; i < msg.actions_size(); ++i) { + const auto& action = msg.actions(i); + switch (action.action_type_case()) { + case memory_quota_fuzzer::Action::kFlushExecCtx: + ExecCtx::Get()->Flush(); + break; + case memory_quota_fuzzer::Action::kCreateQuota: + memory_quotas_.emplace(action.quota(), + RefCountedPtr(new MemoryQuota())); + break; + case memory_quota_fuzzer::Action::kDeleteQuota: + memory_quotas_.erase(action.quota()); + break; + case memory_quota_fuzzer::Action::kCreateAllocator: + WithQuota(action.quota(), + [this, action](RefCountedPtr q) { + memory_allocators_.emplace(action.allocator(), + q->MakeMemoryAllocator()); + }); + break; + case memory_quota_fuzzer::Action::kDeleteAllocator: + memory_allocators_.erase(action.allocator()); + break; + case memory_quota_fuzzer::Action::kSetQuotaSize: + WithQuota(action.quota(), [action](RefCountedPtr q) { + q->SetSize(Clamp(action.set_quota_size(), uint64_t{0}, + uint64_t{std::numeric_limits::max()})); + }); + break; + case memory_quota_fuzzer::Action::kRebindQuota: + WithQuota(action.quota(), + [this, action](RefCountedPtr q) { + WithAllocator(action.allocator(), + [q](MemoryAllocator* a) { a->Rebind(q); }); + }); + break; + case memory_quota_fuzzer::Action::kCreateAllocation: { + auto min = action.create_allocation().min(); + auto max = action.create_allocation().max(); + if (min > max) break; + MemoryRequest req(min, max); + WithAllocator( + action.allocator(), [this, action, req](MemoryAllocator* a) { + auto alloc = a->MakeReservation(req); + allocations_.emplace(action.allocation(), std::move(alloc)); + }); + } break; + case memory_quota_fuzzer::Action::kDeleteAllocation: + allocations_.erase(action.allocation()); + break; + case memory_quota_fuzzer::Action::kPostReclaimer: { + std::function reclaimer; + auto cfg = action.post_reclaimer(); + if (cfg.synchronous()) { + reclaimer = [this, cfg](ReclamationSweep) { RunMsg(cfg.msg()); }; + } else { + reclaimer = [cfg, this](ReclamationSweep sweep) { + struct Args { + ReclamationSweep sweep; + memory_quota_fuzzer::Msg msg; + Fuzzer* fuzzer; + }; + auto* args = new Args{std::move(sweep), cfg.msg(), this}; + auto* closure = GRPC_CLOSURE_CREATE( + [](void* arg, grpc_error*) { + auto* args = static_cast(arg); + args->fuzzer->RunMsg(args->msg); + delete args; + }, + args, nullptr); + ExecCtx::Get()->Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE); + }; + auto pass = MapReclamationPass(cfg.pass()); + WithAllocator(action.allocator(), + [pass, reclaimer](MemoryAllocator* a) { + a->PostReclaimer(pass, reclaimer); + }); + } + } break; + case memory_quota_fuzzer::Action::ACTION_TYPE_NOT_SET: + break; + } + } + } + + template + void WithQuota(int quota, F f) { + auto it = memory_quotas_.find(quota); + if (it == memory_quotas_.end()) return; + f(it->second); + } + + template + void WithAllocator(int allocator, F f) { + auto it = memory_allocators_.find(allocator); + if (it == memory_allocators_.end()) return; + f(it->second.get()); + } + + std::map> memory_quotas_; + std::map> memory_allocators_; + std::map allocations_; +}; + +} // namespace + +} // namespace grpc_core + +static void dont_log(gpr_log_func_args* /*args*/) {} + +DEFINE_PROTO_FUZZER(const memory_quota_fuzzer::Msg& msg) { + if (squelch) gpr_set_log_function(dont_log); + gpr_log_verbosity_init(); + grpc_tracer_init(); + grpc_core::Fuzzer().Run(msg); +} diff --git a/test/core/resource_quota/memory_quota_fuzzer.proto b/test/core/resource_quota/memory_quota_fuzzer.proto new file mode 100644 index 00000000000..63d8a2e08e2 --- /dev/null +++ b/test/core/resource_quota/memory_quota_fuzzer.proto @@ -0,0 +1,57 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package memory_quota_fuzzer; + +message Empty {} + +message Reclaimer { + enum Pass { + BENIGN = 0; + IDLE = 1; + DESTRUCTIVE = 2; + } + bool synchronous = 1; + Pass pass = 2; + Msg msg = 3; +} + +message AllocationRequest { + uint32 min = 1; + uint32 max = 2; +} + +message Action { + int32 quota = 1; + int32 allocator = 2; + int32 allocation = 3; + oneof action_type { + Empty flush_exec_ctx = 7; + Empty create_quota = 10; + Empty delete_quota = 11; + Empty create_allocator = 12; + Empty delete_allocator = 13; + uint64 set_quota_size = 14; + Empty rebind_quota = 15; + AllocationRequest create_allocation = 16; + Empty delete_allocation = 17; + Reclaimer post_reclaimer = 18; + } +} + +message Msg { + repeated Action actions = 2; +} diff --git a/test/core/resource_quota/memory_quota_fuzzer_corpus/0 b/test/core/resource_quota/memory_quota_fuzzer_corpus/0 new file mode 100644 index 00000000000..8b137891791 --- /dev/null +++ b/test/core/resource_quota/memory_quota_fuzzer_corpus/0 @@ -0,0 +1 @@ + diff --git a/test/core/resource_quota/memory_quota_stress_test.cc b/test/core/resource_quota/memory_quota_stress_test.cc new file mode 100644 index 00000000000..56d8c93c818 --- /dev/null +++ b/test/core/resource_quota/memory_quota_stress_test.cc @@ -0,0 +1,210 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/resource_quota/memory_quota.h" + +namespace grpc_core { + +namespace { +class StressTest { + public: + // Create a stress test with some size. + StressTest(size_t num_quotas, size_t num_allocators) { + for (size_t i = 0; i < num_quotas; ++i) { + quotas_.emplace_back(new MemoryQuota()); + } + std::random_device g; + std::uniform_int_distribution dist(0, num_quotas - 1); + for (size_t i = 0; i < num_allocators; ++i) { + allocators_.emplace_back(quotas_[dist(g)]->MakeMemoryAllocator()); + } + } + + // Run the thread for some period of time. + void Run(int seconds) { + std::vector threads; + + // A few threads constantly rebinding allocators to different quotas. + threads.reserve(2); + for (int i = 0; i < 2; i++) threads.push_back(Run(Rebinder)); + // And another few threads constantly resizing quotas. + for (int i = 0; i < 2; i++) threads.push_back(Run(Resizer)); + + // For each (allocator, pass), start a thread continuously allocating from + // that allocator. Whenever the first allocation is made, schedule a + // reclaimer for that pass. + for (size_t i = 0; i < allocators_.size(); i++) { + auto* allocator = allocators_[i].get(); + for (ReclamationPass pass : + {ReclamationPass::kBenign, ReclamationPass::kIdle, + ReclamationPass::kDestructive}) { + threads.push_back(Run([allocator, pass](StatePtr st) mutable { + if (st->RememberReservation( + allocator->MakeReservation(st->RandomRequest()))) { + allocator->PostReclaimer( + pass, [st](ReclamationSweep) { st->ForgetReservations(); }); + } + })); + } + } + + // All threads started, wait for the alloted time. + std::this_thread::sleep_for(std::chrono::seconds(seconds)); + + // Toggle the completion bit, and then wait for the threads. + done_.store(true, std::memory_order_relaxed); + while (!threads.empty()) { + threads.back().join(); + threads.pop_back(); + } + } + + private: + // Per-thread state. + // Not everything is used on every thread, but it's not terrible having the + // extra state around and it does simplify things somewhat. + class State { + public: + explicit State(StressTest* test) + : test_(test), + quotas_distribution_(0, test_->quotas_.size() - 1), + allocators_distribution_(0, test_->allocators_.size() - 1), + size_distribution_(1, 4 * 1024 * 1024), + quota_size_distribution_(1024 * 1024, size_t(8) * 1024 * 1024 * 1024), + choose_variable_size_(1, 100) {} + + // Choose a random quota, and return an owned pointer to it. + // Not thread-safe, only callable from the owning thread. + RefCountedPtr RandomQuota() { + return test_->quotas_[quotas_distribution_(g_)]; + } + + // Choose a random allocator, and return a borrowed pointer to it. + // Not thread-safe, only callable from the owning thread. + MemoryAllocator* RandomAllocator() { + return test_->allocators_[allocators_distribution_(g_)].get(); + } + + // Random memory request size - 1% of allocations are chosen to be variable + // sized - the rest are fixed (since variable sized create some contention + // problems between allocator threads of different passes on the same + // allocator). + // Not thread-safe, only callable from the owning thread. + MemoryRequest RandomRequest() { + size_t a = size_distribution_(g_); + if (choose_variable_size_(g_) == 1) { + size_t b = size_distribution_(g_); + return MemoryRequest(std::min(a, b), std::max(a, b)); + } + return MemoryRequest(a); + } + + // Choose a new size for a backing quota. + // Not thread-safe, only callable from the owning thread. + size_t RandomQuotaSize() { return quota_size_distribution_(g_); } + + // Remember a reservation, return true if it's the first remembered since + // the last reclamation. + // Thread-safe. + bool RememberReservation(MemoryAllocator::Reservation reservation) + ABSL_LOCKS_EXCLUDED(mu_) { + MutexLock lock(&mu_); + bool was_empty = reservations_.empty(); + reservations_.emplace_back(std::move(reservation)); + return was_empty; + } + + // Return all reservations made until this moment, so that they can be + // dropped. + std::vector ForgetReservations() + ABSL_LOCKS_EXCLUDED(mu_) { + MutexLock lock(&mu_); + return std::move(reservations_); + } + + private: + // Owning test. + StressTest* const test_; + // Random number generator. + std::mt19937 g_{std::random_device()()}; + // Distribution to choose a quota. + std::uniform_int_distribution quotas_distribution_; + // Distribution to choose an allocator. + std::uniform_int_distribution allocators_distribution_; + // Distribution to choose an allocation size. + std::uniform_int_distribution size_distribution_; + // Distribution to choose a quota size. + std::uniform_int_distribution quota_size_distribution_; + // Distribution to choose whether to make a variable-sized allocation. + std::uniform_int_distribution choose_variable_size_; + + // Mutex to protect the reservation list. + Mutex mu_; + // Reservations remembered by this thread. + std::vector reservations_ + ABSL_GUARDED_BY(mu_); + }; + // Type alias since we always pass around these shared pointers. + using StatePtr = std::shared_ptr; + + // Choose one allocator, one quota, rebind the allocator to the quota. + static void Rebinder(StatePtr st) { + MemoryAllocator* allocator = st->RandomAllocator(); + RefCountedPtr quota = st->RandomQuota(); + allocator->Rebind(std::move(quota)); + } + + // Choose one allocator, resize it to a randomly chosen size. + static void Resizer(StatePtr st) { + RefCountedPtr quota = st->RandomQuota(); + size_t size = st->RandomQuotaSize(); + quota->SetSize(size); + } + + // Create a thread that repeatedly runs a function until the test is done. + // We create one instance of State that we pass as a StatePtr to said + // function as the current overall state for this thread. + // Monitors done_ to see when we should stop. + // Ensures there's an ExecCtx for each iteration of the loop. + template + std::thread Run(Fn fn) { + return std::thread([this, fn]() mutable { + auto state = std::make_shared(this); + while (!done_.load(std::memory_order_relaxed)) { + ExecCtx exec_ctx; + fn(state); + } + }); + } + + // Flag for when the test is completed. + std::atomic done_{false}; + + // Memory quotas to test against. We build this up at construction time, but + // then don't resize, so we can load from it continuously from all of the + // threads. + std::vector> quotas_; + // Memory allocators to test against. Similarly, built at construction time, + // and then the shape of this vector is not changed. + std::vector> allocators_; +}; +} // namespace + +} // namespace grpc_core + +int main(int, char**) { grpc_core::StressTest(16, 64).Run(8); } diff --git a/test/core/resource_quota/memory_quota_test.cc b/test/core/resource_quota/memory_quota_test.cc new file mode 100644 index 00000000000..1639a191a6c --- /dev/null +++ b/test/core/resource_quota/memory_quota_test.cc @@ -0,0 +1,176 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/core/lib/resource_quota/memory_quota.h" + +#include + +#include "absl/synchronization/notification.h" + +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/slice/slice_refcount.h" + +namespace grpc_core { +namespace testing { + +// +// Helpers +// + +template +struct Sized { + char blah[kSize]; + virtual ~Sized() {} +}; + +// +// MemoryRequestTest +// + +TEST(MemoryRequestTest, ConversionFromSize) { + MemoryRequest request = 3; + EXPECT_EQ(request.min(), 3); + EXPECT_EQ(request.max(), 3); +} + +TEST(MemoryRequestTest, MinMax) { + MemoryRequest request(3, 7); + EXPECT_EQ(request.min(), 3); + EXPECT_EQ(request.max(), 7); +} + +// +// MemoryQuotaTest +// + +TEST(MemoryQuotaTest, NoOp) { + RefCountedPtr memory_quota = MakeRefCounted(); +} + +TEST(MemoryQuotaTest, CreateAllocatorNoOp) { + RefCountedPtr memory_quota = MakeRefCounted(); + auto memory_allocator = memory_quota->MakeMemoryAllocator(); +} + +TEST(MemoryQuotaTest, CreateObjectFromAllocator) { + RefCountedPtr memory_quota = MakeRefCounted(); + auto memory_allocator = memory_quota->MakeMemoryAllocator(); + auto object = memory_allocator->MakeUnique>(); +} + +TEST(MemoryQuotaTest, CreateSomeObjectsAndExpectReclamation) { + ExecCtx exec_ctx; + + RefCountedPtr memory_quota = MakeRefCounted(); + memory_quota->SetSize(4096); + auto memory_allocator = memory_quota->MakeMemoryAllocator(); + auto object = memory_allocator->MakeUnique>(); + + memory_allocator->PostReclaimer( + ReclamationPass::kDestructive, + [&object](ReclamationSweep) { object.reset(); }); + auto object2 = memory_allocator->MakeUnique>(); + exec_ctx.Flush(); + EXPECT_EQ(object.get(), nullptr); + + memory_allocator->PostReclaimer( + ReclamationPass::kDestructive, + [&object2](ReclamationSweep) { object2.reset(); }); + auto object3 = memory_allocator->MakeUnique>(); + exec_ctx.Flush(); + EXPECT_EQ(object2.get(), nullptr); +} + +TEST(MemoryQuotaTest, BasicRebind) { + ExecCtx exec_ctx; + + RefCountedPtr memory_quota = MakeRefCounted(); + memory_quota->SetSize(4096); + RefCountedPtr memory_quota2 = MakeRefCounted(); + memory_quota2->SetSize(4096); + + auto memory_allocator = memory_quota2->MakeMemoryAllocator(); + auto object = memory_allocator->MakeUnique>(); + + memory_allocator->Rebind(memory_quota); + auto memory_allocator2 = memory_quota2->MakeMemoryAllocator(); + + memory_allocator2->PostReclaimer(ReclamationPass::kDestructive, + [](ReclamationSweep) { + // Taken memory should be reassigned to + // memory_quota, so this should never be + // reached. + abort(); + }); + + memory_allocator->PostReclaimer(ReclamationPass::kDestructive, + [&object](ReclamationSweep) { + // The new memory allocator should reclaim + // the object allocated against the previous + // quota because that's now part of this + // quota. + object.reset(); + }); + + auto object2 = memory_allocator->MakeUnique>(); + exec_ctx.Flush(); + EXPECT_EQ(object.get(), nullptr); +} + +TEST(MemoryQuotaTest, ReserveRangeNoPressure) { + RefCountedPtr memory_quota = MakeRefCounted(); + auto memory_allocator = memory_quota->MakeMemoryAllocator(); + size_t total = 0; + for (int i = 0; i < 10000; i++) { + auto n = memory_allocator->Reserve(MemoryRequest(100, 40000)); + EXPECT_EQ(n, 40000); + total += n; + } + memory_allocator->Release(total); +} + +TEST(MemoryQuotaTest, MakeSlice) { + RefCountedPtr memory_quota = MakeRefCounted(); + auto memory_allocator = memory_quota->MakeMemoryAllocator(); + std::vector slices; + for (int i = 1; i < 1000; i++) { + int min = i; + int max = 10 * i - 9; + slices.push_back(memory_allocator->MakeSlice(MemoryRequest(min, max))); + } + for (grpc_slice slice : slices) { + grpc_slice_unref_internal(slice); + } +} + +TEST(MemoryQuotaTest, ContainerAllocator) { + RefCountedPtr memory_quota = MakeRefCounted(); + auto memory_allocator = memory_quota->MakeMemoryAllocator(); + Vector vec(memory_allocator.get()); + for (int i = 0; i < 100000; i++) { + vec.push_back(i); + } +} + +} // namespace testing +} // namespace grpc_core + +// Hook needed to run ExecCtx outside of iomgr. +void grpc_set_default_iomgr_platform() {} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + gpr_log_verbosity_init(); + return RUN_ALL_TESTS(); +} diff --git a/test/core/resource_quota/resource_quota_test.cc b/test/core/resource_quota/resource_quota_test.cc new file mode 100644 index 00000000000..07852a312cf --- /dev/null +++ b/test/core/resource_quota/resource_quota_test.cc @@ -0,0 +1,37 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/core/lib/resource_quota/resource_quota.h" + +#include + +namespace grpc_core { +namespace testing { + +TEST(ResourceQuotaTest, Works) { + auto q = MakeRefCounted(); + EXPECT_NE(q->thread_quota(), nullptr); + EXPECT_NE(q->memory_quota(), nullptr); +} + +} // namespace testing +} // namespace grpc_core + +// Hook needed to run ExecCtx outside of iomgr. +void grpc_set_default_iomgr_platform() {} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/core/resource_quota/thread_quota_test.cc b/test/core/resource_quota/thread_quota_test.cc new file mode 100644 index 00000000000..a2014be647c --- /dev/null +++ b/test/core/resource_quota/thread_quota_test.cc @@ -0,0 +1,45 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/core/lib/resource_quota/thread_quota.h" + +#include + +namespace grpc_core { +namespace testing { + +TEST(ThreadQuotaTest, Works) { + auto q = MakeRefCounted(); + EXPECT_TRUE(q->Reserve(128)); + q->SetMax(10); + EXPECT_FALSE(q->Reserve(128)); + EXPECT_FALSE(q->Reserve(1)); + q->Release(118); + EXPECT_FALSE(q->Reserve(1)); + q->Release(1); + EXPECT_TRUE(q->Reserve(1)); + EXPECT_FALSE(q->Reserve(1)); + q->Release(10); +} + +} // namespace testing +} // namespace grpc_core + +// Hook needed to run ExecCtx outside of iomgr. +void grpc_set_default_iomgr_platform() {} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 5250f6f0192..47edfb04aa4 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -1777,6 +1777,26 @@ ], "uses_polling": false }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "posix" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": false, + "language": "c", + "name": "memory_quota_stress_test", + "platforms": [ + "linux", + "posix" + ], + "uses_polling": false + }, { "args": [], "benchmark": false, @@ -2209,30 +2229,6 @@ ], "uses_polling": true }, - { - "args": [], - "benchmark": false, - "ci_platforms": [ - "linux", - "mac", - "posix", - "windows" - ], - "cpu_cost": 1.0, - "exclude_configs": [], - "exclude_iomgrs": [], - "flaky": false, - "gtest": false, - "language": "c", - "name": "resource_quota_test", - "platforms": [ - "linux", - "mac", - "posix", - "windows" - ], - "uses_polling": true - }, { "args": [], "benchmark": false, @@ -2795,6 +2791,30 @@ ], "uses_polling": false }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": false, + "language": "c", + "name": "test_core_iomgr_resource_quota_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": true + }, { "args": [], "benchmark": false, @@ -5657,6 +5677,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "memory_quota_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false, @@ -6987,6 +7031,30 @@ ], "uses_polling": false }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "test_core_resource_quota_resource_quota_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false, @@ -7107,6 +7175,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "thread_quota_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false,