[call-v3] Separate out the CallState class from CallFilters (#37003)

At the same time, inline all state machine functions and move the trace to its own (debug-only) trace var.

Before:
```
----------------------------------------------------------------------------------------------------
Benchmark                                                          Time             CPU   Iterations
----------------------------------------------------------------------------------------------------
grpc_core::BM_UnaryWithSpawnPerEnd<CallSpineFixture>            1171 ns         1171 ns      2373821
grpc_core::BM_UnaryWithSpawnPerOp<CallSpineFixture>             1326 ns         1326 ns      2124026
grpc_core::BM_ClientToServerStreaming<CallSpineFixture>          256 ns          256 ns     10512990
grpc_core::BM_UnaryWithSpawnPerEnd<ForwardCallFixture>          2957 ns         2957 ns       949121
grpc_core::BM_UnaryWithSpawnPerOp<ForwardCallFixture>           3172 ns         3172 ns       882463
grpc_core::BM_ClientToServerStreaming<ForwardCallFixture>        500 ns          500 ns      5765914
```

After:
```
----------------------------------------------------------------------------------------------------
Benchmark                                                          Time             CPU   Iterations
----------------------------------------------------------------------------------------------------
grpc_core::BM_UnaryWithSpawnPerEnd<CallSpineFixture>            1102 ns         1102 ns      2511682
grpc_core::BM_UnaryWithSpawnPerOp<CallSpineFixture>             1263 ns         1263 ns      2264222
grpc_core::BM_ClientToServerStreaming<CallSpineFixture>          252 ns          252 ns     11090774
grpc_core::BM_UnaryWithSpawnPerEnd<ForwardCallFixture>          2855 ns         2855 ns       987991
grpc_core::BM_UnaryWithSpawnPerOp<ForwardCallFixture>           3082 ns         3081 ns       901020
grpc_core::BM_ClientToServerStreaming<ForwardCallFixture>        490 ns          490 ns      5675073
```

Closes #37003

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37003 from ctiller:excise-the-thing ab3943e828
PiperOrigin-RevId: 646493946
pull/37032/head^2
Craig Tiller 5 months ago committed by Copybara-Service
parent 12ca346150
commit ec7e4e2a96
  1. 59
      CMakeLists.txt
  2. 1
      Makefile
  3. 2
      Package.swift
  4. 54
      build_autogenerated.yaml
  5. 1
      config.m4
  6. 1
      config.w32
  7. 1
      doc/trace_flags.md
  8. 2
      gRPC-C++.podspec
  9. 3
      gRPC-Core.podspec
  10. 2
      grpc.gemspec
  11. 2
      package.xml
  12. 19
      src/core/BUILD
  13. 2
      src/core/lib/debug/trace_flags.cc
  14. 1
      src/core/lib/debug/trace_flags.h
  15. 4
      src/core/lib/debug/trace_flags.yaml
  16. 695
      src/core/lib/transport/call_filters.cc
  17. 260
      src/core/lib/transport/call_filters.h
  18. 39
      src/core/lib/transport/call_state.cc
  19. 957
      src/core/lib/transport/call_state.h
  20. 1
      src/python/grpcio/grpc_core_dependencies.py
  21. 15
      test/core/transport/BUILD
  22. 240
      test/core/transport/call_filters_test.cc
  23. 310
      test/core/transport/call_state_test.cc
  24. 2
      tools/doxygen/Doxyfile.c++.internal
  25. 2
      tools/doxygen/Doxyfile.core.internal
  26. 24
      tools/run_tests/generated/tests.json

59
CMakeLists.txt generated

@ -991,6 +991,7 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx call_spine_test) add_dependencies(buildtests_cxx call_spine_test)
endif() endif()
add_dependencies(buildtests_cxx call_state_test)
add_dependencies(buildtests_cxx call_tracer_test) add_dependencies(buildtests_cxx call_tracer_test)
add_dependencies(buildtests_cxx call_utils_test) add_dependencies(buildtests_cxx call_utils_test)
add_dependencies(buildtests_cxx cancel_after_accept_test) add_dependencies(buildtests_cxx cancel_after_accept_test)
@ -2526,6 +2527,7 @@ add_library(grpc
src/core/lib/transport/call_filters.cc src/core/lib/transport/call_filters.cc
src/core/lib/transport/call_final_info.cc src/core/lib/transport/call_final_info.cc
src/core/lib/transport/call_spine.cc src/core/lib/transport/call_spine.cc
src/core/lib/transport/call_state.cc
src/core/lib/transport/connectivity_state.cc src/core/lib/transport/connectivity_state.cc
src/core/lib/transport/error_utils.cc src/core/lib/transport/error_utils.cc
src/core/lib/transport/interception_chain.cc src/core/lib/transport/interception_chain.cc
@ -3280,6 +3282,7 @@ add_library(grpc_unsecure
src/core/lib/transport/call_filters.cc src/core/lib/transport/call_filters.cc
src/core/lib/transport/call_final_info.cc src/core/lib/transport/call_final_info.cc
src/core/lib/transport/call_spine.cc src/core/lib/transport/call_spine.cc
src/core/lib/transport/call_state.cc
src/core/lib/transport/connectivity_state.cc src/core/lib/transport/connectivity_state.cc
src/core/lib/transport/error_utils.cc src/core/lib/transport/error_utils.cc
src/core/lib/transport/interception_chain.cc src/core/lib/transport/interception_chain.cc
@ -5403,6 +5406,7 @@ add_library(grpc_authorization_provider
src/core/lib/transport/call_filters.cc src/core/lib/transport/call_filters.cc
src/core/lib/transport/call_final_info.cc src/core/lib/transport/call_final_info.cc
src/core/lib/transport/call_spine.cc src/core/lib/transport/call_spine.cc
src/core/lib/transport/call_state.cc
src/core/lib/transport/connectivity_state.cc src/core/lib/transport/connectivity_state.cc
src/core/lib/transport/error_utils.cc src/core/lib/transport/error_utils.cc
src/core/lib/transport/interception_chain.cc src/core/lib/transport/interception_chain.cc
@ -8861,6 +8865,7 @@ add_executable(call_filters_test
src/core/lib/surface/channel_stack_type.cc src/core/lib/surface/channel_stack_type.cc
src/core/lib/transport/call_filters.cc src/core/lib/transport/call_filters.cc
src/core/lib/transport/call_final_info.cc src/core/lib/transport/call_final_info.cc
src/core/lib/transport/call_state.cc
src/core/lib/transport/error_utils.cc src/core/lib/transport/error_utils.cc
src/core/lib/transport/message.cc src/core/lib/transport/message.cc
src/core/lib/transport/metadata.cc src/core/lib/transport/metadata.cc
@ -9087,6 +9092,58 @@ endif()
endif() endif()
if(gRPC_BUILD_TESTS) if(gRPC_BUILD_TESTS)
add_executable(call_state_test
src/core/lib/debug/trace.cc
src/core/lib/debug/trace_flags.cc
src/core/lib/gprpp/dump_args.cc
src/core/lib/gprpp/glob.cc
src/core/lib/promise/activity.cc
src/core/lib/transport/call_state.cc
test/core/transport/call_state_test.cc
)
if(WIN32 AND MSVC)
if(BUILD_SHARED_LIBS)
target_compile_definitions(call_state_test
PRIVATE
"GPR_DLL_IMPORTS"
)
endif()
endif()
target_compile_features(call_state_test PUBLIC cxx_std_14)
target_include_directories(call_state_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(call_state_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
absl::config
absl::flat_hash_map
absl::hash
absl::type_traits
absl::statusor
gpr
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(call_tracer_test add_executable(call_tracer_test
test/core/telemetry/call_tracer_test.cc test/core/telemetry/call_tracer_test.cc
test/core/test_util/fake_stats_plugin.cc test/core/test_util/fake_stats_plugin.cc
@ -9338,6 +9395,7 @@ add_executable(call_utils_test
src/core/lib/transport/call_filters.cc src/core/lib/transport/call_filters.cc
src/core/lib/transport/call_final_info.cc src/core/lib/transport/call_final_info.cc
src/core/lib/transport/call_spine.cc src/core/lib/transport/call_spine.cc
src/core/lib/transport/call_state.cc
src/core/lib/transport/connectivity_state.cc src/core/lib/transport/connectivity_state.cc
src/core/lib/transport/error_utils.cc src/core/lib/transport/error_utils.cc
src/core/lib/transport/interception_chain.cc src/core/lib/transport/interception_chain.cc
@ -18808,6 +18866,7 @@ add_executable(interception_chain_test
src/core/lib/transport/call_filters.cc src/core/lib/transport/call_filters.cc
src/core/lib/transport/call_final_info.cc src/core/lib/transport/call_final_info.cc
src/core/lib/transport/call_spine.cc src/core/lib/transport/call_spine.cc
src/core/lib/transport/call_state.cc
src/core/lib/transport/connectivity_state.cc src/core/lib/transport/connectivity_state.cc
src/core/lib/transport/error_utils.cc src/core/lib/transport/error_utils.cc
src/core/lib/transport/interception_chain.cc src/core/lib/transport/interception_chain.cc

1
Makefile generated

@ -1343,6 +1343,7 @@ LIBGRPC_SRC = \
src/core/lib/transport/call_filters.cc \ src/core/lib/transport/call_filters.cc \
src/core/lib/transport/call_final_info.cc \ src/core/lib/transport/call_final_info.cc \
src/core/lib/transport/call_spine.cc \ src/core/lib/transport/call_spine.cc \
src/core/lib/transport/call_state.cc \
src/core/lib/transport/connectivity_state.cc \ src/core/lib/transport/connectivity_state.cc \
src/core/lib/transport/error_utils.cc \ src/core/lib/transport/error_utils.cc \
src/core/lib/transport/interception_chain.cc \ src/core/lib/transport/interception_chain.cc \

2
Package.swift generated

@ -1694,6 +1694,8 @@ let package = Package(
"src/core/lib/transport/call_final_info.h", "src/core/lib/transport/call_final_info.h",
"src/core/lib/transport/call_spine.cc", "src/core/lib/transport/call_spine.cc",
"src/core/lib/transport/call_spine.h", "src/core/lib/transport/call_spine.h",
"src/core/lib/transport/call_state.cc",
"src/core/lib/transport/call_state.h",
"src/core/lib/transport/connectivity_state.cc", "src/core/lib/transport/connectivity_state.cc",
"src/core/lib/transport/connectivity_state.h", "src/core/lib/transport/connectivity_state.h",
"src/core/lib/transport/custom_metadata.h", "src/core/lib/transport/custom_metadata.h",

@ -1099,6 +1099,7 @@ libs:
- src/core/lib/transport/call_filters.h - src/core/lib/transport/call_filters.h
- src/core/lib/transport/call_final_info.h - src/core/lib/transport/call_final_info.h
- src/core/lib/transport/call_spine.h - src/core/lib/transport/call_spine.h
- src/core/lib/transport/call_state.h
- src/core/lib/transport/connectivity_state.h - src/core/lib/transport/connectivity_state.h
- src/core/lib/transport/custom_metadata.h - src/core/lib/transport/custom_metadata.h
- src/core/lib/transport/error_utils.h - src/core/lib/transport/error_utils.h
@ -1908,6 +1909,7 @@ libs:
- src/core/lib/transport/call_filters.cc - src/core/lib/transport/call_filters.cc
- src/core/lib/transport/call_final_info.cc - src/core/lib/transport/call_final_info.cc
- src/core/lib/transport/call_spine.cc - src/core/lib/transport/call_spine.cc
- src/core/lib/transport/call_state.cc
- src/core/lib/transport/connectivity_state.cc - src/core/lib/transport/connectivity_state.cc
- src/core/lib/transport/error_utils.cc - src/core/lib/transport/error_utils.cc
- src/core/lib/transport/interception_chain.cc - src/core/lib/transport/interception_chain.cc
@ -2604,6 +2606,7 @@ libs:
- src/core/lib/transport/call_filters.h - src/core/lib/transport/call_filters.h
- src/core/lib/transport/call_final_info.h - src/core/lib/transport/call_final_info.h
- src/core/lib/transport/call_spine.h - src/core/lib/transport/call_spine.h
- src/core/lib/transport/call_state.h
- src/core/lib/transport/connectivity_state.h - src/core/lib/transport/connectivity_state.h
- src/core/lib/transport/custom_metadata.h - src/core/lib/transport/custom_metadata.h
- src/core/lib/transport/error_utils.h - src/core/lib/transport/error_utils.h
@ -3027,6 +3030,7 @@ libs:
- src/core/lib/transport/call_filters.cc - src/core/lib/transport/call_filters.cc
- src/core/lib/transport/call_final_info.cc - src/core/lib/transport/call_final_info.cc
- src/core/lib/transport/call_spine.cc - src/core/lib/transport/call_spine.cc
- src/core/lib/transport/call_state.cc
- src/core/lib/transport/connectivity_state.cc - src/core/lib/transport/connectivity_state.cc
- src/core/lib/transport/error_utils.cc - src/core/lib/transport/error_utils.cc
- src/core/lib/transport/interception_chain.cc - src/core/lib/transport/interception_chain.cc
@ -4701,6 +4705,7 @@ libs:
- src/core/lib/transport/call_filters.h - src/core/lib/transport/call_filters.h
- src/core/lib/transport/call_final_info.h - src/core/lib/transport/call_final_info.h
- src/core/lib/transport/call_spine.h - src/core/lib/transport/call_spine.h
- src/core/lib/transport/call_state.h
- src/core/lib/transport/connectivity_state.h - src/core/lib/transport/connectivity_state.h
- src/core/lib/transport/custom_metadata.h - src/core/lib/transport/custom_metadata.h
- src/core/lib/transport/error_utils.h - src/core/lib/transport/error_utils.h
@ -5001,6 +5006,7 @@ libs:
- src/core/lib/transport/call_filters.cc - src/core/lib/transport/call_filters.cc
- src/core/lib/transport/call_final_info.cc - src/core/lib/transport/call_final_info.cc
- src/core/lib/transport/call_spine.cc - src/core/lib/transport/call_spine.cc
- src/core/lib/transport/call_state.cc
- src/core/lib/transport/connectivity_state.cc - src/core/lib/transport/connectivity_state.cc
- src/core/lib/transport/error_utils.cc - src/core/lib/transport/error_utils.cc
- src/core/lib/transport/interception_chain.cc - src/core/lib/transport/interception_chain.cc
@ -6584,6 +6590,7 @@ targets:
- src/core/lib/surface/channel_stack_type.h - src/core/lib/surface/channel_stack_type.h
- src/core/lib/transport/call_filters.h - src/core/lib/transport/call_filters.h
- src/core/lib/transport/call_final_info.h - src/core/lib/transport/call_final_info.h
- src/core/lib/transport/call_state.h
- src/core/lib/transport/custom_metadata.h - src/core/lib/transport/custom_metadata.h
- src/core/lib/transport/error_utils.h - src/core/lib/transport/error_utils.h
- src/core/lib/transport/http2_errors.h - src/core/lib/transport/http2_errors.h
@ -6651,6 +6658,7 @@ targets:
- src/core/lib/surface/channel_stack_type.cc - src/core/lib/surface/channel_stack_type.cc
- src/core/lib/transport/call_filters.cc - src/core/lib/transport/call_filters.cc
- src/core/lib/transport/call_final_info.cc - src/core/lib/transport/call_final_info.cc
- src/core/lib/transport/call_state.cc
- src/core/lib/transport/error_utils.cc - src/core/lib/transport/error_utils.cc
- src/core/lib/transport/message.cc - src/core/lib/transport/message.cc
- src/core/lib/transport/metadata.cc - src/core/lib/transport/metadata.cc
@ -6782,6 +6790,48 @@ targets:
- linux - linux
- posix - posix
uses_polling: false uses_polling: false
- name: call_state_test
gtest: true
build: test
language: c++
headers:
- src/core/lib/debug/trace.h
- src/core/lib/debug/trace_flags.h
- src/core/lib/debug/trace_impl.h
- src/core/lib/event_engine/event_engine_context.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/dump_args.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h
- src/core/lib/promise/activity.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/promise_factory.h
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/status_flag.h
- src/core/lib/transport/call_state.h
- test/core/promise/poll_matcher.h
src:
- src/core/lib/debug/trace.cc
- src/core/lib/debug/trace_flags.cc
- src/core/lib/gprpp/dump_args.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/promise/activity.cc
- src/core/lib/transport/call_state.cc
- test/core/transport/call_state_test.cc
deps:
- gtest
- absl/base:config
- absl/container:flat_hash_map
- absl/hash:hash
- absl/meta:type_traits
- absl/status:statusor
- gpr
uses_polling: false
- name: call_tracer_test - name: call_tracer_test
gtest: true gtest: true
build: test build: test
@ -7065,6 +7115,7 @@ targets:
- src/core/lib/transport/call_filters.h - src/core/lib/transport/call_filters.h
- src/core/lib/transport/call_final_info.h - src/core/lib/transport/call_final_info.h
- src/core/lib/transport/call_spine.h - src/core/lib/transport/call_spine.h
- src/core/lib/transport/call_state.h
- src/core/lib/transport/connectivity_state.h - src/core/lib/transport/connectivity_state.h
- src/core/lib/transport/custom_metadata.h - src/core/lib/transport/custom_metadata.h
- src/core/lib/transport/error_utils.h - src/core/lib/transport/error_utils.h
@ -7333,6 +7384,7 @@ targets:
- src/core/lib/transport/call_filters.cc - src/core/lib/transport/call_filters.cc
- src/core/lib/transport/call_final_info.cc - src/core/lib/transport/call_final_info.cc
- src/core/lib/transport/call_spine.cc - src/core/lib/transport/call_spine.cc
- src/core/lib/transport/call_state.cc
- src/core/lib/transport/connectivity_state.cc - src/core/lib/transport/connectivity_state.cc
- src/core/lib/transport/error_utils.cc - src/core/lib/transport/error_utils.cc
- src/core/lib/transport/interception_chain.cc - src/core/lib/transport/interception_chain.cc
@ -12788,6 +12840,7 @@ targets:
- src/core/lib/transport/call_filters.h - src/core/lib/transport/call_filters.h
- src/core/lib/transport/call_final_info.h - src/core/lib/transport/call_final_info.h
- src/core/lib/transport/call_spine.h - src/core/lib/transport/call_spine.h
- src/core/lib/transport/call_state.h
- src/core/lib/transport/connectivity_state.h - src/core/lib/transport/connectivity_state.h
- src/core/lib/transport/custom_metadata.h - src/core/lib/transport/custom_metadata.h
- src/core/lib/transport/error_utils.h - src/core/lib/transport/error_utils.h
@ -13057,6 +13110,7 @@ targets:
- src/core/lib/transport/call_filters.cc - src/core/lib/transport/call_filters.cc
- src/core/lib/transport/call_final_info.cc - src/core/lib/transport/call_final_info.cc
- src/core/lib/transport/call_spine.cc - src/core/lib/transport/call_spine.cc
- src/core/lib/transport/call_state.cc
- src/core/lib/transport/connectivity_state.cc - src/core/lib/transport/connectivity_state.cc
- src/core/lib/transport/error_utils.cc - src/core/lib/transport/error_utils.cc
- src/core/lib/transport/interception_chain.cc - src/core/lib/transport/interception_chain.cc

1
config.m4 generated

@ -718,6 +718,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/transport/call_filters.cc \ src/core/lib/transport/call_filters.cc \
src/core/lib/transport/call_final_info.cc \ src/core/lib/transport/call_final_info.cc \
src/core/lib/transport/call_spine.cc \ src/core/lib/transport/call_spine.cc \
src/core/lib/transport/call_state.cc \
src/core/lib/transport/connectivity_state.cc \ src/core/lib/transport/connectivity_state.cc \
src/core/lib/transport/error_utils.cc \ src/core/lib/transport/error_utils.cc \
src/core/lib/transport/interception_chain.cc \ src/core/lib/transport/interception_chain.cc \

1
config.w32 generated

@ -683,6 +683,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\transport\\call_filters.cc " + "src\\core\\lib\\transport\\call_filters.cc " +
"src\\core\\lib\\transport\\call_final_info.cc " + "src\\core\\lib\\transport\\call_final_info.cc " +
"src\\core\\lib\\transport\\call_spine.cc " + "src\\core\\lib\\transport\\call_spine.cc " +
"src\\core\\lib\\transport\\call_state.cc " +
"src\\core\\lib\\transport\\connectivity_state.cc " + "src\\core\\lib\\transport\\connectivity_state.cc " +
"src\\core\\lib\\transport\\error_utils.cc " + "src\\core\\lib\\transport\\error_utils.cc " +
"src\\core\\lib\\transport\\interception_chain.cc " + "src\\core\\lib\\transport\\interception_chain.cc " +

1
doc/trace_flags.md generated

@ -90,6 +90,7 @@ accomplished by invoking `bazel build --config=dbg <target>`
- auth_context_refcount - Auth context refcounting. - auth_context_refcount - Auth context refcounting.
- call_combiner - Call combiner state. - call_combiner - Call combiner state.
- call_refcount - Refcount on call. - call_refcount - Refcount on call.
- call_state - Traces transitions through the call spine state machine.
- closure - Legacy closure creation, scheduling, and completion. - closure - Legacy closure creation, scheduling, and completion.
- combiner - Combiner lock state. - combiner - Combiner lock state.
- cq_refcount - Completion queue refcounting. - cq_refcount - Completion queue refcounting.

2
gRPC-C++.podspec generated

@ -1202,6 +1202,7 @@ Pod::Spec.new do |s|
'src/core/lib/transport/call_filters.h', 'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.h', 'src/core/lib/transport/call_final_info.h',
'src/core/lib/transport/call_spine.h', 'src/core/lib/transport/call_spine.h',
'src/core/lib/transport/call_state.h',
'src/core/lib/transport/connectivity_state.h', 'src/core/lib/transport/connectivity_state.h',
'src/core/lib/transport/custom_metadata.h', 'src/core/lib/transport/custom_metadata.h',
'src/core/lib/transport/error_utils.h', 'src/core/lib/transport/error_utils.h',
@ -2476,6 +2477,7 @@ Pod::Spec.new do |s|
'src/core/lib/transport/call_filters.h', 'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.h', 'src/core/lib/transport/call_final_info.h',
'src/core/lib/transport/call_spine.h', 'src/core/lib/transport/call_spine.h',
'src/core/lib/transport/call_state.h',
'src/core/lib/transport/connectivity_state.h', 'src/core/lib/transport/connectivity_state.h',
'src/core/lib/transport/custom_metadata.h', 'src/core/lib/transport/custom_metadata.h',
'src/core/lib/transport/error_utils.h', 'src/core/lib/transport/error_utils.h',

3
gRPC-Core.podspec generated

@ -1809,6 +1809,8 @@ Pod::Spec.new do |s|
'src/core/lib/transport/call_final_info.h', 'src/core/lib/transport/call_final_info.h',
'src/core/lib/transport/call_spine.cc', 'src/core/lib/transport/call_spine.cc',
'src/core/lib/transport/call_spine.h', 'src/core/lib/transport/call_spine.h',
'src/core/lib/transport/call_state.cc',
'src/core/lib/transport/call_state.h',
'src/core/lib/transport/connectivity_state.cc', 'src/core/lib/transport/connectivity_state.cc',
'src/core/lib/transport/connectivity_state.h', 'src/core/lib/transport/connectivity_state.h',
'src/core/lib/transport/custom_metadata.h', 'src/core/lib/transport/custom_metadata.h',
@ -3248,6 +3250,7 @@ Pod::Spec.new do |s|
'src/core/lib/transport/call_filters.h', 'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.h', 'src/core/lib/transport/call_final_info.h',
'src/core/lib/transport/call_spine.h', 'src/core/lib/transport/call_spine.h',
'src/core/lib/transport/call_state.h',
'src/core/lib/transport/connectivity_state.h', 'src/core/lib/transport/connectivity_state.h',
'src/core/lib/transport/custom_metadata.h', 'src/core/lib/transport/custom_metadata.h',
'src/core/lib/transport/error_utils.h', 'src/core/lib/transport/error_utils.h',

2
grpc.gemspec generated

@ -1696,6 +1696,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/transport/call_final_info.h ) s.files += %w( src/core/lib/transport/call_final_info.h )
s.files += %w( src/core/lib/transport/call_spine.cc ) s.files += %w( src/core/lib/transport/call_spine.cc )
s.files += %w( src/core/lib/transport/call_spine.h ) s.files += %w( src/core/lib/transport/call_spine.h )
s.files += %w( src/core/lib/transport/call_state.cc )
s.files += %w( src/core/lib/transport/call_state.h )
s.files += %w( src/core/lib/transport/connectivity_state.cc ) s.files += %w( src/core/lib/transport/connectivity_state.cc )
s.files += %w( src/core/lib/transport/connectivity_state.h ) s.files += %w( src/core/lib/transport/connectivity_state.h )
s.files += %w( src/core/lib/transport/custom_metadata.h ) s.files += %w( src/core/lib/transport/custom_metadata.h )

2
package.xml generated

@ -1678,6 +1678,8 @@
<file baseinstalldir="/" name="src/core/lib/transport/call_final_info.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/transport/call_final_info.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_spine.cc" role="src" /> <file baseinstalldir="/" name="src/core/lib/transport/call_spine.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_spine.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/transport/call_spine.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_state.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_state.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/connectivity_state.cc" role="src" /> <file baseinstalldir="/" name="src/core/lib/transport/connectivity_state.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/connectivity_state.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/transport/connectivity_state.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/custom_metadata.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/transport/custom_metadata.h" role="src" />

@ -7438,6 +7438,24 @@ grpc_cc_library(
], ],
) )
grpc_cc_library(
name = "call_state",
srcs = [
"lib/transport/call_state.cc",
],
hdrs = [
"lib/transport/call_state.h",
],
external_deps = ["absl/types:optional"],
deps = [
"activity",
"poll",
"status_flag",
"//:gpr",
"//:grpc_trace",
],
)
grpc_cc_library( grpc_cc_library(
name = "call_filters", name = "call_filters",
srcs = [ srcs = [
@ -7449,6 +7467,7 @@ grpc_cc_library(
external_deps = ["absl/log:check"], external_deps = ["absl/log:check"],
deps = [ deps = [
"call_final_info", "call_final_info",
"call_state",
"dump_args", "dump_args",
"if", "if",
"latch", "latch",

@ -26,6 +26,7 @@ namespace grpc_core {
DebugOnlyTraceFlag auth_context_refcount_trace(false, "auth_context_refcount"); DebugOnlyTraceFlag auth_context_refcount_trace(false, "auth_context_refcount");
DebugOnlyTraceFlag call_combiner_trace(false, "call_combiner"); DebugOnlyTraceFlag call_combiner_trace(false, "call_combiner");
DebugOnlyTraceFlag call_refcount_trace(false, "call_refcount"); DebugOnlyTraceFlag call_refcount_trace(false, "call_refcount");
DebugOnlyTraceFlag call_state_trace(false, "call_state");
DebugOnlyTraceFlag closure_trace(false, "closure"); DebugOnlyTraceFlag closure_trace(false, "closure");
DebugOnlyTraceFlag combiner_trace(false, "combiner"); DebugOnlyTraceFlag combiner_trace(false, "combiner");
DebugOnlyTraceFlag cq_refcount_trace(false, "cq_refcount"); DebugOnlyTraceFlag cq_refcount_trace(false, "cq_refcount");
@ -229,6 +230,7 @@ const absl::flat_hash_map<std::string, TraceFlag*>& GetAllTraceFlags() {
{"auth_context_refcount", &auth_context_refcount_trace}, {"auth_context_refcount", &auth_context_refcount_trace},
{"call_combiner", &call_combiner_trace}, {"call_combiner", &call_combiner_trace},
{"call_refcount", &call_refcount_trace}, {"call_refcount", &call_refcount_trace},
{"call_state", &call_state_trace},
{"closure", &closure_trace}, {"closure", &closure_trace},
{"combiner", &combiner_trace}, {"combiner", &combiner_trace},
{"cq_refcount", &cq_refcount_trace}, {"cq_refcount", &cq_refcount_trace},

@ -26,6 +26,7 @@ namespace grpc_core {
extern DebugOnlyTraceFlag auth_context_refcount_trace; extern DebugOnlyTraceFlag auth_context_refcount_trace;
extern DebugOnlyTraceFlag call_combiner_trace; extern DebugOnlyTraceFlag call_combiner_trace;
extern DebugOnlyTraceFlag call_refcount_trace; extern DebugOnlyTraceFlag call_refcount_trace;
extern DebugOnlyTraceFlag call_state_trace;
extern DebugOnlyTraceFlag closure_trace; extern DebugOnlyTraceFlag closure_trace;
extern DebugOnlyTraceFlag combiner_trace; extern DebugOnlyTraceFlag combiner_trace;
extern DebugOnlyTraceFlag cq_refcount_trace; extern DebugOnlyTraceFlag cq_refcount_trace;

@ -54,6 +54,10 @@ call_refcount:
debug_only: true debug_only: true
default: false default: false
description: Refcount on call. description: Refcount on call.
call_state:
debug_only: true
default: false
description: Traces transitions through the call spine state machine.
cares_address_sorting: cares_address_sorting:
default: false default: false
description: Operations of the c-ares based DNS resolver's address sorter. description: Operations of the c-ares based DNS resolver's address sorter.

@ -235,699 +235,4 @@ RefCountedPtr<CallFilters::Stack> CallFilters::StackBuilder::Build() {
return RefCountedPtr<Stack>(new Stack(std::move(data_))); return RefCountedPtr<Stack>(new Stack(std::move(data_)));
} }
///////////////////////////////////////////////////////////////////////////////
// CallState
namespace filters_detail {
CallState::CallState()
: client_to_server_pull_state_(ClientToServerPullState::kBegin),
client_to_server_push_state_(ClientToServerPushState::kIdle),
server_to_client_pull_state_(ServerToClientPullState::kUnstarted),
server_to_client_push_state_(ServerToClientPushState::kStart),
server_trailing_metadata_state_(ServerTrailingMetadataState::kNotPushed) {
}
void CallState::Start() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] Start: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_);
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kUnstarted:
server_to_client_pull_state_ = ServerToClientPullState::kStarted;
server_to_client_pull_waiter_.Wake();
break;
case ServerToClientPullState::kUnstartedReading:
server_to_client_pull_state_ = ServerToClientPullState::kStartedReading;
server_to_client_pull_waiter_.Wake();
break;
case ServerToClientPullState::kStarted:
case ServerToClientPullState::kStartedReading:
case ServerToClientPullState::kProcessingServerInitialMetadata:
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
case ServerToClientPullState::kIdle:
case ServerToClientPullState::kReading:
case ServerToClientPullState::kProcessingServerToClientMessage:
LOG(FATAL) << "Start called twice";
case ServerToClientPullState::kProcessingServerTrailingMetadata:
case ServerToClientPullState::kTerminated:
break;
}
}
void CallState::BeginPushClientToServerMessage() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] BeginPushClientToServerMessage: "
<< GRPC_DUMP_ARGS(this, client_to_server_push_state_);
switch (client_to_server_push_state_) {
case ClientToServerPushState::kIdle:
client_to_server_push_state_ = ClientToServerPushState::kPushedMessage;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kPushedMessage:
case ClientToServerPushState::kPushedMessageAndHalfClosed:
LOG(FATAL) << "PushClientToServerMessage called twice concurrently";
break;
case ClientToServerPushState::kPushedHalfClose:
LOG(FATAL) << "PushClientToServerMessage called after half-close";
break;
case ClientToServerPushState::kFinished:
break;
}
}
Poll<StatusFlag> CallState::PollPushClientToServerMessage() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollPushClientToServerMessage: "
<< GRPC_DUMP_ARGS(this, client_to_server_push_state_);
switch (client_to_server_push_state_) {
case ClientToServerPushState::kIdle:
case ClientToServerPushState::kPushedHalfClose:
return Success{};
case ClientToServerPushState::kPushedMessage:
case ClientToServerPushState::kPushedMessageAndHalfClosed:
return client_to_server_push_waiter_.pending();
case ClientToServerPushState::kFinished:
return Failure{};
}
Crash("Unreachable");
}
void CallState::ClientToServerHalfClose() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] ClientToServerHalfClose: "
<< GRPC_DUMP_ARGS(this, client_to_server_push_state_);
switch (client_to_server_push_state_) {
case ClientToServerPushState::kIdle:
client_to_server_push_state_ = ClientToServerPushState::kPushedHalfClose;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kPushedMessage:
client_to_server_push_state_ =
ClientToServerPushState::kPushedMessageAndHalfClosed;
break;
case ClientToServerPushState::kPushedHalfClose:
case ClientToServerPushState::kPushedMessageAndHalfClosed:
LOG(FATAL) << "ClientToServerHalfClose called twice";
break;
case ClientToServerPushState::kFinished:
break;
}
}
void CallState::BeginPullClientInitialMetadata() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] BeginPullClientInitialMetadata: "
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_);
switch (client_to_server_pull_state_) {
case ClientToServerPullState::kBegin:
client_to_server_pull_state_ =
ClientToServerPullState::kProcessingClientInitialMetadata;
break;
case ClientToServerPullState::kProcessingClientInitialMetadata:
case ClientToServerPullState::kIdle:
case ClientToServerPullState::kReading:
case ClientToServerPullState::kProcessingClientToServerMessage:
LOG(FATAL) << "BeginPullClientInitialMetadata called twice";
break;
case ClientToServerPullState::kTerminated:
break;
}
}
void CallState::FinishPullClientInitialMetadata() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] FinishPullClientInitialMetadata: "
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_);
switch (client_to_server_pull_state_) {
case ClientToServerPullState::kBegin:
LOG(FATAL) << "FinishPullClientInitialMetadata called before Begin";
break;
case ClientToServerPullState::kProcessingClientInitialMetadata:
client_to_server_pull_state_ = ClientToServerPullState::kIdle;
client_to_server_pull_waiter_.Wake();
break;
case ClientToServerPullState::kIdle:
case ClientToServerPullState::kReading:
case ClientToServerPullState::kProcessingClientToServerMessage:
LOG(FATAL) << "Out of order FinishPullClientInitialMetadata";
break;
case ClientToServerPullState::kTerminated:
break;
}
}
Poll<ValueOrFailure<bool>> CallState::PollPullClientToServerMessageAvailable() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollPullClientToServerMessageAvailable: "
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_,
client_to_server_push_state_);
switch (client_to_server_pull_state_) {
case ClientToServerPullState::kBegin:
case ClientToServerPullState::kProcessingClientInitialMetadata:
return client_to_server_pull_waiter_.pending();
case ClientToServerPullState::kIdle:
client_to_server_pull_state_ = ClientToServerPullState::kReading;
ABSL_FALLTHROUGH_INTENDED;
case ClientToServerPullState::kReading:
break;
case ClientToServerPullState::kProcessingClientToServerMessage:
LOG(FATAL) << "PollPullClientToServerMessageAvailable called while "
"processing a message";
break;
case ClientToServerPullState::kTerminated:
return Failure{};
}
DCHECK_EQ(client_to_server_pull_state_, ClientToServerPullState::kReading);
switch (client_to_server_push_state_) {
case ClientToServerPushState::kIdle:
return client_to_server_push_waiter_.pending();
case ClientToServerPushState::kPushedMessage:
case ClientToServerPushState::kPushedMessageAndHalfClosed:
client_to_server_pull_state_ =
ClientToServerPullState::kProcessingClientToServerMessage;
return true;
case ClientToServerPushState::kPushedHalfClose:
return false;
case ClientToServerPushState::kFinished:
client_to_server_pull_state_ = ClientToServerPullState::kTerminated;
return Failure{};
}
Crash("Unreachable");
}
void CallState::FinishPullClientToServerMessage() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] FinishPullClientToServerMessage: "
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_,
client_to_server_push_state_);
switch (client_to_server_pull_state_) {
case ClientToServerPullState::kBegin:
case ClientToServerPullState::kProcessingClientInitialMetadata:
LOG(FATAL) << "FinishPullClientToServerMessage called before Begin";
break;
case ClientToServerPullState::kIdle:
LOG(FATAL) << "FinishPullClientToServerMessage called twice";
break;
case ClientToServerPullState::kReading:
LOG(FATAL) << "FinishPullClientToServerMessage called before "
"PollPullClientToServerMessageAvailable";
break;
case ClientToServerPullState::kProcessingClientToServerMessage:
client_to_server_pull_state_ = ClientToServerPullState::kIdle;
client_to_server_pull_waiter_.Wake();
break;
case ClientToServerPullState::kTerminated:
break;
}
switch (client_to_server_push_state_) {
case ClientToServerPushState::kPushedMessage:
client_to_server_push_state_ = ClientToServerPushState::kIdle;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kIdle:
case ClientToServerPushState::kPushedHalfClose:
LOG(FATAL) << "FinishPullClientToServerMessage called without a message";
break;
case ClientToServerPushState::kPushedMessageAndHalfClosed:
client_to_server_push_state_ = ClientToServerPushState::kPushedHalfClose;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kFinished:
break;
}
}
StatusFlag CallState::PushServerInitialMetadata() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PushServerInitialMetadata: "
<< GRPC_DUMP_ARGS(this, server_to_client_push_state_,
server_trailing_metadata_state_);
if (server_trailing_metadata_state_ !=
ServerTrailingMetadataState::kNotPushed) {
return Failure{};
}
CHECK_EQ(server_to_client_push_state_, ServerToClientPushState::kStart);
server_to_client_push_state_ =
ServerToClientPushState::kPushedServerInitialMetadata;
server_to_client_push_waiter_.Wake();
return Success{};
}
void CallState::BeginPushServerToClientMessage() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] BeginPushServerToClientMessage: "
<< GRPC_DUMP_ARGS(this, server_to_client_push_state_);
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
LOG(FATAL) << "BeginPushServerToClientMessage called before "
"PushServerInitialMetadata";
break;
case ServerToClientPushState::kPushedServerInitialMetadata:
server_to_client_push_state_ =
ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage;
break;
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
case ServerToClientPushState::kPushedMessage:
LOG(FATAL) << "BeginPushServerToClientMessage called twice concurrently";
break;
case ServerToClientPushState::kTrailersOnly:
// Will fail in poll.
break;
case ServerToClientPushState::kIdle:
server_to_client_push_state_ = ServerToClientPushState::kPushedMessage;
server_to_client_push_waiter_.Wake();
break;
case ServerToClientPushState::kFinished:
break;
}
}
Poll<StatusFlag> CallState::PollPushServerToClientMessage() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollPushServerToClientMessage: "
<< GRPC_DUMP_ARGS(this, server_to_client_push_state_);
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
case ServerToClientPushState::kPushedServerInitialMetadata:
LOG(FATAL) << "PollPushServerToClientMessage called before "
<< "PushServerInitialMetadata";
case ServerToClientPushState::kTrailersOnly:
return false;
case ServerToClientPushState::kPushedMessage:
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
return server_to_client_push_waiter_.pending();
case ServerToClientPushState::kIdle:
return Success{};
case ServerToClientPushState::kFinished:
return Failure{};
}
Crash("Unreachable");
}
bool CallState::PushServerTrailingMetadata(bool cancel) {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PushServerTrailingMetadata: "
<< GRPC_DUMP_ARGS(this, cancel, server_trailing_metadata_state_,
server_to_client_push_state_,
client_to_server_push_state_,
server_trailing_metadata_waiter_.DebugString());
if (server_trailing_metadata_state_ !=
ServerTrailingMetadataState::kNotPushed) {
return false;
}
server_trailing_metadata_state_ =
cancel ? ServerTrailingMetadataState::kPushedCancel
: ServerTrailingMetadataState::kPushed;
server_trailing_metadata_waiter_.Wake();
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
server_to_client_push_state_ = ServerToClientPushState::kTrailersOnly;
server_to_client_push_waiter_.Wake();
break;
case ServerToClientPushState::kPushedServerInitialMetadata:
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
case ServerToClientPushState::kPushedMessage:
if (cancel) {
server_to_client_push_state_ = ServerToClientPushState::kFinished;
server_to_client_push_waiter_.Wake();
}
break;
case ServerToClientPushState::kIdle:
if (cancel) {
server_to_client_push_state_ = ServerToClientPushState::kFinished;
server_to_client_push_waiter_.Wake();
}
break;
case ServerToClientPushState::kFinished:
case ServerToClientPushState::kTrailersOnly:
break;
}
switch (client_to_server_push_state_) {
case ClientToServerPushState::kIdle:
client_to_server_push_state_ = ClientToServerPushState::kFinished;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kPushedMessage:
case ClientToServerPushState::kPushedMessageAndHalfClosed:
client_to_server_push_state_ = ClientToServerPushState::kFinished;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kPushedHalfClose:
case ClientToServerPushState::kFinished:
break;
}
return true;
}
Poll<bool> CallState::PollPullServerInitialMetadataAvailable() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollPullServerInitialMetadataAvailable: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
server_to_client_push_state_);
bool reading;
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kUnstarted:
case ServerToClientPullState::kUnstartedReading:
if (server_to_client_push_state_ ==
ServerToClientPushState::kTrailersOnly) {
server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
return false;
}
server_to_client_push_waiter_.pending();
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kStartedReading:
reading = true;
break;
case ServerToClientPullState::kStarted:
reading = false;
break;
case ServerToClientPullState::kProcessingServerInitialMetadata:
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
case ServerToClientPullState::kIdle:
case ServerToClientPullState::kReading:
case ServerToClientPullState::kProcessingServerToClientMessage:
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "PollPullServerInitialMetadataAvailable called twice";
case ServerToClientPullState::kTerminated:
return false;
}
DCHECK(server_to_client_pull_state_ == ServerToClientPullState::kStarted ||
server_to_client_pull_state_ ==
ServerToClientPullState::kStartedReading)
<< server_to_client_pull_state_;
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
return server_to_client_push_waiter_.pending();
case ServerToClientPushState::kPushedServerInitialMetadata:
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
server_to_client_pull_state_ =
reading
? ServerToClientPullState::kProcessingServerInitialMetadataReading
: ServerToClientPullState::kProcessingServerInitialMetadata;
server_to_client_pull_waiter_.Wake();
return true;
case ServerToClientPushState::kIdle:
case ServerToClientPushState::kPushedMessage:
LOG(FATAL)
<< "PollPullServerInitialMetadataAvailable after metadata processed";
case ServerToClientPushState::kFinished:
server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
server_to_client_pull_waiter_.Wake();
return false;
case ServerToClientPushState::kTrailersOnly:
return false;
}
Crash("Unreachable");
}
void CallState::FinishPullServerInitialMetadata() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] FinishPullServerInitialMetadata: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_);
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kUnstarted:
case ServerToClientPullState::kUnstartedReading:
LOG(FATAL) << "FinishPullServerInitialMetadata called before Start";
case ServerToClientPullState::kStarted:
case ServerToClientPullState::kStartedReading:
CHECK_EQ(server_to_client_push_state_,
ServerToClientPushState::kTrailersOnly);
return;
case ServerToClientPullState::kProcessingServerInitialMetadata:
server_to_client_pull_state_ = ServerToClientPullState::kIdle;
server_to_client_pull_waiter_.Wake();
break;
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
server_to_client_pull_state_ = ServerToClientPullState::kReading;
server_to_client_pull_waiter_.Wake();
break;
case ServerToClientPullState::kIdle:
case ServerToClientPullState::kReading:
case ServerToClientPullState::kProcessingServerToClientMessage:
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "Out of order FinishPullServerInitialMetadata";
case ServerToClientPullState::kTerminated:
return;
}
DCHECK(server_to_client_pull_state_ == ServerToClientPullState::kIdle ||
server_to_client_pull_state_ == ServerToClientPullState::kReading)
<< server_to_client_pull_state_;
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
LOG(FATAL) << "FinishPullServerInitialMetadata called before initial "
"metadata consumed";
case ServerToClientPushState::kPushedServerInitialMetadata:
server_to_client_push_state_ = ServerToClientPushState::kIdle;
server_to_client_push_waiter_.Wake();
break;
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
server_to_client_push_state_ = ServerToClientPushState::kPushedMessage;
server_to_client_push_waiter_.Wake();
break;
case ServerToClientPushState::kIdle:
case ServerToClientPushState::kPushedMessage:
case ServerToClientPushState::kTrailersOnly:
case ServerToClientPushState::kFinished:
LOG(FATAL) << "FinishPullServerInitialMetadata called twice";
}
}
Poll<ValueOrFailure<bool>> CallState::PollPullServerToClientMessageAvailable() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollPullServerToClientMessageAvailable: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
server_to_client_push_state_,
server_trailing_metadata_state_);
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kUnstarted:
server_to_client_pull_state_ = ServerToClientPullState::kUnstartedReading;
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kProcessingServerInitialMetadata:
server_to_client_pull_state_ =
ServerToClientPullState::kProcessingServerInitialMetadataReading;
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kUnstartedReading:
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kStarted:
server_to_client_pull_state_ = ServerToClientPullState::kStartedReading;
ABSL_FALLTHROUGH_INTENDED;
case ServerToClientPullState::kStartedReading:
if (server_to_client_push_state_ ==
ServerToClientPushState::kTrailersOnly) {
return false;
}
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kIdle:
server_to_client_pull_state_ = ServerToClientPullState::kReading;
ABSL_FALLTHROUGH_INTENDED;
case ServerToClientPullState::kReading:
break;
case ServerToClientPullState::kProcessingServerToClientMessage:
LOG(FATAL) << "PollPullServerToClientMessageAvailable called while "
"processing a message";
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "PollPullServerToClientMessageAvailable called while "
"processing trailing metadata";
case ServerToClientPullState::kTerminated:
return Failure{};
}
DCHECK_EQ(server_to_client_pull_state_, ServerToClientPullState::kReading);
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
case ServerToClientPushState::kPushedServerInitialMetadata:
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
return server_to_client_push_waiter_.pending();
case ServerToClientPushState::kIdle:
if (server_trailing_metadata_state_ !=
ServerTrailingMetadataState::kNotPushed) {
return false;
}
server_trailing_metadata_waiter_.pending();
return server_to_client_push_waiter_.pending();
case ServerToClientPushState::kTrailersOnly:
DCHECK_NE(server_trailing_metadata_state_,
ServerTrailingMetadataState::kNotPushed);
return false;
case ServerToClientPushState::kPushedMessage:
server_to_client_pull_state_ =
ServerToClientPullState::kProcessingServerToClientMessage;
server_to_client_pull_waiter_.Wake();
return true;
case ServerToClientPushState::kFinished:
server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
server_to_client_pull_waiter_.Wake();
return Failure{};
}
Crash("Unreachable");
}
void CallState::FinishPullServerToClientMessage() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] FinishPullServerToClientMessage: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
server_to_client_push_state_);
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kUnstarted:
case ServerToClientPullState::kUnstartedReading:
case ServerToClientPullState::kStarted:
case ServerToClientPullState::kStartedReading:
case ServerToClientPullState::kProcessingServerInitialMetadata:
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
LOG(FATAL)
<< "FinishPullServerToClientMessage called before metadata available";
case ServerToClientPullState::kIdle:
LOG(FATAL) << "FinishPullServerToClientMessage called twice";
case ServerToClientPullState::kReading:
LOG(FATAL) << "FinishPullServerToClientMessage called before "
<< "PollPullServerToClientMessageAvailable";
case ServerToClientPullState::kProcessingServerToClientMessage:
server_to_client_pull_state_ = ServerToClientPullState::kIdle;
server_to_client_pull_waiter_.Wake();
break;
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "FinishPullServerToClientMessage called while processing "
"trailing metadata";
case ServerToClientPullState::kTerminated:
break;
}
switch (server_to_client_push_state_) {
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
case ServerToClientPushState::kPushedServerInitialMetadata:
case ServerToClientPushState::kStart:
LOG(FATAL) << "FinishPullServerToClientMessage called before initial "
"metadata consumed";
case ServerToClientPushState::kTrailersOnly:
LOG(FATAL) << "FinishPullServerToClientMessage called after "
"PushServerTrailingMetadata";
case ServerToClientPushState::kPushedMessage:
server_to_client_push_state_ = ServerToClientPushState::kIdle;
server_to_client_push_waiter_.Wake();
break;
case ServerToClientPushState::kIdle:
LOG(FATAL) << "FinishPullServerToClientMessage called without a message";
case ServerToClientPushState::kFinished:
break;
}
}
Poll<Empty> CallState::PollServerTrailingMetadataAvailable() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollServerTrailingMetadataAvailable: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
server_to_client_push_state_,
server_trailing_metadata_state_,
server_trailing_metadata_waiter_.DebugString());
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kProcessingServerInitialMetadata:
case ServerToClientPullState::kProcessingServerToClientMessage:
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
case ServerToClientPullState::kUnstartedReading:
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kStartedReading:
case ServerToClientPullState::kReading:
switch (server_to_client_push_state_) {
case ServerToClientPushState::kTrailersOnly:
case ServerToClientPushState::kIdle:
case ServerToClientPushState::kStart:
case ServerToClientPushState::kFinished:
if (server_trailing_metadata_state_ !=
ServerTrailingMetadataState::kNotPushed) {
server_to_client_pull_state_ =
ServerToClientPullState::kProcessingServerTrailingMetadata;
server_to_client_pull_waiter_.Wake();
return Empty{};
}
ABSL_FALLTHROUGH_INTENDED;
case ServerToClientPushState::kPushedServerInitialMetadata:
case ServerToClientPushState::
kPushedServerInitialMetadataAndPushedMessage:
case ServerToClientPushState::kPushedMessage:
server_to_client_push_waiter_.pending();
return server_to_client_pull_waiter_.pending();
}
break;
case ServerToClientPullState::kStarted:
case ServerToClientPullState::kUnstarted:
case ServerToClientPullState::kIdle:
if (server_trailing_metadata_state_ !=
ServerTrailingMetadataState::kNotPushed) {
server_to_client_pull_state_ =
ServerToClientPullState::kProcessingServerTrailingMetadata;
server_to_client_pull_waiter_.Wake();
return Empty{};
}
return server_trailing_metadata_waiter_.pending();
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "PollServerTrailingMetadataAvailable called twice";
case ServerToClientPullState::kTerminated:
return Empty{};
}
Crash("Unreachable");
}
void CallState::FinishPullServerTrailingMetadata() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] FinishPullServerTrailingMetadata: "
<< GRPC_DUMP_ARGS(this, server_trailing_metadata_state_,
server_trailing_metadata_waiter_.DebugString());
switch (server_trailing_metadata_state_) {
case ServerTrailingMetadataState::kNotPushed:
LOG(FATAL) << "FinishPullServerTrailingMetadata called before "
"PollServerTrailingMetadataAvailable";
case ServerTrailingMetadataState::kPushed:
server_trailing_metadata_state_ = ServerTrailingMetadataState::kPulled;
server_trailing_metadata_waiter_.Wake();
break;
case ServerTrailingMetadataState::kPushedCancel:
server_trailing_metadata_state_ =
ServerTrailingMetadataState::kPulledCancel;
server_trailing_metadata_waiter_.Wake();
break;
case ServerTrailingMetadataState::kPulled:
case ServerTrailingMetadataState::kPulledCancel:
LOG(FATAL) << "FinishPullServerTrailingMetadata called twice";
}
}
Poll<bool> CallState::PollWasCancelled() {
GRPC_TRACE_LOG(call, INFO)
<< "[call_state] PollWasCancelled: "
<< GRPC_DUMP_ARGS(this, server_trailing_metadata_state_);
switch (server_trailing_metadata_state_) {
case ServerTrailingMetadataState::kNotPushed:
case ServerTrailingMetadataState::kPushed:
case ServerTrailingMetadataState::kPushedCancel: {
return server_trailing_metadata_waiter_.pending();
}
case ServerTrailingMetadataState::kPulled:
return false;
case ServerTrailingMetadataState::kPulledCancel:
return true;
}
Crash("Unreachable");
}
std::string CallState::DebugString() const {
return absl::StrCat(
"client_to_server_pull_state:", client_to_server_pull_state_,
" client_to_server_push_state:", client_to_server_push_state_,
" server_to_client_pull_state:", server_to_client_pull_state_,
" server_to_client_message_push_state:", server_to_client_push_state_,
" server_trailing_metadata_state:", server_trailing_metadata_state_,
client_to_server_push_waiter_.DebugString(),
" server_to_client_push_waiter:",
server_to_client_push_waiter_.DebugString(),
" client_to_server_pull_waiter:",
client_to_server_pull_waiter_.DebugString(),
" server_to_client_pull_waiter:",
server_to_client_pull_waiter_.DebugString(),
" server_trailing_metadata_waiter:",
server_trailing_metadata_waiter_.DebugString());
}
static_assert(sizeof(CallState) <= 16, "CallState too large");
} // namespace filters_detail
} // namespace grpc_core } // namespace grpc_core

@ -35,6 +35,7 @@
#include "src/core/lib/promise/status_flag.h" #include "src/core/lib/promise/status_flag.h"
#include "src/core/lib/promise/try_seq.h" #include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/transport/call_final_info.h" #include "src/core/lib/transport/call_final_info.h"
#include "src/core/lib/transport/call_state.h"
#include "src/core/lib/transport/message.h" #include "src/core/lib/transport/message.h"
#include "src/core/lib/transport/metadata.h" #include "src/core/lib/transport/metadata.h"
@ -1099,244 +1100,6 @@ class OperationExecutor {
const Operator<T>* end_ops_; const Operator<T>* end_ops_;
}; };
class CallState {
public:
CallState();
// Start the call: allows pulls to proceed
void Start();
// PUSH: client -> server
void BeginPushClientToServerMessage();
Poll<StatusFlag> PollPushClientToServerMessage();
void ClientToServerHalfClose();
// PULL: client -> server
void BeginPullClientInitialMetadata();
void FinishPullClientInitialMetadata();
Poll<ValueOrFailure<bool>> PollPullClientToServerMessageAvailable();
void FinishPullClientToServerMessage();
// PUSH: server -> client
StatusFlag PushServerInitialMetadata();
void BeginPushServerToClientMessage();
Poll<StatusFlag> PollPushServerToClientMessage();
bool PushServerTrailingMetadata(bool cancel);
// PULL: server -> client
Poll<bool> PollPullServerInitialMetadataAvailable();
void FinishPullServerInitialMetadata();
Poll<ValueOrFailure<bool>> PollPullServerToClientMessageAvailable();
void FinishPullServerToClientMessage();
Poll<Empty> PollServerTrailingMetadataAvailable();
void FinishPullServerTrailingMetadata();
Poll<bool> PollWasCancelled();
// Debug
std::string DebugString() const;
friend std::ostream& operator<<(std::ostream& out,
const CallState& call_state) {
return out << call_state.DebugString();
}
private:
enum class ClientToServerPullState : uint16_t {
// Ready to read: client initial metadata is there, but not yet processed
kBegin,
// Processing client initial metadata
kProcessingClientInitialMetadata,
// Main call loop: not reading
kIdle,
// Main call loop: reading but no message available
kReading,
// Main call loop: processing one message
kProcessingClientToServerMessage,
// Processing complete
kTerminated,
};
static const char* ClientToServerPullStateString(
ClientToServerPullState state) {
switch (state) {
case ClientToServerPullState::kBegin:
return "Begin";
case ClientToServerPullState::kProcessingClientInitialMetadata:
return "ProcessingClientInitialMetadata";
case ClientToServerPullState::kIdle:
return "Idle";
case ClientToServerPullState::kReading:
return "Reading";
case ClientToServerPullState::kProcessingClientToServerMessage:
return "ProcessingClientToServerMessage";
case ClientToServerPullState::kTerminated:
return "Terminated";
}
}
template <typename Sink>
friend void AbslStringify(Sink& out, ClientToServerPullState state) {
out.Append(ClientToServerPullStateString(state));
}
friend std::ostream& operator<<(std::ostream& out,
ClientToServerPullState state) {
return out << ClientToServerPullStateString(state);
}
enum class ClientToServerPushState : uint16_t {
kIdle,
kPushedMessage,
kPushedHalfClose,
kPushedMessageAndHalfClosed,
kFinished,
};
static const char* ClientToServerPushStateString(
ClientToServerPushState state) {
switch (state) {
case ClientToServerPushState::kIdle:
return "Idle";
case ClientToServerPushState::kPushedMessage:
return "PushedMessage";
case ClientToServerPushState::kPushedHalfClose:
return "PushedHalfClose";
case ClientToServerPushState::kPushedMessageAndHalfClosed:
return "PushedMessageAndHalfClosed";
case ClientToServerPushState::kFinished:
return "Finished";
}
}
template <typename Sink>
friend void AbslStringify(Sink& out, ClientToServerPushState state) {
out.Append(ClientToServerPushStateString(state));
}
friend std::ostream& operator<<(std::ostream& out,
ClientToServerPushState state) {
return out << ClientToServerPushStateString(state);
}
enum class ServerToClientPullState : uint16_t {
// Not yet started: cannot read
kUnstarted,
kUnstartedReading,
kStarted,
kStartedReading,
// Processing server initial metadata
kProcessingServerInitialMetadata,
kProcessingServerInitialMetadataReading,
// Main call loop: not reading
kIdle,
// Main call loop: reading but no message available
kReading,
// Main call loop: processing one message
kProcessingServerToClientMessage,
// Processing server trailing metadata
kProcessingServerTrailingMetadata,
kTerminated,
};
static const char* ServerToClientPullStateString(
ServerToClientPullState state) {
switch (state) {
case ServerToClientPullState::kUnstarted:
return "Unstarted";
case ServerToClientPullState::kUnstartedReading:
return "UnstartedReading";
case ServerToClientPullState::kStarted:
return "Started";
case ServerToClientPullState::kStartedReading:
return "StartedReading";
case ServerToClientPullState::kProcessingServerInitialMetadata:
return "ProcessingServerInitialMetadata";
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
return "ProcessingServerInitialMetadataReading";
case ServerToClientPullState::kIdle:
return "Idle";
case ServerToClientPullState::kReading:
return "Reading";
case ServerToClientPullState::kProcessingServerToClientMessage:
return "ProcessingServerToClientMessage";
case ServerToClientPullState::kProcessingServerTrailingMetadata:
return "ProcessingServerTrailingMetadata";
case ServerToClientPullState::kTerminated:
return "Terminated";
}
}
template <typename Sink>
friend void AbslStringify(Sink& out, ServerToClientPullState state) {
out.Append(ServerToClientPullStateString(state));
}
friend std::ostream& operator<<(std::ostream& out,
ServerToClientPullState state) {
return out << ServerToClientPullStateString(state);
}
enum class ServerToClientPushState : uint16_t {
kStart,
kPushedServerInitialMetadata,
kPushedServerInitialMetadataAndPushedMessage,
kTrailersOnly,
kIdle,
kPushedMessage,
kFinished,
};
static const char* ServerToClientPushStateString(
ServerToClientPushState state) {
switch (state) {
case ServerToClientPushState::kStart:
return "Start";
case ServerToClientPushState::kPushedServerInitialMetadata:
return "PushedServerInitialMetadata";
case ServerToClientPushState::
kPushedServerInitialMetadataAndPushedMessage:
return "PushedServerInitialMetadataAndPushedMessage";
case ServerToClientPushState::kTrailersOnly:
return "TrailersOnly";
case ServerToClientPushState::kIdle:
return "Idle";
case ServerToClientPushState::kPushedMessage:
return "PushedMessage";
case ServerToClientPushState::kFinished:
return "Finished";
}
}
template <typename Sink>
friend void AbslStringify(Sink& out, ServerToClientPushState state) {
out.Append(ServerToClientPushStateString(state));
}
friend std::ostream& operator<<(std::ostream& out,
ServerToClientPushState state) {
return out << ServerToClientPushStateString(state);
}
enum class ServerTrailingMetadataState : uint16_t {
kNotPushed,
kPushed,
kPushedCancel,
kPulled,
kPulledCancel,
};
static const char* ServerTrailingMetadataStateString(
ServerTrailingMetadataState state) {
switch (state) {
case ServerTrailingMetadataState::kNotPushed:
return "NotPushed";
case ServerTrailingMetadataState::kPushed:
return "Pushed";
case ServerTrailingMetadataState::kPushedCancel:
return "PushedCancel";
case ServerTrailingMetadataState::kPulled:
return "Pulled";
case ServerTrailingMetadataState::kPulledCancel:
return "PulledCancel";
}
}
template <typename Sink>
friend void AbslStringify(Sink& out, ServerTrailingMetadataState state) {
out.Append(ServerTrailingMetadataStateString(state));
}
friend std::ostream& operator<<(std::ostream& out,
ServerTrailingMetadataState state) {
return out << ServerTrailingMetadataStateString(state);
}
ClientToServerPullState client_to_server_pull_state_ : 3;
ClientToServerPushState client_to_server_push_state_ : 3;
ServerToClientPullState server_to_client_pull_state_ : 4;
ServerToClientPushState server_to_client_push_state_ : 3;
ServerTrailingMetadataState server_trailing_metadata_state_ : 3;
IntraActivityWaiter client_to_server_pull_waiter_;
IntraActivityWaiter server_to_client_pull_waiter_;
IntraActivityWaiter client_to_server_push_waiter_;
IntraActivityWaiter server_to_client_push_waiter_;
IntraActivityWaiter server_trailing_metadata_waiter_;
};
template <typename Fn> template <typename Fn>
class ServerTrailingMetadataInterceptor { class ServerTrailingMetadataInterceptor {
public: public:
@ -1513,8 +1276,7 @@ class CallFilters {
} }
private: private:
template <typename Output, void (filters_detail::CallState::*on_done)(), template <typename Output, void (CallState::*on_done)(), typename Input>
typename Input>
Poll<ValueOrFailure<Output>> FinishStep( Poll<ValueOrFailure<Output>> FinishStep(
Poll<filters_detail::ResultOr<Input>> p) { Poll<filters_detail::ResultOr<Input>> p) {
auto* r = p.value_if_ready(); auto* r = p.value_if_ready();
@ -1530,7 +1292,7 @@ class CallFilters {
template <typename Output, typename Input, template <typename Output, typename Input,
Input(CallFilters::*input_location), Input(CallFilters::*input_location),
filters_detail::Layout<Input>(filters_detail::StackData::*layout), filters_detail::Layout<Input>(filters_detail::StackData::*layout),
void (filters_detail::CallState::*on_done)()> void (CallState::*on_done)()>
auto RunExecutor() { auto RunExecutor() {
DCHECK_NE((this->*input_location).get(), nullptr); DCHECK_NE((this->*input_location).get(), nullptr);
filters_detail::OperationExecutor<Input> executor; filters_detail::OperationExecutor<Input> executor;
@ -1549,11 +1311,10 @@ class CallFilters {
// Returns a promise that resolves to ValueOrFailure<ClientMetadataHandle> // Returns a promise that resolves to ValueOrFailure<ClientMetadataHandle>
GRPC_MUST_USE_RESULT auto PullClientInitialMetadata() { GRPC_MUST_USE_RESULT auto PullClientInitialMetadata() {
call_state_.BeginPullClientInitialMetadata(); call_state_.BeginPullClientInitialMetadata();
return RunExecutor< return RunExecutor<ClientMetadataHandle, ClientMetadataHandle,
ClientMetadataHandle, ClientMetadataHandle,
&CallFilters::push_client_initial_metadata_, &CallFilters::push_client_initial_metadata_,
&filters_detail::StackData::client_initial_metadata, &filters_detail::StackData::client_initial_metadata,
&filters_detail::CallState::FinishPullClientInitialMetadata>(); &CallState::FinishPullClientInitialMetadata>();
} }
// Server: Push server initial metadata // Server: Push server initial metadata
// Returns a promise that resolves to a StatusFlag indicating success // Returns a promise that resolves to a StatusFlag indicating success
@ -1578,8 +1339,7 @@ class CallFilters {
ServerMetadataHandle, ServerMetadataHandle,
&CallFilters::push_server_initial_metadata_, &CallFilters::push_server_initial_metadata_,
&filters_detail::StackData::server_initial_metadata, &filters_detail::StackData::server_initial_metadata,
&filters_detail::CallState:: &CallState::FinishPullServerInitialMetadata>(),
FinishPullServerInitialMetadata>(),
[](ValueOrFailure<absl::optional<ServerMetadataHandle>> r) { [](ValueOrFailure<absl::optional<ServerMetadataHandle>> r) {
if (r.ok()) return std::move(*r); if (r.ok()) return std::move(*r);
return absl::optional<ServerMetadataHandle>{}; return absl::optional<ServerMetadataHandle>{};
@ -1616,8 +1376,7 @@ class CallFilters {
absl::optional<MessageHandle>, MessageHandle, absl::optional<MessageHandle>, MessageHandle,
&CallFilters::push_client_to_server_message_, &CallFilters::push_client_to_server_message_,
&filters_detail::StackData::client_to_server_messages, &filters_detail::StackData::client_to_server_messages,
&filters_detail::CallState:: &CallState::FinishPullClientToServerMessage>();
FinishPullClientToServerMessage>();
}, },
[]() -> ValueOrFailure<absl::optional<MessageHandle>> { []() -> ValueOrFailure<absl::optional<MessageHandle>> {
return absl::optional<MessageHandle>(); return absl::optional<MessageHandle>();
@ -1646,8 +1405,7 @@ class CallFilters {
absl::optional<MessageHandle>, MessageHandle, absl::optional<MessageHandle>, MessageHandle,
&CallFilters::push_server_to_client_message_, &CallFilters::push_server_to_client_message_,
&filters_detail::StackData::server_to_client_messages, &filters_detail::StackData::server_to_client_messages,
&filters_detail::CallState:: &CallState::FinishPullServerToClientMessage>();
FinishPullServerToClientMessage>();
}, },
[]() -> ValueOrFailure<absl::optional<MessageHandle>> { []() -> ValueOrFailure<absl::optional<MessageHandle>> {
return absl::optional<MessageHandle>(); return absl::optional<MessageHandle>();
@ -1691,7 +1449,7 @@ class CallFilters {
RefCountedPtr<Stack> stack_; RefCountedPtr<Stack> stack_;
filters_detail::CallState call_state_; CallState call_state_;
void* call_data_; void* call_data_;
ClientMetadataHandle push_client_initial_metadata_; ClientMetadataHandle push_client_initial_metadata_;

@ -0,0 +1,39 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/transport/call_state.h"
namespace grpc_core {
std::string CallState::DebugString() const {
return absl::StrCat(
"client_to_server_pull_state:", client_to_server_pull_state_,
" client_to_server_push_state:", client_to_server_push_state_,
" server_to_client_pull_state:", server_to_client_pull_state_,
" server_to_client_message_push_state:", server_to_client_push_state_,
" server_trailing_metadata_state:", server_trailing_metadata_state_,
client_to_server_push_waiter_.DebugString(),
" server_to_client_push_waiter:",
server_to_client_push_waiter_.DebugString(),
" client_to_server_pull_waiter:",
client_to_server_pull_waiter_.DebugString(),
" server_to_client_pull_waiter:",
server_to_client_pull_waiter_.DebugString(),
" server_trailing_metadata_waiter:",
server_trailing_metadata_waiter_.DebugString());
}
static_assert(sizeof(CallState) <= 16, "CallState too large");
} // namespace grpc_core

@ -0,0 +1,957 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_TRANSPORT_CALL_STATE_H
#define GRPC_SRC_CORE_LIB_TRANSPORT_CALL_STATE_H
#include "absl/types/optional.h"
#include <grpc/support/port_platform.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/status_flag.h"
namespace grpc_core {
class CallState {
public:
CallState();
// Start the call: allows pulls to proceed
void Start();
// PUSH: client -> server
void BeginPushClientToServerMessage();
Poll<StatusFlag> PollPushClientToServerMessage();
void ClientToServerHalfClose();
// PULL: client -> server
void BeginPullClientInitialMetadata();
void FinishPullClientInitialMetadata();
Poll<ValueOrFailure<bool>> PollPullClientToServerMessageAvailable();
void FinishPullClientToServerMessage();
// PUSH: server -> client
StatusFlag PushServerInitialMetadata();
void BeginPushServerToClientMessage();
Poll<StatusFlag> PollPushServerToClientMessage();
bool PushServerTrailingMetadata(bool cancel);
// PULL: server -> client
Poll<bool> PollPullServerInitialMetadataAvailable();
void FinishPullServerInitialMetadata();
Poll<ValueOrFailure<bool>> PollPullServerToClientMessageAvailable();
void FinishPullServerToClientMessage();
Poll<Empty> PollServerTrailingMetadataAvailable();
void FinishPullServerTrailingMetadata();
Poll<bool> PollWasCancelled();
// Debug
std::string DebugString() const;
friend std::ostream& operator<<(std::ostream& out,
const CallState& call_state) {
return out << call_state.DebugString();
}
private:
enum class ClientToServerPullState : uint16_t {
// Ready to read: client initial metadata is there, but not yet processed
kBegin,
// Processing client initial metadata
kProcessingClientInitialMetadata,
// Main call loop: not reading
kIdle,
// Main call loop: reading but no message available
kReading,
// Main call loop: processing one message
kProcessingClientToServerMessage,
// Processing complete
kTerminated,
};
static const char* ClientToServerPullStateString(
ClientToServerPullState state) {
switch (state) {
case ClientToServerPullState::kBegin:
return "Begin";
case ClientToServerPullState::kProcessingClientInitialMetadata:
return "ProcessingClientInitialMetadata";
case ClientToServerPullState::kIdle:
return "Idle";
case ClientToServerPullState::kReading:
return "Reading";
case ClientToServerPullState::kProcessingClientToServerMessage:
return "ProcessingClientToServerMessage";
case ClientToServerPullState::kTerminated:
return "Terminated";
}
}
template <typename Sink>
friend void AbslStringify(Sink& out, ClientToServerPullState state) {
out.Append(ClientToServerPullStateString(state));
}
friend std::ostream& operator<<(std::ostream& out,
ClientToServerPullState state) {
return out << ClientToServerPullStateString(state);
}
enum class ClientToServerPushState : uint16_t {
kIdle,
kPushedMessage,
kPushedHalfClose,
kPushedMessageAndHalfClosed,
kFinished,
};
static const char* ClientToServerPushStateString(
ClientToServerPushState state) {
switch (state) {
case ClientToServerPushState::kIdle:
return "Idle";
case ClientToServerPushState::kPushedMessage:
return "PushedMessage";
case ClientToServerPushState::kPushedHalfClose:
return "PushedHalfClose";
case ClientToServerPushState::kPushedMessageAndHalfClosed:
return "PushedMessageAndHalfClosed";
case ClientToServerPushState::kFinished:
return "Finished";
}
}
template <typename Sink>
friend void AbslStringify(Sink& out, ClientToServerPushState state) {
out.Append(ClientToServerPushStateString(state));
}
friend std::ostream& operator<<(std::ostream& out,
ClientToServerPushState state) {
return out << ClientToServerPushStateString(state);
}
enum class ServerToClientPullState : uint16_t {
// Not yet started: cannot read
kUnstarted,
kUnstartedReading,
kStarted,
kStartedReading,
// Processing server initial metadata
kProcessingServerInitialMetadata,
kProcessingServerInitialMetadataReading,
// Main call loop: not reading
kIdle,
// Main call loop: reading but no message available
kReading,
// Main call loop: processing one message
kProcessingServerToClientMessage,
// Processing server trailing metadata
kProcessingServerTrailingMetadata,
kTerminated,
};
static const char* ServerToClientPullStateString(
ServerToClientPullState state) {
switch (state) {
case ServerToClientPullState::kUnstarted:
return "Unstarted";
case ServerToClientPullState::kUnstartedReading:
return "UnstartedReading";
case ServerToClientPullState::kStarted:
return "Started";
case ServerToClientPullState::kStartedReading:
return "StartedReading";
case ServerToClientPullState::kProcessingServerInitialMetadata:
return "ProcessingServerInitialMetadata";
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
return "ProcessingServerInitialMetadataReading";
case ServerToClientPullState::kIdle:
return "Idle";
case ServerToClientPullState::kReading:
return "Reading";
case ServerToClientPullState::kProcessingServerToClientMessage:
return "ProcessingServerToClientMessage";
case ServerToClientPullState::kProcessingServerTrailingMetadata:
return "ProcessingServerTrailingMetadata";
case ServerToClientPullState::kTerminated:
return "Terminated";
}
}
template <typename Sink>
friend void AbslStringify(Sink& out, ServerToClientPullState state) {
out.Append(ServerToClientPullStateString(state));
}
friend std::ostream& operator<<(std::ostream& out,
ServerToClientPullState state) {
return out << ServerToClientPullStateString(state);
}
enum class ServerToClientPushState : uint16_t {
kStart,
kPushedServerInitialMetadata,
kPushedServerInitialMetadataAndPushedMessage,
kTrailersOnly,
kIdle,
kPushedMessage,
kFinished,
};
static const char* ServerToClientPushStateString(
ServerToClientPushState state) {
switch (state) {
case ServerToClientPushState::kStart:
return "Start";
case ServerToClientPushState::kPushedServerInitialMetadata:
return "PushedServerInitialMetadata";
case ServerToClientPushState::
kPushedServerInitialMetadataAndPushedMessage:
return "PushedServerInitialMetadataAndPushedMessage";
case ServerToClientPushState::kTrailersOnly:
return "TrailersOnly";
case ServerToClientPushState::kIdle:
return "Idle";
case ServerToClientPushState::kPushedMessage:
return "PushedMessage";
case ServerToClientPushState::kFinished:
return "Finished";
}
}
template <typename Sink>
friend void AbslStringify(Sink& out, ServerToClientPushState state) {
out.Append(ServerToClientPushStateString(state));
}
friend std::ostream& operator<<(std::ostream& out,
ServerToClientPushState state) {
return out << ServerToClientPushStateString(state);
}
enum class ServerTrailingMetadataState : uint16_t {
kNotPushed,
kPushed,
kPushedCancel,
kPulled,
kPulledCancel,
};
static const char* ServerTrailingMetadataStateString(
ServerTrailingMetadataState state) {
switch (state) {
case ServerTrailingMetadataState::kNotPushed:
return "NotPushed";
case ServerTrailingMetadataState::kPushed:
return "Pushed";
case ServerTrailingMetadataState::kPushedCancel:
return "PushedCancel";
case ServerTrailingMetadataState::kPulled:
return "Pulled";
case ServerTrailingMetadataState::kPulledCancel:
return "PulledCancel";
}
}
template <typename Sink>
friend void AbslStringify(Sink& out, ServerTrailingMetadataState state) {
out.Append(ServerTrailingMetadataStateString(state));
}
friend std::ostream& operator<<(std::ostream& out,
ServerTrailingMetadataState state) {
return out << ServerTrailingMetadataStateString(state);
}
ClientToServerPullState client_to_server_pull_state_ : 3;
ClientToServerPushState client_to_server_push_state_ : 3;
ServerToClientPullState server_to_client_pull_state_ : 4;
ServerToClientPushState server_to_client_push_state_ : 3;
ServerTrailingMetadataState server_trailing_metadata_state_ : 3;
IntraActivityWaiter client_to_server_pull_waiter_;
IntraActivityWaiter server_to_client_pull_waiter_;
IntraActivityWaiter client_to_server_push_waiter_;
IntraActivityWaiter server_to_client_push_waiter_;
IntraActivityWaiter server_trailing_metadata_waiter_;
};
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline CallState::CallState()
: client_to_server_pull_state_(ClientToServerPullState::kBegin),
client_to_server_push_state_(ClientToServerPushState::kIdle),
server_to_client_pull_state_(ServerToClientPullState::kUnstarted),
server_to_client_push_state_(ServerToClientPushState::kStart),
server_trailing_metadata_state_(ServerTrailingMetadataState::kNotPushed) {
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void CallState::Start() {
GRPC_TRACE_LOG(call_state, INFO)
<< "[call_state] Start: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_);
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kUnstarted:
server_to_client_pull_state_ = ServerToClientPullState::kStarted;
server_to_client_pull_waiter_.Wake();
break;
case ServerToClientPullState::kUnstartedReading:
server_to_client_pull_state_ = ServerToClientPullState::kStartedReading;
server_to_client_pull_waiter_.Wake();
break;
case ServerToClientPullState::kStarted:
case ServerToClientPullState::kStartedReading:
case ServerToClientPullState::kProcessingServerInitialMetadata:
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
case ServerToClientPullState::kIdle:
case ServerToClientPullState::kReading:
case ServerToClientPullState::kProcessingServerToClientMessage:
LOG(FATAL) << "Start called twice";
case ServerToClientPullState::kProcessingServerTrailingMetadata:
case ServerToClientPullState::kTerminated:
break;
}
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
CallState::BeginPushClientToServerMessage() {
GRPC_TRACE_LOG(call_state, INFO)
<< "[call_state] BeginPushClientToServerMessage: "
<< GRPC_DUMP_ARGS(this, client_to_server_push_state_);
switch (client_to_server_push_state_) {
case ClientToServerPushState::kIdle:
client_to_server_push_state_ = ClientToServerPushState::kPushedMessage;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kPushedMessage:
case ClientToServerPushState::kPushedMessageAndHalfClosed:
LOG(FATAL) << "PushClientToServerMessage called twice concurrently";
break;
case ClientToServerPushState::kPushedHalfClose:
LOG(FATAL) << "PushClientToServerMessage called after half-close";
break;
case ClientToServerPushState::kFinished:
break;
}
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<StatusFlag>
CallState::PollPushClientToServerMessage() {
GRPC_TRACE_LOG(call_state, INFO)
<< "[call_state] PollPushClientToServerMessage: "
<< GRPC_DUMP_ARGS(this, client_to_server_push_state_);
switch (client_to_server_push_state_) {
case ClientToServerPushState::kIdle:
case ClientToServerPushState::kPushedHalfClose:
return Success{};
case ClientToServerPushState::kPushedMessage:
case ClientToServerPushState::kPushedMessageAndHalfClosed:
return client_to_server_push_waiter_.pending();
case ClientToServerPushState::kFinished:
return Failure{};
}
Crash("Unreachable");
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
CallState::ClientToServerHalfClose() {
GRPC_TRACE_LOG(call_state, INFO)
<< "[call_state] ClientToServerHalfClose: "
<< GRPC_DUMP_ARGS(this, client_to_server_push_state_);
switch (client_to_server_push_state_) {
case ClientToServerPushState::kIdle:
client_to_server_push_state_ = ClientToServerPushState::kPushedHalfClose;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kPushedMessage:
client_to_server_push_state_ =
ClientToServerPushState::kPushedMessageAndHalfClosed;
break;
case ClientToServerPushState::kPushedHalfClose:
case ClientToServerPushState::kPushedMessageAndHalfClosed:
LOG(FATAL) << "ClientToServerHalfClose called twice";
break;
case ClientToServerPushState::kFinished:
break;
}
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
CallState::BeginPullClientInitialMetadata() {
GRPC_TRACE_LOG(call_state, INFO)
<< "[call_state] BeginPullClientInitialMetadata: "
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_);
switch (client_to_server_pull_state_) {
case ClientToServerPullState::kBegin:
client_to_server_pull_state_ =
ClientToServerPullState::kProcessingClientInitialMetadata;
break;
case ClientToServerPullState::kProcessingClientInitialMetadata:
case ClientToServerPullState::kIdle:
case ClientToServerPullState::kReading:
case ClientToServerPullState::kProcessingClientToServerMessage:
LOG(FATAL) << "BeginPullClientInitialMetadata called twice";
break;
case ClientToServerPullState::kTerminated:
break;
}
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
CallState::FinishPullClientInitialMetadata() {
GRPC_TRACE_LOG(call_state, INFO)
<< "[call_state] FinishPullClientInitialMetadata: "
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_);
switch (client_to_server_pull_state_) {
case ClientToServerPullState::kBegin:
LOG(FATAL) << "FinishPullClientInitialMetadata called before Begin";
break;
case ClientToServerPullState::kProcessingClientInitialMetadata:
client_to_server_pull_state_ = ClientToServerPullState::kIdle;
client_to_server_pull_waiter_.Wake();
break;
case ClientToServerPullState::kIdle:
case ClientToServerPullState::kReading:
case ClientToServerPullState::kProcessingClientToServerMessage:
LOG(FATAL) << "Out of order FinishPullClientInitialMetadata";
break;
case ClientToServerPullState::kTerminated:
break;
}
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<ValueOrFailure<bool>>
CallState::PollPullClientToServerMessageAvailable() {
GRPC_TRACE_LOG(call_state, INFO)
<< "[call_state] PollPullClientToServerMessageAvailable: "
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_,
client_to_server_push_state_);
switch (client_to_server_pull_state_) {
case ClientToServerPullState::kBegin:
case ClientToServerPullState::kProcessingClientInitialMetadata:
return client_to_server_pull_waiter_.pending();
case ClientToServerPullState::kIdle:
client_to_server_pull_state_ = ClientToServerPullState::kReading;
ABSL_FALLTHROUGH_INTENDED;
case ClientToServerPullState::kReading:
break;
case ClientToServerPullState::kProcessingClientToServerMessage:
LOG(FATAL) << "PollPullClientToServerMessageAvailable called while "
"processing a message";
break;
case ClientToServerPullState::kTerminated:
return Failure{};
}
DCHECK_EQ(client_to_server_pull_state_, ClientToServerPullState::kReading);
switch (client_to_server_push_state_) {
case ClientToServerPushState::kIdle:
return client_to_server_push_waiter_.pending();
case ClientToServerPushState::kPushedMessage:
case ClientToServerPushState::kPushedMessageAndHalfClosed:
client_to_server_pull_state_ =
ClientToServerPullState::kProcessingClientToServerMessage;
return true;
case ClientToServerPushState::kPushedHalfClose:
return false;
case ClientToServerPushState::kFinished:
client_to_server_pull_state_ = ClientToServerPullState::kTerminated;
return Failure{};
}
Crash("Unreachable");
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
CallState::FinishPullClientToServerMessage() {
GRPC_TRACE_LOG(call_state, INFO)
<< "[call_state] FinishPullClientToServerMessage: "
<< GRPC_DUMP_ARGS(this, client_to_server_pull_state_,
client_to_server_push_state_);
switch (client_to_server_pull_state_) {
case ClientToServerPullState::kBegin:
case ClientToServerPullState::kProcessingClientInitialMetadata:
LOG(FATAL) << "FinishPullClientToServerMessage called before Begin";
break;
case ClientToServerPullState::kIdle:
LOG(FATAL) << "FinishPullClientToServerMessage called twice";
break;
case ClientToServerPullState::kReading:
LOG(FATAL) << "FinishPullClientToServerMessage called before "
"PollPullClientToServerMessageAvailable";
break;
case ClientToServerPullState::kProcessingClientToServerMessage:
client_to_server_pull_state_ = ClientToServerPullState::kIdle;
client_to_server_pull_waiter_.Wake();
break;
case ClientToServerPullState::kTerminated:
break;
}
switch (client_to_server_push_state_) {
case ClientToServerPushState::kPushedMessage:
client_to_server_push_state_ = ClientToServerPushState::kIdle;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kIdle:
case ClientToServerPushState::kPushedHalfClose:
LOG(FATAL) << "FinishPullClientToServerMessage called without a message";
break;
case ClientToServerPushState::kPushedMessageAndHalfClosed:
client_to_server_push_state_ = ClientToServerPushState::kPushedHalfClose;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kFinished:
break;
}
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline StatusFlag
CallState::PushServerInitialMetadata() {
GRPC_TRACE_LOG(call_state, INFO)
<< "[call_state] PushServerInitialMetadata: "
<< GRPC_DUMP_ARGS(this, server_to_client_push_state_,
server_trailing_metadata_state_);
if (server_trailing_metadata_state_ !=
ServerTrailingMetadataState::kNotPushed) {
return Failure{};
}
CHECK_EQ(server_to_client_push_state_, ServerToClientPushState::kStart);
server_to_client_push_state_ =
ServerToClientPushState::kPushedServerInitialMetadata;
server_to_client_push_waiter_.Wake();
return Success{};
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
CallState::BeginPushServerToClientMessage() {
GRPC_TRACE_LOG(call_state, INFO)
<< "[call_state] BeginPushServerToClientMessage: "
<< GRPC_DUMP_ARGS(this, server_to_client_push_state_);
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
LOG(FATAL) << "BeginPushServerToClientMessage called before "
"PushServerInitialMetadata";
break;
case ServerToClientPushState::kPushedServerInitialMetadata:
server_to_client_push_state_ =
ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage;
break;
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
case ServerToClientPushState::kPushedMessage:
LOG(FATAL) << "BeginPushServerToClientMessage called twice concurrently";
break;
case ServerToClientPushState::kTrailersOnly:
// Will fail in poll.
break;
case ServerToClientPushState::kIdle:
server_to_client_push_state_ = ServerToClientPushState::kPushedMessage;
server_to_client_push_waiter_.Wake();
break;
case ServerToClientPushState::kFinished:
break;
}
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<StatusFlag>
CallState::PollPushServerToClientMessage() {
GRPC_TRACE_LOG(call_state, INFO)
<< "[call_state] PollPushServerToClientMessage: "
<< GRPC_DUMP_ARGS(this, server_to_client_push_state_);
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
case ServerToClientPushState::kPushedServerInitialMetadata:
LOG(FATAL) << "PollPushServerToClientMessage called before "
<< "PushServerInitialMetadata";
case ServerToClientPushState::kTrailersOnly:
return false;
case ServerToClientPushState::kPushedMessage:
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
return server_to_client_push_waiter_.pending();
case ServerToClientPushState::kIdle:
return Success{};
case ServerToClientPushState::kFinished:
return Failure{};
}
Crash("Unreachable");
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline bool
CallState::PushServerTrailingMetadata(bool cancel) {
GRPC_TRACE_LOG(call_state, INFO)
<< "[call_state] PushServerTrailingMetadata: "
<< GRPC_DUMP_ARGS(this, cancel, server_trailing_metadata_state_,
server_to_client_push_state_,
client_to_server_push_state_,
server_trailing_metadata_waiter_.DebugString());
if (server_trailing_metadata_state_ !=
ServerTrailingMetadataState::kNotPushed) {
return false;
}
server_trailing_metadata_state_ =
cancel ? ServerTrailingMetadataState::kPushedCancel
: ServerTrailingMetadataState::kPushed;
server_trailing_metadata_waiter_.Wake();
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
server_to_client_push_state_ = ServerToClientPushState::kTrailersOnly;
server_to_client_push_waiter_.Wake();
break;
case ServerToClientPushState::kPushedServerInitialMetadata:
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
case ServerToClientPushState::kPushedMessage:
if (cancel) {
server_to_client_push_state_ = ServerToClientPushState::kFinished;
server_to_client_push_waiter_.Wake();
}
break;
case ServerToClientPushState::kIdle:
if (cancel) {
server_to_client_push_state_ = ServerToClientPushState::kFinished;
server_to_client_push_waiter_.Wake();
}
break;
case ServerToClientPushState::kFinished:
case ServerToClientPushState::kTrailersOnly:
break;
}
switch (client_to_server_push_state_) {
case ClientToServerPushState::kIdle:
client_to_server_push_state_ = ClientToServerPushState::kFinished;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kPushedMessage:
case ClientToServerPushState::kPushedMessageAndHalfClosed:
client_to_server_push_state_ = ClientToServerPushState::kFinished;
client_to_server_push_waiter_.Wake();
break;
case ClientToServerPushState::kPushedHalfClose:
case ClientToServerPushState::kFinished:
break;
}
return true;
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<bool>
CallState::PollPullServerInitialMetadataAvailable() {
GRPC_TRACE_LOG(call_state, INFO)
<< "[call_state] PollPullServerInitialMetadataAvailable: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
server_to_client_push_state_);
bool reading;
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kUnstarted:
case ServerToClientPullState::kUnstartedReading:
if (server_to_client_push_state_ ==
ServerToClientPushState::kTrailersOnly) {
server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
return false;
}
server_to_client_push_waiter_.pending();
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kStartedReading:
reading = true;
break;
case ServerToClientPullState::kStarted:
reading = false;
break;
case ServerToClientPullState::kProcessingServerInitialMetadata:
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
case ServerToClientPullState::kIdle:
case ServerToClientPullState::kReading:
case ServerToClientPullState::kProcessingServerToClientMessage:
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "PollPullServerInitialMetadataAvailable called twice";
case ServerToClientPullState::kTerminated:
return false;
}
DCHECK(server_to_client_pull_state_ == ServerToClientPullState::kStarted ||
server_to_client_pull_state_ ==
ServerToClientPullState::kStartedReading)
<< server_to_client_pull_state_;
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
return server_to_client_push_waiter_.pending();
case ServerToClientPushState::kPushedServerInitialMetadata:
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
server_to_client_pull_state_ =
reading
? ServerToClientPullState::kProcessingServerInitialMetadataReading
: ServerToClientPullState::kProcessingServerInitialMetadata;
server_to_client_pull_waiter_.Wake();
return true;
case ServerToClientPushState::kIdle:
case ServerToClientPushState::kPushedMessage:
LOG(FATAL)
<< "PollPullServerInitialMetadataAvailable after metadata processed";
case ServerToClientPushState::kFinished:
server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
server_to_client_pull_waiter_.Wake();
return false;
case ServerToClientPushState::kTrailersOnly:
return false;
}
Crash("Unreachable");
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
CallState::FinishPullServerInitialMetadata() {
GRPC_TRACE_LOG(call_state, INFO)
<< "[call_state] FinishPullServerInitialMetadata: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_);
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kUnstarted:
case ServerToClientPullState::kUnstartedReading:
LOG(FATAL) << "FinishPullServerInitialMetadata called before Start";
case ServerToClientPullState::kStarted:
case ServerToClientPullState::kStartedReading:
CHECK_EQ(server_to_client_push_state_,
ServerToClientPushState::kTrailersOnly);
return;
case ServerToClientPullState::kProcessingServerInitialMetadata:
server_to_client_pull_state_ = ServerToClientPullState::kIdle;
server_to_client_pull_waiter_.Wake();
break;
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
server_to_client_pull_state_ = ServerToClientPullState::kReading;
server_to_client_pull_waiter_.Wake();
break;
case ServerToClientPullState::kIdle:
case ServerToClientPullState::kReading:
case ServerToClientPullState::kProcessingServerToClientMessage:
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "Out of order FinishPullServerInitialMetadata";
case ServerToClientPullState::kTerminated:
return;
}
DCHECK(server_to_client_pull_state_ == ServerToClientPullState::kIdle ||
server_to_client_pull_state_ == ServerToClientPullState::kReading)
<< server_to_client_pull_state_;
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
LOG(FATAL) << "FinishPullServerInitialMetadata called before initial "
"metadata consumed";
case ServerToClientPushState::kPushedServerInitialMetadata:
server_to_client_push_state_ = ServerToClientPushState::kIdle;
server_to_client_push_waiter_.Wake();
break;
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
server_to_client_push_state_ = ServerToClientPushState::kPushedMessage;
server_to_client_push_waiter_.Wake();
break;
case ServerToClientPushState::kIdle:
case ServerToClientPushState::kPushedMessage:
case ServerToClientPushState::kTrailersOnly:
case ServerToClientPushState::kFinished:
LOG(FATAL) << "FinishPullServerInitialMetadata called twice";
}
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<ValueOrFailure<bool>>
CallState::PollPullServerToClientMessageAvailable() {
GRPC_TRACE_LOG(call_state, INFO)
<< "[call_state] PollPullServerToClientMessageAvailable: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
server_to_client_push_state_,
server_trailing_metadata_state_);
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kUnstarted:
server_to_client_pull_state_ = ServerToClientPullState::kUnstartedReading;
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kProcessingServerInitialMetadata:
server_to_client_pull_state_ =
ServerToClientPullState::kProcessingServerInitialMetadataReading;
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kUnstartedReading:
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kStarted:
server_to_client_pull_state_ = ServerToClientPullState::kStartedReading;
ABSL_FALLTHROUGH_INTENDED;
case ServerToClientPullState::kStartedReading:
if (server_to_client_push_state_ ==
ServerToClientPushState::kTrailersOnly) {
return false;
}
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kIdle:
server_to_client_pull_state_ = ServerToClientPullState::kReading;
ABSL_FALLTHROUGH_INTENDED;
case ServerToClientPullState::kReading:
break;
case ServerToClientPullState::kProcessingServerToClientMessage:
LOG(FATAL) << "PollPullServerToClientMessageAvailable called while "
"processing a message";
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "PollPullServerToClientMessageAvailable called while "
"processing trailing metadata";
case ServerToClientPullState::kTerminated:
return Failure{};
}
DCHECK_EQ(server_to_client_pull_state_, ServerToClientPullState::kReading);
switch (server_to_client_push_state_) {
case ServerToClientPushState::kStart:
case ServerToClientPushState::kPushedServerInitialMetadata:
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
return server_to_client_push_waiter_.pending();
case ServerToClientPushState::kIdle:
if (server_trailing_metadata_state_ !=
ServerTrailingMetadataState::kNotPushed) {
return false;
}
server_trailing_metadata_waiter_.pending();
return server_to_client_push_waiter_.pending();
case ServerToClientPushState::kTrailersOnly:
DCHECK_NE(server_trailing_metadata_state_,
ServerTrailingMetadataState::kNotPushed);
return false;
case ServerToClientPushState::kPushedMessage:
server_to_client_pull_state_ =
ServerToClientPullState::kProcessingServerToClientMessage;
server_to_client_pull_waiter_.Wake();
return true;
case ServerToClientPushState::kFinished:
server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
server_to_client_pull_waiter_.Wake();
return Failure{};
}
Crash("Unreachable");
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
CallState::FinishPullServerToClientMessage() {
GRPC_TRACE_LOG(call_state, INFO)
<< "[call_state] FinishPullServerToClientMessage: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
server_to_client_push_state_);
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kUnstarted:
case ServerToClientPullState::kUnstartedReading:
case ServerToClientPullState::kStarted:
case ServerToClientPullState::kStartedReading:
case ServerToClientPullState::kProcessingServerInitialMetadata:
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
LOG(FATAL)
<< "FinishPullServerToClientMessage called before metadata available";
case ServerToClientPullState::kIdle:
LOG(FATAL) << "FinishPullServerToClientMessage called twice";
case ServerToClientPullState::kReading:
LOG(FATAL) << "FinishPullServerToClientMessage called before "
<< "PollPullServerToClientMessageAvailable";
case ServerToClientPullState::kProcessingServerToClientMessage:
server_to_client_pull_state_ = ServerToClientPullState::kIdle;
server_to_client_pull_waiter_.Wake();
break;
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "FinishPullServerToClientMessage called while processing "
"trailing metadata";
case ServerToClientPullState::kTerminated:
break;
}
switch (server_to_client_push_state_) {
case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
case ServerToClientPushState::kPushedServerInitialMetadata:
case ServerToClientPushState::kStart:
LOG(FATAL) << "FinishPullServerToClientMessage called before initial "
"metadata consumed";
case ServerToClientPushState::kTrailersOnly:
LOG(FATAL) << "FinishPullServerToClientMessage called after "
"PushServerTrailingMetadata";
case ServerToClientPushState::kPushedMessage:
server_to_client_push_state_ = ServerToClientPushState::kIdle;
server_to_client_push_waiter_.Wake();
break;
case ServerToClientPushState::kIdle:
LOG(FATAL) << "FinishPullServerToClientMessage called without a message";
case ServerToClientPushState::kFinished:
break;
}
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<Empty>
CallState::PollServerTrailingMetadataAvailable() {
GRPC_TRACE_LOG(call_state, INFO)
<< "[call_state] PollServerTrailingMetadataAvailable: "
<< GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
server_to_client_push_state_,
server_trailing_metadata_state_,
server_trailing_metadata_waiter_.DebugString());
switch (server_to_client_pull_state_) {
case ServerToClientPullState::kProcessingServerInitialMetadata:
case ServerToClientPullState::kProcessingServerToClientMessage:
case ServerToClientPullState::kProcessingServerInitialMetadataReading:
case ServerToClientPullState::kUnstartedReading:
return server_to_client_pull_waiter_.pending();
case ServerToClientPullState::kStartedReading:
case ServerToClientPullState::kReading:
switch (server_to_client_push_state_) {
case ServerToClientPushState::kTrailersOnly:
case ServerToClientPushState::kIdle:
case ServerToClientPushState::kStart:
case ServerToClientPushState::kFinished:
if (server_trailing_metadata_state_ !=
ServerTrailingMetadataState::kNotPushed) {
server_to_client_pull_state_ =
ServerToClientPullState::kProcessingServerTrailingMetadata;
server_to_client_pull_waiter_.Wake();
return Empty{};
}
ABSL_FALLTHROUGH_INTENDED;
case ServerToClientPushState::kPushedServerInitialMetadata:
case ServerToClientPushState::
kPushedServerInitialMetadataAndPushedMessage:
case ServerToClientPushState::kPushedMessage:
server_to_client_push_waiter_.pending();
return server_to_client_pull_waiter_.pending();
}
break;
case ServerToClientPullState::kStarted:
case ServerToClientPullState::kUnstarted:
case ServerToClientPullState::kIdle:
if (server_trailing_metadata_state_ !=
ServerTrailingMetadataState::kNotPushed) {
server_to_client_pull_state_ =
ServerToClientPullState::kProcessingServerTrailingMetadata;
server_to_client_pull_waiter_.Wake();
return Empty{};
}
return server_trailing_metadata_waiter_.pending();
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "PollServerTrailingMetadataAvailable called twice";
case ServerToClientPullState::kTerminated:
return Empty{};
}
Crash("Unreachable");
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
CallState::FinishPullServerTrailingMetadata() {
GRPC_TRACE_LOG(call_state, INFO)
<< "[call_state] FinishPullServerTrailingMetadata: "
<< GRPC_DUMP_ARGS(this, server_trailing_metadata_state_,
server_trailing_metadata_waiter_.DebugString());
switch (server_trailing_metadata_state_) {
case ServerTrailingMetadataState::kNotPushed:
LOG(FATAL) << "FinishPullServerTrailingMetadata called before "
"PollServerTrailingMetadataAvailable";
case ServerTrailingMetadataState::kPushed:
server_trailing_metadata_state_ = ServerTrailingMetadataState::kPulled;
server_trailing_metadata_waiter_.Wake();
break;
case ServerTrailingMetadataState::kPushedCancel:
server_trailing_metadata_state_ =
ServerTrailingMetadataState::kPulledCancel;
server_trailing_metadata_waiter_.Wake();
break;
case ServerTrailingMetadataState::kPulled:
case ServerTrailingMetadataState::kPulledCancel:
LOG(FATAL) << "FinishPullServerTrailingMetadata called twice";
}
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<bool>
CallState::PollWasCancelled() {
GRPC_TRACE_LOG(call_state, INFO)
<< "[call_state] PollWasCancelled: "
<< GRPC_DUMP_ARGS(this, server_trailing_metadata_state_);
switch (server_trailing_metadata_state_) {
case ServerTrailingMetadataState::kNotPushed:
case ServerTrailingMetadataState::kPushed:
case ServerTrailingMetadataState::kPushedCancel: {
return server_trailing_metadata_waiter_.pending();
}
case ServerTrailingMetadataState::kPulled:
return false;
case ServerTrailingMetadataState::kPulledCancel:
return true;
}
Crash("Unreachable");
}
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_TRANSPORT_CALL_STATE_H

@ -692,6 +692,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/transport/call_filters.cc', 'src/core/lib/transport/call_filters.cc',
'src/core/lib/transport/call_final_info.cc', 'src/core/lib/transport/call_final_info.cc',
'src/core/lib/transport/call_spine.cc', 'src/core/lib/transport/call_spine.cc',
'src/core/lib/transport/call_state.cc',
'src/core/lib/transport/connectivity_state.cc', 'src/core/lib/transport/connectivity_state.cc',
'src/core/lib/transport/error_utils.cc', 'src/core/lib/transport/error_utils.cc',
'src/core/lib/transport/interception_chain.cc', 'src/core/lib/transport/interception_chain.cc',

@ -87,6 +87,21 @@ grpc_cc_test(
], ],
) )
grpc_cc_test(
name = "call_state_test",
srcs = ["call_state_test.cc"],
external_deps = [
"gtest",
],
language = "C++",
uses_event_engine = False,
uses_polling = False,
deps = [
"//src/core:call_state",
"//test/core/promise:poll_matcher",
],
)
grpc_cc_test( grpc_cc_test(
name = "connectivity_state_test", name = "connectivity_state_test",
srcs = ["connectivity_state_test.cc"], srcs = ["connectivity_state_test.cc"],

@ -1102,246 +1102,6 @@ TEST(OperationExecutorTest, PromiseTwo) {
gpr_free_aligned(call_data1); gpr_free_aligned(call_data1);
} }
///////////////////////////////////////////////////////////////////////////////
// CallState
TEST(CallStateTest, NoOp) { CallState state; }
TEST(CallStateTest, StartTwiceCrashes) {
CallState state;
state.Start();
EXPECT_DEATH(state.Start(), "");
}
TEST(CallStateTest, PullServerInitialMetadataBlocksUntilStart) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.PushServerInitialMetadata());
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.Start());
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady());
}
TEST(CallStateTest, PullClientInitialMetadata) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
EXPECT_DEATH(state.FinishPullClientInitialMetadata(), "");
state.BeginPullClientInitialMetadata();
state.FinishPullClientInitialMetadata();
}
TEST(CallStateTest, ClientToServerMessagesWaitForInitialMetadata) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending());
state.BeginPushClientToServerMessage();
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending());
state.BeginPullClientInitialMetadata();
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientInitialMetadata());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{}));
}
TEST(CallStateTest, RepeatedClientToServerMessagesWithHalfClose) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.BeginPullClientInitialMetadata();
state.FinishPullClientInitialMetadata();
// Message 0
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.BeginPushClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{}));
// Message 1
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.BeginPushClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{}));
// Message 2: push before polling
state.BeginPushClientToServerMessage();
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{}));
// Message 3: push before polling and half close
state.BeginPushClientToServerMessage();
state.ClientToServerHalfClose();
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{}));
// ... and now we should see the half close
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(false));
}
TEST(CallStateTest, ImmediateClientToServerHalfClose) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.BeginPullClientInitialMetadata();
state.FinishPullClientInitialMetadata();
state.ClientToServerHalfClose();
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(false));
}
TEST(CallStateTest, ServerToClientMessagesWaitForInitialMetadata) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.Start());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.PushServerInitialMetadata());
state.BeginPushServerToClientMessage();
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity,
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(),
IsReady(true)));
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerInitialMetadata());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true));
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{}));
}
TEST(CallStateTest, RepeatedServerToClientMessages) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.PushServerInitialMetadata();
state.Start();
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(true));
state.FinishPullServerInitialMetadata();
// Message 0
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.BeginPushServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{}));
// Message 1
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.BeginPushServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{}));
// Message 2: push before polling
state.BeginPushServerToClientMessage();
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{}));
// Message 3: push before polling
state.BeginPushServerToClientMessage();
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{}));
}
TEST(CallStateTest, ReceiveTrailersOnly) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.Start();
state.PushServerTrailingMetadata(false);
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
state.FinishPullServerTrailingMetadata();
}
TEST(CallStateTest, ReceiveTrailersOnlySkipsInitialMetadataOnUnstartedCalls) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.PushServerTrailingMetadata(false);
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
state.FinishPullServerTrailingMetadata();
}
TEST(CallStateTest, RecallNoCancellation) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.Start();
state.PushServerTrailingMetadata(false);
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
EXPECT_THAT(state.PollWasCancelled(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerTrailingMetadata());
EXPECT_THAT(state.PollWasCancelled(), IsReady(false));
}
TEST(CallStateTest, RecallCancellation) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.Start();
state.PushServerTrailingMetadata(true);
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
EXPECT_THAT(state.PollWasCancelled(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerTrailingMetadata());
EXPECT_THAT(state.PollWasCancelled(), IsReady(true));
}
TEST(CallStateTest, ReceiveTrailingMetadataAfterMessageRead) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.Start();
state.PushServerInitialMetadata();
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(true));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.PushServerTrailingMetadata(false));
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(false));
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
}
} // namespace filters_detail } // namespace filters_detail
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////

@ -0,0 +1,310 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/transport/call_state.h"
#include <vector>
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "test/core/promise/poll_matcher.h"
using testing::Mock;
using testing::StrictMock;
namespace grpc_core {
namespace {
// A mock activity that can be activated and deactivated.
class MockActivity : public Activity, public Wakeable {
public:
MOCK_METHOD(void, WakeupRequested, ());
void ForceImmediateRepoll(WakeupMask /*mask*/) override { WakeupRequested(); }
void Orphan() override {}
Waker MakeOwningWaker() override { return Waker(this, 0); }
Waker MakeNonOwningWaker() override { return Waker(this, 0); }
void Wakeup(WakeupMask /*mask*/) override { WakeupRequested(); }
void WakeupAsync(WakeupMask /*mask*/) override { WakeupRequested(); }
void Drop(WakeupMask /*mask*/) override {}
std::string DebugTag() const override { return "MockActivity"; }
std::string ActivityDebugTag(WakeupMask /*mask*/) const override {
return DebugTag();
}
void Activate() {
if (scoped_activity_ == nullptr) {
scoped_activity_ = std::make_unique<ScopedActivity>(this);
}
}
void Deactivate() { scoped_activity_.reset(); }
private:
std::unique_ptr<ScopedActivity> scoped_activity_;
};
#define EXPECT_WAKEUP(activity, statement) \
EXPECT_CALL((activity), WakeupRequested()).Times(::testing::AtLeast(1)); \
statement; \
Mock::VerifyAndClearExpectations(&(activity));
} // namespace
TEST(CallStateTest, NoOp) { CallState state; }
TEST(CallStateTest, StartTwiceCrashes) {
CallState state;
state.Start();
EXPECT_DEATH(state.Start(), "");
}
TEST(CallStateTest, PullServerInitialMetadataBlocksUntilStart) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.PushServerInitialMetadata());
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.Start());
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady());
}
TEST(CallStateTest, PullClientInitialMetadata) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
EXPECT_DEATH(state.FinishPullClientInitialMetadata(), "");
state.BeginPullClientInitialMetadata();
state.FinishPullClientInitialMetadata();
}
TEST(CallStateTest, ClientToServerMessagesWaitForInitialMetadata) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending());
state.BeginPushClientToServerMessage();
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending());
state.BeginPullClientInitialMetadata();
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientInitialMetadata());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{}));
}
TEST(CallStateTest, RepeatedClientToServerMessagesWithHalfClose) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.BeginPullClientInitialMetadata();
state.FinishPullClientInitialMetadata();
// Message 0
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.BeginPushClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{}));
// Message 1
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.BeginPushClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{}));
// Message 2: push before polling
state.BeginPushClientToServerMessage();
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{}));
// Message 3: push before polling and half close
state.BeginPushClientToServerMessage();
state.ClientToServerHalfClose();
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage());
EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{}));
// ... and now we should see the half close
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(false));
}
TEST(CallStateTest, ImmediateClientToServerHalfClose) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.BeginPullClientInitialMetadata();
state.FinishPullClientInitialMetadata();
state.ClientToServerHalfClose();
EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(false));
}
TEST(CallStateTest, ServerToClientMessagesWaitForInitialMetadata) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.Start());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.PushServerInitialMetadata());
state.BeginPushServerToClientMessage();
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity,
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(),
IsReady(true)));
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerInitialMetadata());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true));
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{}));
}
TEST(CallStateTest, RepeatedServerToClientMessages) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.PushServerInitialMetadata();
state.Start();
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(true));
state.FinishPullServerInitialMetadata();
// Message 0
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.BeginPushServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{}));
// Message 1
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.BeginPushServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{}));
// Message 2: push before polling
state.BeginPushServerToClientMessage();
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{}));
// Message 3: push before polling
state.BeginPushServerToClientMessage();
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true));
EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage());
EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{}));
}
TEST(CallStateTest, ReceiveTrailersOnly) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.Start();
state.PushServerTrailingMetadata(false);
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
state.FinishPullServerTrailingMetadata();
}
TEST(CallStateTest, ReceiveTrailersOnlySkipsInitialMetadataOnUnstartedCalls) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.PushServerTrailingMetadata(false);
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
state.FinishPullServerTrailingMetadata();
}
TEST(CallStateTest, RecallNoCancellation) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.Start();
state.PushServerTrailingMetadata(false);
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
EXPECT_THAT(state.PollWasCancelled(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerTrailingMetadata());
EXPECT_THAT(state.PollWasCancelled(), IsReady(false));
}
TEST(CallStateTest, RecallCancellation) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.Start();
state.PushServerTrailingMetadata(true);
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
EXPECT_THAT(state.PollWasCancelled(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerTrailingMetadata());
EXPECT_THAT(state.PollWasCancelled(), IsReady(true));
}
TEST(CallStateTest, ReceiveTrailingMetadataAfterMessageRead) {
StrictMock<MockActivity> activity;
activity.Activate();
CallState state;
state.Start();
state.PushServerInitialMetadata();
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(true));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending());
EXPECT_WAKEUP(activity, state.PushServerTrailingMetadata(false));
EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(false));
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
}
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc_tracer_init();
return RUN_ALL_TESTS();
}

@ -2699,6 +2699,8 @@ src/core/lib/transport/call_final_info.cc \
src/core/lib/transport/call_final_info.h \ src/core/lib/transport/call_final_info.h \
src/core/lib/transport/call_spine.cc \ src/core/lib/transport/call_spine.cc \
src/core/lib/transport/call_spine.h \ src/core/lib/transport/call_spine.h \
src/core/lib/transport/call_state.cc \
src/core/lib/transport/call_state.h \
src/core/lib/transport/connectivity_state.cc \ src/core/lib/transport/connectivity_state.cc \
src/core/lib/transport/connectivity_state.h \ src/core/lib/transport/connectivity_state.h \
src/core/lib/transport/custom_metadata.h \ src/core/lib/transport/custom_metadata.h \

@ -2472,6 +2472,8 @@ src/core/lib/transport/call_final_info.cc \
src/core/lib/transport/call_final_info.h \ src/core/lib/transport/call_final_info.h \
src/core/lib/transport/call_spine.cc \ src/core/lib/transport/call_spine.cc \
src/core/lib/transport/call_spine.h \ src/core/lib/transport/call_spine.h \
src/core/lib/transport/call_state.cc \
src/core/lib/transport/call_state.h \
src/core/lib/transport/connectivity_state.cc \ src/core/lib/transport/connectivity_state.cc \
src/core/lib/transport/connectivity_state.h \ src/core/lib/transport/connectivity_state.h \
src/core/lib/transport/custom_metadata.h \ src/core/lib/transport/custom_metadata.h \

@ -1525,6 +1525,30 @@
], ],
"uses_polling": false "uses_polling": false
}, },
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "call_state_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{ {
"args": [], "args": [],
"benchmark": false, "benchmark": false,

Loading…
Cancel
Save