From 9c15d6327edbe84f8cca488ded08cb670a8040c2 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 26 Oct 2023 09:54:14 -0700 Subject: [PATCH] Revert "[server] Cap size of pending request queue with RealRequestMatcher" (#34803) Reverts grpc/grpc#34782 --- BUILD | 4 +- CMakeLists.txt | 3 - build_autogenerated.yaml | 4 - grpc.gyp | 3 - src/core/lib/experiments/experiments.cc | 30 --- src/core/lib/experiments/experiments.h | 11 -- src/core/lib/experiments/experiments.yaml | 11 -- src/core/lib/experiments/rollouts.yaml | 2 - src/core/lib/surface/server.cc | 224 ++++++---------------- src/core/lib/surface/server.h | 21 +- test/core/end2end/tests/filter_context.cc | 4 - 11 files changed, 60 insertions(+), 257 deletions(-) diff --git a/BUILD b/BUILD index 8fa1d326cf1..b93d89d0892 100644 --- a/BUILD +++ b/BUILD @@ -1493,13 +1493,13 @@ grpc_cc_library( "absl/functional:function_ref", "absl/hash", "absl/meta:type_traits", - "absl/random", "absl/status", "absl/status:statusor", "absl/strings", "absl/strings:str_format", "absl/time", "absl/types:optional", + "absl/types:variant", "absl/utility", "madler_zlib", ], @@ -1578,6 +1578,7 @@ grpc_cc_library( "//src/core:latch", "//src/core:loop", "//src/core:map", + "//src/core:match", "//src/core:memory_quota", "//src/core:metadata_compression_traits", "//src/core:no_destruct", @@ -1589,7 +1590,6 @@ grpc_cc_library( "//src/core:posix_event_engine_base_hdrs", "//src/core:promise_status", "//src/core:race", - "//src/core:random_early_detection", "//src/core:ref_counted", "//src/core:ref_counted_string", "//src/core:resolved_address", diff --git a/CMakeLists.txt b/CMakeLists.txt index 9e07ad89bba..278adc9fb98 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4874,7 +4874,6 @@ add_library(grpc_authorization_provider src/core/lib/address_utils/parse_address.cc src/core/lib/address_utils/sockaddr_utils.cc src/core/lib/backoff/backoff.cc - src/core/lib/backoff/random_early_detection.cc src/core/lib/channel/call_tracer.cc src/core/lib/channel/channel_args.cc src/core/lib/channel/channel_args_preconditioning.cc @@ -5177,8 +5176,6 @@ target_link_libraries(grpc_authorization_provider absl::function_ref absl::hash absl::type_traits - absl::random_bit_gen_ref - absl::random_distributions absl::statusor absl::span absl::utility diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index b637abfddd2..deba1f3005f 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -4124,7 +4124,6 @@ libs: - src/core/lib/address_utils/sockaddr_utils.h - src/core/lib/avl/avl.h - src/core/lib/backoff/backoff.h - - src/core/lib/backoff/random_early_detection.h - src/core/lib/channel/call_finalization.h - src/core/lib/channel/call_tracer.h - src/core/lib/channel/channel_args.h @@ -4445,7 +4444,6 @@ libs: - src/core/lib/address_utils/parse_address.cc - src/core/lib/address_utils/sockaddr_utils.cc - src/core/lib/backoff/backoff.cc - - src/core/lib/backoff/random_early_detection.cc - src/core/lib/channel/call_tracer.cc - src/core/lib/channel/channel_args.cc - src/core/lib/channel/channel_args_preconditioning.cc @@ -4713,8 +4711,6 @@ libs: - absl/functional:function_ref - absl/hash:hash - absl/meta:type_traits - - absl/random:bit_gen_ref - - absl/random:distributions - absl/status:statusor - absl/types:span - absl/utility:utility diff --git a/grpc.gyp b/grpc.gyp index 9d76638ef4b..1eb5a1f7cca 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -1976,8 +1976,6 @@ 'absl/functional:function_ref', 'absl/hash:hash', 'absl/meta:type_traits', - 'absl/random:bit_gen_ref', - 'absl/random:distributions', 'absl/status:statusor', 'absl/types:span', 'absl/utility:utility', @@ -1994,7 +1992,6 @@ 'src/core/lib/address_utils/parse_address.cc', 'src/core/lib/address_utils/sockaddr_utils.cc', 'src/core/lib/backoff/backoff.cc', - 'src/core/lib/backoff/random_early_detection.cc', 'src/core/lib/channel/call_tracer.cc', 'src/core/lib/channel/channel_args.cc', 'src/core/lib/channel/channel_args_preconditioning.cc', diff --git a/src/core/lib/experiments/experiments.cc b/src/core/lib/experiments/experiments.cc index 97d72147923..0a7a70fe7d1 100644 --- a/src/core/lib/experiments/experiments.cc +++ b/src/core/lib/experiments/experiments.cc @@ -95,14 +95,6 @@ const char* const description_peer_state_based_framing = "on the peer's memory pressure which is reflected in its max http2 frame " "size."; const char* const additional_constraints_peer_state_based_framing = "{}"; -const char* const description_pending_queue_cap = - "In the sync & async apis (but not the callback api), cap the number of " - "received but unrequested requests in the server for each call type. A " - "received message is one that was read from the wire on the server. A " - "requested message is one explicitly requested by the application using " - "grpc_server_request_call or grpc_server_request_registered_call (or their " - "wrappers in the C++ API)."; -const char* const additional_constraints_pending_queue_cap = "{}"; const char* const description_pick_first_happy_eyeballs = "Use Happy Eyeballs in pick_first."; const char* const additional_constraints_pick_first_happy_eyeballs = "{}"; @@ -256,8 +248,6 @@ const ExperimentMetadata g_experiment_metadata[] = { additional_constraints_overload_protection, true, true}, {"peer_state_based_framing", description_peer_state_based_framing, additional_constraints_peer_state_based_framing, false, true}, - {"pending_queue_cap", description_pending_queue_cap, - additional_constraints_pending_queue_cap, true, true}, {"pick_first_happy_eyeballs", description_pick_first_happy_eyeballs, additional_constraints_pick_first_happy_eyeballs, true, true}, {"ping_on_rst_stream", description_ping_on_rst_stream, @@ -391,14 +381,6 @@ const char* const description_peer_state_based_framing = "on the peer's memory pressure which is reflected in its max http2 frame " "size."; const char* const additional_constraints_peer_state_based_framing = "{}"; -const char* const description_pending_queue_cap = - "In the sync & async apis (but not the callback api), cap the number of " - "received but unrequested requests in the server for each call type. A " - "received message is one that was read from the wire on the server. A " - "requested message is one explicitly requested by the application using " - "grpc_server_request_call or grpc_server_request_registered_call (or their " - "wrappers in the C++ API)."; -const char* const additional_constraints_pending_queue_cap = "{}"; const char* const description_pick_first_happy_eyeballs = "Use Happy Eyeballs in pick_first."; const char* const additional_constraints_pick_first_happy_eyeballs = "{}"; @@ -552,8 +534,6 @@ const ExperimentMetadata g_experiment_metadata[] = { additional_constraints_overload_protection, true, true}, {"peer_state_based_framing", description_peer_state_based_framing, additional_constraints_peer_state_based_framing, false, true}, - {"pending_queue_cap", description_pending_queue_cap, - additional_constraints_pending_queue_cap, true, true}, {"pick_first_happy_eyeballs", description_pick_first_happy_eyeballs, additional_constraints_pick_first_happy_eyeballs, true, true}, {"ping_on_rst_stream", description_ping_on_rst_stream, @@ -687,14 +667,6 @@ const char* const description_peer_state_based_framing = "on the peer's memory pressure which is reflected in its max http2 frame " "size."; const char* const additional_constraints_peer_state_based_framing = "{}"; -const char* const description_pending_queue_cap = - "In the sync & async apis (but not the callback api), cap the number of " - "received but unrequested requests in the server for each call type. A " - "received message is one that was read from the wire on the server. A " - "requested message is one explicitly requested by the application using " - "grpc_server_request_call or grpc_server_request_registered_call (or their " - "wrappers in the C++ API)."; -const char* const additional_constraints_pending_queue_cap = "{}"; const char* const description_pick_first_happy_eyeballs = "Use Happy Eyeballs in pick_first."; const char* const additional_constraints_pick_first_happy_eyeballs = "{}"; @@ -848,8 +820,6 @@ const ExperimentMetadata g_experiment_metadata[] = { additional_constraints_overload_protection, true, true}, {"peer_state_based_framing", description_peer_state_based_framing, additional_constraints_peer_state_based_framing, false, true}, - {"pending_queue_cap", description_pending_queue_cap, - additional_constraints_pending_queue_cap, true, true}, {"pick_first_happy_eyeballs", description_pick_first_happy_eyeballs, additional_constraints_pick_first_happy_eyeballs, true, true}, {"ping_on_rst_stream", description_ping_on_rst_stream, diff --git a/src/core/lib/experiments/experiments.h b/src/core/lib/experiments/experiments.h index f90ae0f05fa..3b6fbd31782 100644 --- a/src/core/lib/experiments/experiments.h +++ b/src/core/lib/experiments/experiments.h @@ -96,8 +96,6 @@ inline bool IsMultipingEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_OVERLOAD_PROTECTION inline bool IsOverloadProtectionEnabled() { return true; } inline bool IsPeerStateBasedFramingEnabled() { return false; } -#define GRPC_EXPERIMENT_IS_INCLUDED_PENDING_QUEUE_CAP -inline bool IsPendingQueueCapEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_HAPPY_EYEBALLS inline bool IsPickFirstHappyEyeballsEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_PING_ON_RST_STREAM @@ -176,8 +174,6 @@ inline bool IsMultipingEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_OVERLOAD_PROTECTION inline bool IsOverloadProtectionEnabled() { return true; } inline bool IsPeerStateBasedFramingEnabled() { return false; } -#define GRPC_EXPERIMENT_IS_INCLUDED_PENDING_QUEUE_CAP -inline bool IsPendingQueueCapEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_HAPPY_EYEBALLS inline bool IsPickFirstHappyEyeballsEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_PING_ON_RST_STREAM @@ -255,8 +251,6 @@ inline bool IsMultipingEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_OVERLOAD_PROTECTION inline bool IsOverloadProtectionEnabled() { return true; } inline bool IsPeerStateBasedFramingEnabled() { return false; } -#define GRPC_EXPERIMENT_IS_INCLUDED_PENDING_QUEUE_CAP -inline bool IsPendingQueueCapEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_HAPPY_EYEBALLS inline bool IsPickFirstHappyEyeballsEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_PING_ON_RST_STREAM @@ -318,7 +312,6 @@ enum ExperimentIds { kExperimentIdMultiping, kExperimentIdOverloadProtection, kExperimentIdPeerStateBasedFraming, - kExperimentIdPendingQueueCap, kExperimentIdPickFirstHappyEyeballs, kExperimentIdPingOnRstStream, kExperimentIdPromiseBasedClientCall, @@ -431,10 +424,6 @@ inline bool IsOverloadProtectionEnabled() { inline bool IsPeerStateBasedFramingEnabled() { return IsExperimentEnabled(kExperimentIdPeerStateBasedFraming); } -#define GRPC_EXPERIMENT_IS_INCLUDED_PENDING_QUEUE_CAP -inline bool IsPendingQueueCapEnabled() { - return IsExperimentEnabled(kExperimentIdPendingQueueCap); -} #define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_HAPPY_EYEBALLS inline bool IsPickFirstHappyEyeballsEnabled() { return IsExperimentEnabled(kExperimentIdPickFirstHappyEyeballs); diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index bec9c44e645..90d4a34cde8 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -165,17 +165,6 @@ expiry: 2024/01/01 owner: vigneshbabu@google.com test_tags: ["flow_control_test"] -- name: pending_queue_cap - description: - In the sync & async apis (but not the callback api), cap the number of - received but unrequested requests in the server for each call type. - A received message is one that was read from the wire on the server. - A requested message is one explicitly requested by the application using - grpc_server_request_call or grpc_server_request_registered_call (or their - wrappers in the C++ API). - expiry: 2024/05/05 - owner: ctiller@google.com - test_tags: [] - name: pick_first_happy_eyeballs description: Use Happy Eyeballs in pick_first. diff --git a/src/core/lib/experiments/rollouts.yaml b/src/core/lib/experiments/rollouts.yaml index a796fe22eaf..ffc067fee6a 100644 --- a/src/core/lib/experiments/rollouts.yaml +++ b/src/core/lib/experiments/rollouts.yaml @@ -94,8 +94,6 @@ default: true - name: peer_state_based_framing default: false -- name: pending_queue_cap - default: true - name: pick_first_happy_eyeballs default: true - name: ping_on_rst_stream diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 895691f8a00..ddb627a6693 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -25,7 +25,6 @@ #include #include #include -#include #include #include #include @@ -36,6 +35,7 @@ #include "absl/container/flat_hash_map.h" #include "absl/status/status.h" #include "absl/types/optional.h" +#include "absl/types/variant.h" #include #include @@ -55,12 +55,12 @@ #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/crash.h" #include "src/core/lib/gprpp/debug_location.h" +#include "src/core/lib/gprpp/match.h" #include "src/core/lib/gprpp/mpscq.h" #include "src/core/lib/gprpp/status_helper.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/promise/activity.h" -#include "src/core/lib/promise/cancel_callback.h" #include "src/core/lib/promise/context.h" #include "src/core/lib/promise/map.h" #include "src/core/lib/promise/pipe.h" @@ -253,12 +253,12 @@ class Server::RequestMatcherInterface { // application to explicitly request RPCs and then matching those to incoming // RPCs, along with a slow path by which incoming RPCs are put on a locked // pending list if they aren't able to be matched to an application request. -class Server::RealRequestMatcherFilterStack : public RequestMatcherInterface { +class Server::RealRequestMatcher : public RequestMatcherInterface { public: - explicit RealRequestMatcherFilterStack(Server* server) + explicit RealRequestMatcher(Server* server) : server_(server), requests_per_cq_(server->cqs_.size()) {} - ~RealRequestMatcherFilterStack() override { + ~RealRequestMatcher() override { for (LockedMultiProducerSingleConsumerQueue& queue : requests_per_cq_) { GPR_ASSERT(queue.Pop() == nullptr); } @@ -266,8 +266,15 @@ class Server::RealRequestMatcherFilterStack : public RequestMatcherInterface { void ZombifyPending() override { while (!pending_.empty()) { - pending_.front().calld->SetState(CallData::CallState::ZOMBIED); - pending_.front().calld->KillZombie(); + Match( + pending_.front(), + [](CallData* calld) { + calld->SetState(CallData::CallState::ZOMBIED); + calld->KillZombie(); + }, + [](const std::shared_ptr& w) { + w->Finish(absl::InternalError("Server closed")); + }); pending_.pop(); } } @@ -293,34 +300,39 @@ class Server::RealRequestMatcherFilterStack : public RequestMatcherInterface { // matching calls struct NextPendingCall { RequestedCall* rc = nullptr; - CallData* pending; + PendingCall pending; }; - while (true) { + auto pop_next_pending = [this, request_queue_index] { NextPendingCall pending_call; { MutexLock lock(&server_->mu_call_); - while (!pending_.empty() && - pending_.front().Age() > server_->max_time_in_pending_queue_) { - pending_.pop(); - } if (!pending_.empty()) { pending_call.rc = reinterpret_cast( requests_per_cq_[request_queue_index].Pop()); if (pending_call.rc != nullptr) { - pending_call.pending = pending_.front().calld; + pending_call.pending = std::move(pending_.front()); pending_.pop(); } } } - if (pending_call.rc == nullptr) break; - if (!pending_call.pending->MaybeActivate()) { - // Zombied Call - pending_call.pending->KillZombie(); - requests_per_cq_[request_queue_index].Push( - &pending_call.rc->mpscq_node); - } else { - pending_call.pending->Publish(request_queue_index, pending_call.rc); - } + return pending_call; + }; + while (true) { + NextPendingCall next_pending = pop_next_pending(); + if (next_pending.rc == nullptr) break; + Match( + next_pending.pending, + [&](CallData* calld) { + if (!calld->MaybeActivate()) { + // Zombied Call + calld->KillZombie(); + } else { + calld->Publish(request_queue_index, next_pending.rc); + } + }, + [&](const std::shared_ptr& w) { + w->Finish(server(), request_queue_index, next_pending.rc); + }); } } } @@ -357,7 +369,7 @@ class Server::RealRequestMatcherFilterStack : public RequestMatcherInterface { } if (rc == nullptr) { calld->SetState(CallData::CallState::PENDING); - pending_.push(PendingCall{calld}); + pending_.push(calld); return; } } @@ -365,91 +377,6 @@ class Server::RealRequestMatcherFilterStack : public RequestMatcherInterface { calld->Publish(cq_idx, rc); } - ArenaPromise> MatchRequest(size_t) override { - Crash("not implemented for filter stack request matcher"); - } - - Server* server() const final { return server_; } - - private: - Server* const server_; - struct PendingCall { - CallData* calld; - Timestamp created = Timestamp::Now(); - Duration Age() { return Timestamp::Now() - created; } - }; - std::queue pending_; - std::vector requests_per_cq_; -}; - -class Server::RealRequestMatcherPromises : public RequestMatcherInterface { - public: - explicit RealRequestMatcherPromises(Server* server) - : server_(server), requests_per_cq_(server->cqs_.size()) {} - - ~RealRequestMatcherPromises() override { - for (LockedMultiProducerSingleConsumerQueue& queue : requests_per_cq_) { - GPR_ASSERT(queue.Pop() == nullptr); - } - } - - void ZombifyPending() override { - while (!pending_.empty()) { - pending_.front()->Finish(absl::InternalError("Server closed")); - pending_.pop(); - } - } - - void KillRequests(grpc_error_handle error) override { - for (size_t i = 0; i < requests_per_cq_.size(); i++) { - RequestedCall* rc; - while ((rc = reinterpret_cast( - requests_per_cq_[i].Pop())) != nullptr) { - server_->FailCall(i, rc, error); - } - } - } - - size_t request_queue_count() const override { - return requests_per_cq_.size(); - } - - void RequestCallWithPossiblePublish(size_t request_queue_index, - RequestedCall* call) override { - if (requests_per_cq_[request_queue_index].Push(&call->mpscq_node)) { - // this was the first queued request: we need to lock and start - // matching calls - struct NextPendingCall { - RequestedCall* rc = nullptr; - PendingCall pending; - }; - while (true) { - NextPendingCall pending_call; - { - MutexLock lock(&server_->mu_call_); - if (!pending_.empty()) { - pending_call.rc = reinterpret_cast( - requests_per_cq_[request_queue_index].Pop()); - if (pending_call.rc != nullptr) { - pending_call.pending = std::move(pending_.front()); - pending_.pop(); - } - } - } - if (pending_call.rc == nullptr) break; - if (!pending_call.pending->Finish(server(), request_queue_index, - pending_call.rc)) { - requests_per_cq_[request_queue_index].Push( - &pending_call.rc->mpscq_node); - } - } - } - } - - void MatchOrQueue(size_t, CallData*) override { - Crash("not implemented for promises"); - } - ArenaPromise> MatchRequest( size_t start_request_queue_index) override { for (size_t i = 0; i < requests_per_cq_.size(); i++) { @@ -469,36 +396,25 @@ class Server::RealRequestMatcherPromises : public RequestMatcherInterface { size_t cq_idx = 0; size_t loop_count; { - std::vector> removed_pending; MutexLock lock(&server_->mu_call_); - while (!pending_.empty() && - pending_.front()->Age() > server_->max_time_in_pending_queue_) { - removed_pending.push_back(std::move(pending_.front())); - pending_.pop(); - } for (loop_count = 0; loop_count < requests_per_cq_.size(); loop_count++) { cq_idx = (start_request_queue_index + loop_count) % requests_per_cq_.size(); rc = reinterpret_cast(requests_per_cq_[cq_idx].Pop()); - if (rc != nullptr) break; + if (rc != nullptr) { + break; + } } if (rc == nullptr) { - if (server_->pending_backlog_protector_.Reject(pending_.size(), - server_->bitgen_)) { - return Immediate(absl::ResourceExhaustedError( - "Too many pending requests for this server")); - } auto w = std::make_shared( Activity::current()->MakeOwningWaker()); pending_.push(w); - return OnCancel( - [w]() -> Poll> { - std::unique_ptr> r( - w->result.exchange(nullptr, std::memory_order_acq_rel)); - if (r == nullptr) return Pending{}; - return std::move(*r); - }, - [w]() { w->Expire(); }); + return [w]() -> Poll> { + std::unique_ptr> r( + w->result.exchange(nullptr, std::memory_order_acq_rel)); + if (r == nullptr) return Pending{}; + return std::move(*r); + }; } } return Immediate(MatchResult(server(), cq_idx, rc)); @@ -509,40 +425,23 @@ class Server::RealRequestMatcherPromises : public RequestMatcherInterface { private: Server* const server_; struct ActivityWaiter { - using ResultType = absl::StatusOr; explicit ActivityWaiter(Waker waker) : waker(std::move(waker)) {} ~ActivityWaiter() { delete result.load(std::memory_order_acquire); } void Finish(absl::Status status) { - delete result.exchange(new ResultType(std::move(status)), - std::memory_order_acq_rel); - waker.WakeupAsync(); + result.store(new absl::StatusOr(std::move(status)), + std::memory_order_release); + waker.Wakeup(); } - // Returns true if requested_call consumed, false otherwise. - GRPC_MUST_USE_RESULT bool Finish(Server* server, size_t cq_idx, - RequestedCall* requested_call) { - ResultType* expected = nullptr; - ResultType* new_value = - new ResultType(MatchResult(server, cq_idx, requested_call)); - if (!result.compare_exchange_strong(expected, new_value, - std::memory_order_acq_rel, - std::memory_order_acq_rel)) { - GPR_ASSERT(new_value->value().TakeCall() == requested_call); - delete new_value; - return false; - } - waker.WakeupAsync(); - return true; + void Finish(Server* server, size_t cq_idx, RequestedCall* requested_call) { + result.store(new absl::StatusOr( + MatchResult(server, cq_idx, requested_call)), + std::memory_order_release); + waker.Wakeup(); } - void Expire() { - delete result.exchange(new ResultType(absl::CancelledError()), - std::memory_order_acq_rel); - } - Duration Age() { return Timestamp::Now() - created; } Waker waker; - std::atomic result{nullptr}; - const Timestamp created = Timestamp::Now(); + std::atomic*> result{nullptr}; }; - using PendingCall = std::shared_ptr; + using PendingCall = absl::variant>; std::queue pending_; std::vector requests_per_cq_; }; @@ -817,15 +716,6 @@ void Server::AddListener(OrphanablePtr listener) { } void Server::Start() { - auto make_real_request_matcher = - [this]() -> std::unique_ptr { - if (IsPromiseBasedServerCallEnabled()) { - return std::make_unique(this); - } else { - return std::make_unique(this); - } - }; - started_ = true; for (grpc_completion_queue* cq : cqs_) { if (grpc_cq_can_listen(cq)) { @@ -833,11 +723,11 @@ void Server::Start() { } } if (unregistered_request_matcher_ == nullptr) { - unregistered_request_matcher_ = make_real_request_matcher(); + unregistered_request_matcher_ = std::make_unique(this); } for (std::unique_ptr& rm : registered_methods_) { if (rm->matcher == nullptr) { - rm->matcher = make_real_request_matcher(); + rm->matcher = std::make_unique(this); } } { diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h index f5999bcda30..954ac6283a6 100644 --- a/src/core/lib/surface/server.h +++ b/src/core/lib/surface/server.h @@ -22,7 +22,6 @@ #include #include -#include #include #include #include @@ -35,7 +34,6 @@ #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" #include "absl/hash/hash.h" -#include "absl/random/random.h" #include "absl/status/statusor.h" #include "absl/strings/string_view.h" #include "absl/types/optional.h" @@ -44,7 +42,6 @@ #include #include -#include "src/core/lib/backoff/random_early_detection.h" #include "src/core/lib/channel/call_tracer.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_fwd.h" @@ -69,10 +66,6 @@ #include "src/core/lib/transport/metadata_batch.h" #include "src/core/lib/transport/transport.h" -#define GRPC_ARG_SERVER_MAX_PENDING_REQUESTS "grpc.server.max_pending_requests" -#define GRPC_ARG_SERVER_MAX_PENDING_REQUESTS_HARD_LIMIT \ - "grpc.server.max_pending_requests_hard_limit" - namespace grpc_core { extern TraceFlag grpc_server_channel_trace; @@ -232,8 +225,7 @@ class Server : public InternallyRefCounted, }; class RequestMatcherInterface; - class RealRequestMatcherFilterStack; - class RealRequestMatcherPromises; + class RealRequestMatcher; class AllocatingRequestMatcherBase; class AllocatingRequestMatcherBatch; class AllocatingRequestMatcherRegistered; @@ -513,17 +505,6 @@ class Server : public InternallyRefCounted, bool shutdown_published_ ABSL_GUARDED_BY(mu_global_) = false; std::vector shutdown_tags_ ABSL_GUARDED_BY(mu_global_); - RandomEarlyDetection pending_backlog_protector_ ABSL_GUARDED_BY(mu_call_){ - static_cast( - std::max(0, channel_args_.GetInt(GRPC_ARG_SERVER_MAX_PENDING_REQUESTS) - .value_or(1000))), - static_cast(std::max( - 0, - channel_args_.GetInt(GRPC_ARG_SERVER_MAX_PENDING_REQUESTS_HARD_LIMIT) - .value_or(3000)))}; - Duration max_time_in_pending_queue_{Duration::Seconds(30)}; - absl::BitGen bitgen_ ABSL_GUARDED_BY(mu_call_); - std::list channels_; std::list listeners_; diff --git a/test/core/end2end/tests/filter_context.cc b/test/core/end2end/tests/filter_context.cc index 8156384fbd6..834ff403577 100644 --- a/test/core/end2end/tests/filter_context.cc +++ b/test/core/end2end/tests/filter_context.cc @@ -29,7 +29,6 @@ #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/context.h" #include "src/core/lib/config/core_configuration.h" -#include "src/core/lib/experiments/experiments.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/error.h" @@ -101,9 +100,6 @@ CORE_END2END_TEST(CoreEnd2endTest, FilterContext) { CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) { for (auto type : {GRPC_CLIENT_CHANNEL, GRPC_CLIENT_SUBCHANNEL, GRPC_CLIENT_DIRECT_CHANNEL, GRPC_SERVER_CHANNEL}) { - if (type == GRPC_SERVER_CHANNEL && IsPromiseBasedServerCallEnabled()) { - continue; - } builder->channel_init()->RegisterFilter(type, &test_filter); } });