diff --git a/BUILD b/BUILD index b93d89d0892..8fa1d326cf1 100644 --- a/BUILD +++ b/BUILD @@ -1493,13 +1493,13 @@ grpc_cc_library( "absl/functional:function_ref", "absl/hash", "absl/meta:type_traits", + "absl/random", "absl/status", "absl/status:statusor", "absl/strings", "absl/strings:str_format", "absl/time", "absl/types:optional", - "absl/types:variant", "absl/utility", "madler_zlib", ], @@ -1578,7 +1578,6 @@ grpc_cc_library( "//src/core:latch", "//src/core:loop", "//src/core:map", - "//src/core:match", "//src/core:memory_quota", "//src/core:metadata_compression_traits", "//src/core:no_destruct", @@ -1590,6 +1589,7 @@ grpc_cc_library( "//src/core:posix_event_engine_base_hdrs", "//src/core:promise_status", "//src/core:race", + "//src/core:random_early_detection", "//src/core:ref_counted", "//src/core:ref_counted_string", "//src/core:resolved_address", diff --git a/CMakeLists.txt b/CMakeLists.txt index 278adc9fb98..9e07ad89bba 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4874,6 +4874,7 @@ add_library(grpc_authorization_provider src/core/lib/address_utils/parse_address.cc src/core/lib/address_utils/sockaddr_utils.cc src/core/lib/backoff/backoff.cc + src/core/lib/backoff/random_early_detection.cc src/core/lib/channel/call_tracer.cc src/core/lib/channel/channel_args.cc src/core/lib/channel/channel_args_preconditioning.cc @@ -5176,6 +5177,8 @@ target_link_libraries(grpc_authorization_provider absl::function_ref absl::hash absl::type_traits + absl::random_bit_gen_ref + absl::random_distributions absl::statusor absl::span absl::utility diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index deba1f3005f..b637abfddd2 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -4124,6 +4124,7 @@ libs: - src/core/lib/address_utils/sockaddr_utils.h - src/core/lib/avl/avl.h - src/core/lib/backoff/backoff.h + - src/core/lib/backoff/random_early_detection.h - src/core/lib/channel/call_finalization.h - src/core/lib/channel/call_tracer.h - src/core/lib/channel/channel_args.h @@ -4444,6 +4445,7 @@ libs: - src/core/lib/address_utils/parse_address.cc - src/core/lib/address_utils/sockaddr_utils.cc - src/core/lib/backoff/backoff.cc + - src/core/lib/backoff/random_early_detection.cc - src/core/lib/channel/call_tracer.cc - src/core/lib/channel/channel_args.cc - src/core/lib/channel/channel_args_preconditioning.cc @@ -4711,6 +4713,8 @@ libs: - absl/functional:function_ref - absl/hash:hash - absl/meta:type_traits + - absl/random:bit_gen_ref + - absl/random:distributions - absl/status:statusor - absl/types:span - absl/utility:utility diff --git a/grpc.gyp b/grpc.gyp index 1eb5a1f7cca..9d76638ef4b 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -1976,6 +1976,8 @@ 'absl/functional:function_ref', 'absl/hash:hash', 'absl/meta:type_traits', + 'absl/random:bit_gen_ref', + 'absl/random:distributions', 'absl/status:statusor', 'absl/types:span', 'absl/utility:utility', @@ -1992,6 +1994,7 @@ 'src/core/lib/address_utils/parse_address.cc', 'src/core/lib/address_utils/sockaddr_utils.cc', 'src/core/lib/backoff/backoff.cc', + 'src/core/lib/backoff/random_early_detection.cc', 'src/core/lib/channel/call_tracer.cc', 'src/core/lib/channel/channel_args.cc', 'src/core/lib/channel/channel_args_preconditioning.cc', diff --git a/src/core/lib/experiments/experiments.cc b/src/core/lib/experiments/experiments.cc index 0a7a70fe7d1..97d72147923 100644 --- a/src/core/lib/experiments/experiments.cc +++ b/src/core/lib/experiments/experiments.cc @@ -95,6 +95,14 @@ const char* const description_peer_state_based_framing = "on the peer's memory pressure which is reflected in its max http2 frame " "size."; const char* const additional_constraints_peer_state_based_framing = "{}"; +const char* const description_pending_queue_cap = + "In the sync & async apis (but not the callback api), cap the number of " + "received but unrequested requests in the server for each call type. A " + "received message is one that was read from the wire on the server. A " + "requested message is one explicitly requested by the application using " + "grpc_server_request_call or grpc_server_request_registered_call (or their " + "wrappers in the C++ API)."; +const char* const additional_constraints_pending_queue_cap = "{}"; const char* const description_pick_first_happy_eyeballs = "Use Happy Eyeballs in pick_first."; const char* const additional_constraints_pick_first_happy_eyeballs = "{}"; @@ -248,6 +256,8 @@ const ExperimentMetadata g_experiment_metadata[] = { additional_constraints_overload_protection, true, true}, {"peer_state_based_framing", description_peer_state_based_framing, additional_constraints_peer_state_based_framing, false, true}, + {"pending_queue_cap", description_pending_queue_cap, + additional_constraints_pending_queue_cap, true, true}, {"pick_first_happy_eyeballs", description_pick_first_happy_eyeballs, additional_constraints_pick_first_happy_eyeballs, true, true}, {"ping_on_rst_stream", description_ping_on_rst_stream, @@ -381,6 +391,14 @@ const char* const description_peer_state_based_framing = "on the peer's memory pressure which is reflected in its max http2 frame " "size."; const char* const additional_constraints_peer_state_based_framing = "{}"; +const char* const description_pending_queue_cap = + "In the sync & async apis (but not the callback api), cap the number of " + "received but unrequested requests in the server for each call type. A " + "received message is one that was read from the wire on the server. A " + "requested message is one explicitly requested by the application using " + "grpc_server_request_call or grpc_server_request_registered_call (or their " + "wrappers in the C++ API)."; +const char* const additional_constraints_pending_queue_cap = "{}"; const char* const description_pick_first_happy_eyeballs = "Use Happy Eyeballs in pick_first."; const char* const additional_constraints_pick_first_happy_eyeballs = "{}"; @@ -534,6 +552,8 @@ const ExperimentMetadata g_experiment_metadata[] = { additional_constraints_overload_protection, true, true}, {"peer_state_based_framing", description_peer_state_based_framing, additional_constraints_peer_state_based_framing, false, true}, + {"pending_queue_cap", description_pending_queue_cap, + additional_constraints_pending_queue_cap, true, true}, {"pick_first_happy_eyeballs", description_pick_first_happy_eyeballs, additional_constraints_pick_first_happy_eyeballs, true, true}, {"ping_on_rst_stream", description_ping_on_rst_stream, @@ -667,6 +687,14 @@ const char* const description_peer_state_based_framing = "on the peer's memory pressure which is reflected in its max http2 frame " "size."; const char* const additional_constraints_peer_state_based_framing = "{}"; +const char* const description_pending_queue_cap = + "In the sync & async apis (but not the callback api), cap the number of " + "received but unrequested requests in the server for each call type. A " + "received message is one that was read from the wire on the server. A " + "requested message is one explicitly requested by the application using " + "grpc_server_request_call or grpc_server_request_registered_call (or their " + "wrappers in the C++ API)."; +const char* const additional_constraints_pending_queue_cap = "{}"; const char* const description_pick_first_happy_eyeballs = "Use Happy Eyeballs in pick_first."; const char* const additional_constraints_pick_first_happy_eyeballs = "{}"; @@ -820,6 +848,8 @@ const ExperimentMetadata g_experiment_metadata[] = { additional_constraints_overload_protection, true, true}, {"peer_state_based_framing", description_peer_state_based_framing, additional_constraints_peer_state_based_framing, false, true}, + {"pending_queue_cap", description_pending_queue_cap, + additional_constraints_pending_queue_cap, true, true}, {"pick_first_happy_eyeballs", description_pick_first_happy_eyeballs, additional_constraints_pick_first_happy_eyeballs, true, true}, {"ping_on_rst_stream", description_ping_on_rst_stream, diff --git a/src/core/lib/experiments/experiments.h b/src/core/lib/experiments/experiments.h index 3b6fbd31782..f90ae0f05fa 100644 --- a/src/core/lib/experiments/experiments.h +++ b/src/core/lib/experiments/experiments.h @@ -96,6 +96,8 @@ inline bool IsMultipingEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_OVERLOAD_PROTECTION inline bool IsOverloadProtectionEnabled() { return true; } inline bool IsPeerStateBasedFramingEnabled() { return false; } +#define GRPC_EXPERIMENT_IS_INCLUDED_PENDING_QUEUE_CAP +inline bool IsPendingQueueCapEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_HAPPY_EYEBALLS inline bool IsPickFirstHappyEyeballsEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_PING_ON_RST_STREAM @@ -174,6 +176,8 @@ inline bool IsMultipingEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_OVERLOAD_PROTECTION inline bool IsOverloadProtectionEnabled() { return true; } inline bool IsPeerStateBasedFramingEnabled() { return false; } +#define GRPC_EXPERIMENT_IS_INCLUDED_PENDING_QUEUE_CAP +inline bool IsPendingQueueCapEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_HAPPY_EYEBALLS inline bool IsPickFirstHappyEyeballsEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_PING_ON_RST_STREAM @@ -251,6 +255,8 @@ inline bool IsMultipingEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_OVERLOAD_PROTECTION inline bool IsOverloadProtectionEnabled() { return true; } inline bool IsPeerStateBasedFramingEnabled() { return false; } +#define GRPC_EXPERIMENT_IS_INCLUDED_PENDING_QUEUE_CAP +inline bool IsPendingQueueCapEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_HAPPY_EYEBALLS inline bool IsPickFirstHappyEyeballsEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_PING_ON_RST_STREAM @@ -312,6 +318,7 @@ enum ExperimentIds { kExperimentIdMultiping, kExperimentIdOverloadProtection, kExperimentIdPeerStateBasedFraming, + kExperimentIdPendingQueueCap, kExperimentIdPickFirstHappyEyeballs, kExperimentIdPingOnRstStream, kExperimentIdPromiseBasedClientCall, @@ -424,6 +431,10 @@ inline bool IsOverloadProtectionEnabled() { inline bool IsPeerStateBasedFramingEnabled() { return IsExperimentEnabled(kExperimentIdPeerStateBasedFraming); } +#define GRPC_EXPERIMENT_IS_INCLUDED_PENDING_QUEUE_CAP +inline bool IsPendingQueueCapEnabled() { + return IsExperimentEnabled(kExperimentIdPendingQueueCap); +} #define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_HAPPY_EYEBALLS inline bool IsPickFirstHappyEyeballsEnabled() { return IsExperimentEnabled(kExperimentIdPickFirstHappyEyeballs); diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index 90d4a34cde8..bec9c44e645 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -165,6 +165,17 @@ expiry: 2024/01/01 owner: vigneshbabu@google.com test_tags: ["flow_control_test"] +- name: pending_queue_cap + description: + In the sync & async apis (but not the callback api), cap the number of + received but unrequested requests in the server for each call type. + A received message is one that was read from the wire on the server. + A requested message is one explicitly requested by the application using + grpc_server_request_call or grpc_server_request_registered_call (or their + wrappers in the C++ API). + expiry: 2024/05/05 + owner: ctiller@google.com + test_tags: [] - name: pick_first_happy_eyeballs description: Use Happy Eyeballs in pick_first. diff --git a/src/core/lib/experiments/rollouts.yaml b/src/core/lib/experiments/rollouts.yaml index ffc067fee6a..a796fe22eaf 100644 --- a/src/core/lib/experiments/rollouts.yaml +++ b/src/core/lib/experiments/rollouts.yaml @@ -94,6 +94,8 @@ default: true - name: peer_state_based_framing default: false +- name: pending_queue_cap + default: true - name: pick_first_happy_eyeballs default: true - name: ping_on_rst_stream diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index ddb627a6693..895691f8a00 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -35,7 +36,6 @@ #include "absl/container/flat_hash_map.h" #include "absl/status/status.h" #include "absl/types/optional.h" -#include "absl/types/variant.h" #include #include @@ -55,12 +55,12 @@ #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/crash.h" #include "src/core/lib/gprpp/debug_location.h" -#include "src/core/lib/gprpp/match.h" #include "src/core/lib/gprpp/mpscq.h" #include "src/core/lib/gprpp/status_helper.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/promise/activity.h" +#include "src/core/lib/promise/cancel_callback.h" #include "src/core/lib/promise/context.h" #include "src/core/lib/promise/map.h" #include "src/core/lib/promise/pipe.h" @@ -253,12 +253,12 @@ class Server::RequestMatcherInterface { // application to explicitly request RPCs and then matching those to incoming // RPCs, along with a slow path by which incoming RPCs are put on a locked // pending list if they aren't able to be matched to an application request. -class Server::RealRequestMatcher : public RequestMatcherInterface { +class Server::RealRequestMatcherFilterStack : public RequestMatcherInterface { public: - explicit RealRequestMatcher(Server* server) + explicit RealRequestMatcherFilterStack(Server* server) : server_(server), requests_per_cq_(server->cqs_.size()) {} - ~RealRequestMatcher() override { + ~RealRequestMatcherFilterStack() override { for (LockedMultiProducerSingleConsumerQueue& queue : requests_per_cq_) { GPR_ASSERT(queue.Pop() == nullptr); } @@ -266,15 +266,8 @@ class Server::RealRequestMatcher : public RequestMatcherInterface { void ZombifyPending() override { while (!pending_.empty()) { - Match( - pending_.front(), - [](CallData* calld) { - calld->SetState(CallData::CallState::ZOMBIED); - calld->KillZombie(); - }, - [](const std::shared_ptr& w) { - w->Finish(absl::InternalError("Server closed")); - }); + pending_.front().calld->SetState(CallData::CallState::ZOMBIED); + pending_.front().calld->KillZombie(); pending_.pop(); } } @@ -300,39 +293,34 @@ class Server::RealRequestMatcher : public RequestMatcherInterface { // matching calls struct NextPendingCall { RequestedCall* rc = nullptr; - PendingCall pending; + CallData* pending; }; - auto pop_next_pending = [this, request_queue_index] { + while (true) { NextPendingCall pending_call; { MutexLock lock(&server_->mu_call_); + while (!pending_.empty() && + pending_.front().Age() > server_->max_time_in_pending_queue_) { + pending_.pop(); + } if (!pending_.empty()) { pending_call.rc = reinterpret_cast( requests_per_cq_[request_queue_index].Pop()); if (pending_call.rc != nullptr) { - pending_call.pending = std::move(pending_.front()); + pending_call.pending = pending_.front().calld; pending_.pop(); } } } - return pending_call; - }; - while (true) { - NextPendingCall next_pending = pop_next_pending(); - if (next_pending.rc == nullptr) break; - Match( - next_pending.pending, - [&](CallData* calld) { - if (!calld->MaybeActivate()) { - // Zombied Call - calld->KillZombie(); - } else { - calld->Publish(request_queue_index, next_pending.rc); - } - }, - [&](const std::shared_ptr& w) { - w->Finish(server(), request_queue_index, next_pending.rc); - }); + if (pending_call.rc == nullptr) break; + if (!pending_call.pending->MaybeActivate()) { + // Zombied Call + pending_call.pending->KillZombie(); + requests_per_cq_[request_queue_index].Push( + &pending_call.rc->mpscq_node); + } else { + pending_call.pending->Publish(request_queue_index, pending_call.rc); + } } } } @@ -369,7 +357,7 @@ class Server::RealRequestMatcher : public RequestMatcherInterface { } if (rc == nullptr) { calld->SetState(CallData::CallState::PENDING); - pending_.push(calld); + pending_.push(PendingCall{calld}); return; } } @@ -377,6 +365,91 @@ class Server::RealRequestMatcher : public RequestMatcherInterface { calld->Publish(cq_idx, rc); } + ArenaPromise> MatchRequest(size_t) override { + Crash("not implemented for filter stack request matcher"); + } + + Server* server() const final { return server_; } + + private: + Server* const server_; + struct PendingCall { + CallData* calld; + Timestamp created = Timestamp::Now(); + Duration Age() { return Timestamp::Now() - created; } + }; + std::queue pending_; + std::vector requests_per_cq_; +}; + +class Server::RealRequestMatcherPromises : public RequestMatcherInterface { + public: + explicit RealRequestMatcherPromises(Server* server) + : server_(server), requests_per_cq_(server->cqs_.size()) {} + + ~RealRequestMatcherPromises() override { + for (LockedMultiProducerSingleConsumerQueue& queue : requests_per_cq_) { + GPR_ASSERT(queue.Pop() == nullptr); + } + } + + void ZombifyPending() override { + while (!pending_.empty()) { + pending_.front()->Finish(absl::InternalError("Server closed")); + pending_.pop(); + } + } + + void KillRequests(grpc_error_handle error) override { + for (size_t i = 0; i < requests_per_cq_.size(); i++) { + RequestedCall* rc; + while ((rc = reinterpret_cast( + requests_per_cq_[i].Pop())) != nullptr) { + server_->FailCall(i, rc, error); + } + } + } + + size_t request_queue_count() const override { + return requests_per_cq_.size(); + } + + void RequestCallWithPossiblePublish(size_t request_queue_index, + RequestedCall* call) override { + if (requests_per_cq_[request_queue_index].Push(&call->mpscq_node)) { + // this was the first queued request: we need to lock and start + // matching calls + struct NextPendingCall { + RequestedCall* rc = nullptr; + PendingCall pending; + }; + while (true) { + NextPendingCall pending_call; + { + MutexLock lock(&server_->mu_call_); + if (!pending_.empty()) { + pending_call.rc = reinterpret_cast( + requests_per_cq_[request_queue_index].Pop()); + if (pending_call.rc != nullptr) { + pending_call.pending = std::move(pending_.front()); + pending_.pop(); + } + } + } + if (pending_call.rc == nullptr) break; + if (!pending_call.pending->Finish(server(), request_queue_index, + pending_call.rc)) { + requests_per_cq_[request_queue_index].Push( + &pending_call.rc->mpscq_node); + } + } + } + } + + void MatchOrQueue(size_t, CallData*) override { + Crash("not implemented for promises"); + } + ArenaPromise> MatchRequest( size_t start_request_queue_index) override { for (size_t i = 0; i < requests_per_cq_.size(); i++) { @@ -396,25 +469,36 @@ class Server::RealRequestMatcher : public RequestMatcherInterface { size_t cq_idx = 0; size_t loop_count; { + std::vector> removed_pending; MutexLock lock(&server_->mu_call_); + while (!pending_.empty() && + pending_.front()->Age() > server_->max_time_in_pending_queue_) { + removed_pending.push_back(std::move(pending_.front())); + pending_.pop(); + } for (loop_count = 0; loop_count < requests_per_cq_.size(); loop_count++) { cq_idx = (start_request_queue_index + loop_count) % requests_per_cq_.size(); rc = reinterpret_cast(requests_per_cq_[cq_idx].Pop()); - if (rc != nullptr) { - break; - } + if (rc != nullptr) break; } if (rc == nullptr) { + if (server_->pending_backlog_protector_.Reject(pending_.size(), + server_->bitgen_)) { + return Immediate(absl::ResourceExhaustedError( + "Too many pending requests for this server")); + } auto w = std::make_shared( Activity::current()->MakeOwningWaker()); pending_.push(w); - return [w]() -> Poll> { - std::unique_ptr> r( - w->result.exchange(nullptr, std::memory_order_acq_rel)); - if (r == nullptr) return Pending{}; - return std::move(*r); - }; + return OnCancel( + [w]() -> Poll> { + std::unique_ptr> r( + w->result.exchange(nullptr, std::memory_order_acq_rel)); + if (r == nullptr) return Pending{}; + return std::move(*r); + }, + [w]() { w->Expire(); }); } } return Immediate(MatchResult(server(), cq_idx, rc)); @@ -425,23 +509,40 @@ class Server::RealRequestMatcher : public RequestMatcherInterface { private: Server* const server_; struct ActivityWaiter { + using ResultType = absl::StatusOr; explicit ActivityWaiter(Waker waker) : waker(std::move(waker)) {} ~ActivityWaiter() { delete result.load(std::memory_order_acquire); } void Finish(absl::Status status) { - result.store(new absl::StatusOr(std::move(status)), - std::memory_order_release); - waker.Wakeup(); + delete result.exchange(new ResultType(std::move(status)), + std::memory_order_acq_rel); + waker.WakeupAsync(); } - void Finish(Server* server, size_t cq_idx, RequestedCall* requested_call) { - result.store(new absl::StatusOr( - MatchResult(server, cq_idx, requested_call)), - std::memory_order_release); - waker.Wakeup(); + // Returns true if requested_call consumed, false otherwise. + GRPC_MUST_USE_RESULT bool Finish(Server* server, size_t cq_idx, + RequestedCall* requested_call) { + ResultType* expected = nullptr; + ResultType* new_value = + new ResultType(MatchResult(server, cq_idx, requested_call)); + if (!result.compare_exchange_strong(expected, new_value, + std::memory_order_acq_rel, + std::memory_order_acq_rel)) { + GPR_ASSERT(new_value->value().TakeCall() == requested_call); + delete new_value; + return false; + } + waker.WakeupAsync(); + return true; } + void Expire() { + delete result.exchange(new ResultType(absl::CancelledError()), + std::memory_order_acq_rel); + } + Duration Age() { return Timestamp::Now() - created; } Waker waker; - std::atomic*> result{nullptr}; + std::atomic result{nullptr}; + const Timestamp created = Timestamp::Now(); }; - using PendingCall = absl::variant>; + using PendingCall = std::shared_ptr; std::queue pending_; std::vector requests_per_cq_; }; @@ -716,6 +817,15 @@ void Server::AddListener(OrphanablePtr listener) { } void Server::Start() { + auto make_real_request_matcher = + [this]() -> std::unique_ptr { + if (IsPromiseBasedServerCallEnabled()) { + return std::make_unique(this); + } else { + return std::make_unique(this); + } + }; + started_ = true; for (grpc_completion_queue* cq : cqs_) { if (grpc_cq_can_listen(cq)) { @@ -723,11 +833,11 @@ void Server::Start() { } } if (unregistered_request_matcher_ == nullptr) { - unregistered_request_matcher_ = std::make_unique(this); + unregistered_request_matcher_ = make_real_request_matcher(); } for (std::unique_ptr& rm : registered_methods_) { if (rm->matcher == nullptr) { - rm->matcher = std::make_unique(this); + rm->matcher = make_real_request_matcher(); } } { diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h index 954ac6283a6..f5999bcda30 100644 --- a/src/core/lib/surface/server.h +++ b/src/core/lib/surface/server.h @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -34,6 +35,7 @@ #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" #include "absl/hash/hash.h" +#include "absl/random/random.h" #include "absl/status/statusor.h" #include "absl/strings/string_view.h" #include "absl/types/optional.h" @@ -42,6 +44,7 @@ #include #include +#include "src/core/lib/backoff/random_early_detection.h" #include "src/core/lib/channel/call_tracer.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_fwd.h" @@ -66,6 +69,10 @@ #include "src/core/lib/transport/metadata_batch.h" #include "src/core/lib/transport/transport.h" +#define GRPC_ARG_SERVER_MAX_PENDING_REQUESTS "grpc.server.max_pending_requests" +#define GRPC_ARG_SERVER_MAX_PENDING_REQUESTS_HARD_LIMIT \ + "grpc.server.max_pending_requests_hard_limit" + namespace grpc_core { extern TraceFlag grpc_server_channel_trace; @@ -225,7 +232,8 @@ class Server : public InternallyRefCounted, }; class RequestMatcherInterface; - class RealRequestMatcher; + class RealRequestMatcherFilterStack; + class RealRequestMatcherPromises; class AllocatingRequestMatcherBase; class AllocatingRequestMatcherBatch; class AllocatingRequestMatcherRegistered; @@ -505,6 +513,17 @@ class Server : public InternallyRefCounted, bool shutdown_published_ ABSL_GUARDED_BY(mu_global_) = false; std::vector shutdown_tags_ ABSL_GUARDED_BY(mu_global_); + RandomEarlyDetection pending_backlog_protector_ ABSL_GUARDED_BY(mu_call_){ + static_cast( + std::max(0, channel_args_.GetInt(GRPC_ARG_SERVER_MAX_PENDING_REQUESTS) + .value_or(1000))), + static_cast(std::max( + 0, + channel_args_.GetInt(GRPC_ARG_SERVER_MAX_PENDING_REQUESTS_HARD_LIMIT) + .value_or(3000)))}; + Duration max_time_in_pending_queue_{Duration::Seconds(30)}; + absl::BitGen bitgen_ ABSL_GUARDED_BY(mu_call_); + std::list channels_; std::list listeners_; diff --git a/test/core/end2end/tests/filter_context.cc b/test/core/end2end/tests/filter_context.cc index 834ff403577..8156384fbd6 100644 --- a/test/core/end2end/tests/filter_context.cc +++ b/test/core/end2end/tests/filter_context.cc @@ -29,6 +29,7 @@ #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/context.h" #include "src/core/lib/config/core_configuration.h" +#include "src/core/lib/experiments/experiments.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/error.h" @@ -100,6 +101,9 @@ CORE_END2END_TEST(CoreEnd2endTest, FilterContext) { CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) { for (auto type : {GRPC_CLIENT_CHANNEL, GRPC_CLIENT_SUBCHANNEL, GRPC_CLIENT_DIRECT_CHANNEL, GRPC_SERVER_CHANNEL}) { + if (type == GRPC_SERVER_CHANNEL && IsPromiseBasedServerCallEnabled()) { + continue; + } builder->channel_init()->RegisterFilter(type, &test_filter); } });