[call-v3] Interception chain (#36414)

Introduce the interception chain type.
Also introduces the real call-v3 call spine based atop CallFilters.

Closes #36414

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36414 from ctiller:interception-chain 90c8e96973
PiperOrigin-RevId: 627784183
pull/36402/head
Craig Tiller 7 months ago committed by Copybara-Service
parent 459abbec5a
commit d52779da52
  1. 2
      BUILD
  2. 307
      CMakeLists.txt
  3. 2
      Makefile
  4. 4
      Package.swift
  5. 606
      build_autogenerated.yaml
  6. 2
      config.m4
  7. 2
      config.w32
  8. 4
      gRPC-C++.podspec
  9. 6
      gRPC-Core.podspec
  10. 4
      grpc.gemspec
  11. 4
      package.xml
  12. 36
      src/core/BUILD
  13. 41
      src/core/lib/channel/promise_based_filter.h
  14. 360
      src/core/lib/promise/detail/seq_state.h
  15. 19
      src/core/lib/promise/for_each.h
  16. 12
      src/core/lib/promise/party.cc
  17. 13
      src/core/lib/surface/call.cc
  18. 2
      src/core/lib/surface/legacy_channel.h
  19. 2
      src/core/lib/transport/call_arena_allocator.cc
  20. 27
      src/core/lib/transport/call_arena_allocator.h
  21. 31
      src/core/lib/transport/call_destination.h
  22. 21
      src/core/lib/transport/call_filters.cc
  23. 154
      src/core/lib/transport/call_filters.h
  24. 13
      src/core/lib/transport/call_spine.cc
  25. 227
      src/core/lib/transport/call_spine.h
  26. 156
      src/core/lib/transport/interception_chain.cc
  27. 225
      src/core/lib/transport/interception_chain.h
  28. 2
      src/python/grpcio/grpc_core_dependencies.py
  29. 10
      test/core/promise/BUILD
  30. 29
      test/core/promise/observable_test.cc
  31. 60
      test/core/promise/poll_matcher.h
  32. 18
      test/core/transport/BUILD
  33. 30
      test/core/transport/call_filters_test.cc
  34. 57
      test/core/transport/chaotic_good/client_transport_error_test.cc
  35. 22
      test/core/transport/chaotic_good/client_transport_test.cc
  36. 8
      test/core/transport/chaotic_good/server_transport_test.cc
  37. 21
      test/core/transport/chaotic_good/transport_test.h
  38. 406
      test/core/transport/interception_chain_test.cc
  39. 10
      test/core/transport/test_suite/test.cc
  40. 29
      test/core/transport/test_suite/test.h
  41. 4
      tools/doxygen/Doxyfile.c++.internal
  42. 4
      tools/doxygen/Doxyfile.core.internal
  43. 24
      tools/run_tests/generated/tests.json

@ -1796,7 +1796,7 @@ grpc_cc_library(
"ref_counted_ptr",
"stats",
"//src/core:arena",
"//src/core:call_size_estimator",
"//src/core:call_arena_allocator",
"//src/core:channel_args",
"//src/core:channel_fwd",
"//src/core:channel_init",

307
CMakeLists.txt generated

@ -1169,6 +1169,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx insecure_security_connector_test)
add_dependencies(buildtests_cxx inter_activity_latch_test)
add_dependencies(buildtests_cxx inter_activity_pipe_test)
add_dependencies(buildtests_cxx interception_chain_test)
add_dependencies(buildtests_cxx interceptor_list_test)
add_dependencies(buildtests_cxx interop_client)
add_dependencies(buildtests_cxx interop_server)
@ -2514,9 +2515,9 @@ add_library(grpc
src/core/lib/surface/wait_for_cq_end_op.cc
src/core/lib/transport/batch_builder.cc
src/core/lib/transport/bdp_estimator.cc
src/core/lib/transport/call_arena_allocator.cc
src/core/lib/transport/call_filters.cc
src/core/lib/transport/call_final_info.cc
src/core/lib/transport/call_size_estimator.cc
src/core/lib/transport/call_spine.cc
src/core/lib/transport/connectivity_state.cc
src/core/lib/transport/endpoint_info_handshaker.cc
@ -3240,9 +3241,9 @@ add_library(grpc_unsecure
src/core/lib/surface/wait_for_cq_end_op.cc
src/core/lib/transport/batch_builder.cc
src/core/lib/transport/bdp_estimator.cc
src/core/lib/transport/call_arena_allocator.cc
src/core/lib/transport/call_filters.cc
src/core/lib/transport/call_final_info.cc
src/core/lib/transport/call_size_estimator.cc
src/core/lib/transport/call_spine.cc
src/core/lib/transport/connectivity_state.cc
src/core/lib/transport/endpoint_info_handshaker.cc
@ -5349,6 +5350,7 @@ add_library(grpc_authorization_provider
src/core/lib/surface/version.cc
src/core/lib/surface/wait_for_cq_end_op.cc
src/core/lib/transport/batch_builder.cc
src/core/lib/transport/call_arena_allocator.cc
src/core/lib/transport/call_filters.cc
src/core/lib/transport/call_final_info.cc
src/core/lib/transport/call_spine.cc
@ -17784,6 +17786,307 @@ target_link_libraries(inter_activity_pipe_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(interception_chain_test
src/core/channelz/channel_trace.cc
src/core/channelz/channelz.cc
src/core/channelz/channelz_registry.cc
src/core/ext/upb-gen/google/protobuf/any.upb_minitable.c
src/core/ext/upb-gen/google/rpc/status.upb_minitable.c
src/core/ext/upb-gen/src/proto/grpc/gcp/altscontext.upb_minitable.c
src/core/ext/upb-gen/src/proto/grpc/gcp/handshaker.upb_minitable.c
src/core/ext/upb-gen/src/proto/grpc/gcp/transport_security_common.upb_minitable.c
src/core/lib/address_utils/parse_address.cc
src/core/lib/address_utils/sockaddr_utils.cc
src/core/lib/backoff/backoff.cc
src/core/lib/channel/call_tracer.cc
src/core/lib/channel/channel_args.cc
src/core/lib/channel/channel_args_preconditioning.cc
src/core/lib/channel/channel_stack.cc
src/core/lib/channel/channel_stack_builder.cc
src/core/lib/channel/channel_stack_builder_impl.cc
src/core/lib/channel/channel_stack_trace.cc
src/core/lib/channel/connected_channel.cc
src/core/lib/channel/metrics.cc
src/core/lib/channel/promise_based_filter.cc
src/core/lib/channel/status_util.cc
src/core/lib/compression/compression.cc
src/core/lib/compression/compression_internal.cc
src/core/lib/compression/message_compress.cc
src/core/lib/config/core_configuration.cc
src/core/lib/debug/event_log.cc
src/core/lib/debug/histogram_view.cc
src/core/lib/debug/stats.cc
src/core/lib/debug/stats_data.cc
src/core/lib/debug/trace.cc
src/core/lib/event_engine/ares_resolver.cc
src/core/lib/event_engine/cf_engine/cf_engine.cc
src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc
src/core/lib/event_engine/cf_engine/dns_service_resolver.cc
src/core/lib/event_engine/channel_args_endpoint_config.cc
src/core/lib/event_engine/default_event_engine.cc
src/core/lib/event_engine/default_event_engine_factory.cc
src/core/lib/event_engine/event_engine.cc
src/core/lib/event_engine/forkable.cc
src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc
src/core/lib/event_engine/posix_engine/ev_poll_posix.cc
src/core/lib/event_engine/posix_engine/event_poller_posix_default.cc
src/core/lib/event_engine/posix_engine/internal_errqueue.cc
src/core/lib/event_engine/posix_engine/lockfree_event.cc
src/core/lib/event_engine/posix_engine/native_posix_dns_resolver.cc
src/core/lib/event_engine/posix_engine/posix_endpoint.cc
src/core/lib/event_engine/posix_engine/posix_engine.cc
src/core/lib/event_engine/posix_engine/posix_engine_listener.cc
src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc
src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
src/core/lib/event_engine/posix_engine/timer.cc
src/core/lib/event_engine/posix_engine/timer_heap.cc
src/core/lib/event_engine/posix_engine/timer_manager.cc
src/core/lib/event_engine/posix_engine/traced_buffer_list.cc
src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.cc
src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc
src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc
src/core/lib/event_engine/resolved_address.cc
src/core/lib/event_engine/shim.cc
src/core/lib/event_engine/slice.cc
src/core/lib/event_engine/slice_buffer.cc
src/core/lib/event_engine/tcp_socket_utils.cc
src/core/lib/event_engine/thread_pool/thread_count.cc
src/core/lib/event_engine/thread_pool/thread_pool_factory.cc
src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc
src/core/lib/event_engine/time_util.cc
src/core/lib/event_engine/trace.cc
src/core/lib/event_engine/utils.cc
src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc
src/core/lib/event_engine/windows/iocp.cc
src/core/lib/event_engine/windows/native_windows_dns_resolver.cc
src/core/lib/event_engine/windows/win_socket.cc
src/core/lib/event_engine/windows/windows_endpoint.cc
src/core/lib/event_engine/windows/windows_engine.cc
src/core/lib/event_engine/windows/windows_listener.cc
src/core/lib/event_engine/work_queue/basic_work_queue.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
src/core/lib/gprpp/load_file.cc
src/core/lib/gprpp/per_cpu.cc
src/core/lib/gprpp/ref_counted_string.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/time.cc
src/core/lib/gprpp/time_averaged_stats.cc
src/core/lib/gprpp/validation_errors.cc
src/core/lib/gprpp/work_serializer.cc
src/core/lib/handshaker/proxy_mapper_registry.cc
src/core/lib/iomgr/buffer_list.cc
src/core/lib/iomgr/call_combiner.cc
src/core/lib/iomgr/cfstream_handle.cc
src/core/lib/iomgr/closure.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/dualstack_socket_posix.cc
src/core/lib/iomgr/endpoint.cc
src/core/lib/iomgr/endpoint_cfstream.cc
src/core/lib/iomgr/endpoint_pair_posix.cc
src/core/lib/iomgr/endpoint_pair_windows.cc
src/core/lib/iomgr/error.cc
src/core/lib/iomgr/error_cfstream.cc
src/core/lib/iomgr/ev_apple.cc
src/core/lib/iomgr/ev_epoll1_linux.cc
src/core/lib/iomgr/ev_poll_posix.cc
src/core/lib/iomgr/ev_posix.cc
src/core/lib/iomgr/ev_windows.cc
src/core/lib/iomgr/event_engine_shims/closure.cc
src/core/lib/iomgr/event_engine_shims/endpoint.cc
src/core/lib/iomgr/event_engine_shims/tcp_client.cc
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/fork_posix.cc
src/core/lib/iomgr/fork_windows.cc
src/core/lib/iomgr/gethostname_fallback.cc
src/core/lib/iomgr/gethostname_host_name_max.cc
src/core/lib/iomgr/gethostname_sysconf.cc
src/core/lib/iomgr/grpc_if_nametoindex_posix.cc
src/core/lib/iomgr/grpc_if_nametoindex_unsupported.cc
src/core/lib/iomgr/internal_errqueue.cc
src/core/lib/iomgr/iocp_windows.cc
src/core/lib/iomgr/iomgr.cc
src/core/lib/iomgr/iomgr_internal.cc
src/core/lib/iomgr/iomgr_posix.cc
src/core/lib/iomgr/iomgr_posix_cfstream.cc
src/core/lib/iomgr/iomgr_windows.cc
src/core/lib/iomgr/lockfree_event.cc
src/core/lib/iomgr/polling_entity.cc
src/core/lib/iomgr/pollset.cc
src/core/lib/iomgr/pollset_set.cc
src/core/lib/iomgr/pollset_set_windows.cc
src/core/lib/iomgr/pollset_windows.cc
src/core/lib/iomgr/resolve_address.cc
src/core/lib/iomgr/resolve_address_posix.cc
src/core/lib/iomgr/resolve_address_windows.cc
src/core/lib/iomgr/sockaddr_utils_posix.cc
src/core/lib/iomgr/socket_factory_posix.cc
src/core/lib/iomgr/socket_mutator.cc
src/core/lib/iomgr/socket_utils_common_posix.cc
src/core/lib/iomgr/socket_utils_linux.cc
src/core/lib/iomgr/socket_utils_posix.cc
src/core/lib/iomgr/socket_utils_windows.cc
src/core/lib/iomgr/socket_windows.cc
src/core/lib/iomgr/systemd_utils.cc
src/core/lib/iomgr/tcp_client.cc
src/core/lib/iomgr/tcp_client_cfstream.cc
src/core/lib/iomgr/tcp_client_posix.cc
src/core/lib/iomgr/tcp_client_windows.cc
src/core/lib/iomgr/tcp_posix.cc
src/core/lib/iomgr/tcp_server.cc
src/core/lib/iomgr/tcp_server_posix.cc
src/core/lib/iomgr/tcp_server_utils_posix_common.cc
src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.cc
src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.cc
src/core/lib/iomgr/tcp_server_windows.cc
src/core/lib/iomgr/tcp_windows.cc
src/core/lib/iomgr/timer.cc
src/core/lib/iomgr/timer_generic.cc
src/core/lib/iomgr/timer_heap.cc
src/core/lib/iomgr/timer_manager.cc
src/core/lib/iomgr/unix_sockets_posix.cc
src/core/lib/iomgr/unix_sockets_posix_noop.cc
src/core/lib/iomgr/vsock.cc
src/core/lib/iomgr/wakeup_fd_eventfd.cc
src/core/lib/iomgr/wakeup_fd_nospecial.cc
src/core/lib/iomgr/wakeup_fd_pipe.cc
src/core/lib/iomgr/wakeup_fd_posix.cc
src/core/lib/json/json_writer.cc
src/core/lib/promise/activity.cc
src/core/lib/promise/party.cc
src/core/lib/promise/trace.cc
src/core/lib/resource_quota/api.cc
src/core/lib/resource_quota/arena.cc
src/core/lib/resource_quota/connection_quota.cc
src/core/lib/resource_quota/memory_quota.cc
src/core/lib/resource_quota/periodic_update.cc
src/core/lib/resource_quota/resource_quota.cc
src/core/lib/resource_quota/thread_quota.cc
src/core/lib/resource_quota/trace.cc
src/core/lib/security/certificate_provider/certificate_provider_registry.cc
src/core/lib/security/credentials/alts/check_gcp_environment.cc
src/core/lib/security/credentials/alts/check_gcp_environment_linux.cc
src/core/lib/security/credentials/alts/check_gcp_environment_no_op.cc
src/core/lib/security/credentials/alts/check_gcp_environment_windows.cc
src/core/lib/security/credentials/alts/grpc_alts_credentials_client_options.cc
src/core/lib/security/credentials/alts/grpc_alts_credentials_options.cc
src/core/lib/security/credentials/alts/grpc_alts_credentials_server_options.cc
src/core/lib/slice/percent_encoding.cc
src/core/lib/slice/slice.cc
src/core/lib/slice/slice_buffer.cc
src/core/lib/slice/slice_refcount.cc
src/core/lib/slice/slice_string_helpers.cc
src/core/lib/surface/api_trace.cc
src/core/lib/surface/byte_buffer.cc
src/core/lib/surface/byte_buffer_reader.cc
src/core/lib/surface/call.cc
src/core/lib/surface/call_details.cc
src/core/lib/surface/call_log_batch.cc
src/core/lib/surface/channel.cc
src/core/lib/surface/channel_init.cc
src/core/lib/surface/channel_stack_type.cc
src/core/lib/surface/completion_queue.cc
src/core/lib/surface/completion_queue_factory.cc
src/core/lib/surface/event_string.cc
src/core/lib/surface/init_internally.cc
src/core/lib/surface/lame_client.cc
src/core/lib/surface/metadata_array.cc
src/core/lib/surface/validate_metadata.cc
src/core/lib/surface/version.cc
src/core/lib/surface/wait_for_cq_end_op.cc
src/core/lib/transport/batch_builder.cc
src/core/lib/transport/call_arena_allocator.cc
src/core/lib/transport/call_filters.cc
src/core/lib/transport/call_final_info.cc
src/core/lib/transport/call_spine.cc
src/core/lib/transport/connectivity_state.cc
src/core/lib/transport/error_utils.cc
src/core/lib/transport/handshaker_registry.cc
src/core/lib/transport/interception_chain.cc
src/core/lib/transport/message.cc
src/core/lib/transport/metadata.cc
src/core/lib/transport/metadata_batch.cc
src/core/lib/transport/parsed_metadata.cc
src/core/lib/transport/status_conversion.cc
src/core/lib/transport/timeout_encoding.cc
src/core/lib/transport/transport.cc
src/core/lib/transport/transport_op_string.cc
src/core/lib/uri/uri_parser.cc
src/core/load_balancing/lb_policy.cc
src/core/load_balancing/lb_policy_registry.cc
src/core/resolver/endpoint_addresses.cc
src/core/resolver/resolver.cc
src/core/resolver/resolver_registry.cc
src/core/service_config/service_config_parser.cc
src/core/tsi/alts/handshaker/transport_security_common_api.cc
test/core/transport/interception_chain_test.cc
third_party/upb/upb/mini_descriptor/build_enum.c
third_party/upb/upb/mini_descriptor/decode.c
third_party/upb/upb/mini_descriptor/internal/base92.c
third_party/upb/upb/mini_descriptor/internal/encode.c
third_party/upb/upb/mini_descriptor/link.c
third_party/upb/upb/wire/decode.c
third_party/upb/upb/wire/encode.c
third_party/upb/upb/wire/eps_copy_input_stream.c
third_party/upb/upb/wire/internal/decode_fast.c
third_party/upb/upb/wire/reader.c
)
if(WIN32 AND MSVC)
if(BUILD_SHARED_LIBS)
target_compile_definitions(interception_chain_test
PRIVATE
"GPR_DLL_IMPORTS"
)
endif()
endif()
target_compile_features(interception_chain_test PUBLIC cxx_std_14)
target_include_directories(interception_chain_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(interception_chain_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
utf8_range_lib
upb_message_lib
${_gRPC_ZLIB_LIBRARIES}
absl::config
absl::no_destructor
absl::cleanup
absl::flat_hash_map
absl::inlined_vector
absl::function_ref
absl::hash
absl::type_traits
absl::statusor
absl::span
absl::utility
${_gRPC_CARES_LIBRARIES}
gpr
${_gRPC_ADDRESS_SORTING_LIBRARIES}
)
endif()
if(gRPC_BUILD_TESTS)

2
Makefile generated

@ -1406,9 +1406,9 @@ LIBGRPC_SRC = \
src/core/lib/surface/wait_for_cq_end_op.cc \
src/core/lib/transport/batch_builder.cc \
src/core/lib/transport/bdp_estimator.cc \
src/core/lib/transport/call_arena_allocator.cc \
src/core/lib/transport/call_filters.cc \
src/core/lib/transport/call_final_info.cc \
src/core/lib/transport/call_size_estimator.cc \
src/core/lib/transport/call_spine.cc \
src/core/lib/transport/connectivity_state.cc \
src/core/lib/transport/endpoint_info_handshaker.cc \

4
Package.swift generated

@ -1803,12 +1803,12 @@ let package = Package(
"src/core/lib/transport/batch_builder.h",
"src/core/lib/transport/bdp_estimator.cc",
"src/core/lib/transport/bdp_estimator.h",
"src/core/lib/transport/call_arena_allocator.cc",
"src/core/lib/transport/call_arena_allocator.h",
"src/core/lib/transport/call_filters.cc",
"src/core/lib/transport/call_filters.h",
"src/core/lib/transport/call_final_info.cc",
"src/core/lib/transport/call_final_info.h",
"src/core/lib/transport/call_size_estimator.cc",
"src/core/lib/transport/call_size_estimator.h",
"src/core/lib/transport/call_spine.cc",
"src/core/lib/transport/call_spine.h",
"src/core/lib/transport/connectivity_state.cc",

@ -1138,9 +1138,9 @@ libs:
- src/core/lib/surface/wait_for_cq_end_op.h
- src/core/lib/transport/batch_builder.h
- src/core/lib/transport/bdp_estimator.h
- src/core/lib/transport/call_arena_allocator.h
- src/core/lib/transport/call_filters.h
- src/core/lib/transport/call_final_info.h
- src/core/lib/transport/call_size_estimator.h
- src/core/lib/transport/call_spine.h
- src/core/lib/transport/connectivity_state.h
- src/core/lib/transport/custom_metadata.h
@ -1939,9 +1939,9 @@ libs:
- src/core/lib/surface/wait_for_cq_end_op.cc
- src/core/lib/transport/batch_builder.cc
- src/core/lib/transport/bdp_estimator.cc
- src/core/lib/transport/call_arena_allocator.cc
- src/core/lib/transport/call_filters.cc
- src/core/lib/transport/call_final_info.cc
- src/core/lib/transport/call_size_estimator.cc
- src/core/lib/transport/call_spine.cc
- src/core/lib/transport/connectivity_state.cc
- src/core/lib/transport/endpoint_info_handshaker.cc
@ -2605,9 +2605,9 @@ libs:
- src/core/lib/surface/wait_for_cq_end_op.h
- src/core/lib/transport/batch_builder.h
- src/core/lib/transport/bdp_estimator.h
- src/core/lib/transport/call_arena_allocator.h
- src/core/lib/transport/call_filters.h
- src/core/lib/transport/call_final_info.h
- src/core/lib/transport/call_size_estimator.h
- src/core/lib/transport/call_spine.h
- src/core/lib/transport/connectivity_state.h
- src/core/lib/transport/custom_metadata.h
@ -3024,9 +3024,9 @@ libs:
- src/core/lib/surface/wait_for_cq_end_op.cc
- src/core/lib/transport/batch_builder.cc
- src/core/lib/transport/bdp_estimator.cc
- src/core/lib/transport/call_arena_allocator.cc
- src/core/lib/transport/call_filters.cc
- src/core/lib/transport/call_final_info.cc
- src/core/lib/transport/call_size_estimator.cc
- src/core/lib/transport/call_spine.cc
- src/core/lib/transport/connectivity_state.cc
- src/core/lib/transport/endpoint_info_handshaker.cc
@ -4616,7 +4616,6 @@ libs:
- src/core/lib/promise/party.h
- src/core/lib/promise/pipe.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/prioritized_race.h
- src/core/lib/promise/promise.h
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
@ -4684,6 +4683,7 @@ libs:
- src/core/lib/surface/validate_metadata.h
- src/core/lib/surface/wait_for_cq_end_op.h
- src/core/lib/transport/batch_builder.h
- src/core/lib/transport/call_arena_allocator.h
- src/core/lib/transport/call_filters.h
- src/core/lib/transport/call_final_info.h
- src/core/lib/transport/call_spine.h
@ -4981,6 +4981,7 @@ libs:
- src/core/lib/surface/version.cc
- src/core/lib/surface/wait_for_cq_end_op.cc
- src/core/lib/transport/batch_builder.cc
- src/core/lib/transport/call_arena_allocator.cc
- src/core/lib/transport/call_filters.cc
- src/core/lib/transport/call_final_info.cc
- src/core/lib/transport/call_spine.cc
@ -6454,6 +6455,7 @@ targets:
- src/core/lib/transport/simple_slice_based_metadata.h
- src/core/lib/transport/status_conversion.h
- src/core/lib/transport/timeout_encoding.h
- test/core/promise/poll_matcher.h
- third_party/upb/upb/generated_code_support.h
- third_party/upb/upb/mini_descriptor/build_enum.h
- third_party/upb/upb/mini_descriptor/decode.h
@ -11649,6 +11651,599 @@ targets:
- absl/status:statusor
- gpr
uses_polling: false
- name: interception_chain_test
gtest: true
build: test
language: c++
headers:
- src/core/channelz/channel_trace.h
- src/core/channelz/channelz.h
- src/core/channelz/channelz_registry.h
- src/core/ext/upb-gen/google/protobuf/any.upb.h
- src/core/ext/upb-gen/google/protobuf/any.upb_minitable.h
- src/core/ext/upb-gen/google/rpc/status.upb.h
- src/core/ext/upb-gen/google/rpc/status.upb_minitable.h
- src/core/ext/upb-gen/src/proto/grpc/gcp/altscontext.upb.h
- src/core/ext/upb-gen/src/proto/grpc/gcp/altscontext.upb_minitable.h
- src/core/ext/upb-gen/src/proto/grpc/gcp/handshaker.upb.h
- src/core/ext/upb-gen/src/proto/grpc/gcp/handshaker.upb_minitable.h
- src/core/ext/upb-gen/src/proto/grpc/gcp/transport_security_common.upb.h
- src/core/ext/upb-gen/src/proto/grpc/gcp/transport_security_common.upb_minitable.h
- src/core/lib/address_utils/parse_address.h
- src/core/lib/address_utils/sockaddr_utils.h
- src/core/lib/avl/avl.h
- src/core/lib/backoff/backoff.h
- src/core/lib/channel/call_finalization.h
- src/core/lib/channel/call_tracer.h
- src/core/lib/channel/channel_args.h
- src/core/lib/channel/channel_args_preconditioning.h
- src/core/lib/channel/channel_fwd.h
- src/core/lib/channel/channel_stack.h
- src/core/lib/channel/channel_stack_builder.h
- src/core/lib/channel/channel_stack_builder_impl.h
- src/core/lib/channel/channel_stack_trace.h
- src/core/lib/channel/connected_channel.h
- src/core/lib/channel/context.h
- src/core/lib/channel/metrics.h
- src/core/lib/channel/promise_based_filter.h
- src/core/lib/channel/status_util.h
- src/core/lib/channel/tcp_tracer.h
- src/core/lib/compression/compression_internal.h
- src/core/lib/compression/message_compress.h
- src/core/lib/config/core_configuration.h
- src/core/lib/debug/event_log.h
- src/core/lib/debug/histogram_view.h
- src/core/lib/debug/stats.h
- src/core/lib/debug/stats_data.h
- src/core/lib/debug/trace.h
- src/core/lib/event_engine/ares_resolver.h
- src/core/lib/event_engine/cf_engine/cf_engine.h
- src/core/lib/event_engine/cf_engine/cfstream_endpoint.h
- src/core/lib/event_engine/cf_engine/cftype_unique_ref.h
- src/core/lib/event_engine/cf_engine/dns_service_resolver.h
- src/core/lib/event_engine/channel_args_endpoint_config.h
- src/core/lib/event_engine/common_closures.h
- src/core/lib/event_engine/default_event_engine.h
- src/core/lib/event_engine/default_event_engine_factory.h
- src/core/lib/event_engine/event_engine_context.h
- src/core/lib/event_engine/extensions/can_track_errors.h
- src/core/lib/event_engine/extensions/chaotic_good_extension.h
- src/core/lib/event_engine/extensions/supports_fd.h
- src/core/lib/event_engine/forkable.h
- src/core/lib/event_engine/grpc_polled_fd.h
- src/core/lib/event_engine/handle_containers.h
- src/core/lib/event_engine/memory_allocator_factory.h
- src/core/lib/event_engine/nameser.h
- src/core/lib/event_engine/poller.h
- src/core/lib/event_engine/posix.h
- src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h
- src/core/lib/event_engine/posix_engine/ev_poll_posix.h
- src/core/lib/event_engine/posix_engine/event_poller.h
- src/core/lib/event_engine/posix_engine/event_poller_posix_default.h
- src/core/lib/event_engine/posix_engine/grpc_polled_fd_posix.h
- src/core/lib/event_engine/posix_engine/internal_errqueue.h
- src/core/lib/event_engine/posix_engine/lockfree_event.h
- src/core/lib/event_engine/posix_engine/native_posix_dns_resolver.h
- src/core/lib/event_engine/posix_engine/posix_endpoint.h
- src/core/lib/event_engine/posix_engine/posix_engine.h
- src/core/lib/event_engine/posix_engine/posix_engine_closure.h
- src/core/lib/event_engine/posix_engine/posix_engine_listener.h
- src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h
- src/core/lib/event_engine/posix_engine/tcp_socket_utils.h
- src/core/lib/event_engine/posix_engine/timer.h
- src/core/lib/event_engine/posix_engine/timer_heap.h
- src/core/lib/event_engine/posix_engine/timer_manager.h
- src/core/lib/event_engine/posix_engine/traced_buffer_list.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h
- src/core/lib/event_engine/query_extensions.h
- src/core/lib/event_engine/ref_counted_dns_resolver_interface.h
- src/core/lib/event_engine/resolved_address_internal.h
- src/core/lib/event_engine/shim.h
- src/core/lib/event_engine/tcp_socket_utils.h
- src/core/lib/event_engine/thread_pool/thread_count.h
- src/core/lib/event_engine/thread_pool/thread_pool.h
- src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h
- src/core/lib/event_engine/thready_event_engine/thready_event_engine.h
- src/core/lib/event_engine/time_util.h
- src/core/lib/event_engine/trace.h
- src/core/lib/event_engine/utils.h
- src/core/lib/event_engine/windows/grpc_polled_fd_windows.h
- src/core/lib/event_engine/windows/iocp.h
- src/core/lib/event_engine/windows/native_windows_dns_resolver.h
- src/core/lib/event_engine/windows/win_socket.h
- src/core/lib/event_engine/windows/windows_endpoint.h
- src/core/lib/event_engine/windows/windows_engine.h
- src/core/lib/event_engine/windows/windows_listener.h
- src/core/lib/event_engine/work_queue/basic_work_queue.h
- src/core/lib/event_engine/work_queue/work_queue.h
- src/core/lib/experiments/config.h
- src/core/lib/experiments/experiments.h
- src/core/lib/gpr/spinlock.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/chunked_vector.h
- src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/dual_ref_counted.h
- src/core/lib/gprpp/if_list.h
- src/core/lib/gprpp/load_file.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/match.h
- src/core/lib/gprpp/notification.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/overload.h
- src/core/lib/gprpp/packed_table.h
- src/core/lib/gprpp/per_cpu.h
- src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h
- src/core/lib/gprpp/ref_counted_string.h
- src/core/lib/gprpp/sorted_pack.h
- src/core/lib/gprpp/status_helper.h
- src/core/lib/gprpp/table.h
- src/core/lib/gprpp/time.h
- src/core/lib/gprpp/time_averaged_stats.h
- src/core/lib/gprpp/type_list.h
- src/core/lib/gprpp/unique_type_name.h
- src/core/lib/gprpp/validation_errors.h
- src/core/lib/gprpp/work_serializer.h
- src/core/lib/handshaker/proxy_mapper.h
- src/core/lib/handshaker/proxy_mapper_registry.h
- src/core/lib/iomgr/block_annotate.h
- src/core/lib/iomgr/buffer_list.h
- src/core/lib/iomgr/call_combiner.h
- src/core/lib/iomgr/cfstream_handle.h
- src/core/lib/iomgr/closure.h
- src/core/lib/iomgr/combiner.h
- src/core/lib/iomgr/dynamic_annotations.h
- src/core/lib/iomgr/endpoint.h
- src/core/lib/iomgr/endpoint_cfstream.h
- src/core/lib/iomgr/endpoint_pair.h
- src/core/lib/iomgr/error.h
- src/core/lib/iomgr/error_cfstream.h
- src/core/lib/iomgr/ev_apple.h
- src/core/lib/iomgr/ev_epoll1_linux.h
- src/core/lib/iomgr/ev_poll_posix.h
- src/core/lib/iomgr/ev_posix.h
- src/core/lib/iomgr/event_engine_shims/closure.h
- src/core/lib/iomgr/event_engine_shims/endpoint.h
- src/core/lib/iomgr/event_engine_shims/tcp_client.h
- src/core/lib/iomgr/exec_ctx.h
- src/core/lib/iomgr/executor.h
- src/core/lib/iomgr/gethostname.h
- src/core/lib/iomgr/grpc_if_nametoindex.h
- src/core/lib/iomgr/internal_errqueue.h
- src/core/lib/iomgr/iocp_windows.h
- src/core/lib/iomgr/iomgr.h
- src/core/lib/iomgr/iomgr_fwd.h
- src/core/lib/iomgr/iomgr_internal.h
- src/core/lib/iomgr/lockfree_event.h
- src/core/lib/iomgr/nameser.h
- src/core/lib/iomgr/polling_entity.h
- src/core/lib/iomgr/pollset.h
- src/core/lib/iomgr/pollset_set.h
- src/core/lib/iomgr/pollset_set_windows.h
- src/core/lib/iomgr/pollset_windows.h
- src/core/lib/iomgr/port.h
- src/core/lib/iomgr/python_util.h
- src/core/lib/iomgr/resolve_address.h
- src/core/lib/iomgr/resolve_address_impl.h
- src/core/lib/iomgr/resolve_address_posix.h
- src/core/lib/iomgr/resolve_address_windows.h
- src/core/lib/iomgr/resolved_address.h
- src/core/lib/iomgr/sockaddr.h
- src/core/lib/iomgr/sockaddr_posix.h
- src/core/lib/iomgr/sockaddr_windows.h
- src/core/lib/iomgr/socket_factory_posix.h
- src/core/lib/iomgr/socket_mutator.h
- src/core/lib/iomgr/socket_utils.h
- src/core/lib/iomgr/socket_utils_posix.h
- src/core/lib/iomgr/socket_windows.h
- src/core/lib/iomgr/systemd_utils.h
- src/core/lib/iomgr/tcp_client.h
- src/core/lib/iomgr/tcp_client_posix.h
- src/core/lib/iomgr/tcp_posix.h
- src/core/lib/iomgr/tcp_server.h
- src/core/lib/iomgr/tcp_server_utils_posix.h
- src/core/lib/iomgr/tcp_windows.h
- src/core/lib/iomgr/timer.h
- src/core/lib/iomgr/timer_generic.h
- src/core/lib/iomgr/timer_heap.h
- src/core/lib/iomgr/timer_manager.h
- src/core/lib/iomgr/unix_sockets_posix.h
- src/core/lib/iomgr/vsock.h
- src/core/lib/iomgr/wakeup_fd_pipe.h
- src/core/lib/iomgr/wakeup_fd_posix.h
- src/core/lib/json/json.h
- src/core/lib/json/json_args.h
- src/core/lib/json/json_writer.h
- src/core/lib/promise/activity.h
- src/core/lib/promise/all_ok.h
- src/core/lib/promise/arena_promise.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_seq.h
- src/core/lib/promise/detail/join_state.h
- src/core/lib/promise/detail/promise_factory.h
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/seq_state.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/exec_ctx_wakeup_scheduler.h
- src/core/lib/promise/for_each.h
- src/core/lib/promise/if.h
- src/core/lib/promise/interceptor_list.h
- src/core/lib/promise/latch.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
- src/core/lib/promise/party.h
- src/core/lib/promise/pipe.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/promise.h
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
- src/core/lib/promise/status_flag.h
- src/core/lib/promise/trace.h
- src/core/lib/promise/try_seq.h
- src/core/lib/resource_quota/api.h
- src/core/lib/resource_quota/arena.h
- src/core/lib/resource_quota/connection_quota.h
- src/core/lib/resource_quota/memory_quota.h
- src/core/lib/resource_quota/periodic_update.h
- src/core/lib/resource_quota/resource_quota.h
- src/core/lib/resource_quota/thread_quota.h
- src/core/lib/resource_quota/trace.h
- src/core/lib/security/certificate_provider/certificate_provider_factory.h
- src/core/lib/security/certificate_provider/certificate_provider_registry.h
- src/core/lib/security/credentials/alts/check_gcp_environment.h
- src/core/lib/security/credentials/alts/grpc_alts_credentials_options.h
- src/core/lib/security/credentials/channel_creds_registry.h
- src/core/lib/slice/percent_encoding.h
- src/core/lib/slice/slice.h
- src/core/lib/slice/slice_buffer.h
- src/core/lib/slice/slice_internal.h
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/lib/surface/api_trace.h
- src/core/lib/surface/call.h
- src/core/lib/surface/call_test_only.h
- src/core/lib/surface/call_trace.h
- src/core/lib/surface/channel.h
- src/core/lib/surface/channel_init.h
- src/core/lib/surface/channel_stack_type.h
- src/core/lib/surface/completion_queue.h
- src/core/lib/surface/completion_queue_factory.h
- src/core/lib/surface/event_string.h
- src/core/lib/surface/init.h
- src/core/lib/surface/init_internally.h
- src/core/lib/surface/lame_client.h
- src/core/lib/surface/server_interface.h
- src/core/lib/surface/validate_metadata.h
- src/core/lib/surface/wait_for_cq_end_op.h
- src/core/lib/transport/batch_builder.h
- src/core/lib/transport/call_arena_allocator.h
- src/core/lib/transport/call_destination.h
- src/core/lib/transport/call_filters.h
- src/core/lib/transport/call_final_info.h
- src/core/lib/transport/call_spine.h
- src/core/lib/transport/connectivity_state.h
- src/core/lib/transport/custom_metadata.h
- src/core/lib/transport/error_utils.h
- src/core/lib/transport/handshaker_factory.h
- src/core/lib/transport/handshaker_registry.h
- src/core/lib/transport/http2_errors.h
- src/core/lib/transport/interception_chain.h
- src/core/lib/transport/message.h
- src/core/lib/transport/metadata.h
- src/core/lib/transport/metadata_batch.h
- src/core/lib/transport/metadata_compression_traits.h
- src/core/lib/transport/parsed_metadata.h
- src/core/lib/transport/simple_slice_based_metadata.h
- src/core/lib/transport/status_conversion.h
- src/core/lib/transport/timeout_encoding.h
- src/core/lib/transport/transport.h
- src/core/lib/transport/transport_fwd.h
- src/core/lib/uri/uri_parser.h
- src/core/load_balancing/backend_metric_data.h
- src/core/load_balancing/lb_policy.h
- src/core/load_balancing/lb_policy_factory.h
- src/core/load_balancing/lb_policy_registry.h
- src/core/load_balancing/subchannel_interface.h
- src/core/resolver/endpoint_addresses.h
- src/core/resolver/resolver.h
- src/core/resolver/resolver_factory.h
- src/core/resolver/resolver_registry.h
- src/core/resolver/server_address.h
- src/core/service_config/service_config.h
- src/core/service_config/service_config_call_data.h
- src/core/service_config/service_config_parser.h
- src/core/tsi/alts/handshaker/transport_security_common_api.h
- test/core/promise/poll_matcher.h
- third_party/upb/upb/generated_code_support.h
- third_party/upb/upb/mini_descriptor/build_enum.h
- third_party/upb/upb/mini_descriptor/decode.h
- third_party/upb/upb/mini_descriptor/internal/base92.h
- third_party/upb/upb/mini_descriptor/internal/decoder.h
- third_party/upb/upb/mini_descriptor/internal/encode.h
- third_party/upb/upb/mini_descriptor/internal/encode.hpp
- third_party/upb/upb/mini_descriptor/internal/modifiers.h
- third_party/upb/upb/mini_descriptor/internal/wire_constants.h
- third_party/upb/upb/mini_descriptor/link.h
- third_party/upb/upb/wire/decode.h
- third_party/upb/upb/wire/encode.h
- third_party/upb/upb/wire/eps_copy_input_stream.h
- third_party/upb/upb/wire/internal/constants.h
- third_party/upb/upb/wire/internal/decode_fast.h
- third_party/upb/upb/wire/internal/decoder.h
- third_party/upb/upb/wire/internal/reader.h
- third_party/upb/upb/wire/reader.h
- third_party/upb/upb/wire/types.h
src:
- src/core/channelz/channel_trace.cc
- src/core/channelz/channelz.cc
- src/core/channelz/channelz_registry.cc
- src/core/ext/upb-gen/google/protobuf/any.upb_minitable.c
- src/core/ext/upb-gen/google/rpc/status.upb_minitable.c
- src/core/ext/upb-gen/src/proto/grpc/gcp/altscontext.upb_minitable.c
- src/core/ext/upb-gen/src/proto/grpc/gcp/handshaker.upb_minitable.c
- src/core/ext/upb-gen/src/proto/grpc/gcp/transport_security_common.upb_minitable.c
- src/core/lib/address_utils/parse_address.cc
- src/core/lib/address_utils/sockaddr_utils.cc
- src/core/lib/backoff/backoff.cc
- src/core/lib/channel/call_tracer.cc
- src/core/lib/channel/channel_args.cc
- src/core/lib/channel/channel_args_preconditioning.cc
- src/core/lib/channel/channel_stack.cc
- src/core/lib/channel/channel_stack_builder.cc
- src/core/lib/channel/channel_stack_builder_impl.cc
- src/core/lib/channel/channel_stack_trace.cc
- src/core/lib/channel/connected_channel.cc
- src/core/lib/channel/metrics.cc
- src/core/lib/channel/promise_based_filter.cc
- src/core/lib/channel/status_util.cc
- src/core/lib/compression/compression.cc
- src/core/lib/compression/compression_internal.cc
- src/core/lib/compression/message_compress.cc
- src/core/lib/config/core_configuration.cc
- src/core/lib/debug/event_log.cc
- src/core/lib/debug/histogram_view.cc
- src/core/lib/debug/stats.cc
- src/core/lib/debug/stats_data.cc
- src/core/lib/debug/trace.cc
- src/core/lib/event_engine/ares_resolver.cc
- src/core/lib/event_engine/cf_engine/cf_engine.cc
- src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc
- src/core/lib/event_engine/cf_engine/dns_service_resolver.cc
- src/core/lib/event_engine/channel_args_endpoint_config.cc
- src/core/lib/event_engine/default_event_engine.cc
- src/core/lib/event_engine/default_event_engine_factory.cc
- src/core/lib/event_engine/event_engine.cc
- src/core/lib/event_engine/forkable.cc
- src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc
- src/core/lib/event_engine/posix_engine/ev_poll_posix.cc
- src/core/lib/event_engine/posix_engine/event_poller_posix_default.cc
- src/core/lib/event_engine/posix_engine/internal_errqueue.cc
- src/core/lib/event_engine/posix_engine/lockfree_event.cc
- src/core/lib/event_engine/posix_engine/native_posix_dns_resolver.cc
- src/core/lib/event_engine/posix_engine/posix_endpoint.cc
- src/core/lib/event_engine/posix_engine/posix_engine.cc
- src/core/lib/event_engine/posix_engine/posix_engine_listener.cc
- src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc
- src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
- src/core/lib/event_engine/posix_engine/timer.cc
- src/core/lib/event_engine/posix_engine/timer_heap.cc
- src/core/lib/event_engine/posix_engine/timer_manager.cc
- src/core/lib/event_engine/posix_engine/traced_buffer_list.cc
- src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.cc
- src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc
- src/core/lib/event_engine/resolved_address.cc
- src/core/lib/event_engine/shim.cc
- src/core/lib/event_engine/slice.cc
- src/core/lib/event_engine/slice_buffer.cc
- src/core/lib/event_engine/tcp_socket_utils.cc
- src/core/lib/event_engine/thread_pool/thread_count.cc
- src/core/lib/event_engine/thread_pool/thread_pool_factory.cc
- src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
- src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc
- src/core/lib/event_engine/time_util.cc
- src/core/lib/event_engine/trace.cc
- src/core/lib/event_engine/utils.cc
- src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc
- src/core/lib/event_engine/windows/iocp.cc
- src/core/lib/event_engine/windows/native_windows_dns_resolver.cc
- src/core/lib/event_engine/windows/win_socket.cc
- src/core/lib/event_engine/windows/windows_endpoint.cc
- src/core/lib/event_engine/windows/windows_engine.cc
- src/core/lib/event_engine/windows/windows_listener.cc
- src/core/lib/event_engine/work_queue/basic_work_queue.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
- src/core/lib/gprpp/load_file.cc
- src/core/lib/gprpp/per_cpu.cc
- src/core/lib/gprpp/ref_counted_string.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/time.cc
- src/core/lib/gprpp/time_averaged_stats.cc
- src/core/lib/gprpp/validation_errors.cc
- src/core/lib/gprpp/work_serializer.cc
- src/core/lib/handshaker/proxy_mapper_registry.cc
- src/core/lib/iomgr/buffer_list.cc
- src/core/lib/iomgr/call_combiner.cc
- src/core/lib/iomgr/cfstream_handle.cc
- src/core/lib/iomgr/closure.cc
- src/core/lib/iomgr/combiner.cc
- src/core/lib/iomgr/dualstack_socket_posix.cc
- src/core/lib/iomgr/endpoint.cc
- src/core/lib/iomgr/endpoint_cfstream.cc
- src/core/lib/iomgr/endpoint_pair_posix.cc
- src/core/lib/iomgr/endpoint_pair_windows.cc
- src/core/lib/iomgr/error.cc
- src/core/lib/iomgr/error_cfstream.cc
- src/core/lib/iomgr/ev_apple.cc
- src/core/lib/iomgr/ev_epoll1_linux.cc
- src/core/lib/iomgr/ev_poll_posix.cc
- src/core/lib/iomgr/ev_posix.cc
- src/core/lib/iomgr/ev_windows.cc
- src/core/lib/iomgr/event_engine_shims/closure.cc
- src/core/lib/iomgr/event_engine_shims/endpoint.cc
- src/core/lib/iomgr/event_engine_shims/tcp_client.cc
- src/core/lib/iomgr/exec_ctx.cc
- src/core/lib/iomgr/executor.cc
- src/core/lib/iomgr/fork_posix.cc
- src/core/lib/iomgr/fork_windows.cc
- src/core/lib/iomgr/gethostname_fallback.cc
- src/core/lib/iomgr/gethostname_host_name_max.cc
- src/core/lib/iomgr/gethostname_sysconf.cc
- src/core/lib/iomgr/grpc_if_nametoindex_posix.cc
- src/core/lib/iomgr/grpc_if_nametoindex_unsupported.cc
- src/core/lib/iomgr/internal_errqueue.cc
- src/core/lib/iomgr/iocp_windows.cc
- src/core/lib/iomgr/iomgr.cc
- src/core/lib/iomgr/iomgr_internal.cc
- src/core/lib/iomgr/iomgr_posix.cc
- src/core/lib/iomgr/iomgr_posix_cfstream.cc
- src/core/lib/iomgr/iomgr_windows.cc
- src/core/lib/iomgr/lockfree_event.cc
- src/core/lib/iomgr/polling_entity.cc
- src/core/lib/iomgr/pollset.cc
- src/core/lib/iomgr/pollset_set.cc
- src/core/lib/iomgr/pollset_set_windows.cc
- src/core/lib/iomgr/pollset_windows.cc
- src/core/lib/iomgr/resolve_address.cc
- src/core/lib/iomgr/resolve_address_posix.cc
- src/core/lib/iomgr/resolve_address_windows.cc
- src/core/lib/iomgr/sockaddr_utils_posix.cc
- src/core/lib/iomgr/socket_factory_posix.cc
- src/core/lib/iomgr/socket_mutator.cc
- src/core/lib/iomgr/socket_utils_common_posix.cc
- src/core/lib/iomgr/socket_utils_linux.cc
- src/core/lib/iomgr/socket_utils_posix.cc
- src/core/lib/iomgr/socket_utils_windows.cc
- src/core/lib/iomgr/socket_windows.cc
- src/core/lib/iomgr/systemd_utils.cc
- src/core/lib/iomgr/tcp_client.cc
- src/core/lib/iomgr/tcp_client_cfstream.cc
- src/core/lib/iomgr/tcp_client_posix.cc
- src/core/lib/iomgr/tcp_client_windows.cc
- src/core/lib/iomgr/tcp_posix.cc
- src/core/lib/iomgr/tcp_server.cc
- src/core/lib/iomgr/tcp_server_posix.cc
- src/core/lib/iomgr/tcp_server_utils_posix_common.cc
- src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.cc
- src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.cc
- src/core/lib/iomgr/tcp_server_windows.cc
- src/core/lib/iomgr/tcp_windows.cc
- src/core/lib/iomgr/timer.cc
- src/core/lib/iomgr/timer_generic.cc
- src/core/lib/iomgr/timer_heap.cc
- src/core/lib/iomgr/timer_manager.cc
- src/core/lib/iomgr/unix_sockets_posix.cc
- src/core/lib/iomgr/unix_sockets_posix_noop.cc
- src/core/lib/iomgr/vsock.cc
- src/core/lib/iomgr/wakeup_fd_eventfd.cc
- src/core/lib/iomgr/wakeup_fd_nospecial.cc
- src/core/lib/iomgr/wakeup_fd_pipe.cc
- src/core/lib/iomgr/wakeup_fd_posix.cc
- src/core/lib/json/json_writer.cc
- src/core/lib/promise/activity.cc
- src/core/lib/promise/party.cc
- src/core/lib/promise/trace.cc
- src/core/lib/resource_quota/api.cc
- src/core/lib/resource_quota/arena.cc
- src/core/lib/resource_quota/connection_quota.cc
- src/core/lib/resource_quota/memory_quota.cc
- src/core/lib/resource_quota/periodic_update.cc
- src/core/lib/resource_quota/resource_quota.cc
- src/core/lib/resource_quota/thread_quota.cc
- src/core/lib/resource_quota/trace.cc
- src/core/lib/security/certificate_provider/certificate_provider_registry.cc
- src/core/lib/security/credentials/alts/check_gcp_environment.cc
- src/core/lib/security/credentials/alts/check_gcp_environment_linux.cc
- src/core/lib/security/credentials/alts/check_gcp_environment_no_op.cc
- src/core/lib/security/credentials/alts/check_gcp_environment_windows.cc
- src/core/lib/security/credentials/alts/grpc_alts_credentials_client_options.cc
- src/core/lib/security/credentials/alts/grpc_alts_credentials_options.cc
- src/core/lib/security/credentials/alts/grpc_alts_credentials_server_options.cc
- src/core/lib/slice/percent_encoding.cc
- src/core/lib/slice/slice.cc
- src/core/lib/slice/slice_buffer.cc
- src/core/lib/slice/slice_refcount.cc
- src/core/lib/slice/slice_string_helpers.cc
- src/core/lib/surface/api_trace.cc
- src/core/lib/surface/byte_buffer.cc
- src/core/lib/surface/byte_buffer_reader.cc
- src/core/lib/surface/call.cc
- src/core/lib/surface/call_details.cc
- src/core/lib/surface/call_log_batch.cc
- src/core/lib/surface/channel.cc
- src/core/lib/surface/channel_init.cc
- src/core/lib/surface/channel_stack_type.cc
- src/core/lib/surface/completion_queue.cc
- src/core/lib/surface/completion_queue_factory.cc
- src/core/lib/surface/event_string.cc
- src/core/lib/surface/init_internally.cc
- src/core/lib/surface/lame_client.cc
- src/core/lib/surface/metadata_array.cc
- src/core/lib/surface/validate_metadata.cc
- src/core/lib/surface/version.cc
- src/core/lib/surface/wait_for_cq_end_op.cc
- src/core/lib/transport/batch_builder.cc
- src/core/lib/transport/call_arena_allocator.cc
- src/core/lib/transport/call_filters.cc
- src/core/lib/transport/call_final_info.cc
- src/core/lib/transport/call_spine.cc
- src/core/lib/transport/connectivity_state.cc
- src/core/lib/transport/error_utils.cc
- src/core/lib/transport/handshaker_registry.cc
- src/core/lib/transport/interception_chain.cc
- src/core/lib/transport/message.cc
- src/core/lib/transport/metadata.cc
- src/core/lib/transport/metadata_batch.cc
- src/core/lib/transport/parsed_metadata.cc
- src/core/lib/transport/status_conversion.cc
- src/core/lib/transport/timeout_encoding.cc
- src/core/lib/transport/transport.cc
- src/core/lib/transport/transport_op_string.cc
- src/core/lib/uri/uri_parser.cc
- src/core/load_balancing/lb_policy.cc
- src/core/load_balancing/lb_policy_registry.cc
- src/core/resolver/endpoint_addresses.cc
- src/core/resolver/resolver.cc
- src/core/resolver/resolver_registry.cc
- src/core/service_config/service_config_parser.cc
- src/core/tsi/alts/handshaker/transport_security_common_api.cc
- test/core/transport/interception_chain_test.cc
- third_party/upb/upb/mini_descriptor/build_enum.c
- third_party/upb/upb/mini_descriptor/decode.c
- third_party/upb/upb/mini_descriptor/internal/base92.c
- third_party/upb/upb/mini_descriptor/internal/encode.c
- third_party/upb/upb/mini_descriptor/link.c
- third_party/upb/upb/wire/decode.c
- third_party/upb/upb/wire/encode.c
- third_party/upb/upb/wire/eps_copy_input_stream.c
- third_party/upb/upb/wire/internal/decode_fast.c
- third_party/upb/upb/wire/reader.c
deps:
- gtest
- utf8_range_lib
- upb_message_lib
- z
- absl/base:config
- absl/base:no_destructor
- absl/cleanup:cleanup
- absl/container:flat_hash_map
- absl/container:inlined_vector
- absl/functional:function_ref
- absl/hash:hash
- absl/meta:type_traits
- absl/status:statusor
- absl/types:span
- absl/utility:utility
- cares
- gpr
- address_sorting
uses_polling: false
- name: interceptor_list_test
gtest: true
build: test
@ -13274,6 +13869,7 @@ targets:
- src/core/lib/promise/map.h
- src/core/lib/promise/observable.h
- src/core/lib/promise/poll.h
- test/core/promise/poll_matcher.h
src:
- src/core/lib/promise/activity.cc
- test/core/promise/observable_test.cc

2
config.m4 generated

@ -781,9 +781,9 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/surface/wait_for_cq_end_op.cc \
src/core/lib/transport/batch_builder.cc \
src/core/lib/transport/bdp_estimator.cc \
src/core/lib/transport/call_arena_allocator.cc \
src/core/lib/transport/call_filters.cc \
src/core/lib/transport/call_final_info.cc \
src/core/lib/transport/call_size_estimator.cc \
src/core/lib/transport/call_spine.cc \
src/core/lib/transport/connectivity_state.cc \
src/core/lib/transport/endpoint_info_handshaker.cc \

2
config.w32 generated

@ -746,9 +746,9 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\surface\\wait_for_cq_end_op.cc " +
"src\\core\\lib\\transport\\batch_builder.cc " +
"src\\core\\lib\\transport\\bdp_estimator.cc " +
"src\\core\\lib\\transport\\call_arena_allocator.cc " +
"src\\core\\lib\\transport\\call_filters.cc " +
"src\\core\\lib\\transport\\call_final_info.cc " +
"src\\core\\lib\\transport\\call_size_estimator.cc " +
"src\\core\\lib\\transport\\call_spine.cc " +
"src\\core\\lib\\transport\\connectivity_state.cc " +
"src\\core\\lib\\transport\\endpoint_info_handshaker.cc " +

4
gRPC-C++.podspec generated

@ -1241,9 +1241,9 @@ Pod::Spec.new do |s|
'src/core/lib/surface/wait_for_cq_end_op.h',
'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/call_arena_allocator.h',
'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.h',
'src/core/lib/transport/call_size_estimator.h',
'src/core/lib/transport/call_spine.h',
'src/core/lib/transport/connectivity_state.h',
'src/core/lib/transport/custom_metadata.h',
@ -2512,9 +2512,9 @@ Pod::Spec.new do |s|
'src/core/lib/surface/wait_for_cq_end_op.h',
'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/call_arena_allocator.h',
'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.h',
'src/core/lib/transport/call_size_estimator.h',
'src/core/lib/transport/call_spine.h',
'src/core/lib/transport/connectivity_state.h',
'src/core/lib/transport/custom_metadata.h',

6
gRPC-Core.podspec generated

@ -1915,12 +1915,12 @@ Pod::Spec.new do |s|
'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.cc',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/call_arena_allocator.cc',
'src/core/lib/transport/call_arena_allocator.h',
'src/core/lib/transport/call_filters.cc',
'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.cc',
'src/core/lib/transport/call_final_info.h',
'src/core/lib/transport/call_size_estimator.cc',
'src/core/lib/transport/call_size_estimator.h',
'src/core/lib/transport/call_spine.cc',
'src/core/lib/transport/call_spine.h',
'src/core/lib/transport/connectivity_state.cc',
@ -3291,9 +3291,9 @@ Pod::Spec.new do |s|
'src/core/lib/surface/wait_for_cq_end_op.h',
'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/call_arena_allocator.h',
'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.h',
'src/core/lib/transport/call_size_estimator.h',
'src/core/lib/transport/call_spine.h',
'src/core/lib/transport/connectivity_state.h',
'src/core/lib/transport/custom_metadata.h',

4
grpc.gemspec generated

@ -1805,12 +1805,12 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/transport/batch_builder.h )
s.files += %w( src/core/lib/transport/bdp_estimator.cc )
s.files += %w( src/core/lib/transport/bdp_estimator.h )
s.files += %w( src/core/lib/transport/call_arena_allocator.cc )
s.files += %w( src/core/lib/transport/call_arena_allocator.h )
s.files += %w( src/core/lib/transport/call_filters.cc )
s.files += %w( src/core/lib/transport/call_filters.h )
s.files += %w( src/core/lib/transport/call_final_info.cc )
s.files += %w( src/core/lib/transport/call_final_info.h )
s.files += %w( src/core/lib/transport/call_size_estimator.cc )
s.files += %w( src/core/lib/transport/call_size_estimator.h )
s.files += %w( src/core/lib/transport/call_spine.cc )
s.files += %w( src/core/lib/transport/call_spine.h )
s.files += %w( src/core/lib/transport/connectivity_state.cc )

4
package.xml generated

@ -1787,12 +1787,12 @@
<file baseinstalldir="/" name="src/core/lib/transport/batch_builder.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/bdp_estimator.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/bdp_estimator.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_arena_allocator.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_arena_allocator.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_filters.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_filters.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_final_info.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_final_info.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_size_estimator.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_size_estimator.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_spine.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_spine.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/connectivity_state.cc" role="src" />

@ -7255,6 +7255,25 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "interception_chain",
srcs = [
"lib/transport/interception_chain.cc",
],
hdrs = [
"lib/transport/interception_chain.h",
],
deps = [
"call_destination",
"call_filters",
"call_spine",
"match",
"metadata",
"ref_counted",
"//:gpr_platform",
],
)
grpc_cc_library(
name = "call_destination",
hdrs = [
@ -7331,17 +7350,19 @@ grpc_cc_library(
],
deps = [
"1999",
"call_arena_allocator",
"call_filters",
"for_each",
"if",
"latch",
"message",
"metadata",
"pipe",
"prioritized_race",
"promise_status",
"status_flag",
"try_seq",
"//:gpr",
"//:legacy_context",
"//:promise",
],
)
@ -7405,14 +7426,19 @@ grpc_cc_library(
)
grpc_cc_library(
name = "call_size_estimator",
name = "call_arena_allocator",
srcs = [
"lib/transport/call_size_estimator.cc",
"lib/transport/call_arena_allocator.cc",
],
hdrs = [
"lib/transport/call_size_estimator.h",
"lib/transport/call_arena_allocator.h",
],
deps = [
"arena",
"memory_quota",
"ref_counted",
"//:gpr_platform",
],
deps = ["//:gpr_platform"],
)
grpc_cc_library(

@ -49,6 +49,7 @@
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/event_engine/event_engine_context.h" // IWYU pragma: keep
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/match.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h"
@ -84,12 +85,21 @@ class ChannelFilter {
class Args {
public:
Args() : Args(nullptr, nullptr) {}
explicit Args(grpc_channel_stack* channel_stack,
grpc_channel_element* channel_element)
: channel_stack_(channel_stack), channel_element_(channel_element) {}
Args(grpc_channel_stack* channel_stack,
grpc_channel_element* channel_element)
: impl_(ChannelStackBased{channel_stack, channel_element}) {}
// While we're moving to call-v3 we need to have access to
// grpc_channel_stack & friends here. That means that we can't rely on this
// type signature from interception_chain.h, which means that we need a way
// of constructing this object without naming it ===> implicit construction.
// TODO(ctiller): remove this once we're fully on call-v3
// NOLINTNEXTLINE(google-explicit-constructor)
Args(size_t instance_id) : impl_(V3Based{instance_id}) {}
ABSL_DEPRECATED("Direct access to channel stack is deprecated")
grpc_channel_stack* channel_stack() const { return channel_stack_; }
grpc_channel_stack* channel_stack() const {
return absl::get<ChannelStackBased>(impl_).channel_stack;
}
// Get the instance id of this filter.
// This id is unique amongst all filters /of the same type/ and densely
@ -99,14 +109,29 @@ class ChannelFilter {
// This is useful for filters that need to store per-instance data in a
// parallel data structure.
size_t instance_id() const {
return grpc_channel_stack_filter_instance_number(channel_stack_,
channel_element_);
return Match(
impl_,
[](const ChannelStackBased& cs) {
return grpc_channel_stack_filter_instance_number(
cs.channel_stack, cs.channel_element);
},
[](const V3Based& v3) { return v3.instance_id; });
}
private:
friend class ChannelFilter;
grpc_channel_stack* channel_stack_;
grpc_channel_element* channel_element_;
struct ChannelStackBased {
grpc_channel_stack* channel_stack;
grpc_channel_element* channel_element;
};
struct V3Based {
size_t instance_id;
};
using Impl = absl::variant<ChannelStackBased, V3Based>;
Impl impl_;
};
// Perform post-initialization step (if any).

File diff suppressed because it is too large Load Diff

@ -109,8 +109,10 @@ class ForEach {
public:
using Result =
typename PollTraits<decltype(std::declval<ActionPromise>()())>::Type;
ForEach(Reader reader, Action action)
: reader_(std::move(reader)), action_factory_(std::move(action)) {
ForEach(Reader reader, Action action, DebugLocation whence = {})
: reader_(std::move(reader)),
action_factory_(std::move(action)),
whence_(whence) {
Construct(&reader_next_, reader_.Next());
}
~ForEach() {
@ -125,7 +127,8 @@ class ForEach {
ForEach& operator=(const ForEach&) = delete;
ForEach(ForEach&& other) noexcept
: reader_(std::move(other.reader_)),
action_factory_(std::move(other.action_factory_)) {
action_factory_(std::move(other.action_factory_)),
whence_(other.whence_) {
GPR_DEBUG_ASSERT(reading_next_);
GPR_DEBUG_ASSERT(other.reading_next_);
Construct(&reader_next_, std::move(other.reader_next_));
@ -136,6 +139,7 @@ class ForEach {
reader_ = std::move(other.reader_);
action_factory_ = std::move(other.action_factory_);
reader_next_ = std::move(other.reader_next_);
whence_ = other.whence_;
return *this;
}
@ -154,7 +158,8 @@ class ForEach {
std::string DebugTag() {
return absl::StrCat(GetContext<Activity>()->DebugTag(), " FOR_EACH[0x",
reinterpret_cast<uintptr_t>(this), "]: ");
reinterpret_cast<uintptr_t>(this), "@", whence_.file(),
":", whence_.line(), "]: ");
}
Poll<Result> PollReaderNext() {
@ -215,6 +220,7 @@ class ForEach {
GPR_NO_UNIQUE_ADDRESS Reader reader_;
GPR_NO_UNIQUE_ADDRESS ActionFactory action_factory_;
GPR_NO_UNIQUE_ADDRESS DebugLocation whence_;
bool reading_next_ = true;
union {
ReaderNext reader_next_;
@ -226,9 +232,10 @@ class ForEach {
/// For each item acquired by calling Reader::Next, run the promise Action.
template <typename Reader, typename Action>
for_each_detail::ForEach<Reader, Action> ForEach(Reader reader, Action action) {
for_each_detail::ForEach<Reader, Action> ForEach(Reader reader, Action action,
DebugLocation whence = {}) {
return for_each_detail::ForEach<Reader, Action>(std::move(reader),
std::move(action));
std::move(action), whence);
}
} // namespace grpc_core

@ -273,7 +273,7 @@ bool Party::RunOneParticipant(int i) {
auto* participant = participants_[i].load(std::memory_order_acquire);
if (participant == nullptr) {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_DEBUG, "%s[party] wakeup %d already complete",
gpr_log(GPR_INFO, "%s[party] wakeup %d already complete",
DebugTag().c_str(), i);
}
return false;
@ -281,7 +281,7 @@ bool Party::RunOneParticipant(int i) {
absl::string_view name;
if (grpc_trace_promise_primitives.enabled()) {
name = participant->name();
gpr_log(GPR_DEBUG, "%s[%s] begin job %d", DebugTag().c_str(),
gpr_log(GPR_INFO, "%s[%s] begin job %d", DebugTag().c_str(),
std::string(name).c_str(), i);
}
// Poll the participant.
@ -290,12 +290,12 @@ bool Party::RunOneParticipant(int i) {
currently_polling_ = kNotPolling;
if (done) {
if (!name.empty()) {
gpr_log(GPR_DEBUG, "%s[%s] end poll and finish job %d",
DebugTag().c_str(), std::string(name).c_str(), i);
gpr_log(GPR_INFO, "%s[%s] end poll and finish job %d", DebugTag().c_str(),
std::string(name).c_str(), i);
}
participants_[i].store(nullptr, std::memory_order_relaxed);
} else if (!name.empty()) {
gpr_log(GPR_DEBUG, "%s[%s] end poll", DebugTag().c_str(),
gpr_log(GPR_INFO, "%s[%s] end poll", DebugTag().c_str(),
std::string(name).c_str());
}
return done;
@ -306,7 +306,7 @@ void Party::AddParticipants(Participant** participants, size_t count) {
count](size_t* slots) {
for (size_t i = 0; i < count; i++) {
if (grpc_trace_party_state.enabled()) {
gpr_log(GPR_DEBUG,
gpr_log(GPR_INFO,
"Party %p AddParticipant: %s @ %" PRIdPTR,
&sync_, std::string(participants[i]->name()).c_str(), slots[i]);
}

@ -90,6 +90,7 @@
#include "src/core/lib/promise/race.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/promise/status_flag.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/slice/slice_internal.h"
@ -2879,6 +2880,12 @@ class ClientPromiseBasedCall final : public PromiseBasedCall {
return RefCountedPtr<WrappingCallSpine>(this);
}
ClientMetadata& UnprocessedClientInitialMetadata() override {
Crash("not for v2");
}
void V2HackToStartCallWithoutACallFilterStack() override {}
private:
RefCount refs_;
ClientPromiseBasedCall* const call_;
@ -3764,6 +3771,12 @@ class ServerCallSpine final : public PipeBasedCallSpine,
Crash("unimplemented");
}
void V2HackToStartCallWithoutACallFilterStack() override {}
ClientMetadata& UnprocessedClientInitialMetadata() override {
Crash("not for v2");
}
bool RunParty() override {
ScopedContext ctx(this);
return Party::RunParty();

@ -39,7 +39,7 @@
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/call_size_estimator.h"
#include "src/core/lib/transport/call_arena_allocator.h"
namespace grpc_core {

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/transport/call_size_estimator.h"
#include "src/core/lib/transport/call_arena_allocator.h"
#include <algorithm>

@ -12,15 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_TRANSPORT_CALL_SIZE_ESTIMATOR_H
#define GRPC_SRC_CORE_LIB_TRANSPORT_CALL_SIZE_ESTIMATOR_H
#ifndef GRPC_SRC_CORE_LIB_TRANSPORT_CALL_ARENA_ALLOCATOR_H
#define GRPC_SRC_CORE_LIB_TRANSPORT_CALL_ARENA_ALLOCATOR_H
#include <stddef.h>
#include <atomic>
#include <cstddef>
#include <grpc/support/port_platform.h>
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/memory_quota.h"
namespace grpc_core {
class CallSizeEstimator {
@ -47,6 +52,22 @@ class CallSizeEstimator {
std::atomic<size_t> call_size_estimate_;
};
class CallArenaAllocator : public RefCounted<CallArenaAllocator> {
public:
CallArenaAllocator(MemoryAllocator allocator, size_t initial_size)
: allocator_(std::move(allocator)), call_size_estimator_(initial_size) {}
Arena* MakeArena() {
return Arena::Create(call_size_estimator_.CallSizeEstimate(), &allocator_);
}
void Destroy(Arena* arena) { arena->Destroy(); }
private:
MemoryAllocator allocator_;
CallSizeEstimator call_size_estimator_;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_TRANSPORT_CALL_SIZE_ESTIMATOR_H
#endif // GRPC_SRC_CORE_LIB_TRANSPORT_CALL_ARENA_ALLOCATOR_H

@ -22,12 +22,33 @@
namespace grpc_core {
// CallDestination is responsible for the processing of a CallHandler.
// It might be a transport, the server API, or a subchannel on the client (for
// instance).
class CallDestination : public Orphanable {
// UnstartedCallDestination is responsible for starting an UnstartedCallHandler
// and then processing operations on the resulting CallHandler.
//
// Examples of UnstartedCallDestinations include:
// - a load-balanced call in the client channel
// - a hijacking filter (see Interceptor)
class UnstartedCallDestination
: public DualRefCounted<UnstartedCallDestination> {
public:
~UnstartedCallDestination() override = default;
// Start a call. The UnstartedCallHandler will be consumed by the Destination
// and started.
// Must be called from the party owned by the call, eg the following must
// hold:
// GPR_ASSERT(GetContext<Activity>() == unstarted_call_handler.party());
virtual void StartCall(UnstartedCallHandler unstarted_call_handler) = 0;
};
// CallDestination is responsible for handling processing of an already started
// call.
//
// Examples of CallDestinations include:
// - a client transport
// - the server API
class CallDestination : public DualRefCounted<CallDestination> {
public:
virtual void StartCall(CallHandler call_handler) = 0;
virtual void HandleCall(CallHandler unstarted_call_handler) = 0;
};
} // namespace grpc_core

@ -53,6 +53,7 @@ Poll<ResultOr<T>> OperationExecutor<T>::Start(
template <typename T>
Poll<ResultOr<T>> OperationExecutor<T>::InitStep(T input, void* call_data) {
GPR_ASSERT(input != nullptr);
while (true) {
if (ops_ == end_ops_) {
return ResultOr<T>{std::move(input), nullptr};
@ -216,9 +217,10 @@ void CallFilters::CancelDueToFailedPipeOperation(SourceLocation but_where) {
}
void CallFilters::PushServerTrailingMetadata(ServerMetadataHandle md) {
GPR_ASSERT(md != nullptr);
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) {
gpr_log(GPR_DEBUG, "%s Push server trailing metadata: %s into %s",
GetContext<Activity>()->DebugTag().c_str(),
gpr_log(GPR_INFO, "%s PushServerTrailingMetadata[%p]: %s into %s",
GetContext<Activity>()->DebugTag().c_str(), this,
md->DebugString().c_str(), DebugString().c_str());
}
GPR_ASSERT(md != nullptr);
@ -227,7 +229,7 @@ void CallFilters::PushServerTrailingMetadata(ServerMetadataHandle md) {
client_initial_metadata_state_.CloseWithError();
server_initial_metadata_state_.CloseSending();
client_to_server_message_state_.CloseWithError();
server_to_client_message_state_.CloseWithError();
server_to_client_message_state_.CloseSending();
server_trailing_metadata_waiter_.Wake();
}
@ -358,6 +360,10 @@ void filters_detail::PipeState::DropPush() {
case ValueState::kReady:
case ValueState::kProcessing:
case ValueState::kWaiting:
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) {
gpr_log(GPR_INFO, "%p drop push in state %s", this,
DebugString().c_str());
}
state_ = ValueState::kError;
wait_recv_.Wake();
break;
@ -374,6 +380,10 @@ void filters_detail::PipeState::DropPull() {
case ValueState::kReady:
case ValueState::kProcessing:
case ValueState::kWaiting:
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) {
gpr_log(GPR_INFO, "%p drop pull in state %s", this,
DebugString().c_str());
}
state_ = ValueState::kError;
wait_send_.Wake();
break;
@ -386,9 +396,12 @@ void filters_detail::PipeState::DropPull() {
Poll<StatusFlag> filters_detail::PipeState::PollPush() {
switch (state_) {
case ValueState::kIdle:
// Read completed and new read started => we see waiting here
case ValueState::kWaiting:
state_ = ValueState::kReady;
wait_recv_.Wake();
return wait_send_.pending();
case ValueState::kIdle:
case ValueState::kClosed:
return Success{};
case ValueState::kQueued:

@ -155,6 +155,10 @@ struct CallConstructor<FilterType,
// Only one pointer can be set.
template <typename T>
struct ResultOr {
ResultOr(T ok, ServerMetadataHandle error)
: ok(std::move(ok)), error(std::move(error)) {
GPR_ASSERT((this->ok == nullptr) ^ (this->error == nullptr));
}
T ok;
ServerMetadataHandle error;
};
@ -1310,44 +1314,6 @@ class CallFilters {
filters_detail::StackData data_;
};
class NextMessage {
public:
NextMessage() : has_value_(false), cancelled_(false) {}
explicit NextMessage(MessageHandle value)
: has_value_(true), value_(std::move(value)) {}
explicit NextMessage(bool cancelled)
: has_value_(false), cancelled_(cancelled) {}
NextMessage(const NextMessage&) = delete;
NextMessage& operator=(const NextMessage&) = delete;
NextMessage(NextMessage&& other) noexcept = default;
NextMessage& operator=(NextMessage&& other) = default;
using value_type = MessageHandle;
void reset() {
has_value_ = false;
cancelled_ = false;
value_.reset();
}
bool has_value() const { return has_value_; }
const MessageHandle& value() const {
GPR_DEBUG_ASSERT(has_value_);
return value_;
}
MessageHandle& value() {
GPR_DEBUG_ASSERT(has_value_);
return value_;
}
const MessageHandle& operator*() const { return value(); }
MessageHandle& operator*() { return value(); }
bool cancelled() const { return !has_value_ && cancelled_; }
private:
bool has_value_;
bool cancelled_;
MessageHandle value_;
};
explicit CallFilters(ClientMetadataHandle client_initial_metadata);
~CallFilters();
@ -1422,12 +1388,21 @@ class CallFilters {
public:
Push(CallFilters* filters, T x)
: filters_(filters), value_(std::move(x)) {
GPR_ASSERT(value_ != nullptr);
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) {
gpr_log(GPR_INFO, "BeginPush[%p|%p]: %s", &state(), this,
state().DebugString().c_str());
}
GPR_ASSERT(push_slot() == nullptr);
state().BeginPush();
push_slot() = this;
}
~Push() {
if (filters_ != nullptr) {
state().DropPush();
if (value_ != nullptr) {
state().DropPush();
}
GPR_ASSERT(push_slot() == this);
push_slot() = nullptr;
}
}
@ -1445,9 +1420,49 @@ class CallFilters {
Push& operator=(Push&&) = delete;
Poll<StatusFlag> operator()() { return state().PollPush(); }
Poll<StatusFlag> operator()() {
if (value_ == nullptr) {
GPR_ASSERT(filters_ == nullptr);
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) {
gpr_log(GPR_INFO, "Push[|%p]: already done", this);
}
return Success{};
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) {
gpr_log(GPR_INFO, "Push[%p|%p]: %s", &state(), this,
state().DebugString().c_str());
}
auto r = state().PollPush();
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) {
if (r.pending()) {
gpr_log(GPR_INFO, "Push[%p|%p]: pending; %s", &state(), this,
state().DebugString().c_str());
} else if (r.value().ok()) {
gpr_log(GPR_INFO, "Push[%p|%p]: success; %s", &state(), this,
state().DebugString().c_str());
} else {
gpr_log(GPR_INFO, "Push[%p|%p]: failure; %s", &state(), this,
state().DebugString().c_str());
}
}
if (r.ready()) {
push_slot() = nullptr;
filters_ = nullptr;
}
return r;
}
T TakeValue() { return std::move(value_); }
T TakeValue() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) {
gpr_log(GPR_INFO, "Push[%p|%p]: take value; %s", &state(), this,
state().DebugString().c_str());
}
GPR_ASSERT(value_ != nullptr);
GPR_ASSERT(filters_ != nullptr);
push_slot() = nullptr;
filters_ = nullptr;
return std::move(value_);
}
absl::string_view DebugString() const {
return value_ != nullptr ? " (not pulled)" : "";
@ -1485,6 +1500,10 @@ class CallFilters {
PullMaybe& operator=(PullMaybe&&) = delete;
Poll<ValueOrFailure<absl::optional<T>>> operator()() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) {
gpr_log(GPR_INFO, "PullMaybe[%p|%p]: %s executor:%d", &state(), this,
state().DebugString().c_str(), executor_.IsRunning());
}
if (executor_.IsRunning()) {
auto c = state().PollClosed();
if (c.ready() && c.value()) {
@ -1544,23 +1563,36 @@ class CallFilters {
executor_(std::move(other.executor_)) {}
PullMessage& operator=(PullMessage&&) = delete;
Poll<NextMessage> operator()() {
Poll<ValueOrFailure<absl::optional<MessageHandle>>> operator()() {
GPR_ASSERT(filters_ != nullptr);
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) {
gpr_log(GPR_INFO, "PullMessage[%p|%p]: %s executor:%d", &state(),
this, state().DebugString().c_str(), executor_.IsRunning());
}
if (executor_.IsRunning()) {
auto c = state().PollClosed();
if (c.ready() && c.value()) {
filters_->CancelDueToFailedPipeOperation();
return NextMessage(true);
return Failure{};
}
return FinishOperationExecutor(executor_.Step(filters_->call_data_));
}
auto p = state().PollPull();
auto* r = p.value_if_ready();
if (r == nullptr) return Pending{};
if (r == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) {
gpr_log(GPR_INFO, "PullMessage[%p] pending: %s executor:%d",
&state(), state().DebugString().c_str(),
executor_.IsRunning());
}
return Pending{};
}
if (!r->ok()) {
filters_->CancelDueToFailedPipeOperation();
return NextMessage(true);
return Failure{};
}
if (!**r) return NextMessage(false);
if (!**r) return absl::nullopt;
GPR_ASSERT(filters_ != nullptr);
return FinishOperationExecutor(executor_.Start(
layout(), push()->TakeValue(), filters_->call_data_));
}
@ -1573,15 +1605,19 @@ class CallFilters {
return &(filters_->stack_->data_.*layout_ptr);
}
Poll<NextMessage> FinishOperationExecutor(
Poll<filters_detail::ResultOr<T>> p) {
Poll<ValueOrFailure<absl::optional<MessageHandle>>>
FinishOperationExecutor(Poll<filters_detail::ResultOr<T>> p) {
auto* r = p.value_if_ready();
if (r == nullptr) return Pending{};
GPR_DEBUG_ASSERT(!executor_.IsRunning());
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) {
gpr_log(GPR_INFO, "PullMessage[%p|%p] executor done: %s", &state(),
this, state().DebugString().c_str());
}
state().AckPull();
if (r->ok != nullptr) return NextMessage(std::move(r->ok));
if (r->ok != nullptr) return std::move(r->ok);
filters_->PushServerTrailingMetadata(std::move(r->error));
return NextMessage(true);
return Failure{};
}
CallFilters* filters_;
@ -1699,9 +1735,21 @@ class CallFilters {
return std::move(filters_->server_trailing_metadata_);
}
// Otherwise we need to process it through all the filters.
return executor_.Start(&filters_->stack_->data_.server_trailing_metadata,
std::move(filters_->server_trailing_metadata_),
filters_->call_data_);
auto r = executor_.Start(
&filters_->stack_->data_.server_trailing_metadata,
std::move(filters_->server_trailing_metadata_), filters_->call_data_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) {
if (r.pending()) {
gpr_log(GPR_INFO,
"%s PullServerTrailingMetadata[%p]: Pending(but executing)",
GetContext<Activity>()->DebugTag().c_str(), filters_);
} else {
gpr_log(GPR_INFO, "%s PullServerTrailingMetadata[%p]: Ready: %s",
GetContext<Activity>()->DebugTag().c_str(), filters_,
r.value()->DebugString().c_str());
}
}
return r;
}
private:

@ -16,6 +16,9 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/promise/for_each.h"
#include "src/core/lib/promise/try_seq.h"
namespace grpc_core {
void ForwardCall(CallHandler call_handler, CallInitiator call_initiator) {
@ -89,12 +92,14 @@ void ForwardCall(CallHandler call_handler, CallInitiator call_initiator) {
});
}
CallInitiatorAndHandler MakeCall(
CallInitiatorAndHandler MakeCallPair(
ClientMetadataHandle client_initial_metadata,
grpc_event_engine::experimental::EventEngine* event_engine, Arena* arena,
bool is_arena_owned) {
auto spine = CallSpine::Create(std::move(client_initial_metadata),
event_engine, arena, is_arena_owned);
RefCountedPtr<CallArenaAllocator> call_arena_allocator_if_arena_is_owned,
grpc_call_context_element* legacy_context) {
auto spine = CallSpine::Create(
std::move(client_initial_metadata), event_engine, arena,
std::move(call_arena_allocator_if_arena_is_owned), legacy_context);
return {CallInitiator(spine), UnstartedCallHandler(spine)};
}

@ -18,16 +18,16 @@
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/context.h"
#include "src/core/lib/promise/detail/status.h"
#include "src/core/lib/promise/for_each.h"
#include "src/core/lib/promise/if.h"
#include "src/core/lib/promise/latch.h"
#include "src/core/lib/promise/party.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/prioritized_race.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/status_flag.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/transport/call_arena_allocator.h"
#include "src/core/lib/transport/call_filters.h"
#include "src/core/lib/transport/message.h"
#include "src/core/lib/transport/metadata.h"
@ -80,6 +80,8 @@ class CallSpineInterface {
virtual Promise<StatusFlag> PushServerInitialMetadata(
absl::optional<ServerMetadataHandle> md) = 0;
virtual Promise<bool> WasCancelled() = 0;
virtual ClientMetadata& UnprocessedClientInitialMetadata() = 0;
virtual void V2HackToStartCallWithoutACallFilterStack() = 0;
// Wrap a promise so that if it returns failure it automatically cancels
// the rest of the call.
@ -252,58 +254,149 @@ class PipeBasedCallSpine : public CallSpineInterface {
}
};
class CallSpine final : public PipeBasedCallSpine, public Party {
class CallSpine final : public CallSpineInterface, public Party {
public:
static RefCountedPtr<CallSpine> Create(
ClientMetadataHandle client_initial_metadata,
grpc_event_engine::experimental::EventEngine* event_engine, Arena* arena,
bool is_arena_owned) {
auto spine = RefCountedPtr<CallSpine>(
arena->New<CallSpine>(event_engine, arena, is_arena_owned));
spine->SpawnInfallible(
"push_client_initial_metadata",
[spine = spine.get(), client_initial_metadata = std::move(
client_initial_metadata)]() mutable {
return Map(spine->client_initial_metadata_.sender.Push(
std::move(client_initial_metadata)),
[](bool) { return Empty{}; });
});
return spine;
RefCountedPtr<CallArenaAllocator> call_arena_allocator_if_arena_is_owned,
grpc_call_context_element* legacy_context) {
return RefCountedPtr<CallSpine>(arena->New<CallSpine>(
std::move(client_initial_metadata), event_engine, arena,
std::move(call_arena_allocator_if_arena_is_owned), legacy_context));
}
~CallSpine() override {
if (legacy_context_is_owned_) {
for (size_t i = 0; i < GRPC_CONTEXT_COUNT; i++) {
grpc_call_context_element& elem = legacy_context_[i];
if (elem.destroy != nullptr) elem.destroy(&elem);
}
}
}
Pipe<ClientMetadataHandle>& client_initial_metadata() override {
return client_initial_metadata_;
}
Pipe<ServerMetadataHandle>& server_initial_metadata() override {
return server_initial_metadata_;
}
Pipe<MessageHandle>& client_to_server_messages() override {
return client_to_server_messages_;
}
Pipe<MessageHandle>& server_to_client_messages() override {
return server_to_client_messages_;
}
Latch<ServerMetadataHandle>& cancel_latch() override { return cancel_latch_; }
Latch<bool>& was_cancelled_latch() override { return was_cancelled_latch_; }
CallFilters& call_filters() { return call_filters_; }
Party& party() override { return *this; }
Arena* arena() override { return arena_; }
void IncrementRefCount() override { Party::IncrementRefCount(); }
void Unref() override { Party::Unref(); }
Promise<ValueOrFailure<absl::optional<ServerMetadataHandle>>>
PullServerInitialMetadata() override {
return call_filters().PullServerInitialMetadata();
}
Promise<ServerMetadataHandle> PullServerTrailingMetadata() override {
return call_filters().PullServerTrailingMetadata();
}
Promise<StatusFlag> PushClientToServerMessage(
MessageHandle message) override {
return call_filters().PushClientToServerMessage(std::move(message));
}
Promise<ValueOrFailure<absl::optional<MessageHandle>>>
PullClientToServerMessage() override {
return call_filters().PullClientToServerMessage();
}
Promise<StatusFlag> PushServerToClientMessage(
MessageHandle message) override {
return call_filters().PushServerToClientMessage(std::move(message));
}
Promise<ValueOrFailure<absl::optional<MessageHandle>>>
PullServerToClientMessage() override {
return call_filters().PullServerToClientMessage();
}
void PushServerTrailingMetadata(ServerMetadataHandle md) override {
call_filters().PushServerTrailingMetadata(std::move(md));
}
void FinishSends() override { call_filters().FinishClientToServerSends(); }
Promise<ValueOrFailure<ClientMetadataHandle>> PullClientInitialMetadata()
override {
return call_filters().PullClientInitialMetadata();
}
Promise<StatusFlag> PushServerInitialMetadata(
absl::optional<ServerMetadataHandle> md) override {
if (md.has_value()) {
return call_filters().PushServerInitialMetadata(std::move(*md));
} else {
call_filters().NoServerInitialMetadata();
return Immediate<StatusFlag>(Success{});
}
}
Promise<bool> WasCancelled() override {
return call_filters().WasCancelled();
}
ClientMetadata& UnprocessedClientInitialMetadata() override {
return *call_filters().unprocessed_client_initial_metadata();
}
// TODO(ctiller): re-evaluate legacy context apis
grpc_call_context_element& legacy_context(grpc_context_index index) const {
return legacy_context_[index];
}
grpc_call_context_element* legacy_context() { return legacy_context_; }
grpc_event_engine::experimental::EventEngine* event_engine() const override {
return event_engine_;
}
void V2HackToStartCallWithoutACallFilterStack() override {
CallFilters::StackBuilder empty_stack_builder;
call_filters().SetStack(empty_stack_builder.Build());
}
private:
friend class Arena;
CallSpine(grpc_event_engine::experimental::EventEngine* event_engine,
Arena* arena, bool is_arena_owned)
CallSpine(ClientMetadataHandle client_initial_metadata,
grpc_event_engine::experimental::EventEngine* event_engine,
Arena* arena,
RefCountedPtr<CallArenaAllocator> call_arena_allocator,
grpc_call_context_element* legacy_context)
: Party(1),
call_filters_(std::move(client_initial_metadata)),
arena_(arena),
is_arena_owned_(is_arena_owned),
event_engine_(event_engine) {}
event_engine_(event_engine),
call_arena_allocator_if_arena_is_owned_(
std::move(call_arena_allocator)) {
if (legacy_context == nullptr) {
legacy_context_ = static_cast<grpc_call_context_element*>(
arena->Alloc(sizeof(grpc_call_context_element) * GRPC_CONTEXT_COUNT));
memset(legacy_context_, 0,
sizeof(grpc_call_context_element) * GRPC_CONTEXT_COUNT);
legacy_context_is_owned_ = true;
} else {
legacy_context_ = legacy_context;
legacy_context_is_owned_ = false;
}
}
class ScopedContext : public ScopedActivity,
public promise_detail::Context<Arena> {
class ScopedContext
: public ScopedActivity,
public promise_detail::Context<Arena>,
public promise_detail::Context<
grpc_event_engine::experimental::EventEngine>,
public promise_detail::Context<grpc_call_context_element> {
public:
explicit ScopedContext(CallSpine* spine)
: ScopedActivity(&spine->party()), Context<Arena>(spine->arena()) {}
: ScopedActivity(spine),
Context<Arena>(spine->arena_),
Context<grpc_event_engine::experimental::EventEngine>(
spine->event_engine()),
Context<grpc_call_context_element>(spine->legacy_context_) {}
};
bool RunParty() override {
@ -312,35 +405,30 @@ class CallSpine final : public PipeBasedCallSpine, public Party {
}
void PartyOver() override {
Arena* a = arena();
Arena* a = arena_;
RefCountedPtr<CallArenaAllocator> call_arena_allocator_if_arena_is_owned =
std::move(call_arena_allocator_if_arena_is_owned_);
{
ScopedContext context(this);
CancelRemainingParticipants();
a->DestroyManagedNewObjects();
}
this->~CallSpine();
a->Destroy();
}
grpc_event_engine::experimental::EventEngine* event_engine() const override {
return event_engine_;
if (call_arena_allocator_if_arena_is_owned != nullptr) {
call_arena_allocator_if_arena_is_owned->Destroy(a);
}
}
Arena* arena_;
bool is_arena_owned_;
// Initial metadata from client to server
Pipe<ClientMetadataHandle> client_initial_metadata_{arena()};
// Initial metadata from server to client
Pipe<ServerMetadataHandle> server_initial_metadata_{arena()};
// Messages travelling from the application to the transport.
Pipe<MessageHandle> client_to_server_messages_{arena()};
// Messages travelling from the transport to the application.
Pipe<MessageHandle> server_to_client_messages_{arena()};
// Latch that can be set to terminate the call
Latch<ServerMetadataHandle> cancel_latch_;
Latch<bool> was_cancelled_latch_;
// Call filters/pipes part of the spine
CallFilters call_filters_;
Arena* const arena_;
// Event engine associated with this call
grpc_event_engine::experimental::EventEngine* const event_engine_;
// Legacy context
// TODO(ctiller): remove
grpc_call_context_element* legacy_context_;
RefCountedPtr<CallArenaAllocator> call_arena_allocator_if_arena_is_owned_;
bool legacy_context_is_owned_;
};
class CallInitiator {
@ -446,6 +534,15 @@ class CallHandler {
Arena* arena() { return spine_->arena(); }
grpc_event_engine::experimental::EventEngine* event_engine() {
return DownCast<CallSpine*>(spine_.get())->event_engine();
}
// TODO(ctiller): re-evaluate this API
grpc_call_context_element* legacy_context() {
return DownCast<CallSpine*>(spine_.get())->legacy_context();
}
private:
RefCountedPtr<CallSpineInterface> spine_;
};
@ -482,8 +579,19 @@ class UnstartedCallHandler {
return spine_->party().SpawnWaitable(name, std::move(promise_factory));
}
ClientMetadata& UnprocessedClientInitialMetadata() {
return spine_->UnprocessedClientInitialMetadata();
}
CallHandler V2HackToStartCallWithoutACallFilterStack() {
GPR_ASSERT(DownCast<PipeBasedCallSpine*>(spine_.get()) != nullptr);
spine_->V2HackToStartCallWithoutACallFilterStack();
return CallHandler(std::move(spine_));
}
CallHandler StartCall(RefCountedPtr<CallFilters::Stack> call_filters) {
DownCast<CallSpine*>(spine_.get())
->call_filters()
.SetStack(std::move(call_filters));
return CallHandler(std::move(spine_));
}
@ -498,10 +606,11 @@ struct CallInitiatorAndHandler {
UnstartedCallHandler handler;
};
CallInitiatorAndHandler MakeCall(
CallInitiatorAndHandler MakeCallPair(
ClientMetadataHandle client_initial_metadata,
grpc_event_engine::experimental::EventEngine* event_engine, Arena* arena,
bool is_arena_owned);
RefCountedPtr<CallArenaAllocator> call_arena_allocator_if_arena_is_owned,
grpc_call_context_element* legacy_context);
template <typename CallHalf>
auto OutgoingMessages(CallHalf h) {

@ -0,0 +1,156 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/transport/interception_chain.h"
#include <cstddef>
#include <grpc/support/port_platform.h>
#include "src/core/lib/gprpp/match.h"
#include "src/core/lib/transport/call_destination.h"
#include "src/core/lib/transport/call_filters.h"
#include "src/core/lib/transport/call_spine.h"
#include "src/core/lib/transport/metadata.h"
namespace grpc_core {
std::atomic<size_t> InterceptionChainBuilder::next_filter_id_{0};
///////////////////////////////////////////////////////////////////////////////
// HijackedCall
CallInitiator HijackedCall::MakeCall() {
auto metadata = Arena::MakePooled<ClientMetadata>();
*metadata = metadata_->Copy();
return MakeCallWithMetadata(std::move(metadata));
}
CallInitiator HijackedCall::MakeCallWithMetadata(
ClientMetadataHandle metadata) {
auto call = MakeCallPair(std::move(metadata), call_handler_.event_engine(),
call_handler_.arena(), nullptr,
call_handler_.legacy_context());
destination_->StartCall(std::move(call.handler));
return std::move(call.initiator);
}
namespace {
class CallStarter final : public UnstartedCallDestination {
public:
CallStarter(RefCountedPtr<CallFilters::Stack> stack,
RefCountedPtr<CallDestination> destination)
: stack_(std::move(stack)), destination_(std::move(destination)) {}
void Orphaned() override {
stack_.reset();
destination_.reset();
}
void StartCall(UnstartedCallHandler unstarted_call_handler) override {
destination_->HandleCall(unstarted_call_handler.StartCall(stack_));
}
private:
RefCountedPtr<CallFilters::Stack> stack_;
RefCountedPtr<CallDestination> destination_;
};
class TerminalInterceptor final : public UnstartedCallDestination {
public:
explicit TerminalInterceptor(
RefCountedPtr<CallFilters::Stack> stack,
RefCountedPtr<UnstartedCallDestination> destination)
: stack_(std::move(stack)), destination_(std::move(destination)) {}
void Orphaned() override {
stack_.reset();
destination_.reset();
}
void StartCall(UnstartedCallHandler unstarted_call_handler) override {
unstarted_call_handler.SpawnGuarded(
"start_call",
Map(interception_chain_detail::HijackCall(unstarted_call_handler,
destination_, stack_),
[](ValueOrFailure<HijackedCall> hijacked_call) -> StatusFlag {
if (!hijacked_call.ok()) return Failure{};
ForwardCall(hijacked_call.value().original_call_handler(),
hijacked_call.value().MakeLastCall());
return Success{};
}));
}
private:
RefCountedPtr<CallFilters::Stack> stack_;
RefCountedPtr<UnstartedCallDestination> destination_;
};
} // namespace
///////////////////////////////////////////////////////////////////////////////
// InterceptionChain::Builder
void InterceptionChainBuilder::AddInterceptor(
absl::StatusOr<RefCountedPtr<Interceptor>> interceptor) {
if (!status_.ok()) return;
if (!interceptor.ok()) {
status_ = interceptor.status();
return;
}
(*interceptor)->filter_stack_ = MakeFilterStack();
if (top_interceptor_ == nullptr) {
top_interceptor_ = std::move(*interceptor);
} else {
Interceptor* previous = top_interceptor_.get();
while (previous->wrapped_destination_ != nullptr) {
previous = DownCast<Interceptor*>(previous->wrapped_destination_.get());
}
previous->wrapped_destination_ = std::move(*interceptor);
}
}
absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>
InterceptionChainBuilder::Build(FinalDestination final_destination) {
if (!status_.ok()) return status_;
// Build the final UnstartedCallDestination in the chain - what we do here
// depends on both the type of the final destination and the filters we have
// that haven't been captured into an Interceptor yet.
RefCountedPtr<UnstartedCallDestination> terminator = Match(
final_destination,
[this](RefCountedPtr<UnstartedCallDestination> final_destination)
-> RefCountedPtr<UnstartedCallDestination> {
if (stack_builder_.has_value()) {
return MakeRefCounted<TerminalInterceptor>(MakeFilterStack(),
final_destination);
}
return final_destination;
},
[this](RefCountedPtr<CallDestination> final_destination)
-> RefCountedPtr<UnstartedCallDestination> {
return MakeRefCounted<CallStarter>(MakeFilterStack(),
std::move(final_destination));
});
// Now append the terminator to the interceptor chain.
if (top_interceptor_ == nullptr) {
return std::move(terminator);
}
Interceptor* previous = top_interceptor_.get();
while (previous->wrapped_destination_ != nullptr) {
previous = DownCast<Interceptor*>(previous->wrapped_destination_.get());
}
previous->wrapped_destination_ = std::move(terminator);
return std::move(top_interceptor_);
}
} // namespace grpc_core

@ -0,0 +1,225 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_TRANSPORT_INTERCEPTION_CHAIN_H
#define GRPC_SRC_CORE_LIB_TRANSPORT_INTERCEPTION_CHAIN_H
#include <memory>
#include <vector>
#include <grpc/support/port_platform.h>
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/transport/call_destination.h"
#include "src/core/lib/transport/call_filters.h"
#include "src/core/lib/transport/call_spine.h"
#include "src/core/lib/transport/metadata.h"
namespace grpc_core {
class InterceptionChainBuilder;
// One hijacked call. Using this we can get access to the CallHandler for the
// call object above us, the processed metadata from any filters/interceptors
// above us, and also create new CallInterceptor objects that will be handled
// below.
class HijackedCall final {
public:
HijackedCall(ClientMetadataHandle metadata,
RefCountedPtr<UnstartedCallDestination> destination,
CallHandler call_handler)
: metadata_(std::move(metadata)),
destination_(std::move(destination)),
call_handler_(std::move(call_handler)) {}
// Create a new call and pass it down the stack.
// This can be called as many times as needed.
CallInitiator MakeCall();
// Per MakeCall(), but precludes creating further calls.
// Allows us to optimize by not copying initial metadata.
CallInitiator MakeLastCall() {
return MakeCallWithMetadata(std::move(metadata_));
}
CallHandler& original_call_handler() { return call_handler_; }
ClientMetadata& client_metadata() { return *metadata_; }
private:
CallInitiator MakeCallWithMetadata(ClientMetadataHandle metadata);
ClientMetadataHandle metadata_;
RefCountedPtr<UnstartedCallDestination> destination_;
CallHandler call_handler_;
};
namespace interception_chain_detail {
inline auto HijackCall(UnstartedCallHandler unstarted_call_handler,
RefCountedPtr<UnstartedCallDestination> destination,
RefCountedPtr<CallFilters::Stack> stack) {
auto call_handler = unstarted_call_handler.StartCall(stack);
return Map(
call_handler.PullClientInitialMetadata(),
[call_handler,
destination](ValueOrFailure<ClientMetadataHandle> metadata) mutable
-> ValueOrFailure<HijackedCall> {
if (!metadata.ok()) return Failure{};
return HijackedCall(std::move(metadata.value()), std::move(destination),
std::move(call_handler));
});
}
} // namespace interception_chain_detail
// A delegating UnstartedCallDestination for use as a hijacking filter.
// Implementations may look at the unprocessed initial metadata
// and decide to do one of two things:
//
// 1. It can hijack the call. Returns a HijackedCall object that can
// be used to start new calls with the same metadata.
//
// 2. It can consume the call by calling `Consume`.
//
// Upon the StartCall call the UnstartedCallHandler will be from the last
// *Interceptor* in the call chain (without having been processed by any
// intervening filters) -- note that this is commonly not useful (not enough
// guarantees), and so it's usually better to Hijack and examine the metadata.
class Interceptor : public UnstartedCallDestination {
protected:
// Returns a promise that resolves to a HijackedCall instance.
// Hijacking is the process of taking over a call and starting one or more new
// ones.
auto Hijack(UnstartedCallHandler unstarted_call_handler) {
return interception_chain_detail::HijackCall(
std::move(unstarted_call_handler), wrapped_destination_, filter_stack_);
}
// Consume this call - it will not be passed on to any further filters.
CallHandler Consume(UnstartedCallHandler unstarted_call_handler) {
return unstarted_call_handler.StartCall(filter_stack_);
}
// TODO(ctiller): Consider a Passthrough() method that allows the call to be
// passed on to the next filter in the chain without any interception by the
// current filter.
private:
friend class InterceptionChainBuilder;
RefCountedPtr<UnstartedCallDestination> wrapped_destination_;
RefCountedPtr<CallFilters::Stack> filter_stack_;
};
class InterceptionChainBuilder final {
public:
// The kind of destination that the chain will eventually call.
// We can bottom out in various types depending on where we're intercepting:
// - The top half of the client channel wants to terminate on a
// UnstartedCallDestination (specifically the LB call destination).
// - The bottom half of the client channel and the server code wants to
// terminate on a ClientTransport - which unlike a
// UnstartedCallDestination demands a started CallHandler.
// There's some adaption code that's needed to start filters just prior
// to the bottoming out, and some design considerations to make with that.
// One way (that's not chosen here) would be to have the caller of the
// Builder provide something that can build an adaptor
// UnstartedCallDestination with parameters supplied by this builder - that
// disperses the responsibility of building the adaptor to the caller, which
// is not ideal - we might want to adjust the way this construct is built in
// the future, and building is a builder responsibility.
// Instead, we declare a relatively closed set of destinations here, and
// hide the adaptors inside the builder at build time.
using FinalDestination =
absl::variant<RefCountedPtr<UnstartedCallDestination>,
RefCountedPtr<CallDestination>>;
explicit InterceptionChainBuilder(ChannelArgs args)
: args_(std::move(args)) {}
// Add a filter with a `Call` class as an inner member.
// Call class must be one compatible with the filters described in
// call_filters.h.
template <typename T>
absl::enable_if_t<sizeof(typename T::Call) != 0, InterceptionChainBuilder&>
Add() {
if (!status_.ok()) return *this;
auto filter = T::Create(args_, {FilterInstanceId(FilterTypeId<T>())});
if (!filter.ok()) {
status_ = filter.status();
return *this;
}
auto& sb = stack_builder();
sb.Add(filter.value().get());
sb.AddOwnedObject(std::move(filter.value()));
return *this;
};
// Add a filter that is an interceptor - one that can hijack calls.
template <typename T>
absl::enable_if_t<std::is_base_of<Interceptor, T>::value,
InterceptionChainBuilder&>
Add() {
AddInterceptor(T::Create(args_, {FilterInstanceId(FilterTypeId<T>())}));
return *this;
};
// Add a filter that just mutates server trailing metadata.
template <typename F>
void AddOnServerTrailingMetadata(F f) {
stack_builder().AddOnServerTrailingMetadata(std::move(f));
}
// Build this stack
absl::StatusOr<RefCountedPtr<UnstartedCallDestination>> Build(
FinalDestination final_destination);
const ChannelArgs& channel_args() const { return args_; }
private:
CallFilters::StackBuilder& stack_builder() {
if (!stack_builder_.has_value()) stack_builder_.emplace();
return *stack_builder_;
}
RefCountedPtr<CallFilters::Stack> MakeFilterStack() {
auto stack = stack_builder().Build();
stack_builder_.reset();
return stack;
}
template <typename T>
static size_t FilterTypeId() {
static const size_t id =
next_filter_id_.fetch_add(1, std::memory_order_relaxed);
return id;
}
size_t FilterInstanceId(size_t filter_type) {
return filter_type_counts_[filter_type]++;
}
void AddInterceptor(absl::StatusOr<RefCountedPtr<Interceptor>> interceptor);
ChannelArgs args_;
absl::optional<CallFilters::StackBuilder> stack_builder_;
RefCountedPtr<Interceptor> top_interceptor_;
absl::Status status_;
std::map<size_t, size_t> filter_type_counts_;
static std::atomic<size_t> next_filter_id_;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_TRANSPORT_INTERCEPTION_CHAIN_H

@ -755,9 +755,9 @@ CORE_SOURCE_FILES = [
'src/core/lib/surface/wait_for_cq_end_op.cc',
'src/core/lib/transport/batch_builder.cc',
'src/core/lib/transport/bdp_estimator.cc',
'src/core/lib/transport/call_arena_allocator.cc',
'src/core/lib/transport/call_filters.cc',
'src/core/lib/transport/call_final_info.cc',
'src/core/lib/transport/call_size_estimator.cc',
'src/core/lib/transport/call_spine.cc',
'src/core/lib/transport/connectivity_state.cc',
'src/core/lib/transport/endpoint_info_handshaker.cc',

@ -19,6 +19,15 @@ licenses(["notice"])
grpc_package(name = "test/core/promise")
grpc_cc_library(
name = "poll_matcher",
testonly = True,
hdrs = ["poll_matcher.h"],
external_deps = ["gtest"],
visibility = ["//test/core:__subpackages__"],
deps = ["//src/core:poll"],
)
grpc_cc_library(
name = "test_wakeup_schedulers",
testonly = True,
@ -496,6 +505,7 @@ grpc_cc_test(
uses_event_engine = False,
uses_polling = False,
deps = [
"poll_matcher",
"//src/core:loop",
"//src/core:map",
"//src/core:notification",

@ -26,6 +26,7 @@
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/map.h"
#include "test/core/promise/poll_matcher.h"
using testing::Mock;
using testing::StrictMock;
@ -58,34 +59,6 @@ class MockActivity : public Activity, public Wakeable {
std::unique_ptr<ScopedActivity> scoped_activity_;
};
MATCHER(IsPending, "") {
if (arg.ready()) {
*result_listener << "is ready";
return false;
}
return true;
}
MATCHER(IsReady, "") {
if (arg.pending()) {
*result_listener << "is pending";
return false;
}
return true;
}
MATCHER_P(IsReady, value, "") {
if (arg.pending()) {
*result_listener << "is pending";
return false;
}
if (arg.value() != value) {
*result_listener << "is " << ::testing::PrintToString(arg.value());
return false;
}
return true;
}
TEST(ObservableTest, ImmediateNext) {
Observable<int> observable(1);
auto next = observable.Next(0);

@ -0,0 +1,60 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_TEST_CORE_PROMISE_POLL_MATCHER_H
#define GRPC_TEST_CORE_PROMISE_POLL_MATCHER_H
#include "gmock/gmock.h"
// Various gmock matchers for Poll
namespace grpc_core {
// Expect that a promise is still pending:
// EXPECT_THAT(some_promise(), IsPending());
MATCHER(IsPending, "") {
if (arg.ready()) {
*result_listener << "is ready";
return false;
}
return true;
}
// Expect that a promise is ready:
// EXPECT_THAT(some_promise(), IsReady());
MATCHER(IsReady, "") {
if (arg.pending()) {
*result_listener << "is pending";
return false;
}
return true;
}
// Expect that a promise is ready with a specific value:
// EXPECT_THAT(some_promise(), IsReady(value));
MATCHER_P(IsReady, value, "") {
if (arg.pending()) {
*result_listener << "is pending";
return false;
}
if (arg.value() != value) {
*result_listener << "is " << ::testing::PrintToString(arg.value());
return false;
}
return true;
}
} // namespace grpc_core
#endif // GRPC_TEST_CORE_PROMISE_POLL_MATCHER_H

@ -35,6 +35,23 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "interception_chain_test",
srcs = ["interception_chain_test.cc"],
external_deps = [
"gtest",
],
language = "C++",
uses_event_engine = False,
uses_polling = False,
deps = [
"//:grpc_base",
"//src/core:interception_chain",
"//src/core:resource_quota",
"//test/core/promise:poll_matcher",
],
)
grpc_cc_test(
name = "call_filters_test",
srcs = ["call_filters_test.cc"],
@ -46,6 +63,7 @@ grpc_cc_test(
uses_polling = False,
deps = [
"//src/core:call_filters",
"//test/core/promise:poll_matcher",
],
)

@ -19,6 +19,8 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "test/core/promise/poll_matcher.h"
using testing::Mock;
using testing::StrictMock;
@ -57,34 +59,6 @@ class MockActivity : public Activity, public Wakeable {
std::unique_ptr<ScopedActivity> scoped_activity_;
};
MATCHER(IsPending, "") {
if (arg.ready()) {
*result_listener << "is ready";
return false;
}
return true;
}
MATCHER(IsReady, "") {
if (arg.pending()) {
*result_listener << "is pending";
return false;
}
return true;
}
MATCHER_P(IsReady, value, "") {
if (arg.pending()) {
*result_listener << "is pending";
return false;
}
if (arg.value() != value) {
*result_listener << "is " << ::testing::PrintToString(arg.value());
return false;
}
return true;
}
} // namespace
////////////////////////////////////////////////////////////////////////////////

@ -105,16 +105,19 @@ struct MockPromiseEndpoint {
auto SendClientToServerMessages(CallInitiator initiator, int num_messages) {
return Loop([initiator, num_messages]() mutable {
bool has_message = (num_messages > 0);
return If(has_message,
Seq(initiator.PushMessage(Arena::MakePooled<Message>()),
[&num_messages]() -> LoopCtl<absl::Status> {
--num_messages;
return Continue();
}),
[initiator]() mutable -> LoopCtl<absl::Status> {
initiator.FinishSends();
return absl::OkStatus();
});
return If(
has_message,
[initiator, &num_messages]() mutable {
return Seq(initiator.PushMessage(Arena::MakePooled<Message>()),
[&num_messages]() -> LoopCtl<absl::Status> {
--num_messages;
return Continue();
});
},
[initiator]() mutable -> LoopCtl<absl::Status> {
initiator.FinishSends();
return absl::OkStatus();
});
});
}
@ -130,7 +133,6 @@ class ClientTransportTest : public ::testing::Test {
event_engine() {
return event_engine_;
}
MemoryAllocator* memory_allocator() { return &allocator_; }
ChannelArgs MakeChannelArgs() {
return CoreConfiguration::Get()
@ -138,6 +140,12 @@ class ClientTransportTest : public ::testing::Test {
.PreconditionChannelArgs(nullptr);
}
auto MakeCall(ClientMetadataHandle client_initial_metadata) {
auto* arena = call_arena_allocator_->MakeArena();
return MakeCallPair(std::move(client_initial_metadata), event_engine_.get(),
arena, call_arena_allocator_, nullptr);
}
private:
std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine>
event_engine_{
@ -149,9 +157,12 @@ class ClientTransportTest : public ::testing::Test {
return options;
}(),
fuzzing_event_engine::Actions())};
MemoryAllocator allocator_ = MakeResourceQuota("test-quota")
->memory_quota()
->CreateMemoryAllocator("test-allocator");
RefCountedPtr<CallArenaAllocator> call_arena_allocator_{
MakeRefCounted<CallArenaAllocator>(
MakeResourceQuota("test-quota")
->memory_quota()
->CreateMemoryAllocator("test-allocator"),
1024)};
};
TEST_F(ClientTransportTest, AddOneStreamWithWriteFailed) {
@ -177,8 +188,7 @@ TEST_F(ClientTransportTest, AddOneStreamWithWriteFailed) {
std::move(control_endpoint.promise_endpoint),
std::move(data_endpoint.promise_endpoint), MakeChannelArgs(),
event_engine(), HPackParser(), HPackCompressor());
auto call = MakeCall(TestInitialMetadata(), event_engine().get(),
Arena::Create(8192, memory_allocator()), true);
auto call = MakeCall(TestInitialMetadata());
transport->StartCall(call.handler.V2HackToStartCallWithoutACallFilterStack());
call.initiator.SpawnGuarded("test-send",
[initiator = call.initiator]() mutable {
@ -222,8 +232,7 @@ TEST_F(ClientTransportTest, AddOneStreamWithReadFailed) {
std::move(control_endpoint.promise_endpoint),
std::move(data_endpoint.promise_endpoint), MakeChannelArgs(),
event_engine(), HPackParser(), HPackCompressor());
auto call = MakeCall(TestInitialMetadata(), event_engine().get(),
Arena::Create(8192, memory_allocator()), true);
auto call = MakeCall(TestInitialMetadata());
transport->StartCall(call.handler.V2HackToStartCallWithoutACallFilterStack());
call.initiator.SpawnGuarded("test-send",
[initiator = call.initiator]() mutable {
@ -275,12 +284,10 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithWriteFailed) {
std::move(control_endpoint.promise_endpoint),
std::move(data_endpoint.promise_endpoint), MakeChannelArgs(),
event_engine(), HPackParser(), HPackCompressor());
auto call1 = MakeCall(TestInitialMetadata(), event_engine().get(),
Arena::Create(8192, memory_allocator()), true);
auto call1 = MakeCall(TestInitialMetadata());
transport->StartCall(
call1.handler.V2HackToStartCallWithoutACallFilterStack());
auto call2 = MakeCall(TestInitialMetadata(), event_engine().get(),
Arena::Create(8192, memory_allocator()), true);
auto call2 = MakeCall(TestInitialMetadata());
transport->StartCall(
call2.handler.V2HackToStartCallWithoutACallFilterStack());
call1.initiator.SpawnGuarded(
@ -347,12 +354,10 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithReadFailed) {
std::move(control_endpoint.promise_endpoint),
std::move(data_endpoint.promise_endpoint), MakeChannelArgs(),
event_engine(), HPackParser(), HPackCompressor());
auto call1 = MakeCall(TestInitialMetadata(), event_engine().get(),
Arena::Create(8192, memory_allocator()), true);
auto call1 = MakeCall(TestInitialMetadata());
transport->StartCall(
call1.handler.V2HackToStartCallWithoutACallFilterStack());
auto call2 = MakeCall(TestInitialMetadata(), event_engine().get(),
Arena::Create(8192, memory_allocator()), true);
auto call2 = MakeCall(TestInitialMetadata());
transport->StartCall(
call2.handler.V2HackToStartCallWithoutACallFilterStack());
call1.initiator.SpawnGuarded(

@ -15,6 +15,7 @@
#include "src/core/ext/transport/chaotic_good/client_transport.h"
#include <algorithm>
#include <cstddef>
#include <cstdlib>
#include <initializer_list>
#include <memory>
@ -78,12 +79,15 @@ auto SendClientToServerMessages(CallInitiator initiator, int num_messages) {
bool has_message = (i < num_messages);
return If(
has_message,
Seq(initiator.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString(std::to_string(i))), 0)),
[&i]() -> LoopCtl<absl::Status> {
++i;
return Continue();
}),
[initiator, &i]() mutable {
return Seq(
initiator.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString(std::to_string(i))), 0)),
[&i]() -> LoopCtl<absl::Status> {
++i;
return Continue();
});
},
[initiator]() mutable -> LoopCtl<absl::Status> {
initiator.FinishSends();
return absl::OkStatus();
@ -115,8 +119,7 @@ TEST_F(TransportTest, AddOneStream) {
std::move(control_endpoint.promise_endpoint),
std::move(data_endpoint.promise_endpoint), MakeChannelArgs(),
event_engine(), HPackParser(), HPackCompressor());
auto call = MakeCall(TestInitialMetadata(), event_engine().get(),
Arena::Create(1024, memory_allocator()), true);
auto call = MakeCall(TestInitialMetadata());
transport->StartCall(call.handler.V2HackToStartCallWithoutACallFilterStack());
StrictMock<MockFunction<void()>> on_done;
EXPECT_CALL(on_done, Call());
@ -202,8 +205,7 @@ TEST_F(TransportTest, AddOneStreamMultipleMessages) {
std::move(control_endpoint.promise_endpoint),
std::move(data_endpoint.promise_endpoint), MakeChannelArgs(),
event_engine(), HPackParser(), HPackCompressor());
auto call = MakeCall(TestInitialMetadata(), event_engine().get(),
Arena::Create(8192, memory_allocator()), true);
auto call = MakeCall(TestInitialMetadata());
transport->StartCall(call.handler.V2HackToStartCallWithoutACallFilterStack());
StrictMock<MockFunction<void()>> on_done;
EXPECT_CALL(on_done, Call());

@ -112,7 +112,7 @@ TEST_F(TransportTest, ReadAndWriteOneMessage) {
data_endpoint.ExpectRead(
{EventEngineSlice::FromCopiedString("12345678"), Zeros(56)}, nullptr);
// Once that's read we'll create a new call
auto* call_arena = Arena::Create(1024, memory_allocator());
auto* call_arena = MakeArena();
EXPECT_CALL(acceptor, CreateArena).WillOnce(Return(call_arena));
StrictMock<MockFunction<void()>> on_done;
EXPECT_CALL(acceptor, CreateCall(_, call_arena))
@ -121,9 +121,9 @@ TEST_F(TransportTest, ReadAndWriteOneMessage) {
EXPECT_EQ(client_initial_metadata->get_pointer(HttpPathMetadata())
->as_string_view(),
"/demo.Service/Step");
CallInitiatorAndHandler call =
MakeCall(std::move(client_initial_metadata), event_engine().get(),
call_arena, true);
CallInitiatorAndHandler call = MakeCallPair(
std::move(client_initial_metadata), event_engine().get(),
call_arena, call_arena_allocator(), nullptr);
auto handler = call.handler.V2HackToStartCallWithoutACallFilterStack();
handler.SpawnInfallible("test-io", [&on_done, handler]() mutable {
return Seq(

@ -36,7 +36,17 @@ class TransportTest : public ::testing::Test {
return event_engine_;
}
MemoryAllocator* memory_allocator() { return &allocator_; }
Arena* MakeArena() { return call_arena_allocator_->MakeArena(); }
RefCountedPtr<CallArenaAllocator> call_arena_allocator() {
return call_arena_allocator_;
}
auto MakeCall(ClientMetadataHandle client_initial_metadata) {
auto* arena = call_arena_allocator_->MakeArena();
return MakeCallPair(std::move(client_initial_metadata), event_engine_.get(),
arena, call_arena_allocator_, nullptr);
}
private:
std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine>
@ -49,9 +59,12 @@ class TransportTest : public ::testing::Test {
return options;
}(),
fuzzing_event_engine::Actions())};
MemoryAllocator allocator_ = MakeResourceQuota("test-quota")
->memory_quota()
->CreateMemoryAllocator("test-allocator");
RefCountedPtr<CallArenaAllocator> call_arena_allocator_{
MakeRefCounted<CallArenaAllocator>(
MakeResourceQuota("test-quota")
->memory_quota()
->CreateMemoryAllocator("test-allocator"),
1024)};
};
grpc_event_engine::experimental::Slice SerializedFrameHeader(

@ -0,0 +1,406 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/transport/interception_chain.h"
#include <memory>
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include <grpc/support/log.h>
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "test/core/promise/poll_matcher.h"
namespace grpc_core {
namespace {
///////////////////////////////////////////////////////////////////////////////
// Mutate metadata by annotating that it passed through a filter "x"
void AnnotatePassedThrough(ClientMetadata& md, int x) {
md.Append(absl::StrCat("passed-through-", x), Slice::FromCopiedString("true"),
[](absl::string_view, const Slice&) { Crash("unreachable"); });
}
///////////////////////////////////////////////////////////////////////////////
// CreationLog helps us reason about filter creation order by logging a small
// record of each filter's creation.
struct CreationLogEntry {
size_t filter_instance_id;
size_t type_tag;
bool operator==(const CreationLogEntry& other) const {
return filter_instance_id == other.filter_instance_id &&
type_tag == other.type_tag;
}
friend std::ostream& operator<<(std::ostream& os,
const CreationLogEntry& entry) {
return os << "{filter_instance_id=" << entry.filter_instance_id
<< ", type_tag=" << entry.type_tag << "}";
}
};
struct CreationLog {
struct RawPointerChannelArgTag {};
static absl::string_view ChannelArgName() { return "creation_log"; }
std::vector<CreationLogEntry> entries;
};
void MaybeLogCreation(const ChannelArgs& channel_args,
ChannelFilter::Args filter_args, size_t type_tag) {
auto* log = channel_args.GetPointer<CreationLog>("creation_log");
if (log == nullptr) return;
log->entries.push_back(CreationLogEntry{filter_args.instance_id(), type_tag});
}
///////////////////////////////////////////////////////////////////////////////
// Test call filter
template <int I>
class TestFilter {
public:
class Call {
public:
void OnClientInitialMetadata(ClientMetadata& md) {
AnnotatePassedThrough(md, I);
}
static const NoInterceptor OnServerInitialMetadata;
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnFinalize;
};
static absl::StatusOr<std::unique_ptr<TestFilter<I>>> Create(
const ChannelArgs& channel_args, ChannelFilter::Args filter_args) {
MaybeLogCreation(channel_args, filter_args, I);
return std::make_unique<TestFilter<I>>();
}
private:
std::unique_ptr<int> i_ = std::make_unique<int>(I);
};
template <int I>
const NoInterceptor TestFilter<I>::Call::OnServerInitialMetadata;
template <int I>
const NoInterceptor TestFilter<I>::Call::OnClientToServerMessage;
template <int I>
const NoInterceptor TestFilter<I>::Call::OnServerToClientMessage;
template <int I>
const NoInterceptor TestFilter<I>::Call::OnServerTrailingMetadata;
template <int I>
const NoInterceptor TestFilter<I>::Call::OnFinalize;
///////////////////////////////////////////////////////////////////////////////
// Test call filter that fails to instantiate
template <int I>
class FailsToInstantiateFilter {
public:
class Call {
public:
static const NoInterceptor OnClientInitialMetadata;
static const NoInterceptor OnServerInitialMetadata;
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnFinalize;
};
static absl::StatusOr<std::unique_ptr<FailsToInstantiateFilter<I>>> Create(
const ChannelArgs& channel_args, ChannelFilter::Args filter_args) {
MaybeLogCreation(channel_args, filter_args, I);
return absl::InternalError(absl::StrCat("👊 failed to instantiate ", I));
}
};
template <int I>
const NoInterceptor FailsToInstantiateFilter<I>::Call::OnClientInitialMetadata;
template <int I>
const NoInterceptor FailsToInstantiateFilter<I>::Call::OnServerInitialMetadata;
template <int I>
const NoInterceptor FailsToInstantiateFilter<I>::Call::OnClientToServerMessage;
template <int I>
const NoInterceptor FailsToInstantiateFilter<I>::Call::OnServerToClientMessage;
template <int I>
const NoInterceptor FailsToInstantiateFilter<I>::Call::OnServerTrailingMetadata;
template <int I>
const NoInterceptor FailsToInstantiateFilter<I>::Call::OnFinalize;
///////////////////////////////////////////////////////////////////////////////
// Test call interceptor - consumes calls
template <int I>
class TestConsumingInterceptor final : public Interceptor {
public:
void StartCall(UnstartedCallHandler unstarted_call_handler) override {
Consume(std::move(unstarted_call_handler))
.PushServerTrailingMetadata(
ServerMetadataFromStatus(absl::InternalError("👊 consumed")));
}
void Orphaned() override {}
static absl::StatusOr<RefCountedPtr<TestConsumingInterceptor<I>>> Create(
const ChannelArgs& channel_args, ChannelFilter::Args filter_args) {
MaybeLogCreation(channel_args, filter_args, I);
return MakeRefCounted<TestConsumingInterceptor<I>>();
}
};
///////////////////////////////////////////////////////////////////////////////
// Test call interceptor - fails to instantiate
template <int I>
class TestFailingInterceptor final : public Interceptor {
public:
void StartCall(UnstartedCallHandler unstarted_call_handler) override {
Crash("unreachable");
}
void Orphaned() override {}
static absl::StatusOr<RefCountedPtr<TestFailingInterceptor<I>>> Create(
const ChannelArgs& channel_args, ChannelFilter::Args filter_args) {
MaybeLogCreation(channel_args, filter_args, I);
return absl::InternalError(absl::StrCat("👊 failed to instantiate ", I));
}
};
///////////////////////////////////////////////////////////////////////////////
// Test call interceptor - hijacks calls
template <int I>
class TestHijackingInterceptor final : public Interceptor {
public:
void StartCall(UnstartedCallHandler unstarted_call_handler) override {
unstarted_call_handler.SpawnInfallible(
"hijack", [this, unstarted_call_handler]() mutable {
return Map(Hijack(std::move(unstarted_call_handler)),
[](ValueOrFailure<HijackedCall> hijacked_call) {
ForwardCall(
hijacked_call.value().original_call_handler(),
hijacked_call.value().MakeCall());
return Empty{};
});
});
}
void Orphaned() override {}
static absl::StatusOr<RefCountedPtr<TestHijackingInterceptor<I>>> Create(
const ChannelArgs& channel_args, ChannelFilter::Args filter_args) {
MaybeLogCreation(channel_args, filter_args, I);
return MakeRefCounted<TestHijackingInterceptor<I>>();
}
};
///////////////////////////////////////////////////////////////////////////////
// Test fixture
class InterceptionChainTest : public ::testing::Test {
protected:
InterceptionChainTest() {}
~InterceptionChainTest() override {}
RefCountedPtr<UnstartedCallDestination> destination() { return destination_; }
struct FinishedCall {
CallInitiator call;
ClientMetadataHandle client_metadata;
ServerMetadataHandle server_metadata;
};
// Run a call through a UnstartedCallDestination until it's complete.
FinishedCall RunCall(UnstartedCallDestination* destination) {
auto* arena = call_arena_allocator_->MakeArena();
auto call = MakeCallPair(Arena::MakePooled<ClientMetadata>(), nullptr,
arena, call_arena_allocator_, nullptr);
Poll<ServerMetadataHandle> trailing_md;
call.initiator.SpawnInfallible(
"run_call", [destination, &call, &trailing_md]() mutable {
gpr_log(GPR_INFO, "👊 start call");
destination->StartCall(std::move(call.handler));
return Map(call.initiator.PullServerTrailingMetadata(),
[&trailing_md](ServerMetadataHandle md) {
trailing_md = std::move(md);
return Empty{};
});
});
EXPECT_THAT(trailing_md, IsReady());
return FinishedCall{std::move(call.initiator), destination_->TakeMetadata(),
std::move(trailing_md.value())};
}
private:
class Destination final : public UnstartedCallDestination {
public:
void StartCall(UnstartedCallHandler unstarted_call_handler) override {
gpr_log(GPR_INFO, "👊 started call: metadata=%s",
unstarted_call_handler.UnprocessedClientInitialMetadata()
.DebugString()
.c_str());
EXPECT_EQ(metadata_.get(), nullptr);
metadata_ = Arena::MakePooled<ClientMetadata>();
*metadata_ =
unstarted_call_handler.UnprocessedClientInitialMetadata().Copy();
unstarted_call_handler.PushServerTrailingMetadata(
ServerMetadataFromStatus(absl::InternalError("👊 cancelled")));
}
void Orphaned() override {}
ClientMetadataHandle TakeMetadata() { return std::move(metadata_); }
private:
ClientMetadataHandle metadata_;
};
RefCountedPtr<Destination> destination_ = MakeRefCounted<Destination>();
RefCountedPtr<CallArenaAllocator> call_arena_allocator_ =
MakeRefCounted<CallArenaAllocator>(
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
"test"),
1024);
};
///////////////////////////////////////////////////////////////////////////////
// Tests begin
TEST_F(InterceptionChainTest, Empty) {
auto r = InterceptionChainBuilder(ChannelArgs()).Build(destination());
ASSERT_TRUE(r.ok()) << r.status();
auto finished_call = RunCall(r.value().get());
EXPECT_EQ(finished_call.server_metadata->get(GrpcStatusMetadata()),
GRPC_STATUS_INTERNAL);
EXPECT_EQ(finished_call.server_metadata->get_pointer(GrpcMessageMetadata())
->as_string_view(),
"👊 cancelled");
EXPECT_NE(finished_call.client_metadata, nullptr);
}
TEST_F(InterceptionChainTest, Consumed) {
auto r = InterceptionChainBuilder(ChannelArgs())
.Add<TestConsumingInterceptor<1>>()
.Build(destination());
ASSERT_TRUE(r.ok()) << r.status();
auto finished_call = RunCall(r.value().get());
EXPECT_EQ(finished_call.server_metadata->get(GrpcStatusMetadata()),
GRPC_STATUS_INTERNAL);
EXPECT_EQ(finished_call.server_metadata->get_pointer(GrpcMessageMetadata())
->as_string_view(),
"👊 consumed");
EXPECT_EQ(finished_call.client_metadata, nullptr);
}
TEST_F(InterceptionChainTest, Hijacked) {
auto r = InterceptionChainBuilder(ChannelArgs())
.Add<TestHijackingInterceptor<1>>()
.Build(destination());
ASSERT_TRUE(r.ok()) << r.status();
auto finished_call = RunCall(r.value().get());
EXPECT_EQ(finished_call.server_metadata->get(GrpcStatusMetadata()),
GRPC_STATUS_INTERNAL);
EXPECT_EQ(finished_call.server_metadata->get_pointer(GrpcMessageMetadata())
->as_string_view(),
"👊 cancelled");
EXPECT_NE(finished_call.client_metadata, nullptr);
}
TEST_F(InterceptionChainTest, FiltersThenHijacked) {
auto r = InterceptionChainBuilder(ChannelArgs())
.Add<TestFilter<1>>()
.Add<TestHijackingInterceptor<2>>()
.Build(destination());
ASSERT_TRUE(r.ok()) << r.status();
auto finished_call = RunCall(r.value().get());
EXPECT_EQ(finished_call.server_metadata->get(GrpcStatusMetadata()),
GRPC_STATUS_INTERNAL);
EXPECT_EQ(finished_call.server_metadata->get_pointer(GrpcMessageMetadata())
->as_string_view(),
"👊 cancelled");
EXPECT_NE(finished_call.client_metadata, nullptr);
std::string backing;
EXPECT_EQ(finished_call.client_metadata->GetStringValue("passed-through-1",
&backing),
"true");
}
TEST_F(InterceptionChainTest, FailsToInstantiateInterceptor) {
auto r = InterceptionChainBuilder(ChannelArgs())
.Add<TestFailingInterceptor<1>>()
.Build(destination());
EXPECT_FALSE(r.ok());
EXPECT_EQ(r.status().code(), absl::StatusCode::kInternal);
EXPECT_EQ(r.status().message(), "👊 failed to instantiate 1");
}
TEST_F(InterceptionChainTest, FailsToInstantiateInterceptor2) {
auto r = InterceptionChainBuilder(ChannelArgs())
.Add<TestFilter<1>>()
.Add<TestFailingInterceptor<2>>()
.Build(destination());
EXPECT_FALSE(r.ok());
EXPECT_EQ(r.status().code(), absl::StatusCode::kInternal);
EXPECT_EQ(r.status().message(), "👊 failed to instantiate 2");
}
TEST_F(InterceptionChainTest, FailsToInstantiateFilter) {
auto r = InterceptionChainBuilder(ChannelArgs())
.Add<FailsToInstantiateFilter<1>>()
.Build(destination());
EXPECT_FALSE(r.ok());
EXPECT_EQ(r.status().code(), absl::StatusCode::kInternal);
EXPECT_EQ(r.status().message(), "👊 failed to instantiate 1");
}
TEST_F(InterceptionChainTest, FailsToInstantiateFilter2) {
auto r = InterceptionChainBuilder(ChannelArgs())
.Add<TestFilter<1>>()
.Add<FailsToInstantiateFilter<2>>()
.Build(destination());
EXPECT_FALSE(r.ok());
EXPECT_EQ(r.status().code(), absl::StatusCode::kInternal);
EXPECT_EQ(r.status().message(), "👊 failed to instantiate 2");
}
TEST_F(InterceptionChainTest, CreationOrderCorrect) {
CreationLog log;
auto r = InterceptionChainBuilder(ChannelArgs().SetObject(&log))
.Add<TestFilter<1>>()
.Add<TestFilter<2>>()
.Add<TestFilter<3>>()
.Add<TestConsumingInterceptor<4>>()
.Add<TestFilter<1>>()
.Add<TestFilter<2>>()
.Add<TestFilter<3>>()
.Add<TestConsumingInterceptor<4>>()
.Add<TestFilter<1>>()
.Build(destination());
EXPECT_THAT(log.entries, ::testing::ElementsAre(
CreationLogEntry{0, 1}, CreationLogEntry{0, 2},
CreationLogEntry{0, 3}, CreationLogEntry{0, 4},
CreationLogEntry{1, 1}, CreationLogEntry{1, 2},
CreationLogEntry{1, 3}, CreationLogEntry{1, 4},
CreationLogEntry{2, 1}));
}
} // namespace
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc_tracer_init();
gpr_log_verbosity_init();
return RUN_ALL_TESTS();
}

@ -58,8 +58,7 @@ void TransportTest::SetServerAcceptor() {
CallInitiator TransportTest::CreateCall(
ClientMetadataHandle client_initial_metadata) {
auto call = MakeCall(std::move(client_initial_metadata), event_engine_.get(),
Arena::Create(1024, &allocator_), true);
auto call = MakeCall(std::move(client_initial_metadata));
call.handler.SpawnInfallible(
"start-call", [this, handler = call.handler]() mutable {
transport_pair_.client->client_transport()->StartCall(
@ -231,13 +230,14 @@ std::string TransportTest::RandomMessage() {
// TransportTest::Acceptor
Arena* TransportTest::Acceptor::CreateArena() {
return Arena::Create(1024, allocator_);
return test_->call_arena_allocator_->MakeArena();
}
absl::StatusOr<CallInitiator> TransportTest::Acceptor::CreateCall(
ClientMetadataHandle client_initial_metadata, Arena* arena) {
auto call =
MakeCall(std::move(client_initial_metadata), event_engine_, arena, true);
auto call = MakeCallPair(std::move(client_initial_metadata),
test_->event_engine_.get(), arena,
test_->call_arena_allocator_, nullptr);
handlers_.push(call.handler.V2HackToStartCallWithoutACallFilterStack());
return std::move(call.initiator);
}

@ -86,10 +86,11 @@ class ActionState {
explicit ActionState(NameAndLocation name_and_location);
State Get() const { return state_; }
void Set(State state) {
void Set(State state, SourceLocation whence = {}) {
gpr_log(GPR_INFO, "%s",
absl::StrCat(StateString(state), " ", name(), " [", step(), "] ",
file(), ":", line())
file(), ":", line(), " @ ", whence.file(), ":",
whence.line())
.c_str());
state_ = state;
}
@ -237,6 +238,12 @@ class TransportTest : public ::testing::Test {
CallHandler TickUntilServerCall();
void WaitForAllPendingWork();
auto MakeCall(ClientMetadataHandle client_initial_metadata) {
auto* arena = call_arena_allocator_->MakeArena();
return MakeCallPair(std::move(client_initial_metadata), event_engine_.get(),
arena, call_arena_allocator_, nullptr);
}
// Alternative for Seq for test driver code.
// Registers each step so that WaitForAllPendingWork() can report progress,
// and wait for completion... AND generate good failure messages when a
@ -265,9 +272,7 @@ class TransportTest : public ::testing::Test {
class Acceptor final : public ServerTransport::Acceptor {
public:
Acceptor(grpc_event_engine::experimental::EventEngine* event_engine,
MemoryAllocator* allocator)
: event_engine_(event_engine), allocator_(allocator) {}
explicit Acceptor(TransportTest* test) : test_(test) {}
Arena* CreateArena() override;
absl::StatusOr<CallInitiator> CreateCall(
@ -276,8 +281,7 @@ class TransportTest : public ::testing::Test {
private:
std::queue<CallHandler> handlers_;
grpc_event_engine::experimental::EventEngine* const event_engine_;
MemoryAllocator* const allocator_;
TransportTest* const test_;
};
class WatchDog {
@ -303,10 +307,13 @@ class TransportTest : public ::testing::Test {
}(),
fuzzing_event_engine::Actions())};
std::unique_ptr<TransportFixture> fixture_;
MemoryAllocator allocator_ = MakeResourceQuota("test-quota")
->memory_quota()
->CreateMemoryAllocator("test-allocator");
Acceptor acceptor_{event_engine_.get(), &allocator_};
RefCountedPtr<CallArenaAllocator> call_arena_allocator_{
MakeRefCounted<CallArenaAllocator>(
MakeResourceQuota("test-quota")
->memory_quota()
->CreateMemoryAllocator("test-allocator"),
1024)};
Acceptor acceptor_{this};
TransportFixture::ClientAndServerTransportPair transport_pair_ =
fixture_->CreateTransportPair(event_engine_);
std::queue<std::shared_ptr<transport_test_detail::ActionState>>

@ -2804,12 +2804,12 @@ src/core/lib/transport/batch_builder.cc \
src/core/lib/transport/batch_builder.h \
src/core/lib/transport/bdp_estimator.cc \
src/core/lib/transport/bdp_estimator.h \
src/core/lib/transport/call_arena_allocator.cc \
src/core/lib/transport/call_arena_allocator.h \
src/core/lib/transport/call_filters.cc \
src/core/lib/transport/call_filters.h \
src/core/lib/transport/call_final_info.cc \
src/core/lib/transport/call_final_info.h \
src/core/lib/transport/call_size_estimator.cc \
src/core/lib/transport/call_size_estimator.h \
src/core/lib/transport/call_spine.cc \
src/core/lib/transport/call_spine.h \
src/core/lib/transport/connectivity_state.cc \

@ -2581,12 +2581,12 @@ src/core/lib/transport/batch_builder.cc \
src/core/lib/transport/batch_builder.h \
src/core/lib/transport/bdp_estimator.cc \
src/core/lib/transport/bdp_estimator.h \
src/core/lib/transport/call_arena_allocator.cc \
src/core/lib/transport/call_arena_allocator.h \
src/core/lib/transport/call_filters.cc \
src/core/lib/transport/call_filters.h \
src/core/lib/transport/call_final_info.cc \
src/core/lib/transport/call_final_info.h \
src/core/lib/transport/call_size_estimator.cc \
src/core/lib/transport/call_size_estimator.h \
src/core/lib/transport/call_spine.cc \
src/core/lib/transport/call_spine.h \
src/core/lib/transport/connectivity_state.cc \

@ -5103,6 +5103,30 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "interception_chain_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save