From e614dafd893f0e12f70d16d16b28cbb204fac749 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 30 Apr 2024 19:42:52 -0700 Subject: [PATCH] [woah] Remove deadline filter (#36477) Also begin to eliminate `CallContext` in favor of just exposing `Call` - ultimately there's not really a need to introduce two types here, so I'm going to wind that idea back over a few PRs. I've avoided making this an experiment as the changes required were quite structural. Closes #36477 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36477 from ctiller:deadline-time 9856eeebe67d2ee680c05ae5426e8c94dd4c4d1c PiperOrigin-RevId: 629599230 --- BUILD | 2 - CMakeLists.txt | 2 - Makefile | 1 - Package.swift | 2 - bazel/experiments.bzl | 18 - build_autogenerated.yaml | 4 - config.m4 | 2 - config.w32 | 2 - gRPC-C++.podspec | 2 - gRPC-Core.podspec | 3 - grpc.gemspec | 2 - include/grpc/impl/channel_arg_names.h | 3 - package.xml | 2 - src/core/BUILD | 37 -- .../client_channel/client_channel_filter.cc | 56 ++- .../client_channel/client_channel_filter.h | 2 - .../ext/filters/deadline/deadline_filter.cc | 408 ------------------ .../ext/filters/deadline/deadline_filter.h | 85 ---- .../message_size/message_size_filter.cc | 13 +- src/core/lib/channel/context.h | 30 +- src/core/lib/experiments/experiments.yaml | 2 +- src/core/lib/surface/call.cc | 290 +++---------- src/core/lib/surface/call.h | 177 +++++++- src/core/lib/surface/server.cc | 7 +- src/core/lib/transport/call_filters.h | 2 +- .../plugin_registry/grpc_plugin_registry.cc | 2 - src/python/grpcio/grpc_core_dependencies.py | 1 - .../channel/minimal_stack_is_minimal_test.cc | 20 +- test/core/end2end/end2end_tests.h | 39 +- test/core/end2end/tests/http2_stats.cc | 8 +- tools/doxygen/Doxyfile.c++.internal | 2 - tools/doxygen/Doxyfile.core.internal | 2 - 32 files changed, 356 insertions(+), 872 deletions(-) delete mode 100644 src/core/ext/filters/deadline/deadline_filter.cc delete mode 100644 src/core/ext/filters/deadline/deadline_filter.h diff --git a/BUILD b/BUILD index 186560fd1a8..9de0728c1e1 100644 --- a/BUILD +++ b/BUILD @@ -851,7 +851,6 @@ grpc_cc_library( # standard plugins "census", "//src/core:grpc_backend_metric_filter", - "//src/core:grpc_deadline_filter", "//src/core:grpc_client_authority_filter", "//src/core:grpc_lb_policy_grpclb", "//src/core:grpc_lb_policy_outlier_detection", @@ -3697,7 +3696,6 @@ grpc_cc_library( "//src/core:gpr_atm", "//src/core:gpr_manual_constructor", "//src/core:grpc_backend_metric_data", - "//src/core:grpc_deadline_filter", "//src/core:grpc_message_size_filter", "//src/core:grpc_service_config", "//src/core:init_internally", diff --git a/CMakeLists.txt b/CMakeLists.txt index 952d08a5db3..f6418f03598 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1846,7 +1846,6 @@ add_library(grpc src/core/ext/filters/census/grpc_context.cc src/core/ext/filters/channel_idle/idle_filter_state.cc src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc - src/core/ext/filters/deadline/deadline_filter.cc src/core/ext/filters/fault_injection/fault_injection_filter.cc src/core/ext/filters/fault_injection/fault_injection_service_config_parser.cc src/core/ext/filters/http/client/http_client_filter.cc @@ -2943,7 +2942,6 @@ add_library(grpc_unsecure src/core/ext/filters/census/grpc_context.cc src/core/ext/filters/channel_idle/idle_filter_state.cc src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc - src/core/ext/filters/deadline/deadline_filter.cc src/core/ext/filters/fault_injection/fault_injection_filter.cc src/core/ext/filters/fault_injection/fault_injection_service_config_parser.cc src/core/ext/filters/http/client/http_client_filter.cc diff --git a/Makefile b/Makefile index fbbc677801c..c397bcad2cd 100644 --- a/Makefile +++ b/Makefile @@ -689,7 +689,6 @@ LIBGRPC_SRC = \ src/core/ext/filters/census/grpc_context.cc \ src/core/ext/filters/channel_idle/idle_filter_state.cc \ src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc \ - src/core/ext/filters/deadline/deadline_filter.cc \ src/core/ext/filters/fault_injection/fault_injection_filter.cc \ src/core/ext/filters/fault_injection/fault_injection_service_config_parser.cc \ src/core/ext/filters/http/client/http_client_filter.cc \ diff --git a/Package.swift b/Package.swift index 9b5a71cabf2..756f3643856 100644 --- a/Package.swift +++ b/Package.swift @@ -165,8 +165,6 @@ let package = Package( "src/core/ext/filters/channel_idle/idle_filter_state.h", "src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc", "src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h", - "src/core/ext/filters/deadline/deadline_filter.cc", - "src/core/ext/filters/deadline/deadline_filter.h", "src/core/ext/filters/fault_injection/fault_injection_filter.cc", "src/core/ext/filters/fault_injection/fault_injection_filter.h", "src/core/ext/filters/fault_injection/fault_injection_service_config_parser.cc", diff --git a/bazel/experiments.bzl b/bazel/experiments.bzl index e366840a1e6..679b754b05d 100644 --- a/bazel/experiments.bzl +++ b/bazel/experiments.bzl @@ -62,9 +62,6 @@ EXPERIMENTS = { "core_end2end_test": [ "promise_based_server_call", ], - "cpp_end2end_test": [ - "promise_based_server_call", - ], "endpoint_test": [ "tcp_frame_size_tuning", "tcp_rcv_lowat", @@ -83,9 +80,6 @@ EXPERIMENTS = { "free_large_allocator", "unconstrained_max_quota_buffer_size", ], - "xds_end2end_test": [ - "promise_based_server_call", - ], }, "on": { "core_end2end_test": [ @@ -112,9 +106,6 @@ EXPERIMENTS = { "core_end2end_test": [ "promise_based_server_call", ], - "cpp_end2end_test": [ - "promise_based_server_call", - ], "endpoint_test": [ "tcp_frame_size_tuning", "tcp_rcv_lowat", @@ -133,9 +124,6 @@ EXPERIMENTS = { "free_large_allocator", "unconstrained_max_quota_buffer_size", ], - "xds_end2end_test": [ - "promise_based_server_call", - ], }, "on": { "cpp_lb_end2end_test": [ @@ -159,9 +147,6 @@ EXPERIMENTS = { "promise_based_client_call", "promise_based_server_call", ], - "cpp_end2end_test": [ - "promise_based_server_call", - ], "endpoint_test": [ "tcp_frame_size_tuning", "tcp_rcv_lowat", @@ -186,9 +171,6 @@ EXPERIMENTS = { "free_large_allocator", "unconstrained_max_quota_buffer_size", ], - "xds_end2end_test": [ - "promise_based_server_call", - ], }, "on": { "cancel_ares_query_test": [ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index ea77336413c..66468a030a5 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -246,7 +246,6 @@ libs: - src/core/ext/filters/backend_metrics/backend_metric_provider.h - src/core/ext/filters/channel_idle/idle_filter_state.h - src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h - - src/core/ext/filters/deadline/deadline_filter.h - src/core/ext/filters/fault_injection/fault_injection_filter.h - src/core/ext/filters/fault_injection/fault_injection_service_config_parser.h - src/core/ext/filters/http/client/http_client_filter.h @@ -1267,7 +1266,6 @@ libs: - src/core/ext/filters/census/grpc_context.cc - src/core/ext/filters/channel_idle/idle_filter_state.cc - src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc - - src/core/ext/filters/deadline/deadline_filter.cc - src/core/ext/filters/fault_injection/fault_injection_filter.cc - src/core/ext/filters/fault_injection/fault_injection_service_config_parser.cc - src/core/ext/filters/http/client/http_client_filter.cc @@ -2233,7 +2231,6 @@ libs: - src/core/ext/filters/backend_metrics/backend_metric_provider.h - src/core/ext/filters/channel_idle/idle_filter_state.h - src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h - - src/core/ext/filters/deadline/deadline_filter.h - src/core/ext/filters/fault_injection/fault_injection_filter.h - src/core/ext/filters/fault_injection/fault_injection_service_config_parser.h - src/core/ext/filters/http/client/http_client_filter.h @@ -2723,7 +2720,6 @@ libs: - src/core/ext/filters/census/grpc_context.cc - src/core/ext/filters/channel_idle/idle_filter_state.cc - src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc - - src/core/ext/filters/deadline/deadline_filter.cc - src/core/ext/filters/fault_injection/fault_injection_filter.cc - src/core/ext/filters/fault_injection/fault_injection_service_config_parser.cc - src/core/ext/filters/http/client/http_client_filter.cc diff --git a/config.m4 b/config.m4 index b6a12477bb3..f2b4c4817e5 100644 --- a/config.m4 +++ b/config.m4 @@ -64,7 +64,6 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/filters/census/grpc_context.cc \ src/core/ext/filters/channel_idle/idle_filter_state.cc \ src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc \ - src/core/ext/filters/deadline/deadline_filter.cc \ src/core/ext/filters/fault_injection/fault_injection_filter.cc \ src/core/ext/filters/fault_injection/fault_injection_service_config_parser.cc \ src/core/ext/filters/http/client/http_client_filter.cc \ @@ -1391,7 +1390,6 @@ if test "$PHP_GRPC" != "no"; then PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/backend_metrics) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/census) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/channel_idle) - PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/deadline) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/fault_injection) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/http) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/http/client) diff --git a/config.w32 b/config.w32 index 2c98626829e..4643bbfedd6 100644 --- a/config.w32 +++ b/config.w32 @@ -29,7 +29,6 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\filters\\census\\grpc_context.cc " + "src\\core\\ext\\filters\\channel_idle\\idle_filter_state.cc " + "src\\core\\ext\\filters\\channel_idle\\legacy_channel_idle_filter.cc " + - "src\\core\\ext\\filters\\deadline\\deadline_filter.cc " + "src\\core\\ext\\filters\\fault_injection\\fault_injection_filter.cc " + "src\\core\\ext\\filters\\fault_injection\\fault_injection_service_config_parser.cc " + "src\\core\\ext\\filters\\http\\client\\http_client_filter.cc " + @@ -1385,7 +1384,6 @@ if (PHP_GRPC != "no") { FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\backend_metrics"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\census"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\channel_idle"); - FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\deadline"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\fault_injection"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\http"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\http\\client"); diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 23c1b6baba2..91edbbe6cb0 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -286,7 +286,6 @@ Pod::Spec.new do |s| 'src/core/ext/filters/backend_metrics/backend_metric_provider.h', 'src/core/ext/filters/channel_idle/idle_filter_state.h', 'src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h', - 'src/core/ext/filters/deadline/deadline_filter.h', 'src/core/ext/filters/fault_injection/fault_injection_filter.h', 'src/core/ext/filters/fault_injection/fault_injection_service_config_parser.h', 'src/core/ext/filters/http/client/http_client_filter.h', @@ -1575,7 +1574,6 @@ Pod::Spec.new do |s| 'src/core/ext/filters/backend_metrics/backend_metric_provider.h', 'src/core/ext/filters/channel_idle/idle_filter_state.h', 'src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h', - 'src/core/ext/filters/deadline/deadline_filter.h', 'src/core/ext/filters/fault_injection/fault_injection_filter.h', 'src/core/ext/filters/fault_injection/fault_injection_service_config_parser.h', 'src/core/ext/filters/http/client/http_client_filter.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 03b7c558794..fe5a8f48f84 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -282,8 +282,6 @@ Pod::Spec.new do |s| 'src/core/ext/filters/channel_idle/idle_filter_state.h', 'src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc', 'src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h', - 'src/core/ext/filters/deadline/deadline_filter.cc', - 'src/core/ext/filters/deadline/deadline_filter.h', 'src/core/ext/filters/fault_injection/fault_injection_filter.cc', 'src/core/ext/filters/fault_injection/fault_injection_filter.h', 'src/core/ext/filters/fault_injection/fault_injection_service_config_parser.cc', @@ -2375,7 +2373,6 @@ Pod::Spec.new do |s| 'src/core/ext/filters/backend_metrics/backend_metric_provider.h', 'src/core/ext/filters/channel_idle/idle_filter_state.h', 'src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h', - 'src/core/ext/filters/deadline/deadline_filter.h', 'src/core/ext/filters/fault_injection/fault_injection_filter.h', 'src/core/ext/filters/fault_injection/fault_injection_service_config_parser.h', 'src/core/ext/filters/http/client/http_client_filter.h', diff --git a/grpc.gemspec b/grpc.gemspec index bd4fc1e426e..f803c2aa549 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -171,8 +171,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/channel_idle/idle_filter_state.h ) s.files += %w( src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc ) s.files += %w( src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h ) - s.files += %w( src/core/ext/filters/deadline/deadline_filter.cc ) - s.files += %w( src/core/ext/filters/deadline/deadline_filter.h ) s.files += %w( src/core/ext/filters/fault_injection/fault_injection_filter.cc ) s.files += %w( src/core/ext/filters/fault_injection/fault_injection_filter.h ) s.files += %w( src/core/ext/filters/fault_injection/fault_injection_service_config_parser.cc ) diff --git a/include/grpc/impl/channel_arg_names.h b/include/grpc/impl/channel_arg_names.h index 7ba19bebb17..663e9620a34 100644 --- a/include/grpc/impl/channel_arg_names.h +++ b/include/grpc/impl/channel_arg_names.h @@ -67,9 +67,6 @@ application will see the compressed message in the byte buffer. */ #define GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION \ "grpc.per_message_decompression" -/** Enable/disable support for deadline checking. Defaults to 1, unless - GRPC_ARG_MINIMAL_STACK is enabled, in which case it defaults to 0 */ -#define GRPC_ARG_ENABLE_DEADLINE_CHECKS "grpc.enable_deadline_checking" /** Initial stream ID for http2 transports. Int valued. */ #define GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER \ "grpc.http2.initial_sequence_number" diff --git a/package.xml b/package.xml index 50eb12649b2..62b2b535a98 100644 --- a/package.xml +++ b/package.xml @@ -153,8 +153,6 @@ - - diff --git a/src/core/BUILD b/src/core/BUILD index f3343501ede..cd92184a8f0 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -4524,42 +4524,6 @@ grpc_cc_library( ], ) -grpc_cc_library( - name = "grpc_deadline_filter", - srcs = [ - "ext/filters/deadline/deadline_filter.cc", - ], - hdrs = [ - "ext/filters/deadline/deadline_filter.h", - ], - external_deps = [ - "absl/status", - "absl/types:optional", - ], - language = "c++", - deps = [ - "arena", - "arena_promise", - "channel_fwd", - "channel_stack_type", - "closure", - "context", - "error", - "metadata_batch", - "status_helper", - "time", - "//:call_combiner", - "//:channel_arg_names", - "//:config", - "//:debug_location", - "//:exec_ctx", - "//:gpr", - "//:grpc_base", - "//:grpc_public_hdrs", - "//:iomgr_timer", - ], -) - grpc_cc_library( name = "grpc_client_authority_filter", srcs = [ @@ -4613,7 +4577,6 @@ grpc_cc_library( "channel_fwd", "channel_stack_type", "context", - "grpc_deadline_filter", "grpc_service_config", "json", "json_args", diff --git a/src/core/client_channel/client_channel_filter.cc b/src/core/client_channel/client_channel_filter.cc index c11f5293bfa..1ffcfb5e4ca 100644 --- a/src/core/client_channel/client_channel_filter.cc +++ b/src/core/client_channel/client_channel_filter.cc @@ -61,7 +61,6 @@ #include "src/core/client_channel/retry_filter.h" #include "src/core/client_channel/subchannel.h" #include "src/core/client_channel/subchannel_interface_internal.h" -#include "src/core/ext/filters/deadline/deadline_filter.h" #include "src/core/handshaker/proxy_mapper_registry.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" @@ -210,14 +209,14 @@ class ClientChannelFilter::FilterBasedCallData final const grpc_call_element_args& args); ~FilterBasedCallData() override; - grpc_call_element* elem() const { return deadline_state_.elem; } - grpc_call_stack* owning_call() const { return deadline_state_.call_stack; } - CallCombiner* call_combiner() const { return deadline_state_.call_combiner; } + grpc_call_element* elem() const { return elem_; } + grpc_call_stack* owning_call() const { return owning_call_; } + CallCombiner* call_combiner() const { return call_combiner_; } ClientChannelFilter* chand() const override { return static_cast(elem()->channel_data); } - Arena* arena() const override { return deadline_state_.arena; } + Arena* arena() const override { return arena_; } grpc_polling_entity* pollent() override { return pollent_; } grpc_metadata_batch* send_initial_metadata() override { return pending_batches_[0] @@ -270,10 +269,8 @@ class ClientChannelFilter::FilterBasedCallData final void ResetDeadline(Duration timeout) override { const Timestamp per_method_deadline = Timestamp::FromCycleCounterRoundUp(call_start_time_) + timeout; - if (per_method_deadline < deadline_) { - deadline_ = per_method_deadline; - grpc_deadline_state_reset(&deadline_state_, deadline_); - } + static_cast(call_context_[GRPC_CONTEXT_CALL].value) + ->UpdateDeadline(per_method_deadline); } void CreateDynamicCall(); @@ -286,8 +283,10 @@ class ClientChannelFilter::FilterBasedCallData final gpr_cycle_counter call_start_time_; Timestamp deadline_; - // State for handling deadlines. - grpc_deadline_state deadline_state_; + Arena* const arena_; + grpc_call_element* const elem_; + grpc_call_stack* const owning_call_; + CallCombiner* const call_combiner_; grpc_polling_entity* pollent_ = nullptr; @@ -387,11 +386,12 @@ class ClientChannelFilter::PromiseBasedCallData final } void ResetDeadline(Duration timeout) override { + Call* call = GetContext(); CallContext* call_context = GetContext(); const Timestamp per_method_deadline = Timestamp::FromCycleCounterRoundUp(call_context->call_start_time()) + timeout; - call_context->UpdateDeadline(per_method_deadline); + call->UpdateDeadline(per_method_deadline); } ClientChannelFilter* chand_; @@ -1230,9 +1230,6 @@ RefCountedPtr GetSubchannelPool( ClientChannelFilter::ClientChannelFilter(grpc_channel_element_args* args, grpc_error_handle* error) : channel_args_(args->channel_args), - deadline_checking_enabled_( - channel_args_.GetBool(GRPC_ARG_ENABLE_DEADLINE_CHECKS) - .value_or(!channel_args_.WantMinimalStack())), owning_stack_(args->channel_stack), client_channel_factory_(channel_args_.GetObject()), channelz_node_(channel_args_.GetObject()), @@ -2112,8 +2109,7 @@ grpc_error_handle ClientChannelFilter::CallData::ApplyServiceConfigToCallLocked( if (method_params != nullptr) { // If the deadline from the service config is shorter than the one // from the client API, reset the deadline timer. - if (chand()->deadline_checking_enabled_ && - method_params->timeout() != Duration::Zero()) { + if (method_params->timeout() != Duration::Zero()) { ResetDeadline(method_params->timeout()); } // If the service config set wait_for_ready and the application @@ -2213,12 +2209,10 @@ ClientChannelFilter::FilterBasedCallData::FilterBasedCallData( call_context_(args.context), call_start_time_(args.start_time), deadline_(args.deadline), - deadline_state_( - elem, args, - GPR_LIKELY(static_cast(elem->channel_data) - ->deadline_checking_enabled_) - ? args.deadline - : Timestamp::InfFuture()) { + arena_(args.arena), + elem_(elem), + owning_call_(args.call_stack), + call_combiner_(args.call_combiner) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: created call", chand(), this); } @@ -2262,10 +2256,6 @@ void ClientChannelFilter::FilterBasedCallData::StartTransportStreamOpBatch( gpr_log(GPR_INFO, "chand=%p calld=%p: batch started from above: %s", chand, calld, grpc_transport_stream_op_batch_string(batch, false).c_str()); } - if (GPR_LIKELY(chand->deadline_checking_enabled_)) { - grpc_deadline_state_client_start_transport_stream_op_batch( - &calld->deadline_state_, batch); - } // Intercept recv_trailing_metadata to commit the call, in case we wind up // failing the call before we get down to the retry or LB call layer. if (batch->recv_trailing_metadata) { @@ -3056,7 +3046,6 @@ ClientChannelFilter::FilterBasedLoadBalancedCall::FilterBasedLoadBalancedCall( absl::AnyInvocable on_commit, bool is_transparent_retry) : LoadBalancedCall(chand, args.context, std::move(on_commit), is_transparent_retry), - deadline_(args.deadline), arena_(args.arena), owning_call_(args.call_stack), call_combiner_(args.call_combiner), @@ -3356,8 +3345,12 @@ void ClientChannelFilter::FilterBasedLoadBalancedCall:: // Get status from error. grpc_status_code code; std::string message; - grpc_error_get_status(error, self->deadline_, &code, &message, - /*http_error=*/nullptr, /*error_string=*/nullptr); + grpc_error_get_status( + error, + static_cast(self->call_context()[GRPC_CONTEXT_CALL].value) + ->deadline(), + &code, &message, + /*http_error=*/nullptr, /*error_string=*/nullptr); status = absl::Status(static_cast(code), message); } else { // Get status from headers. @@ -3495,7 +3488,8 @@ void ClientChannelFilter::FilterBasedLoadBalancedCall::CreateSubchannelCall() { CHECK_NE(path, nullptr); SubchannelCall::Args call_args = { connected_subchannel()->Ref(), pollent_, path->Ref(), /*start_time=*/0, - deadline_, arena_, + static_cast(call_context()[GRPC_CONTEXT_CALL].value)->deadline(), + arena_, // TODO(roth): When we implement hedging support, we will probably // need to use a separate call context for each subchannel call. call_context(), call_combiner_}; diff --git a/src/core/client_channel/client_channel_filter.h b/src/core/client_channel/client_channel_filter.h index f9ae44b0d26..28c21d3280c 100644 --- a/src/core/client_channel/client_channel_filter.h +++ b/src/core/client_channel/client_channel_filter.h @@ -287,7 +287,6 @@ class ClientChannelFilter final { // Fields set at construction and never modified. // ChannelArgs channel_args_; - const bool deadline_checking_enabled_; grpc_channel_stack* owning_stack_; ClientChannelFactory* client_channel_factory_; RefCountedPtr default_service_config_; @@ -558,7 +557,6 @@ class ClientChannelFilter::FilterBasedLoadBalancedCall final // TODO(roth): Instead of duplicating these fields in every filter // that uses any one of them, we should store them in the call // context. This will save per-call memory overhead. - Timestamp deadline_; Arena* arena_; grpc_call_stack* owning_call_; CallCombiner* call_combiner_; diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc deleted file mode 100644 index a2dc24594e2..00000000000 --- a/src/core/ext/filters/deadline/deadline_filter.cc +++ /dev/null @@ -1,408 +0,0 @@ -// -// Copyright 2016 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -#include - -#include "src/core/ext/filters/deadline/deadline_filter.h" - -#include -#include -#include -#include - -#include "absl/status/status.h" -#include "absl/types/optional.h" - -#include -#include -#include - -#include "src/core/lib/config/core_configuration.h" -#include "src/core/lib/gprpp/debug_location.h" -#include "src/core/lib/gprpp/status_helper.h" -#include "src/core/lib/iomgr/error.h" -#include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/iomgr/timer.h" -#include "src/core/lib/promise/arena_promise.h" -#include "src/core/lib/promise/context.h" -#include "src/core/lib/surface/call.h" -#include "src/core/lib/surface/channel_stack_type.h" -#include "src/core/lib/transport/metadata_batch.h" - -namespace grpc_core { - -// A fire-and-forget class representing a pending deadline timer. -// Allocated on the call arena. -class TimerState { - public: - TimerState(grpc_deadline_state* deadline_state, Timestamp deadline) - : deadline_state_(deadline_state) { - GRPC_CALL_STACK_REF(deadline_state->call_stack, "DeadlineTimerState"); - GRPC_CLOSURE_INIT(&closure_, TimerCallback, this, nullptr); - grpc_timer_init(&timer_, deadline, &closure_); - } - - void Cancel() { grpc_timer_cancel(&timer_); } - - private: - // The on_complete callback used when sending a cancel_error batch down the - // filter stack. Yields the call combiner when the batch returns. - static void YieldCallCombiner(void* arg, grpc_error_handle /*ignored*/) { - TimerState* self = static_cast(arg); - GRPC_CALL_COMBINER_STOP(self->deadline_state_->call_combiner, - "got on_complete from cancel_stream batch"); - GRPC_CALL_STACK_UNREF(self->deadline_state_->call_stack, - "DeadlineTimerState"); - } - - // This is called via the call combiner, so access to deadline_state is - // synchronized. - static void SendCancelOpInCallCombiner(void* arg, grpc_error_handle error) { - TimerState* self = static_cast(arg); - grpc_transport_stream_op_batch* batch = grpc_make_transport_stream_op( - GRPC_CLOSURE_INIT(&self->closure_, YieldCallCombiner, self, nullptr)); - batch->cancel_stream = true; - batch->payload->cancel_stream.cancel_error = error; - grpc_call_element* elem = self->deadline_state_->elem; - elem->filter->start_transport_stream_op_batch(elem, batch); - } - - // Timer callback. - static void TimerCallback(void* arg, grpc_error_handle error) { - TimerState* self = static_cast(arg); - if (error != absl::CancelledError()) { - error = grpc_error_set_int(GRPC_ERROR_CREATE("Deadline Exceeded"), - StatusIntProperty::kRpcStatus, - GRPC_STATUS_DEADLINE_EXCEEDED); - self->deadline_state_->call_combiner->Cancel(error); - GRPC_CLOSURE_INIT(&self->closure_, SendCancelOpInCallCombiner, self, - nullptr); - GRPC_CALL_COMBINER_START(self->deadline_state_->call_combiner, - &self->closure_, error, - "deadline exceeded -- sending cancel_stream op"); - } else { - GRPC_CALL_STACK_UNREF(self->deadline_state_->call_stack, - "DeadlineTimerState"); - } - } - - // NOTE: This object's dtor is never called, so do not add any data - // members that require destruction! - // TODO(roth): We should ideally call this object's dtor somewhere, - // but that would require adding more synchronization, because we'd - // need to call the dtor only after both (a) the timer callback - // finishes and (b) the filter sees the call completion and attempts - // to cancel the timer. - grpc_deadline_state* deadline_state_; - grpc_timer timer_; - grpc_closure closure_; -}; - -} // namespace grpc_core - -// -// grpc_deadline_state -// - -// Starts the deadline timer. -// This is called via the call combiner, so access to deadline_state is -// synchronized. -static void start_timer_if_needed(grpc_deadline_state* deadline_state, - grpc_core::Timestamp deadline) { - if (deadline == grpc_core::Timestamp::InfFuture()) return; - GPR_ASSERT(deadline_state->timer_state == nullptr); - deadline_state->timer_state = - deadline_state->arena->New(deadline_state, - deadline); -} - -// Cancels the deadline timer. -// This is called via the call combiner, so access to deadline_state is -// synchronized. -static void cancel_timer_if_needed(grpc_deadline_state* deadline_state) { - if (deadline_state->timer_state != nullptr) { - deadline_state->timer_state->Cancel(); - deadline_state->timer_state = nullptr; - } -} - -// Callback run when we receive trailing metadata. -static void recv_trailing_metadata_ready(void* arg, grpc_error_handle error) { - grpc_deadline_state* deadline_state = static_cast(arg); - cancel_timer_if_needed(deadline_state); - // Invoke the original callback. - grpc_core::Closure::Run(DEBUG_LOCATION, - deadline_state->original_recv_trailing_metadata_ready, - error); -} - -// Inject our own recv_trailing_metadata_ready callback into op. -static void inject_recv_trailing_metadata_ready( - grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op) { - deadline_state->original_recv_trailing_metadata_ready = - op->payload->recv_trailing_metadata.recv_trailing_metadata_ready; - GRPC_CLOSURE_INIT(&deadline_state->recv_trailing_metadata_ready, - recv_trailing_metadata_ready, deadline_state, - grpc_schedule_on_exec_ctx); - op->payload->recv_trailing_metadata.recv_trailing_metadata_ready = - &deadline_state->recv_trailing_metadata_ready; -} - -// Callback and associated state for starting the timer after call stack -// initialization has been completed. -struct start_timer_after_init_state { - start_timer_after_init_state(grpc_deadline_state* deadline_state, - grpc_core::Timestamp deadline) - : deadline_state(deadline_state), deadline(deadline) {} - ~start_timer_after_init_state() { - start_timer_if_needed(deadline_state, deadline); - } - - bool in_call_combiner = false; - grpc_deadline_state* deadline_state; - grpc_core::Timestamp deadline; - grpc_closure closure; -}; -static void start_timer_after_init(void* arg, grpc_error_handle error) { - struct start_timer_after_init_state* state = - static_cast(arg); - grpc_deadline_state* deadline_state = state->deadline_state; - if (!state->in_call_combiner) { - // We are initially called without holding the call combiner, so we - // need to bounce ourselves into it. - state->in_call_combiner = true; - GRPC_CALL_COMBINER_START(deadline_state->call_combiner, &state->closure, - error, "scheduling deadline timer"); - return; - } - delete state; - GRPC_CALL_COMBINER_STOP(deadline_state->call_combiner, - "done scheduling deadline timer"); -} - -grpc_deadline_state::grpc_deadline_state(grpc_call_element* elem, - const grpc_call_element_args& args, - grpc_core::Timestamp deadline) - : elem(elem), - call_stack(args.call_stack), - call_combiner(args.call_combiner), - arena(args.arena) { - // Deadline will always be infinite on servers, so the timer will only be - // set on clients with a finite deadline. - if (deadline != grpc_core::Timestamp::InfFuture()) { - // When the deadline passes, we indicate the failure by sending down - // an op with cancel_error set. However, we can't send down any ops - // until after the call stack is fully initialized. If we start the - // timer here, we have no guarantee that the timer won't pop before - // call stack initialization is finished. To avoid that problem, we - // create a closure to start the timer, and we schedule that closure - // to be run after call stack initialization is done. - struct start_timer_after_init_state* state = - new start_timer_after_init_state(this, deadline); - GRPC_CLOSURE_INIT(&state->closure, start_timer_after_init, state, - grpc_schedule_on_exec_ctx); - grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->closure, absl::OkStatus()); - } -} - -grpc_deadline_state::~grpc_deadline_state() { cancel_timer_if_needed(this); } - -void grpc_deadline_state_reset(grpc_deadline_state* deadline_state, - grpc_core::Timestamp new_deadline) { - cancel_timer_if_needed(deadline_state); - start_timer_if_needed(deadline_state, new_deadline); -} - -void grpc_deadline_state_client_start_transport_stream_op_batch( - grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op) { - if (op->cancel_stream) { - cancel_timer_if_needed(deadline_state); - } else { - // Make sure we know when the call is complete, so that we can cancel - // the timer. - if (op->recv_trailing_metadata) { - inject_recv_trailing_metadata_ready(deadline_state, op); - } - } -} - -// -// filter code -// - -// Constructor for channel_data. Used for both client and server filters. -static grpc_error_handle deadline_init_channel_elem( - grpc_channel_element* /*elem*/, grpc_channel_element_args* args) { - GPR_ASSERT(!args->is_last); - return absl::OkStatus(); -} - -// Destructor for channel_data. Used for both client and server filters. -static void deadline_destroy_channel_elem(grpc_channel_element* /*elem*/) {} - -// Additional call data used only for the server filter. -struct server_call_data { - grpc_deadline_state deadline_state; // Must be first. - // The closure for receiving initial metadata. - grpc_closure recv_initial_metadata_ready; - // Received initial metadata batch. - grpc_metadata_batch* recv_initial_metadata; - // The original recv_initial_metadata_ready closure, which we chain to - // after our own closure is invoked. - grpc_closure* next_recv_initial_metadata_ready; -}; - -// Constructor for call_data. Used for both client and server filters. -static grpc_error_handle deadline_init_call_elem( - grpc_call_element* elem, const grpc_call_element_args* args) { - new (elem->call_data) grpc_deadline_state(elem, *args, args->deadline); - return absl::OkStatus(); -} - -// Destructor for call_data. Used for both client and server filters. -static void deadline_destroy_call_elem( - grpc_call_element* elem, const grpc_call_final_info* /*final_info*/, - grpc_closure* /*ignored*/) { - grpc_deadline_state* deadline_state = - static_cast(elem->call_data); - deadline_state->~grpc_deadline_state(); -} - -// Method for starting a call op for client filter. -static void deadline_client_start_transport_stream_op_batch( - grpc_call_element* elem, grpc_transport_stream_op_batch* op) { - grpc_deadline_state_client_start_transport_stream_op_batch( - static_cast(elem->call_data), op); - // Chain to next filter. - grpc_call_next_op(elem, op); -} - -// Callback for receiving initial metadata on the server. -static void recv_initial_metadata_ready(void* arg, grpc_error_handle error) { - grpc_call_element* elem = static_cast(arg); - server_call_data* calld = static_cast(elem->call_data); - start_timer_if_needed( - &calld->deadline_state, - calld->recv_initial_metadata->get(grpc_core::GrpcTimeoutMetadata()) - .value_or(grpc_core::Timestamp::InfFuture())); - // Invoke the next callback. - grpc_core::Closure::Run(DEBUG_LOCATION, - calld->next_recv_initial_metadata_ready, error); -} - -// Method for starting a call op for server filter. -static void deadline_server_start_transport_stream_op_batch( - grpc_call_element* elem, grpc_transport_stream_op_batch* op) { - server_call_data* calld = static_cast(elem->call_data); - if (op->cancel_stream) { - cancel_timer_if_needed(&calld->deadline_state); - } else { - // If we're receiving initial metadata, we need to get the deadline - // from the recv_initial_metadata_ready callback. So we inject our - // own callback into that hook. - if (op->recv_initial_metadata) { - calld->next_recv_initial_metadata_ready = - op->payload->recv_initial_metadata.recv_initial_metadata_ready; - calld->recv_initial_metadata = - op->payload->recv_initial_metadata.recv_initial_metadata; - GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, - recv_initial_metadata_ready, elem, - grpc_schedule_on_exec_ctx); - op->payload->recv_initial_metadata.recv_initial_metadata_ready = - &calld->recv_initial_metadata_ready; - } - // Make sure we know when the call is complete, so that we can cancel - // the timer. - // Note that we trigger this on recv_trailing_metadata, even though - // the client never sends trailing metadata, because this is the - // hook that tells us when the call is complete on the server side. - if (op->recv_trailing_metadata) { - inject_recv_trailing_metadata_ready(&calld->deadline_state, op); - } - } - // Chain to next filter. - grpc_call_next_op(elem, op); -} - -const grpc_channel_filter grpc_client_deadline_filter = { - deadline_client_start_transport_stream_op_batch, - [](grpc_channel_element*, grpc_core::CallArgs call_args, - grpc_core::NextPromiseFactory next_promise_factory) { - return next_promise_factory(std::move(call_args)); - }, - /* init_call: */ nullptr, - grpc_channel_next_op, - sizeof(grpc_deadline_state), - deadline_init_call_elem, - grpc_call_stack_ignore_set_pollset_or_pollset_set, - deadline_destroy_call_elem, - 0, // sizeof(channel_data) - deadline_init_channel_elem, - grpc_channel_stack_no_post_init, - deadline_destroy_channel_elem, - grpc_channel_next_get_info, - "deadline", -}; - -const grpc_channel_filter grpc_server_deadline_filter = { - deadline_server_start_transport_stream_op_batch, - [](grpc_channel_element*, grpc_core::CallArgs call_args, - grpc_core::NextPromiseFactory next_promise_factory) { - auto deadline = call_args.client_initial_metadata->get( - grpc_core::GrpcTimeoutMetadata()); - if (deadline.has_value()) { - grpc_core::GetContext()->UpdateDeadline( - *deadline); - } - return next_promise_factory(std::move(call_args)); - }, - [](grpc_channel_element*, grpc_core::CallSpineInterface* spine) { - grpc_core::DownCast(spine) - ->client_initial_metadata() - .receiver.InterceptAndMap([](grpc_core::ClientMetadataHandle md) { - auto deadline = md->get(grpc_core::GrpcTimeoutMetadata()); - if (deadline.has_value()) { - grpc_core::GetContext()->UpdateDeadline( - *deadline); - } - return md; - }); - }, - grpc_channel_next_op, - sizeof(server_call_data), - deadline_init_call_elem, - grpc_call_stack_ignore_set_pollset_or_pollset_set, - deadline_destroy_call_elem, - 0, // sizeof(channel_data) - deadline_init_channel_elem, - grpc_channel_stack_no_post_init, - deadline_destroy_channel_elem, - grpc_channel_next_get_info, - "deadline", -}; - -namespace grpc_core { -void RegisterDeadlineFilter(CoreConfiguration::Builder* builder) { - builder->channel_init() - ->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL, &grpc_client_deadline_filter) - .ExcludeFromMinimalStack() - .IfChannelArg(GRPC_ARG_ENABLE_DEADLINE_CHECKS, true); - builder->channel_init() - ->RegisterFilter(GRPC_SERVER_CHANNEL, &grpc_server_deadline_filter) - .ExcludeFromMinimalStack() - .IfChannelArg(GRPC_ARG_ENABLE_DEADLINE_CHECKS, true); -} -} // namespace grpc_core diff --git a/src/core/ext/filters/deadline/deadline_filter.h b/src/core/ext/filters/deadline/deadline_filter.h deleted file mode 100644 index 131f49d2473..00000000000 --- a/src/core/ext/filters/deadline/deadline_filter.h +++ /dev/null @@ -1,85 +0,0 @@ -// -// Copyright 2016 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -#ifndef GRPC_SRC_CORE_EXT_FILTERS_DEADLINE_DEADLINE_FILTER_H -#define GRPC_SRC_CORE_EXT_FILTERS_DEADLINE_DEADLINE_FILTER_H - -#include - -#include "src/core/lib/channel/channel_fwd.h" -#include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/gprpp/time.h" -#include "src/core/lib/iomgr/call_combiner.h" -#include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/resource_quota/arena.h" -#include "src/core/lib/transport/transport.h" - -namespace grpc_core { -class TimerState; -} // namespace grpc_core - -// State used for filters that enforce call deadlines. -// Must be the first field in the filter's call_data. -struct grpc_deadline_state { - grpc_deadline_state(grpc_call_element* elem, - const grpc_call_element_args& args, - grpc_core::Timestamp deadline); - ~grpc_deadline_state(); - - // We take a reference to the call stack for the timer callback. - grpc_call_element* elem; - grpc_call_stack* call_stack; - grpc_core::CallCombiner* call_combiner; - grpc_core::Arena* arena; - grpc_core::TimerState* timer_state = nullptr; - // Closure to invoke when we receive trailing metadata. - // We use this to cancel the timer. - grpc_closure recv_trailing_metadata_ready; - // The original recv_trailing_metadata_ready closure, which we chain to - // after our own closure is invoked. - grpc_closure* original_recv_trailing_metadata_ready; -}; - -// Cancels the existing timer and starts a new one with new_deadline. -// -// Note: It is generally safe to call this with an earlier deadline -// value than the current one, but not the reverse. No checks are done -// to ensure that the timer callback is not invoked while it is in the -// process of being reset, which means that attempting to increase the -// deadline may result in the timer being called twice. -// -// Note: Must be called while holding the call combiner. -void grpc_deadline_state_reset(grpc_deadline_state* deadline_state, - grpc_core::Timestamp new_deadline); - -// To be called from the client-side filter's start_transport_stream_op_batch() -// method. Ensures that the deadline timer is cancelled when the call -// is completed. -// -// Note: It is the caller's responsibility to chain to the next filter if -// necessary after this function returns. -// -// Note: Must be called while holding the call combiner. -void grpc_deadline_state_client_start_transport_stream_op_batch( - grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op); - -// Deadline filters for direct client channels and server channels. -// Note: Deadlines for non-direct client channels are handled by the -// client_channel filter. -extern const grpc_channel_filter grpc_client_deadline_filter; -extern const grpc_channel_filter grpc_server_deadline_filter; - -#endif // GRPC_SRC_CORE_EXT_FILTERS_DEADLINE_DEADLINE_FILTER_H diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index 379d4944788..0933d633272 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -29,7 +29,6 @@ #include #include -#include "src/core/ext/filters/deadline/deadline_filter.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/config/core_configuration.h" @@ -250,18 +249,10 @@ void RegisterMessageSizeFilter(CoreConfiguration::Builder* builder) { builder->channel_init() ->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL) .ExcludeFromMinimalStack() - .If(HasMessageSizeLimits) - // TODO(ctiller): ordering constraint is here to match the ordering that - // existed prior to ordering constraints did. Re-examine the ordering of - // filters from first principles. - .Before({&grpc_client_deadline_filter}); + .If(HasMessageSizeLimits); builder->channel_init() ->RegisterFilter(GRPC_SERVER_CHANNEL) .ExcludeFromMinimalStack() - .If(HasMessageSizeLimits) - // TODO(ctiller): ordering constraint is here to match the ordering that - // existed prior to ordering constraints did. Re-examine the ordering of - // filters from first principles. - .Before({&grpc_server_deadline_filter}); + .If(HasMessageSizeLimits); } } // namespace grpc_core diff --git a/src/core/lib/channel/context.h b/src/core/lib/channel/context.h index ac2e1aeda49..ddcd01395a9 100644 --- a/src/core/lib/channel/context.h +++ b/src/core/lib/channel/context.h @@ -29,9 +29,12 @@ /// This enum represents the indexes into the array, where each index /// contains a different type of value. typedef enum { + /// grpc_call* associated with this context. + GRPC_CONTEXT_CALL = 0, + /// Value is either a \a grpc_client_security_context or a /// \a grpc_server_security_context. - GRPC_CONTEXT_SECURITY = 0, + GRPC_CONTEXT_SECURITY, /// Value is a \a census_context. GRPC_CONTEXT_TRACING, @@ -68,10 +71,35 @@ struct grpc_call_context_element { }; namespace grpc_core { +class Call; + // Bind the legacy context array into the new style structure // TODO(ctiller): remove as we migrate these contexts to the new system. template <> struct ContextType {}; + +// Also as a transition step allow exposing a GetContext that can peek into +// the legacy context array. +namespace promise_detail { +template +struct OldStyleContext; + +template <> +struct OldStyleContext { + static constexpr grpc_context_index kIndex = GRPC_CONTEXT_CALL; +}; + +template +class Context::kIndex)>> { + public: + static T* get() { + return static_cast( + GetContext()[OldStyleContext::kIndex] + .value); + } +}; + +} // namespace promise_detail } // namespace grpc_core #endif // GRPC_SRC_CORE_LIB_CHANNEL_CONTEXT_H diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index 9be9619ff3c..0712c8afd85 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -177,7 +177,7 @@ (ie when all filters in a stack are promise based) expiry: 2024/06/14 owner: ctiller@google.com - test_tags: ["core_end2end_test", "cpp_end2end_test", "xds_end2end_test", "logging_test"] + test_tags: ["core_end2end_test", "logging_test"] - name: rstpit description: On RST_STREAM on a server, reduce MAX_CONCURRENT_STREAMS for a short duration diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 01099d7a054..b2d09507c33 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -113,164 +113,13 @@ grpc_core::DebugOnlyTraceFlag grpc_call_refcount_trace(false, "call_refcount"); namespace grpc_core { +// Alias to make this type available in Call implementation without a grpc_core +// prefix. +using GrpcClosure = Closure; + /////////////////////////////////////////////////////////////////////////////// // Call -class Call : public CppImplOf { - public: - Arena* arena() { return arena_; } - bool is_client() const { return is_client_; } - - virtual void ContextSet(grpc_context_index elem, void* value, - void (*destroy)(void* value)) = 0; - virtual void* ContextGet(grpc_context_index elem) const = 0; - virtual bool Completed() = 0; - void CancelWithStatus(grpc_status_code status, const char* description); - virtual void CancelWithError(grpc_error_handle error) = 0; - virtual void SetCompletionQueue(grpc_completion_queue* cq) = 0; - char* GetPeer(); - virtual grpc_call_error StartBatch(const grpc_op* ops, size_t nops, - void* notify_tag, - bool is_notify_tag_closure) = 0; - virtual bool failed_before_recv_message() const = 0; - virtual bool is_trailers_only() const = 0; - virtual absl::string_view GetServerAuthority() const = 0; - virtual void ExternalRef() = 0; - virtual void ExternalUnref() = 0; - virtual void InternalRef(const char* reason) = 0; - virtual void InternalUnref(const char* reason) = 0; - - grpc_compression_algorithm test_only_compression_algorithm() { - return incoming_compression_algorithm_; - } - uint32_t test_only_message_flags() { return test_only_last_message_flags_; } - CompressionAlgorithmSet encodings_accepted_by_peer() { - return encodings_accepted_by_peer_; - } - - // This should return nullptr for the promise stack (and alternative means - // for that functionality be invented) - virtual grpc_call_stack* call_stack() = 0; - - // Return the EventEngine used for this call's async execution. - virtual grpc_event_engine::experimental::EventEngine* event_engine() - const = 0; - - protected: - // The maximum number of concurrent batches possible. - // Based upon the maximum number of individually queueable ops in the batch - // api: - // - initial metadata send - // - message send - // - status/close send (depending on client/server) - // - initial metadata recv - // - message recv - // - status/close recv (depending on client/server) - static constexpr size_t kMaxConcurrentBatches = 6; - - struct ParentCall { - Mutex child_list_mu; - Call* first_child ABSL_GUARDED_BY(child_list_mu) = nullptr; - }; - - struct ChildCall { - explicit ChildCall(Call* parent) : parent(parent) {} - Call* parent; - /// siblings: children of the same parent form a list, and this list is - /// protected under - /// parent->mu - Call* sibling_next = nullptr; - Call* sibling_prev = nullptr; - }; - - Call(Arena* arena, bool is_client, Timestamp send_deadline, - RefCountedPtr channel) - : channel_(std::move(channel)), - arena_(arena), - send_deadline_(send_deadline), - is_client_(is_client) { - GPR_DEBUG_ASSERT(arena_ != nullptr); - GPR_DEBUG_ASSERT(channel_ != nullptr); - } - virtual ~Call() = default; - - void DeleteThis(); - - ParentCall* GetOrCreateParentCall(); - ParentCall* parent_call(); - Channel* channel() const { - GPR_DEBUG_ASSERT(channel_ != nullptr); - return channel_.get(); - } - - absl::Status InitParent(Call* parent, uint32_t propagation_mask); - void PublishToParent(Call* parent); - void MaybeUnpublishFromParent(); - void PropagateCancellationToChildren(); - - Timestamp send_deadline() const { return send_deadline_; } - void set_send_deadline(Timestamp send_deadline) { - send_deadline_ = send_deadline; - } - - Slice GetPeerString() const { - MutexLock lock(&peer_mu_); - return peer_string_.Ref(); - } - - void SetPeerString(Slice peer_string) { - MutexLock lock(&peer_mu_); - peer_string_ = std::move(peer_string); - } - - void ClearPeerString() { SetPeerString(Slice(grpc_empty_slice())); } - - // TODO(ctiller): cancel_func is for cancellation of the call - filter stack - // holds no mutexes here, promise stack does, and so locking is different. - // Remove this and cancel directly once promise conversion is done. - void ProcessIncomingInitialMetadata(grpc_metadata_batch& md); - // Fixup outgoing metadata before sending - adds compression, protects - // internal headers against external modification. - void PrepareOutgoingInitialMetadata(const grpc_op& op, - grpc_metadata_batch& md); - void NoteLastMessageFlags(uint32_t flags) { - test_only_last_message_flags_ = flags; - } - grpc_compression_algorithm incoming_compression_algorithm() const { - return incoming_compression_algorithm_; - } - - void HandleCompressionAlgorithmDisabled( - grpc_compression_algorithm compression_algorithm) GPR_ATTRIBUTE_NOINLINE; - void HandleCompressionAlgorithmNotAccepted( - grpc_compression_algorithm compression_algorithm) GPR_ATTRIBUTE_NOINLINE; - - gpr_cycle_counter start_time() const { return start_time_; } - - private: - RefCountedPtr channel_; - Arena* const arena_; - std::atomic parent_call_{nullptr}; - ChildCall* child_ = nullptr; - Timestamp send_deadline_; - const bool is_client_; - // flag indicating that cancellation is inherited - bool cancellation_is_inherited_ = false; - // Compression algorithm for *incoming* data - grpc_compression_algorithm incoming_compression_algorithm_ = - GRPC_COMPRESS_NONE; - // Supported encodings (compression algorithms), a bitset. - // Always support no compression. - CompressionAlgorithmSet encodings_accepted_by_peer_{GRPC_COMPRESS_NONE}; - uint32_t test_only_last_message_flags_ = 0; - // Peer name is protected by a mutex because it can be accessed by the - // application at the same moment as it is being set by the completion - // of the recv_initial_metadata op. The mutex should be mostly uncontended. - mutable Mutex peer_mu_; - Slice peer_string_; - gpr_cycle_counter start_time_ = gpr_get_cycle_counter(); -}; - Call::ParentCall* Call::GetOrCreateParentCall() { ParentCall* p = parent_call_.load(std::memory_order_acquire); if (p == nullptr) { @@ -503,6 +352,43 @@ void Call::HandleCompressionAlgorithmDisabled( GRPC_STATUS_UNIMPLEMENTED)); } +void Call::UpdateDeadline(Timestamp deadline) { + MutexLock lock(&deadline_mu_); + if (grpc_call_trace.enabled()) { + gpr_log(GPR_DEBUG, "[call %p] UpdateDeadline from=%s to=%s", this, + deadline_.ToString().c_str(), deadline.ToString().c_str()); + } + if (deadline >= deadline_) return; + auto* const event_engine = channel()->event_engine(); + if (deadline_ != Timestamp::InfFuture()) { + if (!event_engine->Cancel(deadline_task_)) return; + } else { + InternalRef("deadline"); + } + deadline_ = deadline; + deadline_task_ = event_engine->RunAfter(deadline - Timestamp::Now(), this); +} + +void Call::ResetDeadline() { + { + MutexLock lock(&deadline_mu_); + if (deadline_ == Timestamp::InfFuture()) return; + auto* const event_engine = channel()->event_engine(); + if (!event_engine->Cancel(deadline_task_)) return; + deadline_ = Timestamp::InfFuture(); + } + InternalUnref("deadline[reset]"); +} + +void Call::Run() { + ApplicationCallbackExecCtx callback_exec_ctx; + ExecCtx exec_ctx; + CancelWithError(grpc_error_set_int( + absl::DeadlineExceededError("Deadline Exceeded"), + StatusIntProperty::kRpcStatus, GRPC_STATUS_DEADLINE_EXCEEDED)); + InternalUnref("deadline[run]"); +} + /////////////////////////////////////////////////////////////////////////////// // FilterStackCall // To be removed once promise conversion is complete @@ -678,7 +564,9 @@ class FilterStackCall final : public Call { : Call(arena, args.server_transport_data == nullptr, args.send_deadline, args.channel->Ref()), cq_(args.cq), - stream_op_payload_(context_) {} + stream_op_payload_(context_) { + context_[GRPC_CONTEXT_CALL].value = this; + } static void ReleaseCall(void* call, grpc_error_handle); static void DestroyCall(void* call, grpc_error_handle); @@ -912,6 +800,10 @@ grpc_error_handle FilterStackCall::Create(grpc_call_create_args* args, } } + if (args->send_deadline != Timestamp::InfFuture()) { + call->UpdateDeadline(args->send_deadline); + } + CSliceUnref(path); return error; @@ -1027,8 +919,13 @@ void FilterStackCall::CancelWithError(grpc_error_handle error) { if (!gpr_atm_rel_cas(&cancelled_with_error_, 0, 1)) { return; } + if (GRPC_TRACE_FLAG_ENABLED(grpc_call_error_trace)) { + gpr_log(GPR_INFO, "CancelWithError %s %s", is_client() ? "CLI" : "SVR", + StatusToString(error).c_str()); + } ClearPeerString(); InternalRef("termination"); + ResetDeadline(); // Inform the call combiner of the cancellation, so that it can cancel // any in-flight asynchronous actions that may be holding the call // combiner. This ensures that the cancel_stream batch can be sent @@ -1047,9 +944,10 @@ void FilterStackCall::CancelWithError(grpc_error_handle error) { void FilterStackCall::SetFinalStatus(grpc_error_handle error) { if (GRPC_TRACE_FLAG_ENABLED(grpc_call_error_trace)) { - gpr_log(GPR_DEBUG, "set_final_status %s %s", is_client() ? "CLI" : "SVR", + gpr_log(GPR_INFO, "set_final_status %s %s", is_client() ? "CLI" : "SVR", StatusToString(error).c_str()); } + ResetDeadline(); if (is_client()) { std::string status_details; grpc_error_get_status(error, send_deadline(), final_op_.client.status, @@ -1353,9 +1251,9 @@ void FilterStackCall::BatchControl::PostCompletion() { if (completion_data_.notify_tag.is_closure) { call_ = nullptr; - Closure::Run(DEBUG_LOCATION, - static_cast(completion_data_.notify_tag.tag), - error); + GrpcClosure::Run( + DEBUG_LOCATION, + static_cast(completion_data_.notify_tag.tag), error); call->InternalUnref("completion"); } else { grpc_cq_end_op( @@ -1477,7 +1375,7 @@ void FilterStackCall::BatchControl::ReceivingInitialMetadataReady( } } if (saved_rsr_closure != nullptr) { - Closure::Run(DEBUG_LOCATION, saved_rsr_closure, error); + GrpcClosure::Run(DEBUG_LOCATION, saved_rsr_closure, error); } FinishStep(PendingOp::kRecvInitialMetadata); @@ -1968,10 +1866,7 @@ bool ValidateMetadata(size_t count, grpc_metadata* metadata) { // PromiseBasedCall // Will be folded into Call once the promise conversion is done -class BasicPromiseBasedCall : public Call, - public Party, - public grpc_event_engine::experimental:: - EventEngine::Closure /* for deadlines */ { +class BasicPromiseBasedCall : public Call, public Party { public: using Call::arena; @@ -1986,6 +1881,7 @@ class BasicPromiseBasedCall : public Call, if (args.cq != nullptr) { GRPC_CQ_INTERNAL_REF(args.cq, "bind"); } + context_[GRPC_CONTEXT_CALL].value = this; } ~BasicPromiseBasedCall() override { @@ -1997,9 +1893,6 @@ class BasicPromiseBasedCall : public Call, } } - // Implementation of EventEngine::Closure, called when deadline expires - void Run() final; - virtual void OrphanCall() = 0; virtual ServerCallContext* server_call_context() { return nullptr; } @@ -2057,13 +1950,6 @@ class BasicPromiseBasedCall : public Call, return context_[elem].value; } - void UpdateDeadline(Timestamp deadline) ABSL_LOCKS_EXCLUDED(deadline_mu_); - void ResetDeadline() ABSL_LOCKS_EXCLUDED(deadline_mu_); - Timestamp deadline() { - MutexLock lock(&deadline_mu_); - return deadline_; - } - // Accept the stats from the context (call once we have proof the transport is // done with them). void AcceptTransportStatsFromContext() { @@ -2139,52 +2025,11 @@ class BasicPromiseBasedCall : public Call, // Contexts for various subsystems (security, tracing, ...). grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {}; grpc_call_stats final_stats_{}; - // Current deadline. - Mutex deadline_mu_; - Timestamp deadline_ ABSL_GUARDED_BY(deadline_mu_) = Timestamp::InfFuture(); - grpc_event_engine::experimental::EventEngine::TaskHandle ABSL_GUARDED_BY( - deadline_mu_) deadline_task_; Slice final_message_; grpc_status_code final_status_ = GRPC_STATUS_UNKNOWN; grpc_completion_queue* cq_; }; -void BasicPromiseBasedCall::UpdateDeadline(Timestamp deadline) { - MutexLock lock(&deadline_mu_); - if (grpc_call_trace.enabled()) { - gpr_log(GPR_DEBUG, "%s[call] UpdateDeadline from=%s to=%s", - DebugTag().c_str(), deadline_.ToString().c_str(), - deadline.ToString().c_str()); - } - if (deadline >= deadline_) return; - auto* const event_engine = channel()->event_engine(); - if (deadline_ != Timestamp::InfFuture()) { - if (!event_engine->Cancel(deadline_task_)) return; - } else { - InternalRef("deadline"); - } - deadline_ = deadline; - deadline_task_ = event_engine->RunAfter(deadline - Timestamp::Now(), this); -} - -void BasicPromiseBasedCall::ResetDeadline() { - { - MutexLock lock(&deadline_mu_); - if (deadline_ == Timestamp::InfFuture()) return; - auto* const event_engine = channel()->event_engine(); - if (!event_engine->Cancel(deadline_task_)) return; - deadline_ = Timestamp::InfFuture(); - } - InternalUnref("deadline[reset]"); -} - -void BasicPromiseBasedCall::Run() { - ApplicationCallbackExecCtx callback_exec_ctx; - ExecCtx exec_ctx; - CancelWithError(absl::DeadlineExceededError("Deadline exceeded")); - InternalUnref("deadline[run]"); -} - class PromiseBasedCall : public BasicPromiseBasedCall { public: PromiseBasedCall(Arena* arena, uint32_t initial_external_refs, @@ -2680,12 +2525,6 @@ void CallContext::IncrementRefCount(const char* reason) { void CallContext::Unref(const char* reason) { call_->InternalUnref(reason); } -void CallContext::UpdateDeadline(Timestamp deadline) { - call_->UpdateDeadline(deadline); -} - -Timestamp CallContext::deadline() const { return call_->deadline(); } - ServerCallContext* CallContext::server_call_context() { return call_->server_call_context(); } @@ -3697,7 +3536,12 @@ ServerPromiseBasedCall::MakeTopOfServerCallPromise( server_to_client_messages_ = call_args.server_to_client_messages; client_to_server_messages_ = call_args.client_to_server_messages; server_initial_metadata_ = call_args.server_initial_metadata; - set_send_deadline(deadline()); + absl::optional deadline = + client_initial_metadata_->get(GrpcTimeoutMetadata()); + if (deadline.has_value()) { + set_send_deadline(*deadline); + UpdateDeadline(*deadline); + } ProcessIncomingInitialMetadata(*client_initial_metadata_); ExternalRef(); publish(c_ptr()); diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h index ddc29ed878a..efb377ede34 100644 --- a/src/core/lib/surface/call.h +++ b/src/core/lib/surface/call.h @@ -76,6 +76,179 @@ typedef struct grpc_call_create_args { } grpc_call_create_args; namespace grpc_core { + +class Call : public CppImplOf, + public grpc_event_engine::experimental::EventEngine:: + Closure /* for deadlines */ { + public: + Arena* arena() { return arena_; } + bool is_client() const { return is_client_; } + + virtual void ContextSet(grpc_context_index elem, void* value, + void (*destroy)(void* value)) = 0; + virtual void* ContextGet(grpc_context_index elem) const = 0; + virtual bool Completed() = 0; + void CancelWithStatus(grpc_status_code status, const char* description); + virtual void CancelWithError(grpc_error_handle error) = 0; + virtual void SetCompletionQueue(grpc_completion_queue* cq) = 0; + char* GetPeer(); + virtual grpc_call_error StartBatch(const grpc_op* ops, size_t nops, + void* notify_tag, + bool is_notify_tag_closure) = 0; + virtual bool failed_before_recv_message() const = 0; + virtual bool is_trailers_only() const = 0; + virtual absl::string_view GetServerAuthority() const = 0; + virtual void ExternalRef() = 0; + virtual void ExternalUnref() = 0; + virtual void InternalRef(const char* reason) = 0; + virtual void InternalUnref(const char* reason) = 0; + + void UpdateDeadline(Timestamp deadline) ABSL_LOCKS_EXCLUDED(deadline_mu_); + void ResetDeadline() ABSL_LOCKS_EXCLUDED(deadline_mu_); + Timestamp deadline() { + MutexLock lock(&deadline_mu_); + return deadline_; + } + + grpc_compression_algorithm test_only_compression_algorithm() { + return incoming_compression_algorithm_; + } + uint32_t test_only_message_flags() { return test_only_last_message_flags_; } + CompressionAlgorithmSet encodings_accepted_by_peer() { + return encodings_accepted_by_peer_; + } + + // This should return nullptr for the promise stack (and alternative means + // for that functionality be invented) + virtual grpc_call_stack* call_stack() = 0; + + // Return the EventEngine used for this call's async execution. + virtual grpc_event_engine::experimental::EventEngine* event_engine() + const = 0; + + // Implementation of EventEngine::Closure, called when deadline expires + void Run() final; + + protected: + // The maximum number of concurrent batches possible. + // Based upon the maximum number of individually queueable ops in the batch + // api: + // - initial metadata send + // - message send + // - status/close send (depending on client/server) + // - initial metadata recv + // - message recv + // - status/close recv (depending on client/server) + static constexpr size_t kMaxConcurrentBatches = 6; + + struct ParentCall { + Mutex child_list_mu; + Call* first_child ABSL_GUARDED_BY(child_list_mu) = nullptr; + }; + + struct ChildCall { + explicit ChildCall(Call* parent) : parent(parent) {} + Call* parent; + /// siblings: children of the same parent form a list, and this list is + /// protected under + /// parent->mu + Call* sibling_next = nullptr; + Call* sibling_prev = nullptr; + }; + + Call(Arena* arena, bool is_client, Timestamp send_deadline, + RefCountedPtr channel) + : channel_(std::move(channel)), + arena_(arena), + send_deadline_(send_deadline), + is_client_(is_client) { + GPR_DEBUG_ASSERT(arena_ != nullptr); + GPR_DEBUG_ASSERT(channel_ != nullptr); + } + ~Call() override = default; + + void DeleteThis(); + + ParentCall* GetOrCreateParentCall(); + ParentCall* parent_call(); + Channel* channel() const { + GPR_DEBUG_ASSERT(channel_ != nullptr); + return channel_.get(); + } + + absl::Status InitParent(Call* parent, uint32_t propagation_mask); + void PublishToParent(Call* parent); + void MaybeUnpublishFromParent(); + void PropagateCancellationToChildren(); + + Timestamp send_deadline() const { return send_deadline_; } + void set_send_deadline(Timestamp send_deadline) { + send_deadline_ = send_deadline; + } + + Slice GetPeerString() const { + MutexLock lock(&peer_mu_); + return peer_string_.Ref(); + } + + void SetPeerString(Slice peer_string) { + MutexLock lock(&peer_mu_); + peer_string_ = std::move(peer_string); + } + + void ClearPeerString() { SetPeerString(Slice(grpc_empty_slice())); } + + // TODO(ctiller): cancel_func is for cancellation of the call - filter stack + // holds no mutexes here, promise stack does, and so locking is different. + // Remove this and cancel directly once promise conversion is done. + void ProcessIncomingInitialMetadata(grpc_metadata_batch& md); + // Fixup outgoing metadata before sending - adds compression, protects + // internal headers against external modification. + void PrepareOutgoingInitialMetadata(const grpc_op& op, + grpc_metadata_batch& md); + void NoteLastMessageFlags(uint32_t flags) { + test_only_last_message_flags_ = flags; + } + grpc_compression_algorithm incoming_compression_algorithm() const { + return incoming_compression_algorithm_; + } + + void HandleCompressionAlgorithmDisabled( + grpc_compression_algorithm compression_algorithm) GPR_ATTRIBUTE_NOINLINE; + void HandleCompressionAlgorithmNotAccepted( + grpc_compression_algorithm compression_algorithm) GPR_ATTRIBUTE_NOINLINE; + + gpr_cycle_counter start_time() const { return start_time_; } + + private: + RefCountedPtr channel_; + Arena* const arena_; + std::atomic parent_call_{nullptr}; + ChildCall* child_ = nullptr; + Timestamp send_deadline_; + const bool is_client_; + // flag indicating that cancellation is inherited + bool cancellation_is_inherited_ = false; + // Compression algorithm for *incoming* data + grpc_compression_algorithm incoming_compression_algorithm_ = + GRPC_COMPRESS_NONE; + // Supported encodings (compression algorithms), a bitset. + // Always support no compression. + CompressionAlgorithmSet encodings_accepted_by_peer_{GRPC_COMPRESS_NONE}; + uint32_t test_only_last_message_flags_ = 0; + // Peer name is protected by a mutex because it can be accessed by the + // application at the same moment as it is being set by the completion + // of the recv_initial_metadata op. The mutex should be mostly uncontended. + mutable Mutex peer_mu_; + Slice peer_string_; + // Current deadline. + Mutex deadline_mu_; + Timestamp deadline_ ABSL_GUARDED_BY(deadline_mu_) = Timestamp::InfFuture(); + grpc_event_engine::experimental::EventEngine::TaskHandle ABSL_GUARDED_BY( + deadline_mu_) deadline_task_; + gpr_cycle_counter start_time_ = gpr_get_cycle_counter(); +}; + class BasicPromiseBasedCall; class ServerPromiseBasedCall; @@ -106,10 +279,6 @@ class CallContext { public: explicit CallContext(BasicPromiseBasedCall* call) : call_(call) {} - // Update the deadline (if deadline < the current deadline). - void UpdateDeadline(Timestamp deadline); - Timestamp deadline() const; - // Run some action in the call activity context. This is needed to adapt some // legacy systems to promises, and will likely disappear once that conversion // is complete. diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 9d19736ae9c..9540684920e 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -234,7 +234,7 @@ struct Server::RequestedCall { template void Complete(OptionalPayload payload, ClientMetadata& md) { - Timestamp deadline = GetContext()->deadline(); + Timestamp deadline = GetContext()->deadline(); switch (type) { case RequestedCall::Type::BATCH_CALL: GPR_ASSERT(!payload.has_value()); @@ -1479,6 +1479,10 @@ void Server::ChannelData::InitCall(RefCountedPtr call) { auto* rc = mr.TakeCall(); rc->Complete(std::move(std::get<0>(r)), *md); auto* call_context = GetContext(); + const auto* deadline = md->get_pointer(GrpcTimeoutMetadata()); + if (deadline != nullptr) { + GetContext()->UpdateDeadline(*deadline); + } *rc->call = call_context->c_call(); grpc_call_ref(*rc->call); grpc_call_set_completion_queue(call_context->c_call(), @@ -1828,6 +1832,7 @@ void Server::CallData::RecvInitialMetadataReady(void* arg, auto op_deadline = calld->recv_initial_metadata_->get(GrpcTimeoutMetadata()); if (op_deadline.has_value()) { calld->deadline_ = *op_deadline; + Call::FromC(calld->call_)->UpdateDeadline(*op_deadline); } if (calld->host_.has_value() && calld->path_.has_value()) { // do nothing diff --git a/src/core/lib/transport/call_filters.h b/src/core/lib/transport/call_filters.h index b69b44a8df3..043bf6434da 100644 --- a/src/core/lib/transport/call_filters.h +++ b/src/core/lib/transport/call_filters.h @@ -926,7 +926,7 @@ struct StackData { filter_destructor.push_back(FilterDestructor{ call_offset, [](void* call_data) { - static_cast(call_data)->~Call(); + Destruct(static_cast(call_data)); }, }); } diff --git a/src/core/plugin_registry/grpc_plugin_registry.cc b/src/core/plugin_registry/grpc_plugin_registry.cc index 32e31ba6bf0..ef816a1343a 100644 --- a/src/core/plugin_registry/grpc_plugin_registry.cc +++ b/src/core/plugin_registry/grpc_plugin_registry.cc @@ -44,7 +44,6 @@ extern void SecurityRegisterHandshakerFactories( extern void RegisterClientAuthorityFilter(CoreConfiguration::Builder* builder); extern void RegisterLegacyChannelIdleFilters( CoreConfiguration::Builder* builder); -extern void RegisterDeadlineFilter(CoreConfiguration::Builder* builder); extern void RegisterGrpcLbPolicy(CoreConfiguration::Builder* builder); extern void RegisterHttpFilters(CoreConfiguration::Builder* builder); extern void RegisterMessageSizeFilter(CoreConfiguration::Builder* builder); @@ -111,7 +110,6 @@ void BuildCoreConfiguration(CoreConfiguration::Builder* builder) { RegisterConnectedChannel(builder); RegisterGrpcLbPolicy(builder); RegisterHttpFilters(builder); - RegisterDeadlineFilter(builder); RegisterMessageSizeFilter(builder); RegisterServiceConfigChannelArgFilter(builder); RegisterResourceQuota(builder); diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 0fdcc21a484..06d78ce6bd1 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -38,7 +38,6 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/census/grpc_context.cc', 'src/core/ext/filters/channel_idle/idle_filter_state.cc', 'src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc', - 'src/core/ext/filters/deadline/deadline_filter.cc', 'src/core/ext/filters/fault_injection/fault_injection_filter.cc', 'src/core/ext/filters/fault_injection/fault_injection_service_config_parser.cc', 'src/core/ext/filters/http/client/http_client_filter.cc', diff --git a/test/core/channel/minimal_stack_is_minimal_test.cc b/test/core/channel/minimal_stack_is_minimal_test.cc index 898048186c0..44193d7f1ca 100644 --- a/test/core/channel/minimal_stack_is_minimal_test.cc +++ b/test/core/channel/minimal_stack_is_minimal_test.cc @@ -138,29 +138,29 @@ TEST(ChannelStackFilters, LooksAsExpected) { // tests with a default stack - EXPECT_EQ(MakeStack("unknown", no_args, GRPC_CLIENT_DIRECT_CHANNEL), - std::vector( - {"authority", "message_size", "deadline", "connected"})); + EXPECT_EQ( + MakeStack("unknown", no_args, GRPC_CLIENT_DIRECT_CHANNEL), + std::vector({"authority", "message_size", "connected"})); EXPECT_EQ( MakeStack("unknown", no_args, GRPC_CLIENT_SUBCHANNEL), std::vector({"authority", "message_size", "connected"})); EXPECT_EQ(MakeStack("unknown", no_args, GRPC_SERVER_CHANNEL), - std::vector({"server", "message_size", "deadline", - "server_call_tracer", "connected"})); + std::vector( + {"server", "message_size", "server_call_tracer", "connected"})); EXPECT_EQ( MakeStack("chttp2", no_args, GRPC_CLIENT_DIRECT_CHANNEL), - std::vector({"authority", "message_size", "deadline", - "http-client", "compression", "connected"})); + std::vector({"authority", "message_size", "http-client", + "compression", "connected"})); EXPECT_EQ( MakeStack("chttp2", no_args, GRPC_CLIENT_SUBCHANNEL), std::vector({"authority", "message_size", "http-client", "compression", "connected"})); EXPECT_EQ(MakeStack("chttp2", no_args, GRPC_SERVER_CHANNEL), - std::vector({"server", "message_size", "deadline", - "http-server", "compression", - "server_call_tracer", "connected"})); + std::vector({"server", "message_size", "http-server", + "compression", "server_call_tracer", + "connected"})); EXPECT_EQ(MakeStack(nullptr, no_args, GRPC_CLIENT_CHANNEL), std::vector({"client_idle", "client-channel"})); } diff --git a/test/core/end2end/end2end_tests.h b/test/core/end2end/end2end_tests.h index 737badb909b..4575d432594 100644 --- a/test/core/end2end/end2end_tests.h +++ b/test/core/end2end/end2end_tests.h @@ -181,6 +181,36 @@ class CoreEnd2endTest : public ::testing::Test { void* p; }; + // Safe notification to use for core e2e tests. + // Since when we're fuzzing we don't run background threads, the normal + // Notification type isn't safe to wait on (for some background timer to fire + // for instance...), consequently we need to use this. + class TestNotification { + public: + explicit TestNotification(CoreEnd2endTest* test) : test_(test) {} + + void WaitForNotificationWithTimeout(absl::Duration wait_time) { + if (g_is_fuzzing_core_e2e_tests) { + Timestamp end = Timestamp::Now() + Duration::NanosecondsRoundUp( + ToInt64Nanoseconds(wait_time)); + while (true) { + if (base_.HasBeenNotified()) return; + auto now = Timestamp::Now(); + if (now >= end) return; + test_->step_fn_(now - end); + } + } else { + base_.WaitForNotificationWithTimeout(wait_time); + } + } + + void Notify() { base_.Notify(); } + + private: + Notification base_; + CoreEnd2endTest* const test_; + }; + // CallBuilder - results in a call to either grpc_channel_create_call or // grpc_channel_create_registered_call. // Affords a fluent interface to specify optional arguments. @@ -753,7 +783,14 @@ class CoreEnd2endTest : public ::testing::Test { cq_, g_is_fuzzing_core_e2e_tests ? CqVerifier::FailUsingGprCrashWithStdio : CqVerifier::FailUsingGprCrash, - std::move(step_fn_)); + step_fn_ == nullptr + ? nullptr + : absl::AnyInvocable( + [this]( + grpc_event_engine::experimental::EventEngine::Duration + d) { step_fn_(d); })); } return *cq_verifier_; } diff --git a/test/core/end2end/tests/http2_stats.cc b/test/core/end2end/tests/http2_stats.cc index 482024f46e8..f0421d67dd0 100644 --- a/test/core/end2end/tests/http2_stats.cc +++ b/test/core/end2end/tests/http2_stats.cc @@ -57,8 +57,8 @@ namespace grpc_core { namespace { Mutex* g_mu; -Notification* g_client_call_ended_notify; -Notification* g_server_call_ended_notify; +CoreEnd2endTest::TestNotification* g_client_call_ended_notify; +CoreEnd2endTest::TestNotification* g_server_call_ended_notify; class FakeCallTracer : public ClientCallTracer { public: @@ -197,8 +197,8 @@ CORE_END2END_TEST(Http2FullstackSingleHopTest, StreamStats) { GTEST_SKIP() << "Test needs http2_stats_fix experiment to be enabled"; } g_mu = new Mutex(); - g_client_call_ended_notify = new Notification(); - g_server_call_ended_notify = new Notification(); + g_client_call_ended_notify = new CoreEnd2endTest::TestNotification(this); + g_server_call_ended_notify = new CoreEnd2endTest::TestNotification(this); GlobalStatsPluginRegistry::RegisterStatsPlugin( std::make_shared()); auto send_from_client = RandomSlice(10); diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index f79173cfd12..38676f21685 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1131,8 +1131,6 @@ src/core/ext/filters/channel_idle/idle_filter_state.cc \ src/core/ext/filters/channel_idle/idle_filter_state.h \ src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc \ src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h \ -src/core/ext/filters/deadline/deadline_filter.cc \ -src/core/ext/filters/deadline/deadline_filter.h \ src/core/ext/filters/fault_injection/fault_injection_filter.cc \ src/core/ext/filters/fault_injection/fault_injection_filter.h \ src/core/ext/filters/fault_injection/fault_injection_service_config_parser.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 652ad4ddfeb..8876021891b 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -936,8 +936,6 @@ src/core/ext/filters/channel_idle/idle_filter_state.cc \ src/core/ext/filters/channel_idle/idle_filter_state.h \ src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc \ src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h \ -src/core/ext/filters/deadline/deadline_filter.cc \ -src/core/ext/filters/deadline/deadline_filter.h \ src/core/ext/filters/fault_injection/fault_injection_filter.cc \ src/core/ext/filters/fault_injection/fault_injection_filter.h \ src/core/ext/filters/fault_injection/fault_injection_service_config_parser.cc \