From 57dacad8c761a12de3e5eafa45f190015c73cccb Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 11 Dec 2023 12:42:47 -0800 Subject: [PATCH] [call-v3] Idle/max-age filters (#35270) Closes #35270 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35270 from ctiller:cg-idle 5312003ca67a3cc0841e6e2ff938161fd59bfda4 PiperOrigin-RevId: 589913523 --- CMakeLists.txt | 2 + Makefile | 2 + Package.swift | 2 + bazel/experiments.bzl | 1 + build_autogenerated.yaml | 4 + config.m4 | 1 + config.w32 | 1 + gRPC-C++.podspec | 2 + gRPC-Core.podspec | 3 + grpc.gemspec | 2 + grpc.gyp | 2 + package.xml | 2 + src/core/BUILD | 2 + .../channel_idle/channel_idle_filter.cc | 19 +- .../channel_idle/channel_idle_filter.h | 28 +- .../legacy_channel_idle_filter.cc | 326 ++++++++++++++++++ .../channel_idle/legacy_channel_idle_filter.h | 143 ++++++++ src/core/lib/experiments/experiments.cc | 15 + src/core/lib/experiments/experiments.h | 8 + src/core/lib/experiments/experiments.yaml | 6 + .../plugin_registry/grpc_plugin_registry.cc | 3 + src/python/grpcio/grpc_core_dependencies.py | 1 + tools/doxygen/Doxyfile.c++.internal | 2 + tools/doxygen/Doxyfile.core.internal | 2 + 24 files changed, 558 insertions(+), 21 deletions(-) create mode 100644 src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc create mode 100644 src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h diff --git a/CMakeLists.txt b/CMakeLists.txt index b9390ecccf2..9aea587d790 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1756,6 +1756,7 @@ add_library(grpc src/core/ext/filters/census/grpc_context.cc src/core/ext/filters/channel_idle/channel_idle_filter.cc src/core/ext/filters/channel_idle/idle_filter_state.cc + src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc src/core/ext/filters/client_channel/backend_metric.cc src/core/ext/filters/client_channel/backup_poller.cc src/core/ext/filters/client_channel/channel_connectivity.cc @@ -2820,6 +2821,7 @@ add_library(grpc_unsecure src/core/ext/filters/census/grpc_context.cc src/core/ext/filters/channel_idle/channel_idle_filter.cc src/core/ext/filters/channel_idle/idle_filter_state.cc + src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc src/core/ext/filters/client_channel/backend_metric.cc src/core/ext/filters/client_channel/backup_poller.cc src/core/ext/filters/client_channel/channel_connectivity.cc diff --git a/Makefile b/Makefile index 6df49a7bac0..372949e7b47 100644 --- a/Makefile +++ b/Makefile @@ -958,6 +958,7 @@ LIBGRPC_SRC = \ src/core/ext/filters/census/grpc_context.cc \ src/core/ext/filters/channel_idle/channel_idle_filter.cc \ src/core/ext/filters/channel_idle/idle_filter_state.cc \ + src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc \ src/core/ext/filters/client_channel/backend_metric.cc \ src/core/ext/filters/client_channel/backup_poller.cc \ src/core/ext/filters/client_channel/channel_connectivity.cc \ @@ -1872,6 +1873,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/ext/filters/census/grpc_context.cc \ src/core/ext/filters/channel_idle/channel_idle_filter.cc \ src/core/ext/filters/channel_idle/idle_filter_state.cc \ + src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc \ src/core/ext/filters/client_channel/backend_metric.cc \ src/core/ext/filters/client_channel/backup_poller.cc \ src/core/ext/filters/client_channel/channel_connectivity.cc \ diff --git a/Package.swift b/Package.swift index 7c9d9ce10e9..a692977ac4e 100644 --- a/Package.swift +++ b/Package.swift @@ -121,6 +121,8 @@ let package = Package( "src/core/ext/filters/channel_idle/channel_idle_filter.h", "src/core/ext/filters/channel_idle/idle_filter_state.cc", "src/core/ext/filters/channel_idle/idle_filter_state.h", + "src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc", + "src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h", "src/core/ext/filters/client_channel/backend_metric.cc", "src/core/ext/filters/client_channel/backend_metric.h", "src/core/ext/filters/client_channel/backup_poller.cc", diff --git a/bazel/experiments.bzl b/bazel/experiments.bzl index e5f40242695..4bac512e438 100644 --- a/bazel/experiments.bzl +++ b/bazel/experiments.bzl @@ -50,6 +50,7 @@ EXPERIMENT_ENABLES = { "tcp_rcv_lowat": "tcp_rcv_lowat", "trace_record_callops": "trace_record_callops", "unconstrained_max_quota_buffer_size": "unconstrained_max_quota_buffer_size", + "v3_channel_idle_filters": "v3_channel_idle_filters", "work_serializer_clears_time_cache": "work_serializer_clears_time_cache", "work_serializer_dispatch": "work_serializer_dispatch", "write_size_policy": "write_size_policy", diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 09460f09a88..4789b3f5a46 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -218,6 +218,7 @@ libs: - src/core/ext/filters/backend_metrics/backend_metric_provider.h - src/core/ext/filters/channel_idle/channel_idle_filter.h - src/core/ext/filters/channel_idle/idle_filter_state.h + - src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h - src/core/ext/filters/client_channel/backend_metric.h - src/core/ext/filters/client_channel/backup_poller.h - src/core/ext/filters/client_channel/client_channel.h @@ -1209,6 +1210,7 @@ libs: - src/core/ext/filters/census/grpc_context.cc - src/core/ext/filters/channel_idle/channel_idle_filter.cc - src/core/ext/filters/channel_idle/idle_filter_state.cc + - src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc - src/core/ext/filters/client_channel/backend_metric.cc - src/core/ext/filters/client_channel/backup_poller.cc - src/core/ext/filters/client_channel/channel_connectivity.cc @@ -2154,6 +2156,7 @@ libs: - src/core/ext/filters/backend_metrics/backend_metric_provider.h - src/core/ext/filters/channel_idle/channel_idle_filter.h - src/core/ext/filters/channel_idle/idle_filter_state.h + - src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h - src/core/ext/filters/client_channel/backend_metric.h - src/core/ext/filters/client_channel/backup_poller.h - src/core/ext/filters/client_channel/client_channel.h @@ -2625,6 +2628,7 @@ libs: - src/core/ext/filters/census/grpc_context.cc - src/core/ext/filters/channel_idle/channel_idle_filter.cc - src/core/ext/filters/channel_idle/idle_filter_state.cc + - src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc - src/core/ext/filters/client_channel/backend_metric.cc - src/core/ext/filters/client_channel/backup_poller.cc - src/core/ext/filters/client_channel/channel_connectivity.cc diff --git a/config.m4 b/config.m4 index f4f52a0044c..0b2e030045f 100644 --- a/config.m4 +++ b/config.m4 @@ -45,6 +45,7 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/filters/census/grpc_context.cc \ src/core/ext/filters/channel_idle/channel_idle_filter.cc \ src/core/ext/filters/channel_idle/idle_filter_state.cc \ + src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc \ src/core/ext/filters/client_channel/backend_metric.cc \ src/core/ext/filters/client_channel/backup_poller.cc \ src/core/ext/filters/client_channel/channel_connectivity.cc \ diff --git a/config.w32 b/config.w32 index 52ac38f1204..1455ff6cc05 100644 --- a/config.w32 +++ b/config.w32 @@ -10,6 +10,7 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\filters\\census\\grpc_context.cc " + "src\\core\\ext\\filters\\channel_idle\\channel_idle_filter.cc " + "src\\core\\ext\\filters\\channel_idle\\idle_filter_state.cc " + + "src\\core\\ext\\filters\\channel_idle\\legacy_channel_idle_filter.cc " + "src\\core\\ext\\filters\\client_channel\\backend_metric.cc " + "src\\core\\ext\\filters\\client_channel\\backup_poller.cc " + "src\\core\\ext\\filters\\client_channel\\channel_connectivity.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 14b9978f503..9364a57164e 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -251,6 +251,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/backend_metrics/backend_metric_provider.h', 'src/core/ext/filters/channel_idle/channel_idle_filter.h', 'src/core/ext/filters/channel_idle/idle_filter_state.h', + 'src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h', 'src/core/ext/filters/client_channel/backend_metric.h', 'src/core/ext/filters/client_channel/backup_poller.h', 'src/core/ext/filters/client_channel/client_channel.h', @@ -1503,6 +1504,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/backend_metrics/backend_metric_provider.h', 'src/core/ext/filters/channel_idle/channel_idle_filter.h', 'src/core/ext/filters/channel_idle/idle_filter_state.h', + 'src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h', 'src/core/ext/filters/client_channel/backend_metric.h', 'src/core/ext/filters/client_channel/backup_poller.h', 'src/core/ext/filters/client_channel/client_channel.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 303ac9ad5b2..0076a1b6311 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -224,6 +224,8 @@ Pod::Spec.new do |s| 'src/core/ext/filters/channel_idle/channel_idle_filter.h', 'src/core/ext/filters/channel_idle/idle_filter_state.cc', 'src/core/ext/filters/channel_idle/idle_filter_state.h', + 'src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc', + 'src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h', 'src/core/ext/filters/client_channel/backend_metric.cc', 'src/core/ext/filters/client_channel/backend_metric.h', 'src/core/ext/filters/client_channel/backup_poller.cc', @@ -2286,6 +2288,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/backend_metrics/backend_metric_provider.h', 'src/core/ext/filters/channel_idle/channel_idle_filter.h', 'src/core/ext/filters/channel_idle/idle_filter_state.h', + 'src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h', 'src/core/ext/filters/client_channel/backend_metric.h', 'src/core/ext/filters/client_channel/backup_poller.h', 'src/core/ext/filters/client_channel/client_channel.h', diff --git a/grpc.gemspec b/grpc.gemspec index 4b796f508bd..e5962295a73 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -127,6 +127,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/channel_idle/channel_idle_filter.h ) s.files += %w( src/core/ext/filters/channel_idle/idle_filter_state.cc ) s.files += %w( src/core/ext/filters/channel_idle/idle_filter_state.h ) + s.files += %w( src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc ) + s.files += %w( src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h ) s.files += %w( src/core/ext/filters/client_channel/backend_metric.cc ) s.files += %w( src/core/ext/filters/client_channel/backend_metric.h ) s.files += %w( src/core/ext/filters/client_channel/backup_poller.cc ) diff --git a/grpc.gyp b/grpc.gyp index 4b043c457b2..e68df91099f 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -278,6 +278,7 @@ 'src/core/ext/filters/census/grpc_context.cc', 'src/core/ext/filters/channel_idle/channel_idle_filter.cc', 'src/core/ext/filters/channel_idle/idle_filter_state.cc', + 'src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc', 'src/core/ext/filters/client_channel/backend_metric.cc', 'src/core/ext/filters/client_channel/backup_poller.cc', 'src/core/ext/filters/client_channel/channel_connectivity.cc', @@ -1134,6 +1135,7 @@ 'src/core/ext/filters/census/grpc_context.cc', 'src/core/ext/filters/channel_idle/channel_idle_filter.cc', 'src/core/ext/filters/channel_idle/idle_filter_state.cc', + 'src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc', 'src/core/ext/filters/client_channel/backend_metric.cc', 'src/core/ext/filters/client_channel/backup_poller.cc', 'src/core/ext/filters/client_channel/channel_connectivity.cc', diff --git a/package.xml b/package.xml index e0e276e52c7..3a2b2beb84e 100644 --- a/package.xml +++ b/package.xml @@ -109,6 +109,8 @@ + + diff --git a/src/core/BUILD b/src/core/BUILD index 36bad285ae5..e5ad5240aea 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -3892,9 +3892,11 @@ grpc_cc_library( name = "grpc_channel_idle_filter", srcs = [ "ext/filters/channel_idle/channel_idle_filter.cc", + "ext/filters/channel_idle/legacy_channel_idle_filter.cc", ], hdrs = [ "ext/filters/channel_idle/channel_idle_filter.h", + "ext/filters/channel_idle/legacy_channel_idle_filter.h", ], external_deps = [ "absl/base:core_headers", diff --git a/src/core/ext/filters/channel_idle/channel_idle_filter.cc b/src/core/ext/filters/channel_idle/channel_idle_filter.cc index 40bedf4d595..cd88ca58b3d 100644 --- a/src/core/ext/filters/channel_idle/channel_idle_filter.cc +++ b/src/core/ext/filters/channel_idle/channel_idle_filter.cc @@ -56,6 +56,13 @@ namespace grpc_core { +const NoInterceptor ChannelIdleFilter::Call::OnClientInitialMetadata; +const NoInterceptor ChannelIdleFilter::Call::OnServerInitialMetadata; +const NoInterceptor ChannelIdleFilter::Call::OnServerTrailingMetadata; +const NoInterceptor ChannelIdleFilter::Call::OnClientToServerMessage; +const NoInterceptor ChannelIdleFilter::Call::OnServerToClientMessage; +const NoInterceptor ChannelIdleFilter::Call::OnFinalize; + namespace { // TODO(roth): This can go back to being a constant when the experiment @@ -221,17 +228,6 @@ void MaxAgeFilter::PostInit() { } } -// Construct a promise for one call. -ArenaPromise ChannelIdleFilter::MakeCallPromise( - CallArgs call_args, NextPromiseFactory next_promise_factory) { - using Decrementer = std::unique_ptr; - IncreaseCallCount(); - return ArenaPromise( - [decrementer = Decrementer(this), - next = next_promise_factory(std::move(call_args))]() mutable - -> Poll { return next(); }); -} - bool ChannelIdleFilter::StartTransportOp(grpc_transport_op* op) { // Catch the disconnect_with_error transport op. if (!op->disconnect_with_error.ok()) Shutdown(); @@ -298,6 +294,7 @@ const grpc_channel_filter MaxAgeFilter::kFilter = MakePromiseBasedFilter("max_age"); void RegisterChannelIdleFilters(CoreConfiguration::Builder* builder) { + if (!IsV3ChannelIdleFiltersEnabled()) return; builder->channel_init() ->RegisterFilter(GRPC_CLIENT_CHANNEL, &ClientIdleFilter::kFilter) .ExcludeFromMinimalStack() diff --git a/src/core/ext/filters/channel_idle/channel_idle_filter.h b/src/core/ext/filters/channel_idle/channel_idle_filter.h index 1a2803881be..67caadfd5a7 100644 --- a/src/core/ext/filters/channel_idle/channel_idle_filter.h +++ b/src/core/ext/filters/channel_idle/channel_idle_filter.h @@ -40,7 +40,7 @@ namespace grpc_core { -class ChannelIdleFilter : public ChannelFilter { +class ChannelIdleFilter : public ImplementChannelFilter { public: ~ChannelIdleFilter() override = default; @@ -49,9 +49,23 @@ class ChannelIdleFilter : public ChannelFilter { ChannelIdleFilter(ChannelIdleFilter&&) = default; ChannelIdleFilter& operator=(ChannelIdleFilter&&) = default; - // Construct a promise for one call. - ArenaPromise MakeCallPromise( - CallArgs call_args, NextPromiseFactory next_promise_factory) override; + class Call { + public: + explicit Call(ChannelIdleFilter* filter) : filter_(filter) { + filter_->IncreaseCallCount(); + } + ~Call() { filter_->DecreaseCallCount(); } + + static const NoInterceptor OnClientInitialMetadata; + static const NoInterceptor OnServerInitialMetadata; + static const NoInterceptor OnServerTrailingMetadata; + static const NoInterceptor OnClientToServerMessage; + static const NoInterceptor OnServerToClientMessage; + static const NoInterceptor OnFinalize; + + private: + ChannelIdleFilter* filter_; + }; bool StartTransportOp(grpc_transport_op* op) override; @@ -75,12 +89,6 @@ class ChannelIdleFilter : public ChannelFilter { private: void StartIdleTimer(); - struct CallCountDecreaser { - void operator()(ChannelIdleFilter* filter) const { - filter->DecreaseCallCount(); - } - }; - // The channel stack to which we take refs for pending callbacks. grpc_channel_stack* channel_stack_; Duration client_idle_timeout_; diff --git a/src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc b/src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc new file mode 100644 index 00000000000..23d97c9e2ff --- /dev/null +++ b/src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc @@ -0,0 +1,326 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// TODO(ctiller): Add a unit test suite for these filters once it's practical to +// mock transport operations. + +#include + +#include "src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h" + +#include +#include + +#include "absl/base/thread_annotations.h" +#include "absl/meta/type_traits.h" +#include "absl/random/random.h" +#include "absl/types/optional.h" + +#include +#include + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/promise_based_filter.h" +#include "src/core/lib/config/core_configuration.h" +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/experiments/experiments.h" +#include "src/core/lib/gprpp/debug_location.h" +#include "src/core/lib/gprpp/no_destruct.h" +#include "src/core/lib/gprpp/orphanable.h" +#include "src/core/lib/gprpp/per_cpu.h" +#include "src/core/lib/gprpp/status_helper.h" +#include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h" +#include "src/core/lib/promise/loop.h" +#include "src/core/lib/promise/poll.h" +#include "src/core/lib/promise/promise.h" +#include "src/core/lib/promise/sleep.h" +#include "src/core/lib/promise/try_seq.h" +#include "src/core/lib/surface/channel_stack_type.h" +#include "src/core/lib/transport/http2_errors.h" +#include "src/core/lib/transport/metadata_batch.h" + +namespace grpc_core { + +namespace { + +// TODO(roth): This can go back to being a constant when the experiment +// is removed. +Duration DefaultIdleTimeout() { + if (IsClientIdlenessEnabled()) return Duration::Minutes(30); + return Duration::Infinity(); +} + +// If these settings change, make sure that we are not sending a GOAWAY for +// inproc transport, since a GOAWAY to inproc ends up destroying the transport. +const auto kDefaultMaxConnectionAge = Duration::Infinity(); +const auto kDefaultMaxConnectionAgeGrace = Duration::Infinity(); +const auto kDefaultMaxConnectionIdle = Duration::Infinity(); +const auto kMaxConnectionAgeJitter = 0.1; + +TraceFlag grpc_trace_client_idle_filter(false, "client_idle_filter"); +} // namespace + +#define GRPC_IDLE_FILTER_LOG(format, ...) \ + do { \ + if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_client_idle_filter)) { \ + gpr_log(GPR_INFO, "(client idle filter) " format, ##__VA_ARGS__); \ + } \ + } while (0) + +namespace { + +Duration GetClientIdleTimeout(const ChannelArgs& args) { + return args.GetDurationFromIntMillis(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS) + .value_or(DefaultIdleTimeout()); +} + +} // namespace + +struct LegacyMaxAgeFilter::Config { + Duration max_connection_age; + Duration max_connection_idle; + Duration max_connection_age_grace; + + bool enable() const { + return max_connection_age != Duration::Infinity() || + max_connection_idle != Duration::Infinity(); + } + + // A random jitter of +/-10% will be added to MAX_CONNECTION_AGE and + // MAX_CONNECTION_IDLE to spread out reconnection storms. + static Config FromChannelArgs(const ChannelArgs& args) { + const Duration args_max_age = + args.GetDurationFromIntMillis(GRPC_ARG_MAX_CONNECTION_AGE_MS) + .value_or(kDefaultMaxConnectionAge); + const Duration args_max_idle = + args.GetDurationFromIntMillis(GRPC_ARG_MAX_CONNECTION_IDLE_MS) + .value_or(kDefaultMaxConnectionIdle); + const Duration args_max_age_grace = + args.GetDurationFromIntMillis(GRPC_ARG_MAX_CONNECTION_AGE_GRACE_MS) + .value_or(kDefaultMaxConnectionAgeGrace); + // generate a random number between 1 - kMaxConnectionAgeJitter and + // 1 + kMaxConnectionAgeJitter + struct BitGen { + Mutex mu; + absl::BitGen bit_gen ABSL_GUARDED_BY(mu); + double MakeUniformDouble(double min, double max) { + MutexLock lock(&mu); + return absl::Uniform(bit_gen, min, max); + } + }; + static NoDestruct> bit_gen(PerCpuOptions().SetMaxShards(8)); + const double multiplier = bit_gen->this_cpu().MakeUniformDouble( + 1.0 - kMaxConnectionAgeJitter, 1.0 + kMaxConnectionAgeJitter); + // GRPC_MILLIS_INF_FUTURE - 0.5 converts the value to float, so that result + // will not be cast to int implicitly before the comparison. + return Config{args_max_age * multiplier, args_max_idle * multiplier, + args_max_age_grace}; + } +}; + +absl::StatusOr LegacyClientIdleFilter::Create( + const ChannelArgs& args, ChannelFilter::Args filter_args) { + LegacyClientIdleFilter filter(filter_args.channel_stack(), + GetClientIdleTimeout(args)); + return absl::StatusOr(std::move(filter)); +} + +absl::StatusOr LegacyMaxAgeFilter::Create( + const ChannelArgs& args, ChannelFilter::Args filter_args) { + LegacyMaxAgeFilter filter(filter_args.channel_stack(), + Config::FromChannelArgs(args)); + return absl::StatusOr(std::move(filter)); +} + +void LegacyMaxAgeFilter::Shutdown() { + max_age_activity_.Reset(); + LegacyChannelIdleFilter::Shutdown(); +} + +void LegacyMaxAgeFilter::PostInit() { + struct StartupClosure { + RefCountedPtr channel_stack; + LegacyMaxAgeFilter* filter; + grpc_closure closure; + }; + auto run_startup = [](void* p, grpc_error_handle) { + auto* startup = static_cast(p); + // Trigger idle timer + startup->filter->IncreaseCallCount(); + startup->filter->DecreaseCallCount(); + grpc_transport_op* op = grpc_make_transport_op(nullptr); + op->start_connectivity_watch.reset( + new ConnectivityWatcher(startup->filter)); + op->start_connectivity_watch_state = GRPC_CHANNEL_IDLE; + grpc_channel_next_op( + grpc_channel_stack_element(startup->channel_stack.get(), 0), op); + delete startup; + }; + auto* startup = + new StartupClosure{this->channel_stack()->Ref(), this, grpc_closure{}}; + GRPC_CLOSURE_INIT(&startup->closure, run_startup, startup, nullptr); + ExecCtx::Run(DEBUG_LOCATION, &startup->closure, absl::OkStatus()); + + auto channel_stack = this->channel_stack()->Ref(); + + // Start the max age timer + if (max_connection_age_ != Duration::Infinity()) { + max_age_activity_.Set(MakeActivity( + TrySeq( + // First sleep until the max connection age + Sleep(Timestamp::Now() + max_connection_age_), + // Then send a goaway. + [this] { + GRPC_CHANNEL_STACK_REF(this->channel_stack(), + "max_age send_goaway"); + // Jump out of the activity to send the goaway. + auto fn = [](void* arg, grpc_error_handle) { + auto* channel_stack = static_cast(arg); + grpc_transport_op* op = grpc_make_transport_op(nullptr); + op->goaway_error = grpc_error_set_int( + GRPC_ERROR_CREATE("max_age"), + StatusIntProperty::kHttp2Error, GRPC_HTTP2_NO_ERROR); + grpc_channel_element* elem = + grpc_channel_stack_element(channel_stack, 0); + elem->filter->start_transport_op(elem, op); + GRPC_CHANNEL_STACK_UNREF(channel_stack, "max_age send_goaway"); + }; + ExecCtx::Run( + DEBUG_LOCATION, + GRPC_CLOSURE_CREATE(fn, this->channel_stack(), nullptr), + absl::OkStatus()); + return Immediate(absl::OkStatus()); + }, + // Sleep for the grace period + [this] { + return Sleep(Timestamp::Now() + max_connection_age_grace_); + }), + ExecCtxWakeupScheduler(), + [channel_stack, this](absl::Status status) { + // OnDone -- close the connection if the promise completed + // successfully. + // (if it did not, it was cancelled) + if (status.ok()) CloseChannel(); + }, + channel_stack->EventEngine())); + } +} + +// Construct a promise for one call. +ArenaPromise LegacyChannelIdleFilter::MakeCallPromise( + CallArgs call_args, NextPromiseFactory next_promise_factory) { + using Decrementer = + std::unique_ptr; + IncreaseCallCount(); + return ArenaPromise( + [decrementer = Decrementer(this), + next = next_promise_factory(std::move(call_args))]() mutable + -> Poll { return next(); }); +} + +bool LegacyChannelIdleFilter::StartTransportOp(grpc_transport_op* op) { + // Catch the disconnect_with_error transport op. + if (!op->disconnect_with_error.ok()) Shutdown(); + // Pass the op to the next filter. + return false; +} + +void LegacyChannelIdleFilter::Shutdown() { + // IncreaseCallCount() introduces a phony call and prevent the timer from + // being reset by other threads. + IncreaseCallCount(); + activity_.Reset(); +} + +void LegacyChannelIdleFilter::IncreaseCallCount() { + idle_filter_state_->IncreaseCallCount(); +} + +void LegacyChannelIdleFilter::DecreaseCallCount() { + if (idle_filter_state_->DecreaseCallCount()) { + // If there are no more calls in progress, start the idle timer. + StartIdleTimer(); + } +} + +void LegacyChannelIdleFilter::StartIdleTimer() { + GRPC_IDLE_FILTER_LOG("timer has started"); + auto idle_filter_state = idle_filter_state_; + // Hold a ref to the channel stack for the timer callback. + auto channel_stack = channel_stack_->Ref(); + auto timeout = client_idle_timeout_; + auto promise = Loop([timeout, idle_filter_state]() { + return TrySeq(Sleep(Timestamp::Now() + timeout), + [idle_filter_state]() -> Poll> { + if (idle_filter_state->CheckTimer()) { + return Continue{}; + } else { + return absl::OkStatus(); + } + }); + }); + activity_.Set(MakeActivity( + std::move(promise), ExecCtxWakeupScheduler{}, + [channel_stack, this](absl::Status status) { + if (status.ok()) CloseChannel(); + }, + channel_stack->EventEngine())); +} + +void LegacyChannelIdleFilter::CloseChannel() { + auto* op = grpc_make_transport_op(nullptr); + op->disconnect_with_error = grpc_error_set_int( + GRPC_ERROR_CREATE("enter idle"), + StatusIntProperty::ChannelConnectivityState, GRPC_CHANNEL_IDLE); + // Pass the transport op down to the channel stack. + auto* elem = grpc_channel_stack_element(channel_stack_, 0); + elem->filter->start_transport_op(elem, op); +} + +const grpc_channel_filter LegacyClientIdleFilter::kFilter = + MakePromiseBasedFilter( + "client_idle"); +const grpc_channel_filter LegacyMaxAgeFilter::kFilter = + MakePromiseBasedFilter( + "max_age"); + +void RegisterLegacyChannelIdleFilters(CoreConfiguration::Builder* builder) { + if (IsV3ChannelIdleFiltersEnabled()) return; + builder->channel_init() + ->RegisterFilter(GRPC_CLIENT_CHANNEL, &LegacyClientIdleFilter::kFilter) + .ExcludeFromMinimalStack() + .If([](const ChannelArgs& channel_args) { + return GetClientIdleTimeout(channel_args) != Duration::Infinity(); + }); + builder->channel_init() + ->RegisterFilter(GRPC_SERVER_CHANNEL, &LegacyMaxAgeFilter::kFilter) + .ExcludeFromMinimalStack() + .If([](const ChannelArgs& channel_args) { + return LegacyMaxAgeFilter::Config::FromChannelArgs(channel_args) + .enable(); + }); +} + +LegacyMaxAgeFilter::LegacyMaxAgeFilter(grpc_channel_stack* channel_stack, + const Config& max_age_config) + : LegacyChannelIdleFilter(channel_stack, + max_age_config.max_connection_idle), + max_connection_age_(max_age_config.max_connection_age), + max_connection_age_grace_(max_age_config.max_connection_age_grace) {} + +} // namespace grpc_core diff --git a/src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h b/src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h new file mode 100644 index 00000000000..8e06505d732 --- /dev/null +++ b/src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h @@ -0,0 +1,143 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_SRC_CORE_EXT_FILTERS_CHANNEL_IDLE_LEGACY_CHANNEL_IDLE_FILTER_H +#define GRPC_SRC_CORE_EXT_FILTERS_CHANNEL_IDLE_LEGACY_CHANNEL_IDLE_FILTER_H + +#include + +#include + +#include "absl/status/status.h" +#include "absl/status/statusor.h" + +#include + +#include "src/core/ext/filters/channel_idle/idle_filter_state.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_fwd.h" +#include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/promise_based_filter.h" +#include "src/core/lib/gprpp/orphanable.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/gprpp/single_set_ptr.h" +#include "src/core/lib/gprpp/time.h" +#include "src/core/lib/promise/activity.h" +#include "src/core/lib/promise/arena_promise.h" +#include "src/core/lib/transport/connectivity_state.h" +#include "src/core/lib/transport/transport.h" + +namespace grpc_core { + +class LegacyChannelIdleFilter : public ChannelFilter { + public: + ~LegacyChannelIdleFilter() override = default; + + LegacyChannelIdleFilter(const LegacyChannelIdleFilter&) = delete; + LegacyChannelIdleFilter& operator=(const LegacyChannelIdleFilter&) = delete; + LegacyChannelIdleFilter(LegacyChannelIdleFilter&&) = default; + LegacyChannelIdleFilter& operator=(LegacyChannelIdleFilter&&) = default; + + // Construct a promise for one call. + ArenaPromise MakeCallPromise( + CallArgs call_args, NextPromiseFactory next_promise_factory) override; + + bool StartTransportOp(grpc_transport_op* op) override; + + protected: + using SingleSetActivityPtr = + SingleSetPtr; + + LegacyChannelIdleFilter(grpc_channel_stack* channel_stack, + Duration client_idle_timeout) + : channel_stack_(channel_stack), + client_idle_timeout_(client_idle_timeout) {} + + grpc_channel_stack* channel_stack() { return channel_stack_; }; + + virtual void Shutdown(); + void CloseChannel(); + + void IncreaseCallCount(); + void DecreaseCallCount(); + + private: + void StartIdleTimer(); + + struct CallCountDecreaser { + void operator()(LegacyChannelIdleFilter* filter) const { + filter->DecreaseCallCount(); + } + }; + + // The channel stack to which we take refs for pending callbacks. + grpc_channel_stack* channel_stack_; + Duration client_idle_timeout_; + std::shared_ptr idle_filter_state_{ + std::make_shared(false)}; + + SingleSetActivityPtr activity_; +}; + +class LegacyClientIdleFilter final : public LegacyChannelIdleFilter { + public: + static const grpc_channel_filter kFilter; + + static absl::StatusOr Create( + const ChannelArgs& args, ChannelFilter::Args filter_args); + + private: + using LegacyChannelIdleFilter::LegacyChannelIdleFilter; +}; + +class LegacyMaxAgeFilter final : public LegacyChannelIdleFilter { + public: + static const grpc_channel_filter kFilter; + struct Config; + + static absl::StatusOr Create( + const ChannelArgs& args, ChannelFilter::Args filter_args); + + void PostInit() override; + + private: + class ConnectivityWatcher : public AsyncConnectivityStateWatcherInterface { + public: + explicit ConnectivityWatcher(LegacyMaxAgeFilter* filter) + : channel_stack_(filter->channel_stack()->Ref()), filter_(filter) {} + ~ConnectivityWatcher() override = default; + + void OnConnectivityStateChange(grpc_connectivity_state new_state, + const absl::Status&) override { + if (new_state == GRPC_CHANNEL_SHUTDOWN) filter_->Shutdown(); + } + + private: + RefCountedPtr channel_stack_; + LegacyMaxAgeFilter* filter_; + }; + + LegacyMaxAgeFilter(grpc_channel_stack* channel_stack, + const Config& max_age_config); + + void Shutdown() override; + + SingleSetActivityPtr max_age_activity_; + Duration max_connection_age_; + Duration max_connection_age_grace_; +}; + +} // namespace grpc_core + +#endif // GRPC_SRC_CORE_EXT_FILTERS_CHANNEL_IDLE_LEGACY_CHANNEL_IDLE_FILTER_H diff --git a/src/core/lib/experiments/experiments.cc b/src/core/lib/experiments/experiments.cc index d44fdac9f72..0639dd28155 100644 --- a/src/core/lib/experiments/experiments.cc +++ b/src/core/lib/experiments/experiments.cc @@ -147,6 +147,9 @@ const char* const description_unconstrained_max_quota_buffer_size = "Discard the cap on the max free pool size for one memory allocator"; const char* const additional_constraints_unconstrained_max_quota_buffer_size = "{}"; +const char* const description_v3_channel_idle_filters = + "Use the v3 filter API version of the idle filters."; +const char* const additional_constraints_v3_channel_idle_filters = "{}"; const char* const description_work_serializer_clears_time_cache = "Have the work serializer clear the time cache when it dispatches work."; const char* const additional_constraints_work_serializer_clears_time_cache = @@ -259,6 +262,8 @@ const ExperimentMetadata g_experiment_metadata[] = { description_unconstrained_max_quota_buffer_size, additional_constraints_unconstrained_max_quota_buffer_size, nullptr, 0, false, true}, + {"v3_channel_idle_filters", description_v3_channel_idle_filters, + additional_constraints_v3_channel_idle_filters, nullptr, 0, false, true}, {"work_serializer_clears_time_cache", description_work_serializer_clears_time_cache, additional_constraints_work_serializer_clears_time_cache, nullptr, 0, true, @@ -401,6 +406,9 @@ const char* const description_unconstrained_max_quota_buffer_size = "Discard the cap on the max free pool size for one memory allocator"; const char* const additional_constraints_unconstrained_max_quota_buffer_size = "{}"; +const char* const description_v3_channel_idle_filters = + "Use the v3 filter API version of the idle filters."; +const char* const additional_constraints_v3_channel_idle_filters = "{}"; const char* const description_work_serializer_clears_time_cache = "Have the work serializer clear the time cache when it dispatches work."; const char* const additional_constraints_work_serializer_clears_time_cache = @@ -513,6 +521,8 @@ const ExperimentMetadata g_experiment_metadata[] = { description_unconstrained_max_quota_buffer_size, additional_constraints_unconstrained_max_quota_buffer_size, nullptr, 0, false, true}, + {"v3_channel_idle_filters", description_v3_channel_idle_filters, + additional_constraints_v3_channel_idle_filters, nullptr, 0, false, true}, {"work_serializer_clears_time_cache", description_work_serializer_clears_time_cache, additional_constraints_work_serializer_clears_time_cache, nullptr, 0, true, @@ -655,6 +665,9 @@ const char* const description_unconstrained_max_quota_buffer_size = "Discard the cap on the max free pool size for one memory allocator"; const char* const additional_constraints_unconstrained_max_quota_buffer_size = "{}"; +const char* const description_v3_channel_idle_filters = + "Use the v3 filter API version of the idle filters."; +const char* const additional_constraints_v3_channel_idle_filters = "{}"; const char* const description_work_serializer_clears_time_cache = "Have the work serializer clear the time cache when it dispatches work."; const char* const additional_constraints_work_serializer_clears_time_cache = @@ -767,6 +780,8 @@ const ExperimentMetadata g_experiment_metadata[] = { description_unconstrained_max_quota_buffer_size, additional_constraints_unconstrained_max_quota_buffer_size, nullptr, 0, false, true}, + {"v3_channel_idle_filters", description_v3_channel_idle_filters, + additional_constraints_v3_channel_idle_filters, nullptr, 0, false, true}, {"work_serializer_clears_time_cache", description_work_serializer_clears_time_cache, additional_constraints_work_serializer_clears_time_cache, nullptr, 0, true, diff --git a/src/core/lib/experiments/experiments.h b/src/core/lib/experiments/experiments.h index 1b023557b33..bf17ee2ce3b 100644 --- a/src/core/lib/experiments/experiments.h +++ b/src/core/lib/experiments/experiments.h @@ -107,6 +107,7 @@ inline bool IsTcpFrameSizeTuningEnabled() { return false; } inline bool IsTcpRcvLowatEnabled() { return false; } inline bool IsTraceRecordCallopsEnabled() { return false; } inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; } +inline bool IsV3ChannelIdleFiltersEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; } inline bool IsWorkSerializerDispatchEnabled() { return false; } @@ -169,6 +170,7 @@ inline bool IsTcpFrameSizeTuningEnabled() { return false; } inline bool IsTcpRcvLowatEnabled() { return false; } inline bool IsTraceRecordCallopsEnabled() { return false; } inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; } +inline bool IsV3ChannelIdleFiltersEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; } inline bool IsWorkSerializerDispatchEnabled() { return false; } @@ -231,6 +233,7 @@ inline bool IsTcpFrameSizeTuningEnabled() { return false; } inline bool IsTcpRcvLowatEnabled() { return false; } inline bool IsTraceRecordCallopsEnabled() { return false; } inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; } +inline bool IsV3ChannelIdleFiltersEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; } inline bool IsWorkSerializerDispatchEnabled() { return false; } @@ -277,6 +280,7 @@ enum ExperimentIds { kExperimentIdTcpRcvLowat, kExperimentIdTraceRecordCallops, kExperimentIdUnconstrainedMaxQuotaBufferSize, + kExperimentIdV3ChannelIdleFilters, kExperimentIdWorkSerializerClearsTimeCache, kExperimentIdWorkSerializerDispatch, kExperimentIdWriteSizePolicy, @@ -416,6 +420,10 @@ inline bool IsTraceRecordCallopsEnabled() { inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return IsExperimentEnabled(kExperimentIdUnconstrainedMaxQuotaBufferSize); } +#define GRPC_EXPERIMENT_IS_INCLUDED_V3_CHANNEL_IDLE_FILTERS +inline bool IsV3ChannelIdleFiltersEnabled() { + return IsExperimentEnabled(kExperimentIdV3ChannelIdleFilters); +} #define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE inline bool IsWorkSerializerClearsTimeCacheEnabled() { return IsExperimentEnabled(kExperimentIdWorkSerializerClearsTimeCache); diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index e3b89572e43..54b3ea6ff15 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -252,6 +252,12 @@ expiry: 2024/02/01 owner: ctiller@google.com test_tags: [resource_quota_test] +- name: v3_channel_idle_filters + description: + Use the v3 filter API version of the idle filters. + expiry: 2024/04/04 + owner: ctiller@google.com + test_tags: [] - name: work_serializer_clears_time_cache description: Have the work serializer clear the time cache when it dispatches work. diff --git a/src/core/plugin_registry/grpc_plugin_registry.cc b/src/core/plugin_registry/grpc_plugin_registry.cc index 33ca141b0e2..0401a799ba5 100644 --- a/src/core/plugin_registry/grpc_plugin_registry.cc +++ b/src/core/plugin_registry/grpc_plugin_registry.cc @@ -40,6 +40,8 @@ extern void SecurityRegisterHandshakerFactories( CoreConfiguration::Builder* builder); extern void RegisterClientAuthorityFilter(CoreConfiguration::Builder* builder); extern void RegisterChannelIdleFilters(CoreConfiguration::Builder* builder); +extern void RegisterLegacyChannelIdleFilters( + CoreConfiguration::Builder* builder); extern void RegisterDeadlineFilter(CoreConfiguration::Builder* builder); extern void RegisterGrpcLbPolicy(CoreConfiguration::Builder* builder); extern void RegisterHttpFilters(CoreConfiguration::Builder* builder); @@ -89,6 +91,7 @@ void BuildCoreConfiguration(CoreConfiguration::Builder* builder) { SecurityRegisterHandshakerFactories(builder); RegisterClientAuthorityFilter(builder); RegisterChannelIdleFilters(builder); + RegisterLegacyChannelIdleFilters(builder); RegisterConnectedChannel(builder); RegisterGrpcLbPolicy(builder); RegisterHttpFilters(builder); diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index d6304bccbdc..c0439924822 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -19,6 +19,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/census/grpc_context.cc', 'src/core/ext/filters/channel_idle/channel_idle_filter.cc', 'src/core/ext/filters/channel_idle/idle_filter_state.cc', + 'src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc', 'src/core/ext/filters/client_channel/backend_metric.cc', 'src/core/ext/filters/client_channel/backup_poller.cc', 'src/core/ext/filters/client_channel/channel_connectivity.cc', diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 21666ae9e51..b465d10a271 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1087,6 +1087,8 @@ src/core/ext/filters/channel_idle/channel_idle_filter.cc \ src/core/ext/filters/channel_idle/channel_idle_filter.h \ src/core/ext/filters/channel_idle/idle_filter_state.cc \ src/core/ext/filters/channel_idle/idle_filter_state.h \ +src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc \ +src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h \ src/core/ext/filters/client_channel/backend_metric.cc \ src/core/ext/filters/client_channel/backend_metric.h \ src/core/ext/filters/client_channel/backup_poller.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index b7b1ae5bf25..7a3ea87d2ac 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -891,6 +891,8 @@ src/core/ext/filters/channel_idle/channel_idle_filter.cc \ src/core/ext/filters/channel_idle/channel_idle_filter.h \ src/core/ext/filters/channel_idle/idle_filter_state.cc \ src/core/ext/filters/channel_idle/idle_filter_state.h \ +src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc \ +src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h \ src/core/ext/filters/client_channel/README.md \ src/core/ext/filters/client_channel/backend_metric.cc \ src/core/ext/filters/client_channel/backend_metric.h \