[call-v3] Filter executor (#35533)

A call execution environment for the V3 runtime.

The `CallFilters` class will ultimately be a (private) member of `CallSpine`, and the `StackBuilder` component will be used by a channel when all of the filters it needs are known to allow the call spine to start processing a call.

This is accompanied by a reasonably extensive test suite.

I expect to fine tune semantics, implementation, and tests over the coming weeks/months as we iterate to bring up the rest of the pieces.

Closes #35533

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35533 from ctiller:filters 689c7b527b
PiperOrigin-RevId: 599220150
pull/35549/head
Craig Tiller 11 months ago committed by Copybara-Service
parent c6755cb4b9
commit 584c0c0c98
  1. 3
      BUILD
  2. 100
      CMakeLists.txt
  3. 6
      Makefile
  4. 6
      Package.swift
  5. 181
      build_autogenerated.yaml
  6. 3
      config.m4
  7. 3
      config.w32
  8. 6
      gRPC-C++.podspec
  9. 9
      gRPC-Core.podspec
  10. 6
      grpc.gemspec
  11. 9
      grpc.gyp
  12. 6
      package.xml
  13. 51
      src/core/BUILD
  14. 3
      src/core/lib/channel/promise_based_filter.h
  15. 33
      src/core/lib/promise/status_flag.h
  16. 343
      src/core/lib/transport/call_filters.cc
  17. 1180
      src/core/lib/transport/call_filters.h
  18. 43
      src/core/lib/transport/message.cc
  19. 61
      src/core/lib/transport/message.h
  20. 37
      src/core/lib/transport/metadata.cc
  21. 78
      src/core/lib/transport/metadata.h
  22. 34
      src/core/lib/transport/transport.cc
  23. 88
      src/core/lib/transport/transport.h
  24. 3
      src/python/grpcio/grpc_core_dependencies.py
  25. 14
      test/core/transport/BUILD
  26. 1473
      test/core/transport/call_filters_test.cc
  27. 6
      tools/doxygen/Doxyfile.c++.internal
  28. 6
      tools/doxygen/Doxyfile.core.internal
  29. 24
      tools/run_tests/generated/tests.json

@ -1530,6 +1530,7 @@ grpc_cc_library(
"//src/core:arena_promise",
"//src/core:atomic_utils",
"//src/core:bitset",
"//src/core:call_filters",
"//src/core:call_final_info",
"//src/core:cancel_callback",
"//src/core:channel_args",
@ -1570,6 +1571,8 @@ grpc_cc_library(
"//src/core:loop",
"//src/core:map",
"//src/core:memory_quota",
"//src/core:message",
"//src/core:metadata",
"//src/core:metadata_batch",
"//src/core:no_destruct",
"//src/core:per_cpu",

100
CMakeLists.txt generated

@ -925,6 +925,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx byte_buffer_test)
add_dependencies(buildtests_cxx c_slice_buffer_test)
add_dependencies(buildtests_cxx call_creds_test)
add_dependencies(buildtests_cxx call_filters_test)
add_dependencies(buildtests_cxx call_finalization_test)
add_dependencies(buildtests_cxx call_host_override_test)
add_dependencies(buildtests_cxx call_tracer_test)
@ -2520,12 +2521,15 @@ add_library(grpc
src/core/lib/surface/version.cc
src/core/lib/transport/batch_builder.cc
src/core/lib/transport/bdp_estimator.cc
src/core/lib/transport/call_filters.cc
src/core/lib/transport/call_final_info.cc
src/core/lib/transport/connectivity_state.cc
src/core/lib/transport/error_utils.cc
src/core/lib/transport/handshaker.cc
src/core/lib/transport/handshaker_registry.cc
src/core/lib/transport/http_connect_handshaker.cc
src/core/lib/transport/message.cc
src/core/lib/transport/metadata.cc
src/core/lib/transport/metadata_batch.cc
src/core/lib/transport/parsed_metadata.cc
src/core/lib/transport/status_conversion.cc
@ -3230,12 +3234,15 @@ add_library(grpc_unsecure
src/core/lib/surface/version.cc
src/core/lib/transport/batch_builder.cc
src/core/lib/transport/bdp_estimator.cc
src/core/lib/transport/call_filters.cc
src/core/lib/transport/call_final_info.cc
src/core/lib/transport/connectivity_state.cc
src/core/lib/transport/error_utils.cc
src/core/lib/transport/handshaker.cc
src/core/lib/transport/handshaker_registry.cc
src/core/lib/transport/http_connect_handshaker.cc
src/core/lib/transport/message.cc
src/core/lib/transport/metadata.cc
src/core/lib/transport/metadata_batch.cc
src/core/lib/transport/parsed_metadata.cc
src/core/lib/transport/status_conversion.cc
@ -5312,11 +5319,14 @@ add_library(grpc_authorization_provider
src/core/lib/surface/validate_metadata.cc
src/core/lib/surface/version.cc
src/core/lib/transport/batch_builder.cc
src/core/lib/transport/call_filters.cc
src/core/lib/transport/call_final_info.cc
src/core/lib/transport/connectivity_state.cc
src/core/lib/transport/error_utils.cc
src/core/lib/transport/handshaker.cc
src/core/lib/transport/handshaker_registry.cc
src/core/lib/transport/message.cc
src/core/lib/transport/metadata.cc
src/core/lib/transport/metadata_batch.cc
src/core/lib/transport/parsed_metadata.cc
src/core/lib/transport/status_conversion.cc
@ -8376,6 +8386,96 @@ target_link_libraries(call_creds_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(call_filters_test
src/core/ext/upb-gen/google/protobuf/any.upb_minitable.c
src/core/ext/upb-gen/google/protobuf/descriptor.upb_minitable.c
src/core/ext/upb-gen/google/rpc/status.upb_minitable.c
src/core/lib/channel/channel_args.cc
src/core/lib/compression/compression_internal.cc
src/core/lib/debug/trace.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
src/core/lib/gprpp/ref_counted_string.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/time.cc
src/core/lib/iomgr/closure.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/error.cc
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/iomgr_internal.cc
src/core/lib/promise/activity.cc
src/core/lib/promise/trace.cc
src/core/lib/resource_quota/arena.cc
src/core/lib/resource_quota/memory_quota.cc
src/core/lib/resource_quota/periodic_update.cc
src/core/lib/resource_quota/trace.cc
src/core/lib/slice/percent_encoding.cc
src/core/lib/slice/slice.cc
src/core/lib/slice/slice_buffer.cc
src/core/lib/slice/slice_refcount.cc
src/core/lib/slice/slice_string_helpers.cc
src/core/lib/surface/channel_stack_type.cc
src/core/lib/transport/call_filters.cc
src/core/lib/transport/call_final_info.cc
src/core/lib/transport/error_utils.cc
src/core/lib/transport/message.cc
src/core/lib/transport/metadata.cc
src/core/lib/transport/metadata_batch.cc
src/core/lib/transport/parsed_metadata.cc
src/core/lib/transport/status_conversion.cc
src/core/lib/transport/timeout_encoding.cc
test/core/transport/call_filters_test.cc
third_party/upb/upb/message/accessors.c
third_party/upb/upb/mini_descriptor/build_enum.c
third_party/upb/upb/mini_descriptor/decode.c
third_party/upb/upb/mini_descriptor/internal/base92.c
third_party/upb/upb/mini_descriptor/internal/encode.c
third_party/upb/upb/mini_descriptor/link.c
third_party/upb/upb/wire/decode.c
third_party/upb/upb/wire/decode_fast.c
third_party/upb/upb/wire/encode.c
third_party/upb/upb/wire/eps_copy_input_stream.c
third_party/upb/upb/wire/reader.c
)
target_compile_features(call_filters_test PUBLIC cxx_std_14)
target_include_directories(call_filters_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(call_filters_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
upb_message_lib
utf8_range_lib
absl::inlined_vector
absl::function_ref
absl::hash
absl::type_traits
absl::statusor
absl::utility
gpr
)
endif()
if(gRPC_BUILD_TESTS)

6
Makefile generated

@ -1706,12 +1706,15 @@ LIBGRPC_SRC = \
src/core/lib/surface/version.cc \
src/core/lib/transport/batch_builder.cc \
src/core/lib/transport/bdp_estimator.cc \
src/core/lib/transport/call_filters.cc \
src/core/lib/transport/call_final_info.cc \
src/core/lib/transport/connectivity_state.cc \
src/core/lib/transport/error_utils.cc \
src/core/lib/transport/handshaker.cc \
src/core/lib/transport/handshaker_registry.cc \
src/core/lib/transport/http_connect_handshaker.cc \
src/core/lib/transport/message.cc \
src/core/lib/transport/metadata.cc \
src/core/lib/transport/metadata_batch.cc \
src/core/lib/transport/parsed_metadata.cc \
src/core/lib/transport/status_conversion.cc \
@ -2251,12 +2254,15 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/surface/version.cc \
src/core/lib/transport/batch_builder.cc \
src/core/lib/transport/bdp_estimator.cc \
src/core/lib/transport/call_filters.cc \
src/core/lib/transport/call_final_info.cc \
src/core/lib/transport/connectivity_state.cc \
src/core/lib/transport/error_utils.cc \
src/core/lib/transport/handshaker.cc \
src/core/lib/transport/handshaker_registry.cc \
src/core/lib/transport/http_connect_handshaker.cc \
src/core/lib/transport/message.cc \
src/core/lib/transport/metadata.cc \
src/core/lib/transport/metadata_batch.cc \
src/core/lib/transport/parsed_metadata.cc \
src/core/lib/transport/status_conversion.cc \

6
Package.swift generated

@ -1889,6 +1889,8 @@ let package = Package(
"src/core/lib/transport/batch_builder.h",
"src/core/lib/transport/bdp_estimator.cc",
"src/core/lib/transport/bdp_estimator.h",
"src/core/lib/transport/call_filters.cc",
"src/core/lib/transport/call_filters.h",
"src/core/lib/transport/call_final_info.cc",
"src/core/lib/transport/call_final_info.h",
"src/core/lib/transport/connectivity_state.cc",
@ -1904,6 +1906,10 @@ let package = Package(
"src/core/lib/transport/http2_errors.h",
"src/core/lib/transport/http_connect_handshaker.cc",
"src/core/lib/transport/http_connect_handshaker.h",
"src/core/lib/transport/message.cc",
"src/core/lib/transport/message.h",
"src/core/lib/transport/metadata.cc",
"src/core/lib/transport/metadata.h",
"src/core/lib/transport/metadata_batch.cc",
"src/core/lib/transport/metadata_batch.h",
"src/core/lib/transport/metadata_compression_traits.h",

@ -1169,6 +1169,7 @@ libs:
- src/core/lib/surface/wait_for_cq_end_op.h
- src/core/lib/transport/batch_builder.h
- src/core/lib/transport/bdp_estimator.h
- src/core/lib/transport/call_filters.h
- src/core/lib/transport/call_final_info.h
- src/core/lib/transport/connectivity_state.h
- src/core/lib/transport/custom_metadata.h
@ -1178,6 +1179,8 @@ libs:
- src/core/lib/transport/handshaker_registry.h
- src/core/lib/transport/http2_errors.h
- src/core/lib/transport/http_connect_handshaker.h
- src/core/lib/transport/message.h
- src/core/lib/transport/metadata.h
- src/core/lib/transport/metadata_batch.h
- src/core/lib/transport/metadata_compression_traits.h
- src/core/lib/transport/parsed_metadata.h
@ -1967,12 +1970,15 @@ libs:
- src/core/lib/surface/version.cc
- src/core/lib/transport/batch_builder.cc
- src/core/lib/transport/bdp_estimator.cc
- src/core/lib/transport/call_filters.cc
- src/core/lib/transport/call_final_info.cc
- src/core/lib/transport/connectivity_state.cc
- src/core/lib/transport/error_utils.cc
- src/core/lib/transport/handshaker.cc
- src/core/lib/transport/handshaker_registry.cc
- src/core/lib/transport/http_connect_handshaker.cc
- src/core/lib/transport/message.cc
- src/core/lib/transport/metadata.cc
- src/core/lib/transport/metadata_batch.cc
- src/core/lib/transport/parsed_metadata.cc
- src/core/lib/transport/status_conversion.cc
@ -2604,6 +2610,7 @@ libs:
- src/core/lib/surface/wait_for_cq_end_op.h
- src/core/lib/transport/batch_builder.h
- src/core/lib/transport/bdp_estimator.h
- src/core/lib/transport/call_filters.h
- src/core/lib/transport/call_final_info.h
- src/core/lib/transport/connectivity_state.h
- src/core/lib/transport/custom_metadata.h
@ -2613,6 +2620,8 @@ libs:
- src/core/lib/transport/handshaker_registry.h
- src/core/lib/transport/http2_errors.h
- src/core/lib/transport/http_connect_handshaker.h
- src/core/lib/transport/message.h
- src/core/lib/transport/metadata.h
- src/core/lib/transport/metadata_batch.h
- src/core/lib/transport/metadata_compression_traits.h
- src/core/lib/transport/parsed_metadata.h
@ -3020,12 +3029,15 @@ libs:
- src/core/lib/surface/version.cc
- src/core/lib/transport/batch_builder.cc
- src/core/lib/transport/bdp_estimator.cc
- src/core/lib/transport/call_filters.cc
- src/core/lib/transport/call_final_info.cc
- src/core/lib/transport/connectivity_state.cc
- src/core/lib/transport/error_utils.cc
- src/core/lib/transport/handshaker.cc
- src/core/lib/transport/handshaker_registry.cc
- src/core/lib/transport/http_connect_handshaker.cc
- src/core/lib/transport/message.cc
- src/core/lib/transport/metadata.cc
- src/core/lib/transport/metadata_batch.cc
- src/core/lib/transport/parsed_metadata.cc
- src/core/lib/transport/status_conversion.cc
@ -4632,6 +4644,7 @@ libs:
- src/core/lib/surface/validate_metadata.h
- src/core/lib/surface/wait_for_cq_end_op.h
- src/core/lib/transport/batch_builder.h
- src/core/lib/transport/call_filters.h
- src/core/lib/transport/call_final_info.h
- src/core/lib/transport/connectivity_state.h
- src/core/lib/transport/custom_metadata.h
@ -4640,6 +4653,8 @@ libs:
- src/core/lib/transport/handshaker_factory.h
- src/core/lib/transport/handshaker_registry.h
- src/core/lib/transport/http2_errors.h
- src/core/lib/transport/message.h
- src/core/lib/transport/metadata.h
- src/core/lib/transport/metadata_batch.h
- src/core/lib/transport/metadata_compression_traits.h
- src/core/lib/transport/parsed_metadata.h
@ -4925,11 +4940,14 @@ libs:
- src/core/lib/surface/validate_metadata.cc
- src/core/lib/surface/version.cc
- src/core/lib/transport/batch_builder.cc
- src/core/lib/transport/call_filters.cc
- src/core/lib/transport/call_final_info.cc
- src/core/lib/transport/connectivity_state.cc
- src/core/lib/transport/error_utils.cc
- src/core/lib/transport/handshaker.cc
- src/core/lib/transport/handshaker_registry.cc
- src/core/lib/transport/message.cc
- src/core/lib/transport/metadata.cc
- src/core/lib/transport/metadata_batch.cc
- src/core/lib/transport/parsed_metadata.cc
- src/core/lib/transport/status_conversion.cc
@ -6218,6 +6236,169 @@ targets:
- grpc_authorization_provider
- grpc_unsecure
- grpc_test_util
- name: call_filters_test
gtest: true
build: test
language: c++
headers:
- src/core/ext/upb-gen/google/protobuf/any.upb.h
- src/core/ext/upb-gen/google/protobuf/any.upb_minitable.h
- src/core/ext/upb-gen/google/protobuf/descriptor.upb.h
- src/core/ext/upb-gen/google/rpc/status.upb.h
- src/core/ext/upb-gen/google/rpc/status.upb_minitable.h
- src/core/lib/avl/avl.h
- src/core/lib/channel/channel_args.h
- src/core/lib/compression/compression_internal.h
- src/core/lib/debug/trace.h
- src/core/lib/experiments/config.h
- src/core/lib/experiments/experiments.h
- src/core/lib/gpr/spinlock.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/chunked_vector.h
- src/core/lib/gprpp/dual_ref_counted.h
- src/core/lib/gprpp/if_list.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/packed_table.h
- src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h
- src/core/lib/gprpp/ref_counted_string.h
- src/core/lib/gprpp/sorted_pack.h
- src/core/lib/gprpp/status_helper.h
- src/core/lib/gprpp/table.h
- src/core/lib/gprpp/time.h
- src/core/lib/gprpp/type_list.h
- src/core/lib/iomgr/closure.h
- src/core/lib/iomgr/combiner.h
- src/core/lib/iomgr/error.h
- src/core/lib/iomgr/exec_ctx.h
- src/core/lib/iomgr/executor.h
- src/core/lib/iomgr/iomgr_internal.h
- src/core/lib/promise/activity.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_seq.h
- src/core/lib/promise/detail/promise_factory.h
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/seq_state.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/exec_ctx_wakeup_scheduler.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/promise.h
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
- src/core/lib/promise/status_flag.h
- src/core/lib/promise/trace.h
- src/core/lib/resource_quota/arena.h
- src/core/lib/resource_quota/memory_quota.h
- src/core/lib/resource_quota/periodic_update.h
- src/core/lib/resource_quota/trace.h
- src/core/lib/slice/percent_encoding.h
- src/core/lib/slice/slice.h
- src/core/lib/slice/slice_buffer.h
- src/core/lib/slice/slice_internal.h
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/lib/surface/channel_stack_type.h
- src/core/lib/transport/call_filters.h
- src/core/lib/transport/call_final_info.h
- src/core/lib/transport/custom_metadata.h
- src/core/lib/transport/error_utils.h
- src/core/lib/transport/http2_errors.h
- src/core/lib/transport/message.h
- src/core/lib/transport/metadata.h
- src/core/lib/transport/metadata_batch.h
- src/core/lib/transport/metadata_compression_traits.h
- src/core/lib/transport/parsed_metadata.h
- src/core/lib/transport/simple_slice_based_metadata.h
- src/core/lib/transport/status_conversion.h
- src/core/lib/transport/timeout_encoding.h
- third_party/upb/upb/generated_code_support.h
- third_party/upb/upb/message/accessors.h
- third_party/upb/upb/message/internal/accessors.h
- third_party/upb/upb/mini_descriptor/build_enum.h
- third_party/upb/upb/mini_descriptor/decode.h
- third_party/upb/upb/mini_descriptor/internal/base92.h
- third_party/upb/upb/mini_descriptor/internal/decoder.h
- third_party/upb/upb/mini_descriptor/internal/encode.h
- third_party/upb/upb/mini_descriptor/internal/encode.hpp
- third_party/upb/upb/mini_descriptor/internal/modifiers.h
- third_party/upb/upb/mini_descriptor/internal/wire_constants.h
- third_party/upb/upb/mini_descriptor/link.h
- third_party/upb/upb/wire/decode.h
- third_party/upb/upb/wire/decode_fast.h
- third_party/upb/upb/wire/encode.h
- third_party/upb/upb/wire/eps_copy_input_stream.h
- third_party/upb/upb/wire/internal/constants.h
- third_party/upb/upb/wire/internal/decode.h
- third_party/upb/upb/wire/internal/swap.h
- third_party/upb/upb/wire/reader.h
- third_party/upb/upb/wire/types.h
src:
- src/core/ext/upb-gen/google/protobuf/any.upb_minitable.c
- src/core/ext/upb-gen/google/protobuf/descriptor.upb_minitable.c
- src/core/ext/upb-gen/google/rpc/status.upb_minitable.c
- src/core/lib/channel/channel_args.cc
- src/core/lib/compression/compression_internal.cc
- src/core/lib/debug/trace.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
- src/core/lib/gprpp/ref_counted_string.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/time.cc
- src/core/lib/iomgr/closure.cc
- src/core/lib/iomgr/combiner.cc
- src/core/lib/iomgr/error.cc
- src/core/lib/iomgr/exec_ctx.cc
- src/core/lib/iomgr/executor.cc
- src/core/lib/iomgr/iomgr_internal.cc
- src/core/lib/promise/activity.cc
- src/core/lib/promise/trace.cc
- src/core/lib/resource_quota/arena.cc
- src/core/lib/resource_quota/memory_quota.cc
- src/core/lib/resource_quota/periodic_update.cc
- src/core/lib/resource_quota/trace.cc
- src/core/lib/slice/percent_encoding.cc
- src/core/lib/slice/slice.cc
- src/core/lib/slice/slice_buffer.cc
- src/core/lib/slice/slice_refcount.cc
- src/core/lib/slice/slice_string_helpers.cc
- src/core/lib/surface/channel_stack_type.cc
- src/core/lib/transport/call_filters.cc
- src/core/lib/transport/call_final_info.cc
- src/core/lib/transport/error_utils.cc
- src/core/lib/transport/message.cc
- src/core/lib/transport/metadata.cc
- src/core/lib/transport/metadata_batch.cc
- src/core/lib/transport/parsed_metadata.cc
- src/core/lib/transport/status_conversion.cc
- src/core/lib/transport/timeout_encoding.cc
- test/core/transport/call_filters_test.cc
- third_party/upb/upb/message/accessors.c
- third_party/upb/upb/mini_descriptor/build_enum.c
- third_party/upb/upb/mini_descriptor/decode.c
- third_party/upb/upb/mini_descriptor/internal/base92.c
- third_party/upb/upb/mini_descriptor/internal/encode.c
- third_party/upb/upb/mini_descriptor/link.c
- third_party/upb/upb/wire/decode.c
- third_party/upb/upb/wire/decode_fast.c
- third_party/upb/upb/wire/encode.c
- third_party/upb/upb/wire/eps_copy_input_stream.c
- third_party/upb/upb/wire/reader.c
deps:
- gtest
- upb_message_lib
- utf8_range_lib
- absl/container:inlined_vector
- absl/functional:function_ref
- absl/hash:hash
- absl/meta:type_traits
- absl/status:statusor
- absl/utility:utility
- gpr
uses_polling: false
- name: call_finalization_test
gtest: true
build: test

3
config.m4 generated

@ -834,12 +834,15 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/surface/version.cc \
src/core/lib/transport/batch_builder.cc \
src/core/lib/transport/bdp_estimator.cc \
src/core/lib/transport/call_filters.cc \
src/core/lib/transport/call_final_info.cc \
src/core/lib/transport/connectivity_state.cc \
src/core/lib/transport/error_utils.cc \
src/core/lib/transport/handshaker.cc \
src/core/lib/transport/handshaker_registry.cc \
src/core/lib/transport/http_connect_handshaker.cc \
src/core/lib/transport/message.cc \
src/core/lib/transport/metadata.cc \
src/core/lib/transport/metadata_batch.cc \
src/core/lib/transport/parsed_metadata.cc \
src/core/lib/transport/status_conversion.cc \

3
config.w32 generated

@ -799,12 +799,15 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\surface\\version.cc " +
"src\\core\\lib\\transport\\batch_builder.cc " +
"src\\core\\lib\\transport\\bdp_estimator.cc " +
"src\\core\\lib\\transport\\call_filters.cc " +
"src\\core\\lib\\transport\\call_final_info.cc " +
"src\\core\\lib\\transport\\connectivity_state.cc " +
"src\\core\\lib\\transport\\error_utils.cc " +
"src\\core\\lib\\transport\\handshaker.cc " +
"src\\core\\lib\\transport\\handshaker_registry.cc " +
"src\\core\\lib\\transport\\http_connect_handshaker.cc " +
"src\\core\\lib\\transport\\message.cc " +
"src\\core\\lib\\transport\\metadata.cc " +
"src\\core\\lib\\transport\\metadata_batch.cc " +
"src\\core\\lib\\transport\\parsed_metadata.cc " +
"src\\core\\lib\\transport\\status_conversion.cc " +

6
gRPC-C++.podspec generated

@ -1264,6 +1264,7 @@ Pod::Spec.new do |s|
'src/core/lib/surface/wait_for_cq_end_op.h',
'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.h',
'src/core/lib/transport/connectivity_state.h',
'src/core/lib/transport/custom_metadata.h',
@ -1273,6 +1274,8 @@ Pod::Spec.new do |s|
'src/core/lib/transport/handshaker_registry.h',
'src/core/lib/transport/http2_errors.h',
'src/core/lib/transport/http_connect_handshaker.h',
'src/core/lib/transport/message.h',
'src/core/lib/transport/metadata.h',
'src/core/lib/transport/metadata_batch.h',
'src/core/lib/transport/metadata_compression_traits.h',
'src/core/lib/transport/parsed_metadata.h',
@ -2508,6 +2511,7 @@ Pod::Spec.new do |s|
'src/core/lib/surface/wait_for_cq_end_op.h',
'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.h',
'src/core/lib/transport/connectivity_state.h',
'src/core/lib/transport/custom_metadata.h',
@ -2517,6 +2521,8 @@ Pod::Spec.new do |s|
'src/core/lib/transport/handshaker_registry.h',
'src/core/lib/transport/http2_errors.h',
'src/core/lib/transport/http_connect_handshaker.h',
'src/core/lib/transport/message.h',
'src/core/lib/transport/metadata.h',
'src/core/lib/transport/metadata_batch.h',
'src/core/lib/transport/metadata_compression_traits.h',
'src/core/lib/transport/parsed_metadata.h',

9
gRPC-Core.podspec generated

@ -1988,6 +1988,8 @@ Pod::Spec.new do |s|
'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.cc',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/call_filters.cc',
'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.cc',
'src/core/lib/transport/call_final_info.h',
'src/core/lib/transport/connectivity_state.cc',
@ -2003,6 +2005,10 @@ Pod::Spec.new do |s|
'src/core/lib/transport/http2_errors.h',
'src/core/lib/transport/http_connect_handshaker.cc',
'src/core/lib/transport/http_connect_handshaker.h',
'src/core/lib/transport/message.cc',
'src/core/lib/transport/message.h',
'src/core/lib/transport/metadata.cc',
'src/core/lib/transport/metadata.h',
'src/core/lib/transport/metadata_batch.cc',
'src/core/lib/transport/metadata_batch.h',
'src/core/lib/transport/metadata_compression_traits.h',
@ -3279,6 +3285,7 @@ Pod::Spec.new do |s|
'src/core/lib/surface/wait_for_cq_end_op.h',
'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.h',
'src/core/lib/transport/connectivity_state.h',
'src/core/lib/transport/custom_metadata.h',
@ -3288,6 +3295,8 @@ Pod::Spec.new do |s|
'src/core/lib/transport/handshaker_registry.h',
'src/core/lib/transport/http2_errors.h',
'src/core/lib/transport/http_connect_handshaker.h',
'src/core/lib/transport/message.h',
'src/core/lib/transport/metadata.h',
'src/core/lib/transport/metadata_batch.h',
'src/core/lib/transport/metadata_compression_traits.h',
'src/core/lib/transport/parsed_metadata.h',

6
grpc.gemspec generated

@ -1891,6 +1891,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/transport/batch_builder.h )
s.files += %w( src/core/lib/transport/bdp_estimator.cc )
s.files += %w( src/core/lib/transport/bdp_estimator.h )
s.files += %w( src/core/lib/transport/call_filters.cc )
s.files += %w( src/core/lib/transport/call_filters.h )
s.files += %w( src/core/lib/transport/call_final_info.cc )
s.files += %w( src/core/lib/transport/call_final_info.h )
s.files += %w( src/core/lib/transport/connectivity_state.cc )
@ -1906,6 +1908,10 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/transport/http2_errors.h )
s.files += %w( src/core/lib/transport/http_connect_handshaker.cc )
s.files += %w( src/core/lib/transport/http_connect_handshaker.h )
s.files += %w( src/core/lib/transport/message.cc )
s.files += %w( src/core/lib/transport/message.h )
s.files += %w( src/core/lib/transport/metadata.cc )
s.files += %w( src/core/lib/transport/metadata.h )
s.files += %w( src/core/lib/transport/metadata_batch.cc )
s.files += %w( src/core/lib/transport/metadata_batch.h )
s.files += %w( src/core/lib/transport/metadata_compression_traits.h )

9
grpc.gyp generated

@ -1019,12 +1019,15 @@
'src/core/lib/surface/version.cc',
'src/core/lib/transport/batch_builder.cc',
'src/core/lib/transport/bdp_estimator.cc',
'src/core/lib/transport/call_filters.cc',
'src/core/lib/transport/call_final_info.cc',
'src/core/lib/transport/connectivity_state.cc',
'src/core/lib/transport/error_utils.cc',
'src/core/lib/transport/handshaker.cc',
'src/core/lib/transport/handshaker_registry.cc',
'src/core/lib/transport/http_connect_handshaker.cc',
'src/core/lib/transport/message.cc',
'src/core/lib/transport/metadata.cc',
'src/core/lib/transport/metadata_batch.cc',
'src/core/lib/transport/parsed_metadata.cc',
'src/core/lib/transport/status_conversion.cc',
@ -1504,12 +1507,15 @@
'src/core/lib/surface/version.cc',
'src/core/lib/transport/batch_builder.cc',
'src/core/lib/transport/bdp_estimator.cc',
'src/core/lib/transport/call_filters.cc',
'src/core/lib/transport/call_final_info.cc',
'src/core/lib/transport/connectivity_state.cc',
'src/core/lib/transport/error_utils.cc',
'src/core/lib/transport/handshaker.cc',
'src/core/lib/transport/handshaker_registry.cc',
'src/core/lib/transport/http_connect_handshaker.cc',
'src/core/lib/transport/message.cc',
'src/core/lib/transport/metadata.cc',
'src/core/lib/transport/metadata_batch.cc',
'src/core/lib/transport/parsed_metadata.cc',
'src/core/lib/transport/status_conversion.cc',
@ -2268,11 +2274,14 @@
'src/core/lib/surface/validate_metadata.cc',
'src/core/lib/surface/version.cc',
'src/core/lib/transport/batch_builder.cc',
'src/core/lib/transport/call_filters.cc',
'src/core/lib/transport/call_final_info.cc',
'src/core/lib/transport/connectivity_state.cc',
'src/core/lib/transport/error_utils.cc',
'src/core/lib/transport/handshaker.cc',
'src/core/lib/transport/handshaker_registry.cc',
'src/core/lib/transport/message.cc',
'src/core/lib/transport/metadata.cc',
'src/core/lib/transport/metadata_batch.cc',
'src/core/lib/transport/parsed_metadata.cc',
'src/core/lib/transport/status_conversion.cc',

6
package.xml generated

@ -1873,6 +1873,8 @@
<file baseinstalldir="/" name="src/core/lib/transport/batch_builder.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/bdp_estimator.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/bdp_estimator.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_filters.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_filters.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_final_info.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_final_info.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/connectivity_state.cc" role="src" />
@ -1888,6 +1890,10 @@
<file baseinstalldir="/" name="src/core/lib/transport/http2_errors.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/http_connect_handshaker.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/http_connect_handshaker.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/message.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/message.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/metadata.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/metadata.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/metadata_batch.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/metadata_batch.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/metadata_compression_traits.h" role="src" />

@ -6649,6 +6649,26 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "call_filters",
srcs = [
"lib/transport/call_filters.cc",
],
hdrs = [
"lib/transport/call_filters.h",
],
deps = [
"call_final_info",
"message",
"metadata",
"ref_counted",
"status_flag",
"//:gpr",
"//:promise",
"//:ref_counted_ptr",
],
)
grpc_cc_library(
name = "parsed_metadata",
srcs = [
@ -6669,6 +6689,37 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "metadata",
srcs = [
"lib/transport/metadata.cc",
],
hdrs = [
"lib/transport/metadata.h",
],
deps = [
"error_utils",
"metadata_batch",
"//:gpr_platform",
],
)
grpc_cc_library(
name = "message",
srcs = [
"lib/transport/message.cc",
],
hdrs = [
"lib/transport/message.h",
],
deps = [
"arena",
"slice_buffer",
"//:gpr_platform",
"//:grpc_public_hdrs",
],
)
grpc_cc_library(
name = "metadata_batch",
srcs = [

@ -67,6 +67,7 @@
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/transport/call_filters.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
@ -126,8 +127,6 @@ class ChannelFilter {
grpc_event_engine::experimental::GetDefaultEventEngine();
};
struct NoInterceptor {};
namespace promise_filter_detail {
// Determine if a list of interceptors has any that need to asyncronously error

@ -27,8 +27,18 @@
namespace grpc_core {
struct Failure {};
struct Success {};
struct Failure {
template <typename Sink>
friend void AbslStringify(Sink& sink, Failure) {
sink.Append("failed");
}
};
struct Success {
template <typename Sink>
friend void AbslStringify(Sink& sink, Success) {
sink.Append("ok");
}
};
inline bool IsStatusOk(Failure) { return false; }
inline bool IsStatusOk(Success) { return true; }
@ -68,10 +78,29 @@ class StatusFlag {
bool operator==(StatusFlag other) const { return value_ == other.value_; }
template <typename Sink>
friend void AbslStringify(Sink& sink, StatusFlag flag) {
if (flag.ok()) {
sink.Append("ok");
} else {
sink.Append("failed");
}
}
private:
bool value_;
};
inline bool operator==(StatusFlag flag, Failure) { return !flag.ok(); }
inline bool operator==(Failure, StatusFlag flag) { return !flag.ok(); }
inline bool operator==(StatusFlag flag, Success) { return flag.ok(); }
inline bool operator==(Success, StatusFlag flag) { return flag.ok(); }
inline bool operator!=(StatusFlag flag, Failure) { return flag.ok(); }
inline bool operator!=(Failure, StatusFlag flag) { return flag.ok(); }
inline bool operator!=(StatusFlag flag, Success) { return !flag.ok(); }
inline bool operator!=(Success, StatusFlag flag) { return !flag.ok(); }
inline bool IsStatusOk(const StatusFlag& flag) { return flag.ok(); }
template <>

@ -0,0 +1,343 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/transport/call_filters.h"
#include "src/core/lib/gprpp/crash.h"
namespace grpc_core {
namespace {
void* Offset(void* base, size_t amt) { return static_cast<char*>(base) + amt; }
} // namespace
namespace filters_detail {
template <typename T>
OperationExecutor<T>::~OperationExecutor() {
if (promise_data_ != nullptr) {
ops_->early_destroy(promise_data_);
gpr_free_aligned(promise_data_);
}
}
template <typename T>
Poll<ResultOr<T>> OperationExecutor<T>::Start(
const Layout<FallibleOperator<T>>* layout, T input, void* call_data) {
ops_ = layout->ops.data();
end_ops_ = ops_ + layout->ops.size();
if (layout->promise_size == 0) {
// No call state ==> instantaneously ready
auto r = InitStep(std::move(input), call_data);
GPR_ASSERT(r.ready());
return r;
}
promise_data_ =
gpr_malloc_aligned(layout->promise_size, layout->promise_alignment);
return InitStep(std::move(input), call_data);
}
template <typename T>
Poll<ResultOr<T>> OperationExecutor<T>::InitStep(T input, void* call_data) {
while (true) {
if (ops_ == end_ops_) {
return ResultOr<T>{std::move(input), nullptr};
}
auto p =
ops_->promise_init(promise_data_, Offset(call_data, ops_->call_offset),
ops_->channel_data, std::move(input));
if (auto* r = p.value_if_ready()) {
if (r->ok == nullptr) return std::move(*r);
input = std::move(r->ok);
++ops_;
continue;
}
return Pending{};
}
}
template <typename T>
Poll<ResultOr<T>> OperationExecutor<T>::Step(void* call_data) {
GPR_DEBUG_ASSERT(promise_data_ != nullptr);
auto p = ContinueStep(call_data);
if (p.ready()) {
gpr_free_aligned(promise_data_);
promise_data_ = nullptr;
}
return p;
}
template <typename T>
Poll<ResultOr<T>> OperationExecutor<T>::ContinueStep(void* call_data) {
auto p = ops_->poll(promise_data_);
if (auto* r = p.value_if_ready()) {
if (r->ok == nullptr) return std::move(*r);
++ops_;
return InitStep(std::move(r->ok), call_data);
}
return Pending{};
}
template <typename T>
InfallibleOperationExecutor<T>::~InfallibleOperationExecutor() {
if (promise_data_ != nullptr) {
ops_->early_destroy(promise_data_);
gpr_free_aligned(promise_data_);
}
}
template <typename T>
Poll<T> InfallibleOperationExecutor<T>::Start(
const Layout<InfallibleOperator<T>>* layout, T input, void* call_data) {
ops_ = layout->ops.data();
end_ops_ = ops_ + layout->ops.size();
if (layout->promise_size == 0) {
// No call state ==> instantaneously ready
auto r = InitStep(std::move(input), call_data);
GPR_ASSERT(r.ready());
return r;
}
promise_data_ =
gpr_malloc_aligned(layout->promise_size, layout->promise_alignment);
return InitStep(std::move(input), call_data);
}
template <typename T>
Poll<T> InfallibleOperationExecutor<T>::InitStep(T input, void* call_data) {
while (true) {
if (ops_ == end_ops_) {
return input;
}
auto p =
ops_->promise_init(promise_data_, Offset(call_data, ops_->call_offset),
ops_->channel_data, std::move(input));
if (auto* r = p.value_if_ready()) {
input = std::move(*r);
++ops_;
continue;
}
return Pending{};
}
}
template <typename T>
Poll<T> InfallibleOperationExecutor<T>::Step(void* call_data) {
GPR_DEBUG_ASSERT(promise_data_ != nullptr);
auto p = ContinueStep(call_data);
if (p.ready()) {
gpr_free_aligned(promise_data_);
promise_data_ = nullptr;
}
return p;
}
template <typename T>
Poll<T> InfallibleOperationExecutor<T>::ContinueStep(void* call_data) {
auto p = ops_->poll(promise_data_);
if (auto* r = p.value_if_ready()) {
++ops_;
return InitStep(std::move(*r), call_data);
}
return Pending{};
}
// Explicit instantiations of some types used in filters.h
// We'll need to add ServerMetadataHandle to this when it becomes different
// to ClientMetadataHandle
template class OperationExecutor<ClientMetadataHandle>;
template class OperationExecutor<MessageHandle>;
template class InfallibleOperationExecutor<ServerMetadataHandle>;
} // namespace filters_detail
///////////////////////////////////////////////////////////////////////////////
// CallFilters
CallFilters::CallFilters() : stack_(nullptr), call_data_(nullptr) {}
CallFilters::CallFilters(RefCountedPtr<Stack> stack)
: stack_(std::move(stack)),
call_data_(gpr_malloc_aligned(stack_->data_.call_data_size,
stack_->data_.call_data_alignment)) {
client_initial_metadata_state_.Start();
client_to_server_message_state_.Start();
server_initial_metadata_state_.Start();
server_to_client_message_state_.Start();
}
CallFilters::~CallFilters() {
if (call_data_ != nullptr) gpr_free_aligned(call_data_);
}
void CallFilters::SetStack(RefCountedPtr<Stack> stack) {
if (call_data_ != nullptr) gpr_free_aligned(call_data_);
stack_ = std::move(stack);
call_data_ = gpr_malloc_aligned(stack_->data_.call_data_size,
stack_->data_.call_data_alignment);
client_initial_metadata_state_.Start();
client_to_server_message_state_.Start();
server_initial_metadata_state_.Start();
server_to_client_message_state_.Start();
}
void CallFilters::Finalize(const grpc_call_final_info* final_info) {
for (auto& finalizer : stack_->data_.finalizers) {
finalizer.final(Offset(call_data_, finalizer.call_offset),
finalizer.channel_data, final_info);
}
}
void CallFilters::CancelDueToFailedPipeOperation() {
// We expect something cancelled before now
if (server_trailing_metadata_ == nullptr) return;
gpr_log(GPR_DEBUG, "Cancelling due to failed pipe operation");
server_trailing_metadata_ =
ServerMetadataFromStatus(absl::CancelledError("Failed pipe operation"));
server_trailing_metadata_waiter_.Wake();
}
///////////////////////////////////////////////////////////////////////////////
// CallFilters::StackBuilder
RefCountedPtr<CallFilters::Stack> CallFilters::StackBuilder::Build() {
if (data_.call_data_size % data_.call_data_alignment != 0) {
data_.call_data_size += data_.call_data_alignment -
data_.call_data_size % data_.call_data_alignment;
}
// server -> client needs to be reversed so that we can iterate all stacks
// in the same order
data_.server_initial_metadata.Reverse();
data_.server_to_client_messages.Reverse();
data_.server_trailing_metadata.Reverse();
return RefCountedPtr<Stack>(new Stack(std::move(data_)));
}
///////////////////////////////////////////////////////////////////////////////
// CallFilters::PipeState
void filters_detail::PipeState::Start() {
GPR_DEBUG_ASSERT(!started_);
started_ = true;
wait_recv_.Wake();
}
void filters_detail::PipeState::BeginPush() {
switch (state_) {
case ValueState::kIdle:
state_ = ValueState::kQueued;
break;
case ValueState::kWaiting:
state_ = ValueState::kReady;
wait_recv_.Wake();
break;
case ValueState::kClosed:
case ValueState::kError:
break;
case ValueState::kQueued:
case ValueState::kReady:
case ValueState::kProcessing:
Crash("Only one push allowed to be outstanding");
break;
}
}
void filters_detail::PipeState::DropPush() {
switch (state_) {
case ValueState::kQueued:
case ValueState::kReady:
case ValueState::kProcessing:
case ValueState::kWaiting:
state_ = ValueState::kError;
wait_recv_.Wake();
break;
case ValueState::kIdle:
case ValueState::kClosed:
case ValueState::kError:
break;
}
}
void filters_detail::PipeState::DropPull() {
switch (state_) {
case ValueState::kQueued:
case ValueState::kReady:
case ValueState::kProcessing:
case ValueState::kWaiting:
state_ = ValueState::kError;
wait_send_.Wake();
break;
case ValueState::kIdle:
case ValueState::kClosed:
case ValueState::kError:
break;
}
}
Poll<StatusFlag> filters_detail::PipeState::PollPush() {
switch (state_) {
case ValueState::kIdle:
// Read completed and new read started => we see waiting here
case ValueState::kWaiting:
case ValueState::kClosed:
return Success{};
case ValueState::kQueued:
case ValueState::kReady:
case ValueState::kProcessing:
return wait_send_.pending();
case ValueState::kError:
return Failure{};
}
GPR_UNREACHABLE_CODE(return Pending{});
}
Poll<StatusFlag> filters_detail::PipeState::PollPull() {
switch (state_) {
case ValueState::kWaiting:
return wait_recv_.pending();
case ValueState::kIdle:
state_ = ValueState::kWaiting;
return wait_recv_.pending();
case ValueState::kReady:
case ValueState::kQueued:
if (!started_) return wait_recv_.pending();
state_ = ValueState::kProcessing;
return Success{};
case ValueState::kProcessing:
Crash("Only one pull allowed to be outstanding");
case ValueState::kClosed:
case ValueState::kError:
return Failure{};
}
GPR_UNREACHABLE_CODE(return Pending{});
}
void filters_detail::PipeState::AckPull() {
switch (state_) {
case ValueState::kProcessing:
state_ = ValueState::kIdle;
wait_send_.Wake();
break;
case ValueState::kWaiting:
case ValueState::kIdle:
case ValueState::kQueued:
case ValueState::kReady:
case ValueState::kClosed:
Crash("AckPullValue called in invalid state");
case ValueState::kError:
break;
}
}
} // namespace grpc_core

File diff suppressed because it is too large Load Diff

@ -0,0 +1,43 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/transport/message.h"
#include <grpc/impl/grpc_types.h>
namespace grpc_core {
std::string Message::DebugString() const {
std::string out = absl::StrCat(payload_.Length(), "b");
auto flags = flags_;
auto explain = [&flags, &out](uint32_t flag, absl::string_view name) {
if (flags & flag) {
flags &= ~flag;
absl::StrAppend(&out, ":", name);
}
};
explain(GRPC_WRITE_BUFFER_HINT, "write_buffer");
explain(GRPC_WRITE_NO_COMPRESS, "no_compress");
explain(GRPC_WRITE_THROUGH, "write_through");
explain(GRPC_WRITE_INTERNAL_COMPRESS, "compress");
explain(GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED, "was_compressed");
if (flags != 0) {
absl::StrAppend(&out, ":huh=0x", absl::Hex(flags));
}
return out;
}
} // namespace grpc_core

@ -0,0 +1,61 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_TRANSPORT_MESSAGE_H
#define GRPC_SRC_CORE_LIB_TRANSPORT_MESSAGE_H
#include <grpc/support/port_platform.h>
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h"
/// Internal bit flag for grpc_begin_message's \a flags signaling the use of
/// compression for the message. (Does not apply for stream compression.)
#define GRPC_WRITE_INTERNAL_COMPRESS (0x80000000u)
/// Internal bit flag for determining whether the message was compressed and had
/// to be decompressed by the message_decompress filter. (Does not apply for
/// stream compression.)
#define GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED (0x40000000u)
/// Mask of all valid internal flags.
#define GRPC_WRITE_INTERNAL_USED_MASK \
(GRPC_WRITE_INTERNAL_COMPRESS | GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED)
namespace grpc_core {
class Message {
public:
Message() = default;
~Message() = default;
Message(SliceBuffer payload, uint32_t flags)
: payload_(std::move(payload)), flags_(flags) {}
Message(const Message&) = delete;
Message& operator=(const Message&) = delete;
uint32_t flags() const { return flags_; }
uint32_t& mutable_flags() { return flags_; }
SliceBuffer* payload() { return &payload_; }
const SliceBuffer* payload() const { return &payload_; }
std::string DebugString() const;
private:
SliceBuffer payload_;
uint32_t flags_ = 0;
};
using MessageHandle = Arena::PoolPtr<Message>;
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_TRANSPORT_MESSAGE_H

@ -0,0 +1,37 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/error_utils.h"
namespace grpc_core {
ServerMetadataHandle ServerMetadataFromStatus(const absl::Status& status,
Arena* arena) {
auto hdl = arena->MakePooled<ServerMetadata>(arena);
grpc_status_code code;
std::string message;
grpc_error_get_status(status, Timestamp::InfFuture(), &code, &message,
nullptr, nullptr);
hdl->Set(GrpcStatusMetadata(), code);
if (!status.ok()) {
hdl->Set(GrpcMessageMetadata(), Slice::FromCopiedString(message));
}
return hdl;
}
} // namespace grpc_core

@ -0,0 +1,78 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_TRANSPORT_METADATA_H
#define GRPC_SRC_CORE_LIB_TRANSPORT_METADATA_H
#include <grpc/support/port_platform.h>
#include "src/core/lib/transport/metadata_batch.h"
namespace grpc_core {
// Server metadata type
// TODO(ctiller): This should be a bespoke instance of MetadataMap<>
using ServerMetadata = grpc_metadata_batch;
using ServerMetadataHandle = Arena::PoolPtr<ServerMetadata>;
// Client initial metadata type
// TODO(ctiller): This should be a bespoke instance of MetadataMap<>
using ClientMetadata = grpc_metadata_batch;
using ClientMetadataHandle = Arena::PoolPtr<ClientMetadata>;
// Ok/not-ok check for trailing metadata, so that it can be used as result types
// for TrySeq.
inline bool IsStatusOk(const ServerMetadataHandle& m) {
return m->get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN) ==
GRPC_STATUS_OK;
}
ServerMetadataHandle ServerMetadataFromStatus(
const absl::Status& status, Arena* arena = GetContext<Arena>());
template <>
struct StatusCastImpl<ServerMetadataHandle, absl::Status> {
static ServerMetadataHandle Cast(const absl::Status& m) {
return ServerMetadataFromStatus(m);
}
};
template <>
struct StatusCastImpl<ServerMetadataHandle, const absl::Status&> {
static ServerMetadataHandle Cast(const absl::Status& m) {
return ServerMetadataFromStatus(m);
}
};
template <>
struct StatusCastImpl<ServerMetadataHandle, absl::Status&> {
static ServerMetadataHandle Cast(const absl::Status& m) {
return ServerMetadataFromStatus(m);
}
};
// Anything that can be first cast to absl::Status can then be cast to
// ServerMetadataHandle.
template <typename T>
struct StatusCastImpl<
ServerMetadataHandle, T,
absl::void_t<decltype(&StatusCastImpl<absl::Status, T>::Cast)>> {
static ServerMetadataHandle Cast(const T& m) {
return ServerMetadataFromStatus(StatusCast<absl::Status>(m));
}
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_TRANSPORT_METADATA_H

@ -218,40 +218,6 @@ grpc_transport_stream_op_batch* grpc_make_transport_stream_op(
namespace grpc_core {
ServerMetadataHandle ServerMetadataFromStatus(const absl::Status& status,
Arena* arena) {
auto hdl = arena->MakePooled<ServerMetadata>(arena);
grpc_status_code code;
std::string message;
grpc_error_get_status(status, Timestamp::InfFuture(), &code, &message,
nullptr, nullptr);
hdl->Set(GrpcStatusMetadata(), code);
if (!status.ok()) {
hdl->Set(GrpcMessageMetadata(), Slice::FromCopiedString(message));
}
return hdl;
}
std::string Message::DebugString() const {
std::string out = absl::StrCat(payload_.Length(), "b");
auto flags = flags_;
auto explain = [&flags, &out](uint32_t flag, absl::string_view name) {
if (flags & flag) {
flags &= ~flag;
absl::StrAppend(&out, ":", name);
}
};
explain(GRPC_WRITE_BUFFER_HINT, "write_buffer");
explain(GRPC_WRITE_NO_COMPRESS, "no_compress");
explain(GRPC_WRITE_THROUGH, "write_through");
explain(GRPC_WRITE_INTERNAL_COMPRESS, "compress");
explain(GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED, "was_compressed");
if (flags != 0) {
absl::StrAppend(&out, ":huh=0x", absl::Hex(flags));
}
return out;
}
void ForwardCall(CallHandler call_handler, CallInitiator call_initiator,
ClientMetadataHandle client_initial_metadata) {
// Send initial metadata.

@ -63,6 +63,8 @@
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/call_final_info.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/message.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport_fwd.h"
@ -74,94 +76,8 @@
#define GRPC_ARG_TRANSPORT "grpc.internal.transport"
/// Internal bit flag for grpc_begin_message's \a flags signaling the use of
/// compression for the message. (Does not apply for stream compression.)
#define GRPC_WRITE_INTERNAL_COMPRESS (0x80000000u)
/// Internal bit flag for determining whether the message was compressed and had
/// to be decompressed by the message_decompress filter. (Does not apply for
/// stream compression.)
#define GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED (0x40000000u)
/// Mask of all valid internal flags.
#define GRPC_WRITE_INTERNAL_USED_MASK \
(GRPC_WRITE_INTERNAL_COMPRESS | GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED)
namespace grpc_core {
// Server metadata type
// TODO(ctiller): This should be a bespoke instance of MetadataMap<>
using ServerMetadata = grpc_metadata_batch;
using ServerMetadataHandle = Arena::PoolPtr<ServerMetadata>;
// Client initial metadata type
// TODO(ctiller): This should be a bespoke instance of MetadataMap<>
using ClientMetadata = grpc_metadata_batch;
using ClientMetadataHandle = Arena::PoolPtr<ClientMetadata>;
class Message {
public:
Message() = default;
~Message() = default;
Message(SliceBuffer payload, uint32_t flags)
: payload_(std::move(payload)), flags_(flags) {}
Message(const Message&) = delete;
Message& operator=(const Message&) = delete;
uint32_t flags() const { return flags_; }
uint32_t& mutable_flags() { return flags_; }
SliceBuffer* payload() { return &payload_; }
const SliceBuffer* payload() const { return &payload_; }
std::string DebugString() const;
private:
SliceBuffer payload_;
uint32_t flags_ = 0;
};
using MessageHandle = Arena::PoolPtr<Message>;
// Ok/not-ok check for trailing metadata, so that it can be used as result types
// for TrySeq.
inline bool IsStatusOk(const ServerMetadataHandle& m) {
return m->get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN) ==
GRPC_STATUS_OK;
}
ServerMetadataHandle ServerMetadataFromStatus(
const absl::Status& status, Arena* arena = GetContext<Arena>());
template <>
struct StatusCastImpl<ServerMetadataHandle, absl::Status> {
static ServerMetadataHandle Cast(const absl::Status& m) {
return ServerMetadataFromStatus(m);
}
};
template <>
struct StatusCastImpl<ServerMetadataHandle, const absl::Status&> {
static ServerMetadataHandle Cast(const absl::Status& m) {
return ServerMetadataFromStatus(m);
}
};
template <>
struct StatusCastImpl<ServerMetadataHandle, absl::Status&> {
static ServerMetadataHandle Cast(const absl::Status& m) {
return ServerMetadataFromStatus(m);
}
};
// Anything that can be first cast to absl::Status can then be cast to
// ServerMetadataHandle.
template <typename T>
struct StatusCastImpl<
ServerMetadataHandle, T,
absl::void_t<decltype(StatusCast<absl::Status>(std::declval<T>()))>> {
static ServerMetadataHandle Cast(const T& m) {
return ServerMetadataFromStatus(StatusCast<absl::Status>(m));
}
};
// Move only type that tracks call startup.
// Allows observation of when client_initial_metadata has been processed by the
// end of the local call stack.

@ -808,12 +808,15 @@ CORE_SOURCE_FILES = [
'src/core/lib/surface/version.cc',
'src/core/lib/transport/batch_builder.cc',
'src/core/lib/transport/bdp_estimator.cc',
'src/core/lib/transport/call_filters.cc',
'src/core/lib/transport/call_final_info.cc',
'src/core/lib/transport/connectivity_state.cc',
'src/core/lib/transport/error_utils.cc',
'src/core/lib/transport/handshaker.cc',
'src/core/lib/transport/handshaker_registry.cc',
'src/core/lib/transport/http_connect_handshaker.cc',
'src/core/lib/transport/message.cc',
'src/core/lib/transport/metadata.cc',
'src/core/lib/transport/metadata_batch.cc',
'src/core/lib/transport/parsed_metadata.cc',
'src/core/lib/transport/status_conversion.cc',

@ -35,6 +35,20 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "call_filters_test",
srcs = ["call_filters_test.cc"],
external_deps = [
"gtest",
],
language = "C++",
uses_event_engine = False,
uses_polling = False,
deps = [
"//src/core:call_filters",
],
)
grpc_cc_test(
name = "connectivity_state_test",
srcs = ["connectivity_state_test.cc"],

File diff suppressed because it is too large Load Diff

@ -2890,6 +2890,8 @@ src/core/lib/transport/batch_builder.cc \
src/core/lib/transport/batch_builder.h \
src/core/lib/transport/bdp_estimator.cc \
src/core/lib/transport/bdp_estimator.h \
src/core/lib/transport/call_filters.cc \
src/core/lib/transport/call_filters.h \
src/core/lib/transport/call_final_info.cc \
src/core/lib/transport/call_final_info.h \
src/core/lib/transport/connectivity_state.cc \
@ -2905,6 +2907,10 @@ src/core/lib/transport/handshaker_registry.h \
src/core/lib/transport/http2_errors.h \
src/core/lib/transport/http_connect_handshaker.cc \
src/core/lib/transport/http_connect_handshaker.h \
src/core/lib/transport/message.cc \
src/core/lib/transport/message.h \
src/core/lib/transport/metadata.cc \
src/core/lib/transport/metadata.h \
src/core/lib/transport/metadata_batch.cc \
src/core/lib/transport/metadata_batch.h \
src/core/lib/transport/metadata_compression_traits.h \

@ -2671,6 +2671,8 @@ src/core/lib/transport/batch_builder.cc \
src/core/lib/transport/batch_builder.h \
src/core/lib/transport/bdp_estimator.cc \
src/core/lib/transport/bdp_estimator.h \
src/core/lib/transport/call_filters.cc \
src/core/lib/transport/call_filters.h \
src/core/lib/transport/call_final_info.cc \
src/core/lib/transport/call_final_info.h \
src/core/lib/transport/connectivity_state.cc \
@ -2686,6 +2688,10 @@ src/core/lib/transport/handshaker_registry.h \
src/core/lib/transport/http2_errors.h \
src/core/lib/transport/http_connect_handshaker.cc \
src/core/lib/transport/http_connect_handshaker.h \
src/core/lib/transport/message.cc \
src/core/lib/transport/message.h \
src/core/lib/transport/metadata.cc \
src/core/lib/transport/metadata.h \
src/core/lib/transport/metadata_batch.cc \
src/core/lib/transport/metadata_batch.h \
src/core/lib/transport/metadata_compression_traits.h \

@ -1301,6 +1301,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "call_filters_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save