[chttp2] Remove rstpit experiment

pull/37343/head
Craig Tiller 4 months ago
parent 164d4c63c8
commit 1844d92461
  1. 1
      BUILD
  2. 4
      bazel/experiments.bzl
  3. 17
      src/core/BUILD
  4. 2
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  5. 3
      src/core/ext/transport/chttp2/transport/internal.h
  6. 45
      src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc
  7. 67
      src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h
  8. 20
      src/core/ext/transport/chttp2/transport/parsing.cc
  9. 4
      src/core/ext/transport/chttp2/transport/writing.cc
  10. 18
      src/core/lib/experiments/experiments.cc
  11. 8
      src/core/lib/experiments/experiments.h
  12. 6
      src/core/lib/experiments/experiments.yaml
  13. 10
      test/core/transport/chttp2/BUILD
  14. 48
      test/core/transport/chttp2/max_concurrent_streams_policy_test.cc

@ -4829,7 +4829,6 @@ grpc_cc_library(
"//src/core:iomgr_fwd",
"//src/core:iomgr_port",
"//src/core:match",
"//src/core:max_concurrent_streams_policy",
"//src/core:memory_quota",
"//src/core:metadata_batch",
"//src/core:metadata_info",

@ -31,7 +31,6 @@ EXPERIMENT_ENABLES = {
"peer_state_based_framing": "peer_state_based_framing",
"pick_first_new": "pick_first_new",
"promise_based_inproc_transport": "promise_based_inproc_transport",
"rstpit": "rstpit",
"schedule_cancellation_over_write": "schedule_cancellation_over_write",
"server_privacy": "server_privacy",
"tcp_frame_size_tuning": "tcp_frame_size_tuning",
@ -60,7 +59,6 @@ EXPERIMENTS = {
"flow_control_test": [
"multiping",
"peer_state_based_framing",
"rstpit",
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
],
@ -108,7 +106,6 @@ EXPERIMENTS = {
"flow_control_test": [
"multiping",
"peer_state_based_framing",
"rstpit",
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
],
@ -150,7 +147,6 @@ EXPERIMENTS = {
"flow_control_test": [
"multiping",
"peer_state_based_framing",
"rstpit",
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
],

@ -7188,23 +7188,6 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "max_concurrent_streams_policy",
srcs = [
"ext/transport/chttp2/transport/max_concurrent_streams_policy.cc",
],
hdrs = [
"ext/transport/chttp2/transport/max_concurrent_streams_policy.h",
],
external_deps = [
"absl/log:check",
],
deps = [
"//:gpr",
"//:gpr_platform",
],
)
grpc_cc_library(
name = "huffsyms",
srcs = [

@ -65,7 +65,6 @@
#include "src/core/ext/transport/chttp2/transport/http2_settings.h"
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/legacy_frame.h"
#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h"
#include "src/core/ext/transport/chttp2/transport/ping_abuse_policy.h"
#include "src/core/ext/transport/chttp2/transport/ping_callbacks.h"
#include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h"
@ -547,7 +546,6 @@ static void read_channel_args(grpc_chttp2_transport* t,
value = channel_args.GetInt(GRPC_ARG_MAX_CONCURRENT_STREAMS).value_or(-1);
if (value >= 0) {
t->settings.mutable_local().SetMaxConcurrentStreams(value);
t->max_concurrent_streams_policy.SetTarget(value);
}
} else if (channel_args.Contains(GRPC_ARG_MAX_CONCURRENT_STREAMS)) {
VLOG(2) << GRPC_ARG_MAX_CONCURRENT_STREAMS

@ -52,7 +52,6 @@
#include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
#include "src/core/ext/transport/chttp2/transport/http2_settings.h"
#include "src/core/ext/transport/chttp2/transport/legacy_frame.h"
#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h"
#include "src/core/ext/transport/chttp2/transport/ping_abuse_policy.h"
#include "src/core/ext/transport/chttp2/transport/ping_callbacks.h"
#include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h"
@ -383,8 +382,6 @@ struct grpc_chttp2_transport final : public grpc_core::FilterStackTransport,
grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid;
grpc_closure retry_initiate_ping_locked;
grpc_core::Chttp2MaxConcurrentStreamsPolicy max_concurrent_streams_policy;
/// ping acks
size_t ping_ack_count = 0;
size_t ping_ack_capacity = 0;

@ -1,45 +0,0 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h"
#include <utility>
#include "absl/log/check.h"
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
namespace grpc_core {
void Chttp2MaxConcurrentStreamsPolicy::AddDemerit() {
++new_demerits_;
++unacked_demerits_;
}
void Chttp2MaxConcurrentStreamsPolicy::FlushedSettings() {
sent_demerits_ += std::exchange(new_demerits_, 0);
}
void Chttp2MaxConcurrentStreamsPolicy::AckLastSend() {
CHECK(unacked_demerits_ >= sent_demerits_);
unacked_demerits_ -= std::exchange(sent_demerits_, 0);
}
uint32_t Chttp2MaxConcurrentStreamsPolicy::AdvertiseValue() const {
if (target_ < unacked_demerits_) return 0;
return target_ - unacked_demerits_;
}
} // namespace grpc_core

@ -1,67 +0,0 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_MAX_CONCURRENT_STREAMS_POLICY_H
#define GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_MAX_CONCURRENT_STREAMS_POLICY_H
#include <cstdint>
#include <limits>
#include <grpc/support/port_platform.h>
namespace grpc_core {
class Chttp2MaxConcurrentStreamsPolicy {
public:
// Set the target number of concurrent streams.
// If everything is idle we should advertise this number.
void SetTarget(uint32_t target) { target_ = target; }
// Add one demerit to the current target.
// We need to do one full settings round trip after this to clear this
// demerit.
// It will reduce our advertised max concurrent streams by one.
void AddDemerit();
// Notify the policy that we've sent a settings frame.
// Newly added demerits since the last settings frame was sent will be cleared
// once that settings frame is acknowledged.
void FlushedSettings();
// Notify the policy that we've received an acknowledgement for the last
// settings frame we sent.
void AckLastSend();
// Returns what we should advertise as max concurrent streams.
uint32_t AdvertiseValue() const;
private:
uint32_t target_ = std::numeric_limits<int32_t>::max();
// Demerit flow:
// When we add a demerit, we add to both new & unacked.
// When we flush settings, we move new to sent.
// When we ack settings, we remove what we sent from unacked.
// eg:
// we add 10 demerits - now new=10, sent=0, unacked=10
// we send settings - now new=0, sent=10, unacked=10
// we add 5 demerits - now new=5, sent=10, unacked=15
// we get the settings ack - now new=5, sent=0, unacked=5
uint32_t new_demerits_ = 0;
uint32_t sent_demerits_ = 0;
uint32_t unacked_demerits_ = 0;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_MAX_CONCURRENT_STREAMS_POLICY_H

@ -55,7 +55,6 @@
#include "src/core/ext/transport/chttp2/transport/http2_settings.h"
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/legacy_frame.h"
#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h"
#include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h"
#include "src/core/lib/backoff/random_early_detection.h"
#include "src/core/lib/debug/trace.h"
@ -650,7 +649,7 @@ static grpc_error_handle init_header_frame_parser(grpc_chttp2_transport* t,
} else if (GPR_UNLIKELY(
t->max_concurrent_streams_overload_protection &&
t->streams_allocated.load(std::memory_order_relaxed) >
t->max_concurrent_streams_policy.AdvertiseValue())) {
t->settings.local().max_concurrent_streams())) {
// We have more streams allocated than we'd like, so apply some pushback
// by refusing this stream.
++t->num_pending_induced_frames;
@ -659,13 +658,12 @@ static grpc_error_handle init_header_frame_parser(grpc_chttp2_transport* t,
GRPC_HTTP2_REFUSED_STREAM, nullptr));
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
return init_header_skip_frame_parser(t, priority_type, is_eoh);
} else if (GPR_UNLIKELY(
t->stream_map.size() >=
t->max_concurrent_streams_policy.AdvertiseValue() &&
grpc_core::RandomEarlyDetection(
t->max_concurrent_streams_policy.AdvertiseValue(),
t->settings.acked().max_concurrent_streams())
.Reject(t->stream_map.size(), t->bitgen))) {
} else if (GPR_UNLIKELY(t->stream_map.size() >=
t->settings.local().max_concurrent_streams() &&
grpc_core::RandomEarlyDetection(
t->settings.local().max_concurrent_streams(),
t->settings.acked().max_concurrent_streams())
.Reject(t->stream_map.size(), t->bitgen))) {
// We are under the limit of max concurrent streams for the current
// setting, but are over the next value that will be advertised.
// Apply some backpressure by randomly not accepting new streams.
@ -827,9 +825,6 @@ static grpc_error_handle init_rst_stream_parser(grpc_chttp2_transport* t) {
s->call_tracer_wrapper.RecordIncomingBytes({9, 0, 0});
t->parser = grpc_chttp2_transport::Parser{
"rst_stream", grpc_chttp2_rst_stream_parser_parse, &t->simple.rst_stream};
if (!t->is_client && grpc_core::IsRstpitEnabled()) {
t->max_concurrent_streams_policy.AddDemerit();
}
return absl::OkStatus();
}
@ -854,7 +849,6 @@ static grpc_error_handle init_settings_frame_parser(grpc_chttp2_transport* t) {
return err;
}
if (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) {
t->max_concurrent_streams_policy.AckLastSend();
if (!t->settings.AckLastSend()) {
return GRPC_ERROR_CREATE("Received unexpected settings ack");
}

@ -49,7 +49,6 @@
#include "src/core/ext/transport/chttp2/transport/http2_settings.h"
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/legacy_frame.h"
#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h"
#include "src/core/ext/transport/chttp2/transport/ping_callbacks.h"
#include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h"
#include "src/core/ext/transport/chttp2/transport/write_size_policy.h"
@ -260,8 +259,6 @@ class WriteContext {
}
void FlushSettings() {
t_->settings.mutable_local().SetMaxConcurrentStreams(
t_->max_concurrent_streams_policy.AdvertiseValue());
auto update = t_->settings.MaybeSendUpdate();
if (update.has_value()) {
grpc_core::Http2Frame frame(std::move(*update));
@ -280,7 +277,6 @@ class WriteContext {
});
}
t_->flow_control.FlushedSettings();
t_->max_concurrent_streams_policy.FlushedSettings();
grpc_core::global_stats().IncrementHttp2SettingsWrites();
}
}

@ -69,10 +69,6 @@ const char* const additional_constraints_pick_first_new = "{}";
const char* const description_promise_based_inproc_transport =
"Use promises for the in-process transport.";
const char* const additional_constraints_promise_based_inproc_transport = "{}";
const char* const description_rstpit =
"On RST_STREAM on a server, reduce MAX_CONCURRENT_STREAMS for a short "
"duration";
const char* const additional_constraints_rstpit = "{}";
const char* const description_schedule_cancellation_over_write =
"Allow cancellation op to be scheduled over a write";
const char* const additional_constraints_schedule_cancellation_over_write =
@ -142,8 +138,6 @@ const ExperimentMetadata g_experiment_metadata[] = {
description_promise_based_inproc_transport,
additional_constraints_promise_based_inproc_transport, nullptr, 0, false,
false},
{"rstpit", description_rstpit, additional_constraints_rstpit, nullptr, 0,
false, true},
{"schedule_cancellation_over_write",
description_schedule_cancellation_over_write,
additional_constraints_schedule_cancellation_over_write, nullptr, 0, false,
@ -219,10 +213,6 @@ const char* const additional_constraints_pick_first_new = "{}";
const char* const description_promise_based_inproc_transport =
"Use promises for the in-process transport.";
const char* const additional_constraints_promise_based_inproc_transport = "{}";
const char* const description_rstpit =
"On RST_STREAM on a server, reduce MAX_CONCURRENT_STREAMS for a short "
"duration";
const char* const additional_constraints_rstpit = "{}";
const char* const description_schedule_cancellation_over_write =
"Allow cancellation op to be scheduled over a write";
const char* const additional_constraints_schedule_cancellation_over_write =
@ -292,8 +282,6 @@ const ExperimentMetadata g_experiment_metadata[] = {
description_promise_based_inproc_transport,
additional_constraints_promise_based_inproc_transport, nullptr, 0, false,
false},
{"rstpit", description_rstpit, additional_constraints_rstpit, nullptr, 0,
false, true},
{"schedule_cancellation_over_write",
description_schedule_cancellation_over_write,
additional_constraints_schedule_cancellation_over_write, nullptr, 0, false,
@ -369,10 +357,6 @@ const char* const additional_constraints_pick_first_new = "{}";
const char* const description_promise_based_inproc_transport =
"Use promises for the in-process transport.";
const char* const additional_constraints_promise_based_inproc_transport = "{}";
const char* const description_rstpit =
"On RST_STREAM on a server, reduce MAX_CONCURRENT_STREAMS for a short "
"duration";
const char* const additional_constraints_rstpit = "{}";
const char* const description_schedule_cancellation_over_write =
"Allow cancellation op to be scheduled over a write";
const char* const additional_constraints_schedule_cancellation_over_write =
@ -442,8 +426,6 @@ const ExperimentMetadata g_experiment_metadata[] = {
description_promise_based_inproc_transport,
additional_constraints_promise_based_inproc_transport, nullptr, 0, false,
false},
{"rstpit", description_rstpit, additional_constraints_rstpit, nullptr, 0,
false, true},
{"schedule_cancellation_over_write",
description_schedule_cancellation_over_write,
additional_constraints_schedule_cancellation_over_write, nullptr, 0, false,

@ -75,7 +75,6 @@ inline bool IsPeerStateBasedFramingEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_NEW
inline bool IsPickFirstNewEnabled() { return true; }
inline bool IsPromiseBasedInprocTransportEnabled() { return false; }
inline bool IsRstpitEnabled() { return false; }
inline bool IsScheduleCancellationOverWriteEnabled() { return false; }
inline bool IsServerPrivacyEnabled() { return false; }
inline bool IsTcpFrameSizeTuningEnabled() { return false; }
@ -109,7 +108,6 @@ inline bool IsPeerStateBasedFramingEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_NEW
inline bool IsPickFirstNewEnabled() { return true; }
inline bool IsPromiseBasedInprocTransportEnabled() { return false; }
inline bool IsRstpitEnabled() { return false; }
inline bool IsScheduleCancellationOverWriteEnabled() { return false; }
inline bool IsServerPrivacyEnabled() { return false; }
inline bool IsTcpFrameSizeTuningEnabled() { return false; }
@ -142,7 +140,6 @@ inline bool IsPeerStateBasedFramingEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_NEW
inline bool IsPickFirstNewEnabled() { return true; }
inline bool IsPromiseBasedInprocTransportEnabled() { return false; }
inline bool IsRstpitEnabled() { return false; }
inline bool IsScheduleCancellationOverWriteEnabled() { return false; }
inline bool IsServerPrivacyEnabled() { return false; }
inline bool IsTcpFrameSizeTuningEnabled() { return false; }
@ -171,7 +168,6 @@ enum ExperimentIds {
kExperimentIdPeerStateBasedFraming,
kExperimentIdPickFirstNew,
kExperimentIdPromiseBasedInprocTransport,
kExperimentIdRstpit,
kExperimentIdScheduleCancellationOverWrite,
kExperimentIdServerPrivacy,
kExperimentIdTcpFrameSizeTuning,
@ -238,10 +234,6 @@ inline bool IsPickFirstNewEnabled() {
inline bool IsPromiseBasedInprocTransportEnabled() {
return IsExperimentEnabled<kExperimentIdPromiseBasedInprocTransport>();
}
#define GRPC_EXPERIMENT_IS_INCLUDED_RSTPIT
inline bool IsRstpitEnabled() {
return IsExperimentEnabled<kExperimentIdRstpit>();
}
#define GRPC_EXPERIMENT_IS_INCLUDED_SCHEDULE_CANCELLATION_OVER_WRITE
inline bool IsScheduleCancellationOverWriteEnabled() {
return IsExperimentEnabled<kExperimentIdScheduleCancellationOverWrite>();

@ -130,12 +130,6 @@
owner: ctiller@google.com
test_tags: []
allow_in_fuzzing_config: false # experiment currently crashes if enabled
- name: rstpit
description:
On RST_STREAM on a server, reduce MAX_CONCURRENT_STREAMS for a short duration
expiry: 2024/08/03
owner: ctiller@google.com
test_tags: [flow_control_test]
- name: schedule_cancellation_over_write
description: Allow cancellation op to be scheduled over a write
expiry: 2024/08/01

@ -346,16 +346,6 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "max_concurrent_streams_policy_test",
srcs = ["max_concurrent_streams_policy_test.cc"],
external_deps = ["gtest"],
language = "C++",
deps = [
"//src/core:max_concurrent_streams_policy",
],
)
grpc_cc_test(
name = "streams_not_seen_test",
srcs = ["streams_not_seen_test.cc"],

@ -1,48 +0,0 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h"
#include <memory>
#include "gtest/gtest.h"
namespace grpc_core {
namespace {
TEST(MaxConcurrentStreamsPolicyTest, NoOpWorks) {
Chttp2MaxConcurrentStreamsPolicy policy;
policy.SetTarget(100);
EXPECT_EQ(policy.AdvertiseValue(), 100);
}
TEST(MaxConcurrentStreamsPolicyTest, BasicFlow) {
Chttp2MaxConcurrentStreamsPolicy policy;
policy.SetTarget(100);
EXPECT_EQ(policy.AdvertiseValue(), 100);
policy.AddDemerit();
EXPECT_EQ(policy.AdvertiseValue(), 99);
policy.FlushedSettings();
EXPECT_EQ(policy.AdvertiseValue(), 99);
policy.AckLastSend();
EXPECT_EQ(policy.AdvertiseValue(), 100);
}
} // namespace
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
Loading…
Cancel
Save