[call-v3] Direct channel implementation (#36734)

This change brings up the direct channel and the inproc promise-based transports.

This work exposed a bug that was very difficult to fix with the current call_filters.cc implementation, so I've substantially revamped that: instead of having a pipe-like object per call element, we now have a big ol' combined state machine for the entire call. It's a touch more code, but individual cases become substantially easier to reason about, so I much prefer this form (it's also a slight memory improvement: 12 bytes total to track call state, and 10 of those are wakeup bitmasks).
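To illustrate the shape of that change (purely a sketch; none of these names are the real call_filters.h types): each direction of the call gets a small enum, and any blocked poller records itself in a wakeup bitmask that the next state transition flushes.

// Hypothetical sketch of a combined per-call state machine: one-byte enums
// for state plus wakeup bitmasks naming the participants to re-poll on a
// transition. Illustrative only; not the real gRPC CallState.
#include <cstdint>
#include <iostream>
#include <utility>

class CallStateSketch {
 public:
  enum class SendState : uint8_t { kIdle, kMessagePushed, kHalfClosed };

  // A poller that cannot make progress records its participant bit.
  void WaitForSend(uint16_t participant_bit) {
    send_waiters_ |= participant_bit;
  }

  // A transition updates the enum and returns the set of participants to wake.
  uint16_t PushMessage() {
    send_state_ = SendState::kMessagePushed;
    return std::exchange(send_waiters_, uint16_t{0});
  }

  SendState send_state() const { return send_state_; }

 private:
  SendState send_state_ = SendState::kIdle;  // 1 byte of call state
  uint16_t send_waiters_ = 0;                // wakeup bitmask
};

int main() {
  CallStateSketch state;
  state.WaitForSend(1u << 3);                // participant 3 is blocked
  std::cout << state.PushMessage() << "\n";  // prints 8: re-poll participant 3
}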

Closes #36734

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36734 from ctiller:transport-refs-9 3e2a80b40d
PiperOrigin-RevId: 644034593
Author: Craig Tiller, committed by Copybara-Service
parent 426df93434
commit e21467475f
59 files changed (changed-line count in parentheses):
  1. BUILD (1)
  2. CMakeLists.txt (84)
  3. Makefile (1)
  4. Package.swift (2)
  5. build_autogenerated.yaml (81)
  6. config.m4 (1)
  7. config.w32 (1)
  8. gRPC-C++.podspec (2)
  9. gRPC-Core.podspec (3)
  10. grpc.gemspec (2)
  11. package.xml (2)
  12. src/core/BUILD (31)
  13. src/core/client_channel/direct_channel.cc (80)
  14. src/core/client_channel/direct_channel.h (101)
  15. src/core/ext/transport/chaotic_good/client_transport.cc (43)
  16. src/core/ext/transport/chaotic_good/client_transport.h (2)
  17. src/core/ext/transport/inproc/inproc_transport.cc (117)
  18. src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc (1)
  19. src/core/lib/promise/activity.h (10)
  20. src/core/lib/promise/status_flag.h (12)
  21. src/core/lib/surface/call_utils.cc (2)
  22. src/core/lib/surface/channel_create.cc (13)
  23. src/core/lib/surface/client_call.cc (5)
  24. src/core/lib/surface/client_call.h (6)
  25. src/core/lib/surface/server_call.cc (2)
  26. src/core/lib/surface/server_call.h (12)
  27. src/core/lib/transport/call_filters.cc (767)
  28. src/core/lib/transport/call_filters.h (917)
  29. src/core/lib/transport/call_spine.cc (158)
  30. src/core/lib/transport/call_spine.h (27)
  31. src/core/lib/transport/metadata.cc (13)
  32. src/core/lib/transport/metadata.h (18)
  33. src/core/lib/transport/transport.h (4)
  34. src/python/grpcio/grpc_core_dependencies.py (1)
  35. test/core/client_channel/client_channel_test.cc (2)
  36. test/core/end2end/cq_verifier.cc (1)
  37. test/core/end2end/end2end_test_suites.cc (14)
  38. test/core/end2end/end2end_tests.h (7)
  39. test/core/end2end/fixtures/inproc_fixture.h (17)
  40. test/core/end2end/tests/cancel_after_client_done.cc (4)
  41. test/core/end2end/tests/channelz.cc (6)
  42. test/core/end2end/tests/disappearing_server.cc (2)
  43. test/core/end2end/tests/filter_init_fails.cc (12)
  44. test/core/end2end/tests/max_message_length.cc (20)
  45. test/core/end2end/tests/request_with_flags.cc (8)
  46. test/core/end2end/tests/shutdown_finishes_calls.cc (2)
  47. test/core/end2end/tests/streaming_error_response.cc (6)
  48. test/core/end2end/tests/timeout_before_request_call.cc (6)
  49. test/core/event_engine/event_engine_test_utils.cc (14)
  50. test/core/transport/BUILD (14)
  51. test/core/transport/call_filters_test.cc (340)
  52. test/core/transport/call_spine_test.cc (189)
  53. test/core/transport/corpus/call_spine/empty (1)
  54. test/cpp/microbenchmarks/fullstack_fixtures.h (1)
  55. test/cpp/microbenchmarks/fullstack_unary_ping_pong.h (1)
  56. tools/distrib/fix_build_deps.py (1)
  57. tools/doxygen/Doxyfile.c++.internal (2)
  58. tools/doxygen/Doxyfile.core.internal (2)
  59. tools/run_tests/generated/tests.json (20)

@ -1871,6 +1871,7 @@ grpc_cc_library(
"//src/core:arena",
"//src/core:channel_args",
"//src/core:channel_stack_type",
"//src/core:direct_channel",
"//src/core:experiments",
"//src/core:iomgr_fwd",
"//src/core:ref_counted",

CMakeLists.txt (generated, 84 changed lines)

@ -967,6 +967,9 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx call_filters_test)
add_dependencies(buildtests_cxx call_finalization_test)
add_dependencies(buildtests_cxx call_host_override_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx call_spine_test)
endif()
add_dependencies(buildtests_cxx call_tracer_test)
add_dependencies(buildtests_cxx call_utils_test)
add_dependencies(buildtests_cxx cancel_after_accept_test)
@ -1853,6 +1856,7 @@ add_library(grpc
src/core/client_channel/client_channel_filter.cc
src/core/client_channel/client_channel_plugin.cc
src/core/client_channel/client_channel_service_config.cc
src/core/client_channel/direct_channel.cc
src/core/client_channel/dynamic_filters.cc
src/core/client_channel/global_subchannel_pool.cc
src/core/client_channel/load_balanced_call_destination.cc
@ -2945,6 +2949,7 @@ add_library(grpc_unsecure
src/core/client_channel/client_channel_filter.cc
src/core/client_channel/client_channel_plugin.cc
src/core/client_channel/client_channel_service_config.cc
src/core/client_channel/direct_channel.cc
src/core/client_channel/dynamic_filters.cc
src/core/client_channel/global_subchannel_pool.cc
src/core/client_channel/load_balanced_call_destination.cc
@ -6012,6 +6017,7 @@ if(gRPC_BUILD_TESTS)
add_executable(activity_test
src/core/lib/debug/trace.cc
src/core/lib/debug/trace_flags.cc
src/core/lib/gprpp/dump_args.cc
src/core/lib/gprpp/glob.cc
src/core/lib/promise/activity.cc
test/core/promise/activity_test.cc
@ -8724,6 +8730,59 @@ target_link_libraries(call_host_override_test
)
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
add_executable(call_spine_test
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h
test/core/call/yodel/test_main.cc
test/core/call/yodel/yodel_test.cc
test/core/event_engine/event_engine_test_utils.cc
test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
test/core/transport/call_spine_test.cc
)
if(WIN32 AND MSVC)
if(BUILD_SHARED_LIBS)
target_compile_definitions(call_spine_test
PRIVATE
"GPR_DLL_IMPORTS"
"GRPC_DLL_IMPORTS"
)
endif()
endif()
target_compile_features(call_spine_test PUBLIC cxx_std_14)
target_include_directories(call_spine_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(call_spine_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
${_gRPC_PROTOBUF_LIBRARIES}
grpc_test_util
)
endif()
endif()
if(gRPC_BUILD_TESTS)
@ -9438,6 +9497,7 @@ add_executable(cancel_callback_test
src/core/lib/debug/trace_flags.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
src/core/lib/gprpp/dump_args.cc
src/core/lib/gprpp/glob.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/time.cc
@ -10545,6 +10605,7 @@ add_executable(chunked_vector_test
src/core/lib/debug/trace_flags.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
src/core/lib/gprpp/dump_args.cc
src/core/lib/gprpp/glob.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/time.cc
@ -13853,6 +13914,7 @@ add_executable(exec_ctx_wakeup_scheduler_test
src/core/lib/debug/trace_flags.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
src/core/lib/gprpp/dump_args.cc
src/core/lib/gprpp/glob.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/time.cc
@ -14653,6 +14715,7 @@ add_executable(flow_control_test
src/core/lib/debug/trace_flags.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
src/core/lib/gprpp/dump_args.cc
src/core/lib/gprpp/glob.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/time.cc
@ -14739,6 +14802,7 @@ add_executable(for_each_test
src/core/lib/debug/trace_flags.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
src/core/lib/gprpp/dump_args.cc
src/core/lib/gprpp/glob.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/time.cc
@ -18186,6 +18250,7 @@ if(gRPC_BUILD_TESTS)
add_executable(inter_activity_pipe_test
src/core/lib/debug/trace.cc
src/core/lib/debug/trace_flags.cc
src/core/lib/gprpp/dump_args.cc
src/core/lib/gprpp/glob.cc
src/core/lib/promise/activity.cc
test/core/promise/inter_activity_pipe_test.cc
@ -18539,6 +18604,7 @@ add_executable(interceptor_list_test
src/core/lib/debug/trace_flags.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
src/core/lib/gprpp/dump_args.cc
src/core/lib/gprpp/glob.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/time.cc
@ -19331,6 +19397,7 @@ if(gRPC_BUILD_TESTS)
add_executable(latch_test
src/core/lib/debug/trace.cc
src/core/lib/debug/trace_flags.cc
src/core/lib/gprpp/dump_args.cc
src/core/lib/gprpp/glob.cc
src/core/lib/promise/activity.cc
test/core/promise/latch_test.cc
@ -19749,6 +19816,7 @@ add_executable(map_pipe_test
src/core/lib/debug/trace_flags.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
src/core/lib/gprpp/dump_args.cc
src/core/lib/gprpp/glob.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/time.cc
@ -20733,6 +20801,10 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(mpsc_test
src/core/lib/debug/trace.cc
src/core/lib/debug/trace_flags.cc
src/core/lib/gprpp/dump_args.cc
src/core/lib/gprpp/glob.cc
src/core/lib/promise/activity.cc
test/core/promise/mpsc_test.cc
)
@ -20768,6 +20840,7 @@ target_link_libraries(mpsc_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
absl::config
absl::flat_hash_map
absl::hash
absl::type_traits
absl::statusor
@ -21229,6 +21302,10 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(observable_test
src/core/lib/debug/trace.cc
src/core/lib/debug/trace_flags.cc
src/core/lib/gprpp/dump_args.cc
src/core/lib/gprpp/glob.cc
src/core/lib/promise/activity.cc
test/core/promise/observable_test.cc
)
@ -21264,6 +21341,7 @@ target_link_libraries(observable_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
absl::config
absl::flat_hash_map
absl::hash
absl::type_traits
absl::statusor
@ -23109,6 +23187,7 @@ if(gRPC_BUILD_TESTS)
add_executable(promise_mutex_test
src/core/lib/debug/trace.cc
src/core/lib/debug/trace_flags.cc
src/core/lib/gprpp/dump_args.cc
src/core/lib/gprpp/glob.cc
src/core/lib/promise/activity.cc
test/core/promise/promise_mutex_test.cc
@ -32870,6 +32949,10 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(wait_for_callback_test
src/core/lib/debug/trace.cc
src/core/lib/debug/trace_flags.cc
src/core/lib/gprpp/dump_args.cc
src/core/lib/gprpp/glob.cc
src/core/lib/promise/activity.cc
test/core/promise/wait_for_callback_test.cc
)
@ -32905,6 +32988,7 @@ target_link_libraries(wait_for_callback_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
absl::config
absl::flat_hash_map
absl::hash
absl::type_traits
absl::statusor

Makefile (generated, 1 changed line)

@ -675,6 +675,7 @@ LIBGRPC_SRC = \
src/core/client_channel/client_channel_filter.cc \
src/core/client_channel/client_channel_plugin.cc \
src/core/client_channel/client_channel_service_config.cc \
src/core/client_channel/direct_channel.cc \
src/core/client_channel/dynamic_filters.cc \
src/core/client_channel/global_subchannel_pool.cc \
src/core/client_channel/load_balanced_call_destination.cc \

Package.swift (generated, 2 changed lines)

@ -138,6 +138,8 @@ let package = Package(
"src/core/client_channel/client_channel_service_config.h",
"src/core/client_channel/config_selector.h",
"src/core/client_channel/connector.h",
"src/core/client_channel/direct_channel.cc",
"src/core/client_channel/direct_channel.h",
"src/core/client_channel/dynamic_filters.cc",
"src/core/client_channel/dynamic_filters.h",
"src/core/client_channel/global_subchannel_pool.cc",

@ -234,6 +234,7 @@ libs:
- src/core/client_channel/client_channel_service_config.h
- src/core/client_channel/config_selector.h
- src/core/client_channel/connector.h
- src/core/client_channel/direct_channel.h
- src/core/client_channel/dynamic_filters.h
- src/core/client_channel/global_subchannel_pool.h
- src/core/client_channel/load_balanced_call_destination.h
@ -1258,6 +1259,7 @@ libs:
- src/core/client_channel/client_channel_filter.cc
- src/core/client_channel/client_channel_plugin.cc
- src/core/client_channel/client_channel_service_config.cc
- src/core/client_channel/direct_channel.cc
- src/core/client_channel/dynamic_filters.cc
- src/core/client_channel/global_subchannel_pool.cc
- src/core/client_channel/load_balanced_call_destination.cc
@ -2219,6 +2221,7 @@ libs:
- src/core/client_channel/client_channel_service_config.h
- src/core/client_channel/config_selector.h
- src/core/client_channel/connector.h
- src/core/client_channel/direct_channel.h
- src/core/client_channel/dynamic_filters.h
- src/core/client_channel/global_subchannel_pool.h
- src/core/client_channel/load_balanced_call_destination.h
@ -2713,6 +2716,7 @@ libs:
- src/core/client_channel/client_channel_filter.cc
- src/core/client_channel/client_channel_plugin.cc
- src/core/client_channel/client_channel_service_config.cc
- src/core/client_channel/direct_channel.cc
- src/core/client_channel/dynamic_filters.cc
- src/core/client_channel/global_subchannel_pool.cc
- src/core/client_channel/load_balanced_call_destination.cc
@ -5238,6 +5242,7 @@ targets:
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/dump_args.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
@ -5260,6 +5265,7 @@ targets:
src:
- src/core/lib/debug/trace.cc
- src/core/lib/debug/trace_flags.cc
- src/core/lib/gprpp/dump_args.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/promise/activity.cc
- test/core/promise/activity_test.cc
@ -6428,6 +6434,7 @@ targets:
- src/core/lib/promise/detail/seq_state.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/exec_ctx_wakeup_scheduler.h
- src/core/lib/promise/if.h
- src/core/lib/promise/latch.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
@ -6436,6 +6443,7 @@ targets:
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
- src/core/lib/promise/status_flag.h
- src/core/lib/promise/try_seq.h
- src/core/lib/resource_quota/arena.h
- src/core/lib/resource_quota/connection_quota.h
- src/core/lib/resource_quota/memory_quota.h
@ -6627,6 +6635,29 @@ targets:
- grpc_authorization_provider
- grpc_unsecure
- grpc_test_util
- name: call_spine_test
gtest: true
build: test
language: c++
headers:
- test/core/call/yodel/yodel_test.h
- test/core/event_engine/event_engine_test_utils.h
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
src:
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
- test/core/call/yodel/test_main.cc
- test/core/call/yodel/yodel_test.cc
- test/core/event_engine/event_engine_test_utils.cc
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
- test/core/transport/call_spine_test.cc
deps:
- gtest
- protobuf
- grpc_test_util
platforms:
- linux
- posix
uses_polling: false
- name: call_tracer_test
gtest: true
build: test
@ -7590,6 +7621,7 @@ targets:
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/dump_args.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/orphanable.h
@ -7655,6 +7687,7 @@ targets:
- src/core/lib/debug/trace_flags.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
- src/core/lib/gprpp/dump_args.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/time.cc
@ -8409,6 +8442,7 @@ targets:
- src/core/lib/gprpp/chunked_vector.h
- src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/dump_args.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/orphanable.h
@ -8473,6 +8507,7 @@ targets:
- src/core/lib/debug/trace_flags.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
- src/core/lib/gprpp/dump_args.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/time.cc
@ -9991,6 +10026,7 @@ targets:
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/dump_args.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/orphanable.h
@ -10043,6 +10079,7 @@ targets:
- src/core/lib/debug/trace_flags.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
- src/core/lib/gprpp/dump_args.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/time.cc
@ -10550,6 +10587,7 @@ targets:
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/dump_args.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/orphanable.h
@ -10619,6 +10657,7 @@ targets:
- src/core/lib/debug/trace_flags.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
- src/core/lib/gprpp/dump_args.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/time.cc
@ -10682,6 +10721,7 @@ targets:
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/dump_args.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/orphanable.h
@ -10756,6 +10796,7 @@ targets:
- src/core/lib/debug/trace_flags.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
- src/core/lib/gprpp/dump_args.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/time.cc
@ -12321,6 +12362,7 @@ targets:
- src/core/lib/event_engine/event_engine_context.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/dump_args.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
@ -12339,6 +12381,7 @@ targets:
src:
- src/core/lib/debug/trace.cc
- src/core/lib/debug/trace_flags.cc
- src/core/lib/gprpp/dump_args.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/promise/activity.cc
- test/core/promise/inter_activity_pipe_test.cc
@ -12966,6 +13009,7 @@ targets:
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/dump_args.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/orphanable.h
@ -13032,6 +13076,7 @@ targets:
- src/core/lib/debug/trace_flags.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
- src/core/lib/gprpp/dump_args.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/time.cc
@ -13482,6 +13527,7 @@ targets:
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/dump_args.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
@ -13503,6 +13549,7 @@ targets:
src:
- src/core/lib/debug/trace.cc
- src/core/lib/debug/trace_flags.cc
- src/core/lib/gprpp/dump_args.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/promise/activity.cc
- test/core/promise/latch_test.cc
@ -13668,6 +13715,7 @@ targets:
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/dump_args.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/orphanable.h
@ -13742,6 +13790,7 @@ targets:
- src/core/lib/debug/trace_flags.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
- src/core/lib/gprpp/dump_args.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/time.cc
@ -14272,9 +14321,14 @@ targets:
build: test
language: c++
headers:
- src/core/lib/debug/trace.h
- src/core/lib/debug/trace_flags.h
- src/core/lib/debug/trace_impl.h
- src/core/lib/event_engine/event_engine_context.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/dump_args.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h
@ -14289,11 +14343,16 @@ targets:
- src/core/lib/promise/wait_set.h
- test/core/promise/poll_matcher.h
src:
- src/core/lib/debug/trace.cc
- src/core/lib/debug/trace_flags.cc
- src/core/lib/gprpp/dump_args.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/promise/activity.cc
- test/core/promise/mpsc_test.cc
deps:
- gtest
- absl/base:config
- absl/container:flat_hash_map
- absl/hash:hash
- absl/meta:type_traits
- absl/status:statusor
@ -14571,9 +14630,14 @@ targets:
build: test
language: c++
headers:
- src/core/lib/debug/trace.h
- src/core/lib/debug/trace_flags.h
- src/core/lib/debug/trace_impl.h
- src/core/lib/event_engine/event_engine_context.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/dump_args.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/notification.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
@ -14589,11 +14653,16 @@ targets:
- src/core/lib/promise/poll.h
- test/core/promise/poll_matcher.h
src:
- src/core/lib/debug/trace.cc
- src/core/lib/debug/trace_flags.cc
- src/core/lib/gprpp/dump_args.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/promise/activity.cc
- test/core/promise/observable_test.cc
deps:
- gtest
- absl/base:config
- absl/container:flat_hash_map
- absl/hash:hash
- absl/meta:type_traits
- absl/status:statusor
@ -15503,6 +15572,7 @@ targets:
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/dump_args.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
@ -15525,6 +15595,7 @@ targets:
src:
- src/core/lib/debug/trace.cc
- src/core/lib/debug/trace_flags.cc
- src/core/lib/gprpp/dump_args.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/promise/activity.cc
- test/core/promise/promise_mutex_test.cc
@ -21279,9 +21350,14 @@ targets:
build: test
language: c++
headers:
- src/core/lib/debug/trace.h
- src/core/lib/debug/trace_flags.h
- src/core/lib/debug/trace_impl.h
- src/core/lib/event_engine/event_engine_context.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/dump_args.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/notification.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
@ -21296,11 +21372,16 @@ targets:
- src/core/lib/promise/wait_for_callback.h
- test/core/promise/test_wakeup_schedulers.h
src:
- src/core/lib/debug/trace.cc
- src/core/lib/debug/trace_flags.cc
- src/core/lib/gprpp/dump_args.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/promise/activity.cc
- test/core/promise/wait_for_callback_test.cc
deps:
- gtest
- absl/base:config
- absl/container:flat_hash_map
- absl/hash:hash
- absl/meta:type_traits
- absl/status:statusor

config.m4 (generated, 1 changed line)

@ -50,6 +50,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/client_channel/client_channel_filter.cc \
src/core/client_channel/client_channel_plugin.cc \
src/core/client_channel/client_channel_service_config.cc \
src/core/client_channel/direct_channel.cc \
src/core/client_channel/dynamic_filters.cc \
src/core/client_channel/global_subchannel_pool.cc \
src/core/client_channel/load_balanced_call_destination.cc \

config.w32 (generated, 1 changed line)

@ -15,6 +15,7 @@ if (PHP_GRPC != "no") {
"src\\core\\client_channel\\client_channel_filter.cc " +
"src\\core\\client_channel\\client_channel_plugin.cc " +
"src\\core\\client_channel\\client_channel_service_config.cc " +
"src\\core\\client_channel\\direct_channel.cc " +
"src\\core\\client_channel\\dynamic_filters.cc " +
"src\\core\\client_channel\\global_subchannel_pool.cc " +
"src\\core\\client_channel\\load_balanced_call_destination.cc " +

gRPC-C++.podspec (generated, 2 changed lines)

@ -280,6 +280,7 @@ Pod::Spec.new do |s|
'src/core/client_channel/client_channel_service_config.h',
'src/core/client_channel/config_selector.h',
'src/core/client_channel/connector.h',
'src/core/client_channel/direct_channel.h',
'src/core/client_channel/dynamic_filters.h',
'src/core/client_channel/global_subchannel_pool.h',
'src/core/client_channel/load_balanced_call_destination.h',
@ -1571,6 +1572,7 @@ Pod::Spec.new do |s|
'src/core/client_channel/client_channel_service_config.h',
'src/core/client_channel/config_selector.h',
'src/core/client_channel/connector.h',
'src/core/client_channel/direct_channel.h',
'src/core/client_channel/dynamic_filters.h',
'src/core/client_channel/global_subchannel_pool.h',
'src/core/client_channel/load_balanced_call_destination.h',

gRPC-Core.podspec (generated, 3 changed lines)

@ -257,6 +257,8 @@ Pod::Spec.new do |s|
'src/core/client_channel/client_channel_service_config.h',
'src/core/client_channel/config_selector.h',
'src/core/client_channel/connector.h',
'src/core/client_channel/direct_channel.cc',
'src/core/client_channel/direct_channel.h',
'src/core/client_channel/dynamic_filters.cc',
'src/core/client_channel/dynamic_filters.h',
'src/core/client_channel/global_subchannel_pool.cc',
@ -2361,6 +2363,7 @@ Pod::Spec.new do |s|
'src/core/client_channel/client_channel_service_config.h',
'src/core/client_channel/config_selector.h',
'src/core/client_channel/connector.h',
'src/core/client_channel/direct_channel.h',
'src/core/client_channel/dynamic_filters.h',
'src/core/client_channel/global_subchannel_pool.h',
'src/core/client_channel/load_balanced_call_destination.h',

grpc.gemspec (generated, 2 changed lines)

@ -144,6 +144,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/client_channel/client_channel_service_config.h )
s.files += %w( src/core/client_channel/config_selector.h )
s.files += %w( src/core/client_channel/connector.h )
s.files += %w( src/core/client_channel/direct_channel.cc )
s.files += %w( src/core/client_channel/direct_channel.h )
s.files += %w( src/core/client_channel/dynamic_filters.cc )
s.files += %w( src/core/client_channel/dynamic_filters.h )
s.files += %w( src/core/client_channel/global_subchannel_pool.cc )

package.xml (generated, 2 changed lines)

@ -126,6 +126,8 @@
<file baseinstalldir="/" name="src/core/client_channel/client_channel_service_config.h" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/config_selector.h" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/connector.h" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/direct_channel.cc" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/direct_channel.h" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/dynamic_filters.cc" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/dynamic_filters.h" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/global_subchannel_pool.cc" role="src" />

@ -955,12 +955,14 @@ grpc_cc_library(
"atomic_utils",
"construct_destruct",
"context",
"dump_args",
"event_engine_context",
"no_destruct",
"poll",
"promise_factory",
"promise_status",
"//:gpr",
"//:grpc_trace",
"//:orphanable",
],
)
@ -7012,8 +7014,8 @@ grpc_cc_library(
"ext/transport/inproc/legacy_inproc_transport.h",
],
external_deps = [
"absl/log",
"absl/log:check",
"absl/log:log",
"absl/status",
"absl/status:statusor",
"absl/strings",
@ -7030,6 +7032,7 @@ grpc_cc_library(
"error",
"experiments",
"iomgr_fwd",
"metadata",
"metadata_batch",
"resource_quota",
"slice",
@ -7448,12 +7451,15 @@ grpc_cc_library(
deps = [
"call_final_info",
"dump_args",
"if",
"latch",
"map",
"message",
"metadata",
"ref_counted",
"seq",
"status_flag",
"try_seq",
"//:gpr",
"//:promise",
"//:ref_counted_ptr",
@ -7553,7 +7559,10 @@ grpc_cc_library(
hdrs = [
"lib/transport/call_spine.h",
],
external_deps = ["absl/log:check"],
external_deps = [
"absl/functional:any_invocable",
"absl/log:check",
],
deps = [
"1999",
"call_arena_allocator",
@ -7574,6 +7583,24 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "direct_channel",
srcs = [
"client_channel/direct_channel.cc",
],
hdrs = [
"client_channel/direct_channel.h",
],
deps = [
"channel_stack_type",
"interception_chain",
"//:channel",
"//:config",
"//:grpc_base",
"//:orphanable",
],
)
grpc_cc_library(
name = "metadata_batch",
srcs = [

@ -0,0 +1,80 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/client_channel/direct_channel.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/surface/client_call.h"
#include "src/core/lib/transport/interception_chain.h"
namespace grpc_core {
absl::StatusOr<RefCountedPtr<DirectChannel>> DirectChannel::Create(
std::string target, const ChannelArgs& args) {
auto* transport = args.GetObject<Transport>();
if (transport == nullptr) {
return absl::InvalidArgumentError("Transport not set in ChannelArgs");
}
if (transport->client_transport() == nullptr) {
return absl::InvalidArgumentError("Transport is not a client transport");
}
auto transport_call_destination = MakeRefCounted<TransportCallDestination>(
OrphanablePtr<ClientTransport>(transport->client_transport()));
auto event_engine =
args.GetObjectRef<grpc_event_engine::experimental::EventEngine>();
if (event_engine == nullptr) {
return absl::InvalidArgumentError("EventEngine not set in ChannelArgs");
}
InterceptionChainBuilder builder(args);
CoreConfiguration::Get().channel_init().AddToInterceptionChainBuilder(
GRPC_CLIENT_DIRECT_CHANNEL, builder);
auto interception_chain = builder.Build(transport_call_destination);
if (!interception_chain.ok()) return interception_chain.status();
return MakeRefCounted<DirectChannel>(
std::move(target), args, std::move(event_engine),
std::move(transport_call_destination), std::move(*interception_chain));
}
void DirectChannel::Orphaned() {
transport_call_destination_.reset();
interception_chain_.reset();
}
void DirectChannel::StartCall(UnstartedCallHandler unstarted_handler) {
unstarted_handler.SpawnInfallible(
"start",
[interception_chain = interception_chain_, unstarted_handler]() mutable {
interception_chain->StartCall(std::move(unstarted_handler));
return []() { return Empty{}; };
});
}
void DirectChannel::GetInfo(const grpc_channel_info*) {
// TODO(roth): Implement this.
}
grpc_call* DirectChannel::CreateCall(
grpc_call* parent_call, uint32_t propagation_mask,
grpc_completion_queue* cq, grpc_pollset_set* /*pollset_set_alternative*/,
Slice path, absl::optional<Slice> authority, Timestamp deadline,
bool /*registered_method*/) {
return MakeClientCall(parent_call, propagation_mask, cq, std::move(path),
std::move(authority), false, deadline,
compression_options(), event_engine_.get(),
call_arena_allocator()->MakeArena(), Ref());
}
} // namespace grpc_core

@ -0,0 +1,101 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_CLIENT_CHANNEL_DIRECT_CHANNEL_H
#define GRPC_SRC_CORE_CLIENT_CHANNEL_DIRECT_CHANNEL_H
#include <memory>
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/transport.h"
namespace grpc_core {
class DirectChannel final : public Channel {
public:
class TransportCallDestination final : public CallDestination {
public:
explicit TransportCallDestination(OrphanablePtr<ClientTransport> transport)
: transport_(std::move(transport)) {}
ClientTransport* transport() { return transport_.get(); }
void HandleCall(CallHandler handler) override {
transport_->StartCall(std::move(handler));
}
void Orphaned() override { transport_.reset(); }
private:
OrphanablePtr<ClientTransport> transport_;
};
static absl::StatusOr<RefCountedPtr<DirectChannel>> Create(
std::string target, const ChannelArgs& args);
DirectChannel(
std::string target, const ChannelArgs& args,
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine,
RefCountedPtr<TransportCallDestination> transport_call_destination,
RefCountedPtr<UnstartedCallDestination> interception_chain)
: Channel(std::move(target), args),
transport_call_destination_(std::move(transport_call_destination)),
interception_chain_(std::move(interception_chain)),
event_engine_(std::move(event_engine)) {}
void Orphaned() override;
void StartCall(UnstartedCallHandler unstarted_handler) override;
bool IsLame() const override { return false; }
grpc_call* CreateCall(grpc_call* parent_call, uint32_t propagation_mask,
grpc_completion_queue* cq,
grpc_pollset_set* pollset_set_alternative, Slice path,
absl::optional<Slice> authority, Timestamp deadline,
bool registered_method) override;
grpc_event_engine::experimental::EventEngine* event_engine() const override {
return event_engine_.get();
}
bool SupportsConnectivityWatcher() const override { return false; }
grpc_connectivity_state CheckConnectivityState(bool) override {
Crash("CheckConnectivityState not supported");
}
void WatchConnectivityState(grpc_connectivity_state, Timestamp,
grpc_completion_queue*, void*) override {
Crash("WatchConnectivityState not supported");
}
void AddConnectivityWatcher(
grpc_connectivity_state,
OrphanablePtr<AsyncConnectivityStateWatcherInterface>) override {
Crash("AddConnectivityWatcher not supported");
}
void RemoveConnectivityWatcher(
AsyncConnectivityStateWatcherInterface*) override {
Crash("RemoveConnectivityWatcher not supported");
}
void GetInfo(const grpc_channel_info* channel_info) override;
void ResetConnectionBackoff() override {}
void Ping(grpc_completion_queue*, void*) override {
Crash("Ping not supported");
}
private:
RefCountedPtr<TransportCallDestination> transport_call_destination_;
RefCountedPtr<UnstartedCallDestination> interception_chain_;
const std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine_;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_CLIENT_CHANNEL_DIRECT_CHANNEL_H

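As a usage note for the new DirectChannel above, here is a hedged sketch, modelled on MakeInprocChannel later in this change, of building a direct channel over an already-created promise-based client transport. The helper name and error handling are illustrative only, not part of the PR.

// Illustrative helper, not part of this change: ChannelCreate attaches the
// transport to the channel args, and for GRPC_CLIENT_DIRECT_CHANNEL it
// dispatches to DirectChannel::Create (see the channel_create.cc hunk below).
#include <string>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_create.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/surface/lame_client.h"
#include "src/core/lib/transport/transport.h"

namespace grpc_core {

RefCountedPtr<Channel> MakeDirectChannelOrLame(std::string target,
                                               const ChannelArgs& args,
                                               Transport* client_transport) {
  auto channel = ChannelCreate(std::move(target), args,
                               GRPC_CLIENT_DIRECT_CHANNEL, client_transport);
  if (!channel.ok()) {
    return MakeLameChannel("Failed to create direct channel",
                           channel.status());
  }
  return std::move(*channel);
}

}  // namespace grpc_core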
@ -24,6 +24,7 @@
#include "absl/log/check.h"
#include "absl/random/bit_gen_ref.h"
#include "absl/random/random.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include <grpc/event_engine/event_engine.h>
@ -102,11 +103,11 @@ absl::optional<CallHandler> ChaoticGoodClientTransport::LookupStream(
auto ChaoticGoodClientTransport::PushFrameIntoCall(ServerFragmentFrame frame,
CallHandler call_handler) {
auto& headers = frame.headers;
const bool has_headers = frame.headers != nullptr;
auto push = TrySeq(
If(
headers != nullptr,
[call_handler, &headers]() mutable {
has_headers,
[call_handler, headers = std::move(frame.headers)]() mutable {
return call_handler.PushServerInitialMetadata(std::move(headers));
},
[]() -> StatusFlag { return Success{}; }),
@ -173,9 +174,7 @@ auto ChaoticGoodClientTransport::TransportReadLoop(
frame = std::move(frame)]() mutable {
return Map(call_handler.CancelIfFails(PushFrameIntoCall(
std::move(frame), call_handler)),
[](StatusFlag f) {
return StatusCast<absl::Status>(f);
});
[](StatusFlag) { return absl::OkStatus(); });
});
},
[&deserialize_status]() {
@ -190,8 +189,13 @@ auto ChaoticGoodClientTransport::TransportReadLoop(
});
}
auto ChaoticGoodClientTransport::OnTransportActivityDone() {
return [self = RefAsSubclass<ChaoticGoodClientTransport>()](absl::Status) {
auto ChaoticGoodClientTransport::OnTransportActivityDone(
absl::string_view what) {
return [self = RefAsSubclass<ChaoticGoodClientTransport>(),
what](absl::Status status) {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Client transport " << self.get() << " closed (via "
<< what << "): " << status;
self->AbortWithError();
};
}
@ -211,11 +215,12 @@ ChaoticGoodClientTransport::ChaoticGoodClientTransport(
writer_ = MakeActivity(
// Continuously write next outgoing frames to promise endpoints.
TransportWriteLoop(transport), EventEngineWakeupScheduler(event_engine),
OnTransportActivityDone());
OnTransportActivityDone("write_loop"));
reader_ = MakeActivity(
// Continuously read next incoming frames from promise endpoints.
TransportReadLoop(std::move(transport)),
EventEngineWakeupScheduler(event_engine), OnTransportActivityDone());
EventEngineWakeupScheduler(event_engine),
OnTransportActivityDone("read_loop"));
}
ChaoticGoodClientTransport::~ChaoticGoodClientTransport() {
@ -314,16 +319,20 @@ void ChaoticGoodClientTransport::StartCall(CallHandler call_handler) {
call_handler.SpawnGuarded("outbound_loop", [this, call_handler]() mutable {
const uint32_t stream_id = MakeStream(call_handler);
return Map(CallOutboundLoop(stream_id, call_handler),
[stream_id, this](absl::Status result) {
if (GRPC_TRACE_FLAG_ENABLED(chaotic_good)) {
gpr_log(GPR_INFO, "CHAOTIC_GOOD: Call %d finished with %s",
stream_id, result.ToString().c_str());
}
[stream_id, sender = outgoing_frames_.MakeSender()](
absl::Status result) mutable {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Call " << stream_id << " finished with "
<< result.ToString();
if (!result.ok()) {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Send cancel";
CancelFrame frame;
frame.stream_id = stream_id;
outgoing_frames_.MakeSender().UnbufferedImmediateSend(
std::move(frame));
if (!sender.UnbufferedImmediateSend(std::move(frame))) {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Send cancel failed";
}
}
return result;
});

@ -96,7 +96,7 @@ class ChaoticGoodClientTransport final : public ClientTransport {
uint32_t MakeStream(CallHandler call_handler);
absl::optional<CallHandler> LookupStream(uint32_t stream_id);
auto CallOutboundLoop(uint32_t stream_id, CallHandler call_handler);
auto OnTransportActivityDone();
auto OnTransportActivityDone(absl::string_view what);
auto TransportWriteLoop(RefCountedPtr<ChaoticGoodTransport> transport);
auto TransportReadLoop(RefCountedPtr<ChaoticGoodTransport> transport);
// Push one frame into a call

@ -15,8 +15,10 @@
#include "src/core/ext/transport/inproc/inproc_transport.h"
#include <atomic>
#include <memory>
#include "absl/log/check.h"
#include "absl/status/status.h"
#include <grpc/grpc.h>
#include <grpc/support/log.h>
@ -26,10 +28,12 @@
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/surface/channel_create.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/server/server.h"
@ -56,12 +60,14 @@ class InprocServerTransport final : public ServerTransport {
state_.compare_exchange_strong(expect, ConnectionState::kReady,
std::memory_order_acq_rel,
std::memory_order_acquire);
MutexLock lock(&state_tracker_mu_);
state_tracker_.SetState(GRPC_CHANNEL_READY, absl::OkStatus(),
"accept function set");
connected_state()->SetReady();
}
void Orphan() override { Unref(); }
void Orphan() override {
GRPC_TRACE_LOG(inproc, INFO) << "InprocServerTransport::Orphan(): " << this;
Disconnect(absl::UnavailableError("Server transport closed"));
Unref();
}
FilterStackTransport* filter_stack_transport() override { return nullptr; }
ClientTransport* client_transport() override { return nullptr; }
@ -73,26 +79,27 @@ class InprocServerTransport final : public ServerTransport {
gpr_log(GPR_INFO, "inproc server op: %s",
grpc_transport_op_string(op).c_str());
if (op->start_connectivity_watch != nullptr) {
MutexLock lock(&state_tracker_mu_);
state_tracker_.AddWatcher(op->start_connectivity_watch_state,
std::move(op->start_connectivity_watch));
connected_state()->AddWatcher(op->start_connectivity_watch_state,
std::move(op->start_connectivity_watch));
}
if (op->stop_connectivity_watch != nullptr) {
MutexLock lock(&state_tracker_mu_);
state_tracker_.RemoveWatcher(op->stop_connectivity_watch);
connected_state()->RemoveWatcher(op->stop_connectivity_watch);
}
if (op->set_accept_stream) {
Crash("set_accept_stream not supported on inproc transport");
}
ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus());
}
void Disconnect(absl::Status error) {
if (disconnecting_.exchange(true, std::memory_order_relaxed)) return;
disconnect_error_ = std::move(error);
RefCountedPtr<ConnectedState> connected_state;
{
MutexLock lock(&connected_state_mu_);
connected_state = std::move(connected_state_);
}
if (connected_state == nullptr) return;
connected_state->Disconnect(std::move(error));
state_.store(ConnectionState::kDisconnected, std::memory_order_relaxed);
MutexLock lock(&state_tracker_mu_);
state_tracker_.SetState(GRPC_CHANNEL_SHUTDOWN, disconnect_error_,
"inproc transport disconnected");
}
absl::StatusOr<CallInitiator> AcceptCall(ClientMetadataHandle md) {
@ -113,16 +120,54 @@ class InprocServerTransport final : public ServerTransport {
OrphanablePtr<InprocClientTransport> MakeClientTransport();
class ConnectedState : public RefCounted<ConnectedState> {
public:
~ConnectedState() override {
state_tracker_.SetState(GRPC_CHANNEL_SHUTDOWN, disconnect_error_,
"inproc transport disconnected");
}
void SetReady() {
MutexLock lock(&state_tracker_mu_);
state_tracker_.SetState(GRPC_CHANNEL_READY, absl::OkStatus(),
"accept function set");
}
void Disconnect(absl::Status error) {
disconnect_error_ = std::move(error);
}
void AddWatcher(grpc_connectivity_state initial_state,
OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
MutexLock lock(&state_tracker_mu_);
state_tracker_.AddWatcher(initial_state, std::move(watcher));
}
void RemoveWatcher(ConnectivityStateWatcherInterface* watcher) {
MutexLock lock(&state_tracker_mu_);
state_tracker_.RemoveWatcher(watcher);
}
private:
absl::Status disconnect_error_;
Mutex state_tracker_mu_;
ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(state_tracker_mu_){
"inproc_server_transport", GRPC_CHANNEL_CONNECTING};
};
RefCountedPtr<ConnectedState> connected_state() {
MutexLock lock(&connected_state_mu_);
return connected_state_;
}
private:
enum class ConnectionState : uint8_t { kInitial, kReady, kDisconnected };
std::atomic<ConnectionState> state_{ConnectionState::kInitial};
std::atomic<bool> disconnecting_{false};
RefCountedPtr<UnstartedCallDestination> unstarted_call_handler_;
absl::Status disconnect_error_;
Mutex state_tracker_mu_;
ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(state_tracker_mu_){
"inproc_server_transport", GRPC_CHANNEL_CONNECTING};
Mutex connected_state_mu_;
RefCountedPtr<ConnectedState> connected_state_
ABSL_GUARDED_BY(connected_state_mu_) = MakeRefCounted<ConnectedState>();
const std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine_;
const RefCountedPtr<CallArenaAllocator> call_arena_allocator_;
@ -139,19 +184,27 @@ class InprocClientTransport final : public ClientTransport {
"pull_initial_metadata",
TrySeq(child_call_handler.PullClientInitialMetadata(),
[server_transport = server_transport_,
child_call_handler](ClientMetadataHandle md) {
connected_state = server_transport_->connected_state(),
child_call_handler](ClientMetadataHandle md) mutable {
auto server_call_initiator =
server_transport->AcceptCall(std::move(md));
if (!server_call_initiator.ok()) {
return server_call_initiator.status();
}
ForwardCall(child_call_handler,
std::move(*server_call_initiator));
ForwardCall(
child_call_handler, std::move(*server_call_initiator),
[connected_state =
std::move(connected_state)](ServerMetadata& md) {
md.Set(GrpcStatusFromWire(), true);
});
return absl::OkStatus();
}));
}
void Orphan() override { delete this; }
void Orphan() override {
GRPC_TRACE_LOG(inproc, INFO) << "InprocClientTransport::Orphan(): " << this;
Unref();
}
FilterStackTransport* filter_stack_transport() override { return nullptr; }
ClientTransport* client_transport() override { return this; }
@ -170,8 +223,10 @@ class InprocClientTransport final : public ClientTransport {
const RefCountedPtr<InprocServerTransport> server_transport_;
};
bool UsePromiseBasedTransport() {
return IsPromiseBasedInprocTransportEnabled();
bool UsePromiseBasedTransport(const ChannelArgs& channel_args) {
return channel_args
.GetBool("grpc.experimental.promise_based_inproc_transport")
.value_or(IsPromiseBasedInprocTransportEnabled());
}
OrphanablePtr<InprocClientTransport>
@ -210,7 +265,8 @@ RefCountedPtr<Channel> MakeInprocChannel(Server* server,
std::ignore = server_transport.release(); // consumed by SetupTransport
auto channel = ChannelCreate(
"inproc",
client_channel_args.Set(GRPC_ARG_DEFAULT_AUTHORITY, "inproc.authority"),
client_channel_args.Set(GRPC_ARG_DEFAULT_AUTHORITY, "inproc.authority")
.Set(GRPC_ARG_USE_V3_STACK, true),
GRPC_CLIENT_DIRECT_CHANNEL, client_transport.release());
if (!channel.ok()) {
return MakeLameChannel("Failed to create client channel", channel.status());
@ -235,13 +291,14 @@ grpc_channel* grpc_inproc_channel_create(grpc_server* server,
void* reserved) {
grpc_core::ApplicationCallbackExecCtx app_exec_ctx;
grpc_core::ExecCtx exec_ctx;
if (!grpc_core::UsePromiseBasedTransport()) {
const auto channel_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args);
if (!grpc_core::UsePromiseBasedTransport(channel_args)) {
return grpc_legacy_inproc_channel_create(server, args, reserved);
}
return grpc_core::MakeInprocChannel(grpc_core::Server::FromC(server),
grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args))
channel_args)
.release()
->c_ptr();
}

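For completeness, the new channel arg checked by UsePromiseBasedTransport above can be set per channel; a minimal hedged sketch using the C surface API follows (the helper name is illustrative).

// Opt a single inproc channel into the promise-based transport via the
// "grpc.experimental.promise_based_inproc_transport" arg read above.
#include <grpc/grpc.h>

grpc_channel* make_promise_based_inproc_channel(grpc_server* server) {
  grpc_arg arg;
  arg.type = GRPC_ARG_INTEGER;
  arg.key =
      const_cast<char*>("grpc.experimental.promise_based_inproc_transport");
  arg.value.integer = 1;
  grpc_channel_args args = {1, &arg};
  return grpc_inproc_channel_create(server, &args, nullptr);
}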
@ -266,7 +266,6 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::StartThread() {
}
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Quiesce() {
LOG(INFO) << "WorkStealingThreadPoolImpl::Quiesce";
SetShutdown(true);
// Wait until all threads have exited.
// Note that if this is a threadpool thread then we won't exit this thread

@ -32,8 +32,10 @@
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/event_engine_context.h"
#include "src/core/lib/gprpp/construct_destruct.h"
#include "src/core/lib/gprpp/dump_args.h"
#include "src/core/lib/gprpp/no_destruct.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/sync.h"
@ -662,11 +664,17 @@ ActivityPtr MakeActivity(Factory promise_factory,
}
inline Pending IntraActivityWaiter::pending() {
wakeups_ |= GetContext<Activity>()->CurrentParticipant();
const auto new_wakeups = GetContext<Activity>()->CurrentParticipant();
GRPC_TRACE_LOG(promise_primitives, INFO)
<< "IntraActivityWaiter::pending: "
<< GRPC_DUMP_ARGS(this, new_wakeups, wakeups_);
wakeups_ |= new_wakeups;
return Pending();
}
inline void IntraActivityWaiter::Wake() {
GRPC_TRACE_LOG(promise_primitives, INFO)
<< "IntraActivityWaiter::Wake: " << GRPC_DUMP_ARGS(this, wakeups_);
if (wakeups_ == 0) return;
GetContext<Activity>()->ForceImmediateRepoll(std::exchange(wakeups_, 0));
}

@ -15,6 +15,8 @@
#ifndef GRPC_SRC_CORE_LIB_PROMISE_STATUS_FLAG_H
#define GRPC_SRC_CORE_LIB_PROMISE_STATUS_FLAG_H
#include <ostream>
#include "absl/log/check.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
@ -190,6 +192,16 @@ class ValueOrFailure {
absl::optional<T> value_;
};
template <typename T>
inline std::ostream& operator<<(std::ostream& os,
const ValueOrFailure<T>& value) {
if (value.ok()) {
return os << "Success(" << *value << ")";
} else {
return os << "Failure";
}
}
template <typename T>
inline bool IsStatusOk(const ValueOrFailure<T>& value) {
return value.ok();

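A small hedged usage note for the new stream operator added above (the function here is illustrative).

// Prints "Success(<value>)" for an ok ValueOrFailure and "Failure" otherwise,
// per the operator<< overload added above.
#include <sstream>
#include <string>
#include "src/core/lib/promise/status_flag.h"

std::string DescribeResult(const grpc_core::ValueOrFailure<int>& result) {
  std::ostringstream out;
  out << result;
  return out.str();
}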
@ -219,7 +219,7 @@ std::string MakeErrorString(const ServerMetadata* trailing_metadata) {
trailing_metadata->get(GrpcStatusFromWire()).value_or(false)
? "Error received from peer"
: "Error generated by client",
"grpc_status: ",
" grpc_status: ",
grpc_status_code_to_string(trailing_metadata->get(GrpcStatusMetadata())
.value_or(GRPC_STATUS_UNKNOWN)));
if (const Slice* message =

@ -25,10 +25,12 @@
#include "src/core/channelz/channelz.h"
#include "src/core/client_channel/client_channel.h"
#include "src/core/client_channel/direct_channel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/surface/lame_client.h"
#include "src/core/lib/surface/legacy_channel.h"
#include "src/core/telemetry/stats.h"
@ -86,8 +88,15 @@ absl::StatusOr<RefCountedPtr<Channel>> ChannelCreate(
return LegacyChannel::Create(std::move(target), std::move(args),
channel_stack_type);
}
CHECK_EQ(channel_stack_type, GRPC_CLIENT_CHANNEL);
return ClientChannel::Create(std::move(target), std::move(args));
switch (channel_stack_type) {
case GRPC_CLIENT_CHANNEL:
return ClientChannel::Create(std::move(target), std::move(args));
case GRPC_CLIENT_DIRECT_CHANNEL:
return DirectChannel::Create(std::move(target), args);
default:
Crash(absl::StrCat("Invalid channel stack type for ChannelCreate: ",
grpc_channel_stack_type_string(channel_stack_type)));
}
}
} // namespace grpc_core

@ -108,6 +108,7 @@ ClientCall::ClientCall(
RefCountedPtr<Arena> arena,
RefCountedPtr<UnstartedCallDestination> destination)
: Call(false, deadline, std::move(arena), event_engine),
DualRefCounted("ClientCall"),
cq_(cq),
call_destination_(std::move(destination)),
compression_options_(compression_options) {
@ -153,6 +154,7 @@ void ClientCall::CancelWithError(grpc_error_handle error) {
if (call_state_.compare_exchange_strong(cur_state, kCancelled,
std::memory_order_acq_rel,
std::memory_order_acquire)) {
ResetDeadline();
return;
}
break;
@ -168,6 +170,7 @@ void ClientCall::CancelWithError(grpc_error_handle error) {
if (call_state_.compare_exchange_strong(cur_state, kCancelled,
std::memory_order_acq_rel,
std::memory_order_acquire)) {
ResetDeadline();
auto* unordered_start = reinterpret_cast<UnorderedStart*>(cur_state);
while (unordered_start != nullptr) {
auto next = unordered_start->next;
@ -338,6 +341,8 @@ void ClientCall::CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag,
[this, out_status, out_status_details, out_error_string,
out_trailing_metadata](
ServerMetadataHandle server_trailing_metadata) {
saw_trailing_metadata_.store(true, std::memory_order_relaxed);
ResetDeadline();
GRPC_TRACE_LOG(call, INFO)
<< DebugTag() << "RecvStatusOnClient "
<< server_trailing_metadata->DebugString();

@ -82,8 +82,9 @@ class ClientCall final
void InternalUnref(const char*) override { WeakUnref(); }
void Orphaned() override {
// TODO(ctiller): only when we're not already finished
CancelWithError(absl::CancelledError());
if (!saw_trailing_metadata_.load(std::memory_order_relaxed)) {
CancelWithError(absl::CancelledError());
}
}
void SetCompletionQueue(grpc_completion_queue*) override {
@ -164,6 +165,7 @@ class ClientCall final
ServerMetadataHandle received_initial_metadata_;
ServerMetadataHandle received_trailing_metadata_;
bool is_trailers_only_;
std::atomic<bool> saw_trailing_metadata_{false};
};
grpc_call* MakeClientCall(

@ -188,6 +188,8 @@ void ServerCall::CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag,
[this, cancelled = op->data.recv_close_on_server.cancelled]() {
return Map(call_handler_.WasCancelled(),
[cancelled, this](bool result) -> Success {
saw_was_cancelled_.store(true,
std::memory_order_relaxed);
ResetDeadline();
*cancelled = result ? 1 : 0;
return Success{};

@ -20,6 +20,7 @@
#include <stdlib.h>
#include <string.h>
#include <atomic>
#include <cstdint>
#include <memory>
#include <string>
@ -80,9 +81,8 @@ class ServerCall final : public Call, public DualRefCounted<ServerCall> {
call_handler_.SpawnInfallible(
"CancelWithError",
[self = WeakRefAsSubclass<ServerCall>(), error = std::move(error)] {
auto status = ServerMetadataFromStatus(error);
status->Set(GrpcCallWasCancelled(), true);
self->call_handler_.PushServerTrailingMetadata(std::move(status));
self->call_handler_.PushServerTrailingMetadata(
CancelledServerMetadataFromStatus(error));
return Empty{};
});
}
@ -101,8 +101,9 @@ class ServerCall final : public Call, public DualRefCounted<ServerCall> {
void InternalUnref(const char*) override { WeakUnref(); }
void Orphaned() override {
// TODO(ctiller): only when we're not already finished
CancelWithError(absl::CancelledError());
if (!saw_was_cancelled_.load(std::memory_order_relaxed)) {
CancelWithError(absl::CancelledError());
}
}
void SetCompletionQueue(grpc_completion_queue*) override {
@ -155,6 +156,7 @@ class ServerCall final : public Call, public DualRefCounted<ServerCall> {
ClientMetadataHandle client_initial_metadata_stored_;
grpc_completion_queue* const cq_;
ServerInterface* const server_;
std::atomic<bool> saw_was_cancelled_{false};
};
grpc_call* MakeServerCall(CallHandler call_handler,

@ -185,7 +185,7 @@ char g_empty_call_data;
CallFilters::CallFilters(ClientMetadataHandle client_initial_metadata)
: stack_(nullptr),
call_data_(nullptr),
client_initial_metadata_(std::move(client_initial_metadata)) {}
push_client_initial_metadata_(std::move(client_initial_metadata)) {}
CallFilters::~CallFilters() {
if (call_data_ != nullptr && call_data_ != &g_empty_call_data) {
@ -209,10 +209,7 @@ void CallFilters::SetStack(RefCountedPtr<Stack> stack) {
constructor.call_init(Offset(call_data_, constructor.call_offset),
constructor.channel_data);
}
client_initial_metadata_state_.Start();
client_to_server_message_state_.Start();
server_initial_metadata_state_.Start();
server_to_client_message_state_.Start();
call_state_.Start();
}
void CallFilters::Finalize(const grpc_call_final_info* final_info) {
@ -224,50 +221,40 @@ void CallFilters::Finalize(const grpc_call_final_info* final_info) {
void CallFilters::CancelDueToFailedPipeOperation(SourceLocation but_where) {
// We expect something cancelled before now
if (server_trailing_metadata_ == nullptr) return;
if (push_server_trailing_metadata_ == nullptr) return;
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
gpr_log(but_where.file(), but_where.line(), GPR_LOG_SEVERITY_DEBUG,
"Cancelling due to failed pipe operation: %s",
DebugString().c_str());
}
PushServerTrailingMetadata(
ServerMetadataFromStatus(absl::CancelledError("Failed pipe operation")));
server_trailing_metadata_waiter_.Wake();
auto status =
ServerMetadataFromStatus(absl::CancelledError("Failed pipe operation"));
status->Set(GrpcCallWasCancelled(), true);
PushServerTrailingMetadata(std::move(status));
}
void CallFilters::PushServerTrailingMetadata(ServerMetadataHandle md) {
CHECK(md != nullptr);
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
if (GRPC_TRACE_FLAG_ENABLED(call)) {
gpr_log(GPR_INFO, "%s PushServerTrailingMetadata[%p]: %s into %s",
GetContext<Activity>()->DebugTag().c_str(), this,
md->DebugString().c_str(), DebugString().c_str());
}
CHECK(md != nullptr);
if (cancelled_.is_set()) return;
cancelled_.Set(md->get(GrpcCallWasCancelled()).value_or(false));
server_trailing_metadata_ = std::move(md);
client_initial_metadata_state_.CloseWithError();
server_initial_metadata_state_.CloseSending();
client_to_server_message_state_.CloseWithError();
server_to_client_message_state_.CloseSending();
server_trailing_metadata_waiter_.Wake();
if (call_state_.PushServerTrailingMetadata(
md->get(GrpcCallWasCancelled()).value_or(false))) {
push_server_trailing_metadata_ = std::move(md);
}
}
std::string CallFilters::DebugString() const {
std::vector<std::string> components = {
absl::StrFormat("this:%p", this),
absl::StrCat("client_initial_metadata:",
client_initial_metadata_state_.DebugString()),
ServerInitialMetadataPromises::DebugString("server_initial_metadata",
this),
ClientToServerMessagePromises::DebugString("client_to_server_message",
this),
ServerToClientMessagePromises::DebugString("server_to_client_message",
this),
absl::StrCat("state:", call_state_.DebugString()),
absl::StrCat("server_trailing_metadata:",
server_trailing_metadata_ == nullptr
push_server_trailing_metadata_ == nullptr
? "not-set"
: server_trailing_metadata_->DebugString())};
: push_server_trailing_metadata_->DebugString())};
return absl::StrCat("CallFilters{", absl::StrJoin(components, ", "), "}");
};
@ -303,202 +290,634 @@ RefCountedPtr<CallFilters::Stack> CallFilters::StackBuilder::Build() {
}
///////////////////////////////////////////////////////////////////////////////
// CallFilters::PipeState
void filters_detail::PipeState::Start() {
DCHECK(!started_);
started_ = true;
wait_recv_.Wake();
}
void filters_detail::PipeState::CloseWithError() {
if (state_ == ValueState::kClosed) return;
state_ = ValueState::kError;
wait_recv_.Wake();
wait_send_.Wake();
}
Poll<bool> filters_detail::PipeState::PollClosed() {
switch (state_) {
case ValueState::kIdle:
case ValueState::kWaiting:
case ValueState::kQueued:
case ValueState::kReady:
case ValueState::kProcessing:
return wait_recv_.pending();
case ValueState::kClosed:
return false;
case ValueState::kError:
return true;
// CallState
namespace filters_detail {
CallState::CallState()
: client_to_server_pull_state_(ClientToServerPullState::kBegin),
client_to_server_push_state_(ClientToServerPushState::kIdle),
server_to_client_pull_state_(ServerToClientPullState::kUnstarted),
server_to_client_push_state_(ServerToClientPushState::kStart),
server_trailing_metadata_state_(ServerTrailingMetadataState::kNotPushed) {
}
void CallState::Start() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] Start: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_);
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kUnstarted:
server_to_client_pull_state_ = ServerToClientPullState::kStarted;
server_to_client_pull_waiter_.Wake();
break;
case ServerToClientPullState::kStarted:
case ServerToClientPullState::kProcessingServerInitialMetadata:
case ServerToClientPullState::kIdle:
case ServerToClientPullState::kReading:
case ServerToClientPullState::kProcessingServerToClientMessage:
LOG(FATAL) << "Start called twice";
case ServerToClientPullState::kProcessingServerTrailingMetadata:
case ServerToClientPullState::kTerminated:
break;
}
GPR_UNREACHABLE_CODE(return Pending{});
}
void filters_detail::PipeState::CloseSending() {
switch (state_) {
case ValueState::kIdle:
state_ = ValueState::kClosed;
void CallState::BeginPushClientToServerMessage() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] BeginPushClientToServerMessage: "
<< GRPC_DUMP_ARGS(this, client_to_server_push_state_);
switch (client_to_server_push_state_) {
case ClientToServerPushState::kIdle:
client_to_server_push_state_ = ClientToServerPushState::kPushedMessage;
client_to_server_push_waiter_.Wake();
break;
case ValueState::kWaiting:
state_ = ValueState::kClosed;
wait_recv_.Wake();
case ClientToServerPushState::kPushedMessage:
case ClientToServerPushState::kPushedMessageAndHalfClosed:
LOG(FATAL) << "PushClientToServerMessage called twice concurrently";
break;
case ValueState::kClosed:
case ValueState::kError:
case ClientToServerPushState::kPushedHalfClose:
LOG(FATAL) << "PushClientToServerMessage called after half-close";
break;
case ValueState::kQueued:
case ValueState::kReady:
case ValueState::kProcessing:
Crash("Only one push allowed to be outstanding");
case ClientToServerPushState::kFinished:
break;
}
}
void filters_detail::PipeState::BeginPush() {
switch (state_) {
case ValueState::kIdle:
state_ = ValueState::kQueued;
Poll<StatusFlag> CallState::PollPushClientToServerMessage() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollPushClientToServerMessage: "
<< GRPC_DUMP_ARGS(this, client_to_server_push_state_);
switch (client_to_server_push_state_) {
case ClientToServerPushState::kIdle:
case ClientToServerPushState::kPushedHalfClose:
return Success{};
case ClientToServerPushState::kPushedMessage:
case ClientToServerPushState::kPushedMessageAndHalfClosed:
return client_to_server_push_waiter_.pending();
case ClientToServerPushState::kFinished:
return Failure{};
}
Crash("Unreachable");
}
void CallState::ClientToServerHalfClose() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] ClientToServerHalfClose: "
<< GRPC_DUMP_ARGS(this, client_to_server_push_state_);
switch (client_to_server_push_state_) {
case ClientToServerPushState::kIdle:
client_to_server_push_state_ = ClientToServerPushState::kPushedHalfClose;
client_to_server_push_waiter_.Wake();
break;
case ValueState::kWaiting:
state_ = ValueState::kReady;
wait_recv_.Wake();
case ClientToServerPushState::kPushedMessage:
client_to_server_push_state_ =
ClientToServerPushState::kPushedMessageAndHalfClosed;
break;
case ValueState::kClosed:
case ValueState::kError:
case ClientToServerPushState::kPushedHalfClose:
case ClientToServerPushState::kPushedMessageAndHalfClosed:
LOG(FATAL) << "ClientToServerHalfClose called twice";
break;
case ValueState::kQueued:
case ValueState::kReady:
case ValueState::kProcessing:
Crash("Only one push allowed to be outstanding");
case ClientToServerPushState::kFinished:
break;
}
}
void filters_detail::PipeState::DropPush() {
switch (state_) {
case ValueState::kQueued:
case ValueState::kReady:
case ValueState::kProcessing:
case ValueState::kWaiting:
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
gpr_log(GPR_INFO, "%p drop push in state %s", this,
DebugString().c_str());
}
state_ = ValueState::kError;
wait_recv_.Wake();
void CallState::BeginPullClientInitialMetadata() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] BeginPullClientInitialMetadata: "
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_);
switch (client_to_server_pull_state_) {
case ClientToServerPullState::kBegin:
client_to_server_pull_state_ =
ClientToServerPullState::kProcessingClientInitialMetadata;
break;
case ClientToServerPullState::kProcessingClientInitialMetadata:
case ClientToServerPullState::kIdle:
case ClientToServerPullState::kReading:
case ClientToServerPullState::kProcessingClientToServerMessage:
LOG(FATAL) << "BeginPullClientInitialMetadata called twice";
break;
case ValueState::kIdle:
case ValueState::kClosed:
case ValueState::kError:
case ClientToServerPullState::kTerminated:
break;
}
}
void filters_detail::PipeState::DropPull() {
switch (state_) {
case ValueState::kQueued:
case ValueState::kReady:
case ValueState::kProcessing:
case ValueState::kWaiting:
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
gpr_log(GPR_INFO, "%p drop pull in state %s", this,
DebugString().c_str());
}
state_ = ValueState::kError;
wait_send_.Wake();
void CallState::FinishPullClientInitialMetadata() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] FinishPullClientInitialMetadata: "
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_);
switch (client_to_server_pull_state_) {
case ClientToServerPullState::kBegin:
LOG(FATAL) << "FinishPullClientInitialMetadata called before Begin";
break;
case ClientToServerPullState::kProcessingClientInitialMetadata:
client_to_server_pull_state_ = ClientToServerPullState::kIdle;
client_to_server_pull_waiter_.Wake();
break;
case ClientToServerPullState::kIdle:
case ClientToServerPullState::kReading:
case ClientToServerPullState::kProcessingClientToServerMessage:
LOG(FATAL) << "Out of order FinishPullClientInitialMetadata";
break;
case ClientToServerPullState::kTerminated:
break;
}
}
Poll<ValueOrFailure<bool>> CallState::PollPullClientToServerMessageAvailable() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollPullClientToServerMessageAvailable: "
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_,
client_to_server_push_state_);
switch (client_to_server_pull_state_) {
case ClientToServerPullState::kBegin:
case ClientToServerPullState::kProcessingClientInitialMetadata:
return client_to_server_pull_waiter_.pending();
case ClientToServerPullState::kIdle:
client_to_server_pull_state_ = ClientToServerPullState::kReading;
ABSL_FALLTHROUGH_INTENDED;
case ClientToServerPullState::kReading:
break;
case ClientToServerPullState::kProcessingClientToServerMessage:
LOG(FATAL) << "PollPullClientToServerMessageAvailable called while "
"processing a message";
break;
case ClientToServerPullState::kTerminated:
return Failure{};
}
DCHECK_EQ(client_to_server_pull_state_, ClientToServerPullState::kReading);
switch (client_to_server_push_state_) {
case ClientToServerPushState::kIdle:
return client_to_server_push_waiter_.pending();
case ClientToServerPushState::kPushedMessage:
case ClientToServerPushState::kPushedMessageAndHalfClosed:
client_to_server_pull_state_ =
ClientToServerPullState::kProcessingClientToServerMessage;
return true;
case ClientToServerPushState::kPushedHalfClose:
return false;
case ClientToServerPushState::kFinished:
client_to_server_pull_state_ = ClientToServerPullState::kTerminated;
return Failure{};
}
Crash("Unreachable");
}
void CallState::FinishPullClientToServerMessage() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] FinishPullClientToServerMessage: "
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_,
client_to_server_push_state_);
switch (client_to_server_pull_state_) {
case ClientToServerPullState::kBegin:
case ClientToServerPullState::kProcessingClientInitialMetadata:
LOG(FATAL) << "FinishPullClientToServerMessage called before Begin";
break;
case ClientToServerPullState::kIdle:
LOG(FATAL) << "FinishPullClientToServerMessage called twice";
break;
case ClientToServerPullState::kReading:
LOG(FATAL) << "FinishPullClientToServerMessage called before "
"PollPullClientToServerMessageAvailable";
break;
case ClientToServerPullState::kProcessingClientToServerMessage:
client_to_server_pull_state_ = ClientToServerPullState::kIdle;
client_to_server_pull_waiter_.Wake();
break;
case ClientToServerPullState::kTerminated:
break;
}
switch (client_to_server_push_state_) {
case ClientToServerPushState::kPushedMessage:
client_to_server_push_state_ = ClientToServerPushState::kIdle;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kIdle:
case ClientToServerPushState::kPushedHalfClose:
LOG(FATAL) << "FinishPullClientToServerMessage called without a message";
break;
case ClientToServerPushState::kPushedMessageAndHalfClosed:
client_to_server_push_state_ = ClientToServerPushState::kPushedHalfClose;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kFinished:
break;
}
}
StatusFlag CallState::PushServerInitialMetadata() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PushServerInitialMetadata: "
<< GRPC_DUMP_ARGS(this, server_to_client_push_state_,
server_trailing_metadata_state_);
if (server_trailing_metadata_state_ !=
ServerTrailingMetadataState::kNotPushed) {
return Failure{};
}
CHECK_EQ(server_to_client_push_state_, ServerToClientPushState::kStart);
server_to_client_push_state_ =
ServerToClientPushState::kPushedServerInitialMetadata;
server_to_client_push_waiter_.Wake();
return Success{};
}
void CallState::BeginPushServerToClientMessage() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] BeginPushServerToClientMessage: "
<< GRPC_DUMP_ARGS(this, server_to_client_push_state_);
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
LOG(FATAL) << "BeginPushServerToClientMessage called before "
"PushServerInitialMetadata";
break;
case ServerToClientPushState::kPushedServerInitialMetadata:
server_to_client_push_state_ =
ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage;
break;
case ValueState::kIdle:
case ValueState::kClosed:
case ValueState::kError:
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
case ServerToClientPushState::kPushedMessage:
LOG(FATAL) << "BeginPushServerToClientMessage called twice concurrently";
break;
case ServerToClientPushState::kTrailersOnly:
// Will fail in poll.
break;
case ServerToClientPushState::kIdle:
server_to_client_push_state_ = ServerToClientPushState::kPushedMessage;
server_to_client_push_waiter_.Wake();
break;
case ServerToClientPushState::kFinished:
break;
}
}
Poll<StatusFlag> filters_detail::PipeState::PollPush() {
switch (state_) {
// Read completed and new read started => we see waiting here
case ValueState::kWaiting:
state_ = ValueState::kReady;
wait_recv_.Wake();
return wait_send_.pending();
case ValueState::kIdle:
case ValueState::kClosed:
Poll<StatusFlag> CallState::PollPushServerToClientMessage() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollPushServerToClientMessage: "
<< GRPC_DUMP_ARGS(this, server_to_client_push_state_);
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
case ServerToClientPushState::kPushedServerInitialMetadata:
LOG(FATAL) << "PollPushServerToClientMessage called before "
<< "PushServerInitialMetadata";
case ServerToClientPushState::kTrailersOnly:
return false;
case ServerToClientPushState::kPushedMessage:
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
return server_to_client_push_waiter_.pending();
case ServerToClientPushState::kIdle:
return Success{};
case ValueState::kQueued:
case ValueState::kReady:
case ValueState::kProcessing:
return wait_send_.pending();
case ValueState::kError:
case ServerToClientPushState::kFinished:
return Failure{};
}
GPR_UNREACHABLE_CODE(return Pending{});
Crash("Unreachable");
}
bool CallState::PushServerTrailingMetadata(bool cancel) {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PushServerTrailingMetadata: "
<< GRPC_DUMP_ARGS(this, cancel, server_trailing_metadata_state_,
server_to_client_push_state_,
client_to_server_push_state_,
server_trailing_metadata_waiter_.DebugString());
if (server_trailing_metadata_state_ !=
ServerTrailingMetadataState::kNotPushed) {
return false;
}
server_trailing_metadata_state_ =
cancel ? ServerTrailingMetadataState::kPushedCancel
: ServerTrailingMetadataState::kPushed;
server_trailing_metadata_waiter_.Wake();
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
server_to_client_push_state_ = ServerToClientPushState::kTrailersOnly;
server_to_client_push_waiter_.Wake();
break;
case ServerToClientPushState::kPushedServerInitialMetadata:
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
case ServerToClientPushState::kPushedMessage:
if (cancel) {
server_to_client_push_state_ = ServerToClientPushState::kFinished;
server_to_client_push_waiter_.Wake();
}
break;
case ServerToClientPushState::kIdle:
if (cancel) {
server_to_client_push_state_ = ServerToClientPushState::kFinished;
server_to_client_push_waiter_.Wake();
}
break;
case ServerToClientPushState::kFinished:
case ServerToClientPushState::kTrailersOnly:
break;
}
switch (client_to_server_push_state_) {
case ClientToServerPushState::kIdle:
client_to_server_push_state_ = ClientToServerPushState::kFinished;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kPushedMessage:
case ClientToServerPushState::kPushedMessageAndHalfClosed:
client_to_server_push_state_ = ClientToServerPushState::kFinished;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kPushedHalfClose:
case ClientToServerPushState::kFinished:
break;
}
return true;
}
Poll<ValueOrFailure<bool>> filters_detail::PipeState::PollPull() {
switch (state_) {
case ValueState::kWaiting:
return wait_recv_.pending();
case ValueState::kIdle:
state_ = ValueState::kWaiting;
return wait_recv_.pending();
case ValueState::kReady:
case ValueState::kQueued:
if (!started_) return wait_recv_.pending();
state_ = ValueState::kProcessing;
Poll<bool> CallState::PollPullServerInitialMetadataAvailable() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollPullServerInitialMetadataAvailable: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
server_to_client_push_state_);
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kUnstarted:
if (server_to_client_push_state_ ==
ServerToClientPushState::kTrailersOnly) {
server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
return false;
}
server_to_client_push_waiter_.pending();
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kStarted:
break;
case ServerToClientPullState::kProcessingServerInitialMetadata:
case ServerToClientPullState::kIdle:
case ServerToClientPullState::kReading:
case ServerToClientPullState::kProcessingServerToClientMessage:
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "PollPullServerInitialMetadataAvailable called twice";
case ServerToClientPullState::kTerminated:
return false;
}
DCHECK_EQ(server_to_client_pull_state_, ServerToClientPullState::kStarted);
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
return server_to_client_push_waiter_.pending();
case ServerToClientPushState::kPushedServerInitialMetadata:
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
server_to_client_pull_state_ =
ServerToClientPullState::kProcessingServerInitialMetadata;
server_to_client_pull_waiter_.Wake();
return true;
case ValueState::kProcessing:
Crash("Only one pull allowed to be outstanding");
case ValueState::kClosed:
case ServerToClientPushState::kIdle:
case ServerToClientPushState::kPushedMessage:
LOG(FATAL)
<< "PollPullServerInitialMetadataAvailable after metadata processed";
case ServerToClientPushState::kFinished:
server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
server_to_client_pull_waiter_.Wake();
return false;
case ServerToClientPushState::kTrailersOnly:
return false;
case ValueState::kError:
return Failure{};
}
GPR_UNREACHABLE_CODE(return Pending{});
Crash("Unreachable");
}
void filters_detail::PipeState::AckPull() {
switch (state_) {
case ValueState::kProcessing:
state_ = ValueState::kIdle;
wait_send_.Wake();
void CallState::FinishPullServerInitialMetadata() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] FinishPullServerInitialMetadata: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_);
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kUnstarted:
LOG(FATAL) << "FinishPullServerInitialMetadata called before Start";
case ServerToClientPullState::kStarted:
CHECK_EQ(server_to_client_push_state_,
ServerToClientPushState::kTrailersOnly);
return;
case ServerToClientPullState::kProcessingServerInitialMetadata:
server_to_client_pull_state_ = ServerToClientPullState::kIdle;
server_to_client_pull_waiter_.Wake();
break;
case ValueState::kWaiting:
case ValueState::kIdle:
case ValueState::kQueued:
case ValueState::kReady:
case ValueState::kClosed:
Crash("AckPullValue called in invalid state");
case ValueState::kError:
case ServerToClientPullState::kIdle:
case ServerToClientPullState::kReading:
case ServerToClientPullState::kProcessingServerToClientMessage:
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "Out of order FinishPullServerInitialMetadata";
case ServerToClientPullState::kTerminated:
return;
}
DCHECK_EQ(server_to_client_pull_state_, ServerToClientPullState::kIdle);
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
LOG(FATAL) << "FinishPullServerInitialMetadata called before initial "
"metadata consumed";
case ServerToClientPushState::kPushedServerInitialMetadata:
server_to_client_push_state_ = ServerToClientPushState::kIdle;
server_to_client_push_waiter_.Wake();
break;
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
server_to_client_push_state_ = ServerToClientPushState::kPushedMessage;
server_to_client_pull_waiter_.Wake();
break;
case ServerToClientPushState::kIdle:
case ServerToClientPushState::kPushedMessage:
case ServerToClientPushState::kTrailersOnly:
case ServerToClientPushState::kFinished:
LOG(FATAL) << "FinishPullServerInitialMetadata called twice";
}
}
std::string filters_detail::PipeState::DebugString() const {
const char* state_str = "<<invalid-value>>";
switch (state_) {
case ValueState::kIdle:
state_str = "Idle";
Poll<ValueOrFailure<bool>> CallState::PollPullServerToClientMessageAvailable() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollPullServerToClientMessageAvailable: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
server_to_client_push_state_,
server_trailing_metadata_state_);
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kUnstarted:
case ServerToClientPullState::kProcessingServerInitialMetadata:
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kStarted:
if (server_to_client_push_state_ ==
ServerToClientPushState::kTrailersOnly) {
return false;
}
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kIdle:
server_to_client_pull_state_ = ServerToClientPullState::kReading;
ABSL_FALLTHROUGH_INTENDED;
case ServerToClientPullState::kReading:
break;
case ValueState::kWaiting:
state_str = "Waiting";
case ServerToClientPullState::kProcessingServerToClientMessage:
LOG(FATAL) << "PollPullServerToClientMessageAvailable called while "
"processing a message";
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "PollPullServerToClientMessageAvailable called while "
"processing trailing metadata";
case ServerToClientPullState::kTerminated:
return Failure{};
}
DCHECK_EQ(server_to_client_pull_state_, ServerToClientPullState::kReading);
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
case ServerToClientPushState::kPushedServerInitialMetadata:
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
return server_to_client_push_waiter_.pending();
case ServerToClientPushState::kIdle:
if (server_trailing_metadata_state_ !=
ServerTrailingMetadataState::kNotPushed) {
return false;
}
server_trailing_metadata_waiter_.pending();
return server_to_client_push_waiter_.pending();
case ServerToClientPushState::kTrailersOnly:
DCHECK_NE(server_trailing_metadata_state_,
ServerTrailingMetadataState::kNotPushed);
return false;
case ServerToClientPushState::kPushedMessage:
server_to_client_pull_state_ =
ServerToClientPullState::kProcessingServerToClientMessage;
server_to_client_pull_waiter_.Wake();
return true;
case ServerToClientPushState::kFinished:
server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
server_to_client_pull_waiter_.Wake();
return Failure{};
}
Crash("Unreachable");
}
void CallState::FinishPullServerToClientMessage() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] FinishPullServerToClientMessage: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
server_to_client_push_state_);
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kUnstarted:
case ServerToClientPullState::kStarted:
case ServerToClientPullState::kProcessingServerInitialMetadata:
LOG(FATAL)
<< "FinishPullServerToClientMessage called before metadata available";
case ServerToClientPullState::kIdle:
LOG(FATAL) << "FinishPullServerToClientMessage called twice";
case ServerToClientPullState::kReading:
LOG(FATAL) << "FinishPullServerToClientMessage called before "
<< "PollPullServerToClientMessageAvailable";
case ServerToClientPullState::kProcessingServerToClientMessage:
server_to_client_pull_state_ = ServerToClientPullState::kIdle;
server_to_client_pull_waiter_.Wake();
break;
case ValueState::kQueued:
state_str = "Queued";
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "FinishPullServerToClientMessage called while processing "
"trailing metadata";
case ServerToClientPullState::kTerminated:
break;
case ValueState::kReady:
state_str = "Ready";
}
switch (server_to_client_push_state_) {
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
case ServerToClientPushState::kPushedServerInitialMetadata:
case ServerToClientPushState::kStart:
LOG(FATAL) << "FinishPullServerToClientMessage called before initial "
"metadata consumed";
case ServerToClientPushState::kTrailersOnly:
LOG(FATAL) << "FinishPullServerToClientMessage called after "
"PushServerTrailingMetadata";
case ServerToClientPushState::kPushedMessage:
server_to_client_push_state_ = ServerToClientPushState::kIdle;
server_to_client_push_waiter_.Wake();
break;
case ValueState::kProcessing:
state_str = "Processing";
case ServerToClientPushState::kIdle:
LOG(FATAL) << "FinishPullServerToClientMessage called without a message";
case ServerToClientPushState::kFinished:
break;
case ValueState::kClosed:
state_str = "Closed";
}
}
Poll<Empty> CallState::PollServerTrailingMetadataAvailable() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollServerTrailingMetadataAvailable: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
server_trailing_metadata_state_,
server_trailing_metadata_waiter_.DebugString());
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kProcessingServerInitialMetadata:
case ServerToClientPullState::kProcessingServerToClientMessage:
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kStarted:
case ServerToClientPullState::kUnstarted:
case ServerToClientPullState::kIdle:
case ServerToClientPullState::kReading:
if (server_trailing_metadata_state_ !=
ServerTrailingMetadataState::kNotPushed) {
server_to_client_pull_state_ =
ServerToClientPullState::kProcessingServerTrailingMetadata;
server_to_client_pull_waiter_.Wake();
return Empty{};
}
return server_trailing_metadata_waiter_.pending();
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "PollServerTrailingMetadataAvailable called twice";
case ServerToClientPullState::kTerminated:
return Empty{};
}
Crash("Unreachable");
}
void CallState::FinishPullServerTrailingMetadata() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] FinishPullServerTrailingMetadata: "
<< GRPC_DUMP_ARGS(this, server_trailing_metadata_state_,
server_trailing_metadata_waiter_.DebugString());
switch (server_trailing_metadata_state_) {
case ServerTrailingMetadataState::kNotPushed:
LOG(FATAL) << "FinishPullServerTrailingMetadata called before "
"PollServerTrailingMetadataAvailable";
case ServerTrailingMetadataState::kPushed:
server_trailing_metadata_state_ = ServerTrailingMetadataState::kPulled;
server_trailing_metadata_waiter_.Wake();
break;
case ValueState::kError:
state_str = "Error";
case ServerTrailingMetadataState::kPushedCancel:
server_trailing_metadata_state_ =
ServerTrailingMetadataState::kPulledCancel;
server_trailing_metadata_waiter_.Wake();
break;
case ServerTrailingMetadataState::kPulled:
case ServerTrailingMetadataState::kPulledCancel:
LOG(FATAL) << "FinishPullServerTrailingMetadata called twice";
}
return absl::StrCat(state_str, started_ ? "" : " (not started)");
}
Poll<bool> CallState::PollWasCancelled() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollWasCancelled: "
<< GRPC_DUMP_ARGS(this, server_trailing_metadata_state_);
switch (server_trailing_metadata_state_) {
case ServerTrailingMetadataState::kNotPushed:
case ServerTrailingMetadataState::kPushed:
case ServerTrailingMetadataState::kPushedCancel: {
return server_trailing_metadata_waiter_.pending();
}
case ServerTrailingMetadataState::kPulled:
return false;
case ServerTrailingMetadataState::kPulledCancel:
return true;
}
Crash("Unreachable");
}
std::string CallState::DebugString() const {
return absl::StrCat(
"client_to_server_pull_state:", client_to_server_pull_state_,
" client_to_server_push_state:", client_to_server_push_state_,
" server_to_client_pull_state:", server_to_client_pull_state_,
" server_to_client_message_push_state:", server_to_client_push_state_,
" server_trailing_metadata_state:", server_trailing_metadata_state_,
      " client_to_server_push_waiter:",
      client_to_server_push_waiter_.DebugString(),
" server_to_client_push_waiter:",
server_to_client_push_waiter_.DebugString(),
" client_to_server_pull_waiter:",
client_to_server_pull_waiter_.DebugString(),
" server_to_client_pull_waiter:",
server_to_client_pull_waiter_.DebugString(),
" server_trailing_metadata_waiter:",
server_trailing_metadata_waiter_.DebugString());
}
static_assert(sizeof(CallState) <= 16, "CallState too large");
} // namespace filters_detail
} // namespace grpc_core

File diff suppressed because it is too large

@ -14,6 +14,8 @@
#include "src/core/lib/transport/call_spine.h"
#include "absl/functional/any_invocable.h"
#include <grpc/support/port_platform.h>
#include "src/core/lib/promise/for_each.h"
@ -21,75 +23,99 @@
namespace grpc_core {
void ForwardCall(CallHandler call_handler, CallInitiator call_initiator) {
void ForwardCall(CallHandler call_handler, CallInitiator call_initiator,
absl::AnyInvocable<void(ServerMetadata&)>
on_server_trailing_metadata_from_initiator) {
// Read messages from handler into initiator.
call_handler.SpawnGuarded("read_messages", [call_handler,
call_initiator]() mutable {
return Seq(ForEach(OutgoingMessages(call_handler),
[call_initiator](MessageHandle msg) mutable {
// Need to spawn a job into the initiator's activity to
// push the message in.
return call_initiator.SpawnWaitable(
"send_message",
[msg = std::move(msg), call_initiator]() mutable {
return call_initiator.CancelIfFails(
call_initiator.PushMessage(std::move(msg)));
});
}),
[call_initiator](StatusFlag result) mutable {
if (result.ok()) {
call_initiator.SpawnInfallible(
"finish-downstream-ok", [call_initiator]() mutable {
call_initiator.FinishSends();
return Empty{};
});
} else {
call_initiator.SpawnInfallible("finish-downstream-fail",
[call_initiator]() mutable {
call_initiator.Cancel();
return Empty{};
});
}
return result;
});
});
call_initiator.SpawnInfallible("read_the_things", [call_initiator,
call_handler]() mutable {
return Seq(
call_initiator.CancelIfFails(TrySeq(
call_initiator.PullServerInitialMetadata(),
call_handler.SpawnInfallible(
"read_messages", [call_handler, call_initiator]() mutable {
return Seq(
ForEach(OutgoingMessages(call_handler),
[call_initiator](MessageHandle msg) mutable {
// Need to spawn a job into the initiator's activity to
// push the message in.
return call_initiator.SpawnWaitable(
"send_message",
[msg = std::move(msg), call_initiator]() mutable {
return call_initiator.PushMessage(std::move(msg));
});
}),
[call_initiator](StatusFlag result) mutable {
if (result.ok()) {
call_initiator.SpawnInfallible("finish-downstream-ok",
[call_initiator]() mutable {
call_initiator.FinishSends();
return Empty{};
});
}
return Empty{};
});
});
call_handler.SpawnInfallible(
"check_cancellation", [call_handler, call_initiator]() mutable {
return Map(call_handler.WasCancelled(), [call_initiator =
std::move(call_initiator)](
bool cancelled) mutable {
if (cancelled) {
call_initiator.SpawnInfallible("propagate_handler_cancel",
[call_initiator]() mutable {
call_initiator.Cancel();
return Empty();
});
}
return Empty();
});
});
call_initiator.SpawnInfallible(
"read_the_things",
[call_initiator, call_handler,
on_server_trailing_metadata_from_initiator =
std::move(on_server_trailing_metadata_from_initiator)]() mutable {
return Seq(
call_initiator.CancelIfFails(TrySeq(
call_initiator.PullServerInitialMetadata(),
[call_handler, call_initiator](
absl::optional<ServerMetadataHandle> md) mutable {
const bool has_md = md.has_value();
return If(
has_md,
[&call_handler, &call_initiator,
md = std::move(md)]() mutable {
call_handler.SpawnGuarded(
"recv_initial_metadata",
[md = std::move(*md), call_handler]() mutable {
return call_handler.PushServerInitialMetadata(
std::move(md));
});
return ForEach(
OutgoingMessages(call_initiator),
[call_handler](MessageHandle msg) mutable {
return call_handler.SpawnWaitable(
"recv_message", [msg = std::move(msg),
call_handler]() mutable {
return call_handler.CancelIfFails(
call_handler.PushMessage(
std::move(msg)));
});
});
},
[]() -> StatusFlag { return Success{}; });
})),
call_initiator.PullServerTrailingMetadata(),
[call_handler,
call_initiator](absl::optional<ServerMetadataHandle> md) mutable {
const bool has_md = md.has_value();
call_handler.SpawnGuarded(
"recv_initial_metadata",
[md = std::move(md), call_handler]() mutable {
return call_handler.PushServerInitialMetadata(
std::move(md));
on_server_trailing_metadata_from_initiator =
std::move(on_server_trailing_metadata_from_initiator)](
ServerMetadataHandle md) mutable {
on_server_trailing_metadata_from_initiator(*md);
call_handler.SpawnInfallible(
"recv_trailing",
[call_handler, md = std::move(md)]() mutable {
call_handler.PushServerTrailingMetadata(std::move(md));
return Empty{};
});
return If(
has_md,
ForEach(OutgoingMessages(call_initiator),
[call_handler](MessageHandle msg) mutable {
return call_handler.SpawnWaitable(
"recv_message",
[msg = std::move(msg), call_handler]() mutable {
return call_handler.CancelIfFails(
call_handler.PushMessage(std::move(msg)));
});
}),
[]() -> StatusFlag { return Success{}; });
})),
call_initiator.PullServerTrailingMetadata(),
[call_handler](ServerMetadataHandle md) mutable {
call_handler.SpawnInfallible(
"recv_trailing", [call_handler, md = std::move(md)]() mutable {
call_handler.PushServerTrailingMetadata(std::move(md));
return Empty{};
});
return Empty{};
});
});
return Empty{};
});
});
}
CallInitiatorAndHandler MakeCallPair(

@ -106,17 +106,8 @@ class CallSpine final : public Party {
return call_filters().PullClientInitialMetadata();
}
auto PushServerInitialMetadata(absl::optional<ServerMetadataHandle> md) {
bool has_md = md.has_value();
return If(
has_md,
[this, md = std::move(md)]() mutable {
return call_filters().PushServerInitialMetadata(std::move(*md));
},
[this]() {
call_filters().NoServerInitialMetadata();
return Immediate<StatusFlag>(Success{});
});
StatusFlag PushServerInitialMetadata(ServerMetadataHandle md) {
return call_filters().PushServerInitialMetadata(std::move(md));
}
auto WasCancelled() { return call_filters().WasCancelled(); }
@ -139,7 +130,9 @@ class CallSpine final : public Party {
using ResultType = typename P::Result;
return Map(std::move(promise), [this](ResultType r) {
if (!IsStatusOk(r)) {
PushServerTrailingMetadata(StatusCast<ServerMetadataHandle>(r));
auto md = StatusCast<ServerMetadataHandle>(r);
md->Set(GrpcCallWasCancelled(), true);
PushServerTrailingMetadata(std::move(md));
}
return r;
});
@ -315,7 +308,7 @@ class CallHandler {
return spine_->PullClientInitialMetadata();
}
auto PushServerInitialMetadata(absl::optional<ServerMetadataHandle> md) {
auto PushServerInitialMetadata(ServerMetadataHandle md) {
return spine_->PushServerInitialMetadata(std::move(md));
}
@ -450,7 +443,13 @@ auto OutgoingMessages(CallHalf h) {
// Forward a call from `call_handler` to `call_initiator` (with initial metadata
// `client_initial_metadata`)
void ForwardCall(CallHandler call_handler, CallInitiator call_initiator);
// `on_server_trailing_metadata_from_initiator` is a callback that will be
// called with the server trailing metadata received by the initiator, and can
// be used to mutate that metadata if desired.
void ForwardCall(
CallHandler call_handler, CallInitiator call_initiator,
absl::AnyInvocable<void(ServerMetadata&)>
on_server_trailing_metadata_from_initiator = [](ServerMetadata&) {});
} // namespace grpc_core
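A minimal usage sketch of the new overload (the `handler` and `initiator` values here are hypothetical; only the third argument is introduced by this change, and omitting it keeps the previous behavior):

  // Forward the call, annotating trailing metadata as cancelled on the way
  // through. `handler` and `initiator` are assumed to already exist.
  ForwardCall(std::move(handler), std::move(initiator),
              [](ServerMetadata& md) {
                md.Set(GrpcCallWasCancelled(), true);
              });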

@ -17,12 +17,12 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata_batch.h"
namespace grpc_core {
ServerMetadataHandle ServerMetadataFromStatus(const absl::Status& status,
Arena* arena) {
auto hdl = arena->MakePooled<ServerMetadata>();
ServerMetadataHandle ServerMetadataFromStatus(const absl::Status& status) {
auto hdl = Arena::MakePooled<ServerMetadata>();
grpc_status_code code;
std::string message;
grpc_error_get_status(status, Timestamp::InfFuture(), &code, &message,
@ -34,4 +34,11 @@ ServerMetadataHandle ServerMetadataFromStatus(const absl::Status& status,
return hdl;
}
ServerMetadataHandle CancelledServerMetadataFromStatus(
const absl::Status& status) {
auto hdl = ServerMetadataFromStatus(status);
hdl->Set(GrpcCallWasCancelled(), true);
return hdl;
}
} // namespace grpc_core

@ -31,6 +31,19 @@ using ServerMetadataHandle = Arena::PoolPtr<ServerMetadata>;
using ClientMetadata = grpc_metadata_batch;
using ClientMetadataHandle = Arena::PoolPtr<ClientMetadata>;
// TODO(ctiller): separate when we have different types for client/server
// metadata.
template <typename Sink>
void AbslStringify(Sink& sink, const Arena::PoolPtr<grpc_metadata_batch>& md) {
if (md == nullptr) {
sink.Append("nullptr");
return;
}
sink.Append("ServerMetadata{");
sink.Append(md->DebugString());
sink.Append("}");
}
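A small sketch of what this overload enables (assuming `md` is a ServerMetadataHandle already in scope): metadata handles can be passed straight to absl string routines, which is what the GRPC_DUMP_ARGS-based trace lines above rely on.

  // Format a metadata handle directly; prints "nullptr" when unset.
  std::string description = absl::StrCat("trailing metadata: ", md);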
// Ok/not-ok check for trailing metadata, so that it can be used as result types
// for TrySeq.
inline bool IsStatusOk(const ServerMetadataHandle& m) {
@ -38,8 +51,9 @@ inline bool IsStatusOk(const ServerMetadataHandle& m) {
GRPC_STATUS_OK;
}
ServerMetadataHandle ServerMetadataFromStatus(
const absl::Status& status, Arena* arena = GetContext<Arena>());
ServerMetadataHandle ServerMetadataFromStatus(const absl::Status& status);
ServerMetadataHandle CancelledServerMetadataFromStatus(
const absl::Status& status);
template <>
struct StatusCastImpl<ServerMetadataHandle, absl::Status> {

@ -508,6 +508,8 @@ class Transport : public InternallyRefCounted<Transport> {
struct RawPointerChannelArgTag {};
static absl::string_view ChannelArgName() { return GRPC_ARG_TRANSPORT; }
using InternallyRefCounted<Transport>::InternallyRefCounted;
// Though internally ref counted, transports expose their "Ref" method to
// create a RefCountedPtr to themselves. The OrphanablePtr owner is the
// singleton decision maker on whether the transport should be destroyed or
@ -601,6 +603,7 @@ class FilterStackTransport : public Transport {
class ClientTransport : public Transport {
public:
using Transport::Transport;
virtual void StartCall(CallHandler call_handler) = 0;
protected:
@ -609,6 +612,7 @@ class ClientTransport : public Transport {
class ServerTransport : public Transport {
public:
using Transport::Transport;
// Called once, shortly after transport setup, to register the accept function.
virtual void SetCallDestination(
RefCountedPtr<UnstartedCallDestination> unstarted_call_handler) = 0;

@ -24,6 +24,7 @@ CORE_SOURCE_FILES = [
'src/core/client_channel/client_channel_filter.cc',
'src/core/client_channel/client_channel_plugin.cc',
'src/core/client_channel/client_channel_service_config.cc',
'src/core/client_channel/direct_channel.cc',
'src/core/client_channel/dynamic_filters.cc',
'src/core/client_channel/global_subchannel_pool.cc',
'src/core/client_channel/load_balanced_call_destination.cc',

@ -282,7 +282,7 @@ CLIENT_CHANNEL_TEST(StartCall) {
}
// A filter that adds metadata foo=bar.
class TestFilter : public ImplementChannelFilter<TestFilter> {
class TestFilter {
public:
class Call {
public:

@ -49,6 +49,7 @@
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/match.h"
#include "src/core/lib/surface/event_string.h"
#include "test/core/test_util/build.h"
#include "test/core/test_util/test_config.h"
// a set of metadata we expect to find on an event

@ -836,7 +836,16 @@ std::vector<CoreTestConfiguration> DefaultConfigs() {
FEATURE_MASK_DOES_NOT_SUPPORT_WRITE_BUFFERING,
nullptr,
[](const ChannelArgs&, const ChannelArgs&) {
return std::make_unique<InprocFixture>();
return std::make_unique<InprocFixture>(false);
},
},
CoreTestConfiguration{
"InprocWithPromises",
FEATURE_MASK_DOES_NOT_SUPPORT_WRITE_BUFFERING |
FEATURE_MASK_IS_CALL_V3,
nullptr,
[](const ChannelArgs&, const ChannelArgs&) {
return std::make_unique<InprocFixture>(true);
},
},
CoreTestConfiguration{
@ -990,7 +999,8 @@ std::vector<CoreTestConfiguration> DefaultConfigs() {
"ChaoticGoodFullStack",
FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL |
FEATURE_MASK_DOES_NOT_SUPPORT_RETRY |
FEATURE_MASK_DOES_NOT_SUPPORT_WRITE_BUFFERING,
FEATURE_MASK_DOES_NOT_SUPPORT_WRITE_BUFFERING |
FEATURE_MASK_IS_CALL_V3,
nullptr,
[](const ChannelArgs& /*client_args*/,
const ChannelArgs& /*server_args*/) {

@ -83,6 +83,7 @@
#define FEATURE_MASK_DO_NOT_FUZZ (1 << 13)
// Exclude this fixture from experiment runs
#define FEATURE_MASK_EXCLUDE_FROM_EXPERIMENT_RUNS (1 << 14)
#define FEATURE_MASK_IS_CALL_V3 (1 << 15)
#define FAIL_AUTH_CHECK_SERVER_ARG_NAME "fail_auth_check"
@ -677,9 +678,9 @@ class CoreEnd2endTestRegistry {
#define SKIP_IF_FUZZING() \
if (g_is_fuzzing_core_e2e_tests) GTEST_SKIP() << "Skipping test for fuzzing"
#define SKIP_IF_CHAOTIC_GOOD() \
if (absl::StrContains(GetParam()->name, "ChaoticGood")) { \
GTEST_SKIP() << "Disabled for initial chaotic good testing"; \
#define SKIP_IF_V3() \
if (GetParam()->feature_mask & FEATURE_MASK_IS_CALL_V3) { \
GTEST_SKIP() << "Disabled for initial v3 testing"; \
}
#define CORE_END2END_TEST(suite, name) \

@ -24,12 +24,20 @@
#include "test/core/end2end/end2end_tests.h"
class InprocFixture : public grpc_core::CoreTestFixture {
public:
explicit InprocFixture(bool promise_based) : promise_based_(promise_based) {}
private:
grpc_server* MakeServer(
const grpc_core::ChannelArgs& args, grpc_completion_queue* cq,
absl::AnyInvocable<void(grpc_server*)>& pre_server_start) override {
if (made_server_ != nullptr) return made_server_;
made_server_ = grpc_server_create(args.ToC().get(), nullptr);
made_server_ = grpc_server_create(
args.Set("grpc.experimental.promise_based_inproc_transport",
promise_based_)
.ToC()
.get(),
nullptr);
grpc_server_register_completion_queue(made_server_, cq, nullptr);
pre_server_start(made_server_);
grpc_server_start(made_server_);
@ -43,10 +51,15 @@ class InprocFixture : public grpc_core::CoreTestFixture {
not_sure_what_to_do_but_this_works_for_now = [](grpc_server*) {};
return grpc_inproc_channel_create(
MakeServer(args, cq, not_sure_what_to_do_but_this_works_for_now),
args.ToC().get(), nullptr);
args.Set("grpc.experimental.promise_based_inproc_transport",
promise_based_)
.ToC()
.get(),
nullptr);
}
grpc_server* made_server_ = nullptr;
const bool promise_based_;
};
#endif // GRPC_TEST_CORE_END2END_FIXTURES_INPROC_FIXTURE_H
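For reference, a hedged sketch of setting the same experimental option outside the fixture via the C channel-args API (the arg name matches the one used above; `server` is assumed to exist, and treating the value as an integer arg is an assumption):

  // Create an inproc channel with the promise-based transport enabled.
  grpc_arg arg = grpc_channel_arg_integer_create(
      const_cast<char*>("grpc.experimental.promise_based_inproc_transport"), 1);
  grpc_channel_args args = {1, &arg};
  grpc_channel* channel = grpc_inproc_channel_create(server, &args, nullptr);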

@ -67,12 +67,12 @@ void CancelAfterClientDone(
}
CORE_END2END_TEST(CoreEnd2endTest, CancelAfterClientDone) {
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
CancelAfterClientDone(*this, std::make_unique<CancelCancellationMode>());
}
CORE_END2END_TEST(CoreDeadlineTest, DeadlineAfterClientDone) {
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
CancelAfterClientDone(*this, std::make_unique<DeadlineCancellationMode>());
}

@ -65,7 +65,7 @@ void RunOneRequest(CoreEnd2endTest& test, bool request_is_success) {
}
CORE_END2END_TEST(CoreEnd2endTest, Channelz) {
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
auto args = ChannelArgs()
.Set(GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE, 0)
.Set(GRPC_ARG_ENABLE_CHANNELZ, true);
@ -119,7 +119,7 @@ CORE_END2END_TEST(CoreEnd2endTest, Channelz) {
}
CORE_END2END_TEST(CoreEnd2endTest, ChannelzWithChannelTrace) {
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
auto args =
ChannelArgs()
.Set(GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE, 1024 * 1024)
@ -149,7 +149,7 @@ CORE_END2END_TEST(CoreEnd2endTest, ChannelzWithChannelTrace) {
}
CORE_END2END_TEST(CoreEnd2endTest, ChannelzDisabled) {
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
auto args = ChannelArgs()
.Set(GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE, 0)
.Set(GRPC_ARG_ENABLE_CHANNELZ, false);

@ -71,7 +71,7 @@ static void OneRequestAndShutdownServer(CoreEnd2endTest& test) {
}
CORE_END2END_TEST(CoreClientChannelTest, DisappearingServer) {
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
OneRequestAndShutdownServer(*this);
InitServer(ChannelArgs());
OneRequestAndShutdownServer(*this);

@ -95,7 +95,7 @@ void RegisterFilter(grpc_channel_stack_type type) {
}
CORE_END2END_TEST(CoreEnd2endTest, DISABLED_ServerFilterChannelInitFails) {
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
RegisterFilter(GRPC_SERVER_CHANNEL);
InitClient(ChannelArgs());
InitServer(ChannelArgs().Set("channel_init_fails", true));
@ -121,7 +121,7 @@ CORE_END2END_TEST(CoreEnd2endTest, DISABLED_ServerFilterChannelInitFails) {
CORE_END2END_TEST(CoreEnd2endTest, ServerFilterCallInitFails) {
SKIP_IF_FUZZING();
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
RegisterFilter(GRPC_SERVER_CHANNEL);
auto c = NewClientCall("/foo").Timeout(Duration::Seconds(5)).Create();
@ -141,7 +141,7 @@ CORE_END2END_TEST(CoreEnd2endTest, ServerFilterCallInitFails) {
};
CORE_END2END_TEST(CoreEnd2endTest, DISABLED_ClientFilterChannelInitFails) {
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
RegisterFilter(GRPC_CLIENT_CHANNEL);
RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL);
InitServer(ChannelArgs());
@ -161,7 +161,7 @@ CORE_END2END_TEST(CoreEnd2endTest, DISABLED_ClientFilterChannelInitFails) {
}
CORE_END2END_TEST(CoreEnd2endTest, ClientFilterCallInitFails) {
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
SKIP_IF_FUZZING();
RegisterFilter(GRPC_CLIENT_CHANNEL);
@ -184,7 +184,7 @@ CORE_END2END_TEST(CoreEnd2endTest, ClientFilterCallInitFails) {
CORE_END2END_TEST(CoreClientChannelTest,
DISABLED_SubchannelFilterChannelInitFails) {
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
RegisterFilter(GRPC_CLIENT_SUBCHANNEL);
InitServer(ChannelArgs());
InitClient(ChannelArgs().Set("channel_init_fails", true));
@ -220,7 +220,7 @@ CORE_END2END_TEST(CoreClientChannelTest,
}
CORE_END2END_TEST(CoreClientChannelTest, SubchannelFilterCallInitFails) {
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
RegisterFilter(GRPC_CLIENT_SUBCHANNEL);
auto c = NewClientCall("/foo").Timeout(Duration::Seconds(5)).Create();
IncomingStatusOnClient server_status;

@ -136,7 +136,7 @@ void TestMaxMessageLengthOnServerOnResponse(CoreEnd2endTest& test) {
CORE_END2END_TEST(CoreEnd2endTest,
MaxMessageLengthOnClientOnRequestViaChannelArg) {
SKIP_IF_MINSTACK();
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
InitServer(ChannelArgs());
InitClient(ChannelArgs().Set(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, 5));
TestMaxMessageLengthOnClientOnRequest(*this);
@ -146,7 +146,7 @@ CORE_END2END_TEST(
CoreEnd2endTest,
MaxMessageLengthOnClientOnRequestViaServiceConfigWithStringJsonValue) {
SKIP_IF_MINSTACK();
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
InitServer(ChannelArgs());
InitClient(ChannelArgs().Set(
GRPC_ARG_SERVICE_CONFIG,
@ -165,7 +165,7 @@ CORE_END2END_TEST(
CoreEnd2endTest,
MaxMessageLengthOnClientOnRequestViaServiceConfigWithIntegerJsonValue) {
SKIP_IF_MINSTACK();
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
InitServer(ChannelArgs());
InitClient(ChannelArgs().Set(
GRPC_ARG_SERVICE_CONFIG,
@ -183,7 +183,7 @@ CORE_END2END_TEST(
CORE_END2END_TEST(CoreEnd2endTest,
MaxMessageLengthOnServerOnRequestViaChannelArg) {
SKIP_IF_MINSTACK();
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
InitServer(ChannelArgs().Set(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 5));
InitClient(ChannelArgs());
TestMaxMessageLengthOnServerOnRequest(*this);
@ -192,7 +192,7 @@ CORE_END2END_TEST(CoreEnd2endTest,
CORE_END2END_TEST(CoreEnd2endTest,
MaxMessageLengthOnClientOnResponseViaChannelArg) {
SKIP_IF_MINSTACK();
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
InitServer(ChannelArgs());
InitClient(ChannelArgs().Set(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 5));
TestMaxMessageLengthOnClientOnResponse(*this);
@ -202,7 +202,7 @@ CORE_END2END_TEST(
CoreEnd2endTest,
MaxMessageLengthOnClientOnResponseViaServiceConfigWithStringJsonValue) {
SKIP_IF_MINSTACK();
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
InitServer(ChannelArgs());
InitClient(ChannelArgs().Set(
GRPC_ARG_SERVICE_CONFIG,
@ -221,7 +221,7 @@ CORE_END2END_TEST(
CoreEnd2endTest,
MaxMessageLengthOnClientOnResponseViaServiceConfigWithIntegerJsonValue) {
SKIP_IF_MINSTACK();
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
InitServer(ChannelArgs());
InitClient(ChannelArgs().Set(
GRPC_ARG_SERVICE_CONFIG,
@ -239,7 +239,7 @@ CORE_END2END_TEST(
CORE_END2END_TEST(CoreEnd2endTest,
MaxMessageLengthOnServerOnResponseViaChannelArg) {
SKIP_IF_MINSTACK();
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
InitServer(ChannelArgs().Set(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, 5));
InitClient(ChannelArgs());
TestMaxMessageLengthOnServerOnResponse(*this);
@ -247,7 +247,7 @@ CORE_END2END_TEST(CoreEnd2endTest,
CORE_END2END_TEST(Http2Test, MaxMessageLengthOnServerOnRequestWithCompression) {
SKIP_IF_MINSTACK();
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
// Set limit via channel args.
InitServer(ChannelArgs().Set(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 5));
InitClient(ChannelArgs());
@ -284,7 +284,7 @@ CORE_END2END_TEST(Http2Test, MaxMessageLengthOnServerOnRequestWithCompression) {
CORE_END2END_TEST(Http2Test,
MaxMessageLengthOnClientOnResponseWithCompression) {
SKIP_IF_MINSTACK();
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
// Set limit via channel args.
InitServer(ChannelArgs());
InitClient(ChannelArgs().Set(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 5));

@ -144,20 +144,20 @@ CORE_END2END_TEST(CoreEnd2endTest, BadFlagsOnRecvStatusOnClient) {
}
CORE_END2END_TEST(CoreEnd2endTest, WriteBufferIntAcceptedOnSendMessage) {
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
InvokeRequestWithFlags(
*this, {{GRPC_OP_SEND_MESSAGE, GRPC_WRITE_BUFFER_HINT}}, GRPC_CALL_OK);
}
CORE_END2END_TEST(CoreEnd2endTest, WriteNoCompressAcceptedOnSendMessage) {
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
InvokeRequestWithFlags(
*this, {{GRPC_OP_SEND_MESSAGE, GRPC_WRITE_NO_COMPRESS}}, GRPC_CALL_OK);
}
CORE_END2END_TEST(CoreEnd2endTest,
WriteBufferHintAndNoCompressAcceptedOnSendMessage) {
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
InvokeRequestWithFlags(
*this,
{{GRPC_OP_SEND_MESSAGE, GRPC_WRITE_BUFFER_HINT | GRPC_WRITE_NO_COMPRESS}},
@ -165,7 +165,7 @@ CORE_END2END_TEST(CoreEnd2endTest,
}
CORE_END2END_TEST(CoreEnd2endTest, WriteInternalCompressAcceptedOnSendMessage) {
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
InvokeRequestWithFlags(*this,
{{GRPC_OP_SEND_MESSAGE, GRPC_WRITE_INTERNAL_COMPRESS}},
GRPC_CALL_OK);

@ -31,7 +31,7 @@ namespace grpc_core {
namespace {
CORE_END2END_TEST(CoreEnd2endTest, EarlyServerShutdownFinishesInflightCalls) {
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
SKIP_IF_FUZZING();
auto c = NewClientCall("/foo").Timeout(Duration::Seconds(5)).Create();

@ -36,7 +36,7 @@ namespace {
// error status. (Server sending a non-OK status is not considered an error
// status.)
CORE_END2END_TEST(CoreEnd2endTest, StreamingErrorResponse) {
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
auto c = NewClientCall("/foo").Timeout(Duration::Seconds(5)).Create();
IncomingMetadata server_initial_metadata;
IncomingMessage response_payload1_recv;
@ -81,7 +81,7 @@ CORE_END2END_TEST(CoreEnd2endTest, StreamingErrorResponse) {
}
CORE_END2END_TEST(CoreEnd2endTest, StreamingErrorResponseRequestStatusEarly) {
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
auto c = NewClientCall("/foo").Timeout(Duration::Seconds(5)).Create();
IncomingMetadata server_initial_metadata;
IncomingMessage response_payload1_recv;
@ -119,7 +119,7 @@ CORE_END2END_TEST(CoreEnd2endTest, StreamingErrorResponseRequestStatusEarly) {
CORE_END2END_TEST(
CoreEnd2endTest,
StreamingErrorResponseRequestStatusEarlyAndRecvMessageSeparately) {
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
auto c = NewClientCall("/foo").Timeout(Duration::Seconds(5)).Create();
IncomingMetadata server_initial_metadata;
IncomingStatusOnClient server_status;

@ -32,7 +32,7 @@ namespace grpc_core {
namespace {
CORE_END2END_TEST(CoreDeadlineTest, TimeoutBeforeRequestCall) {
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
auto c = NewClientCall("/foo").Timeout(Duration::Seconds(1)).Create();
IncomingStatusOnClient server_status;
IncomingMetadata server_initial_metadata;
@ -75,7 +75,7 @@ CORE_END2END_TEST(CoreDeadlineTest, TimeoutBeforeRequestCall) {
CORE_END2END_TEST(CoreDeadlineTest,
TimeoutBeforeRequestCallWithRegisteredMethod) {
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
auto method = RegisterServerMethod("/foo", GRPC_SRM_PAYLOAD_NONE);
auto c = NewClientCall("/foo").Timeout(Duration::Seconds(1)).Create();
@ -120,7 +120,7 @@ CORE_END2END_TEST(CoreDeadlineTest,
CORE_END2END_TEST(CoreDeadlineSingleHopTest,
TimeoutBeforeRequestCallWithRegisteredMethodWithPayload) {
SKIP_IF_CHAOTIC_GOOD();
SKIP_IF_V3();
auto method =
RegisterServerMethod("/foo", GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER);

@ -89,12 +89,18 @@ void WaitForSingleOwnerWithTimeout(std::shared_ptr<EventEngine> engine,
auto start = std::chrono::system_clock::now();
while (engine.use_count() > 1) {
++n;
if (n % 100 == 0) AsanAssertNoLeaks();
if (std::chrono::system_clock::now() - start > timeout) {
if (n % 100 == 0) {
LOG(INFO) << "Checking for leaks...";
AsanAssertNoLeaks();
}
auto remaining = timeout - (std::chrono::system_clock::now() - start);
if (remaining < std::chrono::seconds{0}) {
grpc_core::Crash("Timed out waiting for a single EventEngine owner");
}
GRPC_LOG_EVERY_N_SEC(2, GPR_INFO, "engine.use_count() = %ld",
engine.use_count());
GRPC_LOG_EVERY_N_SEC(
2, GPR_INFO, "engine.use_count() = %ld timeout_remaining = %s",
engine.use_count(),
absl::FormatDuration(absl::Nanoseconds(remaining.count())).c_str());
absl::SleepFor(absl::Milliseconds(100));
}
}

@ -13,6 +13,7 @@
# limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package")
load("//test/core/call/yodel:grpc_yodel_test.bzl", "grpc_yodel_simple_test")
licenses(["notice"])
@ -176,3 +177,16 @@ grpc_cc_test(
"//test/core/test_util:grpc_test_util_base",
],
)
grpc_yodel_simple_test(
name = "call_spine",
srcs = ["call_spine_test.cc"],
external_deps = [
"gtest",
],
language = "C++",
deps = [
"//src/core:call_spine",
"//test/core/call/yodel:yodel_test",
],
)

@ -59,6 +59,11 @@ class MockActivity : public Activity, public Wakeable {
std::unique_ptr<ScopedActivity> scoped_activity_;
};
#define EXPECT_WAKEUP(activity, statement) \
EXPECT_CALL((activity), WakeupRequested()); \
statement; \
Mock::VerifyAndClearExpectations(&(activity));
} // namespace
////////////////////////////////////////////////////////////////////////////////
@ -1161,135 +1166,244 @@ TEST(InfallibleOperationExecutor, InstantTwo) {
gpr_free_aligned(call_data);
}
} // namespace filters_detail
///////////////////////////////////////////////////////////////////////////////
// PipeState
// CallState
namespace filters_detail {
TEST(CallStateTest, NoOp) { CallState state; }
TEST(PipeStateTest, NoOp) { PipeState(); }
TEST(CallStateTest, StartTwiceCrashes) {
CallState state;
state.Start();
EXPECT_DEATH(state.Start(), "");
}
TEST(PipeStateTest, OnePull) {
PipeState ps;
TEST(CallStateTest, PullServerInitialMetadataBlocksUntilStart) {
StrictMock<MockActivity> activity;
activity.Activate();
// initially: not started, should only see pending from pulls
EXPECT_THAT(ps.PollPull(), IsPending());
EXPECT_THAT(ps.PollPull(), IsPending());
// start it, should see a wakeup
EXPECT_CALL(activity, WakeupRequested());
ps.Start();
Mock::VerifyAndClearExpectations(&activity);
// should still see pending! nothing's been pushed
EXPECT_THAT(ps.PollPull(), IsPending());
EXPECT_THAT(ps.PollPull(), IsPending());
// begin a push, should see a wakeup
EXPECT_CALL(activity, WakeupRequested());
ps.BeginPush();
Mock::VerifyAndClearExpectations(&activity);
// now we should see a value on the pull poll
EXPECT_THAT(ps.PollPull(), IsReady(true));
// push should be pending though!
EXPECT_THAT(ps.PollPush(), IsPending());
// ack the pull, should see a wakeup
EXPECT_CALL(activity, WakeupRequested());
ps.AckPull();
Mock::VerifyAndClearExpectations(&activity);
// now the push is complete
EXPECT_THAT(ps.PollPush(), IsReady(Success()));
ps.DropPush();
ps.DropPull();
EXPECT_FALSE(ps.holds_error());
CallState state;
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.PushServerInitialMetadata());
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.Start());
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady());
}
TEST(PipeStateTest, StartThenPull) {
PipeState ps;
TEST(CallStateTest, PullClientInitialMetadata) {
StrictMock<MockActivity> activity;
activity.Activate();
ps.Start();
// pull is pending! nothing's been pushed
EXPECT_THAT(ps.PollPull(), IsPending());
EXPECT_THAT(ps.PollPull(), IsPending());
// begin a push, should see a wakeup
EXPECT_CALL(activity, WakeupRequested());
ps.BeginPush();
Mock::VerifyAndClearExpectations(&activity);
// now we should see a value on the pull poll
EXPECT_THAT(ps.PollPull(), IsReady(true));
// push should be pending though!
EXPECT_THAT(ps.PollPush(), IsPending());
// ack the pull, should see a wakeup
EXPECT_CALL(activity, WakeupRequested());
ps.AckPull();
Mock::VerifyAndClearExpectations(&activity);
// now the push is complete
EXPECT_THAT(ps.PollPush(), IsReady(Success()));
ps.DropPush();
ps.DropPull();
EXPECT_FALSE(ps.holds_error());
CallState state;
EXPECT_DEATH(state.FinishPullClientInitialMetadata(), "");
state.BeginPullClientInitialMetadata();
state.FinishPullClientInitialMetadata();
}
TEST(PipeStateTest, PushFirst) {
PipeState ps;
TEST(CallStateTest, ClientToServerMessagesWaitForInitialMetadata) {
StrictMock<MockActivity> activity;
activity.Activate();
// start immediately, and push immediately
ps.Start();
ps.BeginPush();
// push should be pending
EXPECT_THAT(ps.PollPush(), IsPending());
// pull should immediately see a value
EXPECT_THAT(ps.PollPull(), IsReady(true));
// push should still be pending though!
EXPECT_THAT(ps.PollPush(), IsPending());
// ack the pull, should see a wakeup
EXPECT_CALL(activity, WakeupRequested());
ps.AckPull();
Mock::VerifyAndClearExpectations(&activity);
// now the push is complete
EXPECT_THAT(ps.PollPush(), IsReady(Success()));
ps.DropPush();
ps.DropPull();
EXPECT_FALSE(ps.holds_error());
CallState state;
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending());
state.BeginPushClientToServerMessage();
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending());
state.BeginPullClientInitialMetadata();
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientInitialMetadata());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{}));
}
TEST(CallStateTest, RepeatedClientToServerMessagesWithHalfClose) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.BeginPullClientInitialMetadata();
state.FinishPullClientInitialMetadata();
// Message 0
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.BeginPushClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{}));
// Message 1
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.BeginPushClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{}));
// Message 2: push before polling
state.BeginPushClientToServerMessage();
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{}));
// Message 3: push before polling and half close
state.BeginPushClientToServerMessage();
state.ClientToServerHalfClose();
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{}));
// ... and now we should see the half close
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(false));
}
TEST(CallStateTest, ImmediateClientToServerHalfClose) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.BeginPullClientInitialMetadata();
state.FinishPullClientInitialMetadata();
state.ClientToServerHalfClose();
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(false));
}
TEST(PipeStateTest, DropPushing) {
PipeState ps;
TEST(CallStateTest, ServerToClientMessagesWaitForInitialMetadata) {
StrictMock<MockActivity> activity;
activity.Activate();
ps.BeginPush();
ps.DropPush();
EXPECT_TRUE(ps.holds_error());
EXPECT_THAT(ps.PollPull(), IsReady(Failure()));
ps.BeginPush();
EXPECT_THAT(ps.PollPush(), IsReady(Failure()));
ps.DropPush();
CallState state;
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.Start());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.PushServerInitialMetadata());
state.BeginPushServerToClientMessage();
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity,
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(),
IsReady(true)));
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerInitialMetadata());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true));
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{}));
}
TEST(PipeStateTest, DropPulling) {
PipeState ps;
TEST(CallStateTest, RepeatedServerToClientMessages) {
StrictMock<MockActivity> activity;
activity.Activate();
EXPECT_THAT(ps.PollPull(), IsPending());
ps.DropPull();
EXPECT_TRUE(ps.holds_error());
EXPECT_THAT(ps.PollPull(), IsReady(Failure()));
ps.DropPull();
EXPECT_THAT(ps.PollPush(), IsReady(Failure()));
CallState state;
state.PushServerInitialMetadata();
state.Start();
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(true));
state.FinishPullServerInitialMetadata();
// Message 0
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.BeginPushServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{}));
// Message 1
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.BeginPushServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{}));
// Message 2: push before polling
state.BeginPushServerToClientMessage();
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{}));
// Message 3: push before polling
state.BeginPushServerToClientMessage();
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{}));
}
TEST(PipeStateTest, DropProcessing) {
PipeState ps;
TEST(CallStateTest, ReceiveTrailersOnly) {
StrictMock<MockActivity> activity;
activity.Activate();
ps.Start();
ps.BeginPush();
EXPECT_THAT(ps.PollPull(), IsReady(true));
ps.DropPull();
EXPECT_TRUE(ps.holds_error());
EXPECT_THAT(ps.PollPull(), IsReady(Failure()));
EXPECT_THAT(ps.PollPush(), IsReady(Failure()));
CallState state;
state.Start();
state.PushServerTrailingMetadata(false);
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
state.FinishPullServerTrailingMetadata();
}
TEST(CallStateTest, ReceiveTrailersOnlySkipsInitialMetadataOnUnstartedCalls) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.PushServerTrailingMetadata(false);
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
state.FinishPullServerTrailingMetadata();
}
TEST(CallStateTest, RecallNoCancellation) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.Start();
state.PushServerTrailingMetadata(false);
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
EXPECT_THAT(state.PollWasCancelled(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerTrailingMetadata());
EXPECT_THAT(state.PollWasCancelled(), IsReady(false));
}
TEST(CallStateTest, RecallCancellation) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.Start();
state.PushServerTrailingMetadata(true);
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
EXPECT_THAT(state.PollWasCancelled(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerTrailingMetadata());
EXPECT_THAT(state.PollWasCancelled(), IsReady(true));
}
TEST(CallStateTest, ReceiveTrailingMetadataAfterMessageRead) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.Start();
state.PushServerInitialMetadata();
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(true));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.PushServerTrailingMetadata(false));
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(false));
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
}
} // namespace filters_detail
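For readers skimming the diff, the following is a minimal sketch (not part of this change; the test name is hypothetical) of a happy-path exchange that simply composes moves the tests above verify individually: pull client initial metadata, push one client-to-server message followed by a half close, surface server initial metadata once the call is started, and finish with uncancelled trailing metadata. It assumes the same MockActivity / EXPECT_WAKEUP scaffolding and would live in namespace filters_detail alongside the tests above.

// Sketch only: composes operations already exercised by the tests above.
TEST(CallStateTest, SketchHappyPathExchange) {
  StrictMock<MockActivity> activity;
  activity.Activate();
  CallState state;
  // Client initial metadata must be pulled before client->server messages flow.
  state.BeginPullClientInitialMetadata();
  state.FinishPullClientInitialMetadata();
  // One client->server message, pushed before the pull poll, then half close.
  state.BeginPushClientToServerMessage();
  state.ClientToServerHalfClose();
  EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
  EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true));
  EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
  EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage());
  EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{}));
  // The half close is observed once the final message has been consumed.
  EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(false));
  // Server initial metadata becomes visible once the call has started.
  state.PushServerInitialMetadata();
  state.Start();
  EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(true));
  state.FinishPullServerInitialMetadata();
  // Trailing metadata completes the call without cancellation.
  state.PushServerTrailingMetadata(false);
  EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
  EXPECT_THAT(state.PollWasCancelled(), IsPending());
  EXPECT_WAKEUP(activity, state.FinishPullServerTrailingMetadata());
  EXPECT_THAT(state.PollWasCancelled(), IsReady(false));
}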
@@ -1369,31 +1483,24 @@ TEST(CallFiltersTest, UnaryCall) {
EXPECT_THAT(push_client_to_server_message(), IsPending());
auto pull_client_to_server_message = filters.PullClientToServerMessage();
// Pull client to server message, expect a wakeup
EXPECT_CALL(activity, WakeupRequested());
EXPECT_THAT(pull_client_to_server_message(), IsReady());
Mock::VerifyAndClearExpectations(&activity);
EXPECT_WAKEUP(activity,
EXPECT_THAT(pull_client_to_server_message(), IsReady()));
// Push should be done
EXPECT_THAT(push_client_to_server_message(), IsReady(Success{}));
// Push server initial metadata
auto push_server_initial_metadata =
filters.PushServerInitialMetadata(Arena::MakePooled<ServerMetadata>());
EXPECT_THAT(push_server_initial_metadata(), IsPending());
filters.PushServerInitialMetadata(Arena::MakePooled<ServerMetadata>());
auto pull_server_initial_metadata = filters.PullServerInitialMetadata();
// Pull server initial metadata, expect a wakeup
EXPECT_CALL(activity, WakeupRequested());
// Pull server initial metadata
EXPECT_THAT(pull_server_initial_metadata(), IsReady());
Mock::VerifyAndClearExpectations(&activity);
// Push should be done
EXPECT_THAT(push_server_initial_metadata(), IsReady(Success{}));
// Push server to client message
auto push_server_to_client_message = filters.PushServerToClientMessage(
Arena::MakePooled<Message>(SliceBuffer(), 0));
EXPECT_THAT(push_server_to_client_message(), IsPending());
auto pull_server_to_client_message = filters.PullServerToClientMessage();
// Pull server to client message, expect a wakeup
EXPECT_CALL(activity, WakeupRequested());
EXPECT_THAT(pull_server_to_client_message(), IsReady());
Mock::VerifyAndClearExpectations(&activity);
EXPECT_WAKEUP(activity,
EXPECT_THAT(pull_server_to_client_message(), IsReady()));
// Push should be done
EXPECT_THAT(push_server_to_client_message(), IsReady(Success{}));
// Push server trailing metadata
@@ -1417,5 +1524,6 @@ TEST(CallFiltersTest, UnaryCall) {
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc_tracer_init();
return RUN_ALL_TESTS();
}

@@ -0,0 +1,189 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/transport/call_spine.h"
#include <atomic>
#include <memory>
#include <queue>
#include "absl/strings/string_view.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include <grpc/grpc.h>
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/transport/metadata.h"
#include "test/core/call/yodel/yodel_test.h"
namespace grpc_core {
using EventEngine = grpc_event_engine::experimental::EventEngine;
namespace {
const absl::string_view kTestPath = "/foo/bar";
} // namespace
class CallSpineTest : public YodelTest {
protected:
using YodelTest::YodelTest;
ClientMetadataHandle MakeClientInitialMetadata() {
auto client_initial_metadata = Arena::MakePooled<ClientMetadata>();
client_initial_metadata->Set(HttpPathMetadata(),
Slice::FromCopiedString(kTestPath));
return client_initial_metadata;
}
CallInitiatorAndHandler MakeCall(
ClientMetadataHandle client_initial_metadata) {
return MakeCallPair(std::move(client_initial_metadata),
event_engine().get(),
SimpleArenaAllocator()->MakeArena());
}
void UnaryRequest(CallInitiator initiator, CallHandler handler);
private:
void InitCoreConfiguration() override {}
void Shutdown() override {}
};
#define CALL_SPINE_TEST(name) YODEL_TEST(CallSpineTest, name)
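// CALL_SPINE_TEST(name) registers `name` as a yodel-driven test case on the
// CallSpineTest fixture defined above.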
CALL_SPINE_TEST(NoOp) {}
CALL_SPINE_TEST(Create) { MakeCall(MakeClientInitialMetadata()); }
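// Drives a complete unary exchange over an established call: the initiator
// pushes one message, half-closes, then reads server initial metadata, one
// response message, and UNIMPLEMENTED trailing metadata; the handler performs
// the matching server-side steps. Both sides run as SpawnTestSeq sequences,
// and the helper is reused by the forwarding tests below.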
void CallSpineTest::UnaryRequest(CallInitiator initiator, CallHandler handler) {
SpawnTestSeq(
initiator, "initiator",
[initiator]() mutable {
return initiator.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("hello world")), 0));
},
[initiator](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
initiator.FinishSends();
return initiator.PullServerInitialMetadata();
},
[initiator](
ValueOrFailure<absl::optional<ServerMetadataHandle>> md) mutable {
EXPECT_TRUE(md.ok());
EXPECT_TRUE(md.value().has_value());
EXPECT_EQ(*md.value().value()->get_pointer(ContentTypeMetadata()),
ContentTypeMetadata::kApplicationGrpc);
return initiator.PullMessage();
},
[initiator](ValueOrFailure<absl::optional<MessageHandle>> msg) mutable {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"why hello neighbor");
return initiator.PullMessage();
},
[initiator](ValueOrFailure<absl::optional<MessageHandle>> msg) mutable {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
return initiator.PullServerTrailingMetadata();
},
[initiator](ValueOrFailure<ServerMetadataHandle> md) mutable {
EXPECT_TRUE(md.ok());
EXPECT_EQ(*md.value()->get_pointer(GrpcStatusMetadata()),
GRPC_STATUS_UNIMPLEMENTED);
return Empty{};
});
SpawnTestSeq(
handler, "handler",
[handler]() mutable { return handler.PullClientInitialMetadata(); },
[handler](ValueOrFailure<ServerMetadataHandle> md) mutable {
EXPECT_TRUE(md.ok());
EXPECT_EQ(md.value()->get_pointer(HttpPathMetadata())->as_string_view(),
kTestPath);
return handler.PullMessage();
},
[handler](ValueOrFailure<absl::optional<MessageHandle>> msg) mutable {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"hello world");
return handler.PullMessage();
},
[handler](ValueOrFailure<absl::optional<MessageHandle>> msg) mutable {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc);
return handler.PushServerInitialMetadata(std::move(md));
},
[handler](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
return handler.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("why hello neighbor")), 0));
},
[handler](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
handler.PushServerTrailingMetadata(std::move(md));
return Empty{};
});
}
CALL_SPINE_TEST(UnaryRequest) {
auto call = MakeCall(MakeClientInitialMetadata());
UnaryRequest(call.initiator, call.handler.StartWithEmptyFilterStack());
WaitForAllPendingWork();
}
CALL_SPINE_TEST(UnaryRequestThroughForwardCall) {
auto call1 = MakeCall(MakeClientInitialMetadata());
auto handler = call1.handler.StartWithEmptyFilterStack();
SpawnTestSeq(
call1.initiator, "initiator",
[handler]() mutable { return handler.PullClientInitialMetadata(); },
[this, handler, initiator = call1.initiator](
ValueOrFailure<ClientMetadataHandle> md) mutable {
EXPECT_TRUE(md.ok());
auto call2 = MakeCall(std::move(md.value()));
ForwardCall(handler, call2.initiator);
UnaryRequest(initiator, call2.handler.StartWithEmptyFilterStack());
return Empty{};
});
WaitForAllPendingWork();
}
CALL_SPINE_TEST(UnaryRequestThroughForwardCallWithServerTrailingMetadataHook) {
auto call1 = MakeCall(MakeClientInitialMetadata());
auto handler = call1.handler.StartWithEmptyFilterStack();
bool got_md = false;
SpawnTestSeq(
call1.initiator, "initiator",
[handler]() mutable { return handler.PullClientInitialMetadata(); },
[this, handler, initiator = call1.initiator,
&got_md](ValueOrFailure<ClientMetadataHandle> md) mutable {
EXPECT_TRUE(md.ok());
auto call2 = MakeCall(std::move(md.value()));
ForwardCall(handler, call2.initiator,
[&got_md](ServerMetadata&) { got_md = true; });
UnaryRequest(initiator, call2.handler.StartWithEmptyFilterStack());
return Empty{};
});
WaitForAllPendingWork();
EXPECT_TRUE(got_md);
}
} // namespace grpc_core

@@ -95,6 +95,7 @@ class FullstackFixture : public BaseFixture {
}
~FullstackFixture() override {
channel_.reset();
server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
cq_->Shutdown();
void* tag;

@@ -104,6 +104,7 @@ static void BM_UnaryPingPong(benchmark::State& state) {
service.RequestEcho(&senv->ctx, &senv->recv_request, &senv->response_writer,
fixture->cq(), fixture->cq(), tag(slot));
}
stub.reset();
fixture.reset();
server_env[0]->~ServerEnv();
server_env[1]->~ServerEnv();

@@ -451,6 +451,7 @@ for dirname in [
"filegroup": lambda name, **kwargs: None,
"sh_library": lambda name, **kwargs: None,
"platform": lambda name, **kwargs: None,
"grpc_clang_cl_settings": lambda **kwargs: None,
},
{},
)

@@ -1109,6 +1109,8 @@ src/core/client_channel/client_channel_service_config.cc \
src/core/client_channel/client_channel_service_config.h \
src/core/client_channel/config_selector.h \
src/core/client_channel/connector.h \
src/core/client_channel/direct_channel.cc \
src/core/client_channel/direct_channel.h \
src/core/client_channel/dynamic_filters.cc \
src/core/client_channel/dynamic_filters.h \
src/core/client_channel/global_subchannel_pool.cc \

@@ -909,6 +909,8 @@ src/core/client_channel/client_channel_service_config.cc \
src/core/client_channel/client_channel_service_config.h \
src/core/client_channel/config_selector.h \
src/core/client_channel/connector.h \
src/core/client_channel/direct_channel.cc \
src/core/client_channel/direct_channel.h \
src/core/client_channel/dynamic_filters.cc \
src/core/client_channel/dynamic_filters.h \
src/core/client_channel/global_subchannel_pool.cc \

@@ -1349,6 +1349,26 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "call_spine_test",
"platforms": [
"linux",
"posix"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
