[woah] Remove deadline filter (#36477)

Also begin to eliminate `CallContext` in favor of just exposing `Call` - ultimately there's not really a need to introduce two types here, so I'm going to wind that idea back over a few PRs.

I've avoided making this an experiment as the changes required were quite structural.
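For illustration, a minimal self-contained sketch follows (standard C++ only; SketchCall, its thread-based timer, and OnDeadlineExceeded are stand-ins rather than the gRPC types) of the shape deadline handling takes after this change: the call object itself owns the deadline, UpdateDeadline only ever tightens it, ResetDeadline cancels the pending timer, and expiry cancels the call with DEADLINE_EXCEEDED. In the real diff below, the timer is an EventEngine::RunAfter task and Call implements EventEngine::Closure, so Run() performs the cancellation.

// Sketch only: not the gRPC implementation.
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <optional>
#include <thread>

using Clock = std::chrono::steady_clock;

class SketchCall {
 public:
  ~SketchCall() { ResetDeadline(); }

  // Like Call::UpdateDeadline: only ever moves the deadline earlier.
  void UpdateDeadline(Clock::time_point deadline) {
    std::lock_guard<std::mutex> lock(mu_);
    if (deadline_.has_value() && deadline >= *deadline_) return;
    deadline_ = deadline;
    if (!timer_.joinable()) timer_ = std::thread([this] { TimerLoop(); });
    cv_.notify_all();
  }

  // Like Call::ResetDeadline: cancel the pending timer once the call is done.
  void ResetDeadline() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      cancelled_ = true;
    }
    cv_.notify_all();
    if (timer_.joinable()) timer_.join();
  }

 private:
  // Stand-in for Call::Run(), the EventEngine::Closure that fires on expiry.
  void OnDeadlineExceeded() { std::cout << "cancel call: Deadline Exceeded\n"; }

  void TimerLoop() {
    // deadline_ is always set before this thread is created.
    std::unique_lock<std::mutex> lock(mu_);
    while (!cancelled_) {
      if (cv_.wait_until(lock, *deadline_) == std::cv_status::timeout &&
          !cancelled_) {
        OnDeadlineExceeded();
        return;
      }
    }
  }

  std::mutex mu_;
  std::condition_variable cv_;
  std::optional<Clock::time_point> deadline_;
  bool cancelled_ = false;
  std::thread timer_;
};

int main() {
  SketchCall call;
  call.UpdateDeadline(Clock::now() + std::chrono::milliseconds(50));
  std::this_thread::sleep_for(std::chrono::milliseconds(100));  // deadline fires
}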

Closes #36477

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36477 from ctiller:deadline-time 9856eeebe6
PiperOrigin-RevId: 629599230
pull/36492/head
Craig Tiller, committed by Copybara-Service 11 months ago
parent a0bee78a28
commit e614dafd89
32 changed files:
  1. BUILD (2)
  2. CMakeLists.txt (2)
  3. Makefile (1)
  4. Package.swift (2)
  5. bazel/experiments.bzl (18)
  6. build_autogenerated.yaml (4)
  7. config.m4 (2)
  8. config.w32 (2)
  9. gRPC-C++.podspec (2)
  10. gRPC-Core.podspec (3)
  11. grpc.gemspec (2)
  12. include/grpc/impl/channel_arg_names.h (3)
  13. package.xml (2)
  14. src/core/BUILD (37)
  15. src/core/client_channel/client_channel_filter.cc (56)
  16. src/core/client_channel/client_channel_filter.h (2)
  17. src/core/ext/filters/deadline/deadline_filter.cc (408)
  18. src/core/ext/filters/deadline/deadline_filter.h (85)
  19. src/core/ext/filters/message_size/message_size_filter.cc (13)
  20. src/core/lib/channel/context.h (30)
  21. src/core/lib/experiments/experiments.yaml (2)
  22. src/core/lib/surface/call.cc (290)
  23. src/core/lib/surface/call.h (177)
  24. src/core/lib/surface/server.cc (7)
  25. src/core/lib/transport/call_filters.h (2)
  26. src/core/plugin_registry/grpc_plugin_registry.cc (2)
  27. src/python/grpcio/grpc_core_dependencies.py (1)
  28. test/core/channel/minimal_stack_is_minimal_test.cc (20)
  29. test/core/end2end/end2end_tests.h (39)
  30. test/core/end2end/tests/http2_stats.cc (8)
  31. tools/doxygen/Doxyfile.c++.internal (2)
  32. tools/doxygen/Doxyfile.core.internal (2)

@ -851,7 +851,6 @@ grpc_cc_library(
# standard plugins
"census",
"//src/core:grpc_backend_metric_filter",
"//src/core:grpc_deadline_filter",
"//src/core:grpc_client_authority_filter",
"//src/core:grpc_lb_policy_grpclb",
"//src/core:grpc_lb_policy_outlier_detection",
@ -3697,7 +3696,6 @@ grpc_cc_library(
"//src/core:gpr_atm",
"//src/core:gpr_manual_constructor",
"//src/core:grpc_backend_metric_data",
"//src/core:grpc_deadline_filter",
"//src/core:grpc_message_size_filter",
"//src/core:grpc_service_config",
"//src/core:init_internally",

CMakeLists.txt (generated, 2)

@ -1846,7 +1846,6 @@ add_library(grpc
src/core/ext/filters/census/grpc_context.cc
src/core/ext/filters/channel_idle/idle_filter_state.cc
src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc
src/core/ext/filters/deadline/deadline_filter.cc
src/core/ext/filters/fault_injection/fault_injection_filter.cc
src/core/ext/filters/fault_injection/fault_injection_service_config_parser.cc
src/core/ext/filters/http/client/http_client_filter.cc
@ -2943,7 +2942,6 @@ add_library(grpc_unsecure
src/core/ext/filters/census/grpc_context.cc
src/core/ext/filters/channel_idle/idle_filter_state.cc
src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc
src/core/ext/filters/deadline/deadline_filter.cc
src/core/ext/filters/fault_injection/fault_injection_filter.cc
src/core/ext/filters/fault_injection/fault_injection_service_config_parser.cc
src/core/ext/filters/http/client/http_client_filter.cc

Makefile (generated, 1)

@ -689,7 +689,6 @@ LIBGRPC_SRC = \
src/core/ext/filters/census/grpc_context.cc \
src/core/ext/filters/channel_idle/idle_filter_state.cc \
src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc \
src/core/ext/filters/deadline/deadline_filter.cc \
src/core/ext/filters/fault_injection/fault_injection_filter.cc \
src/core/ext/filters/fault_injection/fault_injection_service_config_parser.cc \
src/core/ext/filters/http/client/http_client_filter.cc \

Package.swift (generated, 2)

@ -165,8 +165,6 @@ let package = Package(
"src/core/ext/filters/channel_idle/idle_filter_state.h",
"src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc",
"src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h",
"src/core/ext/filters/deadline/deadline_filter.cc",
"src/core/ext/filters/deadline/deadline_filter.h",
"src/core/ext/filters/fault_injection/fault_injection_filter.cc",
"src/core/ext/filters/fault_injection/fault_injection_filter.h",
"src/core/ext/filters/fault_injection/fault_injection_service_config_parser.cc",

@ -62,9 +62,6 @@ EXPERIMENTS = {
"core_end2end_test": [
"promise_based_server_call",
],
"cpp_end2end_test": [
"promise_based_server_call",
],
"endpoint_test": [
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
@ -83,9 +80,6 @@ EXPERIMENTS = {
"free_large_allocator",
"unconstrained_max_quota_buffer_size",
],
"xds_end2end_test": [
"promise_based_server_call",
],
},
"on": {
"core_end2end_test": [
@ -112,9 +106,6 @@ EXPERIMENTS = {
"core_end2end_test": [
"promise_based_server_call",
],
"cpp_end2end_test": [
"promise_based_server_call",
],
"endpoint_test": [
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
@ -133,9 +124,6 @@ EXPERIMENTS = {
"free_large_allocator",
"unconstrained_max_quota_buffer_size",
],
"xds_end2end_test": [
"promise_based_server_call",
],
},
"on": {
"cpp_lb_end2end_test": [
@ -159,9 +147,6 @@ EXPERIMENTS = {
"promise_based_client_call",
"promise_based_server_call",
],
"cpp_end2end_test": [
"promise_based_server_call",
],
"endpoint_test": [
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
@ -186,9 +171,6 @@ EXPERIMENTS = {
"free_large_allocator",
"unconstrained_max_quota_buffer_size",
],
"xds_end2end_test": [
"promise_based_server_call",
],
},
"on": {
"cancel_ares_query_test": [

@ -246,7 +246,6 @@ libs:
- src/core/ext/filters/backend_metrics/backend_metric_provider.h
- src/core/ext/filters/channel_idle/idle_filter_state.h
- src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h
- src/core/ext/filters/deadline/deadline_filter.h
- src/core/ext/filters/fault_injection/fault_injection_filter.h
- src/core/ext/filters/fault_injection/fault_injection_service_config_parser.h
- src/core/ext/filters/http/client/http_client_filter.h
@ -1267,7 +1266,6 @@ libs:
- src/core/ext/filters/census/grpc_context.cc
- src/core/ext/filters/channel_idle/idle_filter_state.cc
- src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc
- src/core/ext/filters/deadline/deadline_filter.cc
- src/core/ext/filters/fault_injection/fault_injection_filter.cc
- src/core/ext/filters/fault_injection/fault_injection_service_config_parser.cc
- src/core/ext/filters/http/client/http_client_filter.cc
@ -2233,7 +2231,6 @@ libs:
- src/core/ext/filters/backend_metrics/backend_metric_provider.h
- src/core/ext/filters/channel_idle/idle_filter_state.h
- src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h
- src/core/ext/filters/deadline/deadline_filter.h
- src/core/ext/filters/fault_injection/fault_injection_filter.h
- src/core/ext/filters/fault_injection/fault_injection_service_config_parser.h
- src/core/ext/filters/http/client/http_client_filter.h
@ -2723,7 +2720,6 @@ libs:
- src/core/ext/filters/census/grpc_context.cc
- src/core/ext/filters/channel_idle/idle_filter_state.cc
- src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc
- src/core/ext/filters/deadline/deadline_filter.cc
- src/core/ext/filters/fault_injection/fault_injection_filter.cc
- src/core/ext/filters/fault_injection/fault_injection_service_config_parser.cc
- src/core/ext/filters/http/client/http_client_filter.cc

config.m4 (generated, 2)

@ -64,7 +64,6 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/census/grpc_context.cc \
src/core/ext/filters/channel_idle/idle_filter_state.cc \
src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc \
src/core/ext/filters/deadline/deadline_filter.cc \
src/core/ext/filters/fault_injection/fault_injection_filter.cc \
src/core/ext/filters/fault_injection/fault_injection_service_config_parser.cc \
src/core/ext/filters/http/client/http_client_filter.cc \
@ -1391,7 +1390,6 @@ if test "$PHP_GRPC" != "no"; then
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/backend_metrics)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/census)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/channel_idle)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/deadline)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/fault_injection)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/http)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/http/client)

config.w32 (generated, 2)

@ -29,7 +29,6 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\census\\grpc_context.cc " +
"src\\core\\ext\\filters\\channel_idle\\idle_filter_state.cc " +
"src\\core\\ext\\filters\\channel_idle\\legacy_channel_idle_filter.cc " +
"src\\core\\ext\\filters\\deadline\\deadline_filter.cc " +
"src\\core\\ext\\filters\\fault_injection\\fault_injection_filter.cc " +
"src\\core\\ext\\filters\\fault_injection\\fault_injection_service_config_parser.cc " +
"src\\core\\ext\\filters\\http\\client\\http_client_filter.cc " +
@ -1385,7 +1384,6 @@ if (PHP_GRPC != "no") {
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\backend_metrics");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\census");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\channel_idle");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\deadline");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\fault_injection");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\http");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\http\\client");

gRPC-C++.podspec (generated, 2)

@ -286,7 +286,6 @@ Pod::Spec.new do |s|
'src/core/ext/filters/backend_metrics/backend_metric_provider.h',
'src/core/ext/filters/channel_idle/idle_filter_state.h',
'src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h',
'src/core/ext/filters/deadline/deadline_filter.h',
'src/core/ext/filters/fault_injection/fault_injection_filter.h',
'src/core/ext/filters/fault_injection/fault_injection_service_config_parser.h',
'src/core/ext/filters/http/client/http_client_filter.h',
@ -1575,7 +1574,6 @@ Pod::Spec.new do |s|
'src/core/ext/filters/backend_metrics/backend_metric_provider.h',
'src/core/ext/filters/channel_idle/idle_filter_state.h',
'src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h',
'src/core/ext/filters/deadline/deadline_filter.h',
'src/core/ext/filters/fault_injection/fault_injection_filter.h',
'src/core/ext/filters/fault_injection/fault_injection_service_config_parser.h',
'src/core/ext/filters/http/client/http_client_filter.h',

gRPC-Core.podspec (generated, 3)

@ -282,8 +282,6 @@ Pod::Spec.new do |s|
'src/core/ext/filters/channel_idle/idle_filter_state.h',
'src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc',
'src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h',
'src/core/ext/filters/deadline/deadline_filter.cc',
'src/core/ext/filters/deadline/deadline_filter.h',
'src/core/ext/filters/fault_injection/fault_injection_filter.cc',
'src/core/ext/filters/fault_injection/fault_injection_filter.h',
'src/core/ext/filters/fault_injection/fault_injection_service_config_parser.cc',
@ -2375,7 +2373,6 @@ Pod::Spec.new do |s|
'src/core/ext/filters/backend_metrics/backend_metric_provider.h',
'src/core/ext/filters/channel_idle/idle_filter_state.h',
'src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h',
'src/core/ext/filters/deadline/deadline_filter.h',
'src/core/ext/filters/fault_injection/fault_injection_filter.h',
'src/core/ext/filters/fault_injection/fault_injection_service_config_parser.h',
'src/core/ext/filters/http/client/http_client_filter.h',

grpc.gemspec (generated, 2)

@ -171,8 +171,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/channel_idle/idle_filter_state.h )
s.files += %w( src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc )
s.files += %w( src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h )
s.files += %w( src/core/ext/filters/deadline/deadline_filter.cc )
s.files += %w( src/core/ext/filters/deadline/deadline_filter.h )
s.files += %w( src/core/ext/filters/fault_injection/fault_injection_filter.cc )
s.files += %w( src/core/ext/filters/fault_injection/fault_injection_filter.h )
s.files += %w( src/core/ext/filters/fault_injection/fault_injection_service_config_parser.cc )

@ -67,9 +67,6 @@
application will see the compressed message in the byte buffer. */
#define GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION \
"grpc.per_message_decompression"
/** Enable/disable support for deadline checking. Defaults to 1, unless
GRPC_ARG_MINIMAL_STACK is enabled, in which case it defaults to 0 */
#define GRPC_ARG_ENABLE_DEADLINE_CHECKS "grpc.enable_deadline_checking"
/** Initial stream ID for http2 transports. Int valued. */
#define GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER \
"grpc.http2.initial_sequence_number"

package.xml (generated, 2)

@ -153,8 +153,6 @@
<file baseinstalldir="/" name="src/core/ext/filters/channel_idle/idle_filter_state.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/deadline/deadline_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/deadline/deadline_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/fault_injection/fault_injection_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/fault_injection/fault_injection_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/fault_injection/fault_injection_service_config_parser.cc" role="src" />

@ -4524,42 +4524,6 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "grpc_deadline_filter",
srcs = [
"ext/filters/deadline/deadline_filter.cc",
],
hdrs = [
"ext/filters/deadline/deadline_filter.h",
],
external_deps = [
"absl/status",
"absl/types:optional",
],
language = "c++",
deps = [
"arena",
"arena_promise",
"channel_fwd",
"channel_stack_type",
"closure",
"context",
"error",
"metadata_batch",
"status_helper",
"time",
"//:call_combiner",
"//:channel_arg_names",
"//:config",
"//:debug_location",
"//:exec_ctx",
"//:gpr",
"//:grpc_base",
"//:grpc_public_hdrs",
"//:iomgr_timer",
],
)
grpc_cc_library(
name = "grpc_client_authority_filter",
srcs = [
@ -4613,7 +4577,6 @@ grpc_cc_library(
"channel_fwd",
"channel_stack_type",
"context",
"grpc_deadline_filter",
"grpc_service_config",
"json",
"json_args",

@ -61,7 +61,6 @@
#include "src/core/client_channel/retry_filter.h"
#include "src/core/client_channel/subchannel.h"
#include "src/core/client_channel/subchannel_interface_internal.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/handshaker/proxy_mapper_registry.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
@ -210,14 +209,14 @@ class ClientChannelFilter::FilterBasedCallData final
const grpc_call_element_args& args);
~FilterBasedCallData() override;
grpc_call_element* elem() const { return deadline_state_.elem; }
grpc_call_stack* owning_call() const { return deadline_state_.call_stack; }
CallCombiner* call_combiner() const { return deadline_state_.call_combiner; }
grpc_call_element* elem() const { return elem_; }
grpc_call_stack* owning_call() const { return owning_call_; }
CallCombiner* call_combiner() const { return call_combiner_; }
ClientChannelFilter* chand() const override {
return static_cast<ClientChannelFilter*>(elem()->channel_data);
}
Arena* arena() const override { return deadline_state_.arena; }
Arena* arena() const override { return arena_; }
grpc_polling_entity* pollent() override { return pollent_; }
grpc_metadata_batch* send_initial_metadata() override {
return pending_batches_[0]
@ -270,10 +269,8 @@ class ClientChannelFilter::FilterBasedCallData final
void ResetDeadline(Duration timeout) override {
const Timestamp per_method_deadline =
Timestamp::FromCycleCounterRoundUp(call_start_time_) + timeout;
if (per_method_deadline < deadline_) {
deadline_ = per_method_deadline;
grpc_deadline_state_reset(&deadline_state_, deadline_);
}
static_cast<Call*>(call_context_[GRPC_CONTEXT_CALL].value)
->UpdateDeadline(per_method_deadline);
}
void CreateDynamicCall();
@ -286,8 +283,10 @@ class ClientChannelFilter::FilterBasedCallData final
gpr_cycle_counter call_start_time_;
Timestamp deadline_;
// State for handling deadlines.
grpc_deadline_state deadline_state_;
Arena* const arena_;
grpc_call_element* const elem_;
grpc_call_stack* const owning_call_;
CallCombiner* const call_combiner_;
grpc_polling_entity* pollent_ = nullptr;
@ -387,11 +386,12 @@ class ClientChannelFilter::PromiseBasedCallData final
}
void ResetDeadline(Duration timeout) override {
Call* call = GetContext<Call>();
CallContext* call_context = GetContext<CallContext>();
const Timestamp per_method_deadline =
Timestamp::FromCycleCounterRoundUp(call_context->call_start_time()) +
timeout;
call_context->UpdateDeadline(per_method_deadline);
call->UpdateDeadline(per_method_deadline);
}
ClientChannelFilter* chand_;
@ -1230,9 +1230,6 @@ RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
ClientChannelFilter::ClientChannelFilter(grpc_channel_element_args* args,
grpc_error_handle* error)
: channel_args_(args->channel_args),
deadline_checking_enabled_(
channel_args_.GetBool(GRPC_ARG_ENABLE_DEADLINE_CHECKS)
.value_or(!channel_args_.WantMinimalStack())),
owning_stack_(args->channel_stack),
client_channel_factory_(channel_args_.GetObject<ClientChannelFactory>()),
channelz_node_(channel_args_.GetObject<channelz::ChannelNode>()),
@ -2112,8 +2109,7 @@ grpc_error_handle ClientChannelFilter::CallData::ApplyServiceConfigToCallLocked(
if (method_params != nullptr) {
// If the deadline from the service config is shorter than the one
// from the client API, reset the deadline timer.
if (chand()->deadline_checking_enabled_ &&
method_params->timeout() != Duration::Zero()) {
if (method_params->timeout() != Duration::Zero()) {
ResetDeadline(method_params->timeout());
}
// If the service config set wait_for_ready and the application
@ -2213,12 +2209,10 @@ ClientChannelFilter::FilterBasedCallData::FilterBasedCallData(
call_context_(args.context),
call_start_time_(args.start_time),
deadline_(args.deadline),
deadline_state_(
elem, args,
GPR_LIKELY(static_cast<ClientChannelFilter*>(elem->channel_data)
->deadline_checking_enabled_)
? args.deadline
: Timestamp::InfFuture()) {
arena_(args.arena),
elem_(elem),
owning_call_(args.call_stack),
call_combiner_(args.call_combiner) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: created call", chand(), this);
}
@ -2262,10 +2256,6 @@ void ClientChannelFilter::FilterBasedCallData::StartTransportStreamOpBatch(
gpr_log(GPR_INFO, "chand=%p calld=%p: batch started from above: %s", chand,
calld, grpc_transport_stream_op_batch_string(batch, false).c_str());
}
if (GPR_LIKELY(chand->deadline_checking_enabled_)) {
grpc_deadline_state_client_start_transport_stream_op_batch(
&calld->deadline_state_, batch);
}
// Intercept recv_trailing_metadata to commit the call, in case we wind up
// failing the call before we get down to the retry or LB call layer.
if (batch->recv_trailing_metadata) {
@ -3056,7 +3046,6 @@ ClientChannelFilter::FilterBasedLoadBalancedCall::FilterBasedLoadBalancedCall(
absl::AnyInvocable<void()> on_commit, bool is_transparent_retry)
: LoadBalancedCall(chand, args.context, std::move(on_commit),
is_transparent_retry),
deadline_(args.deadline),
arena_(args.arena),
owning_call_(args.call_stack),
call_combiner_(args.call_combiner),
@ -3356,8 +3345,12 @@ void ClientChannelFilter::FilterBasedLoadBalancedCall::
// Get status from error.
grpc_status_code code;
std::string message;
grpc_error_get_status(error, self->deadline_, &code, &message,
/*http_error=*/nullptr, /*error_string=*/nullptr);
grpc_error_get_status(
error,
static_cast<Call*>(self->call_context()[GRPC_CONTEXT_CALL].value)
->deadline(),
&code, &message,
/*http_error=*/nullptr, /*error_string=*/nullptr);
status = absl::Status(static_cast<absl::StatusCode>(code), message);
} else {
// Get status from headers.
@ -3495,7 +3488,8 @@ void ClientChannelFilter::FilterBasedLoadBalancedCall::CreateSubchannelCall() {
CHECK_NE(path, nullptr);
SubchannelCall::Args call_args = {
connected_subchannel()->Ref(), pollent_, path->Ref(), /*start_time=*/0,
deadline_, arena_,
static_cast<Call*>(call_context()[GRPC_CONTEXT_CALL].value)->deadline(),
arena_,
// TODO(roth): When we implement hedging support, we will probably
// need to use a separate call context for each subchannel call.
call_context(), call_combiner_};

@ -287,7 +287,6 @@ class ClientChannelFilter final {
// Fields set at construction and never modified.
//
ChannelArgs channel_args_;
const bool deadline_checking_enabled_;
grpc_channel_stack* owning_stack_;
ClientChannelFactory* client_channel_factory_;
RefCountedPtr<ServiceConfig> default_service_config_;
@ -558,7 +557,6 @@ class ClientChannelFilter::FilterBasedLoadBalancedCall final
// TODO(roth): Instead of duplicating these fields in every filter
// that uses any one of them, we should store them in the call
// context. This will save per-call memory overhead.
Timestamp deadline_;
Arena* arena_;
grpc_call_stack* owning_call_;
CallCombiner* call_combiner_;

@ -1,408 +0,0 @@
//
// Copyright 2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include <functional>
#include <memory>
#include <new>
#include <utility>
#include "absl/status/status.h"
#include "absl/types/optional.h"
#include <grpc/impl/channel_arg_names.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/metadata_batch.h"
namespace grpc_core {
// A fire-and-forget class representing a pending deadline timer.
// Allocated on the call arena.
class TimerState {
public:
TimerState(grpc_deadline_state* deadline_state, Timestamp deadline)
: deadline_state_(deadline_state) {
GRPC_CALL_STACK_REF(deadline_state->call_stack, "DeadlineTimerState");
GRPC_CLOSURE_INIT(&closure_, TimerCallback, this, nullptr);
grpc_timer_init(&timer_, deadline, &closure_);
}
void Cancel() { grpc_timer_cancel(&timer_); }
private:
// The on_complete callback used when sending a cancel_error batch down the
// filter stack. Yields the call combiner when the batch returns.
static void YieldCallCombiner(void* arg, grpc_error_handle /*ignored*/) {
TimerState* self = static_cast<TimerState*>(arg);
GRPC_CALL_COMBINER_STOP(self->deadline_state_->call_combiner,
"got on_complete from cancel_stream batch");
GRPC_CALL_STACK_UNREF(self->deadline_state_->call_stack,
"DeadlineTimerState");
}
// This is called via the call combiner, so access to deadline_state is
// synchronized.
static void SendCancelOpInCallCombiner(void* arg, grpc_error_handle error) {
TimerState* self = static_cast<TimerState*>(arg);
grpc_transport_stream_op_batch* batch = grpc_make_transport_stream_op(
GRPC_CLOSURE_INIT(&self->closure_, YieldCallCombiner, self, nullptr));
batch->cancel_stream = true;
batch->payload->cancel_stream.cancel_error = error;
grpc_call_element* elem = self->deadline_state_->elem;
elem->filter->start_transport_stream_op_batch(elem, batch);
}
// Timer callback.
static void TimerCallback(void* arg, grpc_error_handle error) {
TimerState* self = static_cast<TimerState*>(arg);
if (error != absl::CancelledError()) {
error = grpc_error_set_int(GRPC_ERROR_CREATE("Deadline Exceeded"),
StatusIntProperty::kRpcStatus,
GRPC_STATUS_DEADLINE_EXCEEDED);
self->deadline_state_->call_combiner->Cancel(error);
GRPC_CLOSURE_INIT(&self->closure_, SendCancelOpInCallCombiner, self,
nullptr);
GRPC_CALL_COMBINER_START(self->deadline_state_->call_combiner,
&self->closure_, error,
"deadline exceeded -- sending cancel_stream op");
} else {
GRPC_CALL_STACK_UNREF(self->deadline_state_->call_stack,
"DeadlineTimerState");
}
}
// NOTE: This object's dtor is never called, so do not add any data
// members that require destruction!
// TODO(roth): We should ideally call this object's dtor somewhere,
// but that would require adding more synchronization, because we'd
// need to call the dtor only after both (a) the timer callback
// finishes and (b) the filter sees the call completion and attempts
// to cancel the timer.
grpc_deadline_state* deadline_state_;
grpc_timer timer_;
grpc_closure closure_;
};
} // namespace grpc_core
//
// grpc_deadline_state
//
// Starts the deadline timer.
// This is called via the call combiner, so access to deadline_state is
// synchronized.
static void start_timer_if_needed(grpc_deadline_state* deadline_state,
grpc_core::Timestamp deadline) {
if (deadline == grpc_core::Timestamp::InfFuture()) return;
GPR_ASSERT(deadline_state->timer_state == nullptr);
deadline_state->timer_state =
deadline_state->arena->New<grpc_core::TimerState>(deadline_state,
deadline);
}
// Cancels the deadline timer.
// This is called via the call combiner, so access to deadline_state is
// synchronized.
static void cancel_timer_if_needed(grpc_deadline_state* deadline_state) {
if (deadline_state->timer_state != nullptr) {
deadline_state->timer_state->Cancel();
deadline_state->timer_state = nullptr;
}
}
// Callback run when we receive trailing metadata.
static void recv_trailing_metadata_ready(void* arg, grpc_error_handle error) {
grpc_deadline_state* deadline_state = static_cast<grpc_deadline_state*>(arg);
cancel_timer_if_needed(deadline_state);
// Invoke the original callback.
grpc_core::Closure::Run(DEBUG_LOCATION,
deadline_state->original_recv_trailing_metadata_ready,
error);
}
// Inject our own recv_trailing_metadata_ready callback into op.
static void inject_recv_trailing_metadata_ready(
grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op) {
deadline_state->original_recv_trailing_metadata_ready =
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
GRPC_CLOSURE_INIT(&deadline_state->recv_trailing_metadata_ready,
recv_trailing_metadata_ready, deadline_state,
grpc_schedule_on_exec_ctx);
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&deadline_state->recv_trailing_metadata_ready;
}
// Callback and associated state for starting the timer after call stack
// initialization has been completed.
struct start_timer_after_init_state {
start_timer_after_init_state(grpc_deadline_state* deadline_state,
grpc_core::Timestamp deadline)
: deadline_state(deadline_state), deadline(deadline) {}
~start_timer_after_init_state() {
start_timer_if_needed(deadline_state, deadline);
}
bool in_call_combiner = false;
grpc_deadline_state* deadline_state;
grpc_core::Timestamp deadline;
grpc_closure closure;
};
static void start_timer_after_init(void* arg, grpc_error_handle error) {
struct start_timer_after_init_state* state =
static_cast<struct start_timer_after_init_state*>(arg);
grpc_deadline_state* deadline_state = state->deadline_state;
if (!state->in_call_combiner) {
// We are initially called without holding the call combiner, so we
// need to bounce ourselves into it.
state->in_call_combiner = true;
GRPC_CALL_COMBINER_START(deadline_state->call_combiner, &state->closure,
error, "scheduling deadline timer");
return;
}
delete state;
GRPC_CALL_COMBINER_STOP(deadline_state->call_combiner,
"done scheduling deadline timer");
}
grpc_deadline_state::grpc_deadline_state(grpc_call_element* elem,
const grpc_call_element_args& args,
grpc_core::Timestamp deadline)
: elem(elem),
call_stack(args.call_stack),
call_combiner(args.call_combiner),
arena(args.arena) {
// Deadline will always be infinite on servers, so the timer will only be
// set on clients with a finite deadline.
if (deadline != grpc_core::Timestamp::InfFuture()) {
// When the deadline passes, we indicate the failure by sending down
// an op with cancel_error set. However, we can't send down any ops
// until after the call stack is fully initialized. If we start the
// timer here, we have no guarantee that the timer won't pop before
// call stack initialization is finished. To avoid that problem, we
// create a closure to start the timer, and we schedule that closure
// to be run after call stack initialization is done.
struct start_timer_after_init_state* state =
new start_timer_after_init_state(this, deadline);
GRPC_CLOSURE_INIT(&state->closure, start_timer_after_init, state,
grpc_schedule_on_exec_ctx);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->closure, absl::OkStatus());
}
}
grpc_deadline_state::~grpc_deadline_state() { cancel_timer_if_needed(this); }
void grpc_deadline_state_reset(grpc_deadline_state* deadline_state,
grpc_core::Timestamp new_deadline) {
cancel_timer_if_needed(deadline_state);
start_timer_if_needed(deadline_state, new_deadline);
}
void grpc_deadline_state_client_start_transport_stream_op_batch(
grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op) {
if (op->cancel_stream) {
cancel_timer_if_needed(deadline_state);
} else {
// Make sure we know when the call is complete, so that we can cancel
// the timer.
if (op->recv_trailing_metadata) {
inject_recv_trailing_metadata_ready(deadline_state, op);
}
}
}
//
// filter code
//
// Constructor for channel_data. Used for both client and server filters.
static grpc_error_handle deadline_init_channel_elem(
grpc_channel_element* /*elem*/, grpc_channel_element_args* args) {
GPR_ASSERT(!args->is_last);
return absl::OkStatus();
}
// Destructor for channel_data. Used for both client and server filters.
static void deadline_destroy_channel_elem(grpc_channel_element* /*elem*/) {}
// Additional call data used only for the server filter.
struct server_call_data {
grpc_deadline_state deadline_state; // Must be first.
// The closure for receiving initial metadata.
grpc_closure recv_initial_metadata_ready;
// Received initial metadata batch.
grpc_metadata_batch* recv_initial_metadata;
// The original recv_initial_metadata_ready closure, which we chain to
// after our own closure is invoked.
grpc_closure* next_recv_initial_metadata_ready;
};
// Constructor for call_data. Used for both client and server filters.
static grpc_error_handle deadline_init_call_elem(
grpc_call_element* elem, const grpc_call_element_args* args) {
new (elem->call_data) grpc_deadline_state(elem, *args, args->deadline);
return absl::OkStatus();
}
// Destructor for call_data. Used for both client and server filters.
static void deadline_destroy_call_elem(
grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
grpc_closure* /*ignored*/) {
grpc_deadline_state* deadline_state =
static_cast<grpc_deadline_state*>(elem->call_data);
deadline_state->~grpc_deadline_state();
}
// Method for starting a call op for client filter.
static void deadline_client_start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
grpc_deadline_state_client_start_transport_stream_op_batch(
static_cast<grpc_deadline_state*>(elem->call_data), op);
// Chain to next filter.
grpc_call_next_op(elem, op);
}
// Callback for receiving initial metadata on the server.
static void recv_initial_metadata_ready(void* arg, grpc_error_handle error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
server_call_data* calld = static_cast<server_call_data*>(elem->call_data);
start_timer_if_needed(
&calld->deadline_state,
calld->recv_initial_metadata->get(grpc_core::GrpcTimeoutMetadata())
.value_or(grpc_core::Timestamp::InfFuture()));
// Invoke the next callback.
grpc_core::Closure::Run(DEBUG_LOCATION,
calld->next_recv_initial_metadata_ready, error);
}
// Method for starting a call op for server filter.
static void deadline_server_start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
server_call_data* calld = static_cast<server_call_data*>(elem->call_data);
if (op->cancel_stream) {
cancel_timer_if_needed(&calld->deadline_state);
} else {
// If we're receiving initial metadata, we need to get the deadline
// from the recv_initial_metadata_ready callback. So we inject our
// own callback into that hook.
if (op->recv_initial_metadata) {
calld->next_recv_initial_metadata_ready =
op->payload->recv_initial_metadata.recv_initial_metadata_ready;
calld->recv_initial_metadata =
op->payload->recv_initial_metadata.recv_initial_metadata;
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
op->payload->recv_initial_metadata.recv_initial_metadata_ready =
&calld->recv_initial_metadata_ready;
}
// Make sure we know when the call is complete, so that we can cancel
// the timer.
// Note that we trigger this on recv_trailing_metadata, even though
// the client never sends trailing metadata, because this is the
// hook that tells us when the call is complete on the server side.
if (op->recv_trailing_metadata) {
inject_recv_trailing_metadata_ready(&calld->deadline_state, op);
}
}
// Chain to next filter.
grpc_call_next_op(elem, op);
}
const grpc_channel_filter grpc_client_deadline_filter = {
deadline_client_start_transport_stream_op_batch,
[](grpc_channel_element*, grpc_core::CallArgs call_args,
grpc_core::NextPromiseFactory next_promise_factory) {
return next_promise_factory(std::move(call_args));
},
/* init_call: */ nullptr,
grpc_channel_next_op,
sizeof(grpc_deadline_state),
deadline_init_call_elem,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
deadline_destroy_call_elem,
0, // sizeof(channel_data)
deadline_init_channel_elem,
grpc_channel_stack_no_post_init,
deadline_destroy_channel_elem,
grpc_channel_next_get_info,
"deadline",
};
const grpc_channel_filter grpc_server_deadline_filter = {
deadline_server_start_transport_stream_op_batch,
[](grpc_channel_element*, grpc_core::CallArgs call_args,
grpc_core::NextPromiseFactory next_promise_factory) {
auto deadline = call_args.client_initial_metadata->get(
grpc_core::GrpcTimeoutMetadata());
if (deadline.has_value()) {
grpc_core::GetContext<grpc_core::CallContext>()->UpdateDeadline(
*deadline);
}
return next_promise_factory(std::move(call_args));
},
[](grpc_channel_element*, grpc_core::CallSpineInterface* spine) {
grpc_core::DownCast<grpc_core::PipeBasedCallSpine*>(spine)
->client_initial_metadata()
.receiver.InterceptAndMap([](grpc_core::ClientMetadataHandle md) {
auto deadline = md->get(grpc_core::GrpcTimeoutMetadata());
if (deadline.has_value()) {
grpc_core::GetContext<grpc_core::CallContext>()->UpdateDeadline(
*deadline);
}
return md;
});
},
grpc_channel_next_op,
sizeof(server_call_data),
deadline_init_call_elem,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
deadline_destroy_call_elem,
0, // sizeof(channel_data)
deadline_init_channel_elem,
grpc_channel_stack_no_post_init,
deadline_destroy_channel_elem,
grpc_channel_next_get_info,
"deadline",
};
namespace grpc_core {
void RegisterDeadlineFilter(CoreConfiguration::Builder* builder) {
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL, &grpc_client_deadline_filter)
.ExcludeFromMinimalStack()
.IfChannelArg(GRPC_ARG_ENABLE_DEADLINE_CHECKS, true);
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &grpc_server_deadline_filter)
.ExcludeFromMinimalStack()
.IfChannelArg(GRPC_ARG_ENABLE_DEADLINE_CHECKS, true);
}
} // namespace grpc_core

@ -1,85 +0,0 @@
//
// Copyright 2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef GRPC_SRC_CORE_EXT_FILTERS_DEADLINE_DEADLINE_FILTER_H
#define GRPC_SRC_CORE_EXT_FILTERS_DEADLINE_DEADLINE_FILTER_H
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/transport/transport.h"
namespace grpc_core {
class TimerState;
} // namespace grpc_core
// State used for filters that enforce call deadlines.
// Must be the first field in the filter's call_data.
struct grpc_deadline_state {
grpc_deadline_state(grpc_call_element* elem,
const grpc_call_element_args& args,
grpc_core::Timestamp deadline);
~grpc_deadline_state();
// We take a reference to the call stack for the timer callback.
grpc_call_element* elem;
grpc_call_stack* call_stack;
grpc_core::CallCombiner* call_combiner;
grpc_core::Arena* arena;
grpc_core::TimerState* timer_state = nullptr;
// Closure to invoke when we receive trailing metadata.
// We use this to cancel the timer.
grpc_closure recv_trailing_metadata_ready;
// The original recv_trailing_metadata_ready closure, which we chain to
// after our own closure is invoked.
grpc_closure* original_recv_trailing_metadata_ready;
};
// Cancels the existing timer and starts a new one with new_deadline.
//
// Note: It is generally safe to call this with an earlier deadline
// value than the current one, but not the reverse. No checks are done
// to ensure that the timer callback is not invoked while it is in the
// process of being reset, which means that attempting to increase the
// deadline may result in the timer being called twice.
//
// Note: Must be called while holding the call combiner.
void grpc_deadline_state_reset(grpc_deadline_state* deadline_state,
grpc_core::Timestamp new_deadline);
// To be called from the client-side filter's start_transport_stream_op_batch()
// method. Ensures that the deadline timer is cancelled when the call
// is completed.
//
// Note: It is the caller's responsibility to chain to the next filter if
// necessary after this function returns.
//
// Note: Must be called while holding the call combiner.
void grpc_deadline_state_client_start_transport_stream_op_batch(
grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op);
// Deadline filters for direct client channels and server channels.
// Note: Deadlines for non-direct client channels are handled by the
// client_channel filter.
extern const grpc_channel_filter grpc_client_deadline_filter;
extern const grpc_channel_filter grpc_server_deadline_filter;
#endif // GRPC_SRC_CORE_EXT_FILTERS_DEADLINE_DEADLINE_FILTER_H

@ -29,7 +29,6 @@
#include <grpc/status.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/config/core_configuration.h"
@ -250,18 +249,10 @@ void RegisterMessageSizeFilter(CoreConfiguration::Builder* builder) {
builder->channel_init()
->RegisterFilter<ClientMessageSizeFilter>(GRPC_CLIENT_DIRECT_CHANNEL)
.ExcludeFromMinimalStack()
.If(HasMessageSizeLimits)
// TODO(ctiller): ordering constraint is here to match the ordering that
// existed prior to ordering constraints did. Re-examine the ordering of
// filters from first principles.
.Before({&grpc_client_deadline_filter});
.If(HasMessageSizeLimits);
builder->channel_init()
->RegisterFilter<ServerMessageSizeFilter>(GRPC_SERVER_CHANNEL)
.ExcludeFromMinimalStack()
.If(HasMessageSizeLimits)
// TODO(ctiller): ordering constraint is here to match the ordering that
// existed prior to ordering constraints did. Re-examine the ordering of
// filters from first principles.
.Before({&grpc_server_deadline_filter});
.If(HasMessageSizeLimits);
}
} // namespace grpc_core

@ -29,9 +29,12 @@
/// This enum represents the indexes into the array, where each index
/// contains a different type of value.
typedef enum {
/// grpc_call* associated with this context.
GRPC_CONTEXT_CALL = 0,
/// Value is either a \a grpc_client_security_context or a
/// \a grpc_server_security_context.
GRPC_CONTEXT_SECURITY = 0,
GRPC_CONTEXT_SECURITY,
/// Value is a \a census_context.
GRPC_CONTEXT_TRACING,
@ -68,10 +71,35 @@ struct grpc_call_context_element {
};
namespace grpc_core {
class Call;
// Bind the legacy context array into the new style structure
// TODO(ctiller): remove as we migrate these contexts to the new system.
template <>
struct ContextType<grpc_call_context_element> {};
// Also as a transition step allow exposing a GetContext<T> that can peek into
// the legacy context array.
namespace promise_detail {
template <typename T>
struct OldStyleContext;
template <>
struct OldStyleContext<Call> {
static constexpr grpc_context_index kIndex = GRPC_CONTEXT_CALL;
};
template <typename T>
class Context<T, absl::void_t<decltype(OldStyleContext<T>::kIndex)>> {
public:
static T* get() {
return static_cast<T*>(
GetContext<grpc_call_context_element>()[OldStyleContext<T>::kIndex]
.value);
}
};
} // namespace promise_detail
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_CHANNEL_CONTEXT_H
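The hunk above keeps GetContext<Call>() working while Call is still carried in the legacy context array: an OldStyleContext trait maps the type to its grpc_context_index slot, and a Context specialization reads through it. A rough standalone sketch of that bridging trick follows (type and variable names here are illustrative, not the gRPC ones).

// Sketch only: not the gRPC implementation.
#include <cassert>

// One slot per legacy context kind (mirroring grpc_context_index).
enum LegacyIndex { kContextCall = 0, kContextCount };
struct LegacyElem { void* value = nullptr; };

// Stand-in for "the current call's legacy context array".
thread_local LegacyElem* g_current_ctx = nullptr;

// Trait mapping a type to its slot; specialized only for types that live
// in the legacy array (here, Call).
template <typename T> struct OldStyleContext;

struct Call { int id; };
template <> struct OldStyleContext<Call> {
  static constexpr LegacyIndex kIndex = kContextCall;
};

// GetContext<T>() peeks into the legacy array through the trait.
template <typename T>
T* GetContext() {
  return static_cast<T*>(g_current_ctx[OldStyleContext<T>::kIndex].value);
}

int main() {
  LegacyElem ctx[kContextCount];
  Call call{42};
  ctx[kContextCall].value = &call;  // what the Call constructors now do
  g_current_ctx = ctx;
  assert(GetContext<Call>()->id == 42);
}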

@ -177,7 +177,7 @@
(ie when all filters in a stack are promise based)
expiry: 2024/06/14
owner: ctiller@google.com
test_tags: ["core_end2end_test", "cpp_end2end_test", "xds_end2end_test", "logging_test"]
test_tags: ["core_end2end_test", "logging_test"]
- name: rstpit
description:
On RST_STREAM on a server, reduce MAX_CONCURRENT_STREAMS for a short duration

@ -113,164 +113,13 @@ grpc_core::DebugOnlyTraceFlag grpc_call_refcount_trace(false, "call_refcount");
namespace grpc_core {
// Alias to make this type available in Call implementation without a grpc_core
// prefix.
using GrpcClosure = Closure;
///////////////////////////////////////////////////////////////////////////////
// Call
class Call : public CppImplOf<Call, grpc_call> {
public:
Arena* arena() { return arena_; }
bool is_client() const { return is_client_; }
virtual void ContextSet(grpc_context_index elem, void* value,
void (*destroy)(void* value)) = 0;
virtual void* ContextGet(grpc_context_index elem) const = 0;
virtual bool Completed() = 0;
void CancelWithStatus(grpc_status_code status, const char* description);
virtual void CancelWithError(grpc_error_handle error) = 0;
virtual void SetCompletionQueue(grpc_completion_queue* cq) = 0;
char* GetPeer();
virtual grpc_call_error StartBatch(const grpc_op* ops, size_t nops,
void* notify_tag,
bool is_notify_tag_closure) = 0;
virtual bool failed_before_recv_message() const = 0;
virtual bool is_trailers_only() const = 0;
virtual absl::string_view GetServerAuthority() const = 0;
virtual void ExternalRef() = 0;
virtual void ExternalUnref() = 0;
virtual void InternalRef(const char* reason) = 0;
virtual void InternalUnref(const char* reason) = 0;
grpc_compression_algorithm test_only_compression_algorithm() {
return incoming_compression_algorithm_;
}
uint32_t test_only_message_flags() { return test_only_last_message_flags_; }
CompressionAlgorithmSet encodings_accepted_by_peer() {
return encodings_accepted_by_peer_;
}
// This should return nullptr for the promise stack (and alternative means
// for that functionality be invented)
virtual grpc_call_stack* call_stack() = 0;
// Return the EventEngine used for this call's async execution.
virtual grpc_event_engine::experimental::EventEngine* event_engine()
const = 0;
protected:
// The maximum number of concurrent batches possible.
// Based upon the maximum number of individually queueable ops in the batch
// api:
// - initial metadata send
// - message send
// - status/close send (depending on client/server)
// - initial metadata recv
// - message recv
// - status/close recv (depending on client/server)
static constexpr size_t kMaxConcurrentBatches = 6;
struct ParentCall {
Mutex child_list_mu;
Call* first_child ABSL_GUARDED_BY(child_list_mu) = nullptr;
};
struct ChildCall {
explicit ChildCall(Call* parent) : parent(parent) {}
Call* parent;
/// siblings: children of the same parent form a list, and this list is
/// protected under
/// parent->mu
Call* sibling_next = nullptr;
Call* sibling_prev = nullptr;
};
Call(Arena* arena, bool is_client, Timestamp send_deadline,
RefCountedPtr<Channel> channel)
: channel_(std::move(channel)),
arena_(arena),
send_deadline_(send_deadline),
is_client_(is_client) {
GPR_DEBUG_ASSERT(arena_ != nullptr);
GPR_DEBUG_ASSERT(channel_ != nullptr);
}
virtual ~Call() = default;
void DeleteThis();
ParentCall* GetOrCreateParentCall();
ParentCall* parent_call();
Channel* channel() const {
GPR_DEBUG_ASSERT(channel_ != nullptr);
return channel_.get();
}
absl::Status InitParent(Call* parent, uint32_t propagation_mask);
void PublishToParent(Call* parent);
void MaybeUnpublishFromParent();
void PropagateCancellationToChildren();
Timestamp send_deadline() const { return send_deadline_; }
void set_send_deadline(Timestamp send_deadline) {
send_deadline_ = send_deadline;
}
Slice GetPeerString() const {
MutexLock lock(&peer_mu_);
return peer_string_.Ref();
}
void SetPeerString(Slice peer_string) {
MutexLock lock(&peer_mu_);
peer_string_ = std::move(peer_string);
}
void ClearPeerString() { SetPeerString(Slice(grpc_empty_slice())); }
// TODO(ctiller): cancel_func is for cancellation of the call - filter stack
// holds no mutexes here, promise stack does, and so locking is different.
// Remove this and cancel directly once promise conversion is done.
void ProcessIncomingInitialMetadata(grpc_metadata_batch& md);
// Fixup outgoing metadata before sending - adds compression, protects
// internal headers against external modification.
void PrepareOutgoingInitialMetadata(const grpc_op& op,
grpc_metadata_batch& md);
void NoteLastMessageFlags(uint32_t flags) {
test_only_last_message_flags_ = flags;
}
grpc_compression_algorithm incoming_compression_algorithm() const {
return incoming_compression_algorithm_;
}
void HandleCompressionAlgorithmDisabled(
grpc_compression_algorithm compression_algorithm) GPR_ATTRIBUTE_NOINLINE;
void HandleCompressionAlgorithmNotAccepted(
grpc_compression_algorithm compression_algorithm) GPR_ATTRIBUTE_NOINLINE;
gpr_cycle_counter start_time() const { return start_time_; }
private:
RefCountedPtr<Channel> channel_;
Arena* const arena_;
std::atomic<ParentCall*> parent_call_{nullptr};
ChildCall* child_ = nullptr;
Timestamp send_deadline_;
const bool is_client_;
// flag indicating that cancellation is inherited
bool cancellation_is_inherited_ = false;
// Compression algorithm for *incoming* data
grpc_compression_algorithm incoming_compression_algorithm_ =
GRPC_COMPRESS_NONE;
// Supported encodings (compression algorithms), a bitset.
// Always support no compression.
CompressionAlgorithmSet encodings_accepted_by_peer_{GRPC_COMPRESS_NONE};
uint32_t test_only_last_message_flags_ = 0;
// Peer name is protected by a mutex because it can be accessed by the
// application at the same moment as it is being set by the completion
// of the recv_initial_metadata op. The mutex should be mostly uncontended.
mutable Mutex peer_mu_;
Slice peer_string_;
gpr_cycle_counter start_time_ = gpr_get_cycle_counter();
};
Call::ParentCall* Call::GetOrCreateParentCall() {
ParentCall* p = parent_call_.load(std::memory_order_acquire);
if (p == nullptr) {
@ -503,6 +352,43 @@ void Call::HandleCompressionAlgorithmDisabled(
GRPC_STATUS_UNIMPLEMENTED));
}
void Call::UpdateDeadline(Timestamp deadline) {
MutexLock lock(&deadline_mu_);
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "[call %p] UpdateDeadline from=%s to=%s", this,
deadline_.ToString().c_str(), deadline.ToString().c_str());
}
if (deadline >= deadline_) return;
auto* const event_engine = channel()->event_engine();
if (deadline_ != Timestamp::InfFuture()) {
if (!event_engine->Cancel(deadline_task_)) return;
} else {
InternalRef("deadline");
}
deadline_ = deadline;
deadline_task_ = event_engine->RunAfter(deadline - Timestamp::Now(), this);
}
void Call::ResetDeadline() {
{
MutexLock lock(&deadline_mu_);
if (deadline_ == Timestamp::InfFuture()) return;
auto* const event_engine = channel()->event_engine();
if (!event_engine->Cancel(deadline_task_)) return;
deadline_ = Timestamp::InfFuture();
}
InternalUnref("deadline[reset]");
}
void Call::Run() {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
CancelWithError(grpc_error_set_int(
absl::DeadlineExceededError("Deadline Exceeded"),
StatusIntProperty::kRpcStatus, GRPC_STATUS_DEADLINE_EXCEEDED));
InternalUnref("deadline[run]");
}
///////////////////////////////////////////////////////////////////////////////
// FilterStackCall
// To be removed once promise conversion is complete
@ -678,7 +564,9 @@ class FilterStackCall final : public Call {
: Call(arena, args.server_transport_data == nullptr, args.send_deadline,
args.channel->Ref()),
cq_(args.cq),
stream_op_payload_(context_) {}
stream_op_payload_(context_) {
context_[GRPC_CONTEXT_CALL].value = this;
}
static void ReleaseCall(void* call, grpc_error_handle);
static void DestroyCall(void* call, grpc_error_handle);
@ -912,6 +800,10 @@ grpc_error_handle FilterStackCall::Create(grpc_call_create_args* args,
}
}
if (args->send_deadline != Timestamp::InfFuture()) {
call->UpdateDeadline(args->send_deadline);
}
CSliceUnref(path);
return error;
@ -1027,8 +919,13 @@ void FilterStackCall::CancelWithError(grpc_error_handle error) {
if (!gpr_atm_rel_cas(&cancelled_with_error_, 0, 1)) {
return;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_call_error_trace)) {
gpr_log(GPR_INFO, "CancelWithError %s %s", is_client() ? "CLI" : "SVR",
StatusToString(error).c_str());
}
ClearPeerString();
InternalRef("termination");
ResetDeadline();
// Inform the call combiner of the cancellation, so that it can cancel
// any in-flight asynchronous actions that may be holding the call
// combiner. This ensures that the cancel_stream batch can be sent
@ -1047,9 +944,10 @@ void FilterStackCall::CancelWithError(grpc_error_handle error) {
void FilterStackCall::SetFinalStatus(grpc_error_handle error) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_call_error_trace)) {
gpr_log(GPR_DEBUG, "set_final_status %s %s", is_client() ? "CLI" : "SVR",
gpr_log(GPR_INFO, "set_final_status %s %s", is_client() ? "CLI" : "SVR",
StatusToString(error).c_str());
}
ResetDeadline();
if (is_client()) {
std::string status_details;
grpc_error_get_status(error, send_deadline(), final_op_.client.status,
@ -1353,9 +1251,9 @@ void FilterStackCall::BatchControl::PostCompletion() {
if (completion_data_.notify_tag.is_closure) {
call_ = nullptr;
Closure::Run(DEBUG_LOCATION,
static_cast<grpc_closure*>(completion_data_.notify_tag.tag),
error);
GrpcClosure::Run(
DEBUG_LOCATION,
static_cast<grpc_closure*>(completion_data_.notify_tag.tag), error);
call->InternalUnref("completion");
} else {
grpc_cq_end_op(
@ -1477,7 +1375,7 @@ void FilterStackCall::BatchControl::ReceivingInitialMetadataReady(
}
}
if (saved_rsr_closure != nullptr) {
Closure::Run(DEBUG_LOCATION, saved_rsr_closure, error);
GrpcClosure::Run(DEBUG_LOCATION, saved_rsr_closure, error);
}
FinishStep(PendingOp::kRecvInitialMetadata);
@ -1968,10 +1866,7 @@ bool ValidateMetadata(size_t count, grpc_metadata* metadata) {
// PromiseBasedCall
// Will be folded into Call once the promise conversion is done
class BasicPromiseBasedCall : public Call,
public Party,
public grpc_event_engine::experimental::
EventEngine::Closure /* for deadlines */ {
class BasicPromiseBasedCall : public Call, public Party {
public:
using Call::arena;
@ -1986,6 +1881,7 @@ class BasicPromiseBasedCall : public Call,
if (args.cq != nullptr) {
GRPC_CQ_INTERNAL_REF(args.cq, "bind");
}
context_[GRPC_CONTEXT_CALL].value = this;
}
~BasicPromiseBasedCall() override {
@ -1997,9 +1893,6 @@ class BasicPromiseBasedCall : public Call,
}
}
// Implementation of EventEngine::Closure, called when deadline expires
void Run() final;
virtual void OrphanCall() = 0;
virtual ServerCallContext* server_call_context() { return nullptr; }
@ -2057,13 +1950,6 @@ class BasicPromiseBasedCall : public Call,
return context_[elem].value;
}
void UpdateDeadline(Timestamp deadline) ABSL_LOCKS_EXCLUDED(deadline_mu_);
void ResetDeadline() ABSL_LOCKS_EXCLUDED(deadline_mu_);
Timestamp deadline() {
MutexLock lock(&deadline_mu_);
return deadline_;
}
// Accept the stats from the context (call once we have proof the transport is
// done with them).
void AcceptTransportStatsFromContext() {
@ -2139,52 +2025,11 @@ class BasicPromiseBasedCall : public Call,
// Contexts for various subsystems (security, tracing, ...).
grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {};
grpc_call_stats final_stats_{};
// Current deadline.
Mutex deadline_mu_;
Timestamp deadline_ ABSL_GUARDED_BY(deadline_mu_) = Timestamp::InfFuture();
grpc_event_engine::experimental::EventEngine::TaskHandle ABSL_GUARDED_BY(
deadline_mu_) deadline_task_;
Slice final_message_;
grpc_status_code final_status_ = GRPC_STATUS_UNKNOWN;
grpc_completion_queue* cq_;
};
void BasicPromiseBasedCall::UpdateDeadline(Timestamp deadline) {
MutexLock lock(&deadline_mu_);
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s[call] UpdateDeadline from=%s to=%s",
DebugTag().c_str(), deadline_.ToString().c_str(),
deadline.ToString().c_str());
}
if (deadline >= deadline_) return;
auto* const event_engine = channel()->event_engine();
if (deadline_ != Timestamp::InfFuture()) {
if (!event_engine->Cancel(deadline_task_)) return;
} else {
InternalRef("deadline");
}
deadline_ = deadline;
deadline_task_ = event_engine->RunAfter(deadline - Timestamp::Now(), this);
}
void BasicPromiseBasedCall::ResetDeadline() {
{
MutexLock lock(&deadline_mu_);
if (deadline_ == Timestamp::InfFuture()) return;
auto* const event_engine = channel()->event_engine();
if (!event_engine->Cancel(deadline_task_)) return;
deadline_ = Timestamp::InfFuture();
}
InternalUnref("deadline[reset]");
}
void BasicPromiseBasedCall::Run() {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
CancelWithError(absl::DeadlineExceededError("Deadline exceeded"));
InternalUnref("deadline[run]");
}
class PromiseBasedCall : public BasicPromiseBasedCall {
public:
PromiseBasedCall(Arena* arena, uint32_t initial_external_refs,
@ -2680,12 +2525,6 @@ void CallContext::IncrementRefCount(const char* reason) {
void CallContext::Unref(const char* reason) { call_->InternalUnref(reason); }
void CallContext::UpdateDeadline(Timestamp deadline) {
call_->UpdateDeadline(deadline);
}
Timestamp CallContext::deadline() const { return call_->deadline(); }
ServerCallContext* CallContext::server_call_context() {
return call_->server_call_context();
}
@ -3697,7 +3536,12 @@ ServerPromiseBasedCall::MakeTopOfServerCallPromise(
server_to_client_messages_ = call_args.server_to_client_messages;
client_to_server_messages_ = call_args.client_to_server_messages;
server_initial_metadata_ = call_args.server_initial_metadata;
set_send_deadline(deadline());
absl::optional<Timestamp> deadline =
client_initial_metadata_->get(GrpcTimeoutMetadata());
if (deadline.has_value()) {
set_send_deadline(*deadline);
UpdateDeadline(*deadline);
}
ProcessIncomingInitialMetadata(*client_initial_metadata_);
ExternalRef();
publish(c_ptr());

@ -76,6 +76,179 @@ typedef struct grpc_call_create_args {
} grpc_call_create_args;
namespace grpc_core {
class Call : public CppImplOf<Call, grpc_call>,
public grpc_event_engine::experimental::EventEngine::
Closure /* for deadlines */ {
public:
Arena* arena() { return arena_; }
bool is_client() const { return is_client_; }
virtual void ContextSet(grpc_context_index elem, void* value,
void (*destroy)(void* value)) = 0;
virtual void* ContextGet(grpc_context_index elem) const = 0;
virtual bool Completed() = 0;
void CancelWithStatus(grpc_status_code status, const char* description);
virtual void CancelWithError(grpc_error_handle error) = 0;
virtual void SetCompletionQueue(grpc_completion_queue* cq) = 0;
char* GetPeer();
virtual grpc_call_error StartBatch(const grpc_op* ops, size_t nops,
void* notify_tag,
bool is_notify_tag_closure) = 0;
virtual bool failed_before_recv_message() const = 0;
virtual bool is_trailers_only() const = 0;
virtual absl::string_view GetServerAuthority() const = 0;
virtual void ExternalRef() = 0;
virtual void ExternalUnref() = 0;
virtual void InternalRef(const char* reason) = 0;
virtual void InternalUnref(const char* reason) = 0;
void UpdateDeadline(Timestamp deadline) ABSL_LOCKS_EXCLUDED(deadline_mu_);
void ResetDeadline() ABSL_LOCKS_EXCLUDED(deadline_mu_);
Timestamp deadline() {
MutexLock lock(&deadline_mu_);
return deadline_;
}
grpc_compression_algorithm test_only_compression_algorithm() {
return incoming_compression_algorithm_;
}
uint32_t test_only_message_flags() { return test_only_last_message_flags_; }
CompressionAlgorithmSet encodings_accepted_by_peer() {
return encodings_accepted_by_peer_;
}
// This should return nullptr for the promise stack (and alternative means
// for that functionality be invented)
virtual grpc_call_stack* call_stack() = 0;
// Return the EventEngine used for this call's async execution.
virtual grpc_event_engine::experimental::EventEngine* event_engine()
const = 0;
// Implementation of EventEngine::Closure, called when deadline expires
void Run() final;
protected:
// The maximum number of concurrent batches possible.
// Based upon the maximum number of individually queueable ops in the batch
// api:
// - initial metadata send
// - message send
// - status/close send (depending on client/server)
// - initial metadata recv
// - message recv
// - status/close recv (depending on client/server)
static constexpr size_t kMaxConcurrentBatches = 6;
struct ParentCall {
Mutex child_list_mu;
Call* first_child ABSL_GUARDED_BY(child_list_mu) = nullptr;
};
struct ChildCall {
explicit ChildCall(Call* parent) : parent(parent) {}
Call* parent;
/// siblings: children of the same parent form a list, and this list is
/// protected under
/// parent->mu
Call* sibling_next = nullptr;
Call* sibling_prev = nullptr;
};
Call(Arena* arena, bool is_client, Timestamp send_deadline,
RefCountedPtr<Channel> channel)
: channel_(std::move(channel)),
arena_(arena),
send_deadline_(send_deadline),
is_client_(is_client) {
GPR_DEBUG_ASSERT(arena_ != nullptr);
GPR_DEBUG_ASSERT(channel_ != nullptr);
}
~Call() override = default;
void DeleteThis();
ParentCall* GetOrCreateParentCall();
ParentCall* parent_call();
Channel* channel() const {
GPR_DEBUG_ASSERT(channel_ != nullptr);
return channel_.get();
}
absl::Status InitParent(Call* parent, uint32_t propagation_mask);
void PublishToParent(Call* parent);
void MaybeUnpublishFromParent();
void PropagateCancellationToChildren();
Timestamp send_deadline() const { return send_deadline_; }
void set_send_deadline(Timestamp send_deadline) {
send_deadline_ = send_deadline;
}
Slice GetPeerString() const {
MutexLock lock(&peer_mu_);
return peer_string_.Ref();
}
void SetPeerString(Slice peer_string) {
MutexLock lock(&peer_mu_);
peer_string_ = std::move(peer_string);
}
void ClearPeerString() { SetPeerString(Slice(grpc_empty_slice())); }
// TODO(ctiller): cancel_func is for cancellation of the call - filter stack
// holds no mutexes here, promise stack does, and so locking is different.
// Remove this and cancel directly once promise conversion is done.
void ProcessIncomingInitialMetadata(grpc_metadata_batch& md);
// Fixup outgoing metadata before sending - adds compression, protects
// internal headers against external modification.
void PrepareOutgoingInitialMetadata(const grpc_op& op,
grpc_metadata_batch& md);
void NoteLastMessageFlags(uint32_t flags) {
test_only_last_message_flags_ = flags;
}
grpc_compression_algorithm incoming_compression_algorithm() const {
return incoming_compression_algorithm_;
}
void HandleCompressionAlgorithmDisabled(
grpc_compression_algorithm compression_algorithm) GPR_ATTRIBUTE_NOINLINE;
void HandleCompressionAlgorithmNotAccepted(
grpc_compression_algorithm compression_algorithm) GPR_ATTRIBUTE_NOINLINE;
gpr_cycle_counter start_time() const { return start_time_; }
private:
RefCountedPtr<Channel> channel_;
Arena* const arena_;
std::atomic<ParentCall*> parent_call_{nullptr};
ChildCall* child_ = nullptr;
Timestamp send_deadline_;
const bool is_client_;
// flag indicating that cancellation is inherited
bool cancellation_is_inherited_ = false;
// Compression algorithm for *incoming* data
grpc_compression_algorithm incoming_compression_algorithm_ =
GRPC_COMPRESS_NONE;
// Supported encodings (compression algorithms), a bitset.
// Always support no compression.
CompressionAlgorithmSet encodings_accepted_by_peer_{GRPC_COMPRESS_NONE};
uint32_t test_only_last_message_flags_ = 0;
// Peer name is protected by a mutex because it can be accessed by the
// application at the same moment as it is being set by the completion
// of the recv_initial_metadata op. The mutex should be mostly uncontended.
mutable Mutex peer_mu_;
Slice peer_string_;
// Current deadline.
Mutex deadline_mu_;
Timestamp deadline_ ABSL_GUARDED_BY(deadline_mu_) = Timestamp::InfFuture();
grpc_event_engine::experimental::EventEngine::TaskHandle ABSL_GUARDED_BY(
deadline_mu_) deadline_task_;
gpr_cycle_counter start_time_ = gpr_get_cycle_counter();
};
class BasicPromiseBasedCall;
class ServerPromiseBasedCall;
@ -106,10 +279,6 @@ class CallContext {
public:
explicit CallContext(BasicPromiseBasedCall* call) : call_(call) {}
// Update the deadline (if deadline < the current deadline).
void UpdateDeadline(Timestamp deadline);
Timestamp deadline() const;
// Run some action in the call activity context. This is needed to adapt some
// legacy systems to promises, and will likely disappear once that conversion
// is complete.

@ -234,7 +234,7 @@ struct Server::RequestedCall {
template <typename OptionalPayload>
void Complete(OptionalPayload payload, ClientMetadata& md) {
Timestamp deadline = GetContext<CallContext>()->deadline();
Timestamp deadline = GetContext<Call>()->deadline();
switch (type) {
case RequestedCall::Type::BATCH_CALL:
GPR_ASSERT(!payload.has_value());
@ -1479,6 +1479,10 @@ void Server::ChannelData::InitCall(RefCountedPtr<CallSpineInterface> call) {
auto* rc = mr.TakeCall();
rc->Complete(std::move(std::get<0>(r)), *md);
auto* call_context = GetContext<CallContext>();
const auto* deadline = md->get_pointer(GrpcTimeoutMetadata());
if (deadline != nullptr) {
GetContext<Call>()->UpdateDeadline(*deadline);
}
*rc->call = call_context->c_call();
grpc_call_ref(*rc->call);
grpc_call_set_completion_queue(call_context->c_call(),
@ -1828,6 +1832,7 @@ void Server::CallData::RecvInitialMetadataReady(void* arg,
auto op_deadline = calld->recv_initial_metadata_->get(GrpcTimeoutMetadata());
if (op_deadline.has_value()) {
calld->deadline_ = *op_deadline;
Call::FromC(calld->call_)->UpdateDeadline(*op_deadline);
}
if (calld->host_.has_value() && calld->path_.has_value()) {
// do nothing
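
With the filter gone, both server paths above apply the deadline themselves from the request's grpc-timeout header: the promise-based path via GetContext<Call>()->UpdateDeadline(), the legacy filter-stack path via Call::FromC(...)->UpdateDeadline(). The two hunks also use the two metadata-batch accessor spellings; a minimal sketch of the difference, as used above (names taken from the diff, simplified):

    absl::optional<Timestamp> t = md.get(GrpcTimeoutMetadata());   // copies the value; nullopt if absent
    const Timestamp* p = md.get_pointer(GrpcTimeoutMetadata());    // points into the batch; nullptr if absent
    if (p != nullptr) GetContext<Call>()->UpdateDeadline(*p);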

@ -926,7 +926,7 @@ struct StackData {
filter_destructor.push_back(FilterDestructor{
call_offset,
[](void* call_data) {
static_cast<typename FilterType::Call*>(call_data)->~Call();
Destruct(static_cast<typename FilterType::Call*>(call_data));
},
});
}
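
The call_filters.h tweak replaces the explicit pseudo-destructor call with the Destruct() helper. Assuming the helper is the one from src/core/lib/gprpp/construct_destruct.h, it amounts to roughly:

    template <typename T>
    void Destruct(T* p) { p->~T(); }  // run the destructor without freeing memory

so behavior is unchanged; the rewrite presumably just keeps call-data teardown consistent with Construct()/Destruct() usage elsewhere in core.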

@ -44,7 +44,6 @@ extern void SecurityRegisterHandshakerFactories(
extern void RegisterClientAuthorityFilter(CoreConfiguration::Builder* builder);
extern void RegisterLegacyChannelIdleFilters(
CoreConfiguration::Builder* builder);
extern void RegisterDeadlineFilter(CoreConfiguration::Builder* builder);
extern void RegisterGrpcLbPolicy(CoreConfiguration::Builder* builder);
extern void RegisterHttpFilters(CoreConfiguration::Builder* builder);
extern void RegisterMessageSizeFilter(CoreConfiguration::Builder* builder);
@ -111,7 +110,6 @@ void BuildCoreConfiguration(CoreConfiguration::Builder* builder) {
RegisterConnectedChannel(builder);
RegisterGrpcLbPolicy(builder);
RegisterHttpFilters(builder);
RegisterDeadlineFilter(builder);
RegisterMessageSizeFilter(builder);
RegisterServiceConfigChannelArgFilter(builder);
RegisterResourceQuota(builder);

@ -38,7 +38,6 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/census/grpc_context.cc',
'src/core/ext/filters/channel_idle/idle_filter_state.cc',
'src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc',
'src/core/ext/filters/deadline/deadline_filter.cc',
'src/core/ext/filters/fault_injection/fault_injection_filter.cc',
'src/core/ext/filters/fault_injection/fault_injection_service_config_parser.cc',
'src/core/ext/filters/http/client/http_client_filter.cc',

@ -138,29 +138,29 @@ TEST(ChannelStackFilters, LooksAsExpected) {
// tests with a default stack
EXPECT_EQ(MakeStack("unknown", no_args, GRPC_CLIENT_DIRECT_CHANNEL),
std::vector<std::string>(
{"authority", "message_size", "deadline", "connected"}));
EXPECT_EQ(
MakeStack("unknown", no_args, GRPC_CLIENT_DIRECT_CHANNEL),
std::vector<std::string>({"authority", "message_size", "connected"}));
EXPECT_EQ(
MakeStack("unknown", no_args, GRPC_CLIENT_SUBCHANNEL),
std::vector<std::string>({"authority", "message_size", "connected"}));
EXPECT_EQ(MakeStack("unknown", no_args, GRPC_SERVER_CHANNEL),
std::vector<std::string>({"server", "message_size", "deadline",
"server_call_tracer", "connected"}));
std::vector<std::string>(
{"server", "message_size", "server_call_tracer", "connected"}));
EXPECT_EQ(
MakeStack("chttp2", no_args, GRPC_CLIENT_DIRECT_CHANNEL),
std::vector<std::string>({"authority", "message_size", "deadline",
"http-client", "compression", "connected"}));
std::vector<std::string>({"authority", "message_size", "http-client",
"compression", "connected"}));
EXPECT_EQ(
MakeStack("chttp2", no_args, GRPC_CLIENT_SUBCHANNEL),
std::vector<std::string>({"authority", "message_size", "http-client",
"compression", "connected"}));
EXPECT_EQ(MakeStack("chttp2", no_args, GRPC_SERVER_CHANNEL),
std::vector<std::string>({"server", "message_size", "deadline",
"http-server", "compression",
"server_call_tracer", "connected"}));
std::vector<std::string>({"server", "message_size", "http-server",
"compression", "server_call_tracer",
"connected"}));
EXPECT_EQ(MakeStack(nullptr, no_args, GRPC_CLIENT_CHANNEL),
std::vector<std::string>({"client_idle", "client-channel"}));
}

@ -181,6 +181,36 @@ class CoreEnd2endTest : public ::testing::Test {
void* p;
};
// Safe notification to use for core e2e tests.
// Since when we're fuzzing we don't run background threads, the normal
// Notification type isn't safe to wait on (for some background timer to fire
// for instance...), consequently we need to use this.
class TestNotification {
public:
explicit TestNotification(CoreEnd2endTest* test) : test_(test) {}
void WaitForNotificationWithTimeout(absl::Duration wait_time) {
if (g_is_fuzzing_core_e2e_tests) {
Timestamp end = Timestamp::Now() + Duration::NanosecondsRoundUp(
ToInt64Nanoseconds(wait_time));
while (true) {
if (base_.HasBeenNotified()) return;
auto now = Timestamp::Now();
if (now >= end) return;
test_->step_fn_(now - end);
}
} else {
base_.WaitForNotificationWithTimeout(wait_time);
}
}
void Notify() { base_.Notify(); }
private:
Notification base_;
CoreEnd2endTest* const test_;
};
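
A hypothetical usage sketch for the new wrapper (illustrative only, not part of this change): a test swaps a plain Notification for TestNotification so that, when fuzzing, waiting pumps the test's step function instead of blocking on background threads that never run.

    CoreEnd2endTest::TestNotification call_ended(this);
    // ... some completion callback eventually calls: call_ended.Notify();
    call_ended.WaitForNotificationWithTimeout(absl::Seconds(30));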
// CallBuilder - results in a call to either grpc_channel_create_call or
// grpc_channel_create_registered_call.
// Affords a fluent interface to specify optional arguments.
@ -753,7 +783,14 @@ class CoreEnd2endTest : public ::testing::Test {
cq_,
g_is_fuzzing_core_e2e_tests ? CqVerifier::FailUsingGprCrashWithStdio
: CqVerifier::FailUsingGprCrash,
std::move(step_fn_));
step_fn_ == nullptr
? nullptr
: absl::AnyInvocable<void(
grpc_event_engine::experimental::EventEngine::Duration)
const>(
[this](
grpc_event_engine::experimental::EventEngine::Duration
d) { step_fn_(d); }));
}
return *cq_verifier_;
}

@ -57,8 +57,8 @@ namespace grpc_core {
namespace {
Mutex* g_mu;
Notification* g_client_call_ended_notify;
Notification* g_server_call_ended_notify;
CoreEnd2endTest::TestNotification* g_client_call_ended_notify;
CoreEnd2endTest::TestNotification* g_server_call_ended_notify;
class FakeCallTracer : public ClientCallTracer {
public:
@ -197,8 +197,8 @@ CORE_END2END_TEST(Http2FullstackSingleHopTest, StreamStats) {
GTEST_SKIP() << "Test needs http2_stats_fix experiment to be enabled";
}
g_mu = new Mutex();
g_client_call_ended_notify = new Notification();
g_server_call_ended_notify = new Notification();
g_client_call_ended_notify = new CoreEnd2endTest::TestNotification(this);
g_server_call_ended_notify = new CoreEnd2endTest::TestNotification(this);
GlobalStatsPluginRegistry::RegisterStatsPlugin(
std::make_shared<NewFakeStatsPlugin>());
auto send_from_client = RandomSlice(10);

@ -1131,8 +1131,6 @@ src/core/ext/filters/channel_idle/idle_filter_state.cc \
src/core/ext/filters/channel_idle/idle_filter_state.h \
src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc \
src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h \
src/core/ext/filters/deadline/deadline_filter.cc \
src/core/ext/filters/deadline/deadline_filter.h \
src/core/ext/filters/fault_injection/fault_injection_filter.cc \
src/core/ext/filters/fault_injection/fault_injection_filter.h \
src/core/ext/filters/fault_injection/fault_injection_service_config_parser.cc \

@ -936,8 +936,6 @@ src/core/ext/filters/channel_idle/idle_filter_state.cc \
src/core/ext/filters/channel_idle/idle_filter_state.h \
src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc \
src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h \
src/core/ext/filters/deadline/deadline_filter.cc \
src/core/ext/filters/deadline/deadline_filter.h \
src/core/ext/filters/fault_injection/fault_injection_filter.cc \
src/core/ext/filters/fault_injection/fault_injection_filter.h \
src/core/ext/filters/fault_injection/fault_injection_service_config_parser.cc \
