[chttp+resource_quota] Add an experiment to quickly reject requests under very high memory pressure (#37927)

Currently the destructive reclaimer single threaded cancels existing requests, but we admit new rpcs on every channel (to be eventually cancelled, probably).

We've got evidence that this (shockingly) doesn't scale and senders can easily overwhelm and oom a server.

Instead under this experiment now we'll always reject new work under very high load, and allow the reclaimer to mop up any remaining work to get back to within bounds.

Closes #37927

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37927 from ctiller:fast_reject 835726473a
PiperOrigin-RevId: 686553599
pull/37936/head
Craig Tiller 4 months ago committed by Copybara-Service
parent 3e87a31ce0
commit bd4792dab6
  1. 1
      bazel/experiments.bzl
  2. 14
      src/core/ext/transport/chttp2/transport/parsing.cc
  3. 18
      src/core/lib/experiments/experiments.cc
  4. 8
      src/core/lib/experiments/experiments.h
  5. 7
      src/core/lib/experiments/experiments.yaml
  6. 12
      src/core/lib/resource_quota/memory_quota.h
  7. 6
      src/core/telemetry/stats_data.cc
  8. 6
      src/core/telemetry/stats_data.h
  9. 2
      src/core/telemetry/stats_data.yaml

@ -31,6 +31,7 @@ EXPERIMENT_ENABLES = {
"multiping": "multiping",
"pick_first_new": "pick_first_new",
"promise_based_inproc_transport": "promise_based_inproc_transport",
"rq_fast_reject": "rq_fast_reject",
"schedule_cancellation_over_write": "schedule_cancellation_over_write",
"server_privacy": "server_privacy",
"tcp_frame_size_tuning": "tcp_frame_size_tuning",

@ -68,6 +68,8 @@
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/telemetry/call_tracer.h"
#include "src/core/telemetry/stats.h"
#include "src/core/telemetry/stats_data.h"
#include "src/core/util/random_early_detection.h"
#include "src/core/util/ref_counted_ptr.h"
#include "src/core/util/status_helper.h"
@ -642,6 +644,18 @@ static grpc_error_handle init_header_frame_parser(grpc_chttp2_transport* t,
GRPC_HTTP2_REFUSED_STREAM, nullptr));
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
return init_header_skip_frame_parser(t, priority_type, is_eoh);
} else if (grpc_core::IsRqFastRejectEnabled() &&
GPR_UNLIKELY(t->memory_owner.IsMemoryPressureHigh())) {
// We have more streams allocated than we'd like, so apply some pushback
// by refusing this stream.
grpc_core::global_stats().IncrementRqCallsRejected();
++t->num_pending_induced_frames;
grpc_slice_buffer_add(
&t->qbuf,
grpc_chttp2_rst_stream_create(t->incoming_stream_id,
GRPC_HTTP2_ENHANCE_YOUR_CALM, nullptr));
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
return init_header_skip_frame_parser(t, priority_type, is_eoh);
} else if (GPR_UNLIKELY(
t->max_concurrent_streams_overload_protection &&
t->streams_allocated.load(std::memory_order_relaxed) >

@ -68,6 +68,10 @@ const char* const additional_constraints_pick_first_new = "{}";
const char* const description_promise_based_inproc_transport =
"Use promises for the in-process transport.";
const char* const additional_constraints_promise_based_inproc_transport = "{}";
const char* const description_rq_fast_reject =
"Resource quota rejects requests immediately (before allocating the "
"request structure) under very high memory pressure.";
const char* const additional_constraints_rq_fast_reject = "{}";
const char* const description_schedule_cancellation_over_write =
"Allow cancellation op to be scheduled over a write";
const char* const additional_constraints_schedule_cancellation_over_write =
@ -137,6 +141,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
description_promise_based_inproc_transport,
additional_constraints_promise_based_inproc_transport, nullptr, 0, false,
false},
{"rq_fast_reject", description_rq_fast_reject,
additional_constraints_rq_fast_reject, nullptr, 0, false, true},
{"schedule_cancellation_over_write",
description_schedule_cancellation_over_write,
additional_constraints_schedule_cancellation_over_write, nullptr, 0, false,
@ -209,6 +215,10 @@ const char* const additional_constraints_pick_first_new = "{}";
const char* const description_promise_based_inproc_transport =
"Use promises for the in-process transport.";
const char* const additional_constraints_promise_based_inproc_transport = "{}";
const char* const description_rq_fast_reject =
"Resource quota rejects requests immediately (before allocating the "
"request structure) under very high memory pressure.";
const char* const additional_constraints_rq_fast_reject = "{}";
const char* const description_schedule_cancellation_over_write =
"Allow cancellation op to be scheduled over a write";
const char* const additional_constraints_schedule_cancellation_over_write =
@ -278,6 +288,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
description_promise_based_inproc_transport,
additional_constraints_promise_based_inproc_transport, nullptr, 0, false,
false},
{"rq_fast_reject", description_rq_fast_reject,
additional_constraints_rq_fast_reject, nullptr, 0, false, true},
{"schedule_cancellation_over_write",
description_schedule_cancellation_over_write,
additional_constraints_schedule_cancellation_over_write, nullptr, 0, false,
@ -350,6 +362,10 @@ const char* const additional_constraints_pick_first_new = "{}";
const char* const description_promise_based_inproc_transport =
"Use promises for the in-process transport.";
const char* const additional_constraints_promise_based_inproc_transport = "{}";
const char* const description_rq_fast_reject =
"Resource quota rejects requests immediately (before allocating the "
"request structure) under very high memory pressure.";
const char* const additional_constraints_rq_fast_reject = "{}";
const char* const description_schedule_cancellation_over_write =
"Allow cancellation op to be scheduled over a write";
const char* const additional_constraints_schedule_cancellation_over_write =
@ -419,6 +435,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
description_promise_based_inproc_transport,
additional_constraints_promise_based_inproc_transport, nullptr, 0, false,
false},
{"rq_fast_reject", description_rq_fast_reject,
additional_constraints_rq_fast_reject, nullptr, 0, false, true},
{"schedule_cancellation_over_write",
description_schedule_cancellation_over_write,
additional_constraints_schedule_cancellation_over_write, nullptr, 0, false,

@ -75,6 +75,7 @@ inline bool IsMultipingEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_NEW
inline bool IsPickFirstNewEnabled() { return true; }
inline bool IsPromiseBasedInprocTransportEnabled() { return false; }
inline bool IsRqFastRejectEnabled() { return false; }
inline bool IsScheduleCancellationOverWriteEnabled() { return false; }
inline bool IsServerPrivacyEnabled() { return false; }
inline bool IsTcpFrameSizeTuningEnabled() { return false; }
@ -108,6 +109,7 @@ inline bool IsMultipingEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_NEW
inline bool IsPickFirstNewEnabled() { return true; }
inline bool IsPromiseBasedInprocTransportEnabled() { return false; }
inline bool IsRqFastRejectEnabled() { return false; }
inline bool IsScheduleCancellationOverWriteEnabled() { return false; }
inline bool IsServerPrivacyEnabled() { return false; }
inline bool IsTcpFrameSizeTuningEnabled() { return false; }
@ -141,6 +143,7 @@ inline bool IsMultipingEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_NEW
inline bool IsPickFirstNewEnabled() { return true; }
inline bool IsPromiseBasedInprocTransportEnabled() { return false; }
inline bool IsRqFastRejectEnabled() { return false; }
inline bool IsScheduleCancellationOverWriteEnabled() { return false; }
inline bool IsServerPrivacyEnabled() { return false; }
inline bool IsTcpFrameSizeTuningEnabled() { return false; }
@ -170,6 +173,7 @@ enum ExperimentIds {
kExperimentIdMultiping,
kExperimentIdPickFirstNew,
kExperimentIdPromiseBasedInprocTransport,
kExperimentIdRqFastReject,
kExperimentIdScheduleCancellationOverWrite,
kExperimentIdServerPrivacy,
kExperimentIdTcpFrameSizeTuning,
@ -236,6 +240,10 @@ inline bool IsPickFirstNewEnabled() {
inline bool IsPromiseBasedInprocTransportEnabled() {
return IsExperimentEnabled<kExperimentIdPromiseBasedInprocTransport>();
}
#define GRPC_EXPERIMENT_IS_INCLUDED_RQ_FAST_REJECT
inline bool IsRqFastRejectEnabled() {
return IsExperimentEnabled<kExperimentIdRqFastReject>();
}
#define GRPC_EXPERIMENT_IS_INCLUDED_SCHEDULE_CANCELLATION_OVER_WRITE
inline bool IsScheduleCancellationOverWriteEnabled() {
return IsExperimentEnabled<kExperimentIdScheduleCancellationOverWrite>();

@ -124,6 +124,13 @@
owner: ctiller@google.com
test_tags: []
allow_in_fuzzing_config: false # experiment currently crashes if enabled
- name: rq_fast_reject
description:
Resource quota rejects requests immediately (before allocating the request
structure) under very high memory pressure.
expiry: 2025/06/06
owner: ctiller@google.com
test_tags: []
- name: schedule_cancellation_over_write
description: Allow cancellation op to be scheduled over a write
expiry: 2024/12/01

@ -530,6 +530,14 @@ class MemoryOwner final : public MemoryAllocator {
// Is this object valid (ie has not been moved out of or reset)
bool is_valid() const { return impl() != nullptr; }
static double memory_pressure_high_threshold() { return 0.99; }
// Return true if the controlled memory pressure is high.
bool IsMemoryPressureHigh() const {
return GetPressureInfo().pressure_control_value >
memory_pressure_high_threshold();
}
private:
const GrpcMemoryAllocatorImpl* impl() const {
return static_cast<const GrpcMemoryAllocatorImpl*>(get_internal_impl_ptr());
@ -563,11 +571,9 @@ class MemoryQuota final
// Resize the quota to new_size.
void SetSize(size_t new_size) { memory_quota_->SetSize(new_size); }
// Return true if the controlled memory pressure is high.
bool IsMemoryPressureHigh() const {
static constexpr double kMemoryPressureHighThreshold = 0.99;
return memory_quota_->GetPressureInfo().pressure_control_value >
kMemoryPressureHighThreshold;
MemoryOwner::memory_pressure_high_threshold();
}
private:

@ -129,6 +129,7 @@ const absl::string_view
"insecure_connections_created",
"rq_connections_dropped",
"rq_calls_dropped",
"rq_calls_rejected",
"syscall_write",
"syscall_read",
"tcp_read_alloc_8k",
@ -168,6 +169,7 @@ const absl::string_view GlobalStats::counter_doc[static_cast<int>(
"Number of insecure connections created",
"Number of connections dropped due to resource quota exceeded",
"Number of calls dropped due to resource quota exceeded",
"Number of calls rejected (never started) due to resource quota exceeded",
"Number of write syscalls (or equivalent - eg sendmsg) made by this "
"process",
"Number of read syscalls (or equivalent - eg recvmsg) made by this process",
@ -469,6 +471,7 @@ GlobalStats::GlobalStats()
insecure_connections_created{0},
rq_connections_dropped{0},
rq_calls_dropped{0},
rq_calls_rejected{0},
syscall_write{0},
syscall_read{0},
tcp_read_alloc_8k{0},
@ -610,6 +613,8 @@ std::unique_ptr<GlobalStats> GlobalStatsCollector::Collect() const {
data.rq_connections_dropped.load(std::memory_order_relaxed);
result->rq_calls_dropped +=
data.rq_calls_dropped.load(std::memory_order_relaxed);
result->rq_calls_rejected +=
data.rq_calls_rejected.load(std::memory_order_relaxed);
result->syscall_write += data.syscall_write.load(std::memory_order_relaxed);
result->syscall_read += data.syscall_read.load(std::memory_order_relaxed);
result->tcp_read_alloc_8k +=
@ -728,6 +733,7 @@ std::unique_ptr<GlobalStats> GlobalStats::Diff(const GlobalStats& other) const {
result->rq_connections_dropped =
rq_connections_dropped - other.rq_connections_dropped;
result->rq_calls_dropped = rq_calls_dropped - other.rq_calls_dropped;
result->rq_calls_rejected = rq_calls_rejected - other.rq_calls_rejected;
result->syscall_write = syscall_write - other.syscall_write;
result->syscall_read = syscall_read - other.syscall_read;
result->tcp_read_alloc_8k = tcp_read_alloc_8k - other.tcp_read_alloc_8k;

@ -206,6 +206,7 @@ struct GlobalStats {
kInsecureConnectionsCreated,
kRqConnectionsDropped,
kRqCallsDropped,
kRqCallsRejected,
kSyscallWrite,
kSyscallRead,
kTcpReadAlloc8k,
@ -285,6 +286,7 @@ struct GlobalStats {
uint64_t insecure_connections_created;
uint64_t rq_connections_dropped;
uint64_t rq_calls_dropped;
uint64_t rq_calls_rejected;
uint64_t syscall_write;
uint64_t syscall_read;
uint64_t tcp_read_alloc_8k;
@ -382,6 +384,9 @@ class GlobalStatsCollector {
void IncrementRqCallsDropped() {
data_.this_cpu().rq_calls_dropped.fetch_add(1, std::memory_order_relaxed);
}
void IncrementRqCallsRejected() {
data_.this_cpu().rq_calls_rejected.fetch_add(1, std::memory_order_relaxed);
}
void IncrementSyscallWrite() {
data_.this_cpu().syscall_write.fetch_add(1, std::memory_order_relaxed);
}
@ -573,6 +578,7 @@ class GlobalStatsCollector {
std::atomic<uint64_t> insecure_connections_created{0};
std::atomic<uint64_t> rq_connections_dropped{0};
std::atomic<uint64_t> rq_calls_dropped{0};
std::atomic<uint64_t> rq_calls_rejected{0};
std::atomic<uint64_t> syscall_write{0};
std::atomic<uint64_t> syscall_read{0};
std::atomic<uint64_t> tcp_read_alloc_8k{0};

@ -37,6 +37,8 @@
doc: Number of connections dropped due to resource quota exceeded
- counter: rq_calls_dropped
doc: Number of calls dropped due to resource quota exceeded
- counter: rq_calls_rejected
doc: Number of calls rejected (never started) due to resource quota exceeded
# tcp
- counter: syscall_write
doc: Number of write syscalls (or equivalent - eg sendmsg) made by this process

Loading…
Cancel
Save