[chttp2] Limit request count before receiving settings ack (#34638)

Previously chttp2 would allow infinite requests prior to a settings ack
- as the agreed upon limit for requests in that state is infinite.
Instead, after MAX_CONCURRENT_STREAMS requests have been attempted,
start blanket cancelling requests until the settings ack is received.
This can be done efficiently without allocating request state
structures.
pull/34639/head
Craig Tiller 1 year ago committed by GitHub
parent 98c7e8fb12
commit 954b285dd2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      bazel/experiments.bzl
  2. 6
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  3. 3
      src/core/ext/transport/chttp2/transport/internal.h
  4. 34
      src/core/ext/transport/chttp2/transport/parsing.cc
  5. 24
      src/core/lib/experiments/experiments.cc
  6. 12
      src/core/lib/experiments/experiments.h
  7. 6
      src/core/lib/experiments/experiments.yaml
  8. 2
      src/core/lib/experiments/rollouts.yaml
  9. 18
      test/core/bad_client/bad_client.cc
  10. 1
      test/core/bad_client/bad_client.h
  11. 1
      test/core/bad_client/generate_tests.bzl
  12. 98
      test/core/bad_client/tests/initial_settings_frame.cc

@ -64,6 +64,9 @@ EXPERIMENTS = {
],
},
"on": {
"bad_client_test": [
"block_excessive_requests_before_settings_ack",
],
"cpp_lb_end2end_test": [
"pick_first_happy_eyeballs",
"round_robin_delegate_to_pick_first",
@ -134,6 +137,9 @@ EXPERIMENTS = {
],
},
"on": {
"bad_client_test": [
"block_excessive_requests_before_settings_ack",
],
"cpp_lb_end2end_test": [
"pick_first_happy_eyeballs",
"round_robin_delegate_to_pick_first",
@ -214,6 +220,9 @@ EXPERIMENTS = {
],
},
"on": {
"bad_client_test": [
"block_excessive_requests_before_settings_ack",
],
"cpp_lb_end2end_test": [
"pick_first_happy_eyeballs",
"round_robin_delegate_to_pick_first",

@ -618,6 +618,12 @@ grpc_chttp2_transport::grpc_chttp2_transport(
read_channel_args(this, channel_args, is_client);
// Initially allow *UP TO* MAX_CONCURRENT_STREAMS incoming before we start
// blanket cancelling them.
num_incoming_streams_before_settings_ack =
settings[GRPC_LOCAL_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS];
grpc_core::ExecCtx exec_ctx;
combiner->Run(
grpc_core::InitTransportClosure<init_keepalive_pings_if_enabled_locked>(

@ -327,6 +327,9 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
/// last new stream id
uint32_t last_new_stream_id = 0;
/// Number of incoming streams allowed before a settings ACK is required
uint32_t num_incoming_streams_before_settings_ack = 0;
/// ping queues for various ping insertion points
grpc_core::Chttp2PingAbusePolicy ping_abuse_policy;
grpc_core::Chttp2PingRatePolicy ping_rate_policy;

@ -22,6 +22,7 @@
#include <string.h>
#include <initializer_list>
#include <limits>
#include <string>
#include "absl/base/attributes.h"
@ -33,6 +34,7 @@
#include "absl/strings/string_view.h"
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/log.h>
#include "src/core/ext/transport/chttp2/transport/flow_control.h"
@ -53,6 +55,7 @@
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/iomgr/closure.h"
@ -639,7 +642,25 @@ static grpc_error_handle init_header_frame_parser(grpc_chttp2_transport* t,
t, std::string(t->peer_string.as_string_view()).c_str(),
t->incoming_stream_id, t->last_new_stream_id));
return init_header_skip_frame_parser(t, priority_type, is_eoh);
} else if (grpc_core::IsBlockExcessiveRequestsBeforeSettingsAckEnabled() &&
t->num_incoming_streams_before_settings_ack == 0) {
GRPC_CHTTP2_IF_TRACING(gpr_log(
GPR_ERROR,
"transport:%p SERVER peer:%s rejecting grpc_chttp2_stream id=%d, "
"last grpc_chttp2_stream id=%d before settings have been "
"acknowledged",
t, std::string(t->peer_string.as_string_view()).c_str(),
t->incoming_stream_id, t->last_new_stream_id));
++t->num_pending_induced_frames;
grpc_slice_buffer_add(
&t->qbuf,
grpc_chttp2_rst_stream_create(t->incoming_stream_id,
GRPC_HTTP2_ENHANCE_YOUR_CALM, nullptr));
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
t->last_new_stream_id = t->incoming_stream_id;
return init_header_skip_frame_parser(t, priority_type, is_eoh);
}
--t->num_incoming_streams_before_settings_ack;
t->last_new_stream_id = t->incoming_stream_id;
s = t->incoming_stream =
grpc_chttp2_parsing_accept_stream(t, t->incoming_stream_id);
@ -650,9 +671,12 @@ static grpc_error_handle init_header_frame_parser(grpc_chttp2_transport* t,
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
GRPC_TRACE_FLAG_ENABLED(grpc_trace_chttp2_new_stream)) {
gpr_log(GPR_INFO, "[t:%p fd:%d peer:%s] Accepting new stream", t,
grpc_endpoint_get_fd(t->ep),
std::string(t->peer_string.as_string_view()).c_str());
gpr_log(GPR_INFO,
"[t:%p fd:%d peer:%s] Accepting new stream; "
"num_incoming_streams_before_settings_ack=%u",
t, grpc_endpoint_get_fd(t->ep),
std::string(t->peer_string.as_string_view()).c_str(),
t->num_incoming_streams_before_settings_ack);
}
if (t->channelz_socket != nullptr) {
t->channelz_socket->RecordStreamStartedFromRemote();
@ -798,6 +822,10 @@ static grpc_error_handle init_settings_frame_parser(grpc_chttp2_transport* t) {
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]),
t, nullptr);
t->sent_local_settings = false;
// This is more streams than can be started in http2, so setting this
// effictively removes the limit for the rest of the connection.
t->num_incoming_streams_before_settings_ack =
std::numeric_limits<uint32_t>::max();
}
t->parser = grpc_chttp2_transport::Parser{
"settings", grpc_chttp2_settings_parser_parse, &t->simple.settings};

@ -134,6 +134,10 @@ const char* const description_work_serializer_clears_time_cache =
"Have the work serializer clear the time cache when it dispatches work.";
const char* const additional_constraints_work_serializer_clears_time_cache =
"{}";
const char* const description_block_excessive_requests_before_settings_ack =
"If set, block excessive requests before receiving SETTINGS ACK.";
const char* const
additional_constraints_block_excessive_requests_before_settings_ack = "{}";
const char* const description_ping_on_rst_stream =
"Send a ping on receiving some RST_STREAM frames on the server (proportion "
"configurable via grpc.http2.ping_on_rst_stream_percent channel arg).";
@ -216,6 +220,10 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"work_serializer_clears_time_cache",
description_work_serializer_clears_time_cache,
additional_constraints_work_serializer_clears_time_cache, true, true},
{"block_excessive_requests_before_settings_ack",
description_block_excessive_requests_before_settings_ack,
additional_constraints_block_excessive_requests_before_settings_ack, true,
true},
{"ping_on_rst_stream", description_ping_on_rst_stream,
additional_constraints_ping_on_rst_stream, true, true},
};
@ -336,6 +344,10 @@ const char* const description_work_serializer_clears_time_cache =
"Have the work serializer clear the time cache when it dispatches work.";
const char* const additional_constraints_work_serializer_clears_time_cache =
"{}";
const char* const description_block_excessive_requests_before_settings_ack =
"If set, block excessive requests before receiving SETTINGS ACK.";
const char* const
additional_constraints_block_excessive_requests_before_settings_ack = "{}";
const char* const description_ping_on_rst_stream =
"Send a ping on receiving some RST_STREAM frames on the server (proportion "
"configurable via grpc.http2.ping_on_rst_stream_percent channel arg).";
@ -418,6 +430,10 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"work_serializer_clears_time_cache",
description_work_serializer_clears_time_cache,
additional_constraints_work_serializer_clears_time_cache, true, true},
{"block_excessive_requests_before_settings_ack",
description_block_excessive_requests_before_settings_ack,
additional_constraints_block_excessive_requests_before_settings_ack, true,
true},
{"ping_on_rst_stream", description_ping_on_rst_stream,
additional_constraints_ping_on_rst_stream, true, true},
};
@ -538,6 +554,10 @@ const char* const description_work_serializer_clears_time_cache =
"Have the work serializer clear the time cache when it dispatches work.";
const char* const additional_constraints_work_serializer_clears_time_cache =
"{}";
const char* const description_block_excessive_requests_before_settings_ack =
"If set, block excessive requests before receiving SETTINGS ACK.";
const char* const
additional_constraints_block_excessive_requests_before_settings_ack = "{}";
const char* const description_ping_on_rst_stream =
"Send a ping on receiving some RST_STREAM frames on the server (proportion "
"configurable via grpc.http2.ping_on_rst_stream_percent channel arg).";
@ -620,6 +640,10 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"work_serializer_clears_time_cache",
description_work_serializer_clears_time_cache,
additional_constraints_work_serializer_clears_time_cache, true, true},
{"block_excessive_requests_before_settings_ack",
description_block_excessive_requests_before_settings_ack,
additional_constraints_block_excessive_requests_before_settings_ack, true,
true},
{"ping_on_rst_stream", description_ping_on_rst_stream,
additional_constraints_ping_on_rst_stream, true, true},
};

@ -105,6 +105,8 @@ inline bool IsCallStatusOverrideOnCancellationEnabled() {
}
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE
inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_BLOCK_EXCESSIVE_REQUESTS_BEFORE_SETTINGS_ACK
inline bool IsBlockExcessiveRequestsBeforeSettingsAckEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PING_ON_RST_STREAM
inline bool IsPingOnRstStreamEnabled() { return true; }
@ -157,6 +159,8 @@ inline bool IsCallStatusOverrideOnCancellationEnabled() {
}
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE
inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_BLOCK_EXCESSIVE_REQUESTS_BEFORE_SETTINGS_ACK
inline bool IsBlockExcessiveRequestsBeforeSettingsAckEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PING_ON_RST_STREAM
inline bool IsPingOnRstStreamEnabled() { return true; }
@ -209,6 +213,8 @@ inline bool IsCallStatusOverrideOnCancellationEnabled() {
}
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE
inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_BLOCK_EXCESSIVE_REQUESTS_BEFORE_SETTINGS_ACK
inline bool IsBlockExcessiveRequestsBeforeSettingsAckEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PING_ON_RST_STREAM
inline bool IsPingOnRstStreamEnabled() { return true; }
#endif
@ -245,6 +251,7 @@ enum ExperimentIds {
kExperimentIdRegisteredMethodLookupInTransport,
kExperimentIdCallStatusOverrideOnCancellation,
kExperimentIdWorkSerializerClearsTimeCache,
kExperimentIdBlockExcessiveRequestsBeforeSettingsAck,
kExperimentIdPingOnRstStream,
kNumExperiments
};
@ -368,6 +375,11 @@ inline bool IsCallStatusOverrideOnCancellationEnabled() {
inline bool IsWorkSerializerClearsTimeCacheEnabled() {
return IsExperimentEnabled(kExperimentIdWorkSerializerClearsTimeCache);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_BLOCK_EXCESSIVE_REQUESTS_BEFORE_SETTINGS_ACK
inline bool IsBlockExcessiveRequestsBeforeSettingsAckEnabled() {
return IsExperimentEnabled(
kExperimentIdBlockExcessiveRequestsBeforeSettingsAck);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_PING_ON_RST_STREAM
inline bool IsPingOnRstStreamEnabled() {
return IsExperimentEnabled(kExperimentIdPingOnRstStream);

@ -228,6 +228,12 @@
expiry: 2024/01/01
owner: ctiller@google.com
test_tags: []
- name: block_excessive_requests_before_settings_ack
description:
If set, block excessive requests before receiving SETTINGS ACK.
expiry: 2024/03/03
owner: ctiller@google.com
test_tags: [bad_client_test]
- name: ping_on_rst_stream
description:
Send a ping on receiving some RST_STREAM frames on the server

@ -106,5 +106,7 @@
default: debug
- name: work_serializer_clears_time_cache
default: true
- name: block_excessive_requests_before_settings_ack
default: true
- name: ping_on_rst_stream
default: true

@ -21,6 +21,7 @@
#include <inttypes.h>
#include <limits.h>
#include <grpc/impl/channel_arg_names.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@ -28,6 +29,7 @@
#include <grpc/support/time.h>
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_args_preconditioning.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/config/core_configuration.h"
@ -205,12 +207,16 @@ void grpc_run_bad_client_test(
grpc_completion_queue* shutdown_cq;
grpc_completion_queue* client_cq;
const auto server_args = grpc_core::ChannelArgs().Set(
GRPC_ARG_MAX_CONCURRENT_STREAMS,
(flags & GRPC_BAD_CLIENT_MAX_CONCURRENT_REQUESTS_OF_ONE) ? 1 : 10000);
// Init grpc
grpc_init();
sfd = grpc_iomgr_create_endpoint_pair("fixture", nullptr);
// Create server, completion events
a.server = grpc_server_create(nullptr, nullptr);
a.server = grpc_server_create(server_args.ToC().get(), nullptr);
a.cq = grpc_completion_queue_create_for_next(nullptr);
client_cq = grpc_completion_queue_create_for_next(nullptr);
grpc_server_register_completion_queue(a.server, a.cq, nullptr);
@ -219,11 +225,11 @@ void grpc_run_bad_client_test(
GRPC_BAD_CLIENT_REGISTERED_HOST,
GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER, 0);
grpc_server_start(a.server);
transport =
grpc_create_chttp2_transport(grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr),
sfd.server, false);
transport = grpc_create_chttp2_transport(
grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(server_args.ToC().get()),
sfd.server, false);
server_setup_transport(&a, transport);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);

@ -47,6 +47,7 @@ struct grpc_bad_client_arg {
// Flags for grpc_run_bad_client_test
#define GRPC_BAD_CLIENT_DISCONNECT 1
#define GRPC_BAD_CLIENT_LARGE_REQUEST 2
#define GRPC_BAD_CLIENT_MAX_CONCURRENT_REQUESTS_OF_ONE 4
// Test runner.
//

@ -57,6 +57,7 @@ def grpc_bad_client_tests():
name = "%s_bad_client_test" % t,
srcs = ["tests/%s.cc" % t],
deps = [":bad_client_test"],
tags = ["bad_client_test"],
external_deps = [
"gtest",
],

@ -16,15 +16,75 @@
//
//
#include <string>
#include "absl/strings/str_cat.h"
#include <grpc/grpc.h>
#include <grpc/slice.h>
#include <grpc/support/log.h>
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/surface/server.h"
#include "test/core/bad_client/bad_client.h"
#include "test/core/end2end/cq_verifier.h"
#include "test/core/util/test_config.h"
#define PFX_STR "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
#define ONE_SETTING_HDR "\x00\x00\x06\x04\x00\x00\x00\x00\x00"
#define ZERO_SETTING_HDR "\x00\x00\x00\x04\x00\x00\x00\x00\x00"
#define SETTING_ACK "\x00\x00\x00\x04\x01\x00\x00\x00\x00"
#define RST_STREAM_1 "\x00\x00\x04\x03\x00\x00\x00\x00\x01\x00\x00\x00\x00"
#define RST_STREAM_3 "\x00\x00\x04\x03\x00\x00\x00\x00\x03\x00\x00\x00\x00"
#define FOOBAR_0 \
"\x00\x00\xca\x01\x04\x00\x00\x00\x01" /* headers: generated from \
simple_request.headers in this \
directory */ \
"\x10\x05:path\x09/foo/bar0" \
"\x10\x07:scheme\x04http" \
"\x10\x07:method\x04POST" \
"\x10\x0a:authority\x09localhost" \
"\x10\x0c" \
"content-type\x10" \
"application/grpc" \
"\x10\x14grpc-accept-encoding\x15" \
"deflate,identity,gzip" \
"\x10\x02te\x08trailers" \
"\x10\x0auser-agent\"bad-client grpc-c/0.12.0.0 (linux)"
#define FOOBAR_1 \
"\x00\x00\xca\x01\x04\x00\x00\x00\x05" /* headers: generated from \
simple_request.headers in this \
directory */ \
"\x10\x05:path\x09/foo/bar1" \
"\x10\x07:scheme\x04http" \
"\x10\x07:method\x04POST" \
"\x10\x0a:authority\x09localhost" \
"\x10\x0c" \
"content-type\x10" \
"application/grpc" \
"\x10\x14grpc-accept-encoding\x15" \
"deflate,identity,gzip" \
"\x10\x02te\x08trailers" \
"\x10\x0auser-agent\"bad-client grpc-c/0.12.0.0 (linux)"
#define FOOBAR_2 \
"\x00\x00\xca\x01\x04\x00\x00\x00\x03" /* headers: generated from \
simple_request.headers in this \
directory */ \
"\x10\x05:path\x09/foo/bar2" \
"\x10\x07:scheme\x04http" \
"\x10\x07:method\x04POST" \
"\x10\x0a:authority\x09localhost" \
"\x10\x0c" \
"content-type\x10" \
"application/grpc" \
"\x10\x14grpc-accept-encoding\x15" \
"deflate,identity,gzip" \
"\x10\x02te\x08trailers" \
"\x10\x0auser-agent\"bad-client grpc-c/0.12.0.0 (linux)"
static void verifier(grpc_server* server, grpc_completion_queue* cq,
void* /*registered_method*/) {
@ -35,6 +95,36 @@ static void verifier(grpc_server* server, grpc_completion_queue* cq,
}
}
static void single_request_verifier(grpc_server* server,
grpc_completion_queue* cq,
void* /*registered_method*/) {
grpc_call_error error;
grpc_call* s;
grpc_call_details call_details;
grpc_core::CqVerifier cqv(cq);
grpc_metadata_array request_metadata_recv;
for (int i = 0; i < 2; i++) {
grpc_call_details_init(&call_details);
grpc_metadata_array_init(&request_metadata_recv);
error = grpc_server_request_call(server, &s, &call_details,
&request_metadata_recv, cq, cq,
grpc_core::CqVerifier::tag(101));
GPR_ASSERT(GRPC_CALL_OK == error);
cqv.Expect(grpc_core::CqVerifier::tag(101), true);
cqv.Verify();
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.host, "localhost"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method,
absl::StrCat("/foo/bar", i).c_str()));
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_call_details_destroy(&call_details);
grpc_call_unref(s);
}
}
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
grpc_init();
@ -111,6 +201,14 @@ int main(int argc, char** argv) {
PFX_STR ONE_SETTING_HDR "\x00\x99\x00\x00\x00\x00",
GRPC_BAD_CLIENT_DISCONNECT);
// too many requests before the settings ack is sent should be cancelled
if (grpc_core::IsBlockExcessiveRequestsBeforeSettingsAckEnabled()) {
GRPC_RUN_BAD_CLIENT_TEST(single_request_verifier, nullptr,
PFX_STR ZERO_SETTING_HDR FOOBAR_0 FOOBAR_2
SETTING_ACK RST_STREAM_1 RST_STREAM_3 FOOBAR_1,
GRPC_BAD_CLIENT_MAX_CONCURRENT_REQUESTS_OF_ONE);
}
grpc_shutdown();
return 0;
}

Loading…
Cancel
Save