[resource_quota] Refine pressure api (#30464)

Better label the elements of the pressure api, explicitly calling out the control value vs the pressure value.
pull/30515/head
Craig Tiller 3 years ago committed by GitHub
parent c866d65966
commit 65fc31a988
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      src/core/ext/transport/chttp2/transport/flow_control.cc
  2. 29
      src/core/lib/resource_quota/memory_quota.cc
  3. 29
      src/core/lib/resource_quota/memory_quota.h
  4. 8
      test/core/resource_quota/memory_quota_test.cc

@ -204,7 +204,9 @@ static double AdjustForMemoryPressure(double memory_pressure, double target) {
double TransportFlowControl::TargetLogBdp() { double TransportFlowControl::TargetLogBdp() {
return AdjustForMemoryPressure( return AdjustForMemoryPressure(
memory_owner_->is_valid() ? memory_owner_->InstantaneousPressure() : 0.0, memory_owner_->is_valid()
? memory_owner_->GetPressureInfo().pressure_control_value
: 0.0,
1 + log2(bdp_estimator_.EstimateBdp())); 1 + log2(bdp_estimator_.EstimateBdp()));
} }
@ -222,7 +224,8 @@ double
TransportFlowControl::TargetInitialWindowSizeBasedOnMemoryPressureAndBdp() TransportFlowControl::TargetInitialWindowSizeBasedOnMemoryPressureAndBdp()
const { const {
const double bdp = bdp_estimator_.EstimateBdp() * 2.0; const double bdp = bdp_estimator_.EstimateBdp() * 2.0;
const double memory_pressure = memory_owner_->InstantaneousPressure(); const double memory_pressure =
memory_owner_->GetPressureInfo().pressure_control_value;
// Linear interpolation between two values. // Linear interpolation between two values.
// Given a line segment between the two points (t_min, a), and (t_max, b), // Given a line segment between the two points (t_min, a), and (t_max, b),
// and a value t such that t_min <= t <= t_max, return the value on the line // and a value t such that t_min <= t <= t_max, return the value on the line

@ -219,11 +219,10 @@ absl::optional<size_t> GrpcMemoryAllocatorImpl::TryReserve(
// Scale the request down according to memory pressure if we have that // Scale the request down according to memory pressure if we have that
// flexibility. // flexibility.
if (scaled_size_over_min != 0) { if (scaled_size_over_min != 0) {
const auto pressure_and_max_recommended_allocation_size = const auto pressure_info = memory_quota_->GetPressureInfo();
memory_quota_->InstantaneousPressureAndMaxRecommendedAllocationSize(); double pressure = pressure_info.pressure_control_value;
double pressure = pressure_and_max_recommended_allocation_size.first;
size_t max_recommended_allocation_size = size_t max_recommended_allocation_size =
pressure_and_max_recommended_allocation_size.second; pressure_info.max_recommended_allocation_size;
// Reduce allocation size proportional to the pressure > 80% usage. // Reduce allocation size proportional to the pressure > 80% usage.
if (pressure > 0.8) { if (pressure > 0.8) {
scaled_size_over_min = scaled_size_over_min =
@ -463,22 +462,26 @@ void BasicMemoryQuota::Return(size_t amount) {
free_bytes_.fetch_add(amount, std::memory_order_relaxed); free_bytes_.fetch_add(amount, std::memory_order_relaxed);
} }
std::pair<double, size_t> BasicMemoryQuota::PressureInfo BasicMemoryQuota::GetPressureInfo() {
BasicMemoryQuota::InstantaneousPressureAndMaxRecommendedAllocationSize() {
static const bool kSmoothMemoryPressure = static const bool kSmoothMemoryPressure =
GPR_GLOBAL_CONFIG_GET(grpc_experimental_smooth_memory_presure); GPR_GLOBAL_CONFIG_GET(grpc_experimental_smooth_memory_presure);
double free = free_bytes_.load(); double free = free_bytes_.load();
if (free < 0) free = 0; if (free < 0) free = 0;
size_t quota_size = quota_size_.load(); size_t quota_size = quota_size_.load();
double size = quota_size; double size = quota_size;
if (size < 1) return std::make_pair(1.0, 1); if (size < 1) return PressureInfo{1, 1, 1};
double pressure = (size - free) / size; PressureInfo pressure_info;
if (pressure < 0.0) pressure = 0.0; pressure_info.instantaneous_pressure = std::max(0.0, (size - free) / size);
if (pressure > 1.0) pressure = 1.0;
if (kSmoothMemoryPressure) { if (kSmoothMemoryPressure) {
pressure = pressure_tracker_.AddSampleAndGetEstimate(pressure); pressure_info.pressure_control_value =
pressure_tracker_.AddSampleAndGetControlValue(
pressure_info.instantaneous_pressure);
} else {
pressure_info.pressure_control_value =
std::min(pressure_info.instantaneous_pressure, 1.0);
} }
return std::make_pair(pressure, quota_size / 16); pressure_info.max_recommended_allocation_size = quota_size / 16;
return pressure_info;
} }
// //
@ -487,7 +490,7 @@ BasicMemoryQuota::InstantaneousPressureAndMaxRecommendedAllocationSize() {
namespace memory_quota_detail { namespace memory_quota_detail {
double PressureTracker::AddSampleAndGetEstimate(double sample) { double PressureTracker::AddSampleAndGetControlValue(double sample) {
static const double kSetPoint = static const double kSetPoint =
GPR_GLOBAL_CONFIG_GET(grpc_experimental_resource_quota_set_point) / 100.0; GPR_GLOBAL_CONFIG_GET(grpc_experimental_resource_quota_set_point) / 100.0;

@ -234,7 +234,7 @@ namespace memory_quota_detail {
// be) but to be eventually accurate. // be) but to be eventually accurate.
class PressureTracker { class PressureTracker {
public: public:
double AddSampleAndGetEstimate(double sample); double AddSampleAndGetControlValue(double sample);
private: private:
std::atomic<double> max_this_round_{0.0}; std::atomic<double> max_this_round_{0.0};
@ -252,6 +252,17 @@ class PressureTracker {
class BasicMemoryQuota final class BasicMemoryQuota final
: public std::enable_shared_from_this<BasicMemoryQuota> { : public std::enable_shared_from_this<BasicMemoryQuota> {
public: public:
// Data about current memory pressure.
struct PressureInfo {
// The current instantaneously measured memory pressure.
double instantaneous_pressure;
// A control value that can be used to scale buffer sizes up or down to
// adjust memory pressure to our target set point.
double pressure_control_value;
// Maximum recommended individual allocation size.
size_t max_recommended_allocation_size;
};
explicit BasicMemoryQuota(std::string name) : name_(std::move(name)) {} explicit BasicMemoryQuota(std::string name) : name_(std::move(name)) {}
// Start the reclamation activity. // Start the reclamation activity.
@ -272,8 +283,7 @@ class BasicMemoryQuota final
// Return some memory to the quota. // Return some memory to the quota.
void Return(size_t amount); void Return(size_t amount);
// Instantaneous memory pressure approximation. // Instantaneous memory pressure approximation.
std::pair<double, size_t> PressureInfo GetPressureInfo();
InstantaneousPressureAndMaxRecommendedAllocationSize();
// Get a reclamation queue // Get a reclamation queue
ReclaimerQueue* reclaimer_queue(size_t i) { return &reclaimers_[i]; } ReclaimerQueue* reclaimer_queue(size_t i) { return &reclaimers_[i]; }
@ -352,9 +362,8 @@ class GrpcMemoryAllocatorImpl final : public EventEngineMemoryAllocatorImpl {
void Shutdown() override; void Shutdown() override;
// Read the instantaneous memory pressure // Read the instantaneous memory pressure
double InstantaneousPressure() const { BasicMemoryQuota::PressureInfo GetPressureInfo() const {
return memory_quota_->InstantaneousPressureAndMaxRecommendedAllocationSize() return memory_quota_->GetPressureInfo();
.first;
} }
// Name of this allocator // Name of this allocator
@ -434,8 +443,8 @@ class MemoryOwner final : public MemoryAllocator {
} }
// Instantaneous memory pressure in the underlying quota. // Instantaneous memory pressure in the underlying quota.
double InstantaneousPressure() const { BasicMemoryQuota::PressureInfo GetPressureInfo() const {
return impl()->InstantaneousPressure(); return impl()->GetPressureInfo();
} }
template <typename T, typename... Args> template <typename T, typename... Args>
@ -485,8 +494,8 @@ class MemoryQuota final
// Return true if the instantaneous memory pressure is high. // Return true if the instantaneous memory pressure is high.
bool IsMemoryPressureHigh() const { bool IsMemoryPressureHigh() const {
static constexpr double kMemoryPressureHighThreshold = 0.9; static constexpr double kMemoryPressureHighThreshold = 0.9;
return memory_quota_->InstantaneousPressureAndMaxRecommendedAllocationSize() return memory_quota_->GetPressureInfo().pressure_control_value >
.first > kMemoryPressureHighThreshold; kMemoryPressureHighThreshold;
} }
private: private:

@ -187,14 +187,14 @@ TEST(PressureTrackerTest, Decays) {
{ {
ExecCtx exec_ctx; ExecCtx exec_ctx;
exec_ctx.TestOnlySetNow(step_time()); exec_ctx.TestOnlySetNow(step_time());
EXPECT_EQ(tracker.AddSampleAndGetEstimate(0.0), 0.0); EXPECT_EQ(tracker.AddSampleAndGetControlValue(0.0), 0.0);
} }
// If memory pressure goes to 100% or higher, we should *immediately* snap to // If memory pressure goes to 100% or higher, we should *immediately* snap to
// reporting 100%. // reporting 100%.
{ {
ExecCtx exec_ctx; ExecCtx exec_ctx;
exec_ctx.TestOnlySetNow(step_time()); exec_ctx.TestOnlySetNow(step_time());
EXPECT_EQ(tracker.AddSampleAndGetEstimate(1.0), 1.0); EXPECT_EQ(tracker.AddSampleAndGetControlValue(1.0), 1.0);
} }
// Once memory pressure reduces, we should *eventually* get back to reporting // Once memory pressure reduces, we should *eventually* get back to reporting
// close to zero, and monotonically decrease. // close to zero, and monotonically decrease.
@ -203,7 +203,7 @@ TEST(PressureTrackerTest, Decays) {
while (true) { while (true) {
ExecCtx exec_ctx; ExecCtx exec_ctx;
exec_ctx.TestOnlySetNow(step_time()); exec_ctx.TestOnlySetNow(step_time());
double new_reported = tracker.AddSampleAndGetEstimate(0.0); double new_reported = tracker.AddSampleAndGetControlValue(0.0);
EXPECT_LE(new_reported, last_reported); EXPECT_LE(new_reported, last_reported);
last_reported = new_reported; last_reported = new_reported;
if (new_reported < 0.1) break; if (new_reported < 0.1) break;
@ -223,7 +223,7 @@ TEST(PressureTrackerTest, ManyThreads) {
std::uniform_real_distribution<double> dist(0.0, 1.0); std::uniform_real_distribution<double> dist(0.0, 1.0);
while (!shutdown.load(std::memory_order_relaxed)) { while (!shutdown.load(std::memory_order_relaxed)) {
ExecCtx exec_ctx; ExecCtx exec_ctx;
tracker.AddSampleAndGetEstimate(dist(rng)); tracker.AddSampleAndGetControlValue(dist(rng));
} }
}); });
} }

Loading…
Cancel
Save