From bd4792dab6de97fa35821ed5716a8bb51d998c7f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 16 Oct 2024 10:20:21 -0700 Subject: [PATCH] [chttp+resource_quota] Add an experiment to quickly reject requests under very high memory pressure (#37927) Currently the destructive reclaimer single threaded cancels existing requests, but we admit new rpcs on every channel (to be eventually cancelled, probably). We've got evidence that this (shockingly) doesn't scale and senders can easily overwhelm and oom a server. Instead under this experiment now we'll always reject new work under very high load, and allow the reclaimer to mop up any remaining work to get back to within bounds. Closes #37927 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37927 from ctiller:fast_reject 835726473aae8023e9071e67b1b1ac5daa41ebb5 PiperOrigin-RevId: 686553599 --- bazel/experiments.bzl | 1 + .../ext/transport/chttp2/transport/parsing.cc | 14 ++++++++++++++ src/core/lib/experiments/experiments.cc | 18 ++++++++++++++++++ src/core/lib/experiments/experiments.h | 8 ++++++++ src/core/lib/experiments/experiments.yaml | 7 +++++++ src/core/lib/resource_quota/memory_quota.h | 12 +++++++++--- src/core/telemetry/stats_data.cc | 6 ++++++ src/core/telemetry/stats_data.h | 6 ++++++ src/core/telemetry/stats_data.yaml | 2 ++ 9 files changed, 71 insertions(+), 3 deletions(-) diff --git a/bazel/experiments.bzl b/bazel/experiments.bzl index f9035a02199..635160b0657 100644 --- a/bazel/experiments.bzl +++ b/bazel/experiments.bzl @@ -31,6 +31,7 @@ EXPERIMENT_ENABLES = { "multiping": "multiping", "pick_first_new": "pick_first_new", "promise_based_inproc_transport": "promise_based_inproc_transport", + "rq_fast_reject": "rq_fast_reject", "schedule_cancellation_over_write": "schedule_cancellation_over_write", "server_privacy": "server_privacy", "tcp_frame_size_tuning": "tcp_frame_size_tuning", diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index 7fa290db466..f1c7e07a217 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -68,6 +68,8 @@ #include "src/core/lib/transport/metadata_batch.h" #include "src/core/lib/transport/transport.h" #include "src/core/telemetry/call_tracer.h" +#include "src/core/telemetry/stats.h" +#include "src/core/telemetry/stats_data.h" #include "src/core/util/random_early_detection.h" #include "src/core/util/ref_counted_ptr.h" #include "src/core/util/status_helper.h" @@ -642,6 +644,18 @@ static grpc_error_handle init_header_frame_parser(grpc_chttp2_transport* t, GRPC_HTTP2_REFUSED_STREAM, nullptr)); grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM); return init_header_skip_frame_parser(t, priority_type, is_eoh); + } else if (grpc_core::IsRqFastRejectEnabled() && + GPR_UNLIKELY(t->memory_owner.IsMemoryPressureHigh())) { + // We have more streams allocated than we'd like, so apply some pushback + // by refusing this stream. + grpc_core::global_stats().IncrementRqCallsRejected(); + ++t->num_pending_induced_frames; + grpc_slice_buffer_add( + &t->qbuf, + grpc_chttp2_rst_stream_create(t->incoming_stream_id, + GRPC_HTTP2_ENHANCE_YOUR_CALM, nullptr)); + grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM); + return init_header_skip_frame_parser(t, priority_type, is_eoh); } else if (GPR_UNLIKELY( t->max_concurrent_streams_overload_protection && t->streams_allocated.load(std::memory_order_relaxed) > diff --git a/src/core/lib/experiments/experiments.cc b/src/core/lib/experiments/experiments.cc index 664498e252d..4c542dd2310 100644 --- a/src/core/lib/experiments/experiments.cc +++ b/src/core/lib/experiments/experiments.cc @@ -68,6 +68,10 @@ const char* const additional_constraints_pick_first_new = "{}"; const char* const description_promise_based_inproc_transport = "Use promises for the in-process transport."; const char* const additional_constraints_promise_based_inproc_transport = "{}"; +const char* const description_rq_fast_reject = + "Resource quota rejects requests immediately (before allocating the " + "request structure) under very high memory pressure."; +const char* const additional_constraints_rq_fast_reject = "{}"; const char* const description_schedule_cancellation_over_write = "Allow cancellation op to be scheduled over a write"; const char* const additional_constraints_schedule_cancellation_over_write = @@ -137,6 +141,8 @@ const ExperimentMetadata g_experiment_metadata[] = { description_promise_based_inproc_transport, additional_constraints_promise_based_inproc_transport, nullptr, 0, false, false}, + {"rq_fast_reject", description_rq_fast_reject, + additional_constraints_rq_fast_reject, nullptr, 0, false, true}, {"schedule_cancellation_over_write", description_schedule_cancellation_over_write, additional_constraints_schedule_cancellation_over_write, nullptr, 0, false, @@ -209,6 +215,10 @@ const char* const additional_constraints_pick_first_new = "{}"; const char* const description_promise_based_inproc_transport = "Use promises for the in-process transport."; const char* const additional_constraints_promise_based_inproc_transport = "{}"; +const char* const description_rq_fast_reject = + "Resource quota rejects requests immediately (before allocating the " + "request structure) under very high memory pressure."; +const char* const additional_constraints_rq_fast_reject = "{}"; const char* const description_schedule_cancellation_over_write = "Allow cancellation op to be scheduled over a write"; const char* const additional_constraints_schedule_cancellation_over_write = @@ -278,6 +288,8 @@ const ExperimentMetadata g_experiment_metadata[] = { description_promise_based_inproc_transport, additional_constraints_promise_based_inproc_transport, nullptr, 0, false, false}, + {"rq_fast_reject", description_rq_fast_reject, + additional_constraints_rq_fast_reject, nullptr, 0, false, true}, {"schedule_cancellation_over_write", description_schedule_cancellation_over_write, additional_constraints_schedule_cancellation_over_write, nullptr, 0, false, @@ -350,6 +362,10 @@ const char* const additional_constraints_pick_first_new = "{}"; const char* const description_promise_based_inproc_transport = "Use promises for the in-process transport."; const char* const additional_constraints_promise_based_inproc_transport = "{}"; +const char* const description_rq_fast_reject = + "Resource quota rejects requests immediately (before allocating the " + "request structure) under very high memory pressure."; +const char* const additional_constraints_rq_fast_reject = "{}"; const char* const description_schedule_cancellation_over_write = "Allow cancellation op to be scheduled over a write"; const char* const additional_constraints_schedule_cancellation_over_write = @@ -419,6 +435,8 @@ const ExperimentMetadata g_experiment_metadata[] = { description_promise_based_inproc_transport, additional_constraints_promise_based_inproc_transport, nullptr, 0, false, false}, + {"rq_fast_reject", description_rq_fast_reject, + additional_constraints_rq_fast_reject, nullptr, 0, false, true}, {"schedule_cancellation_over_write", description_schedule_cancellation_over_write, additional_constraints_schedule_cancellation_over_write, nullptr, 0, false, diff --git a/src/core/lib/experiments/experiments.h b/src/core/lib/experiments/experiments.h index f39ff23df80..56b2c65459a 100644 --- a/src/core/lib/experiments/experiments.h +++ b/src/core/lib/experiments/experiments.h @@ -75,6 +75,7 @@ inline bool IsMultipingEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_NEW inline bool IsPickFirstNewEnabled() { return true; } inline bool IsPromiseBasedInprocTransportEnabled() { return false; } +inline bool IsRqFastRejectEnabled() { return false; } inline bool IsScheduleCancellationOverWriteEnabled() { return false; } inline bool IsServerPrivacyEnabled() { return false; } inline bool IsTcpFrameSizeTuningEnabled() { return false; } @@ -108,6 +109,7 @@ inline bool IsMultipingEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_NEW inline bool IsPickFirstNewEnabled() { return true; } inline bool IsPromiseBasedInprocTransportEnabled() { return false; } +inline bool IsRqFastRejectEnabled() { return false; } inline bool IsScheduleCancellationOverWriteEnabled() { return false; } inline bool IsServerPrivacyEnabled() { return false; } inline bool IsTcpFrameSizeTuningEnabled() { return false; } @@ -141,6 +143,7 @@ inline bool IsMultipingEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_NEW inline bool IsPickFirstNewEnabled() { return true; } inline bool IsPromiseBasedInprocTransportEnabled() { return false; } +inline bool IsRqFastRejectEnabled() { return false; } inline bool IsScheduleCancellationOverWriteEnabled() { return false; } inline bool IsServerPrivacyEnabled() { return false; } inline bool IsTcpFrameSizeTuningEnabled() { return false; } @@ -170,6 +173,7 @@ enum ExperimentIds { kExperimentIdMultiping, kExperimentIdPickFirstNew, kExperimentIdPromiseBasedInprocTransport, + kExperimentIdRqFastReject, kExperimentIdScheduleCancellationOverWrite, kExperimentIdServerPrivacy, kExperimentIdTcpFrameSizeTuning, @@ -236,6 +240,10 @@ inline bool IsPickFirstNewEnabled() { inline bool IsPromiseBasedInprocTransportEnabled() { return IsExperimentEnabled(); } +#define GRPC_EXPERIMENT_IS_INCLUDED_RQ_FAST_REJECT +inline bool IsRqFastRejectEnabled() { + return IsExperimentEnabled(); +} #define GRPC_EXPERIMENT_IS_INCLUDED_SCHEDULE_CANCELLATION_OVER_WRITE inline bool IsScheduleCancellationOverWriteEnabled() { return IsExperimentEnabled(); diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index 85a704b1b9d..c4f4e8983fb 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -124,6 +124,13 @@ owner: ctiller@google.com test_tags: [] allow_in_fuzzing_config: false # experiment currently crashes if enabled +- name: rq_fast_reject + description: + Resource quota rejects requests immediately (before allocating the request + structure) under very high memory pressure. + expiry: 2025/06/06 + owner: ctiller@google.com + test_tags: [] - name: schedule_cancellation_over_write description: Allow cancellation op to be scheduled over a write expiry: 2024/12/01 diff --git a/src/core/lib/resource_quota/memory_quota.h b/src/core/lib/resource_quota/memory_quota.h index 85395b448d3..f53dd6d5ac2 100644 --- a/src/core/lib/resource_quota/memory_quota.h +++ b/src/core/lib/resource_quota/memory_quota.h @@ -530,6 +530,14 @@ class MemoryOwner final : public MemoryAllocator { // Is this object valid (ie has not been moved out of or reset) bool is_valid() const { return impl() != nullptr; } + static double memory_pressure_high_threshold() { return 0.99; } + + // Return true if the controlled memory pressure is high. + bool IsMemoryPressureHigh() const { + return GetPressureInfo().pressure_control_value > + memory_pressure_high_threshold(); + } + private: const GrpcMemoryAllocatorImpl* impl() const { return static_cast(get_internal_impl_ptr()); @@ -563,11 +571,9 @@ class MemoryQuota final // Resize the quota to new_size. void SetSize(size_t new_size) { memory_quota_->SetSize(new_size); } - // Return true if the controlled memory pressure is high. bool IsMemoryPressureHigh() const { - static constexpr double kMemoryPressureHighThreshold = 0.99; return memory_quota_->GetPressureInfo().pressure_control_value > - kMemoryPressureHighThreshold; + MemoryOwner::memory_pressure_high_threshold(); } private: diff --git a/src/core/telemetry/stats_data.cc b/src/core/telemetry/stats_data.cc index 1b59c68f9c5..6992bcc7587 100644 --- a/src/core/telemetry/stats_data.cc +++ b/src/core/telemetry/stats_data.cc @@ -129,6 +129,7 @@ const absl::string_view "insecure_connections_created", "rq_connections_dropped", "rq_calls_dropped", + "rq_calls_rejected", "syscall_write", "syscall_read", "tcp_read_alloc_8k", @@ -168,6 +169,7 @@ const absl::string_view GlobalStats::counter_doc[static_cast( "Number of insecure connections created", "Number of connections dropped due to resource quota exceeded", "Number of calls dropped due to resource quota exceeded", + "Number of calls rejected (never started) due to resource quota exceeded", "Number of write syscalls (or equivalent - eg sendmsg) made by this " "process", "Number of read syscalls (or equivalent - eg recvmsg) made by this process", @@ -469,6 +471,7 @@ GlobalStats::GlobalStats() insecure_connections_created{0}, rq_connections_dropped{0}, rq_calls_dropped{0}, + rq_calls_rejected{0}, syscall_write{0}, syscall_read{0}, tcp_read_alloc_8k{0}, @@ -610,6 +613,8 @@ std::unique_ptr GlobalStatsCollector::Collect() const { data.rq_connections_dropped.load(std::memory_order_relaxed); result->rq_calls_dropped += data.rq_calls_dropped.load(std::memory_order_relaxed); + result->rq_calls_rejected += + data.rq_calls_rejected.load(std::memory_order_relaxed); result->syscall_write += data.syscall_write.load(std::memory_order_relaxed); result->syscall_read += data.syscall_read.load(std::memory_order_relaxed); result->tcp_read_alloc_8k += @@ -728,6 +733,7 @@ std::unique_ptr GlobalStats::Diff(const GlobalStats& other) const { result->rq_connections_dropped = rq_connections_dropped - other.rq_connections_dropped; result->rq_calls_dropped = rq_calls_dropped - other.rq_calls_dropped; + result->rq_calls_rejected = rq_calls_rejected - other.rq_calls_rejected; result->syscall_write = syscall_write - other.syscall_write; result->syscall_read = syscall_read - other.syscall_read; result->tcp_read_alloc_8k = tcp_read_alloc_8k - other.tcp_read_alloc_8k; diff --git a/src/core/telemetry/stats_data.h b/src/core/telemetry/stats_data.h index 9961302a868..f2ae015580f 100644 --- a/src/core/telemetry/stats_data.h +++ b/src/core/telemetry/stats_data.h @@ -206,6 +206,7 @@ struct GlobalStats { kInsecureConnectionsCreated, kRqConnectionsDropped, kRqCallsDropped, + kRqCallsRejected, kSyscallWrite, kSyscallRead, kTcpReadAlloc8k, @@ -285,6 +286,7 @@ struct GlobalStats { uint64_t insecure_connections_created; uint64_t rq_connections_dropped; uint64_t rq_calls_dropped; + uint64_t rq_calls_rejected; uint64_t syscall_write; uint64_t syscall_read; uint64_t tcp_read_alloc_8k; @@ -382,6 +384,9 @@ class GlobalStatsCollector { void IncrementRqCallsDropped() { data_.this_cpu().rq_calls_dropped.fetch_add(1, std::memory_order_relaxed); } + void IncrementRqCallsRejected() { + data_.this_cpu().rq_calls_rejected.fetch_add(1, std::memory_order_relaxed); + } void IncrementSyscallWrite() { data_.this_cpu().syscall_write.fetch_add(1, std::memory_order_relaxed); } @@ -573,6 +578,7 @@ class GlobalStatsCollector { std::atomic insecure_connections_created{0}; std::atomic rq_connections_dropped{0}; std::atomic rq_calls_dropped{0}; + std::atomic rq_calls_rejected{0}; std::atomic syscall_write{0}; std::atomic syscall_read{0}; std::atomic tcp_read_alloc_8k{0}; diff --git a/src/core/telemetry/stats_data.yaml b/src/core/telemetry/stats_data.yaml index c9580cded0b..ce85f00e508 100644 --- a/src/core/telemetry/stats_data.yaml +++ b/src/core/telemetry/stats_data.yaml @@ -37,6 +37,8 @@ doc: Number of connections dropped due to resource quota exceeded - counter: rq_calls_dropped doc: Number of calls dropped due to resource quota exceeded +- counter: rq_calls_rejected + doc: Number of calls rejected (never started) due to resource quota exceeded # tcp - counter: syscall_write doc: Number of write syscalls (or equivalent - eg sendmsg) made by this process