[server] Cap size of pending request queue with RealRequestMatcher (#34782)

Also break the filter stack and promise based versions apart so that I
can re-understand this code.

---------

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/34799/head
Craig Tiller 1 year ago committed by GitHub
parent 3b2f9c606c
commit f09cd072fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      BUILD
  2. 3
      CMakeLists.txt
  3. 4
      build_autogenerated.yaml
  4. 3
      grpc.gyp
  5. 30
      src/core/lib/experiments/experiments.cc
  6. 11
      src/core/lib/experiments/experiments.h
  7. 11
      src/core/lib/experiments/experiments.yaml
  8. 2
      src/core/lib/experiments/rollouts.yaml
  9. 224
      src/core/lib/surface/server.cc
  10. 21
      src/core/lib/surface/server.h
  11. 4
      test/core/end2end/tests/filter_context.cc

@ -1493,13 +1493,13 @@ grpc_cc_library(
"absl/functional:function_ref", "absl/functional:function_ref",
"absl/hash", "absl/hash",
"absl/meta:type_traits", "absl/meta:type_traits",
"absl/random",
"absl/status", "absl/status",
"absl/status:statusor", "absl/status:statusor",
"absl/strings", "absl/strings",
"absl/strings:str_format", "absl/strings:str_format",
"absl/time", "absl/time",
"absl/types:optional", "absl/types:optional",
"absl/types:variant",
"absl/utility", "absl/utility",
"madler_zlib", "madler_zlib",
], ],
@ -1578,7 +1578,6 @@ grpc_cc_library(
"//src/core:latch", "//src/core:latch",
"//src/core:loop", "//src/core:loop",
"//src/core:map", "//src/core:map",
"//src/core:match",
"//src/core:memory_quota", "//src/core:memory_quota",
"//src/core:metadata_compression_traits", "//src/core:metadata_compression_traits",
"//src/core:no_destruct", "//src/core:no_destruct",
@ -1590,6 +1589,7 @@ grpc_cc_library(
"//src/core:posix_event_engine_base_hdrs", "//src/core:posix_event_engine_base_hdrs",
"//src/core:promise_status", "//src/core:promise_status",
"//src/core:race", "//src/core:race",
"//src/core:random_early_detection",
"//src/core:ref_counted", "//src/core:ref_counted",
"//src/core:ref_counted_string", "//src/core:ref_counted_string",
"//src/core:resolved_address", "//src/core:resolved_address",

3
CMakeLists.txt generated

@ -4874,6 +4874,7 @@ add_library(grpc_authorization_provider
src/core/lib/address_utils/parse_address.cc src/core/lib/address_utils/parse_address.cc
src/core/lib/address_utils/sockaddr_utils.cc src/core/lib/address_utils/sockaddr_utils.cc
src/core/lib/backoff/backoff.cc src/core/lib/backoff/backoff.cc
src/core/lib/backoff/random_early_detection.cc
src/core/lib/channel/call_tracer.cc src/core/lib/channel/call_tracer.cc
src/core/lib/channel/channel_args.cc src/core/lib/channel/channel_args.cc
src/core/lib/channel/channel_args_preconditioning.cc src/core/lib/channel/channel_args_preconditioning.cc
@ -5176,6 +5177,8 @@ target_link_libraries(grpc_authorization_provider
absl::function_ref absl::function_ref
absl::hash absl::hash
absl::type_traits absl::type_traits
absl::random_bit_gen_ref
absl::random_distributions
absl::statusor absl::statusor
absl::span absl::span
absl::utility absl::utility

@ -4124,6 +4124,7 @@ libs:
- src/core/lib/address_utils/sockaddr_utils.h - src/core/lib/address_utils/sockaddr_utils.h
- src/core/lib/avl/avl.h - src/core/lib/avl/avl.h
- src/core/lib/backoff/backoff.h - src/core/lib/backoff/backoff.h
- src/core/lib/backoff/random_early_detection.h
- src/core/lib/channel/call_finalization.h - src/core/lib/channel/call_finalization.h
- src/core/lib/channel/call_tracer.h - src/core/lib/channel/call_tracer.h
- src/core/lib/channel/channel_args.h - src/core/lib/channel/channel_args.h
@ -4444,6 +4445,7 @@ libs:
- src/core/lib/address_utils/parse_address.cc - src/core/lib/address_utils/parse_address.cc
- src/core/lib/address_utils/sockaddr_utils.cc - src/core/lib/address_utils/sockaddr_utils.cc
- src/core/lib/backoff/backoff.cc - src/core/lib/backoff/backoff.cc
- src/core/lib/backoff/random_early_detection.cc
- src/core/lib/channel/call_tracer.cc - src/core/lib/channel/call_tracer.cc
- src/core/lib/channel/channel_args.cc - src/core/lib/channel/channel_args.cc
- src/core/lib/channel/channel_args_preconditioning.cc - src/core/lib/channel/channel_args_preconditioning.cc
@ -4711,6 +4713,8 @@ libs:
- absl/functional:function_ref - absl/functional:function_ref
- absl/hash:hash - absl/hash:hash
- absl/meta:type_traits - absl/meta:type_traits
- absl/random:bit_gen_ref
- absl/random:distributions
- absl/status:statusor - absl/status:statusor
- absl/types:span - absl/types:span
- absl/utility:utility - absl/utility:utility

3
grpc.gyp generated

@ -1976,6 +1976,8 @@
'absl/functional:function_ref', 'absl/functional:function_ref',
'absl/hash:hash', 'absl/hash:hash',
'absl/meta:type_traits', 'absl/meta:type_traits',
'absl/random:bit_gen_ref',
'absl/random:distributions',
'absl/status:statusor', 'absl/status:statusor',
'absl/types:span', 'absl/types:span',
'absl/utility:utility', 'absl/utility:utility',
@ -1992,6 +1994,7 @@
'src/core/lib/address_utils/parse_address.cc', 'src/core/lib/address_utils/parse_address.cc',
'src/core/lib/address_utils/sockaddr_utils.cc', 'src/core/lib/address_utils/sockaddr_utils.cc',
'src/core/lib/backoff/backoff.cc', 'src/core/lib/backoff/backoff.cc',
'src/core/lib/backoff/random_early_detection.cc',
'src/core/lib/channel/call_tracer.cc', 'src/core/lib/channel/call_tracer.cc',
'src/core/lib/channel/channel_args.cc', 'src/core/lib/channel/channel_args.cc',
'src/core/lib/channel/channel_args_preconditioning.cc', 'src/core/lib/channel/channel_args_preconditioning.cc',

@ -95,6 +95,14 @@ const char* const description_peer_state_based_framing =
"on the peer's memory pressure which is reflected in its max http2 frame " "on the peer's memory pressure which is reflected in its max http2 frame "
"size."; "size.";
const char* const additional_constraints_peer_state_based_framing = "{}"; const char* const additional_constraints_peer_state_based_framing = "{}";
const char* const description_pending_queue_cap =
"In the sync & async apis (but not the callback api), cap the number of "
"received but unrequested requests in the server for each call type. A "
"received message is one that was read from the wire on the server. A "
"requested message is one explicitly requested by the application using "
"grpc_server_request_call or grpc_server_request_registered_call (or their "
"wrappers in the C++ API).";
const char* const additional_constraints_pending_queue_cap = "{}";
const char* const description_pick_first_happy_eyeballs = const char* const description_pick_first_happy_eyeballs =
"Use Happy Eyeballs in pick_first."; "Use Happy Eyeballs in pick_first.";
const char* const additional_constraints_pick_first_happy_eyeballs = "{}"; const char* const additional_constraints_pick_first_happy_eyeballs = "{}";
@ -248,6 +256,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_overload_protection, true, true}, additional_constraints_overload_protection, true, true},
{"peer_state_based_framing", description_peer_state_based_framing, {"peer_state_based_framing", description_peer_state_based_framing,
additional_constraints_peer_state_based_framing, false, true}, additional_constraints_peer_state_based_framing, false, true},
{"pending_queue_cap", description_pending_queue_cap,
additional_constraints_pending_queue_cap, true, true},
{"pick_first_happy_eyeballs", description_pick_first_happy_eyeballs, {"pick_first_happy_eyeballs", description_pick_first_happy_eyeballs,
additional_constraints_pick_first_happy_eyeballs, true, true}, additional_constraints_pick_first_happy_eyeballs, true, true},
{"ping_on_rst_stream", description_ping_on_rst_stream, {"ping_on_rst_stream", description_ping_on_rst_stream,
@ -381,6 +391,14 @@ const char* const description_peer_state_based_framing =
"on the peer's memory pressure which is reflected in its max http2 frame " "on the peer's memory pressure which is reflected in its max http2 frame "
"size."; "size.";
const char* const additional_constraints_peer_state_based_framing = "{}"; const char* const additional_constraints_peer_state_based_framing = "{}";
const char* const description_pending_queue_cap =
"In the sync & async apis (but not the callback api), cap the number of "
"received but unrequested requests in the server for each call type. A "
"received message is one that was read from the wire on the server. A "
"requested message is one explicitly requested by the application using "
"grpc_server_request_call or grpc_server_request_registered_call (or their "
"wrappers in the C++ API).";
const char* const additional_constraints_pending_queue_cap = "{}";
const char* const description_pick_first_happy_eyeballs = const char* const description_pick_first_happy_eyeballs =
"Use Happy Eyeballs in pick_first."; "Use Happy Eyeballs in pick_first.";
const char* const additional_constraints_pick_first_happy_eyeballs = "{}"; const char* const additional_constraints_pick_first_happy_eyeballs = "{}";
@ -534,6 +552,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_overload_protection, true, true}, additional_constraints_overload_protection, true, true},
{"peer_state_based_framing", description_peer_state_based_framing, {"peer_state_based_framing", description_peer_state_based_framing,
additional_constraints_peer_state_based_framing, false, true}, additional_constraints_peer_state_based_framing, false, true},
{"pending_queue_cap", description_pending_queue_cap,
additional_constraints_pending_queue_cap, true, true},
{"pick_first_happy_eyeballs", description_pick_first_happy_eyeballs, {"pick_first_happy_eyeballs", description_pick_first_happy_eyeballs,
additional_constraints_pick_first_happy_eyeballs, true, true}, additional_constraints_pick_first_happy_eyeballs, true, true},
{"ping_on_rst_stream", description_ping_on_rst_stream, {"ping_on_rst_stream", description_ping_on_rst_stream,
@ -667,6 +687,14 @@ const char* const description_peer_state_based_framing =
"on the peer's memory pressure which is reflected in its max http2 frame " "on the peer's memory pressure which is reflected in its max http2 frame "
"size."; "size.";
const char* const additional_constraints_peer_state_based_framing = "{}"; const char* const additional_constraints_peer_state_based_framing = "{}";
const char* const description_pending_queue_cap =
"In the sync & async apis (but not the callback api), cap the number of "
"received but unrequested requests in the server for each call type. A "
"received message is one that was read from the wire on the server. A "
"requested message is one explicitly requested by the application using "
"grpc_server_request_call or grpc_server_request_registered_call (or their "
"wrappers in the C++ API).";
const char* const additional_constraints_pending_queue_cap = "{}";
const char* const description_pick_first_happy_eyeballs = const char* const description_pick_first_happy_eyeballs =
"Use Happy Eyeballs in pick_first."; "Use Happy Eyeballs in pick_first.";
const char* const additional_constraints_pick_first_happy_eyeballs = "{}"; const char* const additional_constraints_pick_first_happy_eyeballs = "{}";
@ -820,6 +848,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_overload_protection, true, true}, additional_constraints_overload_protection, true, true},
{"peer_state_based_framing", description_peer_state_based_framing, {"peer_state_based_framing", description_peer_state_based_framing,
additional_constraints_peer_state_based_framing, false, true}, additional_constraints_peer_state_based_framing, false, true},
{"pending_queue_cap", description_pending_queue_cap,
additional_constraints_pending_queue_cap, true, true},
{"pick_first_happy_eyeballs", description_pick_first_happy_eyeballs, {"pick_first_happy_eyeballs", description_pick_first_happy_eyeballs,
additional_constraints_pick_first_happy_eyeballs, true, true}, additional_constraints_pick_first_happy_eyeballs, true, true},
{"ping_on_rst_stream", description_ping_on_rst_stream, {"ping_on_rst_stream", description_ping_on_rst_stream,

@ -96,6 +96,8 @@ inline bool IsMultipingEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_OVERLOAD_PROTECTION #define GRPC_EXPERIMENT_IS_INCLUDED_OVERLOAD_PROTECTION
inline bool IsOverloadProtectionEnabled() { return true; } inline bool IsOverloadProtectionEnabled() { return true; }
inline bool IsPeerStateBasedFramingEnabled() { return false; } inline bool IsPeerStateBasedFramingEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PENDING_QUEUE_CAP
inline bool IsPendingQueueCapEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_HAPPY_EYEBALLS #define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_HAPPY_EYEBALLS
inline bool IsPickFirstHappyEyeballsEnabled() { return true; } inline bool IsPickFirstHappyEyeballsEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PING_ON_RST_STREAM #define GRPC_EXPERIMENT_IS_INCLUDED_PING_ON_RST_STREAM
@ -174,6 +176,8 @@ inline bool IsMultipingEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_OVERLOAD_PROTECTION #define GRPC_EXPERIMENT_IS_INCLUDED_OVERLOAD_PROTECTION
inline bool IsOverloadProtectionEnabled() { return true; } inline bool IsOverloadProtectionEnabled() { return true; }
inline bool IsPeerStateBasedFramingEnabled() { return false; } inline bool IsPeerStateBasedFramingEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PENDING_QUEUE_CAP
inline bool IsPendingQueueCapEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_HAPPY_EYEBALLS #define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_HAPPY_EYEBALLS
inline bool IsPickFirstHappyEyeballsEnabled() { return true; } inline bool IsPickFirstHappyEyeballsEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PING_ON_RST_STREAM #define GRPC_EXPERIMENT_IS_INCLUDED_PING_ON_RST_STREAM
@ -251,6 +255,8 @@ inline bool IsMultipingEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_OVERLOAD_PROTECTION #define GRPC_EXPERIMENT_IS_INCLUDED_OVERLOAD_PROTECTION
inline bool IsOverloadProtectionEnabled() { return true; } inline bool IsOverloadProtectionEnabled() { return true; }
inline bool IsPeerStateBasedFramingEnabled() { return false; } inline bool IsPeerStateBasedFramingEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PENDING_QUEUE_CAP
inline bool IsPendingQueueCapEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_HAPPY_EYEBALLS #define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_HAPPY_EYEBALLS
inline bool IsPickFirstHappyEyeballsEnabled() { return true; } inline bool IsPickFirstHappyEyeballsEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PING_ON_RST_STREAM #define GRPC_EXPERIMENT_IS_INCLUDED_PING_ON_RST_STREAM
@ -312,6 +318,7 @@ enum ExperimentIds {
kExperimentIdMultiping, kExperimentIdMultiping,
kExperimentIdOverloadProtection, kExperimentIdOverloadProtection,
kExperimentIdPeerStateBasedFraming, kExperimentIdPeerStateBasedFraming,
kExperimentIdPendingQueueCap,
kExperimentIdPickFirstHappyEyeballs, kExperimentIdPickFirstHappyEyeballs,
kExperimentIdPingOnRstStream, kExperimentIdPingOnRstStream,
kExperimentIdPromiseBasedClientCall, kExperimentIdPromiseBasedClientCall,
@ -424,6 +431,10 @@ inline bool IsOverloadProtectionEnabled() {
inline bool IsPeerStateBasedFramingEnabled() { inline bool IsPeerStateBasedFramingEnabled() {
return IsExperimentEnabled(kExperimentIdPeerStateBasedFraming); return IsExperimentEnabled(kExperimentIdPeerStateBasedFraming);
} }
#define GRPC_EXPERIMENT_IS_INCLUDED_PENDING_QUEUE_CAP
inline bool IsPendingQueueCapEnabled() {
return IsExperimentEnabled(kExperimentIdPendingQueueCap);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_HAPPY_EYEBALLS #define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_HAPPY_EYEBALLS
inline bool IsPickFirstHappyEyeballsEnabled() { inline bool IsPickFirstHappyEyeballsEnabled() {
return IsExperimentEnabled(kExperimentIdPickFirstHappyEyeballs); return IsExperimentEnabled(kExperimentIdPickFirstHappyEyeballs);

@ -165,6 +165,17 @@
expiry: 2024/01/01 expiry: 2024/01/01
owner: vigneshbabu@google.com owner: vigneshbabu@google.com
test_tags: ["flow_control_test"] test_tags: ["flow_control_test"]
- name: pending_queue_cap
description:
In the sync & async apis (but not the callback api), cap the number of
received but unrequested requests in the server for each call type.
A received message is one that was read from the wire on the server.
A requested message is one explicitly requested by the application using
grpc_server_request_call or grpc_server_request_registered_call (or their
wrappers in the C++ API).
expiry: 2024/05/05
owner: ctiller@google.com
test_tags: []
- name: pick_first_happy_eyeballs - name: pick_first_happy_eyeballs
description: description:
Use Happy Eyeballs in pick_first. Use Happy Eyeballs in pick_first.

@ -94,6 +94,8 @@
default: true default: true
- name: peer_state_based_framing - name: peer_state_based_framing
default: false default: false
- name: pending_queue_cap
default: true
- name: pick_first_happy_eyeballs - name: pick_first_happy_eyeballs
default: true default: true
- name: ping_on_rst_stream - name: ping_on_rst_stream

@ -25,6 +25,7 @@
#include <algorithm> #include <algorithm>
#include <atomic> #include <atomic>
#include <list> #include <list>
#include <memory>
#include <new> #include <new>
#include <queue> #include <queue>
#include <type_traits> #include <type_traits>
@ -35,7 +36,6 @@
#include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_map.h"
#include "absl/status/status.h" #include "absl/status/status.h"
#include "absl/types/optional.h" #include "absl/types/optional.h"
#include "absl/types/variant.h"
#include <grpc/byte_buffer.h> #include <grpc/byte_buffer.h>
#include <grpc/grpc.h> #include <grpc/grpc.h>
@ -55,12 +55,12 @@
#include "src/core/lib/gpr/useful.h" #include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/crash.h" #include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/match.h"
#include "src/core/lib/gprpp/mpscq.h" #include "src/core/lib/gprpp/mpscq.h"
#include "src/core/lib/gprpp/status_helper.h" #include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/promise/activity.h" #include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/cancel_callback.h"
#include "src/core/lib/promise/context.h" #include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/map.h" #include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/pipe.h" #include "src/core/lib/promise/pipe.h"
@ -253,12 +253,12 @@ class Server::RequestMatcherInterface {
// application to explicitly request RPCs and then matching those to incoming // application to explicitly request RPCs and then matching those to incoming
// RPCs, along with a slow path by which incoming RPCs are put on a locked // RPCs, along with a slow path by which incoming RPCs are put on a locked
// pending list if they aren't able to be matched to an application request. // pending list if they aren't able to be matched to an application request.
class Server::RealRequestMatcher : public RequestMatcherInterface { class Server::RealRequestMatcherFilterStack : public RequestMatcherInterface {
public: public:
explicit RealRequestMatcher(Server* server) explicit RealRequestMatcherFilterStack(Server* server)
: server_(server), requests_per_cq_(server->cqs_.size()) {} : server_(server), requests_per_cq_(server->cqs_.size()) {}
~RealRequestMatcher() override { ~RealRequestMatcherFilterStack() override {
for (LockedMultiProducerSingleConsumerQueue& queue : requests_per_cq_) { for (LockedMultiProducerSingleConsumerQueue& queue : requests_per_cq_) {
GPR_ASSERT(queue.Pop() == nullptr); GPR_ASSERT(queue.Pop() == nullptr);
} }
@ -266,15 +266,8 @@ class Server::RealRequestMatcher : public RequestMatcherInterface {
void ZombifyPending() override { void ZombifyPending() override {
while (!pending_.empty()) { while (!pending_.empty()) {
Match( pending_.front().calld->SetState(CallData::CallState::ZOMBIED);
pending_.front(), pending_.front().calld->KillZombie();
[](CallData* calld) {
calld->SetState(CallData::CallState::ZOMBIED);
calld->KillZombie();
},
[](const std::shared_ptr<ActivityWaiter>& w) {
w->Finish(absl::InternalError("Server closed"));
});
pending_.pop(); pending_.pop();
} }
} }
@ -300,39 +293,34 @@ class Server::RealRequestMatcher : public RequestMatcherInterface {
// matching calls // matching calls
struct NextPendingCall { struct NextPendingCall {
RequestedCall* rc = nullptr; RequestedCall* rc = nullptr;
PendingCall pending; CallData* pending;
}; };
auto pop_next_pending = [this, request_queue_index] { while (true) {
NextPendingCall pending_call; NextPendingCall pending_call;
{ {
MutexLock lock(&server_->mu_call_); MutexLock lock(&server_->mu_call_);
while (!pending_.empty() &&
pending_.front().Age() > server_->max_time_in_pending_queue_) {
pending_.pop();
}
if (!pending_.empty()) { if (!pending_.empty()) {
pending_call.rc = reinterpret_cast<RequestedCall*>( pending_call.rc = reinterpret_cast<RequestedCall*>(
requests_per_cq_[request_queue_index].Pop()); requests_per_cq_[request_queue_index].Pop());
if (pending_call.rc != nullptr) { if (pending_call.rc != nullptr) {
pending_call.pending = std::move(pending_.front()); pending_call.pending = pending_.front().calld;
pending_.pop(); pending_.pop();
} }
} }
} }
return pending_call; if (pending_call.rc == nullptr) break;
}; if (!pending_call.pending->MaybeActivate()) {
while (true) { // Zombied Call
NextPendingCall next_pending = pop_next_pending(); pending_call.pending->KillZombie();
if (next_pending.rc == nullptr) break; requests_per_cq_[request_queue_index].Push(
Match( &pending_call.rc->mpscq_node);
next_pending.pending, } else {
[&](CallData* calld) { pending_call.pending->Publish(request_queue_index, pending_call.rc);
if (!calld->MaybeActivate()) { }
// Zombied Call
calld->KillZombie();
} else {
calld->Publish(request_queue_index, next_pending.rc);
}
},
[&](const std::shared_ptr<ActivityWaiter>& w) {
w->Finish(server(), request_queue_index, next_pending.rc);
});
} }
} }
} }
@ -369,7 +357,7 @@ class Server::RealRequestMatcher : public RequestMatcherInterface {
} }
if (rc == nullptr) { if (rc == nullptr) {
calld->SetState(CallData::CallState::PENDING); calld->SetState(CallData::CallState::PENDING);
pending_.push(calld); pending_.push(PendingCall{calld});
return; return;
} }
} }
@ -377,6 +365,91 @@ class Server::RealRequestMatcher : public RequestMatcherInterface {
calld->Publish(cq_idx, rc); calld->Publish(cq_idx, rc);
} }
ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest(size_t) override {
Crash("not implemented for filter stack request matcher");
}
Server* server() const final { return server_; }
private:
Server* const server_;
struct PendingCall {
CallData* calld;
Timestamp created = Timestamp::Now();
Duration Age() { return Timestamp::Now() - created; }
};
std::queue<PendingCall> pending_;
std::vector<LockedMultiProducerSingleConsumerQueue> requests_per_cq_;
};
class Server::RealRequestMatcherPromises : public RequestMatcherInterface {
public:
explicit RealRequestMatcherPromises(Server* server)
: server_(server), requests_per_cq_(server->cqs_.size()) {}
~RealRequestMatcherPromises() override {
for (LockedMultiProducerSingleConsumerQueue& queue : requests_per_cq_) {
GPR_ASSERT(queue.Pop() == nullptr);
}
}
void ZombifyPending() override {
while (!pending_.empty()) {
pending_.front()->Finish(absl::InternalError("Server closed"));
pending_.pop();
}
}
void KillRequests(grpc_error_handle error) override {
for (size_t i = 0; i < requests_per_cq_.size(); i++) {
RequestedCall* rc;
while ((rc = reinterpret_cast<RequestedCall*>(
requests_per_cq_[i].Pop())) != nullptr) {
server_->FailCall(i, rc, error);
}
}
}
size_t request_queue_count() const override {
return requests_per_cq_.size();
}
void RequestCallWithPossiblePublish(size_t request_queue_index,
RequestedCall* call) override {
if (requests_per_cq_[request_queue_index].Push(&call->mpscq_node)) {
// this was the first queued request: we need to lock and start
// matching calls
struct NextPendingCall {
RequestedCall* rc = nullptr;
PendingCall pending;
};
while (true) {
NextPendingCall pending_call;
{
MutexLock lock(&server_->mu_call_);
if (!pending_.empty()) {
pending_call.rc = reinterpret_cast<RequestedCall*>(
requests_per_cq_[request_queue_index].Pop());
if (pending_call.rc != nullptr) {
pending_call.pending = std::move(pending_.front());
pending_.pop();
}
}
}
if (pending_call.rc == nullptr) break;
if (!pending_call.pending->Finish(server(), request_queue_index,
pending_call.rc)) {
requests_per_cq_[request_queue_index].Push(
&pending_call.rc->mpscq_node);
}
}
}
}
void MatchOrQueue(size_t, CallData*) override {
Crash("not implemented for promises");
}
ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest( ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest(
size_t start_request_queue_index) override { size_t start_request_queue_index) override {
for (size_t i = 0; i < requests_per_cq_.size(); i++) { for (size_t i = 0; i < requests_per_cq_.size(); i++) {
@ -396,25 +469,36 @@ class Server::RealRequestMatcher : public RequestMatcherInterface {
size_t cq_idx = 0; size_t cq_idx = 0;
size_t loop_count; size_t loop_count;
{ {
std::vector<std::shared_ptr<ActivityWaiter>> removed_pending;
MutexLock lock(&server_->mu_call_); MutexLock lock(&server_->mu_call_);
while (!pending_.empty() &&
pending_.front()->Age() > server_->max_time_in_pending_queue_) {
removed_pending.push_back(std::move(pending_.front()));
pending_.pop();
}
for (loop_count = 0; loop_count < requests_per_cq_.size(); loop_count++) { for (loop_count = 0; loop_count < requests_per_cq_.size(); loop_count++) {
cq_idx = cq_idx =
(start_request_queue_index + loop_count) % requests_per_cq_.size(); (start_request_queue_index + loop_count) % requests_per_cq_.size();
rc = reinterpret_cast<RequestedCall*>(requests_per_cq_[cq_idx].Pop()); rc = reinterpret_cast<RequestedCall*>(requests_per_cq_[cq_idx].Pop());
if (rc != nullptr) { if (rc != nullptr) break;
break;
}
} }
if (rc == nullptr) { if (rc == nullptr) {
if (server_->pending_backlog_protector_.Reject(pending_.size(),
server_->bitgen_)) {
return Immediate(absl::ResourceExhaustedError(
"Too many pending requests for this server"));
}
auto w = std::make_shared<ActivityWaiter>( auto w = std::make_shared<ActivityWaiter>(
Activity::current()->MakeOwningWaker()); Activity::current()->MakeOwningWaker());
pending_.push(w); pending_.push(w);
return [w]() -> Poll<absl::StatusOr<MatchResult>> { return OnCancel(
std::unique_ptr<absl::StatusOr<MatchResult>> r( [w]() -> Poll<absl::StatusOr<MatchResult>> {
w->result.exchange(nullptr, std::memory_order_acq_rel)); std::unique_ptr<absl::StatusOr<MatchResult>> r(
if (r == nullptr) return Pending{}; w->result.exchange(nullptr, std::memory_order_acq_rel));
return std::move(*r); if (r == nullptr) return Pending{};
}; return std::move(*r);
},
[w]() { w->Expire(); });
} }
} }
return Immediate(MatchResult(server(), cq_idx, rc)); return Immediate(MatchResult(server(), cq_idx, rc));
@ -425,23 +509,40 @@ class Server::RealRequestMatcher : public RequestMatcherInterface {
private: private:
Server* const server_; Server* const server_;
struct ActivityWaiter { struct ActivityWaiter {
using ResultType = absl::StatusOr<MatchResult>;
explicit ActivityWaiter(Waker waker) : waker(std::move(waker)) {} explicit ActivityWaiter(Waker waker) : waker(std::move(waker)) {}
~ActivityWaiter() { delete result.load(std::memory_order_acquire); } ~ActivityWaiter() { delete result.load(std::memory_order_acquire); }
void Finish(absl::Status status) { void Finish(absl::Status status) {
result.store(new absl::StatusOr<MatchResult>(std::move(status)), delete result.exchange(new ResultType(std::move(status)),
std::memory_order_release); std::memory_order_acq_rel);
waker.Wakeup(); waker.WakeupAsync();
} }
void Finish(Server* server, size_t cq_idx, RequestedCall* requested_call) { // Returns true if requested_call consumed, false otherwise.
result.store(new absl::StatusOr<MatchResult>( GRPC_MUST_USE_RESULT bool Finish(Server* server, size_t cq_idx,
MatchResult(server, cq_idx, requested_call)), RequestedCall* requested_call) {
std::memory_order_release); ResultType* expected = nullptr;
waker.Wakeup(); ResultType* new_value =
new ResultType(MatchResult(server, cq_idx, requested_call));
if (!result.compare_exchange_strong(expected, new_value,
std::memory_order_acq_rel,
std::memory_order_acq_rel)) {
GPR_ASSERT(new_value->value().TakeCall() == requested_call);
delete new_value;
return false;
}
waker.WakeupAsync();
return true;
} }
void Expire() {
delete result.exchange(new ResultType(absl::CancelledError()),
std::memory_order_acq_rel);
}
Duration Age() { return Timestamp::Now() - created; }
Waker waker; Waker waker;
std::atomic<absl::StatusOr<MatchResult>*> result{nullptr}; std::atomic<ResultType*> result{nullptr};
const Timestamp created = Timestamp::Now();
}; };
using PendingCall = absl::variant<CallData*, std::shared_ptr<ActivityWaiter>>; using PendingCall = std::shared_ptr<ActivityWaiter>;
std::queue<PendingCall> pending_; std::queue<PendingCall> pending_;
std::vector<LockedMultiProducerSingleConsumerQueue> requests_per_cq_; std::vector<LockedMultiProducerSingleConsumerQueue> requests_per_cq_;
}; };
@ -716,6 +817,15 @@ void Server::AddListener(OrphanablePtr<ListenerInterface> listener) {
} }
void Server::Start() { void Server::Start() {
auto make_real_request_matcher =
[this]() -> std::unique_ptr<RequestMatcherInterface> {
if (IsPromiseBasedServerCallEnabled()) {
return std::make_unique<RealRequestMatcherPromises>(this);
} else {
return std::make_unique<RealRequestMatcherFilterStack>(this);
}
};
started_ = true; started_ = true;
for (grpc_completion_queue* cq : cqs_) { for (grpc_completion_queue* cq : cqs_) {
if (grpc_cq_can_listen(cq)) { if (grpc_cq_can_listen(cq)) {
@ -723,11 +833,11 @@ void Server::Start() {
} }
} }
if (unregistered_request_matcher_ == nullptr) { if (unregistered_request_matcher_ == nullptr) {
unregistered_request_matcher_ = std::make_unique<RealRequestMatcher>(this); unregistered_request_matcher_ = make_real_request_matcher();
} }
for (std::unique_ptr<RegisteredMethod>& rm : registered_methods_) { for (std::unique_ptr<RegisteredMethod>& rm : registered_methods_) {
if (rm->matcher == nullptr) { if (rm->matcher == nullptr) {
rm->matcher = std::make_unique<RealRequestMatcher>(this); rm->matcher = make_real_request_matcher();
} }
} }
{ {

@ -22,6 +22,7 @@
#include <stddef.h> #include <stddef.h>
#include <stdint.h> #include <stdint.h>
#include <algorithm>
#include <atomic> #include <atomic>
#include <functional> #include <functional>
#include <list> #include <list>
@ -34,6 +35,7 @@
#include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h" #include "absl/container/flat_hash_set.h"
#include "absl/hash/hash.h" #include "absl/hash/hash.h"
#include "absl/random/random.h"
#include "absl/status/statusor.h" #include "absl/status/statusor.h"
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "absl/types/optional.h" #include "absl/types/optional.h"
@ -42,6 +44,7 @@
#include <grpc/slice.h> #include <grpc/slice.h>
#include <grpc/support/time.h> #include <grpc/support/time.h>
#include "src/core/lib/backoff/random_early_detection.h"
#include "src/core/lib/channel/call_tracer.h" #include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h" #include "src/core/lib/channel/channel_fwd.h"
@ -66,6 +69,10 @@
#include "src/core/lib/transport/metadata_batch.h" #include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h" #include "src/core/lib/transport/transport.h"
#define GRPC_ARG_SERVER_MAX_PENDING_REQUESTS "grpc.server.max_pending_requests"
#define GRPC_ARG_SERVER_MAX_PENDING_REQUESTS_HARD_LIMIT \
"grpc.server.max_pending_requests_hard_limit"
namespace grpc_core { namespace grpc_core {
extern TraceFlag grpc_server_channel_trace; extern TraceFlag grpc_server_channel_trace;
@ -225,7 +232,8 @@ class Server : public InternallyRefCounted<Server>,
}; };
class RequestMatcherInterface; class RequestMatcherInterface;
class RealRequestMatcher; class RealRequestMatcherFilterStack;
class RealRequestMatcherPromises;
class AllocatingRequestMatcherBase; class AllocatingRequestMatcherBase;
class AllocatingRequestMatcherBatch; class AllocatingRequestMatcherBatch;
class AllocatingRequestMatcherRegistered; class AllocatingRequestMatcherRegistered;
@ -505,6 +513,17 @@ class Server : public InternallyRefCounted<Server>,
bool shutdown_published_ ABSL_GUARDED_BY(mu_global_) = false; bool shutdown_published_ ABSL_GUARDED_BY(mu_global_) = false;
std::vector<ShutdownTag> shutdown_tags_ ABSL_GUARDED_BY(mu_global_); std::vector<ShutdownTag> shutdown_tags_ ABSL_GUARDED_BY(mu_global_);
RandomEarlyDetection pending_backlog_protector_ ABSL_GUARDED_BY(mu_call_){
static_cast<uint64_t>(
std::max(0, channel_args_.GetInt(GRPC_ARG_SERVER_MAX_PENDING_REQUESTS)
.value_or(1000))),
static_cast<uint64_t>(std::max(
0,
channel_args_.GetInt(GRPC_ARG_SERVER_MAX_PENDING_REQUESTS_HARD_LIMIT)
.value_or(3000)))};
Duration max_time_in_pending_queue_{Duration::Seconds(30)};
absl::BitGen bitgen_ ABSL_GUARDED_BY(mu_call_);
std::list<ChannelData*> channels_; std::list<ChannelData*> channels_;
std::list<Listener> listeners_; std::list<Listener> listeners_;

@ -29,6 +29,7 @@
#include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h" #include "src/core/lib/channel/context.h"
#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/time.h" #include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/error.h"
@ -100,6 +101,9 @@ CORE_END2END_TEST(CoreEnd2endTest, FilterContext) {
CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) { CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) {
for (auto type : {GRPC_CLIENT_CHANNEL, GRPC_CLIENT_SUBCHANNEL, for (auto type : {GRPC_CLIENT_CHANNEL, GRPC_CLIENT_SUBCHANNEL,
GRPC_CLIENT_DIRECT_CHANNEL, GRPC_SERVER_CHANNEL}) { GRPC_CLIENT_DIRECT_CHANNEL, GRPC_SERVER_CHANNEL}) {
if (type == GRPC_SERVER_CHANNEL && IsPromiseBasedServerCallEnabled()) {
continue;
}
builder->channel_init()->RegisterFilter(type, &test_filter); builder->channel_init()->RegisterFilter(type, &test_filter);
} }
}); });

Loading…
Cancel
Save