[resource_quota] Flow control experiment (#30310)

* ideation

* fix periodic update behavior around initialization

* fix

* finish it up

* finish it up

* fix

* fix

* fix

* Update BUILD

* Automated change: Fix sanity tests

* tweak

* Automated change: Fix sanity tests

* pid after all

* fix

* tweak

* Automated change: Fix sanity tests

* fix

* fix

* comment

* fix

* comment

* better comment

* make set point configurable

* fix test

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/30384/head
Craig Tiller 2 years ago committed by GitHub
parent daee661f3c
commit 93fbacb534
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      BUILD
  2. 15
      CMakeLists.txt
  3. 22
      build_autogenerated.yaml
  4. 64
      src/core/ext/transport/chttp2/transport/flow_control.cc
  5. 1
      src/core/ext/transport/chttp2/transport/flow_control.h
  6. 55
      src/core/lib/resource_quota/memory_quota.cc
  7. 29
      src/core/lib/resource_quota/memory_quota.h
  8. 5
      src/core/lib/resource_quota/periodic_update.cc
  9. 10
      src/core/lib/resource_quota/periodic_update.h
  10. 76
      test/core/resource_quota/memory_quota_test.cc
  11. 15
      test/core/resource_quota/periodic_update_test.cc

@ -1865,6 +1865,7 @@ grpc_cc_library(
"map",
"orphanable",
"periodic_update",
"pid_controller",
"poll",
"race",
"ref_counted_ptr",
@ -1883,6 +1884,7 @@ grpc_cc_library(
hdrs = [
"src/core/lib/resource_quota/periodic_update.h",
],
external_deps = ["absl/functional:function_ref"],
deps = [
"exec_ctx",
"gpr_platform",

15
CMakeLists.txt generated

@ -5968,6 +5968,7 @@ add_executable(arena_promise_test
src/core/lib/slice/slice.cc
src/core/lib/slice/slice_refcount.cc
src/core/lib/slice/slice_string_helpers.cc
src/core/lib/transport/pid_controller.cc
test/core/promise/arena_promise_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
@ -5996,6 +5997,7 @@ target_link_libraries(arena_promise_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::any_invocable
absl::function_ref
absl::type_traits
absl::statusor
absl::utility
@ -7601,6 +7603,7 @@ add_executable(chunked_vector_test
src/core/lib/slice/slice.cc
src/core/lib/slice/slice_refcount.cc
src/core/lib/slice/slice_string_helpers.cc
src/core/lib/transport/pid_controller.cc
test/core/gprpp/chunked_vector_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
@ -7629,6 +7632,7 @@ target_link_libraries(chunked_vector_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::any_invocable
absl::function_ref
absl::type_traits
absl::statusor
absl::utility
@ -9860,6 +9864,7 @@ add_executable(for_each_test
src/core/lib/slice/slice.cc
src/core/lib/slice/slice_refcount.cc
src/core/lib/slice/slice_string_helpers.cc
src/core/lib/transport/pid_controller.cc
test/core/promise/for_each_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
@ -9889,6 +9894,7 @@ target_link_libraries(for_each_test
${_gRPC_ALLTARGETS_LIBRARIES}
absl::flat_hash_set
absl::any_invocable
absl::function_ref
absl::hash
absl::type_traits
absl::statusor
@ -12736,6 +12742,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
src/core/lib/slice/slice.cc
src/core/lib/slice/slice_refcount.cc
src/core/lib/slice/slice_string_helpers.cc
src/core/lib/transport/pid_controller.cc
test/core/resource_quota/memory_quota_stress_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
@ -12764,6 +12771,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::any_invocable
absl::function_ref
absl::type_traits
absl::statusor
absl::utility
@ -12796,6 +12804,7 @@ add_executable(memory_quota_test
src/core/lib/slice/slice.cc
src/core/lib/slice/slice_refcount.cc
src/core/lib/slice/slice_string_helpers.cc
src/core/lib/transport/pid_controller.cc
test/core/resource_quota/memory_quota_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
@ -12824,6 +12833,7 @@ target_link_libraries(memory_quota_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::any_invocable
absl::function_ref
absl::type_traits
absl::statusor
absl::utility
@ -13768,6 +13778,7 @@ target_link_libraries(periodic_update_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::any_invocable
absl::function_ref
absl::statusor
gpr
upb
@ -13835,6 +13846,7 @@ add_executable(pipe_test
src/core/lib/slice/slice.cc
src/core/lib/slice/slice_refcount.cc
src/core/lib/slice/slice_string_helpers.cc
src/core/lib/transport/pid_controller.cc
test/core/promise/pipe_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
@ -13863,6 +13875,7 @@ target_link_libraries(pipe_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::any_invocable
absl::function_ref
absl::type_traits
absl::statusor
absl::utility
@ -14692,6 +14705,7 @@ add_executable(resource_quota_test
src/core/lib/slice/slice.cc
src/core/lib/slice/slice_refcount.cc
src/core/lib/slice/slice_string_helpers.cc
src/core/lib/transport/pid_controller.cc
test/core/resource_quota/resource_quota_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
@ -14720,6 +14734,7 @@ target_link_libraries(resource_quota_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::any_invocable
absl::function_ref
absl::type_traits
absl::statusor
absl::utility

@ -3935,6 +3935,7 @@ targets:
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_refcount_base.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/lib/transport/pid_controller.h
- test/core/promise/test_context.h
src:
- src/core/ext/upb-generated/google/protobuf/any.upb.c
@ -3959,9 +3960,11 @@ targets:
- src/core/lib/slice/slice.cc
- src/core/lib/slice/slice_refcount.cc
- src/core/lib/slice/slice_string_helpers.cc
- src/core/lib/transport/pid_controller.cc
- test/core/promise/arena_promise_test.cc
deps:
- absl/functional:any_invocable
- absl/functional:function_ref
- absl/meta:type_traits
- absl/status:statusor
- absl/utility:utility
@ -4571,6 +4574,7 @@ targets:
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_refcount_base.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/lib/transport/pid_controller.h
src:
- src/core/ext/upb-generated/google/protobuf/any.upb.c
- src/core/ext/upb-generated/google/rpc/status.upb.c
@ -4594,9 +4598,11 @@ targets:
- src/core/lib/slice/slice.cc
- src/core/lib/slice/slice_refcount.cc
- src/core/lib/slice/slice_string_helpers.cc
- src/core/lib/transport/pid_controller.cc
- test/core/gprpp/chunked_vector_test.cc
deps:
- absl/functional:any_invocable
- absl/functional:function_ref
- absl/meta:type_traits
- absl/status:statusor
- absl/utility:utility
@ -5592,6 +5598,7 @@ targets:
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_refcount_base.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/lib/transport/pid_controller.h
- test/core/promise/test_wakeup_schedulers.h
src:
- src/core/ext/upb-generated/google/protobuf/any.upb.c
@ -5616,10 +5623,12 @@ targets:
- src/core/lib/slice/slice.cc
- src/core/lib/slice/slice_refcount.cc
- src/core/lib/slice/slice_string_helpers.cc
- src/core/lib/transport/pid_controller.cc
- test/core/promise/for_each_test.cc
deps:
- absl/container:flat_hash_set
- absl/functional:any_invocable
- absl/functional:function_ref
- absl/hash:hash
- absl/meta:type_traits
- absl/status:statusor
@ -6659,6 +6668,7 @@ targets:
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_refcount_base.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/lib/transport/pid_controller.h
src:
- src/core/ext/upb-generated/google/protobuf/any.upb.c
- src/core/ext/upb-generated/google/rpc/status.upb.c
@ -6679,9 +6689,11 @@ targets:
- src/core/lib/slice/slice.cc
- src/core/lib/slice/slice_refcount.cc
- src/core/lib/slice/slice_string_helpers.cc
- src/core/lib/transport/pid_controller.cc
- test/core/resource_quota/memory_quota_stress_test.cc
deps:
- absl/functional:any_invocable
- absl/functional:function_ref
- absl/meta:type_traits
- absl/status:statusor
- absl/utility:utility
@ -6734,6 +6746,7 @@ targets:
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_refcount_base.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/lib/transport/pid_controller.h
- test/core/resource_quota/call_checker.h
src:
- src/core/ext/upb-generated/google/protobuf/any.upb.c
@ -6755,9 +6768,11 @@ targets:
- src/core/lib/slice/slice.cc
- src/core/lib/slice/slice_refcount.cc
- src/core/lib/slice/slice_string_helpers.cc
- src/core/lib/transport/pid_controller.cc
- test/core/resource_quota/memory_quota_test.cc
deps:
- absl/functional:any_invocable
- absl/functional:function_ref
- absl/meta:type_traits
- absl/status:statusor
- absl/utility:utility
@ -7160,6 +7175,7 @@ targets:
- test/core/resource_quota/periodic_update_test.cc
deps:
- absl/functional:any_invocable
- absl/functional:function_ref
- absl/status:statusor
- gpr
- upb
@ -7225,6 +7241,7 @@ targets:
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_refcount_base.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/lib/transport/pid_controller.h
- test/core/promise/test_wakeup_schedulers.h
src:
- src/core/ext/upb-generated/google/protobuf/any.upb.c
@ -7249,9 +7266,11 @@ targets:
- src/core/lib/slice/slice.cc
- src/core/lib/slice/slice_refcount.cc
- src/core/lib/slice/slice_string_helpers.cc
- src/core/lib/transport/pid_controller.cc
- test/core/promise/pipe_test.cc
deps:
- absl/functional:any_invocable
- absl/functional:function_ref
- absl/meta:type_traits
- absl/status:statusor
- absl/utility:utility
@ -7598,6 +7617,7 @@ targets:
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_refcount_base.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/lib/transport/pid_controller.h
src:
- src/core/ext/upb-generated/google/protobuf/any.upb.c
- src/core/ext/upb-generated/google/rpc/status.upb.c
@ -7620,9 +7640,11 @@ targets:
- src/core/lib/slice/slice.cc
- src/core/lib/slice/slice_refcount.cc
- src/core/lib/slice/slice_string_helpers.cc
- src/core/lib/transport/pid_controller.cc
- test/core/resource_quota/resource_quota_test.cc
deps:
- absl/functional:any_invocable
- absl/functional:function_ref
- absl/meta:type_traits
- absl/status:statusor
- absl/utility:utility

@ -36,11 +36,17 @@
#include <grpc/support/log.h>
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/global_config_env.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/resource_quota/memory_quota.h"
grpc_core::TraceFlag grpc_flowctl_trace(false, "flowctl");
GPR_GLOBAL_CONFIG_DEFINE_BOOL(
grpc_experimental_broad_flow_control_range, false,
"Use an enlarged memory pressure range for scaling flow control when using "
"a resource quota.");
namespace grpc_core {
namespace chttp2 {
@ -212,6 +218,58 @@ double TransportFlowControl::SmoothLogBdp(double value) {
return pid_controller_.Update(bdp_error, dt > kMaxDt ? kMaxDt : dt);
}
double
TransportFlowControl::TargetInitialWindowSizeBasedOnMemoryPressureAndBdp()
const {
const double bdp = bdp_estimator_.EstimateBdp() * 2.0;
const double memory_pressure = memory_owner_->InstantaneousPressure();
// Linear interpolation between two values.
// Given a line segment between the two points (t_min, a), and (t_max, b),
// and a value t such that t_min <= t <= t_max, return the value on the line
// segment at t.
auto lerp = [](double t, double t_min, double t_max, double a, double b) {
return a + (b - a) * (t - t_min) / (t_max - t_min);
};
// We split memory pressure into three broad regions:
// 1. Low memory pressure, the "anything goes" case - we assume no memory
// pressure concerns and advertise a huge window to keep things flowing.
// 2. Moderate memory pressure, the "adjust to BDP" case - we linearly ramp
// down window size to 2*BDP - which should still allow bytes to flow, but
// is arguably more considered.
// 3. High memory pressure - past 50% we linearly ramp down window size from
// BDP to 0 - at which point senders effectively must request to send bytes
// to us.
//
// ▲
// │
// 16mb ────┤---------x----
// │ -----
// BDP ────┤ ----x---
// │ ----
// │ -----
// │ ----
// │ -----
// │ ---x
// ├─────────┬─────────────┬────────────────────────┬─────►
// │Anything │Adjust to │Drop to zero │
// │Goes │BDP │ │
// 0% 20% 50% 100% memory
// pressure
const double kAnythingGoesPressure = 0.2;
const double kAdjustedToBdpPressure = 0.5;
const double kAnythingGoesWindow = std::max(double(1 << 24), bdp);
if (memory_pressure < kAnythingGoesPressure) {
return kAnythingGoesWindow;
} else if (memory_pressure < kAdjustedToBdpPressure) {
return lerp(memory_pressure, kAnythingGoesPressure, kAdjustedToBdpPressure,
kAnythingGoesWindow, bdp);
} else if (memory_pressure < 1.0) {
return lerp(memory_pressure, kAdjustedToBdpPressure, 1.0, bdp, 0);
} else {
return 0;
}
}
void TransportFlowControl::UpdateSetting(
int64_t* desired_value, int64_t new_desired_value,
FlowControlAction* action,
@ -227,13 +285,17 @@ void TransportFlowControl::UpdateSetting(
}
FlowControlAction TransportFlowControl::PeriodicUpdate() {
static const bool kSmoothMemoryPressure =
GPR_GLOBAL_CONFIG_GET(grpc_experimental_smooth_memory_presure);
FlowControlAction action;
if (enable_bdp_probe_) {
// get bdp estimate and update initial_window accordingly.
// target might change based on how much memory pressure we are under
// TODO(ncteisen): experiment with setting target to be huge under low
// memory pressure.
double target = pow(2, SmoothLogBdp(TargetLogBdp()));
double target = kSmoothMemoryPressure
? TargetInitialWindowSizeBasedOnMemoryPressureAndBdp()
: pow(2, SmoothLogBdp(TargetLogBdp()));
if (g_test_only_transport_target_window_estimates_mocker != nullptr) {
// Hook for simulating unusual flow control situations in tests.
target = g_test_only_transport_target_window_estimates_mocker

@ -252,6 +252,7 @@ class TransportFlowControl final {
private:
double TargetLogBdp();
double SmoothLogBdp(double value);
double TargetInitialWindowSizeBasedOnMemoryPressureAndBdp() const;
static void UpdateSetting(int64_t* desired_value, int64_t new_desired_value,
FlowControlAction* action,
FlowControlAction& (FlowControlAction::*set)(

@ -37,12 +37,17 @@
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/resource_quota/trace.h"
GPR_GLOBAL_CONFIG_DEFINE_BOOL(grpc_experimental_smooth_memory_presure, false,
"smooth the value of memory pressure over time");
GPR_GLOBAL_CONFIG_DEFINE_BOOL(
grpc_experimental_enable_periodic_resource_quota_reclamation, false,
"Enable experimental feature to reclaim resource quota periodically");
GPR_GLOBAL_CONFIG_DEFINE_INT32(
grpc_experimental_max_quota_buffer_size, 1024 * 1024,
"Maximum size for one memory allocators buffer size against a quota");
GPR_GLOBAL_CONFIG_DEFINE_INT32(
grpc_experimental_resource_quota_set_point, 60,
"Ask the resource quota to target this percentage of total quota usage.");
namespace grpc_core {
@ -459,7 +464,9 @@ void BasicMemoryQuota::Return(size_t amount) {
}
std::pair<double, size_t>
BasicMemoryQuota::InstantaneousPressureAndMaxRecommendedAllocationSize() const {
BasicMemoryQuota::InstantaneousPressureAndMaxRecommendedAllocationSize() {
static const bool kSmoothMemoryPressure =
GPR_GLOBAL_CONFIG_GET(grpc_experimental_smooth_memory_presure);
double free = free_bytes_.load();
if (free < 0) free = 0;
size_t quota_size = quota_size_.load();
@ -468,9 +475,55 @@ BasicMemoryQuota::InstantaneousPressureAndMaxRecommendedAllocationSize() const {
double pressure = (size - free) / size;
if (pressure < 0.0) pressure = 0.0;
if (pressure > 1.0) pressure = 1.0;
if (kSmoothMemoryPressure) {
pressure = pressure_tracker_.AddSampleAndGetEstimate(pressure);
}
return std::make_pair(pressure, quota_size / 16);
}
//
// PressureTracker
//
namespace memory_quota_detail {
double PressureTracker::AddSampleAndGetEstimate(double sample) {
static const double kSetPoint =
GPR_GLOBAL_CONFIG_GET(grpc_experimental_resource_quota_set_point) / 100.0;
double max_so_far = max_this_round_.load(std::memory_order_relaxed);
if (sample > max_so_far) {
max_this_round_.compare_exchange_weak(max_so_far, sample,
std::memory_order_relaxed,
std::memory_order_relaxed);
}
// If memory pressure is almost done, immediately hit the brakes and report
// full memory usage.
if (sample >= 0.99) {
report_.store(1.0, std::memory_order_relaxed);
}
update_.Tick([&](Duration dt) {
// Reset the round tracker with the new sample.
const double current_estimate =
max_this_round_.exchange(sample, std::memory_order_relaxed);
double report;
if (current_estimate > 0.99) {
// Under very high memory pressure we... just max things out.
report = pid_.Update(1e99, 1.0);
} else {
report = pid_.Update(current_estimate - kSetPoint, dt.seconds());
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
gpr_log(GPR_INFO, "RQ: pressure:%lf report:%lf error_integral:%lf",
current_estimate, report, pid_.error_integral());
}
report_.store(report, std::memory_order_relaxed);
});
return report_.load(std::memory_order_relaxed);
}
} // namespace memory_quota_detail
//
// MemoryQuota
//

@ -42,7 +42,9 @@
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/resource_quota/periodic_update.h"
#include "src/core/lib/transport/pid_controller.h"
GPR_GLOBAL_CONFIG_DECLARE_BOOL(grpc_experimental_smooth_memory_presure);
GPR_GLOBAL_CONFIG_DECLARE_BOOL(
grpc_experimental_enable_periodic_resource_quota_reclamation);
GPR_GLOBAL_CONFIG_DECLARE_INT32(grpc_experimental_max_quota_buffer_size);
@ -226,6 +228,27 @@ class ReclaimerQueue {
std::shared_ptr<State> state_;
};
namespace memory_quota_detail {
// Utility to track memory pressure.
// Tries to be conservative (returns a higher pressure than there may actually
// be) but to be eventually accurate.
class PressureTracker {
public:
double AddSampleAndGetEstimate(double sample);
private:
std::atomic<double> max_this_round_{0.0};
std::atomic<double> report_{0.0};
PidController pid_{PidController::Args()
.set_gain_p(0.05)
.set_gain_i(0.005)
.set_integral_range(100.0)
.set_min_control_value(0.0)
.set_max_control_value(1.0)};
PeriodicUpdate update_{Duration::Seconds(1)};
};
} // namespace memory_quota_detail
class BasicMemoryQuota final
: public std::enable_shared_from_this<BasicMemoryQuota> {
public:
@ -250,7 +273,7 @@ class BasicMemoryQuota final
void Return(size_t amount);
// Instantaneous memory pressure approximation.
std::pair<double, size_t>
InstantaneousPressureAndMaxRecommendedAllocationSize() const;
InstantaneousPressureAndMaxRecommendedAllocationSize();
// Get a reclamation queue
ReclaimerQueue* reclaimer_queue(size_t i) { return &reclaimers_[i]; }
@ -282,6 +305,8 @@ class BasicMemoryQuota final
// We also increment this counter on completion of a sweep, as an indicator
// that the wait has ended.
std::atomic<uint64_t> reclamation_counter_{0};
// Memory pressure smoothing
memory_quota_detail::PressureTracker pressure_tracker_;
// The name of this quota - used for debugging/tracing/etc..
std::string name_;
};
@ -307,7 +332,7 @@ class GrpcMemoryAllocatorImpl final : public EventEngineMemoryAllocatorImpl {
size_t prev_free = free_bytes_.fetch_add(n, std::memory_order_release);
if ((max_quota_buffer_size() > 0 &&
prev_free + n > max_quota_buffer_size()) ||
(periodic_donate_back() && donate_back_.Tick())) {
(periodic_donate_back() && donate_back_.Tick([](Duration) {}))) {
// Try to immediately return some free'ed memory back to the total quota.
MaybeDonateBack();
}

@ -23,7 +23,7 @@
namespace grpc_core {
bool PeriodicUpdate::MaybeEndPeriod() {
bool PeriodicUpdate::MaybeEndPeriod(absl::FunctionRef<void(Duration)> f) {
if (period_start_ == Timestamp::ProcessEpoch()) {
period_start_ = ExecCtx::Get()->Now();
updates_remaining_.store(1, std::memory_order_release);
@ -59,7 +59,7 @@ bool PeriodicUpdate::MaybeEndPeriod() {
// we simply discard those decrements.
updates_remaining_.store(better_guess - expected_updates_per_period_,
std::memory_order_release);
// Not quite done, return false, try for longer.
// Not quite done, return, try for longer.
return false;
}
// Finished period, start a new one and return true.
@ -70,6 +70,7 @@ bool PeriodicUpdate::MaybeEndPeriod() {
period_.seconds() * expected_updates_per_period_ / time_so_far.seconds();
if (expected_updates_per_period_ < 1) expected_updates_per_period_ = 1;
period_start_ = now;
f(time_so_far);
updates_remaining_.store(expected_updates_per_period_,
std::memory_order_release);
return true;

@ -21,6 +21,8 @@
#include <atomic>
#include "absl/functional/function_ref.h"
#include "src/core/lib/gprpp/time.h"
namespace grpc_core {
@ -36,20 +38,20 @@ class PeriodicUpdate {
public:
explicit PeriodicUpdate(Duration period) : period_(period) {}
// Tick the update, return true if we think the period expired.
GRPC_MUST_USE_RESULT bool Tick() {
// Tick the update, call f and return true if we think the period expired.
bool Tick(absl::FunctionRef<void(Duration)> f) {
// Atomically decrement the remaining ticks counter.
// If we hit 0 our estimate of period length has expired.
// See the comment next to the data members for a description of thread
// safety.
if (updates_remaining_.fetch_sub(1, std::memory_order_acquire) == 1) {
return MaybeEndPeriod();
return MaybeEndPeriod(f);
}
return false;
}
private:
GRPC_MUST_USE_RESULT bool MaybeEndPeriod();
bool MaybeEndPeriod(absl::FunctionRef<void(Duration)> f);
// Thread safety:
// When updates_remaining_ reaches 0 the thread that decremented becomes

@ -15,6 +15,10 @@
#include "src/core/lib/resource_quota/memory_quota.h"
#include <algorithm>
#include <atomic>
#include <chrono>
#include <random>
#include <thread>
#include <vector>
#include "gtest/gtest.h"
@ -161,6 +165,78 @@ TEST(MemoryQuotaTest, NoBunchingIfIdle) {
}
} // namespace testing
//
// PressureTrackerTest
//
namespace memory_quota_detail {
namespace testing {
TEST(PressureTrackerTest, NoOp) { PressureTracker(); }
TEST(PressureTrackerTest, Decays) {
PressureTracker tracker;
int cur_ms = 0;
auto step_time = [&] {
++cur_ms;
return Timestamp::ProcessEpoch() + Duration::Seconds(1) +
Duration::Milliseconds(cur_ms);
};
// At start pressure is zero and we should be reading zero back.
{
ExecCtx exec_ctx;
exec_ctx.TestOnlySetNow(step_time());
EXPECT_EQ(tracker.AddSampleAndGetEstimate(0.0), 0.0);
}
// If memory pressure goes to 100% or higher, we should *immediately* snap to
// reporting 100%.
{
ExecCtx exec_ctx;
exec_ctx.TestOnlySetNow(step_time());
EXPECT_EQ(tracker.AddSampleAndGetEstimate(1.0), 1.0);
}
// Once memory pressure reduces, we should *eventually* get back to reporting
// close to zero, and monotonically decrease.
const int got_full = cur_ms;
double last_reported = 1.0;
while (true) {
ExecCtx exec_ctx;
exec_ctx.TestOnlySetNow(step_time());
double new_reported = tracker.AddSampleAndGetEstimate(0.0);
EXPECT_LE(new_reported, last_reported);
last_reported = new_reported;
if (new_reported < 0.1) break;
}
// Verify the above happened in a somewhat reasonable time.
ASSERT_LE(cur_ms, got_full + 200000);
}
TEST(PressureTrackerTest, ManyThreads) {
PressureTracker tracker;
std::vector<std::thread> threads;
std::atomic<bool> shutdown{false};
threads.reserve(10);
for (int i = 0; i < 10; i++) {
threads.emplace_back([&tracker, &shutdown] {
std::random_device rng;
std::uniform_real_distribution<double> dist(0.0, 1.0);
while (!shutdown.load(std::memory_order_relaxed)) {
ExecCtx exec_ctx;
tracker.AddSampleAndGetEstimate(dist(rng));
}
});
}
std::this_thread::sleep_for(std::chrono::seconds(5));
shutdown.store(true, std::memory_order_relaxed);
for (auto& thread : threads) {
thread.join();
}
}
} // namespace testing
} // namespace memory_quota_detail
} // namespace grpc_core
// Hook needed to run ExecCtx outside of iomgr.

@ -42,9 +42,10 @@ TEST(PeriodicUpdateTest, SimpleTest) {
start = exec_ctx.Now();
}
// Wait until the first period has elapsed.
while (true) {
bool done = false;
while (!done) {
ExecCtx exec_ctx;
if (upd->Tick()) break;
upd->Tick([&](Duration) { done = true; });
}
// Ensure that took at least 1 second.
{
@ -54,9 +55,10 @@ TEST(PeriodicUpdateTest, SimpleTest) {
}
// Do ten more update cycles
for (int i = 0; i < 10; i++) {
while (true) {
done = false;
while (!done) {
ExecCtx exec_ctx;
if (upd->Tick()) break;
upd->Tick([&](Duration) { done = true; });
}
// Ensure the time taken was between 1 and 1.5 seconds - we make a little
// allowance for the presumed inaccuracy of this type.
@ -87,7 +89,10 @@ TEST(PeriodicUpdate, ThreadTest) {
threads.push_back(std::thread([&]() {
while (count.load() < 10) {
ExecCtx exec_ctx;
if (upd->Tick()) count.fetch_add(1);
upd->Tick([&](Duration d) {
EXPECT_GE(d, Duration::Seconds(1));
count.fetch_add(1);
});
}
}));
}

Loading…
Cancel
Save