[channel-stack] Make ordering explicit (#32852)

Ditch the old priority scheme for ordering filters, instead explicitly
mark up before/after constraints.

---------

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/34714/head
Craig Tiller 1 year ago committed by GitHub
parent 4db2625512
commit 975184f04b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      BUILD
  2. 38
      CMakeLists.txt
  3. 2
      Makefile
  4. 2
      Package.swift
  5. 19
      build_autogenerated.yaml
  6. 1
      config.m4
  7. 1
      config.w32
  8. 2
      gRPC-C++.podspec
  9. 3
      gRPC-Core.podspec
  10. 2
      grpc.gemspec
  11. 3
      grpc.gyp
  12. 2
      package.xml
  13. 44
      src/core/BUILD
  14. 13
      src/core/ext/filters/backend_metrics/backend_metric_filter.cc
  15. 30
      src/core/ext/filters/channel_idle/channel_idle_filter.cc
  16. 4
      src/core/ext/filters/client_channel/client_channel.cc
  17. 21
      src/core/ext/filters/client_channel/client_channel_plugin.cc
  18. 16
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  19. 20
      src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc
  20. 8
      src/core/ext/filters/client_channel/subchannel.cc
  21. 31
      src/core/ext/filters/deadline/deadline_filter.cc
  22. 4
      src/core/ext/filters/deadline/deadline_filter.h
  23. 28
      src/core/ext/filters/http/client_authority_filter.cc
  24. 67
      src/core/ext/filters/http/http_filters_plugin.cc
  25. 16
      src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
  26. 34
      src/core/ext/filters/logging/logging_filter.cc
  27. 63
      src/core/ext/filters/message_size/message_size_filter.cc
  28. 45
      src/core/ext/xds/xds_channel_stack_modifier.cc
  29. 8
      src/core/ext/xds/xds_channel_stack_modifier.h
  30. 10
      src/core/lib/channel/channel_stack.cc
  31. 19
      src/core/lib/channel/channel_stack_builder.h
  32. 20
      src/core/lib/channel/channel_stack_builder_impl.cc
  33. 19
      src/core/lib/channel/channel_stack_trace.cc
  34. 24
      src/core/lib/channel/channel_stack_trace.h
  35. 64
      src/core/lib/channel/connected_channel.cc
  36. 10
      src/core/lib/channel/server_call_tracer_filter.cc
  37. 31
      src/core/lib/surface/builtins.cc
  38. 5
      src/core/lib/surface/channel.cc
  39. 381
      src/core/lib/surface/channel_init.cc
  40. 176
      src/core/lib/surface/channel_init.h
  41. 59
      src/core/lib/surface/init.cc
  42. 2
      src/core/plugin_registry/grpc_plugin_registry.cc
  43. 87
      src/cpp/common/channel_filter.cc
  44. 13
      src/cpp/ext/filters/census/grpc_plugin.cc
  45. 1
      src/cpp/ext/otel/BUILD
  46. 24
      src/cpp/ext/otel/otel_plugin.cc
  47. 1
      src/python/grpcio/grpc_core_dependencies.py
  48. 118
      test/core/channel/channel_stack_builder_test.cc
  49. 10
      test/core/channel/minimal_stack_is_minimal_test.cc
  50. 7
      test/core/end2end/tests/filter_causes_close.cc
  51. 42
      test/core/end2end/tests/filter_context.cc
  52. 39
      test/core/end2end/tests/filter_init_fails.cc
  53. 20
      test/core/end2end/tests/retry_cancel_with_multiple_send_batches.cc
  54. 18
      test/core/end2end/tests/retry_recv_message_replay.cc
  55. 20
      test/core/end2end/tests/retry_recv_trailing_metadata_error.cc
  56. 18
      test/core/end2end/tests/retry_send_op_fails.cc
  57. 21
      test/core/end2end/tests/retry_transparent_goaway.cc
  58. 21
      test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc
  59. 14
      test/core/surface/BUILD
  60. 212
      test/core/surface/channel_init_test.cc
  61. 27
      test/core/transport/chttp2/streams_not_seen_test.cc
  62. 8
      test/core/xds/xds_channel_stack_modifier_test.cc
  63. 2
      test/cpp/ext/filters/logging/library.h
  64. 5
      test/cpp/ext/filters/logging/logging_census_integration_test.cc
  65. 2
      tools/doxygen/Doxyfile.c++.internal
  66. 2
      tools/doxygen/Doxyfile.core.internal
  67. 24
      tools/run_tests/generated/tests.json

@ -1543,6 +1543,7 @@ grpc_cc_library(
"//src/core:channel_args_preconditioning",
"//src/core:channel_fwd",
"//src/core:channel_init",
"//src/core:channel_stack_trace",
"//src/core:channel_stack_type",
"//src/core:chunked_vector",
"//src/core:closure",
@ -2359,7 +2360,6 @@ grpc_cc_library(
language = "c++",
visibility = ["@grpc:grpc_opencensus_plugin"],
deps = [
"channel_stack_builder",
"config",
"gpr",
"grpc++_base",
@ -2881,7 +2881,6 @@ grpc_cc_library(
"//src/core:channel_args",
"//src/core:channel_fwd",
"//src/core:channel_stack_type",
"//src/core:transport_fwd",
],
)
@ -3063,7 +3062,6 @@ grpc_cc_library(
deps = [
"backoff",
"channel_arg_names",
"channel_stack_builder",
"config",
"config_vars",
"debug_location",
@ -3110,6 +3108,7 @@ grpc_cc_library(
"//src/core:gpr_manual_constructor",
"//src/core:grpc_backend_metric_data",
"//src/core:grpc_deadline_filter",
"//src/core:grpc_message_size_filter",
"//src/core:grpc_service_config",
"//src/core:init_internally",
"//src/core:iomgr_fwd",
@ -3593,7 +3592,6 @@ grpc_cc_library(
visibility = ["@grpc:http"],
deps = [
"channel_arg_names",
"channel_stack_builder",
"config",
"gpr",
"grpc_base",
@ -3606,7 +3604,6 @@ grpc_cc_library(
"//src/core:arena_promise",
"//src/core:channel_args",
"//src/core:channel_fwd",
"//src/core:channel_init",
"//src/core:channel_stack_type",
"//src/core:context",
"//src/core:grpc_message_size_filter",

38
CMakeLists.txt generated

@ -924,6 +924,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx channel_args_test)
add_dependencies(buildtests_cxx channel_arguments_test)
add_dependencies(buildtests_cxx channel_creds_registry_test)
add_dependencies(buildtests_cxx channel_init_test)
add_dependencies(buildtests_cxx channel_stack_builder_test)
add_dependencies(buildtests_cxx channel_stack_test)
add_dependencies(buildtests_cxx channel_trace_test)
@ -2201,6 +2202,7 @@ add_library(grpc
src/core/lib/channel/channel_stack.cc
src/core/lib/channel/channel_stack_builder.cc
src/core/lib/channel/channel_stack_builder_impl.cc
src/core/lib/channel/channel_stack_trace.cc
src/core/lib/channel/channel_trace.cc
src/core/lib/channel/channelz.cc
src/core/lib/channel/channelz_registry.cc
@ -2923,6 +2925,7 @@ add_library(grpc_unsecure
src/core/lib/channel/channel_stack.cc
src/core/lib/channel/channel_stack_builder.cc
src/core/lib/channel/channel_stack_builder_impl.cc
src/core/lib/channel/channel_stack_trace.cc
src/core/lib/channel/channel_trace.cc
src/core/lib/channel/channelz.cc
src/core/lib/channel/channelz_registry.cc
@ -4878,6 +4881,7 @@ add_library(grpc_authorization_provider
src/core/lib/channel/channel_stack.cc
src/core/lib/channel/channel_stack_builder.cc
src/core/lib/channel/channel_stack_builder_impl.cc
src/core/lib/channel/channel_stack_trace.cc
src/core/lib/channel/channel_trace.cc
src/core/lib/channel/channelz.cc
src/core/lib/channel/channelz_registry.cc
@ -8469,6 +8473,39 @@ target_link_libraries(channel_creds_registry_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(channel_init_test
test/core/surface/channel_init_test.cc
)
target_compile_features(channel_init_test PUBLIC cxx_std_14)
target_include_directories(channel_init_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(channel_init_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
grpc_test_util
)
endif()
if(gRPC_BUILD_TESTS)
@ -24079,6 +24116,7 @@ add_executable(test_core_transport_chaotic_good_frame_test
src/core/lib/channel/channel_stack.cc
src/core/lib/channel/channel_stack_builder.cc
src/core/lib/channel/channel_stack_builder_impl.cc
src/core/lib/channel/channel_stack_trace.cc
src/core/lib/channel/channel_trace.cc
src/core/lib/channel/channelz.cc
src/core/lib/channel/channelz_registry.cc

2
Makefile generated

@ -1418,6 +1418,7 @@ LIBGRPC_SRC = \
src/core/lib/channel/channel_stack.cc \
src/core/lib/channel/channel_stack_builder.cc \
src/core/lib/channel/channel_stack_builder_impl.cc \
src/core/lib/channel/channel_stack_trace.cc \
src/core/lib/channel/channel_trace.cc \
src/core/lib/channel/channelz.cc \
src/core/lib/channel/channelz_registry.cc \
@ -1992,6 +1993,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/channel/channel_stack.cc \
src/core/lib/channel/channel_stack_builder.cc \
src/core/lib/channel/channel_stack_builder_impl.cc \
src/core/lib/channel/channel_stack_trace.cc \
src/core/lib/channel/channel_trace.cc \
src/core/lib/channel/channelz.cc \
src/core/lib/channel/channelz_registry.cc \

2
Package.swift generated

@ -1037,6 +1037,8 @@ let package = Package(
"src/core/lib/channel/channel_stack_builder.h",
"src/core/lib/channel/channel_stack_builder_impl.cc",
"src/core/lib/channel/channel_stack_builder_impl.h",
"src/core/lib/channel/channel_stack_trace.cc",
"src/core/lib/channel/channel_stack_trace.h",
"src/core/lib/channel/channel_trace.cc",
"src/core/lib/channel/channel_trace.h",
"src/core/lib/channel/channelz.cc",

@ -674,6 +674,7 @@ libs:
- src/core/lib/channel/channel_stack.h
- src/core/lib/channel/channel_stack_builder.h
- src/core/lib/channel/channel_stack_builder_impl.h
- src/core/lib/channel/channel_stack_trace.h
- src/core/lib/channel/channel_trace.h
- src/core/lib/channel/channelz.h
- src/core/lib/channel/channelz_registry.h
@ -1502,6 +1503,7 @@ libs:
- src/core/lib/channel/channel_stack.cc
- src/core/lib/channel/channel_stack_builder.cc
- src/core/lib/channel/channel_stack_builder_impl.cc
- src/core/lib/channel/channel_stack_trace.cc
- src/core/lib/channel/channel_trace.cc
- src/core/lib/channel/channelz.cc
- src/core/lib/channel/channelz_registry.cc
@ -2101,6 +2103,7 @@ libs:
- src/core/lib/channel/channel_stack.h
- src/core/lib/channel/channel_stack_builder.h
- src/core/lib/channel/channel_stack_builder_impl.h
- src/core/lib/channel/channel_stack_trace.h
- src/core/lib/channel/channel_trace.h
- src/core/lib/channel/channelz.h
- src/core/lib/channel/channelz_registry.h
@ -2543,6 +2546,7 @@ libs:
- src/core/lib/channel/channel_stack.cc
- src/core/lib/channel/channel_stack_builder.cc
- src/core/lib/channel/channel_stack_builder_impl.cc
- src/core/lib/channel/channel_stack_trace.cc
- src/core/lib/channel/channel_trace.cc
- src/core/lib/channel/channelz.cc
- src/core/lib/channel/channelz_registry.cc
@ -4127,6 +4131,7 @@ libs:
- src/core/lib/channel/channel_stack.h
- src/core/lib/channel/channel_stack_builder.h
- src/core/lib/channel/channel_stack_builder_impl.h
- src/core/lib/channel/channel_stack_trace.h
- src/core/lib/channel/channel_trace.h
- src/core/lib/channel/channelz.h
- src/core/lib/channel/channelz_registry.h
@ -4444,6 +4449,7 @@ libs:
- src/core/lib/channel/channel_stack.cc
- src/core/lib/channel/channel_stack_builder.cc
- src/core/lib/channel/channel_stack_builder_impl.cc
- src/core/lib/channel/channel_stack_trace.cc
- src/core/lib/channel/channel_trace.cc
- src/core/lib/channel/channelz.cc
- src/core/lib/channel/channelz_registry.cc
@ -6600,6 +6606,17 @@ targets:
deps:
- gtest
- grpc_test_util
- name: channel_init_test
gtest: true
build: test
language: c++
headers: []
src:
- test/core/surface/channel_init_test.cc
deps:
- gtest
- grpc_test_util
uses_polling: false
- name: channel_stack_builder_test
gtest: true
build: test
@ -15738,6 +15755,7 @@ targets:
- src/core/lib/channel/channel_stack.h
- src/core/lib/channel/channel_stack_builder.h
- src/core/lib/channel/channel_stack_builder_impl.h
- src/core/lib/channel/channel_stack_trace.h
- src/core/lib/channel/channel_trace.h
- src/core/lib/channel/channelz.h
- src/core/lib/channel/channelz_registry.h
@ -16037,6 +16055,7 @@ targets:
- src/core/lib/channel/channel_stack.cc
- src/core/lib/channel/channel_stack_builder.cc
- src/core/lib/channel/channel_stack_builder_impl.cc
- src/core/lib/channel/channel_stack_trace.cc
- src/core/lib/channel/channel_trace.cc
- src/core/lib/channel/channelz.cc
- src/core/lib/channel/channelz_registry.cc

1
config.m4 generated

@ -505,6 +505,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/channel/channel_stack.cc \
src/core/lib/channel/channel_stack_builder.cc \
src/core/lib/channel/channel_stack_builder_impl.cc \
src/core/lib/channel/channel_stack_trace.cc \
src/core/lib/channel/channel_trace.cc \
src/core/lib/channel/channelz.cc \
src/core/lib/channel/channelz_registry.cc \

1
config.w32 generated

@ -470,6 +470,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\channel\\channel_stack.cc " +
"src\\core\\lib\\channel\\channel_stack_builder.cc " +
"src\\core\\lib\\channel\\channel_stack_builder_impl.cc " +
"src\\core\\lib\\channel\\channel_stack_trace.cc " +
"src\\core\\lib\\channel\\channel_trace.cc " +
"src\\core\\lib\\channel\\channelz.cc " +
"src\\core\\lib\\channel\\channelz_registry.cc " +

2
gRPC-C++.podspec generated

@ -746,6 +746,7 @@ Pod::Spec.new do |s|
'src/core/lib/channel/channel_stack.h',
'src/core/lib/channel/channel_stack_builder.h',
'src/core/lib/channel/channel_stack_builder_impl.h',
'src/core/lib/channel/channel_stack_trace.h',
'src/core/lib/channel/channel_trace.h',
'src/core/lib/channel/channelz.h',
'src/core/lib/channel/channelz_registry.h',
@ -1820,6 +1821,7 @@ Pod::Spec.new do |s|
'src/core/lib/channel/channel_stack.h',
'src/core/lib/channel/channel_stack_builder.h',
'src/core/lib/channel/channel_stack_builder_impl.h',
'src/core/lib/channel/channel_stack_trace.h',
'src/core/lib/channel/channel_trace.h',
'src/core/lib/channel/channelz.h',
'src/core/lib/channel/channelz_registry.h',

3
gRPC-Core.podspec generated

@ -1140,6 +1140,8 @@ Pod::Spec.new do |s|
'src/core/lib/channel/channel_stack_builder.h',
'src/core/lib/channel/channel_stack_builder_impl.cc',
'src/core/lib/channel/channel_stack_builder_impl.h',
'src/core/lib/channel/channel_stack_trace.cc',
'src/core/lib/channel/channel_stack_trace.h',
'src/core/lib/channel/channel_trace.cc',
'src/core/lib/channel/channel_trace.h',
'src/core/lib/channel/channelz.cc',
@ -2578,6 +2580,7 @@ Pod::Spec.new do |s|
'src/core/lib/channel/channel_stack.h',
'src/core/lib/channel/channel_stack_builder.h',
'src/core/lib/channel/channel_stack_builder_impl.h',
'src/core/lib/channel/channel_stack_trace.h',
'src/core/lib/channel/channel_trace.h',
'src/core/lib/channel/channelz.h',
'src/core/lib/channel/channelz_registry.h',

2
grpc.gemspec generated

@ -1043,6 +1043,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/channel/channel_stack_builder.h )
s.files += %w( src/core/lib/channel/channel_stack_builder_impl.cc )
s.files += %w( src/core/lib/channel/channel_stack_builder_impl.h )
s.files += %w( src/core/lib/channel/channel_stack_trace.cc )
s.files += %w( src/core/lib/channel/channel_stack_trace.h )
s.files += %w( src/core/lib/channel/channel_trace.cc )
s.files += %w( src/core/lib/channel/channel_trace.h )
s.files += %w( src/core/lib/channel/channelz.cc )

3
grpc.gyp generated

@ -736,6 +736,7 @@
'src/core/lib/channel/channel_stack.cc',
'src/core/lib/channel/channel_stack_builder.cc',
'src/core/lib/channel/channel_stack_builder_impl.cc',
'src/core/lib/channel/channel_stack_trace.cc',
'src/core/lib/channel/channel_trace.cc',
'src/core/lib/channel/channelz.cc',
'src/core/lib/channel/channelz_registry.cc',
@ -1251,6 +1252,7 @@
'src/core/lib/channel/channel_stack.cc',
'src/core/lib/channel/channel_stack_builder.cc',
'src/core/lib/channel/channel_stack_builder_impl.cc',
'src/core/lib/channel/channel_stack_trace.cc',
'src/core/lib/channel/channel_trace.cc',
'src/core/lib/channel/channelz.cc',
'src/core/lib/channel/channelz_registry.cc',
@ -1995,6 +1997,7 @@
'src/core/lib/channel/channel_stack.cc',
'src/core/lib/channel/channel_stack_builder.cc',
'src/core/lib/channel/channel_stack_builder_impl.cc',
'src/core/lib/channel/channel_stack_trace.cc',
'src/core/lib/channel/channel_trace.cc',
'src/core/lib/channel/channelz.cc',
'src/core/lib/channel/channelz_registry.cc',

2
package.xml generated

@ -1025,6 +1025,8 @@
<file baseinstalldir="/" name="src/core/lib/channel/channel_stack_builder.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/channel_stack_builder_impl.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/channel_stack_builder_impl.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/channel_stack_trace.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/channel_stack_trace.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/channel_trace.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/channel_trace.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/channelz.cc" role="src" />

@ -2612,6 +2612,21 @@ grpc_cc_library(
deps = ["//:gpr_platform"],
)
grpc_cc_library(
name = "channel_stack_trace",
srcs = [
"lib/channel/channel_stack_trace.cc",
],
hdrs = [
"lib/channel/channel_stack_trace.h",
],
language = "c++",
deps = [
"//:gpr_platform",
"//:grpc_trace",
],
)
grpc_cc_library(
name = "channel_init",
srcs = [
@ -2620,12 +2635,22 @@ grpc_cc_library(
hdrs = [
"lib/surface/channel_init.h",
],
external_deps = ["absl/functional:any_invocable"],
external_deps = [
"absl/functional:any_invocable",
"absl/strings",
"absl/types:optional",
],
language = "c++",
deps = [
"channel_args",
"channel_fwd",
"channel_stack_trace",
"channel_stack_type",
"//:channel_stack_builder",
"//:debug_location",
"//:gpr",
"//:gpr_platform",
"//:grpc_trace",
],
)
@ -3740,7 +3765,6 @@ grpc_cc_library(
"arena_promise",
"channel_args",
"channel_fwd",
"channel_init",
"channel_stack_type",
"closure",
"error",
@ -3758,7 +3782,6 @@ grpc_cc_library(
"time",
"try_seq",
"//:channel_arg_names",
"//:channel_stack_builder",
"//:config",
"//:debug_location",
"//:exec_ctx",
@ -3787,9 +3810,7 @@ grpc_cc_library(
deps = [
"arena",
"arena_promise",
"channel_args",
"channel_fwd",
"channel_init",
"channel_stack_type",
"closure",
"context",
@ -3797,7 +3818,6 @@ grpc_cc_library(
"status_helper",
"time",
"//:channel_arg_names",
"//:channel_stack_builder",
"//:config",
"//:debug_location",
"//:exec_ctx",
@ -3830,10 +3850,10 @@ grpc_cc_library(
"channel_stack_type",
"slice",
"//:channel_arg_names",
"//:channel_stack_builder",
"//:config",
"//:gpr_platform",
"//:grpc_base",
"//:grpc_security_base",
],
)
@ -3858,9 +3878,9 @@ grpc_cc_library(
"arena_promise",
"channel_args",
"channel_fwd",
"channel_init",
"channel_stack_type",
"context",
"grpc_deadline_filter",
"grpc_service_config",
"json",
"json_args",
@ -3872,7 +3892,6 @@ grpc_cc_library(
"slice_buffer",
"validation_errors",
"//:channel_arg_names",
"//:channel_stack_builder",
"//:config",
"//:gpr",
"//:grpc_base",
@ -4042,7 +4061,6 @@ grpc_cc_library(
"arena_promise",
"channel_args",
"channel_fwd",
"channel_init",
"channel_stack_type",
"closure",
"context",
@ -4071,7 +4089,6 @@ grpc_cc_library(
"validation_errors",
"//:backoff",
"//:channel_arg_names",
"//:channel_stack_builder",
"//:config",
"//:debug_location",
"//:endpoint_addresses",
@ -4408,6 +4425,7 @@ grpc_cc_library(
deps = [
"channel_args",
"channel_fwd",
"channel_init",
"channel_stack_type",
"ref_counted",
"useful",
@ -5284,7 +5302,6 @@ grpc_cc_library(
"seq",
"slice",
"//:channel_arg_names",
"//:channel_stack_builder",
"//:config",
"//:gpr",
"//:gpr_platform",
@ -5324,7 +5341,6 @@ grpc_cc_library(
"map",
"slice",
"//:channel_arg_names",
"//:channel_stack_builder",
"//:config",
"//:gpr",
"//:gpr_platform",
@ -6126,12 +6142,12 @@ grpc_cc_library(
"slice_buffer",
"time",
"//:channel_arg_names",
"//:channel_stack_builder",
"//:config",
"//:gpr",
"//:gpr_platform",
"//:grpc_base",
"//:grpc_client_channel",
"//:grpc_opencensus_plugin",
"//:grpc_public_hdrs",
"//:grpc_resolver",
"//:legacy_context",

@ -17,7 +17,6 @@
#include "src/core/ext/filters/backend_metrics/backend_metric_filter.h"
#include <inttypes.h>
#include <limits.h>
#include <stddef.h>
#include <functional>
@ -35,7 +34,6 @@
#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
@ -151,14 +149,9 @@ ArenaPromise<ServerMetadataHandle> BackendMetricFilter::MakeCallPromise(
}
void RegisterBackendMetricFilter(CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
GRPC_SERVER_CHANNEL, INT_MAX, [](ChannelStackBuilder* builder) {
if (builder->channel_args().Contains(
GRPC_ARG_SERVER_CALL_METRIC_RECORDING)) {
builder->PrependFilter(&BackendMetricFilter::kFilter);
}
return true;
});
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &BackendMetricFilter::kFilter)
.IfHasChannelArg(GRPC_ARG_SERVER_CALL_METRIC_RECORDING);
}
} // namespace grpc_core

@ -31,7 +31,6 @@
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
@ -51,7 +50,6 @@
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/sleep.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/http2_errors.h"
#include "src/core/lib/transport/metadata_batch.h"
@ -300,25 +298,17 @@ const grpc_channel_filter MaxAgeFilter::kFilter =
MakePromiseBasedFilter<MaxAgeFilter, FilterEndpoint::kServer>("max_age");
void RegisterChannelIdleFilters(CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[](ChannelStackBuilder* builder) {
auto channel_args = builder->channel_args();
if (!channel_args.WantMinimalStack() &&
GetClientIdleTimeout(channel_args) != Duration::Infinity()) {
builder->PrependFilter(&ClientIdleFilter::kFilter);
}
return true;
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_CHANNEL, &ClientIdleFilter::kFilter)
.ExcludeFromMinimalStack()
.If([](const ChannelArgs& channel_args) {
return GetClientIdleTimeout(channel_args) != Duration::Infinity();
});
builder->channel_init()->RegisterStage(
GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[](ChannelStackBuilder* builder) {
auto channel_args = builder->channel_args();
if (!channel_args.WantMinimalStack() &&
MaxAgeFilter::Config::FromChannelArgs(channel_args).enable()) {
builder->PrependFilter(&MaxAgeFilter::kFilter);
}
return true;
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &MaxAgeFilter::kFilter)
.ExcludeFromMinimalStack()
.If([](const ChannelArgs& channel_args) {
return MaxAgeFilter::Config::FromChannelArgs(channel_args).enable();
});
}

@ -1212,7 +1212,9 @@ RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
ClientChannel::ClientChannel(grpc_channel_element_args* args,
grpc_error_handle* error)
: channel_args_(args->channel_args),
deadline_checking_enabled_(grpc_deadline_checking_enabled(channel_args_)),
deadline_checking_enabled_(
channel_args_.GetBool(GRPC_ARG_ENABLE_DEADLINE_CHECKS)
.value_or(!channel_args_.WantMinimalStack())),
owning_stack_(args->channel_stack),
client_channel_factory_(channel_args_.GetObject<ClientChannelFactory>()),
channelz_node_(channel_args_.GetObject<channelz::ChannelNode>()),

@ -26,9 +26,7 @@
#include "src/core/ext/filters/client_channel/client_channel_service_config.h"
#include "src/core/ext/filters/client_channel/retry_service_config.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/surface/channel_stack_type.h"
namespace grpc_core {
@ -42,15 +40,16 @@ bool IsEverythingBelowClientChannelPromiseSafe(const ChannelArgs& args) {
void BuildClientChannelConfiguration(CoreConfiguration::Builder* builder) {
internal::ClientChannelServiceConfigParser::Register(builder);
internal::RetryServiceConfigParser::Register(builder);
builder->channel_init()->RegisterStage(
GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[](ChannelStackBuilder* builder) {
builder->AppendFilter(
IsEverythingBelowClientChannelPromiseSafe(builder->channel_args())
? &ClientChannel::kFilterVtableWithPromises
: &ClientChannel::kFilterVtableWithoutPromises);
return true;
});
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_CHANNEL,
&ClientChannel::kFilterVtableWithPromises)
.If(IsEverythingBelowClientChannelPromiseSafe)
.Terminal();
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_CHANNEL,
&ClientChannel::kFilterVtableWithoutPromises)
.IfNot(IsEverythingBelowClientChannelPromiseSafe)
.Terminal();
}
} // namespace grpc_core

@ -102,7 +102,6 @@
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
@ -140,7 +139,6 @@
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata_batch.h"
@ -1872,16 +1870,10 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory {
void RegisterGrpcLbPolicy(CoreConfiguration::Builder* builder) {
builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
std::make_unique<GrpcLbFactory>());
builder->channel_init()->RegisterStage(
GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[](ChannelStackBuilder* builder) {
auto enable = builder->channel_args().GetBool(
GRPC_ARG_GRPCLB_ENABLE_LOAD_REPORTING_FILTER);
if (enable.has_value() && *enable) {
builder->PrependFilter(&ClientLoadReportingFilter::kFilter);
}
return true;
});
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
&ClientLoadReportingFilter::kFilter)
.IfChannelArg(GRPC_ARG_GRPCLB_ENABLE_LOAD_REPORTING_FILTER, false);
}
} // namespace grpc_core

@ -31,10 +31,10 @@
#include <grpc/impl/channel_arg_names.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/message_size/message_size_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/config/core_configuration.h"
@ -46,7 +46,6 @@
#include "src/core/lib/service_config/service_config_call_data.h"
#include "src/core/lib/service_config/service_config_impl.h"
#include "src/core/lib/service_config/service_config_parser.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
@ -108,17 +107,12 @@ const grpc_channel_filter kServiceConfigChannelArgFilter =
void RegisterServiceConfigChannelArgFilter(
CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[](ChannelStackBuilder* builder) {
auto channel_args = builder->channel_args();
if (channel_args.WantMinimalStack() ||
!channel_args.GetString(GRPC_ARG_SERVICE_CONFIG).has_value()) {
return true;
}
builder->PrependFilter(&kServiceConfigChannelArgFilter);
return true;
});
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL,
&kServiceConfigChannelArgFilter)
.ExcludeFromMinimalStack()
.IfHasChannelArg(GRPC_ARG_SERVICE_CONFIG)
.Before({&ClientMessageSizeFilter::kFilter});
}
} // namespace grpc_core

@ -66,6 +66,7 @@
#include "src/core/lib/surface/init_internally.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/transport_impl.h"
// Backoff parameters.
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
@ -768,10 +769,11 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) {
bool Subchannel::PublishTransportLocked() {
// Construct channel stack.
ChannelStackBuilderImpl builder("subchannel", GRPC_CLIENT_SUBCHANNEL,
connecting_result_.channel_args);
// Builder takes ownership of transport.
builder.SetTransport(std::exchange(connecting_result_.transport, nullptr));
ChannelStackBuilderImpl builder(
"subchannel", GRPC_CLIENT_SUBCHANNEL,
connecting_result_.channel_args.SetObject(
std::exchange(connecting_result_.transport, nullptr)));
if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) {
return false;
}

@ -30,8 +30,6 @@
#include <grpc/status.h>
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/status_helper.h"
@ -41,7 +39,6 @@
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/metadata_batch.h"
@ -384,27 +381,15 @@ const grpc_channel_filter grpc_server_deadline_filter = {
"deadline",
};
bool grpc_deadline_checking_enabled(
const grpc_core::ChannelArgs& channel_args) {
return channel_args.GetBool(GRPC_ARG_ENABLE_DEADLINE_CHECKS)
.value_or(!channel_args.WantMinimalStack());
}
namespace grpc_core {
void RegisterDeadlineFilter(CoreConfiguration::Builder* builder) {
auto register_filter = [builder](grpc_channel_stack_type type,
const grpc_channel_filter* filter) {
builder->channel_init()->RegisterStage(
type, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[filter](ChannelStackBuilder* builder) {
auto args = builder->channel_args();
if (grpc_deadline_checking_enabled(args)) {
builder->PrependFilter(filter);
}
return true;
});
};
register_filter(GRPC_CLIENT_DIRECT_CHANNEL, &grpc_client_deadline_filter);
register_filter(GRPC_SERVER_CHANNEL, &grpc_server_deadline_filter);
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL, &grpc_client_deadline_filter)
.ExcludeFromMinimalStack()
.IfChannelArg(GRPC_ARG_ENABLE_DEADLINE_CHECKS, true);
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &grpc_server_deadline_filter)
.ExcludeFromMinimalStack()
.IfChannelArg(GRPC_ARG_ENABLE_DEADLINE_CHECKS, true);
}
} // namespace grpc_core

@ -19,7 +19,6 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/gprpp/time.h"
@ -77,9 +76,6 @@ void grpc_deadline_state_reset(grpc_deadline_state* deadline_state,
void grpc_deadline_state_client_start_transport_stream_op_batch(
grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op);
// Should deadline checking be performed (according to channel args)
bool grpc_deadline_checking_enabled(const grpc_core::ChannelArgs& args);
// Deadline filters for direct client channels and server channels.
// Note: Deadlines for non-direct client channels are handled by the
// client_channel filter.

@ -20,8 +20,6 @@
#include "src/core/ext/filters/http/client_authority_filter.h"
#include <limits.h>
#include <functional>
#include <memory>
@ -32,8 +30,8 @@
#include <grpc/impl/channel_arg_names.h>
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/security/transport/auth_filters.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/metadata_batch.h"
@ -69,22 +67,22 @@ const grpc_channel_filter ClientAuthorityFilter::kFilter =
"authority");
namespace {
bool add_client_authority_filter(ChannelStackBuilder* builder) {
if (builder->channel_args()
.GetBool(GRPC_ARG_DISABLE_CLIENT_AUTHORITY_FILTER)
.value_or(false)) {
return true;
}
builder->PrependFilter(&ClientAuthorityFilter::kFilter);
return true;
bool NeedsClientAuthorityFilter(const ChannelArgs& args) {
return !args.GetBool(GRPC_ARG_DISABLE_CLIENT_AUTHORITY_FILTER)
.value_or(false);
}
} // namespace
void RegisterClientAuthorityFilter(CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(GRPC_CLIENT_SUBCHANNEL, INT_MAX,
add_client_authority_filter);
builder->channel_init()->RegisterStage(GRPC_CLIENT_DIRECT_CHANNEL, INT_MAX,
add_client_authority_filter);
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, &ClientAuthorityFilter::kFilter)
.If(NeedsClientAuthorityFilter)
.Before({&ClientAuthFilter::kFilter});
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL,
&ClientAuthorityFilter::kFilter)
.If(NeedsClientAuthorityFilter)
.Before({&ClientAuthFilter::kFilter});
}
} // namespace grpc_core

@ -23,48 +23,47 @@
#include "src/core/ext/filters/http/client/http_client_filter.h"
#include "src/core/ext/filters/http/message_compress/compression_filter.h"
#include "src/core/ext/filters/http/server/http_server_filter.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/ext/filters/message_size/message_size_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/transport_fwd.h"
#include "src/core/lib/transport/transport_impl.h"
static bool is_building_http_like_transport(
grpc_core::ChannelStackBuilder* builder) {
grpc_transport* t = builder->transport();
namespace grpc_core {
namespace {
bool IsBuildingHttpLikeTransport(const ChannelArgs& args) {
grpc_transport* t = args.GetObject<grpc_transport>();
return t != nullptr && strstr(t->vtable->name, "http");
}
} // namespace
namespace grpc_core {
void RegisterHttpFilters(CoreConfiguration::Builder* builder) {
auto compression = [builder](grpc_channel_stack_type channel_type,
const grpc_channel_filter* filter) {
builder->channel_init()->RegisterStage(
channel_type, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[filter](ChannelStackBuilder* builder) {
if (!is_building_http_like_transport(builder)) return true;
builder->PrependFilter(filter);
return true;
});
};
auto http = [builder](grpc_channel_stack_type channel_type,
const grpc_channel_filter* filter) {
builder->channel_init()->RegisterStage(
channel_type, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[filter](ChannelStackBuilder* builder) {
if (is_building_http_like_transport(builder)) {
builder->PrependFilter(filter);
}
return true;
});
};
compression(GRPC_CLIENT_SUBCHANNEL, &ClientCompressionFilter::kFilter);
compression(GRPC_CLIENT_DIRECT_CHANNEL, &ClientCompressionFilter::kFilter);
compression(GRPC_SERVER_CHANNEL, &ServerCompressionFilter::kFilter);
http(GRPC_CLIENT_SUBCHANNEL, &HttpClientFilter::kFilter);
http(GRPC_CLIENT_DIRECT_CHANNEL, &HttpClientFilter::kFilter);
http(GRPC_SERVER_CHANNEL, &HttpServerFilter::kFilter);
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
&ClientCompressionFilter::kFilter)
.If(IsBuildingHttpLikeTransport)
.After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter});
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL,
&ClientCompressionFilter::kFilter)
.If(IsBuildingHttpLikeTransport)
.After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter});
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &ServerCompressionFilter::kFilter)
.If(IsBuildingHttpLikeTransport)
.After({&HttpServerFilter::kFilter, &ServerMessageSizeFilter::kFilter});
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, &HttpClientFilter::kFilter)
.If(IsBuildingHttpLikeTransport)
.After({&ClientMessageSizeFilter::kFilter});
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL, &HttpClientFilter::kFilter)
.If(IsBuildingHttpLikeTransport)
.After({&ClientMessageSizeFilter::kFilter});
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &HttpServerFilter::kFilter)
.If(IsBuildingHttpLikeTransport)
.After({&ServerMessageSizeFilter::kFilter});
}
} // namespace grpc_core

@ -20,7 +20,6 @@
#include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h"
#include <limits.h>
#include <stdint.h>
#include <stdlib.h>
@ -51,7 +50,6 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
@ -257,10 +255,6 @@ ArenaPromise<ServerMetadataHandle> ServerLoadReportingFilter::MakeCallPromise(
}
namespace {
bool MaybeAddServerLoadReportingFilter(const ChannelArgs& args) {
return args.GetBool(GRPC_ARG_ENABLE_LOAD_REPORTING).value_or(false);
}
const grpc_channel_filter kFilter =
MakePromiseBasedFilter<ServerLoadReportingFilter, FilterEndpoint::kServer>(
"server_load_reporting");
@ -281,13 +275,9 @@ struct ServerLoadReportingFilterStaticRegistrar {
grpc::load_reporter::MeasureEndBytesReceived();
grpc::load_reporter::MeasureEndLatencyMs();
grpc::load_reporter::MeasureOtherCallMetric();
builder->channel_init()->RegisterStage(
GRPC_SERVER_CHANNEL, INT_MAX, [](ChannelStackBuilder* cs_builder) {
if (MaybeAddServerLoadReportingFilter(cs_builder->channel_args())) {
cs_builder->PrependFilter(&kFilter);
}
return true;
});
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &kFilter)
.IfChannelArg(GRPC_ARG_ENABLE_LOAD_REPORTING, false);
});
}
} server_load_reporting_filter_static_registrar;

@ -21,7 +21,6 @@
#include "src/core/ext/filters/logging/logging_filter.h"
#include <inttypes.h>
#include <limits.h>
#include <algorithm>
#include <cstddef>
@ -54,7 +53,6 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/config/core_configuration.h"
@ -73,6 +71,7 @@
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/uri/uri_parser.h"
#include "src/cpp/ext/filters/census/client_filter.h"
namespace grpc_core {
@ -553,28 +552,15 @@ const grpc_channel_filter ServerLoggingFilter::kFilter =
void RegisterLoggingFilter(LoggingSink* sink) {
g_logging_sink = sink;
CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
GRPC_SERVER_CHANNEL, INT_MAX, [](ChannelStackBuilder* builder) {
// TODO(yashykt) : Figure out a good place to place this channel
// arg
if (builder->channel_args()
.GetInt("grpc.experimental.enable_observability")
.value_or(true)) {
builder->PrependFilter(&ServerLoggingFilter::kFilter);
}
return true;
});
builder->channel_init()->RegisterStage(
GRPC_CLIENT_CHANNEL, INT_MAX, [](ChannelStackBuilder* builder) {
// TODO(yashykt) : Figure out a good place to place this channel
// arg
if (builder->channel_args()
.GetInt("grpc.experimental.enable_observability")
.value_or(true)) {
builder->PrependFilter(&ClientLoggingFilter::kFilter);
}
return true;
});
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &ServerLoggingFilter::kFilter)
// TODO(yashykt) : Figure out a good place to place this channel arg
.IfChannelArg("grpc.experimental.enable_observability", true);
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_CHANNEL, &ClientLoggingFilter::kFilter)
// TODO(yashykt) : Figure out a good place to place this channel arg
.IfChannelArg("grpc.experimental.enable_observability", true)
.After({&grpc::internal::OpenCensusClientFilter::kFilter});
});
}

@ -30,9 +30,9 @@
#include <grpc/status.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/promise/activity.h"
@ -44,7 +44,6 @@
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/surface/call_trace.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
@ -245,45 +244,39 @@ ArenaPromise<ServerMetadataHandle> ServerMessageSizeFilter::MakeCallPromise(
}
namespace {
// Used for GRPC_CLIENT_SUBCHANNEL
bool MaybeAddMessageSizeFilterToSubchannel(ChannelStackBuilder* builder) {
if (builder->channel_args().WantMinimalStack()) {
return true;
}
builder->PrependFilter(&ClientMessageSizeFilter::kFilter);
return true;
}
// Used for GRPC_CLIENT_DIRECT_CHANNEL and GRPC_SERVER_CHANNEL. Adds the
// filter only if message size limits or service config is specified.
auto MaybeAddMessageSizeFilter(const grpc_channel_filter* filter) {
return [filter](ChannelStackBuilder* builder) {
auto channel_args = builder->channel_args();
if (channel_args.WantMinimalStack()) {
return true;
}
MessageSizeParsedConfig limits =
MessageSizeParsedConfig::GetFromChannelArgs(channel_args);
const bool enable =
limits.max_send_size().has_value() ||
limits.max_recv_size().has_value() ||
channel_args.GetString(GRPC_ARG_SERVICE_CONFIG).has_value();
if (enable) builder->PrependFilter(filter);
return true;
};
bool HasMessageSizeLimits(const ChannelArgs& channel_args) {
MessageSizeParsedConfig limits =
MessageSizeParsedConfig::GetFromChannelArgs(channel_args);
return limits.max_send_size().has_value() ||
limits.max_recv_size().has_value() ||
channel_args.GetString(GRPC_ARG_SERVICE_CONFIG).has_value();
}
} // namespace
void RegisterMessageSizeFilter(CoreConfiguration::Builder* builder) {
MessageSizeParser::Register(builder);
builder->channel_init()->RegisterStage(GRPC_CLIENT_SUBCHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
MaybeAddMessageSizeFilterToSubchannel);
builder->channel_init()->RegisterStage(
GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
MaybeAddMessageSizeFilter(&ClientMessageSizeFilter::kFilter));
builder->channel_init()->RegisterStage(
GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
MaybeAddMessageSizeFilter(&ServerMessageSizeFilter::kFilter));
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
&ClientMessageSizeFilter::kFilter)
.ExcludeFromMinimalStack();
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL,
&ClientMessageSizeFilter::kFilter)
.ExcludeFromMinimalStack()
.If(HasMessageSizeLimits)
// TODO(ctiller): ordering constraint is here to match the ordering that
// existed prior to ordering constraints did. Re-examine the ordering of
// filters from first principles.
.Before({&grpc_client_deadline_filter});
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &ServerMessageSizeFilter::kFilter)
.ExcludeFromMinimalStack()
.If(HasMessageSizeLimits)
// TODO(ctiller): ordering constraint is here to match the ordering that
// existed prior to ordering constraints did. Re-examine the ordering of
// filters from first principles.
.Before({&grpc_server_deadline_filter});
}
} // namespace grpc_core

@ -20,15 +20,15 @@
#include "src/core/ext/xds/xds_channel_stack_modifier.h"
#include <limits.h>
#include <string.h>
#include <algorithm>
#include <initializer_list>
#include <string>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/surface/channel_stack_type.h"
namespace grpc_core {
@ -57,31 +57,19 @@ const char* kXdsChannelStackModifierChannelArgName =
} // namespace
bool XdsChannelStackModifier::ModifyChannelStack(ChannelStackBuilder* builder) {
// Insert the filters after the census filter if present.
auto it = builder->mutable_stack()->begin();
while (it != builder->mutable_stack()->end()) {
const char* filter_name_at_it = (*it)->name;
if (strcmp("census_server", filter_name_at_it) == 0) {
break;
void XdsChannelStackModifier::ModifyChannelStack(ChannelStackBuilder& builder) {
// Insert the filters after predicate filters if present.
auto insert_before = builder.mutable_stack()->end();
for (auto it = builder.mutable_stack()->begin();
it != builder.mutable_stack()->end(); ++it) {
for (absl::string_view predicate_name : {"server", "census_server"}) {
if (predicate_name == (*it)->name) insert_before = it + 1;
}
++it;
}
if (it == builder->mutable_stack()->end()) {
// No census filter found. Reset iterator to the beginning. This will result
// in prepending the list of xDS HTTP filters to the current stack. Note
// that this stage is run before the stage that adds the top server filter,
// resulting in these filters being finally placed after the `server`
// filter.
it = builder->mutable_stack()->begin();
} else {
++it;
}
for (const grpc_channel_filter* filter : filters_) {
it = builder->mutable_stack()->insert(it, filter);
++it;
insert_before = builder.mutable_stack()->insert(insert_before, filter);
++insert_before;
}
return true;
}
grpc_arg XdsChannelStackModifier::MakeChannelArg() const {
@ -104,14 +92,15 @@ XdsChannelStackModifier::GetFromChannelArgs(const grpc_channel_args& args) {
}
void RegisterXdsChannelStackModifier(CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
GRPC_SERVER_CHANNEL, INT_MAX, [](ChannelStackBuilder* builder) {
builder->channel_init()->RegisterPostProcessor(
GRPC_SERVER_CHANNEL,
ChannelInit::PostProcessorSlot::kXdsChannelStackModifier,
[](ChannelStackBuilder& builder) {
auto channel_stack_modifier =
builder->channel_args().GetObjectRef<XdsChannelStackModifier>();
builder.channel_args().GetObjectRef<XdsChannelStackModifier>();
if (channel_stack_modifier != nullptr) {
return channel_stack_modifier->ModifyChannelStack(builder);
}
return true;
});
}

@ -37,16 +37,14 @@
namespace grpc_core {
// XdsChannelStackModifier allows for inserting xDS HTTP filters into the
// channel stack. It is registered to mutate the
// `ChannelStackBuilder` object via
// ChannelInit::Builder::RegisterStage.
// channel stack. It is registered to mutate the `ChannelStackBuilder` object
// via ChannelInit::Builder::RegisterPostProcessor.
class XdsChannelStackModifier : public RefCounted<XdsChannelStackModifier> {
public:
explicit XdsChannelStackModifier(
std::vector<const grpc_channel_filter*> filters)
: filters_(std::move(filters)) {}
// Returns true on success, false otherwise.
bool ModifyChannelStack(ChannelStackBuilder* builder);
void ModifyChannelStack(ChannelStackBuilder& builder);
grpc_arg MakeChannelArg() const;
static RefCountedPtr<XdsChannelStackModifier> GetFromChannelArgs(
const grpc_channel_args& args);

@ -28,12 +28,20 @@
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack_trace.h"
#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/surface/channel_init.h"
using grpc_event_engine::experimental::EventEngine;
grpc_core::TraceFlag grpc_trace_channel(false, "channel");
grpc_core::TraceFlag grpc_trace_channel_stack(false, "channel_stack");
static int register_get_name_fn = []() {
grpc_core::NameFromChannelFilter = [](const grpc_channel_filter* filter) {
return filter->name;
};
return 0;
}();
// Memory layouts.

@ -23,13 +23,10 @@
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/transport_fwd.h"
namespace grpc_core {
@ -53,16 +50,6 @@ class ChannelStackBuilder {
// Query the target.
absl::string_view target() const { return target_; }
// Set the transport.
ChannelStackBuilder& SetTransport(grpc_transport* transport) {
GPR_ASSERT(transport_ == nullptr);
transport_ = transport;
return *this;
}
// Query the transport.
grpc_transport* transport() const { return transport_; }
// Query the channel args.
const ChannelArgs& channel_args() const { return args_; }
@ -77,6 +64,10 @@ class ChannelStackBuilder {
// The type of channel stack being built.
grpc_channel_stack_type channel_stack_type() const { return type_; }
// TODO(ctiller): re-evaluate the need for AppendFilter, PrependFilter.
// Their usefulness is largely zero now that we have ordering constraints in
// channel init.
// Helper to add a filter to the front of the stack.
void PrependFilter(const grpc_channel_filter* filter);
@ -107,8 +98,6 @@ class ChannelStackBuilder {
const grpc_channel_stack_type type_;
// The target
std::string target_{unknown_target()};
// The transport
grpc_transport* transport_ = nullptr;
// Channel args
ChannelArgs args_;
// The in-progress stack

@ -27,19 +27,15 @@
#include "absl/status/status.h"
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/surface/call_trace.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/transport.h"
namespace grpc_core {
@ -82,20 +78,6 @@ ChannelStackBuilderImpl::Build() {
auto* channel_stack =
static_cast<grpc_channel_stack*>(gpr_zalloc(channel_stack_size));
ChannelArgs final_args = channel_args();
if (transport() != nullptr) {
static const grpc_arg_pointer_vtable vtable = {
// copy
[](void* p) { return p; },
// destroy
[](void*) {},
// cmp
[](void* a, void* b) { return QsortCompare(a, b); },
};
final_args = final_args.Set(GRPC_ARG_TRANSPORT,
ChannelArgs::Pointer(transport(), &vtable));
}
// and initialize it
grpc_error_handle error = grpc_channel_stack_init(
1,
@ -104,7 +86,7 @@ ChannelStackBuilderImpl::Build() {
grpc_channel_stack_destroy(stk);
gpr_free(stk);
},
channel_stack, stack.data(), stack.size(), final_args, name(),
channel_stack, stack.data(), stack.size(), channel_args(), name(),
channel_stack);
if (!error.ok()) {

@ -0,0 +1,19 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/channel_stack_trace.h"
grpc_core::TraceFlag grpc_trace_channel_stack(false, "channel_stack");

@ -0,0 +1,24 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_CHANNEL_CHANNEL_STACK_TRACE_H
#define GRPC_SRC_CORE_LIB_CHANNEL_CHANNEL_STACK_TRACE_H
#include <grpc/support/port_platform.h>
#include "src/core/lib/debug/trace.h"
extern grpc_core::TraceFlag grpc_trace_channel_stack;
#endif // GRPC_SRC_CORE_LIB_CHANNEL_CHANNEL_STACK_TRACE_H

@ -41,6 +41,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gpr/alloc.h"
@ -908,31 +909,54 @@ const grpc_channel_filter kServerEmulatedFilter =
MakeConnectedFilter<nullptr>();
#endif
bool TransportSupportsPromiseBasedCalls(const ChannelArgs& args) {
grpc_transport* transport =
args.GetPointer<grpc_transport>(GRPC_ARG_TRANSPORT);
return transport->vtable->make_call_promise != nullptr;
}
bool TransportDoesNotSupportPromiseBasedCalls(const ChannelArgs& args) {
return !TransportSupportsPromiseBasedCalls(args);
}
} // namespace
} // namespace grpc_core
bool grpc_add_connected_filter(grpc_core::ChannelStackBuilder* builder) {
grpc_transport* t = builder->transport();
GPR_ASSERT(t != nullptr);
// Choose the right vtable for the connected filter.
void RegisterConnectedChannel(CoreConfiguration::Builder* builder) {
// We can't know promise based call or not here (that decision needs the
// collaboration of all of the filters on the channel, and we don't want
// ordering constraints on when we add filters).
// We can know if this results in a promise based call how we'll create
// our promise (if indeed we can), and so that is the choice made here.
if (t->vtable->make_call_promise != nullptr) {
// Option 1, and our ideal: the transport supports promise based calls,
// and so we simply use the transport directly.
builder->AppendFilter(&grpc_core::kPromiseBasedTransportFilter);
} else if (grpc_channel_stack_type_is_client(builder->channel_stack_type())) {
// Option 2: the transport does not support promise based calls, but
// we're on the client and so we have an implementation that we can use
// to convert to batches.
builder->AppendFilter(&grpc_core::kClientEmulatedFilter);
} else {
// Option 3: the transport does not support promise based calls, and
// we're on the server so we use the server filter.
builder->AppendFilter(&grpc_core::kServerEmulatedFilter);
}
return true;
// Option 1, and our ideal: the transport supports promise based calls,
// and so we simply use the transport directly.
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, &kPromiseBasedTransportFilter)
.Terminal()
.If(TransportSupportsPromiseBasedCalls);
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL,
&kPromiseBasedTransportFilter)
.Terminal()
.If(TransportSupportsPromiseBasedCalls);
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &kPromiseBasedTransportFilter)
.Terminal()
.If(TransportSupportsPromiseBasedCalls);
// Option 2: the transport does not support promise based calls.
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, &kClientEmulatedFilter)
.Terminal()
.If(TransportDoesNotSupportPromiseBasedCalls);
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL, &kClientEmulatedFilter)
.Terminal()
.If(TransportDoesNotSupportPromiseBasedCalls);
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &kServerEmulatedFilter)
.Terminal()
.If(TransportDoesNotSupportPromiseBasedCalls);
}
} // namespace grpc_core

@ -27,7 +27,6 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/config/core_configuration.h"
@ -36,7 +35,6 @@
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/transport.h"
@ -98,12 +96,8 @@ ArenaPromise<ServerMetadataHandle> ServerCallTracerFilter::MakeCallPromise(
} // namespace
void RegisterServerCallTracerFilter(CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[](ChannelStackBuilder* builder) {
builder->AppendFilter(&ServerCallTracerFilter::kFilter);
return true;
});
builder->channel_init()->RegisterFilter(GRPC_SERVER_CHANNEL,
&ServerCallTracerFilter::kFilter);
}
} // namespace grpc_core

@ -16,13 +16,8 @@
#include "src/core/lib/surface/builtins.h"
#include <limits.h>
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/surface/lame_client.h"
#include "src/core/lib/surface/server.h"
@ -31,26 +26,12 @@ namespace grpc_core {
void RegisterBuiltins(CoreConfiguration::Builder* builder) {
RegisterServerCallTracerFilter(builder);
builder->channel_init()->RegisterStage(GRPC_CLIENT_SUBCHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
grpc_add_connected_filter);
builder->channel_init()->RegisterStage(GRPC_CLIENT_DIRECT_CHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
grpc_add_connected_filter);
builder->channel_init()->RegisterStage(GRPC_SERVER_CHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
grpc_add_connected_filter);
builder->channel_init()->RegisterStage(
GRPC_CLIENT_LAME_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[](ChannelStackBuilder* builder) {
builder->AppendFilter(&LameClientFilter::kFilter);
return true;
});
builder->channel_init()->RegisterStage(
GRPC_SERVER_CHANNEL, INT_MAX, [](ChannelStackBuilder* builder) {
builder->PrependFilter(&Server::kServerTopFilter);
return true;
});
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_LAME_CHANNEL, &LameClientFilter::kFilter)
.Terminal();
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &Server::kServerTopFilter)
.BeforeAll();
}
} // namespace grpc_core

@ -59,6 +59,7 @@
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/surface/init_internally.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/transport/transport_impl.h"
// IWYU pragma: no_include <type_traits>
@ -223,8 +224,8 @@ absl::StatusOr<RefCountedPtr<Channel>> Channel::Create(
}
ChannelStackBuilderImpl builder(
grpc_channel_stack_type_string(channel_stack_type), channel_stack_type,
args);
builder.SetTarget(target).SetTransport(optional_transport);
args.SetObject(optional_transport));
builder.SetTarget(target);
if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) {
return nullptr;
}

@ -20,34 +20,387 @@
#include "src/core/lib/surface/channel_init.h"
#include <string.h>
#include <algorithm>
#include <map>
#include <set>
#include <string>
#include <type_traits>
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_stack_trace.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/surface/channel_stack_type.h"
namespace grpc_core {
void ChannelInit::Builder::RegisterStage(grpc_channel_stack_type type,
int priority, Stage stage) {
slots_[type].emplace_back(std::move(stage), priority);
const char* (*NameFromChannelFilter)(const grpc_channel_filter*);
namespace {
struct CompareChannelFiltersByName {
bool operator()(const grpc_channel_filter* a,
const grpc_channel_filter* b) const {
return strcmp(NameFromChannelFilter(a), NameFromChannelFilter(b)) < 0;
}
};
} // namespace
ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::After(
std::initializer_list<const grpc_channel_filter*> filters) {
for (auto filter : filters) {
after_.push_back(filter);
}
return *this;
}
ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::Before(
std::initializer_list<const grpc_channel_filter*> filters) {
for (auto filter : filters) {
before_.push_back(filter);
}
return *this;
}
ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::If(
InclusionPredicate predicate) {
predicates_.emplace_back(std::move(predicate));
return *this;
}
ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::IfNot(
InclusionPredicate predicate) {
predicates_.emplace_back(
[predicate = std::move(predicate)](const ChannelArgs& args) {
return !predicate(args);
});
return *this;
}
ChannelInit::FilterRegistration&
ChannelInit::FilterRegistration::IfHasChannelArg(const char* arg) {
return If([arg](const ChannelArgs& args) { return args.Contains(arg); });
}
ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::IfChannelArg(
const char* arg, bool default_value) {
return If([arg, default_value](const ChannelArgs& args) {
return args.GetBool(arg).value_or(default_value);
});
}
ChannelInit::FilterRegistration&
ChannelInit::FilterRegistration::ExcludeFromMinimalStack() {
return If([](const ChannelArgs& args) { return !args.WantMinimalStack(); });
}
ChannelInit::FilterRegistration& ChannelInit::Builder::RegisterFilter(
grpc_channel_stack_type type, const grpc_channel_filter* filter,
SourceLocation registration_source) {
filters_[type].emplace_back(
std::make_unique<FilterRegistration>(filter, registration_source));
return *filters_[type].back();
}
ChannelInit::StackConfig ChannelInit::BuildStackConfig(
const std::vector<std::unique_ptr<ChannelInit::FilterRegistration>>&
registrations,
PostProcessor* post_processors, grpc_channel_stack_type type) {
// Phase 1: Build a map from filter to the set of filters that must be
// initialized before it.
// We order this map (and the set of dependent filters) by filter name to
// ensure algorithm ordering stability is deterministic for a given build.
// We should not require this, but at the time of writing it's expected that
// this will help overall stability.
using F = const grpc_channel_filter*;
std::map<F, FilterRegistration*> filter_to_registration;
using DependencyMap = std::map<F, std::set<F, CompareChannelFiltersByName>,
CompareChannelFiltersByName>;
DependencyMap dependencies;
std::vector<Filter> terminal_filters;
for (const auto& registration : registrations) {
if (filter_to_registration.count(registration->filter_) > 0) {
const auto first =
filter_to_registration[registration->filter_]->registration_source_;
const auto second = registration->registration_source_;
Crash(absl::StrCat("Duplicate registration of channel filter ",
NameFromChannelFilter(registration->filter_),
"\nfirst: ", first.file(), ":", first.line(),
"\nsecond: ", second.file(), ":", second.line()));
}
filter_to_registration[registration->filter_] = registration.get();
if (registration->terminal_) {
GPR_ASSERT(registration->after_.empty());
GPR_ASSERT(registration->before_.empty());
GPR_ASSERT(!registration->before_all_);
terminal_filters.emplace_back(registration->filter_,
std::move(registration->predicates_),
registration->registration_source_);
} else {
dependencies[registration->filter_]; // Ensure it's in the map.
}
}
for (const auto& registration : registrations) {
if (registration->terminal_) continue;
GPR_ASSERT(filter_to_registration.count(registration->filter_) > 0);
for (F after : registration->after_) {
if (filter_to_registration.count(after) == 0) {
gpr_log(
GPR_DEBUG, "%s",
absl::StrCat(
"Filter ", NameFromChannelFilter(after),
" not registered, but is referenced in the after clause of ",
NameFromChannelFilter(registration->filter_),
" when building channel stack ",
grpc_channel_stack_type_string(type))
.c_str());
continue;
}
dependencies[registration->filter_].insert(after);
}
for (F before : registration->before_) {
if (filter_to_registration.count(before) == 0) {
gpr_log(
GPR_DEBUG, "%s",
absl::StrCat(
"Filter ", NameFromChannelFilter(before),
" not registered, but is referenced in the before clause of ",
NameFromChannelFilter(registration->filter_),
" when building channel stack ",
grpc_channel_stack_type_string(type))
.c_str());
continue;
}
dependencies[before].insert(registration->filter_);
}
if (registration->before_all_) {
for (const auto& other : registrations) {
if (other.get() == registration.get()) continue;
if (other->terminal_) continue;
dependencies[other->filter_].insert(registration->filter_);
}
}
}
// Phase 2: Build a list of filters in dependency order.
// We can simply iterate through and add anything with no dependency.
// We then remove that filter from the dependency list of all other filters.
// We repeat until we have no more filters to add.
auto build_remaining_dependency_graph =
[](const DependencyMap& dependencies) {
std::string result;
for (const auto& p : dependencies) {
absl::StrAppend(&result, NameFromChannelFilter(p.first), " ->");
for (const auto& d : p.second) {
absl::StrAppend(&result, " ", NameFromChannelFilter(d));
}
absl::StrAppend(&result, "\n");
}
return result;
};
const DependencyMap original = dependencies;
auto take_ready_dependency = [&]() {
for (auto it = dependencies.begin(); it != dependencies.end(); ++it) {
if (it->second.empty()) {
auto r = it->first;
dependencies.erase(it);
return r;
}
}
Crash(absl::StrCat(
"Unresolvable graph of channel filters - remaining graph:\n",
build_remaining_dependency_graph(dependencies), "original:\n",
build_remaining_dependency_graph(original)));
};
std::vector<Filter> filters;
while (!dependencies.empty()) {
auto filter = take_ready_dependency();
filters.emplace_back(filter,
std::move(filter_to_registration[filter]->predicates_),
filter_to_registration[filter]->registration_source_);
for (auto& p : dependencies) {
p.second.erase(filter);
}
}
// Collect post processors that need to be applied.
// We've already ensured the one-per-slot constraint, so now we can just
// collect everything up into a vector and run it in order.
std::vector<PostProcessor> post_processor_functions;
for (int i = 0; i < static_cast<int>(PostProcessorSlot::kCount); i++) {
if (post_processors[i] == nullptr) continue;
post_processor_functions.emplace_back(std::move(post_processors[i]));
}
// Log out the graph we built if that's been requested.
if (grpc_trace_channel_stack.enabled()) {
// It can happen that multiple threads attempt to construct a core config at
// once.
// This is benign - the first one wins and others are discarded.
// However, it messes up our logging and makes it harder to reason about the
// graph, so we add some protection here.
static Mutex* const m = new Mutex();
MutexLock lock(m);
// List the channel stack type (since we'll be repeatedly printing graphs in
// this loop).
gpr_log(GPR_INFO,
"ORDERED CHANNEL STACK %s:", grpc_channel_stack_type_string(type));
// First build up a map of filter -> file:line: strings, because it helps
// the readability of this log to get later fields aligned vertically.
std::map<const grpc_channel_filter*, std::string> loc_strs;
size_t max_loc_str_len = 0;
size_t max_filter_name_len = 0;
auto add_loc_str = [&max_loc_str_len, &loc_strs, &filter_to_registration,
&max_filter_name_len](
const grpc_channel_filter* filter) {
max_filter_name_len =
std::max(strlen(NameFromChannelFilter(filter)), max_filter_name_len);
const auto registration =
filter_to_registration[filter]->registration_source_;
absl::string_view file = registration.file();
auto slash_pos = file.rfind('/');
if (slash_pos != file.npos) {
file = file.substr(slash_pos + 1);
}
auto loc_str = absl::StrCat(file, ":", registration.line(), ":");
max_loc_str_len = std::max(max_loc_str_len, loc_str.length());
loc_strs.emplace(filter, std::move(loc_str));
};
for (const auto& filter : filters) {
add_loc_str(filter.filter);
}
for (const auto& terminal : terminal_filters) {
add_loc_str(terminal.filter);
}
for (auto& loc_str : loc_strs) {
loc_str.second = absl::StrCat(
loc_str.second,
std::string(max_loc_str_len + 2 - loc_str.second.length(), ' '));
}
// For each regular filter, print the location registered, the name of the
// filter, and if it needed to occur after some other filters list those
// filters too.
// Note that we use the processed after list here - earlier we turned Before
// registrations into After registrations and we used those converted
// registrations to build the final ordering.
// If you're trying to track down why 'A' is listed as after 'B', look at
// the following:
// - If A is registered with .After({B}), then A will be 'after' B here.
// - If B is registered with .Before({A}), then A will be 'after' B here.
// - If B is registered as BeforeAll, then A will be 'after' B here.
for (const auto& filter : filters) {
auto dep_it = original.find(filter.filter);
std::string after_str;
if (dep_it != original.end() && !dep_it->second.empty()) {
after_str = absl::StrCat(
std::string(max_filter_name_len + 1 -
strlen(NameFromChannelFilter(filter.filter)),
' '),
"after ",
absl::StrJoin(
dep_it->second, ", ",
[](std::string* out, const grpc_channel_filter* filter) {
out->append(NameFromChannelFilter(filter));
}));
}
const auto filter_str =
absl::StrCat(" ", loc_strs[filter.filter],
NameFromChannelFilter(filter.filter), after_str);
gpr_log(GPR_INFO, "%s", filter_str.c_str());
}
// Finally list out the terminal filters and where they were registered
// from.
for (const auto& terminal : terminal_filters) {
const auto filter_str = absl::StrCat(
" ", loc_strs[terminal.filter],
NameFromChannelFilter(terminal.filter),
std::string(max_filter_name_len + 1 -
strlen(NameFromChannelFilter(terminal.filter)),
' '),
"[terminal]");
gpr_log(GPR_INFO, "%s", filter_str.c_str());
}
}
// Check if there are no terminal filters: this would be an error.
// GRPC_CLIENT_DYNAMIC stacks don't use this mechanism, so we don't check that
// condition here.
// Right now we only log: many tests end up with a core configuration that
// is invalid.
// TODO(ctiller): evaluate if we can turn this into a crash one day.
// Right now it forces too many tests to know about channel initialization,
// either by supplying a valid configuration or by including an opt-out flag.
if (terminal_filters.empty() && type != GRPC_CLIENT_DYNAMIC) {
gpr_log(
GPR_ERROR,
"No terminal filters registered for channel stack type %s; this is "
"common for unit tests messing with CoreConfiguration, but will result "
"in a ChannelInit::CreateStack that never completes successfully.",
grpc_channel_stack_type_string(type));
}
return StackConfig{std::move(filters), std::move(terminal_filters),
std::move(post_processor_functions)};
};
ChannelInit ChannelInit::Builder::Build() {
ChannelInit result;
for (int i = 0; i < GRPC_NUM_CHANNEL_STACK_TYPES; i++) {
auto& slots = slots_[i];
std::stable_sort(
slots.begin(), slots.end(),
[](const Slot& a, const Slot& b) { return a.priority < b.priority; });
auto& result_slots = result.slots_[i];
result_slots.reserve(slots.size());
for (auto& slot : slots) {
result_slots.emplace_back(std::move(slot.stage));
}
result.stack_configs_[i] =
BuildStackConfig(filters_[i], post_processors_[i],
static_cast<grpc_channel_stack_type>(i));
}
return result;
}
bool ChannelInit::Filter::CheckPredicates(const ChannelArgs& args) const {
for (const auto& predicate : predicates) {
if (!predicate(args)) return false;
}
return true;
}
bool ChannelInit::CreateStack(ChannelStackBuilder* builder) const {
for (const auto& stage : slots_[builder->channel_stack_type()]) {
if (!stage(builder)) return false;
const auto& stack_config = stack_configs_[builder->channel_stack_type()];
for (const auto& filter : stack_config.filters) {
if (!filter.CheckPredicates(builder->channel_args())) continue;
builder->AppendFilter(filter.filter);
}
int found_terminators = 0;
for (const auto& terminator : stack_config.terminators) {
if (!terminator.CheckPredicates(builder->channel_args())) continue;
builder->AppendFilter(terminator.filter);
++found_terminators;
}
if (found_terminators != 1) {
std::string error = absl::StrCat(
found_terminators,
" terminating filters found creating a channel of type ",
grpc_channel_stack_type_string(builder->channel_stack_type()),
" with arguments ", builder->channel_args().ToString(),
" (we insist upon one and only one terminating "
"filter)\n");
if (stack_config.terminators.empty()) {
absl::StrAppend(&error, " No terminal filters were registered");
} else {
for (const auto& terminator : stack_config.terminators) {
absl::StrAppend(
&error, " ", NameFromChannelFilter(terminator.filter),
" registered @ ", terminator.registration_source.file(), ":",
terminator.registration_source.line(), ": enabled = ",
terminator.CheckPredicates(builder->channel_args()) ? "true"
: "false",
"\n");
}
}
gpr_log(GPR_ERROR, "%s", error.c_str());
return false;
}
for (const auto& post_processor : stack_config.post_processors) {
post_processor(*builder);
}
return true;
}

@ -21,16 +21,23 @@
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include <initializer_list>
#include <memory>
#include <utility>
#include <vector>
#include "absl/functional/any_invocable.h"
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/surface/channel_stack_type.h"
#define GRPC_CHANNEL_INIT_BUILTIN_PRIORITY 10000
/// This module provides a way for plugins (and the grpc core library itself)
/// to register mutators for channel stacks.
/// It also provides a universal entry path to run those mutators to build
@ -38,46 +45,161 @@
namespace grpc_core {
// HACK HACK HACK
// Right now grpc_channel_filter has a bunch of dependencies high in the stack,
// but this code needs to live as a dependency of CoreConfiguration so we need
// to be careful to ensure no dependency loops.
//
// We absolutely must be able to get the name from a filter - for stability and
// for debuggability.
//
// So we export this function, and have it filled in by the higher level code at
// static initialization time.
//
// TODO(ctiller): remove this. When we define a FilterFactory type, that type
// can be specified with the right constraints to be depended upon by this code,
// and that type can export a `string_view Name()` method.
extern const char* (*NameFromChannelFilter)(const grpc_channel_filter*);
class ChannelInit {
public:
/// One stage of mutation: call functions against \a builder to influence the
/// finally constructed channel stack
using Stage = absl::AnyInvocable<bool(ChannelStackBuilder* builder) const>;
// Predicate for if a filter registration applies
using InclusionPredicate = absl::AnyInvocable<bool(const ChannelArgs&) const>;
// Post processor for the channel stack - applied in PostProcessorSlot order
using PostProcessor = absl::AnyInvocable<void(ChannelStackBuilder&) const>;
// Post processing slots - up to one PostProcessor per slot can be registered
// They run after filters registered are added to the channel stack builder,
// but before Build is called - allowing ad-hoc mutation to the channel stack.
enum class PostProcessorSlot : uint8_t {
kAuthSubstitution,
kXdsChannelStackModifier,
kCount
};
class FilterRegistration {
public:
explicit FilterRegistration(const grpc_channel_filter* filter,
SourceLocation registration_source)
: filter_(filter), registration_source_(registration_source) {}
FilterRegistration(const FilterRegistration&) = delete;
FilterRegistration& operator=(const FilterRegistration&) = delete;
// Ensure that this filter is placed *after* the filters listed here.
// By Build() time all filters listed here must also be registered against
// the same channel stack type as this registration.
FilterRegistration& After(
std::initializer_list<const grpc_channel_filter*> filters);
// Ensure that this filter is placed *before* the filters listed here.
// By Build() time all filters listed here must also be registered against
// the same channel stack type as this registration.
FilterRegistration& Before(
std::initializer_list<const grpc_channel_filter*> filters);
// Add a predicate for this filters inclusion.
// If the predicate returns true the filter will be included in the stack.
// Predicates do not affect the ordering of the filter stack: we first
// topologically sort (once, globally) and only later apply predicates
// per-channel creation.
// Multiple predicates can be added to each registration.
FilterRegistration& If(InclusionPredicate predicate);
FilterRegistration& IfNot(InclusionPredicate predicate);
// Add a predicate that only includes this filter if a channel arg is
// present.
FilterRegistration& IfHasChannelArg(const char* arg);
// Add a predicate that only includes this filter if a boolean channel arg
// is true (with default_value being used if the argument is not present).
FilterRegistration& IfChannelArg(const char* arg, bool default_value);
// Mark this filter as being terminal.
// Exactly one terminal filter will be added at the end of each filter
// stack.
// If multiple are defined they are tried in registration order, and the
// first terminal filter whos predicates succeed is selected.
FilterRegistration& Terminal() {
terminal_ = true;
return *this;
}
// Ensure this filter appears at the top of the stack.
// Effectively adds a 'Before' constraint on every other filter.
// Adding this to more than one filter will cause a loop.
FilterRegistration& BeforeAll() {
before_all_ = true;
return *this;
}
// Add a predicate that ensures this filter does not appear in the minimal
// stack.
FilterRegistration& ExcludeFromMinimalStack();
private:
friend class ChannelInit;
const grpc_channel_filter* const filter_;
std::vector<const grpc_channel_filter*> after_;
std::vector<const grpc_channel_filter*> before_;
std::vector<InclusionPredicate> predicates_;
bool terminal_ = false;
bool before_all_ = false;
SourceLocation registration_source_;
};
class Builder {
public:
/// Register one stage of mutators.
/// Stages are run in priority order (lowest to highest), and then in
/// registration order (in the case of a tie).
/// Stages are registered against one of the pre-determined channel stack
/// types.
/// If the channel stack type is GRPC_CLIENT_SUBCHANNEL, the caller should
/// ensure that subchannels with different filter lists will always have
/// different channel args. This requires setting a channel arg in case the
/// registration function relies on some condition other than channel args
/// to decide whether to add a filter or not.
void RegisterStage(grpc_channel_stack_type type, int priority, Stage stage);
/// Finalize registration. No more calls to grpc_channel_init_register_stage
/// are allowed.
// Register a builder in the normal filter registration pass.
// This occurs first during channel build time.
// The FilterRegistration methods can be called to declaratively define
// properties of the filter being registered.
FilterRegistration& RegisterFilter(grpc_channel_stack_type type,
const grpc_channel_filter* filter,
SourceLocation registration_source = {});
// Register a post processor for the builder.
// These run after the main graph has been placed into the builder.
// At most one filter per slot per channel stack type can be added.
// If at all possible, prefer to use the RegisterFilter() mechanism to add
// filters to the system - this should be a last resort escape hatch.
void RegisterPostProcessor(grpc_channel_stack_type type,
PostProcessorSlot slot,
PostProcessor post_processor) {
auto& slot_value = post_processors_[type][static_cast<int>(slot)];
GPR_ASSERT(slot_value == nullptr);
slot_value = std::move(post_processor);
}
/// Finalize registration.
ChannelInit Build();
private:
struct Slot {
Slot(Stage stage, int priority)
: stage(std::move(stage)), priority(priority) {}
Stage stage;
int priority;
};
std::vector<Slot> slots_[GRPC_NUM_CHANNEL_STACK_TYPES];
std::vector<std::unique_ptr<FilterRegistration>>
filters_[GRPC_NUM_CHANNEL_STACK_TYPES];
PostProcessor post_processors_[GRPC_NUM_CHANNEL_STACK_TYPES]
[static_cast<int>(PostProcessorSlot::kCount)];
};
/// Construct a channel stack of some sort: see channel_stack.h for details
/// \a builder is the channel stack builder to build into.
GRPC_MUST_USE_RESULT
bool CreateStack(ChannelStackBuilder* builder) const;
private:
std::vector<Stage> slots_[GRPC_NUM_CHANNEL_STACK_TYPES];
struct Filter {
Filter(const grpc_channel_filter* filter,
std::vector<InclusionPredicate> predicates,
SourceLocation registration_source)
: filter(filter),
predicates(std::move(predicates)),
registration_source(registration_source) {}
const grpc_channel_filter* filter;
std::vector<InclusionPredicate> predicates;
SourceLocation registration_source;
bool CheckPredicates(const ChannelArgs& args) const;
};
struct StackConfig {
std::vector<Filter> filters;
std::vector<Filter> terminators;
std::vector<PostProcessor> post_processors;
};
StackConfig stack_configs_[GRPC_NUM_CHANNEL_STACK_TYPES];
static StackConfig BuildStackConfig(
const std::vector<std::unique_ptr<FilterRegistration>>& registrations,
PostProcessor* post_processors, grpc_channel_stack_type type);
};
} // namespace grpc_core

@ -20,21 +20,16 @@
#include "src/core/lib/surface/init.h"
#include <limits.h>
#include "absl/base/thread_annotations.h"
#include <grpc/fork.h>
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/impl/channel_arg_names.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/forkable.h"
@ -70,48 +65,22 @@ static int g_initializations ABSL_GUARDED_BY(g_init_mu) = []() {
static grpc_core::CondVar* g_shutting_down_cv;
static bool g_shutting_down ABSL_GUARDED_BY(g_init_mu) = false;
static bool maybe_prepend_client_auth_filter(
grpc_core::ChannelStackBuilder* builder) {
if (builder->channel_args().Contains(GRPC_ARG_SECURITY_CONNECTOR)) {
builder->PrependFilter(&grpc_core::ClientAuthFilter::kFilter);
}
return true;
}
static bool maybe_prepend_server_auth_filter(
grpc_core::ChannelStackBuilder* builder) {
if (builder->channel_args().Contains(GRPC_SERVER_CREDENTIALS_ARG)) {
builder->PrependFilter(&grpc_core::ServerAuthFilter::kFilter);
}
return true;
}
static bool maybe_prepend_grpc_server_authz_filter(
grpc_core::ChannelStackBuilder* builder) {
if (builder->channel_args().GetPointer<grpc_authorization_policy_provider>(
GRPC_ARG_AUTHORIZATION_POLICY_PROVIDER) != nullptr) {
builder->PrependFilter(&grpc_core::GrpcServerAuthzFilter::kFilterVtable);
}
return true;
}
namespace grpc_core {
void RegisterSecurityFilters(CoreConfiguration::Builder* builder) {
// Register the auth client with a priority < INT_MAX to allow the authority
// filter -on which the auth filter depends- to be higher on the channel
// stack.
builder->channel_init()->RegisterStage(GRPC_CLIENT_SUBCHANNEL, INT_MAX - 1,
maybe_prepend_client_auth_filter);
builder->channel_init()->RegisterStage(GRPC_CLIENT_DIRECT_CHANNEL,
INT_MAX - 1,
maybe_prepend_client_auth_filter);
builder->channel_init()->RegisterStage(GRPC_SERVER_CHANNEL, INT_MAX - 1,
maybe_prepend_server_auth_filter);
// Register the GrpcServerAuthzFilter with a priority less than
// server_auth_filter to allow server_auth_filter on which the grpc filter
// depends on to be higher on the channel stack.
builder->channel_init()->RegisterStage(
GRPC_SERVER_CHANNEL, INT_MAX - 2, maybe_prepend_grpc_server_authz_filter);
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, &ClientAuthFilter::kFilter)
.IfHasChannelArg(GRPC_ARG_SECURITY_CONNECTOR);
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL, &ClientAuthFilter::kFilter)
.IfHasChannelArg(GRPC_ARG_SECURITY_CONNECTOR);
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &ServerAuthFilter::kFilter)
.IfHasChannelArg(GRPC_SERVER_CREDENTIALS_ARG);
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL,
&GrpcServerAuthzFilter::kFilterVtable)
.IfHasChannelArg(GRPC_ARG_AUTHORIZATION_POLICY_PROVIDER)
.After({&ServerAuthFilter::kFilter});
}
} // namespace grpc_core

@ -63,6 +63,7 @@ extern void RegisterRoundRobinLbPolicy(CoreConfiguration::Builder* builder);
extern void RegisterWeightedRoundRobinLbPolicy(
CoreConfiguration::Builder* builder);
extern void RegisterHttpProxyMapper(CoreConfiguration::Builder* builder);
extern void RegisterConnectedChannel(CoreConfiguration::Builder* builder);
#ifndef GRPC_NO_RLS
extern void RegisterRlsLbPolicy(CoreConfiguration::Builder* builder);
#endif // !GRPC_NO_RLS
@ -88,6 +89,7 @@ void BuildCoreConfiguration(CoreConfiguration::Builder* builder) {
SecurityRegisterHandshakerFactories(builder);
RegisterClientAuthorityFilter(builder);
RegisterChannelIdleFilters(builder);
RegisterConnectedChannel(builder);
RegisterGrpcLbPolicy(builder);
RegisterHttpFilters(builder);
RegisterDeadlineFilter(builder);

@ -0,0 +1,87 @@
//
//
// Copyright 2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include "src/cpp/common/channel_filter.h"
#include <type_traits>
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/slice/slice.h"
namespace grpc {
// MetadataBatch
void MetadataBatch::AddMetadata(const string& key, const string& value) {
batch_->Append(key, grpc_core::Slice::FromCopiedString(value),
[&](absl::string_view error, const grpc_core::Slice&) {
gpr_log(GPR_INFO, "%s",
absl::StrCat("MetadataBatch::AddMetadata error:",
error, " key=", key, " value=", value)
.c_str());
});
}
// ChannelData
void ChannelData::StartTransportOp(grpc_channel_element* elem,
TransportOp* op) {
grpc_channel_next_op(elem, op->op());
}
void ChannelData::GetInfo(grpc_channel_element* elem,
const grpc_channel_info* channel_info) {
grpc_channel_next_get_info(elem, channel_info);
}
// CallData
void CallData::StartTransportStreamOpBatch(grpc_call_element* elem,
TransportStreamOpBatch* op) {
grpc_call_next_op(elem, op->op());
}
void CallData::SetPollsetOrPollsetSet(grpc_call_element* elem,
grpc_polling_entity* pollent) {
grpc_call_stack_ignore_set_pollset_or_pollset_set(elem, pollent);
}
namespace internal {
void RegisterChannelFilter(
grpc_channel_stack_type stack_type, int,
std::function<bool(const grpc_core::ChannelArgs&)> include_filter,
const grpc_channel_filter* filter) {
grpc_core::CoreConfiguration::RegisterBuilder(
[stack_type, filter, include_filter = std::move(include_filter)](
grpc_core::CoreConfiguration::Builder* builder) {
auto& f = builder->channel_init()->RegisterFilter(stack_type, filter);
if (include_filter) f.If(include_filter);
});
}
} // namespace internal
} // namespace grpc

@ -20,8 +20,6 @@
#include "src/cpp/ext/filters/census/grpc_plugin.h"
#include <limits.h>
#include <atomic>
#include "absl/base/attributes.h"
@ -33,7 +31,6 @@
#include <grpcpp/server_context.h>
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/cpp/ext/filters/census/client_filter.h"
@ -47,13 +44,9 @@ void RegisterOpenCensusPlugin() {
new grpc::internal::OpenCensusServerCallTracerFactory);
grpc_core::CoreConfiguration::RegisterBuilder(
[](grpc_core::CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
GRPC_CLIENT_CHANNEL, /*priority=*/INT_MAX,
[](grpc_core::ChannelStackBuilder* builder) {
builder->PrependFilter(
&grpc::internal::OpenCensusClientFilter::kFilter);
return true;
});
builder->channel_init()->RegisterFilter(
GRPC_CLIENT_CHANNEL,
&grpc::internal::OpenCensusClientFilter::kFilter);
});
// Access measures to ensure they are initialized. Otherwise, creating a view

@ -58,7 +58,6 @@ grpc_cc_library(
language = "c++",
visibility = ["//:__subpackages__"],
deps = [
"//:channel_stack_builder",
"//:config",
"//:gpr",
"//:gpr_platform",

@ -20,8 +20,6 @@
#include "src/cpp/ext/otel/otel_plugin.h"
#include <limits.h>
#include <type_traits>
#include <utility>
@ -36,7 +34,6 @@
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/cpp/ext/otel/otel_client_filter.h"
@ -251,20 +248,17 @@ void OpenTelemetryPluginBuilder::BuildAndRegisterGlobal() {
grpc_core::CoreConfiguration::RegisterBuilder(
[target_selector = std::move(target_selector_)](
grpc_core::CoreConfiguration::Builder* builder) mutable {
builder->channel_init()->RegisterStage(
GRPC_CLIENT_CHANNEL, /*priority=*/INT_MAX,
[target_selector = std::move(target_selector)](
grpc_core::ChannelStackBuilder* builder) {
builder->channel_init()
->RegisterFilter(
GRPC_CLIENT_CHANNEL,
&grpc::internal::OpenTelemetryClientFilter::kFilter)
.If([target_selector = std::move(target_selector)](
const grpc_core::ChannelArgs& args) {
// Only register the filter if no channel selector has been set or
// the target selector returns true for the target.
if (target_selector == nullptr ||
target_selector(builder->channel_args()
.GetString(GRPC_ARG_SERVER_URI)
.value_or(""))) {
builder->PrependFilter(
&grpc::internal::OpenTelemetryClientFilter::kFilter);
}
return true;
return target_selector == nullptr ||
target_selector(
args.GetString(GRPC_ARG_SERVER_URI).value_or(""));
});
});
}

@ -479,6 +479,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/channel/channel_stack.cc',
'src/core/lib/channel/channel_stack_builder.cc',
'src/core/lib/channel/channel_stack_builder_impl.cc',
'src/core/lib/channel/channel_stack_trace.cc',
'src/core/lib/channel/channel_trace.cc',
'src/core/lib/channel/channelz.cc',
'src/core/lib/channel/channelz_registry.cc',

@ -18,23 +18,19 @@
#include "src/core/lib/channel/channel_stack_builder.h"
#include <limits.h>
#include <string.h>
#include <algorithm>
#include <map>
#include <utility>
#include "absl/status/status.h"
#include "gtest/gtest.h"
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder_impl.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "test/core/util/test_config.h"
namespace grpc_core {
@ -57,70 +53,57 @@ void CallDestroyFunc(grpc_call_element* /*elem*/,
const grpc_call_final_info* /*final_info*/,
grpc_closure* /*ignored*/) {}
bool g_replacement_fn_called = false;
bool g_original_fn_called = false;
void SetReplacementFnCalled(grpc_channel_stack*, grpc_channel_element*) {
g_replacement_fn_called = true;
}
void SetOriginalFnCalled(grpc_channel_stack*, grpc_channel_element*) {
g_original_fn_called = true;
const grpc_channel_filter* FilterNamed(const char* name) {
static auto* filters =
new std::map<absl::string_view, const grpc_channel_filter*>;
auto it = filters->find(name);
if (it != filters->end()) return it->second;
return filters
->emplace(
name,
new grpc_channel_filter{
grpc_call_next_op, nullptr, grpc_channel_next_op, 0, CallInitFunc,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
CallDestroyFunc, 0, ChannelInitFunc,
[](grpc_channel_stack*, grpc_channel_element*) {},
ChannelDestroyFunc, grpc_channel_next_get_info, name})
.first->second;
}
TEST(ChannelStackBuilderTest, ReplaceFilter) {
grpc_channel_credentials* creds = grpc_insecure_credentials_create();
grpc_channel* channel =
grpc_channel_create("target name isn't used", creds, nullptr);
grpc_channel_credentials_release(creds);
GPR_ASSERT(channel != nullptr);
// Make sure the high priority filter has been created.
GPR_ASSERT(g_replacement_fn_called);
// ... and that the low priority one hasn't.
GPR_ASSERT(!g_original_fn_called);
grpc_channel_destroy(channel);
}
const grpc_channel_filter replacement_filter = {
grpc_call_next_op, nullptr,
grpc_channel_next_op, 0,
CallInitFunc, grpc_call_stack_ignore_set_pollset_or_pollset_set,
CallDestroyFunc, 0,
ChannelInitFunc, SetReplacementFnCalled,
ChannelDestroyFunc, grpc_channel_next_get_info,
"filter_name"};
const grpc_channel_filter original_filter = {
grpc_call_next_op, nullptr,
grpc_channel_next_op, 0,
CallInitFunc, grpc_call_stack_ignore_set_pollset_or_pollset_set,
CallDestroyFunc, 0,
ChannelInitFunc, SetOriginalFnCalled,
ChannelDestroyFunc, grpc_channel_next_get_info,
"filter_name"};
bool AddReplacementFilter(ChannelStackBuilder* builder) {
// Get rid of any other version of the filter, as determined by having the
// same name.
auto* stk = builder->mutable_stack();
stk->erase(std::remove_if(stk->begin(), stk->end(),
[](const grpc_channel_filter* entry) {
return strcmp(entry->name, "filter_name") == 0;
}),
stk->end());
builder->PrependFilter(&replacement_filter);
return true;
TEST(ChannelStackBuilder, UnknownTarget) {
ChannelStackBuilderImpl builder("alpha-beta-gamma", GRPC_CLIENT_CHANNEL,
ChannelArgs());
EXPECT_EQ(builder.target(), "unknown");
}
bool AddOriginalFilter(ChannelStackBuilder* builder) {
builder->PrependFilter(&original_filter);
return true;
TEST(ChannelStackBuilder, CanPrepend) {
ExecCtx exec_ctx;
ChannelStackBuilderImpl builder("alpha-beta-gamma", GRPC_CLIENT_CHANNEL,
ChannelArgs());
builder.PrependFilter(FilterNamed("filter1"));
builder.PrependFilter(FilterNamed("filter2"));
auto stack = builder.Build();
EXPECT_TRUE(stack.ok());
EXPECT_EQ((*stack)->count, 2);
EXPECT_EQ(grpc_channel_stack_element(stack->get(), 0)->filter,
FilterNamed("filter2"));
EXPECT_EQ(grpc_channel_stack_element(stack->get(), 1)->filter,
FilterNamed("filter1"));
}
TEST(ChannelStackBuilder, UnknownTarget) {
TEST(ChannelStackBuilder, CanAppend) {
ExecCtx exec_ctx;
ChannelStackBuilderImpl builder("alpha-beta-gamma", GRPC_CLIENT_CHANNEL,
ChannelArgs());
EXPECT_EQ(builder.target(), "unknown");
builder.AppendFilter(FilterNamed("filter1"));
builder.AppendFilter(FilterNamed("filter2"));
auto stack = builder.Build();
EXPECT_TRUE(stack.ok());
EXPECT_EQ((*stack)->count, 2);
EXPECT_EQ(grpc_channel_stack_element(stack->get(), 0)->filter,
FilterNamed("filter1"));
EXPECT_EQ(grpc_channel_stack_element(stack->get(), 1)->filter,
FilterNamed("filter2"));
}
} // namespace
@ -130,15 +113,6 @@ TEST(ChannelStackBuilder, UnknownTarget) {
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
grpc_core::CoreConfiguration::RegisterBuilder(
[](grpc_core::CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
GRPC_CLIENT_CHANNEL, INT_MAX,
grpc_core::testing::AddOriginalFilter);
builder->channel_init()->RegisterStage(
GRPC_CLIENT_CHANNEL, INT_MAX,
grpc_core::testing::AddReplacementFilter);
});
grpc_init();
int ret = RUN_ALL_TESTS();
grpc_shutdown();

@ -54,19 +54,19 @@
#include "test/core/util/test_config.h"
std::vector<std::string> MakeStack(const char* transport_name,
const grpc_core::ChannelArgs& channel_args,
grpc_core::ChannelArgs channel_args,
grpc_channel_stack_type channel_stack_type) {
// create phony channel stack
grpc_core::ChannelStackBuilderImpl builder("test", channel_stack_type,
channel_args);
grpc_transport_vtable fake_transport_vtable;
memset(&fake_transport_vtable, 0, sizeof(grpc_transport_vtable));
fake_transport_vtable.name = transport_name;
grpc_transport fake_transport = {&fake_transport_vtable};
builder.SetTarget("foo.test.google.fr");
if (transport_name != nullptr) {
builder.SetTransport(&fake_transport);
channel_args = channel_args.SetObject(&fake_transport);
}
grpc_core::ChannelStackBuilderImpl builder("test", channel_stack_type,
channel_args);
builder.SetTarget("foo.test.google.fr");
{
grpc_core::ExecCtx exec_ctx;
GPR_ASSERT(grpc_core::CoreConfiguration::Get().channel_init().CreateStack(

@ -28,7 +28,6 @@
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/status_helper.h"
@ -117,11 +116,7 @@ const grpc_channel_filter test_filter = {
CORE_END2END_TEST(CoreEnd2endTest, FilterCausesClose) {
CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
GRPC_SERVER_CHANNEL, 0, [](ChannelStackBuilder* builder) {
builder->PrependFilter(&test_filter);
return true;
});
builder->channel_init()->RegisterFilter(GRPC_SERVER_CHANNEL, &test_filter);
});
auto c = NewClientCall("/foo").Timeout(Duration::Seconds(5)).Create();
CoreEnd2endTest::IncomingStatusOnClient server_status;

@ -16,11 +16,8 @@
//
//
#include <limits.h>
#include <algorithm>
#include <initializer_list>
#include <vector>
#include <memory>
#include "absl/status/status.h"
#include "gtest/gtest.h"
@ -30,7 +27,6 @@
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/time.h"
@ -85,19 +81,18 @@ grpc_error_handle init_channel_elem(grpc_channel_element* /*elem*/,
void destroy_channel_elem(grpc_channel_element* /*elem*/) {}
const grpc_channel_filter test_filter = {
start_transport_stream_op_batch,
nullptr,
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
destroy_call_elem,
0,
init_channel_elem,
grpc_channel_stack_no_post_init,
destroy_channel_elem,
start_transport_stream_op_batch, nullptr, grpc_channel_next_op,
sizeof(call_data), init_call_elem,
grpc_call_stack_ignore_set_pollset_or_pollset_set, destroy_call_elem, 0,
init_channel_elem, grpc_channel_stack_no_post_init, destroy_channel_elem,
grpc_channel_next_get_info,
"filter_context"};
// Want to add the filter as close to the end as possible, to
// make sure that all of the filters work well together.
// However, we can't add it at the very end, because the
// connected channel filter must be the last one.
// Channel init code falls back to lexical ordering of filters if there are
// otherwise no dependencies, so we leverage that.
"zzzzzzz_filter_context"};
// Simple request to test that filters see a consistent view of the
// call context.
@ -105,18 +100,7 @@ CORE_END2END_TEST(CoreEnd2endTest, FilterContext) {
CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) {
for (auto type : {GRPC_CLIENT_CHANNEL, GRPC_CLIENT_SUBCHANNEL,
GRPC_CLIENT_DIRECT_CHANNEL, GRPC_SERVER_CHANNEL}) {
builder->channel_init()->RegisterStage(
type, INT_MAX, [](ChannelStackBuilder* builder) {
// Want to add the filter as close to the end as possible, to
// make sure that all of the filters work well together.
// However, we can't add it at the very end, because the
// connected channel filter must be the last one. So we add it
// right before the last one.
auto it = builder->mutable_stack()->end();
--it;
builder->mutable_stack()->insert(it, &test_filter);
return true;
});
builder->channel_init()->RegisterFilter(type, &test_filter);
}
});
auto c = NewClientCall("/foo").Timeout(Duration::Seconds(5)).Create();

@ -16,12 +16,8 @@
//
//
#include <limits.h>
#include <algorithm>
#include <memory>
#include <utility>
#include <vector>
#include "absl/status/status.h"
#include "absl/types/optional.h"
@ -33,7 +29,6 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/time.h"
@ -85,33 +80,23 @@ const grpc_channel_filter test_filter = {
return Immediate(ServerMetadataFromStatus(
absl::PermissionDeniedError("access denied")));
},
grpc_channel_next_op,
0,
init_call_elem,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
destroy_call_elem,
0,
init_channel_elem,
grpc_channel_stack_no_post_init,
destroy_channel_elem,
grpc_channel_next_op, 0, init_call_elem,
grpc_call_stack_ignore_set_pollset_or_pollset_set, destroy_call_elem, 0,
init_channel_elem, grpc_channel_stack_no_post_init, destroy_channel_elem,
grpc_channel_next_get_info,
"filter_init_fails"};
// Want to add the filter as close to the end as possible,
// to make sure that all of the filters work well together.
// However, we can't add it at the very end, because either the
// client_channel filter or connected_channel filter must be the
// last one.
// Filter ordering code falls back to lexical ordering in the absense of
// other dependencies, so name this appropriately.
"zzzzzz_filter_init_fails"};
void RegisterFilter(grpc_channel_stack_type type) {
CoreConfiguration::RegisterBuilder(
[type](CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
type, INT_MAX, [](ChannelStackBuilder* builder) {
// Want to add the filter as close to the end as possible,
// to make sure that all of the filters work well together.
// However, we can't add it at the very end, because either the
// client_channel filter or connected_channel filter must be the
// last one. So we add it right before the last one.
auto it = builder->mutable_stack()->end();
--it;
builder->mutable_stack()->insert(it, &test_filter);
return true;
});
builder->channel_init()->RegisterFilter(type, &test_filter);
});
}

@ -30,7 +30,6 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/time.h"
@ -173,22 +172,13 @@ grpc_channel_filter FailSendOpsFilter::kFilterVtable = {
"FailSendOpsFilter",
};
bool MaybeAddFilter(ChannelStackBuilder* builder) {
// Skip on proxy (which explicitly disables retries).
if (!builder->channel_args()
.GetBool(GRPC_ARG_ENABLE_RETRIES)
.value_or(true)) {
return true;
}
// Install filter.
builder->PrependFilter(&FailSendOpsFilter::kFilterVtable);
return true;
}
void RegisterFilter() {
CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(GRPC_CLIENT_SUBCHANNEL, 0,
MaybeAddFilter);
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
&FailSendOpsFilter::kFilterVtable)
// Skip on proxy (which explicitly disables retries).
.IfChannelArg(GRPC_ARG_ENABLE_RETRIES, true);
});
}

@ -29,7 +29,6 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/time.h"
@ -131,18 +130,11 @@ grpc_channel_filter FailFirstSendOpFilter::kFilterVtable = {
// a grpc_error.
CORE_END2END_TEST(RetryTest, RetryRecvMessageReplay) {
CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
GRPC_CLIENT_SUBCHANNEL, 0, [](ChannelStackBuilder* builder) {
// Skip on proxy (which explicitly disables retries).
if (!builder->channel_args()
.GetBool(GRPC_ARG_ENABLE_RETRIES)
.value_or(true)) {
return true;
}
// Install filter.
builder->PrependFilter(&FailFirstSendOpFilter::kFilterVtable);
return true;
});
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
&FailFirstSendOpFilter::kFilterVtable)
// Skip on proxy (which explicitly disables retries).
.IfChannelArg(GRPC_ARG_ENABLE_RETRIES, true);
});
InitServer(ChannelArgs());
InitClient(ChannelArgs().Set(

@ -27,7 +27,6 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/status_helper.h"
@ -118,18 +117,6 @@ grpc_channel_filter InjectStatusFilter::kFilterVtable = {
"InjectStatusFilter",
};
bool AddFilter(ChannelStackBuilder* builder) {
// Skip on proxy (which explicitly disables retries).
if (!builder->channel_args()
.GetBool(GRPC_ARG_ENABLE_RETRIES)
.value_or(true)) {
return true;
}
// Install filter.
builder->PrependFilter(&InjectStatusFilter::kFilterVtable);
return true;
}
// Tests that we honor the error passed to recv_trailing_metadata_ready
// when determining the call's status, even if the op completion runs before
// the recv_trailing_metadata op is started from the surface.
@ -138,8 +125,11 @@ bool AddFilter(ChannelStackBuilder* builder) {
// so no retry is done
CORE_END2END_TEST(RetryTest, RetryRecvTrailingMetadataError) {
CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(GRPC_CLIENT_SUBCHANNEL, 0,
AddFilter);
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
&InjectStatusFilter::kFilterVtable)
// Skip on proxy (which explicitly disables retries).
.IfChannelArg(GRPC_ARG_ENABLE_RETRIES, true);
});
InitServer(ChannelArgs());
InitClient(ChannelArgs().Set(

@ -29,7 +29,6 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/time.h"
@ -133,18 +132,11 @@ grpc_channel_filter FailFirstCallFilter::kFilterVtable = {
// attempts are allowed
CORE_END2END_TEST(RetryTest, RetrySendOpFails) {
CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
GRPC_CLIENT_SUBCHANNEL, 0, [](ChannelStackBuilder* builder) {
// Skip on proxy (which explicitly disables retries).
if (!builder->channel_args()
.GetBool(GRPC_ARG_ENABLE_RETRIES)
.value_or(true)) {
return true;
}
// Install filter.
builder->PrependFilter(&FailFirstCallFilter::kFilterVtable);
return true;
});
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
&FailFirstCallFilter::kFilterVtable)
// Skip on proxy (which explicitly disables retries).
.IfChannelArg(GRPC_ARG_ENABLE_RETRIES, true);
});
InitServer(ChannelArgs());
InitClient(ChannelArgs().Set(

@ -24,17 +24,14 @@
#include <grpc/impl/channel_arg_names.h>
#include <grpc/status.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
@ -135,19 +132,11 @@ grpc_channel_filter FailFirstCallFilter::kFilterVtable = {
// Tests transparent retries when the call was never sent out on the wire.
CORE_END2END_TEST(RetryTest, TransparentGoaway) {
CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY + 1,
[](ChannelStackBuilder* builder) {
// Skip on proxy (which explicitly disables retries).
if (!builder->channel_args()
.GetBool(GRPC_ARG_ENABLE_RETRIES)
.value_or(true)) {
return true;
}
// Install filter.
builder->PrependFilter(&FailFirstCallFilter::kFilterVtable);
return true;
});
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
&FailFirstCallFilter::kFilterVtable)
// Skip on proxy (which explicitly disables retries).
.IfChannelArg(GRPC_ARG_ENABLE_RETRIES, true);
});
auto c =
NewClientCall("/service/method").Timeout(Duration::Minutes(1)).Create();

@ -26,17 +26,14 @@
#include <grpc/impl/channel_arg_names.h>
#include <grpc/status.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
@ -134,19 +131,11 @@ grpc_channel_filter FailFirstTenCallsFilter::kFilterVtable = {
// Tests transparent retries when the call was never sent out on the wire.
CORE_END2END_TEST(RetryTest, RetryTransparentNotSentOnWire) {
CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY + 1,
[](ChannelStackBuilder* builder) {
// Skip on proxy (which explicitly disables retries).
if (!builder->channel_args()
.GetBool(GRPC_ARG_ENABLE_RETRIES)
.value_or(true)) {
return true;
}
// Install filter.
builder->PrependFilter(&FailFirstTenCallsFilter::kFilterVtable);
return true;
});
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
&FailFirstTenCallsFilter::kFilterVtable)
// Skip on proxy (which explicitly disables retries).
.IfChannelArg(GRPC_ARG_ENABLE_RETRIES, true);
});
auto c =
NewClientCall("/service/method").Timeout(Duration::Minutes(1)).Create();

@ -32,6 +32,20 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "channel_init_test",
srcs = ["channel_init_test.cc"],
external_deps = ["gtest"],
language = "C++",
uses_event_engine = False,
uses_polling = False,
deps = [
"//:gpr",
"//:grpc",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "grpc_completion_queue_test",
srcs = ["completion_queue_test.cc"],

@ -0,0 +1,212 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/surface/channel_init.h"
#include <map>
#include <string>
#include "absl/strings/string_view.h"
#include "gtest/gtest.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder_impl.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "test/core/util/test_config.h"
namespace grpc_core {
namespace {
const grpc_channel_filter* FilterNamed(const char* name) {
static auto* filters =
new std::map<absl::string_view, const grpc_channel_filter*>;
auto it = filters->find(name);
if (it != filters->end()) return it->second;
return filters
->emplace(name,
new grpc_channel_filter{nullptr, nullptr, nullptr, 0, nullptr,
nullptr, nullptr, 0, nullptr, nullptr,
nullptr, nullptr, name})
.first->second;
}
std::vector<std::string> GetFilterNames(const ChannelInit& init,
grpc_channel_stack_type type,
const ChannelArgs& args) {
ChannelStackBuilderImpl b("test", type, args);
if (!init.CreateStack(&b)) return {};
std::vector<std::string> names;
for (auto f : b.stack()) {
names.push_back(f->name);
}
EXPECT_NE(names, std::vector<std::string>());
return names;
}
TEST(ChannelInitTest, Empty) {
ChannelInit::Builder b;
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("terminator")).Terminal();
auto init = b.Build();
EXPECT_EQ(GetFilterNames(init, GRPC_CLIENT_CHANNEL, ChannelArgs()),
std::vector<std::string>({"terminator"}));
}
TEST(ChannelInitTest, OneClientFilter) {
ChannelInit::Builder b;
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("foo"));
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("terminator")).Terminal();
b.RegisterFilter(GRPC_SERVER_CHANNEL, FilterNamed("terminator")).Terminal();
auto init = b.Build();
EXPECT_EQ(GetFilterNames(init, GRPC_CLIENT_CHANNEL, ChannelArgs()),
std::vector<std::string>({"foo", "terminator"}));
EXPECT_EQ(GetFilterNames(init, GRPC_SERVER_CHANNEL, ChannelArgs()),
std::vector<std::string>({"terminator"}));
}
TEST(ChannelInitTest, DefaultLexicalOrdering) {
// ChannelInit defaults to lexical ordering in the absense of other
// constraints, to ensure that a stable ordering is produced between builds.
ChannelInit::Builder b;
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("foo"));
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("bar"));
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("baz"));
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("aaa")).Terminal();
auto init = b.Build();
EXPECT_EQ(GetFilterNames(init, GRPC_CLIENT_CHANNEL, ChannelArgs()),
std::vector<std::string>({"bar", "baz", "foo", "aaa"}));
}
TEST(ChannelInitTest, AfterConstraintsApply) {
ChannelInit::Builder b;
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("foo"));
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("bar"))
.After({FilterNamed("foo")});
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("baz"));
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("aaa")).Terminal();
auto init = b.Build();
EXPECT_EQ(GetFilterNames(init, GRPC_CLIENT_CHANNEL, ChannelArgs()),
std::vector<std::string>({"baz", "foo", "bar", "aaa"}));
}
TEST(ChannelInitTest, BeforeConstraintsApply) {
ChannelInit::Builder b;
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("foo"))
.Before({FilterNamed("bar")});
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("bar"));
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("baz"));
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("aaa")).Terminal();
auto init = b.Build();
EXPECT_EQ(GetFilterNames(init, GRPC_CLIENT_CHANNEL, ChannelArgs()),
std::vector<std::string>({"baz", "foo", "bar", "aaa"}));
}
TEST(ChannelInitTest, PredicatesCanFilter) {
ChannelInit::Builder b;
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("foo"))
.IfChannelArg("foo", true);
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("bar"))
.IfChannelArg("bar", false);
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("aaa")).Terminal();
auto init = b.Build();
EXPECT_EQ(GetFilterNames(init, GRPC_CLIENT_CHANNEL, ChannelArgs()),
std::vector<std::string>({"foo", "aaa"}));
EXPECT_EQ(GetFilterNames(init, GRPC_CLIENT_CHANNEL,
ChannelArgs().Set("foo", false)),
std::vector<std::string>({"aaa"}));
EXPECT_EQ(
GetFilterNames(init, GRPC_CLIENT_CHANNEL, ChannelArgs().Set("bar", true)),
std::vector<std::string>({"bar", "foo", "aaa"}));
EXPECT_EQ(GetFilterNames(init, GRPC_CLIENT_CHANNEL,
ChannelArgs().Set("bar", true).Set("foo", false)),
std::vector<std::string>({"bar", "aaa"}));
}
TEST(ChannelInitTest, CanAddTerminalFilter) {
ChannelInit::Builder b;
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("foo"));
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("bar")).Terminal();
auto init = b.Build();
EXPECT_EQ(GetFilterNames(init, GRPC_CLIENT_CHANNEL, ChannelArgs()),
std::vector<std::string>({"foo", "bar"}));
}
TEST(ChannelInitTest, CanAddMultipleTerminalFilters) {
ChannelInit::Builder b;
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("foo"));
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("bar"))
.Terminal()
.IfChannelArg("bar", false);
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("baz"))
.Terminal()
.IfChannelArg("baz", false);
auto init = b.Build();
EXPECT_EQ(GetFilterNames(init, GRPC_CLIENT_CHANNEL, ChannelArgs()),
std::vector<std::string>());
EXPECT_EQ(
GetFilterNames(init, GRPC_CLIENT_CHANNEL, ChannelArgs().Set("bar", true)),
std::vector<std::string>({"foo", "bar"}));
EXPECT_EQ(
GetFilterNames(init, GRPC_CLIENT_CHANNEL, ChannelArgs().Set("baz", true)),
std::vector<std::string>({"foo", "baz"}));
EXPECT_EQ(GetFilterNames(init, GRPC_CLIENT_CHANNEL,
ChannelArgs().Set("bar", true).Set("baz", true)),
std::vector<std::string>());
}
TEST(ChannelInitTest, CanAddBeforeAllOnce) {
ChannelInit::Builder b;
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("foo")).BeforeAll();
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("bar"));
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("baz"));
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("aaa")).Terminal();
EXPECT_EQ(GetFilterNames(b.Build(), GRPC_CLIENT_CHANNEL, ChannelArgs()),
std::vector<std::string>({"foo", "bar", "baz", "aaa"}));
}
TEST(ChannelInitTest, CanAddBeforeAllTwice) {
ChannelInit::Builder b;
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("foo")).BeforeAll();
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("bar")).BeforeAll();
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("baz"));
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("aaa")).Terminal();
EXPECT_DEATH_IF_SUPPORTED(b.Build(), "Unresolvable graph of channel filters");
}
TEST(ChannelInitTest, CanPostProcessFilters) {
ChannelInit::Builder b;
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("foo"));
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("aaa")).Terminal();
int called_post_processor = 0;
b.RegisterPostProcessor(
GRPC_CLIENT_CHANNEL,
ChannelInit::PostProcessorSlot::kXdsChannelStackModifier,
[&called_post_processor](ChannelStackBuilder& b) {
++called_post_processor;
b.mutable_stack()->push_back(FilterNamed("bar"));
});
auto init = b.Build();
EXPECT_EQ(called_post_processor, 0);
EXPECT_EQ(GetFilterNames(init, GRPC_CLIENT_CHANNEL, ChannelArgs()),
std::vector<std::string>({"foo", "aaa", "bar"}));
}
} // namespace
} // namespace grpc_core
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestGrpcScope grpc_scope;
return RUN_ALL_TESTS();
}

@ -23,7 +23,6 @@
#include <stdlib.h>
#include <string.h>
#include <algorithm>
#include <atomic>
#include <memory>
#include <new>
@ -55,7 +54,6 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/debug_location.h"
@ -211,7 +209,13 @@ grpc_channel_filter TrailingMetadataRecordingFilter::kFilterVtable = {
grpc_channel_stack_no_post_init,
Destroy,
grpc_channel_next_get_info,
"trailing-metadata-recording-filter",
// Want to add the filter as close to the end as possible, to
// make sure that all of the filters work well together.
// However, we can't add it at the very end, because the
// connected channel filter must be the last one.
// Channel init code falls back to lexical ordering of filters if there are
// otherwise no dependencies, so we leverage that.
"zzzzzz_trailing-metadata-recording-filter",
};
bool TrailingMetadataRecordingFilter::trailing_metadata_available_;
absl::optional<GrpcStreamNetworkState::ValueType>
@ -791,22 +795,7 @@ int main(int argc, char** argv) {
grpc_core::CoreConfiguration::RunWithSpecialConfiguration(
[](grpc_core::CoreConfiguration::Builder* builder) {
grpc_core::BuildCoreConfiguration(builder);
auto register_stage = [builder](grpc_channel_stack_type type,
const grpc_channel_filter* filter) {
builder->channel_init()->RegisterStage(
type, INT_MAX, [filter](grpc_core::ChannelStackBuilder* builder) {
// Want to add the filter as close to the end as possible, to
// make sure that all of the filters work well together.
// However, we can't add it at the very end, because the
// connected channel filter must be the last one. So we add it
// right before the last one.
auto it = builder->mutable_stack()->end();
--it;
builder->mutable_stack()->insert(it, filter);
return true;
});
};
register_stage(
builder->channel_init()->RegisterFilter(
GRPC_CLIENT_SUBCHANNEL,
&grpc_core::TrailingMetadataRecordingFilter::kFilterVtable);
},

@ -90,14 +90,14 @@ TEST(XdsChannelStackModifierTest, XdsHttpFiltersInsertion) {
grpc_arg arg = channel_stack_modifier->MakeChannelArg();
// Create a phony ChannelStackBuilder object
grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1);
ChannelStackBuilderImpl builder("test", GRPC_SERVER_CHANNEL,
ChannelArgs::FromC(args));
grpc_channel_args_destroy(args);
grpc_transport_vtable fake_transport_vtable;
memset(&fake_transport_vtable, 0, sizeof(grpc_transport_vtable));
fake_transport_vtable.name = "fake";
grpc_transport fake_transport = {&fake_transport_vtable};
builder.SetTransport(&fake_transport);
ChannelStackBuilderImpl builder(
"test", GRPC_SERVER_CHANNEL,
ChannelArgs::FromC(args).SetObject<grpc_transport>(&fake_transport));
grpc_channel_args_destroy(args);
// Construct channel stack and verify that the test filters were successfully
// added
{

@ -30,6 +30,7 @@
#include "gtest/gtest.h"
#include <grpc++/grpc++.h>
#include <grpcpp/opencensus.h>
#include <grpcpp/support/status.h>
#include "src/core/ext/filters/logging/logging_filter.h"
@ -103,6 +104,7 @@ class LoggingTest : public ::testing::Test {
protected:
static void SetUpTestSuite() {
g_test_logging_sink = new TestLoggingSink;
grpc::RegisterOpenCensusPlugin();
grpc_core::RegisterLoggingFilter(g_test_logging_sink);
}

@ -56,10 +56,7 @@ using ::testing::UnorderedElementsAre;
class LoggingCensusIntegrationTest : public LoggingTest {
protected:
static void SetUpTestSuite() {
LoggingTest::SetUpTestSuite();
RegisterOpenCensusPlugin();
}
static void SetUpTestSuite() { LoggingTest::SetUpTestSuite(); }
static void TearDownTestSuite() { LoggingTest::TearDownTestSuite(); }
};

@ -2042,6 +2042,8 @@ src/core/lib/channel/channel_stack_builder.cc \
src/core/lib/channel/channel_stack_builder.h \
src/core/lib/channel/channel_stack_builder_impl.cc \
src/core/lib/channel/channel_stack_builder_impl.h \
src/core/lib/channel/channel_stack_trace.cc \
src/core/lib/channel/channel_stack_trace.h \
src/core/lib/channel/channel_trace.cc \
src/core/lib/channel/channel_trace.h \
src/core/lib/channel/channelz.cc \

@ -1818,6 +1818,8 @@ src/core/lib/channel/channel_stack_builder.cc \
src/core/lib/channel/channel_stack_builder.h \
src/core/lib/channel/channel_stack_builder_impl.cc \
src/core/lib/channel/channel_stack_builder_impl.h \
src/core/lib/channel/channel_stack_trace.cc \
src/core/lib/channel/channel_stack_trace.h \
src/core/lib/channel/channel_trace.cc \
src/core/lib/channel/channel_trace.h \
src/core/lib/channel/channelz.cc \

@ -1729,6 +1729,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "channel_init_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save