diff --git a/BUILD b/BUILD index 2588e85212e..9e4a4c70b87 100644 --- a/BUILD +++ b/BUILD @@ -4829,7 +4829,6 @@ grpc_cc_library( "//src/core:iomgr_fwd", "//src/core:iomgr_port", "//src/core:match", - "//src/core:max_concurrent_streams_policy", "//src/core:memory_quota", "//src/core:metadata_batch", "//src/core:metadata_info", diff --git a/bazel/experiments.bzl b/bazel/experiments.bzl index 88bead1ccad..64a197963a1 100644 --- a/bazel/experiments.bzl +++ b/bazel/experiments.bzl @@ -31,7 +31,6 @@ EXPERIMENT_ENABLES = { "peer_state_based_framing": "peer_state_based_framing", "pick_first_new": "pick_first_new", "promise_based_inproc_transport": "promise_based_inproc_transport", - "rstpit": "rstpit", "schedule_cancellation_over_write": "schedule_cancellation_over_write", "server_privacy": "server_privacy", "tcp_frame_size_tuning": "tcp_frame_size_tuning", @@ -60,7 +59,6 @@ EXPERIMENTS = { "flow_control_test": [ "multiping", "peer_state_based_framing", - "rstpit", "tcp_frame_size_tuning", "tcp_rcv_lowat", ], @@ -108,7 +106,6 @@ EXPERIMENTS = { "flow_control_test": [ "multiping", "peer_state_based_framing", - "rstpit", "tcp_frame_size_tuning", "tcp_rcv_lowat", ], @@ -150,7 +147,6 @@ EXPERIMENTS = { "flow_control_test": [ "multiping", "peer_state_based_framing", - "rstpit", "tcp_frame_size_tuning", "tcp_rcv_lowat", ], diff --git a/src/core/BUILD b/src/core/BUILD index 1acce192d0d..442d5c3f63e 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -7188,23 +7188,6 @@ grpc_cc_library( ], ) -grpc_cc_library( - name = "max_concurrent_streams_policy", - srcs = [ - "ext/transport/chttp2/transport/max_concurrent_streams_policy.cc", - ], - hdrs = [ - "ext/transport/chttp2/transport/max_concurrent_streams_policy.h", - ], - external_deps = [ - "absl/log:check", - ], - deps = [ - "//:gpr", - "//:gpr_platform", - ], -) - grpc_cc_library( name = "huffsyms", srcs = [ diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index cddcecf1b5a..0e64643cbf6 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -65,7 +65,6 @@ #include "src/core/ext/transport/chttp2/transport/http2_settings.h" #include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/legacy_frame.h" -#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h" #include "src/core/ext/transport/chttp2/transport/ping_abuse_policy.h" #include "src/core/ext/transport/chttp2/transport/ping_callbacks.h" #include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h" @@ -547,7 +546,6 @@ static void read_channel_args(grpc_chttp2_transport* t, value = channel_args.GetInt(GRPC_ARG_MAX_CONCURRENT_STREAMS).value_or(-1); if (value >= 0) { t->settings.mutable_local().SetMaxConcurrentStreams(value); - t->max_concurrent_streams_policy.SetTarget(value); } } else if (channel_args.Contains(GRPC_ARG_MAX_CONCURRENT_STREAMS)) { VLOG(2) << GRPC_ARG_MAX_CONCURRENT_STREAMS diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 5aac16542dc..64f398807ba 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -52,7 +52,6 @@ #include "src/core/ext/transport/chttp2/transport/hpack_parser.h" #include "src/core/ext/transport/chttp2/transport/http2_settings.h" #include "src/core/ext/transport/chttp2/transport/legacy_frame.h" -#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h" #include "src/core/ext/transport/chttp2/transport/ping_abuse_policy.h" #include "src/core/ext/transport/chttp2/transport/ping_callbacks.h" #include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h" @@ -383,8 +382,6 @@ struct grpc_chttp2_transport final : public grpc_core::FilterStackTransport, grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid; grpc_closure retry_initiate_ping_locked; - grpc_core::Chttp2MaxConcurrentStreamsPolicy max_concurrent_streams_policy; - /// ping acks size_t ping_ack_count = 0; size_t ping_ack_capacity = 0; diff --git a/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc b/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc deleted file mode 100644 index 355e40898a6..00000000000 --- a/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2023 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h" - -#include - -#include "absl/log/check.h" - -#include -#include - -namespace grpc_core { - -void Chttp2MaxConcurrentStreamsPolicy::AddDemerit() { - ++new_demerits_; - ++unacked_demerits_; -} - -void Chttp2MaxConcurrentStreamsPolicy::FlushedSettings() { - sent_demerits_ += std::exchange(new_demerits_, 0); -} - -void Chttp2MaxConcurrentStreamsPolicy::AckLastSend() { - CHECK(unacked_demerits_ >= sent_demerits_); - unacked_demerits_ -= std::exchange(sent_demerits_, 0); -} - -uint32_t Chttp2MaxConcurrentStreamsPolicy::AdvertiseValue() const { - if (target_ < unacked_demerits_) return 0; - return target_ - unacked_demerits_; -} - -} // namespace grpc_core diff --git a/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h b/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h deleted file mode 100644 index a2a56d2bfe5..00000000000 --- a/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright 2023 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_MAX_CONCURRENT_STREAMS_POLICY_H -#define GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_MAX_CONCURRENT_STREAMS_POLICY_H - -#include -#include - -#include - -namespace grpc_core { - -class Chttp2MaxConcurrentStreamsPolicy { - public: - // Set the target number of concurrent streams. - // If everything is idle we should advertise this number. - void SetTarget(uint32_t target) { target_ = target; } - - // Add one demerit to the current target. - // We need to do one full settings round trip after this to clear this - // demerit. - // It will reduce our advertised max concurrent streams by one. - void AddDemerit(); - - // Notify the policy that we've sent a settings frame. - // Newly added demerits since the last settings frame was sent will be cleared - // once that settings frame is acknowledged. - void FlushedSettings(); - - // Notify the policy that we've received an acknowledgement for the last - // settings frame we sent. - void AckLastSend(); - - // Returns what we should advertise as max concurrent streams. - uint32_t AdvertiseValue() const; - - private: - uint32_t target_ = std::numeric_limits::max(); - // Demerit flow: - // When we add a demerit, we add to both new & unacked. - // When we flush settings, we move new to sent. - // When we ack settings, we remove what we sent from unacked. - // eg: - // we add 10 demerits - now new=10, sent=0, unacked=10 - // we send settings - now new=0, sent=10, unacked=10 - // we add 5 demerits - now new=5, sent=10, unacked=15 - // we get the settings ack - now new=5, sent=0, unacked=5 - uint32_t new_demerits_ = 0; - uint32_t sent_demerits_ = 0; - uint32_t unacked_demerits_ = 0; -}; - -} // namespace grpc_core - -#endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_MAX_CONCURRENT_STREAMS_POLICY_H diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index 8eefef09ec3..070294b2cf4 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -55,7 +55,6 @@ #include "src/core/ext/transport/chttp2/transport/http2_settings.h" #include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/legacy_frame.h" -#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h" #include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h" #include "src/core/lib/backoff/random_early_detection.h" #include "src/core/lib/debug/trace.h" @@ -650,7 +649,7 @@ static grpc_error_handle init_header_frame_parser(grpc_chttp2_transport* t, } else if (GPR_UNLIKELY( t->max_concurrent_streams_overload_protection && t->streams_allocated.load(std::memory_order_relaxed) > - t->max_concurrent_streams_policy.AdvertiseValue())) { + t->settings.local().max_concurrent_streams())) { // We have more streams allocated than we'd like, so apply some pushback // by refusing this stream. ++t->num_pending_induced_frames; @@ -659,13 +658,12 @@ static grpc_error_handle init_header_frame_parser(grpc_chttp2_transport* t, GRPC_HTTP2_REFUSED_STREAM, nullptr)); grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM); return init_header_skip_frame_parser(t, priority_type, is_eoh); - } else if (GPR_UNLIKELY( - t->stream_map.size() >= - t->max_concurrent_streams_policy.AdvertiseValue() && - grpc_core::RandomEarlyDetection( - t->max_concurrent_streams_policy.AdvertiseValue(), - t->settings.acked().max_concurrent_streams()) - .Reject(t->stream_map.size(), t->bitgen))) { + } else if (GPR_UNLIKELY(t->stream_map.size() >= + t->settings.local().max_concurrent_streams() && + grpc_core::RandomEarlyDetection( + t->settings.local().max_concurrent_streams(), + t->settings.acked().max_concurrent_streams()) + .Reject(t->stream_map.size(), t->bitgen))) { // We are under the limit of max concurrent streams for the current // setting, but are over the next value that will be advertised. // Apply some backpressure by randomly not accepting new streams. @@ -827,9 +825,6 @@ static grpc_error_handle init_rst_stream_parser(grpc_chttp2_transport* t) { s->call_tracer_wrapper.RecordIncomingBytes({9, 0, 0}); t->parser = grpc_chttp2_transport::Parser{ "rst_stream", grpc_chttp2_rst_stream_parser_parse, &t->simple.rst_stream}; - if (!t->is_client && grpc_core::IsRstpitEnabled()) { - t->max_concurrent_streams_policy.AddDemerit(); - } return absl::OkStatus(); } @@ -854,7 +849,6 @@ static grpc_error_handle init_settings_frame_parser(grpc_chttp2_transport* t) { return err; } if (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) { - t->max_concurrent_streams_policy.AckLastSend(); if (!t->settings.AckLastSend()) { return GRPC_ERROR_CREATE("Received unexpected settings ack"); } diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index 1095bfc2c40..029608120cc 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -49,7 +49,6 @@ #include "src/core/ext/transport/chttp2/transport/http2_settings.h" #include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/legacy_frame.h" -#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h" #include "src/core/ext/transport/chttp2/transport/ping_callbacks.h" #include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h" #include "src/core/ext/transport/chttp2/transport/write_size_policy.h" @@ -260,8 +259,6 @@ class WriteContext { } void FlushSettings() { - t_->settings.mutable_local().SetMaxConcurrentStreams( - t_->max_concurrent_streams_policy.AdvertiseValue()); auto update = t_->settings.MaybeSendUpdate(); if (update.has_value()) { grpc_core::Http2Frame frame(std::move(*update)); @@ -280,7 +277,6 @@ class WriteContext { }); } t_->flow_control.FlushedSettings(); - t_->max_concurrent_streams_policy.FlushedSettings(); grpc_core::global_stats().IncrementHttp2SettingsWrites(); } } diff --git a/src/core/lib/experiments/experiments.cc b/src/core/lib/experiments/experiments.cc index 70326c37823..5ce7bfb84f4 100644 --- a/src/core/lib/experiments/experiments.cc +++ b/src/core/lib/experiments/experiments.cc @@ -69,10 +69,6 @@ const char* const additional_constraints_pick_first_new = "{}"; const char* const description_promise_based_inproc_transport = "Use promises for the in-process transport."; const char* const additional_constraints_promise_based_inproc_transport = "{}"; -const char* const description_rstpit = - "On RST_STREAM on a server, reduce MAX_CONCURRENT_STREAMS for a short " - "duration"; -const char* const additional_constraints_rstpit = "{}"; const char* const description_schedule_cancellation_over_write = "Allow cancellation op to be scheduled over a write"; const char* const additional_constraints_schedule_cancellation_over_write = @@ -142,8 +138,6 @@ const ExperimentMetadata g_experiment_metadata[] = { description_promise_based_inproc_transport, additional_constraints_promise_based_inproc_transport, nullptr, 0, false, false}, - {"rstpit", description_rstpit, additional_constraints_rstpit, nullptr, 0, - false, true}, {"schedule_cancellation_over_write", description_schedule_cancellation_over_write, additional_constraints_schedule_cancellation_over_write, nullptr, 0, false, @@ -219,10 +213,6 @@ const char* const additional_constraints_pick_first_new = "{}"; const char* const description_promise_based_inproc_transport = "Use promises for the in-process transport."; const char* const additional_constraints_promise_based_inproc_transport = "{}"; -const char* const description_rstpit = - "On RST_STREAM on a server, reduce MAX_CONCURRENT_STREAMS for a short " - "duration"; -const char* const additional_constraints_rstpit = "{}"; const char* const description_schedule_cancellation_over_write = "Allow cancellation op to be scheduled over a write"; const char* const additional_constraints_schedule_cancellation_over_write = @@ -292,8 +282,6 @@ const ExperimentMetadata g_experiment_metadata[] = { description_promise_based_inproc_transport, additional_constraints_promise_based_inproc_transport, nullptr, 0, false, false}, - {"rstpit", description_rstpit, additional_constraints_rstpit, nullptr, 0, - false, true}, {"schedule_cancellation_over_write", description_schedule_cancellation_over_write, additional_constraints_schedule_cancellation_over_write, nullptr, 0, false, @@ -369,10 +357,6 @@ const char* const additional_constraints_pick_first_new = "{}"; const char* const description_promise_based_inproc_transport = "Use promises for the in-process transport."; const char* const additional_constraints_promise_based_inproc_transport = "{}"; -const char* const description_rstpit = - "On RST_STREAM on a server, reduce MAX_CONCURRENT_STREAMS for a short " - "duration"; -const char* const additional_constraints_rstpit = "{}"; const char* const description_schedule_cancellation_over_write = "Allow cancellation op to be scheduled over a write"; const char* const additional_constraints_schedule_cancellation_over_write = @@ -442,8 +426,6 @@ const ExperimentMetadata g_experiment_metadata[] = { description_promise_based_inproc_transport, additional_constraints_promise_based_inproc_transport, nullptr, 0, false, false}, - {"rstpit", description_rstpit, additional_constraints_rstpit, nullptr, 0, - false, true}, {"schedule_cancellation_over_write", description_schedule_cancellation_over_write, additional_constraints_schedule_cancellation_over_write, nullptr, 0, false, diff --git a/src/core/lib/experiments/experiments.h b/src/core/lib/experiments/experiments.h index 498557c5118..89f6b6010ae 100644 --- a/src/core/lib/experiments/experiments.h +++ b/src/core/lib/experiments/experiments.h @@ -75,7 +75,6 @@ inline bool IsPeerStateBasedFramingEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_NEW inline bool IsPickFirstNewEnabled() { return true; } inline bool IsPromiseBasedInprocTransportEnabled() { return false; } -inline bool IsRstpitEnabled() { return false; } inline bool IsScheduleCancellationOverWriteEnabled() { return false; } inline bool IsServerPrivacyEnabled() { return false; } inline bool IsTcpFrameSizeTuningEnabled() { return false; } @@ -109,7 +108,6 @@ inline bool IsPeerStateBasedFramingEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_NEW inline bool IsPickFirstNewEnabled() { return true; } inline bool IsPromiseBasedInprocTransportEnabled() { return false; } -inline bool IsRstpitEnabled() { return false; } inline bool IsScheduleCancellationOverWriteEnabled() { return false; } inline bool IsServerPrivacyEnabled() { return false; } inline bool IsTcpFrameSizeTuningEnabled() { return false; } @@ -142,7 +140,6 @@ inline bool IsPeerStateBasedFramingEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_NEW inline bool IsPickFirstNewEnabled() { return true; } inline bool IsPromiseBasedInprocTransportEnabled() { return false; } -inline bool IsRstpitEnabled() { return false; } inline bool IsScheduleCancellationOverWriteEnabled() { return false; } inline bool IsServerPrivacyEnabled() { return false; } inline bool IsTcpFrameSizeTuningEnabled() { return false; } @@ -171,7 +168,6 @@ enum ExperimentIds { kExperimentIdPeerStateBasedFraming, kExperimentIdPickFirstNew, kExperimentIdPromiseBasedInprocTransport, - kExperimentIdRstpit, kExperimentIdScheduleCancellationOverWrite, kExperimentIdServerPrivacy, kExperimentIdTcpFrameSizeTuning, @@ -238,10 +234,6 @@ inline bool IsPickFirstNewEnabled() { inline bool IsPromiseBasedInprocTransportEnabled() { return IsExperimentEnabled(); } -#define GRPC_EXPERIMENT_IS_INCLUDED_RSTPIT -inline bool IsRstpitEnabled() { - return IsExperimentEnabled(); -} #define GRPC_EXPERIMENT_IS_INCLUDED_SCHEDULE_CANCELLATION_OVER_WRITE inline bool IsScheduleCancellationOverWriteEnabled() { return IsExperimentEnabled(); diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index 5d69cf1bc13..5815f2d66f5 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -130,12 +130,6 @@ owner: ctiller@google.com test_tags: [] allow_in_fuzzing_config: false # experiment currently crashes if enabled -- name: rstpit - description: - On RST_STREAM on a server, reduce MAX_CONCURRENT_STREAMS for a short duration - expiry: 2024/08/03 - owner: ctiller@google.com - test_tags: [flow_control_test] - name: schedule_cancellation_over_write description: Allow cancellation op to be scheduled over a write expiry: 2024/08/01 diff --git a/test/core/transport/chttp2/BUILD b/test/core/transport/chttp2/BUILD index 54e1b521b77..d2cd2a2a27b 100644 --- a/test/core/transport/chttp2/BUILD +++ b/test/core/transport/chttp2/BUILD @@ -346,16 +346,6 @@ grpc_cc_test( ], ) -grpc_cc_test( - name = "max_concurrent_streams_policy_test", - srcs = ["max_concurrent_streams_policy_test.cc"], - external_deps = ["gtest"], - language = "C++", - deps = [ - "//src/core:max_concurrent_streams_policy", - ], -) - grpc_cc_test( name = "streams_not_seen_test", srcs = ["streams_not_seen_test.cc"], diff --git a/test/core/transport/chttp2/max_concurrent_streams_policy_test.cc b/test/core/transport/chttp2/max_concurrent_streams_policy_test.cc deleted file mode 100644 index 11b347dbcd8..00000000000 --- a/test/core/transport/chttp2/max_concurrent_streams_policy_test.cc +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright 2023 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h" - -#include - -#include "gtest/gtest.h" - -namespace grpc_core { -namespace { - -TEST(MaxConcurrentStreamsPolicyTest, NoOpWorks) { - Chttp2MaxConcurrentStreamsPolicy policy; - policy.SetTarget(100); - EXPECT_EQ(policy.AdvertiseValue(), 100); -} - -TEST(MaxConcurrentStreamsPolicyTest, BasicFlow) { - Chttp2MaxConcurrentStreamsPolicy policy; - policy.SetTarget(100); - EXPECT_EQ(policy.AdvertiseValue(), 100); - policy.AddDemerit(); - EXPECT_EQ(policy.AdvertiseValue(), 99); - policy.FlushedSettings(); - EXPECT_EQ(policy.AdvertiseValue(), 99); - policy.AckLastSend(); - EXPECT_EQ(policy.AdvertiseValue(), 100); -} - -} // namespace -} // namespace grpc_core - -int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -}