diff --git a/CMakeLists.txt b/CMakeLists.txt index cc37c080df7..d02b6dad8ca 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -991,6 +991,7 @@ if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx call_spine_test) endif() + add_dependencies(buildtests_cxx call_state_test) add_dependencies(buildtests_cxx call_tracer_test) add_dependencies(buildtests_cxx call_utils_test) add_dependencies(buildtests_cxx cancel_after_accept_test) @@ -2526,6 +2527,7 @@ add_library(grpc src/core/lib/transport/call_filters.cc src/core/lib/transport/call_final_info.cc src/core/lib/transport/call_spine.cc + src/core/lib/transport/call_state.cc src/core/lib/transport/connectivity_state.cc src/core/lib/transport/error_utils.cc src/core/lib/transport/interception_chain.cc @@ -3280,6 +3282,7 @@ add_library(grpc_unsecure src/core/lib/transport/call_filters.cc src/core/lib/transport/call_final_info.cc src/core/lib/transport/call_spine.cc + src/core/lib/transport/call_state.cc src/core/lib/transport/connectivity_state.cc src/core/lib/transport/error_utils.cc src/core/lib/transport/interception_chain.cc @@ -5403,6 +5406,7 @@ add_library(grpc_authorization_provider src/core/lib/transport/call_filters.cc src/core/lib/transport/call_final_info.cc src/core/lib/transport/call_spine.cc + src/core/lib/transport/call_state.cc src/core/lib/transport/connectivity_state.cc src/core/lib/transport/error_utils.cc src/core/lib/transport/interception_chain.cc @@ -8861,6 +8865,7 @@ add_executable(call_filters_test src/core/lib/surface/channel_stack_type.cc src/core/lib/transport/call_filters.cc src/core/lib/transport/call_final_info.cc + src/core/lib/transport/call_state.cc src/core/lib/transport/error_utils.cc src/core/lib/transport/message.cc src/core/lib/transport/metadata.cc @@ -9087,6 +9092,58 @@ endif() endif() if(gRPC_BUILD_TESTS) +add_executable(call_state_test + src/core/lib/debug/trace.cc + src/core/lib/debug/trace_flags.cc + src/core/lib/gprpp/dump_args.cc + src/core/lib/gprpp/glob.cc + src/core/lib/promise/activity.cc + src/core/lib/transport/call_state.cc + test/core/transport/call_state_test.cc +) +if(WIN32 AND MSVC) + if(BUILD_SHARED_LIBS) + target_compile_definitions(call_state_test + PRIVATE + "GPR_DLL_IMPORTS" + ) + endif() +endif() +target_compile_features(call_state_test PUBLIC cxx_std_14) +target_include_directories(call_state_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(call_state_test + ${_gRPC_ALLTARGETS_LIBRARIES} + gtest + absl::config + absl::flat_hash_map + absl::hash + absl::type_traits + absl::statusor + gpr +) + + +endif() +if(gRPC_BUILD_TESTS) + add_executable(call_tracer_test test/core/telemetry/call_tracer_test.cc test/core/test_util/fake_stats_plugin.cc @@ -9338,6 +9395,7 @@ add_executable(call_utils_test src/core/lib/transport/call_filters.cc src/core/lib/transport/call_final_info.cc src/core/lib/transport/call_spine.cc + src/core/lib/transport/call_state.cc src/core/lib/transport/connectivity_state.cc src/core/lib/transport/error_utils.cc src/core/lib/transport/interception_chain.cc @@ -18808,6 +18866,7 @@ add_executable(interception_chain_test src/core/lib/transport/call_filters.cc src/core/lib/transport/call_final_info.cc src/core/lib/transport/call_spine.cc + src/core/lib/transport/call_state.cc src/core/lib/transport/connectivity_state.cc src/core/lib/transport/error_utils.cc src/core/lib/transport/interception_chain.cc diff --git a/Makefile b/Makefile index 652e940ec9e..6630151de34 100644 --- a/Makefile +++ b/Makefile @@ -1343,6 +1343,7 @@ LIBGRPC_SRC = \ src/core/lib/transport/call_filters.cc \ src/core/lib/transport/call_final_info.cc \ src/core/lib/transport/call_spine.cc \ + src/core/lib/transport/call_state.cc \ src/core/lib/transport/connectivity_state.cc \ src/core/lib/transport/error_utils.cc \ src/core/lib/transport/interception_chain.cc \ diff --git a/Package.swift b/Package.swift index bbdc95a5037..0c6faa4b0c8 100644 --- a/Package.swift +++ b/Package.swift @@ -1694,6 +1694,8 @@ let package = Package( "src/core/lib/transport/call_final_info.h", "src/core/lib/transport/call_spine.cc", "src/core/lib/transport/call_spine.h", + "src/core/lib/transport/call_state.cc", + "src/core/lib/transport/call_state.h", "src/core/lib/transport/connectivity_state.cc", "src/core/lib/transport/connectivity_state.h", "src/core/lib/transport/custom_metadata.h", diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 967dabb71ac..c1fc350aacc 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -1099,6 +1099,7 @@ libs: - src/core/lib/transport/call_filters.h - src/core/lib/transport/call_final_info.h - src/core/lib/transport/call_spine.h + - src/core/lib/transport/call_state.h - src/core/lib/transport/connectivity_state.h - src/core/lib/transport/custom_metadata.h - src/core/lib/transport/error_utils.h @@ -1908,6 +1909,7 @@ libs: - src/core/lib/transport/call_filters.cc - src/core/lib/transport/call_final_info.cc - src/core/lib/transport/call_spine.cc + - src/core/lib/transport/call_state.cc - src/core/lib/transport/connectivity_state.cc - src/core/lib/transport/error_utils.cc - src/core/lib/transport/interception_chain.cc @@ -2604,6 +2606,7 @@ libs: - src/core/lib/transport/call_filters.h - src/core/lib/transport/call_final_info.h - src/core/lib/transport/call_spine.h + - src/core/lib/transport/call_state.h - src/core/lib/transport/connectivity_state.h - src/core/lib/transport/custom_metadata.h - src/core/lib/transport/error_utils.h @@ -3027,6 +3030,7 @@ libs: - src/core/lib/transport/call_filters.cc - src/core/lib/transport/call_final_info.cc - src/core/lib/transport/call_spine.cc + - src/core/lib/transport/call_state.cc - src/core/lib/transport/connectivity_state.cc - src/core/lib/transport/error_utils.cc - src/core/lib/transport/interception_chain.cc @@ -4701,6 +4705,7 @@ libs: - src/core/lib/transport/call_filters.h - src/core/lib/transport/call_final_info.h - src/core/lib/transport/call_spine.h + - src/core/lib/transport/call_state.h - src/core/lib/transport/connectivity_state.h - src/core/lib/transport/custom_metadata.h - src/core/lib/transport/error_utils.h @@ -5001,6 +5006,7 @@ libs: - src/core/lib/transport/call_filters.cc - src/core/lib/transport/call_final_info.cc - src/core/lib/transport/call_spine.cc + - src/core/lib/transport/call_state.cc - src/core/lib/transport/connectivity_state.cc - src/core/lib/transport/error_utils.cc - src/core/lib/transport/interception_chain.cc @@ -6584,6 +6590,7 @@ targets: - src/core/lib/surface/channel_stack_type.h - src/core/lib/transport/call_filters.h - src/core/lib/transport/call_final_info.h + - src/core/lib/transport/call_state.h - src/core/lib/transport/custom_metadata.h - src/core/lib/transport/error_utils.h - src/core/lib/transport/http2_errors.h @@ -6651,6 +6658,7 @@ targets: - src/core/lib/surface/channel_stack_type.cc - src/core/lib/transport/call_filters.cc - src/core/lib/transport/call_final_info.cc + - src/core/lib/transport/call_state.cc - src/core/lib/transport/error_utils.cc - src/core/lib/transport/message.cc - src/core/lib/transport/metadata.cc @@ -6782,6 +6790,48 @@ targets: - linux - posix uses_polling: false +- name: call_state_test + gtest: true + build: test + language: c++ + headers: + - src/core/lib/debug/trace.h + - src/core/lib/debug/trace_flags.h + - src/core/lib/debug/trace_impl.h + - src/core/lib/event_engine/event_engine_context.h + - src/core/lib/gprpp/atomic_utils.h + - src/core/lib/gprpp/down_cast.h + - src/core/lib/gprpp/dump_args.h + - src/core/lib/gprpp/glob.h + - src/core/lib/gprpp/orphanable.h + - src/core/lib/gprpp/ref_counted.h + - src/core/lib/gprpp/ref_counted_ptr.h + - src/core/lib/promise/activity.h + - src/core/lib/promise/context.h + - src/core/lib/promise/detail/promise_factory.h + - src/core/lib/promise/detail/promise_like.h + - src/core/lib/promise/detail/status.h + - src/core/lib/promise/poll.h + - src/core/lib/promise/status_flag.h + - src/core/lib/transport/call_state.h + - test/core/promise/poll_matcher.h + src: + - src/core/lib/debug/trace.cc + - src/core/lib/debug/trace_flags.cc + - src/core/lib/gprpp/dump_args.cc + - src/core/lib/gprpp/glob.cc + - src/core/lib/promise/activity.cc + - src/core/lib/transport/call_state.cc + - test/core/transport/call_state_test.cc + deps: + - gtest + - absl/base:config + - absl/container:flat_hash_map + - absl/hash:hash + - absl/meta:type_traits + - absl/status:statusor + - gpr + uses_polling: false - name: call_tracer_test gtest: true build: test @@ -7065,6 +7115,7 @@ targets: - src/core/lib/transport/call_filters.h - src/core/lib/transport/call_final_info.h - src/core/lib/transport/call_spine.h + - src/core/lib/transport/call_state.h - src/core/lib/transport/connectivity_state.h - src/core/lib/transport/custom_metadata.h - src/core/lib/transport/error_utils.h @@ -7333,6 +7384,7 @@ targets: - src/core/lib/transport/call_filters.cc - src/core/lib/transport/call_final_info.cc - src/core/lib/transport/call_spine.cc + - src/core/lib/transport/call_state.cc - src/core/lib/transport/connectivity_state.cc - src/core/lib/transport/error_utils.cc - src/core/lib/transport/interception_chain.cc @@ -12788,6 +12840,7 @@ targets: - src/core/lib/transport/call_filters.h - src/core/lib/transport/call_final_info.h - src/core/lib/transport/call_spine.h + - src/core/lib/transport/call_state.h - src/core/lib/transport/connectivity_state.h - src/core/lib/transport/custom_metadata.h - src/core/lib/transport/error_utils.h @@ -13057,6 +13110,7 @@ targets: - src/core/lib/transport/call_filters.cc - src/core/lib/transport/call_final_info.cc - src/core/lib/transport/call_spine.cc + - src/core/lib/transport/call_state.cc - src/core/lib/transport/connectivity_state.cc - src/core/lib/transport/error_utils.cc - src/core/lib/transport/interception_chain.cc diff --git a/config.m4 b/config.m4 index 6435dff6bcc..00e36a726b9 100644 --- a/config.m4 +++ b/config.m4 @@ -718,6 +718,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/transport/call_filters.cc \ src/core/lib/transport/call_final_info.cc \ src/core/lib/transport/call_spine.cc \ + src/core/lib/transport/call_state.cc \ src/core/lib/transport/connectivity_state.cc \ src/core/lib/transport/error_utils.cc \ src/core/lib/transport/interception_chain.cc \ diff --git a/config.w32 b/config.w32 index 1c871b2300b..56e9bbf62c1 100644 --- a/config.w32 +++ b/config.w32 @@ -683,6 +683,7 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\transport\\call_filters.cc " + "src\\core\\lib\\transport\\call_final_info.cc " + "src\\core\\lib\\transport\\call_spine.cc " + + "src\\core\\lib\\transport\\call_state.cc " + "src\\core\\lib\\transport\\connectivity_state.cc " + "src\\core\\lib\\transport\\error_utils.cc " + "src\\core\\lib\\transport\\interception_chain.cc " + diff --git a/doc/trace_flags.md b/doc/trace_flags.md index 65ddd405aa7..56ad7191781 100644 --- a/doc/trace_flags.md +++ b/doc/trace_flags.md @@ -90,6 +90,7 @@ accomplished by invoking `bazel build --config=dbg ` - auth_context_refcount - Auth context refcounting. - call_combiner - Call combiner state. - call_refcount - Refcount on call. + - call_state - Traces transitions through the call spine state machine. - closure - Legacy closure creation, scheduling, and completion. - combiner - Combiner lock state. - cq_refcount - Completion queue refcounting. diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 4901373eb91..bd21f0d9449 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -1202,6 +1202,7 @@ Pod::Spec.new do |s| 'src/core/lib/transport/call_filters.h', 'src/core/lib/transport/call_final_info.h', 'src/core/lib/transport/call_spine.h', + 'src/core/lib/transport/call_state.h', 'src/core/lib/transport/connectivity_state.h', 'src/core/lib/transport/custom_metadata.h', 'src/core/lib/transport/error_utils.h', @@ -2476,6 +2477,7 @@ Pod::Spec.new do |s| 'src/core/lib/transport/call_filters.h', 'src/core/lib/transport/call_final_info.h', 'src/core/lib/transport/call_spine.h', + 'src/core/lib/transport/call_state.h', 'src/core/lib/transport/connectivity_state.h', 'src/core/lib/transport/custom_metadata.h', 'src/core/lib/transport/error_utils.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index d812ea38794..0ec29d02d9a 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -1809,6 +1809,8 @@ Pod::Spec.new do |s| 'src/core/lib/transport/call_final_info.h', 'src/core/lib/transport/call_spine.cc', 'src/core/lib/transport/call_spine.h', + 'src/core/lib/transport/call_state.cc', + 'src/core/lib/transport/call_state.h', 'src/core/lib/transport/connectivity_state.cc', 'src/core/lib/transport/connectivity_state.h', 'src/core/lib/transport/custom_metadata.h', @@ -3248,6 +3250,7 @@ Pod::Spec.new do |s| 'src/core/lib/transport/call_filters.h', 'src/core/lib/transport/call_final_info.h', 'src/core/lib/transport/call_spine.h', + 'src/core/lib/transport/call_state.h', 'src/core/lib/transport/connectivity_state.h', 'src/core/lib/transport/custom_metadata.h', 'src/core/lib/transport/error_utils.h', diff --git a/grpc.gemspec b/grpc.gemspec index 213e157d0eb..502209e72cc 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -1696,6 +1696,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/transport/call_final_info.h ) s.files += %w( src/core/lib/transport/call_spine.cc ) s.files += %w( src/core/lib/transport/call_spine.h ) + s.files += %w( src/core/lib/transport/call_state.cc ) + s.files += %w( src/core/lib/transport/call_state.h ) s.files += %w( src/core/lib/transport/connectivity_state.cc ) s.files += %w( src/core/lib/transport/connectivity_state.h ) s.files += %w( src/core/lib/transport/custom_metadata.h ) diff --git a/package.xml b/package.xml index 3fcb69ed981..5b27bb11256 100644 --- a/package.xml +++ b/package.xml @@ -1678,6 +1678,8 @@ + + diff --git a/src/core/BUILD b/src/core/BUILD index 04cc6d5458f..efebfbca3cb 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -7438,6 +7438,24 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "call_state", + srcs = [ + "lib/transport/call_state.cc", + ], + hdrs = [ + "lib/transport/call_state.h", + ], + external_deps = ["absl/types:optional"], + deps = [ + "activity", + "poll", + "status_flag", + "//:gpr", + "//:grpc_trace", + ], +) + grpc_cc_library( name = "call_filters", srcs = [ @@ -7449,6 +7467,7 @@ grpc_cc_library( external_deps = ["absl/log:check"], deps = [ "call_final_info", + "call_state", "dump_args", "if", "latch", diff --git a/src/core/lib/debug/trace_flags.cc b/src/core/lib/debug/trace_flags.cc index 05ef50af0e4..f3fa6c63314 100644 --- a/src/core/lib/debug/trace_flags.cc +++ b/src/core/lib/debug/trace_flags.cc @@ -26,6 +26,7 @@ namespace grpc_core { DebugOnlyTraceFlag auth_context_refcount_trace(false, "auth_context_refcount"); DebugOnlyTraceFlag call_combiner_trace(false, "call_combiner"); DebugOnlyTraceFlag call_refcount_trace(false, "call_refcount"); +DebugOnlyTraceFlag call_state_trace(false, "call_state"); DebugOnlyTraceFlag closure_trace(false, "closure"); DebugOnlyTraceFlag combiner_trace(false, "combiner"); DebugOnlyTraceFlag cq_refcount_trace(false, "cq_refcount"); @@ -229,6 +230,7 @@ const absl::flat_hash_map& GetAllTraceFlags() { {"auth_context_refcount", &auth_context_refcount_trace}, {"call_combiner", &call_combiner_trace}, {"call_refcount", &call_refcount_trace}, + {"call_state", &call_state_trace}, {"closure", &closure_trace}, {"combiner", &combiner_trace}, {"cq_refcount", &cq_refcount_trace}, diff --git a/src/core/lib/debug/trace_flags.h b/src/core/lib/debug/trace_flags.h index 65d59df5078..4aaf5e01111 100644 --- a/src/core/lib/debug/trace_flags.h +++ b/src/core/lib/debug/trace_flags.h @@ -26,6 +26,7 @@ namespace grpc_core { extern DebugOnlyTraceFlag auth_context_refcount_trace; extern DebugOnlyTraceFlag call_combiner_trace; extern DebugOnlyTraceFlag call_refcount_trace; +extern DebugOnlyTraceFlag call_state_trace; extern DebugOnlyTraceFlag closure_trace; extern DebugOnlyTraceFlag combiner_trace; extern DebugOnlyTraceFlag cq_refcount_trace; diff --git a/src/core/lib/debug/trace_flags.yaml b/src/core/lib/debug/trace_flags.yaml index 8260daf561c..247d830e81f 100644 --- a/src/core/lib/debug/trace_flags.yaml +++ b/src/core/lib/debug/trace_flags.yaml @@ -54,6 +54,10 @@ call_refcount: debug_only: true default: false description: Refcount on call. +call_state: + debug_only: true + default: false + description: Traces transitions through the call spine state machine. cares_address_sorting: default: false description: Operations of the c-ares based DNS resolver's address sorter. diff --git a/src/core/lib/transport/call_filters.cc b/src/core/lib/transport/call_filters.cc index 0e2cd212fd0..33d12287737 100644 --- a/src/core/lib/transport/call_filters.cc +++ b/src/core/lib/transport/call_filters.cc @@ -235,699 +235,4 @@ RefCountedPtr CallFilters::StackBuilder::Build() { return RefCountedPtr(new Stack(std::move(data_))); } -/////////////////////////////////////////////////////////////////////////////// -// CallState - -namespace filters_detail { - -CallState::CallState() - : client_to_server_pull_state_(ClientToServerPullState::kBegin), - client_to_server_push_state_(ClientToServerPushState::kIdle), - server_to_client_pull_state_(ServerToClientPullState::kUnstarted), - server_to_client_push_state_(ServerToClientPushState::kStart), - server_trailing_metadata_state_(ServerTrailingMetadataState::kNotPushed) { -} - -void CallState::Start() { - GRPC_TRACE_LOG(call, INFO) - << "[call_state] Start: " - << GRPC_DUMP_ARGS(this, server_to_client_pull_state_); - switch (server_to_client_pull_state_) { - case ServerToClientPullState::kUnstarted: - server_to_client_pull_state_ = ServerToClientPullState::kStarted; - server_to_client_pull_waiter_.Wake(); - break; - case ServerToClientPullState::kUnstartedReading: - server_to_client_pull_state_ = ServerToClientPullState::kStartedReading; - server_to_client_pull_waiter_.Wake(); - break; - case ServerToClientPullState::kStarted: - case ServerToClientPullState::kStartedReading: - case ServerToClientPullState::kProcessingServerInitialMetadata: - case ServerToClientPullState::kProcessingServerInitialMetadataReading: - case ServerToClientPullState::kIdle: - case ServerToClientPullState::kReading: - case ServerToClientPullState::kProcessingServerToClientMessage: - LOG(FATAL) << "Start called twice"; - case ServerToClientPullState::kProcessingServerTrailingMetadata: - case ServerToClientPullState::kTerminated: - break; - } -} - -void CallState::BeginPushClientToServerMessage() { - GRPC_TRACE_LOG(call, INFO) - << "[call_state] BeginPushClientToServerMessage: " - << GRPC_DUMP_ARGS(this, client_to_server_push_state_); - switch (client_to_server_push_state_) { - case ClientToServerPushState::kIdle: - client_to_server_push_state_ = ClientToServerPushState::kPushedMessage; - client_to_server_push_waiter_.Wake(); - break; - case ClientToServerPushState::kPushedMessage: - case ClientToServerPushState::kPushedMessageAndHalfClosed: - LOG(FATAL) << "PushClientToServerMessage called twice concurrently"; - break; - case ClientToServerPushState::kPushedHalfClose: - LOG(FATAL) << "PushClientToServerMessage called after half-close"; - break; - case ClientToServerPushState::kFinished: - break; - } -} - -Poll CallState::PollPushClientToServerMessage() { - GRPC_TRACE_LOG(call, INFO) - << "[call_state] PollPushClientToServerMessage: " - << GRPC_DUMP_ARGS(this, client_to_server_push_state_); - switch (client_to_server_push_state_) { - case ClientToServerPushState::kIdle: - case ClientToServerPushState::kPushedHalfClose: - return Success{}; - case ClientToServerPushState::kPushedMessage: - case ClientToServerPushState::kPushedMessageAndHalfClosed: - return client_to_server_push_waiter_.pending(); - case ClientToServerPushState::kFinished: - return Failure{}; - } - Crash("Unreachable"); -} - -void CallState::ClientToServerHalfClose() { - GRPC_TRACE_LOG(call, INFO) - << "[call_state] ClientToServerHalfClose: " - << GRPC_DUMP_ARGS(this, client_to_server_push_state_); - switch (client_to_server_push_state_) { - case ClientToServerPushState::kIdle: - client_to_server_push_state_ = ClientToServerPushState::kPushedHalfClose; - client_to_server_push_waiter_.Wake(); - break; - case ClientToServerPushState::kPushedMessage: - client_to_server_push_state_ = - ClientToServerPushState::kPushedMessageAndHalfClosed; - break; - case ClientToServerPushState::kPushedHalfClose: - case ClientToServerPushState::kPushedMessageAndHalfClosed: - LOG(FATAL) << "ClientToServerHalfClose called twice"; - break; - case ClientToServerPushState::kFinished: - break; - } -} - -void CallState::BeginPullClientInitialMetadata() { - GRPC_TRACE_LOG(call, INFO) - << "[call_state] BeginPullClientInitialMetadata: " - << GRPC_DUMP_ARGS(this, client_to_server_pull_state_); - switch (client_to_server_pull_state_) { - case ClientToServerPullState::kBegin: - client_to_server_pull_state_ = - ClientToServerPullState::kProcessingClientInitialMetadata; - break; - case ClientToServerPullState::kProcessingClientInitialMetadata: - case ClientToServerPullState::kIdle: - case ClientToServerPullState::kReading: - case ClientToServerPullState::kProcessingClientToServerMessage: - LOG(FATAL) << "BeginPullClientInitialMetadata called twice"; - break; - case ClientToServerPullState::kTerminated: - break; - } -} - -void CallState::FinishPullClientInitialMetadata() { - GRPC_TRACE_LOG(call, INFO) - << "[call_state] FinishPullClientInitialMetadata: " - << GRPC_DUMP_ARGS(this, client_to_server_pull_state_); - switch (client_to_server_pull_state_) { - case ClientToServerPullState::kBegin: - LOG(FATAL) << "FinishPullClientInitialMetadata called before Begin"; - break; - case ClientToServerPullState::kProcessingClientInitialMetadata: - client_to_server_pull_state_ = ClientToServerPullState::kIdle; - client_to_server_pull_waiter_.Wake(); - break; - case ClientToServerPullState::kIdle: - case ClientToServerPullState::kReading: - case ClientToServerPullState::kProcessingClientToServerMessage: - LOG(FATAL) << "Out of order FinishPullClientInitialMetadata"; - break; - case ClientToServerPullState::kTerminated: - break; - } -} - -Poll> CallState::PollPullClientToServerMessageAvailable() { - GRPC_TRACE_LOG(call, INFO) - << "[call_state] PollPullClientToServerMessageAvailable: " - << GRPC_DUMP_ARGS(this, client_to_server_pull_state_, - client_to_server_push_state_); - switch (client_to_server_pull_state_) { - case ClientToServerPullState::kBegin: - case ClientToServerPullState::kProcessingClientInitialMetadata: - return client_to_server_pull_waiter_.pending(); - case ClientToServerPullState::kIdle: - client_to_server_pull_state_ = ClientToServerPullState::kReading; - ABSL_FALLTHROUGH_INTENDED; - case ClientToServerPullState::kReading: - break; - case ClientToServerPullState::kProcessingClientToServerMessage: - LOG(FATAL) << "PollPullClientToServerMessageAvailable called while " - "processing a message"; - break; - case ClientToServerPullState::kTerminated: - return Failure{}; - } - DCHECK_EQ(client_to_server_pull_state_, ClientToServerPullState::kReading); - switch (client_to_server_push_state_) { - case ClientToServerPushState::kIdle: - return client_to_server_push_waiter_.pending(); - case ClientToServerPushState::kPushedMessage: - case ClientToServerPushState::kPushedMessageAndHalfClosed: - client_to_server_pull_state_ = - ClientToServerPullState::kProcessingClientToServerMessage; - return true; - case ClientToServerPushState::kPushedHalfClose: - return false; - case ClientToServerPushState::kFinished: - client_to_server_pull_state_ = ClientToServerPullState::kTerminated; - return Failure{}; - } - Crash("Unreachable"); -} - -void CallState::FinishPullClientToServerMessage() { - GRPC_TRACE_LOG(call, INFO) - << "[call_state] FinishPullClientToServerMessage: " - << GRPC_DUMP_ARGS(this, client_to_server_pull_state_, - client_to_server_push_state_); - switch (client_to_server_pull_state_) { - case ClientToServerPullState::kBegin: - case ClientToServerPullState::kProcessingClientInitialMetadata: - LOG(FATAL) << "FinishPullClientToServerMessage called before Begin"; - break; - case ClientToServerPullState::kIdle: - LOG(FATAL) << "FinishPullClientToServerMessage called twice"; - break; - case ClientToServerPullState::kReading: - LOG(FATAL) << "FinishPullClientToServerMessage called before " - "PollPullClientToServerMessageAvailable"; - break; - case ClientToServerPullState::kProcessingClientToServerMessage: - client_to_server_pull_state_ = ClientToServerPullState::kIdle; - client_to_server_pull_waiter_.Wake(); - break; - case ClientToServerPullState::kTerminated: - break; - } - switch (client_to_server_push_state_) { - case ClientToServerPushState::kPushedMessage: - client_to_server_push_state_ = ClientToServerPushState::kIdle; - client_to_server_push_waiter_.Wake(); - break; - case ClientToServerPushState::kIdle: - case ClientToServerPushState::kPushedHalfClose: - LOG(FATAL) << "FinishPullClientToServerMessage called without a message"; - break; - case ClientToServerPushState::kPushedMessageAndHalfClosed: - client_to_server_push_state_ = ClientToServerPushState::kPushedHalfClose; - client_to_server_push_waiter_.Wake(); - break; - case ClientToServerPushState::kFinished: - break; - } -} - -StatusFlag CallState::PushServerInitialMetadata() { - GRPC_TRACE_LOG(call, INFO) - << "[call_state] PushServerInitialMetadata: " - << GRPC_DUMP_ARGS(this, server_to_client_push_state_, - server_trailing_metadata_state_); - if (server_trailing_metadata_state_ != - ServerTrailingMetadataState::kNotPushed) { - return Failure{}; - } - CHECK_EQ(server_to_client_push_state_, ServerToClientPushState::kStart); - server_to_client_push_state_ = - ServerToClientPushState::kPushedServerInitialMetadata; - server_to_client_push_waiter_.Wake(); - return Success{}; -} - -void CallState::BeginPushServerToClientMessage() { - GRPC_TRACE_LOG(call, INFO) - << "[call_state] BeginPushServerToClientMessage: " - << GRPC_DUMP_ARGS(this, server_to_client_push_state_); - switch (server_to_client_push_state_) { - case ServerToClientPushState::kStart: - LOG(FATAL) << "BeginPushServerToClientMessage called before " - "PushServerInitialMetadata"; - break; - case ServerToClientPushState::kPushedServerInitialMetadata: - server_to_client_push_state_ = - ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage; - break; - case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: - case ServerToClientPushState::kPushedMessage: - LOG(FATAL) << "BeginPushServerToClientMessage called twice concurrently"; - break; - case ServerToClientPushState::kTrailersOnly: - // Will fail in poll. - break; - case ServerToClientPushState::kIdle: - server_to_client_push_state_ = ServerToClientPushState::kPushedMessage; - server_to_client_push_waiter_.Wake(); - break; - case ServerToClientPushState::kFinished: - break; - } -} - -Poll CallState::PollPushServerToClientMessage() { - GRPC_TRACE_LOG(call, INFO) - << "[call_state] PollPushServerToClientMessage: " - << GRPC_DUMP_ARGS(this, server_to_client_push_state_); - switch (server_to_client_push_state_) { - case ServerToClientPushState::kStart: - case ServerToClientPushState::kPushedServerInitialMetadata: - LOG(FATAL) << "PollPushServerToClientMessage called before " - << "PushServerInitialMetadata"; - case ServerToClientPushState::kTrailersOnly: - return false; - case ServerToClientPushState::kPushedMessage: - case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: - return server_to_client_push_waiter_.pending(); - case ServerToClientPushState::kIdle: - return Success{}; - case ServerToClientPushState::kFinished: - return Failure{}; - } - Crash("Unreachable"); -} - -bool CallState::PushServerTrailingMetadata(bool cancel) { - GRPC_TRACE_LOG(call, INFO) - << "[call_state] PushServerTrailingMetadata: " - << GRPC_DUMP_ARGS(this, cancel, server_trailing_metadata_state_, - server_to_client_push_state_, - client_to_server_push_state_, - server_trailing_metadata_waiter_.DebugString()); - if (server_trailing_metadata_state_ != - ServerTrailingMetadataState::kNotPushed) { - return false; - } - server_trailing_metadata_state_ = - cancel ? ServerTrailingMetadataState::kPushedCancel - : ServerTrailingMetadataState::kPushed; - server_trailing_metadata_waiter_.Wake(); - switch (server_to_client_push_state_) { - case ServerToClientPushState::kStart: - server_to_client_push_state_ = ServerToClientPushState::kTrailersOnly; - server_to_client_push_waiter_.Wake(); - break; - case ServerToClientPushState::kPushedServerInitialMetadata: - case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: - case ServerToClientPushState::kPushedMessage: - if (cancel) { - server_to_client_push_state_ = ServerToClientPushState::kFinished; - server_to_client_push_waiter_.Wake(); - } - break; - case ServerToClientPushState::kIdle: - if (cancel) { - server_to_client_push_state_ = ServerToClientPushState::kFinished; - server_to_client_push_waiter_.Wake(); - } - break; - case ServerToClientPushState::kFinished: - case ServerToClientPushState::kTrailersOnly: - break; - } - switch (client_to_server_push_state_) { - case ClientToServerPushState::kIdle: - client_to_server_push_state_ = ClientToServerPushState::kFinished; - client_to_server_push_waiter_.Wake(); - break; - case ClientToServerPushState::kPushedMessage: - case ClientToServerPushState::kPushedMessageAndHalfClosed: - client_to_server_push_state_ = ClientToServerPushState::kFinished; - client_to_server_push_waiter_.Wake(); - break; - case ClientToServerPushState::kPushedHalfClose: - case ClientToServerPushState::kFinished: - break; - } - return true; -} - -Poll CallState::PollPullServerInitialMetadataAvailable() { - GRPC_TRACE_LOG(call, INFO) - << "[call_state] PollPullServerInitialMetadataAvailable: " - << GRPC_DUMP_ARGS(this, server_to_client_pull_state_, - server_to_client_push_state_); - bool reading; - switch (server_to_client_pull_state_) { - case ServerToClientPullState::kUnstarted: - case ServerToClientPullState::kUnstartedReading: - if (server_to_client_push_state_ == - ServerToClientPushState::kTrailersOnly) { - server_to_client_pull_state_ = ServerToClientPullState::kTerminated; - return false; - } - server_to_client_push_waiter_.pending(); - return server_to_client_pull_waiter_.pending(); - case ServerToClientPullState::kStartedReading: - reading = true; - break; - case ServerToClientPullState::kStarted: - reading = false; - break; - case ServerToClientPullState::kProcessingServerInitialMetadata: - case ServerToClientPullState::kProcessingServerInitialMetadataReading: - case ServerToClientPullState::kIdle: - case ServerToClientPullState::kReading: - case ServerToClientPullState::kProcessingServerToClientMessage: - case ServerToClientPullState::kProcessingServerTrailingMetadata: - LOG(FATAL) << "PollPullServerInitialMetadataAvailable called twice"; - case ServerToClientPullState::kTerminated: - return false; - } - DCHECK(server_to_client_pull_state_ == ServerToClientPullState::kStarted || - server_to_client_pull_state_ == - ServerToClientPullState::kStartedReading) - << server_to_client_pull_state_; - switch (server_to_client_push_state_) { - case ServerToClientPushState::kStart: - return server_to_client_push_waiter_.pending(); - case ServerToClientPushState::kPushedServerInitialMetadata: - case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: - server_to_client_pull_state_ = - reading - ? ServerToClientPullState::kProcessingServerInitialMetadataReading - : ServerToClientPullState::kProcessingServerInitialMetadata; - server_to_client_pull_waiter_.Wake(); - return true; - case ServerToClientPushState::kIdle: - case ServerToClientPushState::kPushedMessage: - LOG(FATAL) - << "PollPullServerInitialMetadataAvailable after metadata processed"; - case ServerToClientPushState::kFinished: - server_to_client_pull_state_ = ServerToClientPullState::kTerminated; - server_to_client_pull_waiter_.Wake(); - return false; - case ServerToClientPushState::kTrailersOnly: - return false; - } - Crash("Unreachable"); -} - -void CallState::FinishPullServerInitialMetadata() { - GRPC_TRACE_LOG(call, INFO) - << "[call_state] FinishPullServerInitialMetadata: " - << GRPC_DUMP_ARGS(this, server_to_client_pull_state_); - switch (server_to_client_pull_state_) { - case ServerToClientPullState::kUnstarted: - case ServerToClientPullState::kUnstartedReading: - LOG(FATAL) << "FinishPullServerInitialMetadata called before Start"; - case ServerToClientPullState::kStarted: - case ServerToClientPullState::kStartedReading: - CHECK_EQ(server_to_client_push_state_, - ServerToClientPushState::kTrailersOnly); - return; - case ServerToClientPullState::kProcessingServerInitialMetadata: - server_to_client_pull_state_ = ServerToClientPullState::kIdle; - server_to_client_pull_waiter_.Wake(); - break; - case ServerToClientPullState::kProcessingServerInitialMetadataReading: - server_to_client_pull_state_ = ServerToClientPullState::kReading; - server_to_client_pull_waiter_.Wake(); - break; - case ServerToClientPullState::kIdle: - case ServerToClientPullState::kReading: - case ServerToClientPullState::kProcessingServerToClientMessage: - case ServerToClientPullState::kProcessingServerTrailingMetadata: - LOG(FATAL) << "Out of order FinishPullServerInitialMetadata"; - case ServerToClientPullState::kTerminated: - return; - } - DCHECK(server_to_client_pull_state_ == ServerToClientPullState::kIdle || - server_to_client_pull_state_ == ServerToClientPullState::kReading) - << server_to_client_pull_state_; - switch (server_to_client_push_state_) { - case ServerToClientPushState::kStart: - LOG(FATAL) << "FinishPullServerInitialMetadata called before initial " - "metadata consumed"; - case ServerToClientPushState::kPushedServerInitialMetadata: - server_to_client_push_state_ = ServerToClientPushState::kIdle; - server_to_client_push_waiter_.Wake(); - break; - case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: - server_to_client_push_state_ = ServerToClientPushState::kPushedMessage; - server_to_client_push_waiter_.Wake(); - break; - case ServerToClientPushState::kIdle: - case ServerToClientPushState::kPushedMessage: - case ServerToClientPushState::kTrailersOnly: - case ServerToClientPushState::kFinished: - LOG(FATAL) << "FinishPullServerInitialMetadata called twice"; - } -} - -Poll> CallState::PollPullServerToClientMessageAvailable() { - GRPC_TRACE_LOG(call, INFO) - << "[call_state] PollPullServerToClientMessageAvailable: " - << GRPC_DUMP_ARGS(this, server_to_client_pull_state_, - server_to_client_push_state_, - server_trailing_metadata_state_); - switch (server_to_client_pull_state_) { - case ServerToClientPullState::kUnstarted: - server_to_client_pull_state_ = ServerToClientPullState::kUnstartedReading; - return server_to_client_pull_waiter_.pending(); - case ServerToClientPullState::kProcessingServerInitialMetadata: - server_to_client_pull_state_ = - ServerToClientPullState::kProcessingServerInitialMetadataReading; - return server_to_client_pull_waiter_.pending(); - case ServerToClientPullState::kUnstartedReading: - case ServerToClientPullState::kProcessingServerInitialMetadataReading: - return server_to_client_pull_waiter_.pending(); - case ServerToClientPullState::kStarted: - server_to_client_pull_state_ = ServerToClientPullState::kStartedReading; - ABSL_FALLTHROUGH_INTENDED; - case ServerToClientPullState::kStartedReading: - if (server_to_client_push_state_ == - ServerToClientPushState::kTrailersOnly) { - return false; - } - return server_to_client_pull_waiter_.pending(); - case ServerToClientPullState::kIdle: - server_to_client_pull_state_ = ServerToClientPullState::kReading; - ABSL_FALLTHROUGH_INTENDED; - case ServerToClientPullState::kReading: - break; - case ServerToClientPullState::kProcessingServerToClientMessage: - LOG(FATAL) << "PollPullServerToClientMessageAvailable called while " - "processing a message"; - case ServerToClientPullState::kProcessingServerTrailingMetadata: - LOG(FATAL) << "PollPullServerToClientMessageAvailable called while " - "processing trailing metadata"; - case ServerToClientPullState::kTerminated: - return Failure{}; - } - DCHECK_EQ(server_to_client_pull_state_, ServerToClientPullState::kReading); - switch (server_to_client_push_state_) { - case ServerToClientPushState::kStart: - case ServerToClientPushState::kPushedServerInitialMetadata: - case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: - return server_to_client_push_waiter_.pending(); - case ServerToClientPushState::kIdle: - if (server_trailing_metadata_state_ != - ServerTrailingMetadataState::kNotPushed) { - return false; - } - server_trailing_metadata_waiter_.pending(); - return server_to_client_push_waiter_.pending(); - case ServerToClientPushState::kTrailersOnly: - DCHECK_NE(server_trailing_metadata_state_, - ServerTrailingMetadataState::kNotPushed); - return false; - case ServerToClientPushState::kPushedMessage: - server_to_client_pull_state_ = - ServerToClientPullState::kProcessingServerToClientMessage; - server_to_client_pull_waiter_.Wake(); - return true; - case ServerToClientPushState::kFinished: - server_to_client_pull_state_ = ServerToClientPullState::kTerminated; - server_to_client_pull_waiter_.Wake(); - return Failure{}; - } - Crash("Unreachable"); -} - -void CallState::FinishPullServerToClientMessage() { - GRPC_TRACE_LOG(call, INFO) - << "[call_state] FinishPullServerToClientMessage: " - << GRPC_DUMP_ARGS(this, server_to_client_pull_state_, - server_to_client_push_state_); - switch (server_to_client_pull_state_) { - case ServerToClientPullState::kUnstarted: - case ServerToClientPullState::kUnstartedReading: - case ServerToClientPullState::kStarted: - case ServerToClientPullState::kStartedReading: - case ServerToClientPullState::kProcessingServerInitialMetadata: - case ServerToClientPullState::kProcessingServerInitialMetadataReading: - LOG(FATAL) - << "FinishPullServerToClientMessage called before metadata available"; - case ServerToClientPullState::kIdle: - LOG(FATAL) << "FinishPullServerToClientMessage called twice"; - case ServerToClientPullState::kReading: - LOG(FATAL) << "FinishPullServerToClientMessage called before " - << "PollPullServerToClientMessageAvailable"; - case ServerToClientPullState::kProcessingServerToClientMessage: - server_to_client_pull_state_ = ServerToClientPullState::kIdle; - server_to_client_pull_waiter_.Wake(); - break; - case ServerToClientPullState::kProcessingServerTrailingMetadata: - LOG(FATAL) << "FinishPullServerToClientMessage called while processing " - "trailing metadata"; - case ServerToClientPullState::kTerminated: - break; - } - switch (server_to_client_push_state_) { - case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: - case ServerToClientPushState::kPushedServerInitialMetadata: - case ServerToClientPushState::kStart: - LOG(FATAL) << "FinishPullServerToClientMessage called before initial " - "metadata consumed"; - case ServerToClientPushState::kTrailersOnly: - LOG(FATAL) << "FinishPullServerToClientMessage called after " - "PushServerTrailingMetadata"; - case ServerToClientPushState::kPushedMessage: - server_to_client_push_state_ = ServerToClientPushState::kIdle; - server_to_client_push_waiter_.Wake(); - break; - case ServerToClientPushState::kIdle: - LOG(FATAL) << "FinishPullServerToClientMessage called without a message"; - case ServerToClientPushState::kFinished: - break; - } -} - -Poll CallState::PollServerTrailingMetadataAvailable() { - GRPC_TRACE_LOG(call, INFO) - << "[call_state] PollServerTrailingMetadataAvailable: " - << GRPC_DUMP_ARGS(this, server_to_client_pull_state_, - server_to_client_push_state_, - server_trailing_metadata_state_, - server_trailing_metadata_waiter_.DebugString()); - switch (server_to_client_pull_state_) { - case ServerToClientPullState::kProcessingServerInitialMetadata: - case ServerToClientPullState::kProcessingServerToClientMessage: - case ServerToClientPullState::kProcessingServerInitialMetadataReading: - case ServerToClientPullState::kUnstartedReading: - return server_to_client_pull_waiter_.pending(); - case ServerToClientPullState::kStartedReading: - case ServerToClientPullState::kReading: - switch (server_to_client_push_state_) { - case ServerToClientPushState::kTrailersOnly: - case ServerToClientPushState::kIdle: - case ServerToClientPushState::kStart: - case ServerToClientPushState::kFinished: - if (server_trailing_metadata_state_ != - ServerTrailingMetadataState::kNotPushed) { - server_to_client_pull_state_ = - ServerToClientPullState::kProcessingServerTrailingMetadata; - server_to_client_pull_waiter_.Wake(); - return Empty{}; - } - ABSL_FALLTHROUGH_INTENDED; - case ServerToClientPushState::kPushedServerInitialMetadata: - case ServerToClientPushState:: - kPushedServerInitialMetadataAndPushedMessage: - case ServerToClientPushState::kPushedMessage: - server_to_client_push_waiter_.pending(); - return server_to_client_pull_waiter_.pending(); - } - break; - case ServerToClientPullState::kStarted: - case ServerToClientPullState::kUnstarted: - case ServerToClientPullState::kIdle: - if (server_trailing_metadata_state_ != - ServerTrailingMetadataState::kNotPushed) { - server_to_client_pull_state_ = - ServerToClientPullState::kProcessingServerTrailingMetadata; - server_to_client_pull_waiter_.Wake(); - return Empty{}; - } - return server_trailing_metadata_waiter_.pending(); - case ServerToClientPullState::kProcessingServerTrailingMetadata: - LOG(FATAL) << "PollServerTrailingMetadataAvailable called twice"; - case ServerToClientPullState::kTerminated: - return Empty{}; - } - Crash("Unreachable"); -} - -void CallState::FinishPullServerTrailingMetadata() { - GRPC_TRACE_LOG(call, INFO) - << "[call_state] FinishPullServerTrailingMetadata: " - << GRPC_DUMP_ARGS(this, server_trailing_metadata_state_, - server_trailing_metadata_waiter_.DebugString()); - switch (server_trailing_metadata_state_) { - case ServerTrailingMetadataState::kNotPushed: - LOG(FATAL) << "FinishPullServerTrailingMetadata called before " - "PollServerTrailingMetadataAvailable"; - case ServerTrailingMetadataState::kPushed: - server_trailing_metadata_state_ = ServerTrailingMetadataState::kPulled; - server_trailing_metadata_waiter_.Wake(); - break; - case ServerTrailingMetadataState::kPushedCancel: - server_trailing_metadata_state_ = - ServerTrailingMetadataState::kPulledCancel; - server_trailing_metadata_waiter_.Wake(); - break; - case ServerTrailingMetadataState::kPulled: - case ServerTrailingMetadataState::kPulledCancel: - LOG(FATAL) << "FinishPullServerTrailingMetadata called twice"; - } -} - -Poll CallState::PollWasCancelled() { - GRPC_TRACE_LOG(call, INFO) - << "[call_state] PollWasCancelled: " - << GRPC_DUMP_ARGS(this, server_trailing_metadata_state_); - switch (server_trailing_metadata_state_) { - case ServerTrailingMetadataState::kNotPushed: - case ServerTrailingMetadataState::kPushed: - case ServerTrailingMetadataState::kPushedCancel: { - return server_trailing_metadata_waiter_.pending(); - } - case ServerTrailingMetadataState::kPulled: - return false; - case ServerTrailingMetadataState::kPulledCancel: - return true; - } - Crash("Unreachable"); -} - -std::string CallState::DebugString() const { - return absl::StrCat( - "client_to_server_pull_state:", client_to_server_pull_state_, - " client_to_server_push_state:", client_to_server_push_state_, - " server_to_client_pull_state:", server_to_client_pull_state_, - " server_to_client_message_push_state:", server_to_client_push_state_, - " server_trailing_metadata_state:", server_trailing_metadata_state_, - client_to_server_push_waiter_.DebugString(), - " server_to_client_push_waiter:", - server_to_client_push_waiter_.DebugString(), - " client_to_server_pull_waiter:", - client_to_server_pull_waiter_.DebugString(), - " server_to_client_pull_waiter:", - server_to_client_pull_waiter_.DebugString(), - " server_trailing_metadata_waiter:", - server_trailing_metadata_waiter_.DebugString()); -} - -static_assert(sizeof(CallState) <= 16, "CallState too large"); - -} // namespace filters_detail } // namespace grpc_core diff --git a/src/core/lib/transport/call_filters.h b/src/core/lib/transport/call_filters.h index 620c45d3c24..aefe130c58f 100644 --- a/src/core/lib/transport/call_filters.h +++ b/src/core/lib/transport/call_filters.h @@ -35,6 +35,7 @@ #include "src/core/lib/promise/status_flag.h" #include "src/core/lib/promise/try_seq.h" #include "src/core/lib/transport/call_final_info.h" +#include "src/core/lib/transport/call_state.h" #include "src/core/lib/transport/message.h" #include "src/core/lib/transport/metadata.h" @@ -1099,244 +1100,6 @@ class OperationExecutor { const Operator* end_ops_; }; -class CallState { - public: - CallState(); - // Start the call: allows pulls to proceed - void Start(); - // PUSH: client -> server - void BeginPushClientToServerMessage(); - Poll PollPushClientToServerMessage(); - void ClientToServerHalfClose(); - // PULL: client -> server - void BeginPullClientInitialMetadata(); - void FinishPullClientInitialMetadata(); - Poll> PollPullClientToServerMessageAvailable(); - void FinishPullClientToServerMessage(); - // PUSH: server -> client - StatusFlag PushServerInitialMetadata(); - void BeginPushServerToClientMessage(); - Poll PollPushServerToClientMessage(); - bool PushServerTrailingMetadata(bool cancel); - // PULL: server -> client - Poll PollPullServerInitialMetadataAvailable(); - void FinishPullServerInitialMetadata(); - Poll> PollPullServerToClientMessageAvailable(); - void FinishPullServerToClientMessage(); - Poll PollServerTrailingMetadataAvailable(); - void FinishPullServerTrailingMetadata(); - Poll PollWasCancelled(); - // Debug - std::string DebugString() const; - - friend std::ostream& operator<<(std::ostream& out, - const CallState& call_state) { - return out << call_state.DebugString(); - } - - private: - enum class ClientToServerPullState : uint16_t { - // Ready to read: client initial metadata is there, but not yet processed - kBegin, - // Processing client initial metadata - kProcessingClientInitialMetadata, - // Main call loop: not reading - kIdle, - // Main call loop: reading but no message available - kReading, - // Main call loop: processing one message - kProcessingClientToServerMessage, - // Processing complete - kTerminated, - }; - static const char* ClientToServerPullStateString( - ClientToServerPullState state) { - switch (state) { - case ClientToServerPullState::kBegin: - return "Begin"; - case ClientToServerPullState::kProcessingClientInitialMetadata: - return "ProcessingClientInitialMetadata"; - case ClientToServerPullState::kIdle: - return "Idle"; - case ClientToServerPullState::kReading: - return "Reading"; - case ClientToServerPullState::kProcessingClientToServerMessage: - return "ProcessingClientToServerMessage"; - case ClientToServerPullState::kTerminated: - return "Terminated"; - } - } - template - friend void AbslStringify(Sink& out, ClientToServerPullState state) { - out.Append(ClientToServerPullStateString(state)); - } - friend std::ostream& operator<<(std::ostream& out, - ClientToServerPullState state) { - return out << ClientToServerPullStateString(state); - } - enum class ClientToServerPushState : uint16_t { - kIdle, - kPushedMessage, - kPushedHalfClose, - kPushedMessageAndHalfClosed, - kFinished, - }; - static const char* ClientToServerPushStateString( - ClientToServerPushState state) { - switch (state) { - case ClientToServerPushState::kIdle: - return "Idle"; - case ClientToServerPushState::kPushedMessage: - return "PushedMessage"; - case ClientToServerPushState::kPushedHalfClose: - return "PushedHalfClose"; - case ClientToServerPushState::kPushedMessageAndHalfClosed: - return "PushedMessageAndHalfClosed"; - case ClientToServerPushState::kFinished: - return "Finished"; - } - } - template - friend void AbslStringify(Sink& out, ClientToServerPushState state) { - out.Append(ClientToServerPushStateString(state)); - } - friend std::ostream& operator<<(std::ostream& out, - ClientToServerPushState state) { - return out << ClientToServerPushStateString(state); - } - enum class ServerToClientPullState : uint16_t { - // Not yet started: cannot read - kUnstarted, - kUnstartedReading, - kStarted, - kStartedReading, - // Processing server initial metadata - kProcessingServerInitialMetadata, - kProcessingServerInitialMetadataReading, - // Main call loop: not reading - kIdle, - // Main call loop: reading but no message available - kReading, - // Main call loop: processing one message - kProcessingServerToClientMessage, - // Processing server trailing metadata - kProcessingServerTrailingMetadata, - kTerminated, - }; - static const char* ServerToClientPullStateString( - ServerToClientPullState state) { - switch (state) { - case ServerToClientPullState::kUnstarted: - return "Unstarted"; - case ServerToClientPullState::kUnstartedReading: - return "UnstartedReading"; - case ServerToClientPullState::kStarted: - return "Started"; - case ServerToClientPullState::kStartedReading: - return "StartedReading"; - case ServerToClientPullState::kProcessingServerInitialMetadata: - return "ProcessingServerInitialMetadata"; - case ServerToClientPullState::kProcessingServerInitialMetadataReading: - return "ProcessingServerInitialMetadataReading"; - case ServerToClientPullState::kIdle: - return "Idle"; - case ServerToClientPullState::kReading: - return "Reading"; - case ServerToClientPullState::kProcessingServerToClientMessage: - return "ProcessingServerToClientMessage"; - case ServerToClientPullState::kProcessingServerTrailingMetadata: - return "ProcessingServerTrailingMetadata"; - case ServerToClientPullState::kTerminated: - return "Terminated"; - } - } - template - friend void AbslStringify(Sink& out, ServerToClientPullState state) { - out.Append(ServerToClientPullStateString(state)); - } - friend std::ostream& operator<<(std::ostream& out, - ServerToClientPullState state) { - return out << ServerToClientPullStateString(state); - } - enum class ServerToClientPushState : uint16_t { - kStart, - kPushedServerInitialMetadata, - kPushedServerInitialMetadataAndPushedMessage, - kTrailersOnly, - kIdle, - kPushedMessage, - kFinished, - }; - static const char* ServerToClientPushStateString( - ServerToClientPushState state) { - switch (state) { - case ServerToClientPushState::kStart: - return "Start"; - case ServerToClientPushState::kPushedServerInitialMetadata: - return "PushedServerInitialMetadata"; - case ServerToClientPushState:: - kPushedServerInitialMetadataAndPushedMessage: - return "PushedServerInitialMetadataAndPushedMessage"; - case ServerToClientPushState::kTrailersOnly: - return "TrailersOnly"; - case ServerToClientPushState::kIdle: - return "Idle"; - case ServerToClientPushState::kPushedMessage: - return "PushedMessage"; - case ServerToClientPushState::kFinished: - return "Finished"; - } - } - template - friend void AbslStringify(Sink& out, ServerToClientPushState state) { - out.Append(ServerToClientPushStateString(state)); - } - friend std::ostream& operator<<(std::ostream& out, - ServerToClientPushState state) { - return out << ServerToClientPushStateString(state); - } - enum class ServerTrailingMetadataState : uint16_t { - kNotPushed, - kPushed, - kPushedCancel, - kPulled, - kPulledCancel, - }; - static const char* ServerTrailingMetadataStateString( - ServerTrailingMetadataState state) { - switch (state) { - case ServerTrailingMetadataState::kNotPushed: - return "NotPushed"; - case ServerTrailingMetadataState::kPushed: - return "Pushed"; - case ServerTrailingMetadataState::kPushedCancel: - return "PushedCancel"; - case ServerTrailingMetadataState::kPulled: - return "Pulled"; - case ServerTrailingMetadataState::kPulledCancel: - return "PulledCancel"; - } - } - template - friend void AbslStringify(Sink& out, ServerTrailingMetadataState state) { - out.Append(ServerTrailingMetadataStateString(state)); - } - friend std::ostream& operator<<(std::ostream& out, - ServerTrailingMetadataState state) { - return out << ServerTrailingMetadataStateString(state); - } - ClientToServerPullState client_to_server_pull_state_ : 3; - ClientToServerPushState client_to_server_push_state_ : 3; - ServerToClientPullState server_to_client_pull_state_ : 4; - ServerToClientPushState server_to_client_push_state_ : 3; - ServerTrailingMetadataState server_trailing_metadata_state_ : 3; - IntraActivityWaiter client_to_server_pull_waiter_; - IntraActivityWaiter server_to_client_pull_waiter_; - IntraActivityWaiter client_to_server_push_waiter_; - IntraActivityWaiter server_to_client_push_waiter_; - IntraActivityWaiter server_trailing_metadata_waiter_; -}; - template class ServerTrailingMetadataInterceptor { public: @@ -1513,8 +1276,7 @@ class CallFilters { } private: - template + template Poll> FinishStep( Poll> p) { auto* r = p.value_if_ready(); @@ -1530,7 +1292,7 @@ class CallFilters { template (filters_detail::StackData::*layout), - void (filters_detail::CallState::*on_done)()> + void (CallState::*on_done)()> auto RunExecutor() { DCHECK_NE((this->*input_location).get(), nullptr); filters_detail::OperationExecutor executor; @@ -1549,11 +1311,10 @@ class CallFilters { // Returns a promise that resolves to ValueOrFailure GRPC_MUST_USE_RESULT auto PullClientInitialMetadata() { call_state_.BeginPullClientInitialMetadata(); - return RunExecutor< - ClientMetadataHandle, ClientMetadataHandle, - &CallFilters::push_client_initial_metadata_, - &filters_detail::StackData::client_initial_metadata, - &filters_detail::CallState::FinishPullClientInitialMetadata>(); + return RunExecutor(); } // Server: Push server initial metadata // Returns a promise that resolves to a StatusFlag indicating success @@ -1578,8 +1339,7 @@ class CallFilters { ServerMetadataHandle, &CallFilters::push_server_initial_metadata_, &filters_detail::StackData::server_initial_metadata, - &filters_detail::CallState:: - FinishPullServerInitialMetadata>(), + &CallState::FinishPullServerInitialMetadata>(), [](ValueOrFailure> r) { if (r.ok()) return std::move(*r); return absl::optional{}; @@ -1616,8 +1376,7 @@ class CallFilters { absl::optional, MessageHandle, &CallFilters::push_client_to_server_message_, &filters_detail::StackData::client_to_server_messages, - &filters_detail::CallState:: - FinishPullClientToServerMessage>(); + &CallState::FinishPullClientToServerMessage>(); }, []() -> ValueOrFailure> { return absl::optional(); @@ -1646,8 +1405,7 @@ class CallFilters { absl::optional, MessageHandle, &CallFilters::push_server_to_client_message_, &filters_detail::StackData::server_to_client_messages, - &filters_detail::CallState:: - FinishPullServerToClientMessage>(); + &CallState::FinishPullServerToClientMessage>(); }, []() -> ValueOrFailure> { return absl::optional(); @@ -1691,7 +1449,7 @@ class CallFilters { RefCountedPtr stack_; - filters_detail::CallState call_state_; + CallState call_state_; void* call_data_; ClientMetadataHandle push_client_initial_metadata_; diff --git a/src/core/lib/transport/call_state.cc b/src/core/lib/transport/call_state.cc new file mode 100644 index 00000000000..c52abb00b2e --- /dev/null +++ b/src/core/lib/transport/call_state.cc @@ -0,0 +1,39 @@ +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/core/lib/transport/call_state.h" + +namespace grpc_core { + +std::string CallState::DebugString() const { + return absl::StrCat( + "client_to_server_pull_state:", client_to_server_pull_state_, + " client_to_server_push_state:", client_to_server_push_state_, + " server_to_client_pull_state:", server_to_client_pull_state_, + " server_to_client_message_push_state:", server_to_client_push_state_, + " server_trailing_metadata_state:", server_trailing_metadata_state_, + client_to_server_push_waiter_.DebugString(), + " server_to_client_push_waiter:", + server_to_client_push_waiter_.DebugString(), + " client_to_server_pull_waiter:", + client_to_server_pull_waiter_.DebugString(), + " server_to_client_pull_waiter:", + server_to_client_pull_waiter_.DebugString(), + " server_trailing_metadata_waiter:", + server_trailing_metadata_waiter_.DebugString()); +} + +static_assert(sizeof(CallState) <= 16, "CallState too large"); + +} // namespace grpc_core diff --git a/src/core/lib/transport/call_state.h b/src/core/lib/transport/call_state.h new file mode 100644 index 00000000000..3e9acd99c16 --- /dev/null +++ b/src/core/lib/transport/call_state.h @@ -0,0 +1,957 @@ +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_SRC_CORE_LIB_TRANSPORT_CALL_STATE_H +#define GRPC_SRC_CORE_LIB_TRANSPORT_CALL_STATE_H + +#include "absl/types/optional.h" + +#include + +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/gprpp/crash.h" +#include "src/core/lib/promise/activity.h" +#include "src/core/lib/promise/poll.h" +#include "src/core/lib/promise/status_flag.h" + +namespace grpc_core { + +class CallState { + public: + CallState(); + // Start the call: allows pulls to proceed + void Start(); + // PUSH: client -> server + void BeginPushClientToServerMessage(); + Poll PollPushClientToServerMessage(); + void ClientToServerHalfClose(); + // PULL: client -> server + void BeginPullClientInitialMetadata(); + void FinishPullClientInitialMetadata(); + Poll> PollPullClientToServerMessageAvailable(); + void FinishPullClientToServerMessage(); + // PUSH: server -> client + StatusFlag PushServerInitialMetadata(); + void BeginPushServerToClientMessage(); + Poll PollPushServerToClientMessage(); + bool PushServerTrailingMetadata(bool cancel); + // PULL: server -> client + Poll PollPullServerInitialMetadataAvailable(); + void FinishPullServerInitialMetadata(); + Poll> PollPullServerToClientMessageAvailable(); + void FinishPullServerToClientMessage(); + Poll PollServerTrailingMetadataAvailable(); + void FinishPullServerTrailingMetadata(); + Poll PollWasCancelled(); + // Debug + std::string DebugString() const; + + friend std::ostream& operator<<(std::ostream& out, + const CallState& call_state) { + return out << call_state.DebugString(); + } + + private: + enum class ClientToServerPullState : uint16_t { + // Ready to read: client initial metadata is there, but not yet processed + kBegin, + // Processing client initial metadata + kProcessingClientInitialMetadata, + // Main call loop: not reading + kIdle, + // Main call loop: reading but no message available + kReading, + // Main call loop: processing one message + kProcessingClientToServerMessage, + // Processing complete + kTerminated, + }; + static const char* ClientToServerPullStateString( + ClientToServerPullState state) { + switch (state) { + case ClientToServerPullState::kBegin: + return "Begin"; + case ClientToServerPullState::kProcessingClientInitialMetadata: + return "ProcessingClientInitialMetadata"; + case ClientToServerPullState::kIdle: + return "Idle"; + case ClientToServerPullState::kReading: + return "Reading"; + case ClientToServerPullState::kProcessingClientToServerMessage: + return "ProcessingClientToServerMessage"; + case ClientToServerPullState::kTerminated: + return "Terminated"; + } + } + template + friend void AbslStringify(Sink& out, ClientToServerPullState state) { + out.Append(ClientToServerPullStateString(state)); + } + friend std::ostream& operator<<(std::ostream& out, + ClientToServerPullState state) { + return out << ClientToServerPullStateString(state); + } + enum class ClientToServerPushState : uint16_t { + kIdle, + kPushedMessage, + kPushedHalfClose, + kPushedMessageAndHalfClosed, + kFinished, + }; + static const char* ClientToServerPushStateString( + ClientToServerPushState state) { + switch (state) { + case ClientToServerPushState::kIdle: + return "Idle"; + case ClientToServerPushState::kPushedMessage: + return "PushedMessage"; + case ClientToServerPushState::kPushedHalfClose: + return "PushedHalfClose"; + case ClientToServerPushState::kPushedMessageAndHalfClosed: + return "PushedMessageAndHalfClosed"; + case ClientToServerPushState::kFinished: + return "Finished"; + } + } + template + friend void AbslStringify(Sink& out, ClientToServerPushState state) { + out.Append(ClientToServerPushStateString(state)); + } + friend std::ostream& operator<<(std::ostream& out, + ClientToServerPushState state) { + return out << ClientToServerPushStateString(state); + } + enum class ServerToClientPullState : uint16_t { + // Not yet started: cannot read + kUnstarted, + kUnstartedReading, + kStarted, + kStartedReading, + // Processing server initial metadata + kProcessingServerInitialMetadata, + kProcessingServerInitialMetadataReading, + // Main call loop: not reading + kIdle, + // Main call loop: reading but no message available + kReading, + // Main call loop: processing one message + kProcessingServerToClientMessage, + // Processing server trailing metadata + kProcessingServerTrailingMetadata, + kTerminated, + }; + static const char* ServerToClientPullStateString( + ServerToClientPullState state) { + switch (state) { + case ServerToClientPullState::kUnstarted: + return "Unstarted"; + case ServerToClientPullState::kUnstartedReading: + return "UnstartedReading"; + case ServerToClientPullState::kStarted: + return "Started"; + case ServerToClientPullState::kStartedReading: + return "StartedReading"; + case ServerToClientPullState::kProcessingServerInitialMetadata: + return "ProcessingServerInitialMetadata"; + case ServerToClientPullState::kProcessingServerInitialMetadataReading: + return "ProcessingServerInitialMetadataReading"; + case ServerToClientPullState::kIdle: + return "Idle"; + case ServerToClientPullState::kReading: + return "Reading"; + case ServerToClientPullState::kProcessingServerToClientMessage: + return "ProcessingServerToClientMessage"; + case ServerToClientPullState::kProcessingServerTrailingMetadata: + return "ProcessingServerTrailingMetadata"; + case ServerToClientPullState::kTerminated: + return "Terminated"; + } + } + template + friend void AbslStringify(Sink& out, ServerToClientPullState state) { + out.Append(ServerToClientPullStateString(state)); + } + friend std::ostream& operator<<(std::ostream& out, + ServerToClientPullState state) { + return out << ServerToClientPullStateString(state); + } + enum class ServerToClientPushState : uint16_t { + kStart, + kPushedServerInitialMetadata, + kPushedServerInitialMetadataAndPushedMessage, + kTrailersOnly, + kIdle, + kPushedMessage, + kFinished, + }; + static const char* ServerToClientPushStateString( + ServerToClientPushState state) { + switch (state) { + case ServerToClientPushState::kStart: + return "Start"; + case ServerToClientPushState::kPushedServerInitialMetadata: + return "PushedServerInitialMetadata"; + case ServerToClientPushState:: + kPushedServerInitialMetadataAndPushedMessage: + return "PushedServerInitialMetadataAndPushedMessage"; + case ServerToClientPushState::kTrailersOnly: + return "TrailersOnly"; + case ServerToClientPushState::kIdle: + return "Idle"; + case ServerToClientPushState::kPushedMessage: + return "PushedMessage"; + case ServerToClientPushState::kFinished: + return "Finished"; + } + } + template + friend void AbslStringify(Sink& out, ServerToClientPushState state) { + out.Append(ServerToClientPushStateString(state)); + } + friend std::ostream& operator<<(std::ostream& out, + ServerToClientPushState state) { + return out << ServerToClientPushStateString(state); + } + enum class ServerTrailingMetadataState : uint16_t { + kNotPushed, + kPushed, + kPushedCancel, + kPulled, + kPulledCancel, + }; + static const char* ServerTrailingMetadataStateString( + ServerTrailingMetadataState state) { + switch (state) { + case ServerTrailingMetadataState::kNotPushed: + return "NotPushed"; + case ServerTrailingMetadataState::kPushed: + return "Pushed"; + case ServerTrailingMetadataState::kPushedCancel: + return "PushedCancel"; + case ServerTrailingMetadataState::kPulled: + return "Pulled"; + case ServerTrailingMetadataState::kPulledCancel: + return "PulledCancel"; + } + } + template + friend void AbslStringify(Sink& out, ServerTrailingMetadataState state) { + out.Append(ServerTrailingMetadataStateString(state)); + } + friend std::ostream& operator<<(std::ostream& out, + ServerTrailingMetadataState state) { + return out << ServerTrailingMetadataStateString(state); + } + ClientToServerPullState client_to_server_pull_state_ : 3; + ClientToServerPushState client_to_server_push_state_ : 3; + ServerToClientPullState server_to_client_pull_state_ : 4; + ServerToClientPushState server_to_client_push_state_ : 3; + ServerTrailingMetadataState server_trailing_metadata_state_ : 3; + IntraActivityWaiter client_to_server_pull_waiter_; + IntraActivityWaiter server_to_client_pull_waiter_; + IntraActivityWaiter client_to_server_push_waiter_; + IntraActivityWaiter server_to_client_push_waiter_; + IntraActivityWaiter server_trailing_metadata_waiter_; +}; + +GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline CallState::CallState() + : client_to_server_pull_state_(ClientToServerPullState::kBegin), + client_to_server_push_state_(ClientToServerPushState::kIdle), + server_to_client_pull_state_(ServerToClientPullState::kUnstarted), + server_to_client_push_state_(ServerToClientPushState::kStart), + server_trailing_metadata_state_(ServerTrailingMetadataState::kNotPushed) { +} + +GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void CallState::Start() { + GRPC_TRACE_LOG(call_state, INFO) + << "[call_state] Start: " + << GRPC_DUMP_ARGS(this, server_to_client_pull_state_); + switch (server_to_client_pull_state_) { + case ServerToClientPullState::kUnstarted: + server_to_client_pull_state_ = ServerToClientPullState::kStarted; + server_to_client_pull_waiter_.Wake(); + break; + case ServerToClientPullState::kUnstartedReading: + server_to_client_pull_state_ = ServerToClientPullState::kStartedReading; + server_to_client_pull_waiter_.Wake(); + break; + case ServerToClientPullState::kStarted: + case ServerToClientPullState::kStartedReading: + case ServerToClientPullState::kProcessingServerInitialMetadata: + case ServerToClientPullState::kProcessingServerInitialMetadataReading: + case ServerToClientPullState::kIdle: + case ServerToClientPullState::kReading: + case ServerToClientPullState::kProcessingServerToClientMessage: + LOG(FATAL) << "Start called twice"; + case ServerToClientPullState::kProcessingServerTrailingMetadata: + case ServerToClientPullState::kTerminated: + break; + } +} + +GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void +CallState::BeginPushClientToServerMessage() { + GRPC_TRACE_LOG(call_state, INFO) + << "[call_state] BeginPushClientToServerMessage: " + << GRPC_DUMP_ARGS(this, client_to_server_push_state_); + switch (client_to_server_push_state_) { + case ClientToServerPushState::kIdle: + client_to_server_push_state_ = ClientToServerPushState::kPushedMessage; + client_to_server_push_waiter_.Wake(); + break; + case ClientToServerPushState::kPushedMessage: + case ClientToServerPushState::kPushedMessageAndHalfClosed: + LOG(FATAL) << "PushClientToServerMessage called twice concurrently"; + break; + case ClientToServerPushState::kPushedHalfClose: + LOG(FATAL) << "PushClientToServerMessage called after half-close"; + break; + case ClientToServerPushState::kFinished: + break; + } +} + +GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll +CallState::PollPushClientToServerMessage() { + GRPC_TRACE_LOG(call_state, INFO) + << "[call_state] PollPushClientToServerMessage: " + << GRPC_DUMP_ARGS(this, client_to_server_push_state_); + switch (client_to_server_push_state_) { + case ClientToServerPushState::kIdle: + case ClientToServerPushState::kPushedHalfClose: + return Success{}; + case ClientToServerPushState::kPushedMessage: + case ClientToServerPushState::kPushedMessageAndHalfClosed: + return client_to_server_push_waiter_.pending(); + case ClientToServerPushState::kFinished: + return Failure{}; + } + Crash("Unreachable"); +} + +GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void +CallState::ClientToServerHalfClose() { + GRPC_TRACE_LOG(call_state, INFO) + << "[call_state] ClientToServerHalfClose: " + << GRPC_DUMP_ARGS(this, client_to_server_push_state_); + switch (client_to_server_push_state_) { + case ClientToServerPushState::kIdle: + client_to_server_push_state_ = ClientToServerPushState::kPushedHalfClose; + client_to_server_push_waiter_.Wake(); + break; + case ClientToServerPushState::kPushedMessage: + client_to_server_push_state_ = + ClientToServerPushState::kPushedMessageAndHalfClosed; + break; + case ClientToServerPushState::kPushedHalfClose: + case ClientToServerPushState::kPushedMessageAndHalfClosed: + LOG(FATAL) << "ClientToServerHalfClose called twice"; + break; + case ClientToServerPushState::kFinished: + break; + } +} + +GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void +CallState::BeginPullClientInitialMetadata() { + GRPC_TRACE_LOG(call_state, INFO) + << "[call_state] BeginPullClientInitialMetadata: " + << GRPC_DUMP_ARGS(this, client_to_server_pull_state_); + switch (client_to_server_pull_state_) { + case ClientToServerPullState::kBegin: + client_to_server_pull_state_ = + ClientToServerPullState::kProcessingClientInitialMetadata; + break; + case ClientToServerPullState::kProcessingClientInitialMetadata: + case ClientToServerPullState::kIdle: + case ClientToServerPullState::kReading: + case ClientToServerPullState::kProcessingClientToServerMessage: + LOG(FATAL) << "BeginPullClientInitialMetadata called twice"; + break; + case ClientToServerPullState::kTerminated: + break; + } +} + +GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void +CallState::FinishPullClientInitialMetadata() { + GRPC_TRACE_LOG(call_state, INFO) + << "[call_state] FinishPullClientInitialMetadata: " + << GRPC_DUMP_ARGS(this, client_to_server_pull_state_); + switch (client_to_server_pull_state_) { + case ClientToServerPullState::kBegin: + LOG(FATAL) << "FinishPullClientInitialMetadata called before Begin"; + break; + case ClientToServerPullState::kProcessingClientInitialMetadata: + client_to_server_pull_state_ = ClientToServerPullState::kIdle; + client_to_server_pull_waiter_.Wake(); + break; + case ClientToServerPullState::kIdle: + case ClientToServerPullState::kReading: + case ClientToServerPullState::kProcessingClientToServerMessage: + LOG(FATAL) << "Out of order FinishPullClientInitialMetadata"; + break; + case ClientToServerPullState::kTerminated: + break; + } +} + +GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll> +CallState::PollPullClientToServerMessageAvailable() { + GRPC_TRACE_LOG(call_state, INFO) + << "[call_state] PollPullClientToServerMessageAvailable: " + << GRPC_DUMP_ARGS(this, client_to_server_pull_state_, + client_to_server_push_state_); + switch (client_to_server_pull_state_) { + case ClientToServerPullState::kBegin: + case ClientToServerPullState::kProcessingClientInitialMetadata: + return client_to_server_pull_waiter_.pending(); + case ClientToServerPullState::kIdle: + client_to_server_pull_state_ = ClientToServerPullState::kReading; + ABSL_FALLTHROUGH_INTENDED; + case ClientToServerPullState::kReading: + break; + case ClientToServerPullState::kProcessingClientToServerMessage: + LOG(FATAL) << "PollPullClientToServerMessageAvailable called while " + "processing a message"; + break; + case ClientToServerPullState::kTerminated: + return Failure{}; + } + DCHECK_EQ(client_to_server_pull_state_, ClientToServerPullState::kReading); + switch (client_to_server_push_state_) { + case ClientToServerPushState::kIdle: + return client_to_server_push_waiter_.pending(); + case ClientToServerPushState::kPushedMessage: + case ClientToServerPushState::kPushedMessageAndHalfClosed: + client_to_server_pull_state_ = + ClientToServerPullState::kProcessingClientToServerMessage; + return true; + case ClientToServerPushState::kPushedHalfClose: + return false; + case ClientToServerPushState::kFinished: + client_to_server_pull_state_ = ClientToServerPullState::kTerminated; + return Failure{}; + } + Crash("Unreachable"); +} + +GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void +CallState::FinishPullClientToServerMessage() { + GRPC_TRACE_LOG(call_state, INFO) + << "[call_state] FinishPullClientToServerMessage: " + << GRPC_DUMP_ARGS(this, client_to_server_pull_state_, + client_to_server_push_state_); + switch (client_to_server_pull_state_) { + case ClientToServerPullState::kBegin: + case ClientToServerPullState::kProcessingClientInitialMetadata: + LOG(FATAL) << "FinishPullClientToServerMessage called before Begin"; + break; + case ClientToServerPullState::kIdle: + LOG(FATAL) << "FinishPullClientToServerMessage called twice"; + break; + case ClientToServerPullState::kReading: + LOG(FATAL) << "FinishPullClientToServerMessage called before " + "PollPullClientToServerMessageAvailable"; + break; + case ClientToServerPullState::kProcessingClientToServerMessage: + client_to_server_pull_state_ = ClientToServerPullState::kIdle; + client_to_server_pull_waiter_.Wake(); + break; + case ClientToServerPullState::kTerminated: + break; + } + switch (client_to_server_push_state_) { + case ClientToServerPushState::kPushedMessage: + client_to_server_push_state_ = ClientToServerPushState::kIdle; + client_to_server_push_waiter_.Wake(); + break; + case ClientToServerPushState::kIdle: + case ClientToServerPushState::kPushedHalfClose: + LOG(FATAL) << "FinishPullClientToServerMessage called without a message"; + break; + case ClientToServerPushState::kPushedMessageAndHalfClosed: + client_to_server_push_state_ = ClientToServerPushState::kPushedHalfClose; + client_to_server_push_waiter_.Wake(); + break; + case ClientToServerPushState::kFinished: + break; + } +} + +GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline StatusFlag +CallState::PushServerInitialMetadata() { + GRPC_TRACE_LOG(call_state, INFO) + << "[call_state] PushServerInitialMetadata: " + << GRPC_DUMP_ARGS(this, server_to_client_push_state_, + server_trailing_metadata_state_); + if (server_trailing_metadata_state_ != + ServerTrailingMetadataState::kNotPushed) { + return Failure{}; + } + CHECK_EQ(server_to_client_push_state_, ServerToClientPushState::kStart); + server_to_client_push_state_ = + ServerToClientPushState::kPushedServerInitialMetadata; + server_to_client_push_waiter_.Wake(); + return Success{}; +} + +GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void +CallState::BeginPushServerToClientMessage() { + GRPC_TRACE_LOG(call_state, INFO) + << "[call_state] BeginPushServerToClientMessage: " + << GRPC_DUMP_ARGS(this, server_to_client_push_state_); + switch (server_to_client_push_state_) { + case ServerToClientPushState::kStart: + LOG(FATAL) << "BeginPushServerToClientMessage called before " + "PushServerInitialMetadata"; + break; + case ServerToClientPushState::kPushedServerInitialMetadata: + server_to_client_push_state_ = + ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage; + break; + case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: + case ServerToClientPushState::kPushedMessage: + LOG(FATAL) << "BeginPushServerToClientMessage called twice concurrently"; + break; + case ServerToClientPushState::kTrailersOnly: + // Will fail in poll. + break; + case ServerToClientPushState::kIdle: + server_to_client_push_state_ = ServerToClientPushState::kPushedMessage; + server_to_client_push_waiter_.Wake(); + break; + case ServerToClientPushState::kFinished: + break; + } +} + +GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll +CallState::PollPushServerToClientMessage() { + GRPC_TRACE_LOG(call_state, INFO) + << "[call_state] PollPushServerToClientMessage: " + << GRPC_DUMP_ARGS(this, server_to_client_push_state_); + switch (server_to_client_push_state_) { + case ServerToClientPushState::kStart: + case ServerToClientPushState::kPushedServerInitialMetadata: + LOG(FATAL) << "PollPushServerToClientMessage called before " + << "PushServerInitialMetadata"; + case ServerToClientPushState::kTrailersOnly: + return false; + case ServerToClientPushState::kPushedMessage: + case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: + return server_to_client_push_waiter_.pending(); + case ServerToClientPushState::kIdle: + return Success{}; + case ServerToClientPushState::kFinished: + return Failure{}; + } + Crash("Unreachable"); +} + +GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline bool +CallState::PushServerTrailingMetadata(bool cancel) { + GRPC_TRACE_LOG(call_state, INFO) + << "[call_state] PushServerTrailingMetadata: " + << GRPC_DUMP_ARGS(this, cancel, server_trailing_metadata_state_, + server_to_client_push_state_, + client_to_server_push_state_, + server_trailing_metadata_waiter_.DebugString()); + if (server_trailing_metadata_state_ != + ServerTrailingMetadataState::kNotPushed) { + return false; + } + server_trailing_metadata_state_ = + cancel ? ServerTrailingMetadataState::kPushedCancel + : ServerTrailingMetadataState::kPushed; + server_trailing_metadata_waiter_.Wake(); + switch (server_to_client_push_state_) { + case ServerToClientPushState::kStart: + server_to_client_push_state_ = ServerToClientPushState::kTrailersOnly; + server_to_client_push_waiter_.Wake(); + break; + case ServerToClientPushState::kPushedServerInitialMetadata: + case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: + case ServerToClientPushState::kPushedMessage: + if (cancel) { + server_to_client_push_state_ = ServerToClientPushState::kFinished; + server_to_client_push_waiter_.Wake(); + } + break; + case ServerToClientPushState::kIdle: + if (cancel) { + server_to_client_push_state_ = ServerToClientPushState::kFinished; + server_to_client_push_waiter_.Wake(); + } + break; + case ServerToClientPushState::kFinished: + case ServerToClientPushState::kTrailersOnly: + break; + } + switch (client_to_server_push_state_) { + case ClientToServerPushState::kIdle: + client_to_server_push_state_ = ClientToServerPushState::kFinished; + client_to_server_push_waiter_.Wake(); + break; + case ClientToServerPushState::kPushedMessage: + case ClientToServerPushState::kPushedMessageAndHalfClosed: + client_to_server_push_state_ = ClientToServerPushState::kFinished; + client_to_server_push_waiter_.Wake(); + break; + case ClientToServerPushState::kPushedHalfClose: + case ClientToServerPushState::kFinished: + break; + } + return true; +} + +GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll +CallState::PollPullServerInitialMetadataAvailable() { + GRPC_TRACE_LOG(call_state, INFO) + << "[call_state] PollPullServerInitialMetadataAvailable: " + << GRPC_DUMP_ARGS(this, server_to_client_pull_state_, + server_to_client_push_state_); + bool reading; + switch (server_to_client_pull_state_) { + case ServerToClientPullState::kUnstarted: + case ServerToClientPullState::kUnstartedReading: + if (server_to_client_push_state_ == + ServerToClientPushState::kTrailersOnly) { + server_to_client_pull_state_ = ServerToClientPullState::kTerminated; + return false; + } + server_to_client_push_waiter_.pending(); + return server_to_client_pull_waiter_.pending(); + case ServerToClientPullState::kStartedReading: + reading = true; + break; + case ServerToClientPullState::kStarted: + reading = false; + break; + case ServerToClientPullState::kProcessingServerInitialMetadata: + case ServerToClientPullState::kProcessingServerInitialMetadataReading: + case ServerToClientPullState::kIdle: + case ServerToClientPullState::kReading: + case ServerToClientPullState::kProcessingServerToClientMessage: + case ServerToClientPullState::kProcessingServerTrailingMetadata: + LOG(FATAL) << "PollPullServerInitialMetadataAvailable called twice"; + case ServerToClientPullState::kTerminated: + return false; + } + DCHECK(server_to_client_pull_state_ == ServerToClientPullState::kStarted || + server_to_client_pull_state_ == + ServerToClientPullState::kStartedReading) + << server_to_client_pull_state_; + switch (server_to_client_push_state_) { + case ServerToClientPushState::kStart: + return server_to_client_push_waiter_.pending(); + case ServerToClientPushState::kPushedServerInitialMetadata: + case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: + server_to_client_pull_state_ = + reading + ? ServerToClientPullState::kProcessingServerInitialMetadataReading + : ServerToClientPullState::kProcessingServerInitialMetadata; + server_to_client_pull_waiter_.Wake(); + return true; + case ServerToClientPushState::kIdle: + case ServerToClientPushState::kPushedMessage: + LOG(FATAL) + << "PollPullServerInitialMetadataAvailable after metadata processed"; + case ServerToClientPushState::kFinished: + server_to_client_pull_state_ = ServerToClientPullState::kTerminated; + server_to_client_pull_waiter_.Wake(); + return false; + case ServerToClientPushState::kTrailersOnly: + return false; + } + Crash("Unreachable"); +} + +GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void +CallState::FinishPullServerInitialMetadata() { + GRPC_TRACE_LOG(call_state, INFO) + << "[call_state] FinishPullServerInitialMetadata: " + << GRPC_DUMP_ARGS(this, server_to_client_pull_state_); + switch (server_to_client_pull_state_) { + case ServerToClientPullState::kUnstarted: + case ServerToClientPullState::kUnstartedReading: + LOG(FATAL) << "FinishPullServerInitialMetadata called before Start"; + case ServerToClientPullState::kStarted: + case ServerToClientPullState::kStartedReading: + CHECK_EQ(server_to_client_push_state_, + ServerToClientPushState::kTrailersOnly); + return; + case ServerToClientPullState::kProcessingServerInitialMetadata: + server_to_client_pull_state_ = ServerToClientPullState::kIdle; + server_to_client_pull_waiter_.Wake(); + break; + case ServerToClientPullState::kProcessingServerInitialMetadataReading: + server_to_client_pull_state_ = ServerToClientPullState::kReading; + server_to_client_pull_waiter_.Wake(); + break; + case ServerToClientPullState::kIdle: + case ServerToClientPullState::kReading: + case ServerToClientPullState::kProcessingServerToClientMessage: + case ServerToClientPullState::kProcessingServerTrailingMetadata: + LOG(FATAL) << "Out of order FinishPullServerInitialMetadata"; + case ServerToClientPullState::kTerminated: + return; + } + DCHECK(server_to_client_pull_state_ == ServerToClientPullState::kIdle || + server_to_client_pull_state_ == ServerToClientPullState::kReading) + << server_to_client_pull_state_; + switch (server_to_client_push_state_) { + case ServerToClientPushState::kStart: + LOG(FATAL) << "FinishPullServerInitialMetadata called before initial " + "metadata consumed"; + case ServerToClientPushState::kPushedServerInitialMetadata: + server_to_client_push_state_ = ServerToClientPushState::kIdle; + server_to_client_push_waiter_.Wake(); + break; + case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: + server_to_client_push_state_ = ServerToClientPushState::kPushedMessage; + server_to_client_push_waiter_.Wake(); + break; + case ServerToClientPushState::kIdle: + case ServerToClientPushState::kPushedMessage: + case ServerToClientPushState::kTrailersOnly: + case ServerToClientPushState::kFinished: + LOG(FATAL) << "FinishPullServerInitialMetadata called twice"; + } +} + +GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll> +CallState::PollPullServerToClientMessageAvailable() { + GRPC_TRACE_LOG(call_state, INFO) + << "[call_state] PollPullServerToClientMessageAvailable: " + << GRPC_DUMP_ARGS(this, server_to_client_pull_state_, + server_to_client_push_state_, + server_trailing_metadata_state_); + switch (server_to_client_pull_state_) { + case ServerToClientPullState::kUnstarted: + server_to_client_pull_state_ = ServerToClientPullState::kUnstartedReading; + return server_to_client_pull_waiter_.pending(); + case ServerToClientPullState::kProcessingServerInitialMetadata: + server_to_client_pull_state_ = + ServerToClientPullState::kProcessingServerInitialMetadataReading; + return server_to_client_pull_waiter_.pending(); + case ServerToClientPullState::kUnstartedReading: + case ServerToClientPullState::kProcessingServerInitialMetadataReading: + return server_to_client_pull_waiter_.pending(); + case ServerToClientPullState::kStarted: + server_to_client_pull_state_ = ServerToClientPullState::kStartedReading; + ABSL_FALLTHROUGH_INTENDED; + case ServerToClientPullState::kStartedReading: + if (server_to_client_push_state_ == + ServerToClientPushState::kTrailersOnly) { + return false; + } + return server_to_client_pull_waiter_.pending(); + case ServerToClientPullState::kIdle: + server_to_client_pull_state_ = ServerToClientPullState::kReading; + ABSL_FALLTHROUGH_INTENDED; + case ServerToClientPullState::kReading: + break; + case ServerToClientPullState::kProcessingServerToClientMessage: + LOG(FATAL) << "PollPullServerToClientMessageAvailable called while " + "processing a message"; + case ServerToClientPullState::kProcessingServerTrailingMetadata: + LOG(FATAL) << "PollPullServerToClientMessageAvailable called while " + "processing trailing metadata"; + case ServerToClientPullState::kTerminated: + return Failure{}; + } + DCHECK_EQ(server_to_client_pull_state_, ServerToClientPullState::kReading); + switch (server_to_client_push_state_) { + case ServerToClientPushState::kStart: + case ServerToClientPushState::kPushedServerInitialMetadata: + case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: + return server_to_client_push_waiter_.pending(); + case ServerToClientPushState::kIdle: + if (server_trailing_metadata_state_ != + ServerTrailingMetadataState::kNotPushed) { + return false; + } + server_trailing_metadata_waiter_.pending(); + return server_to_client_push_waiter_.pending(); + case ServerToClientPushState::kTrailersOnly: + DCHECK_NE(server_trailing_metadata_state_, + ServerTrailingMetadataState::kNotPushed); + return false; + case ServerToClientPushState::kPushedMessage: + server_to_client_pull_state_ = + ServerToClientPullState::kProcessingServerToClientMessage; + server_to_client_pull_waiter_.Wake(); + return true; + case ServerToClientPushState::kFinished: + server_to_client_pull_state_ = ServerToClientPullState::kTerminated; + server_to_client_pull_waiter_.Wake(); + return Failure{}; + } + Crash("Unreachable"); +} + +GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void +CallState::FinishPullServerToClientMessage() { + GRPC_TRACE_LOG(call_state, INFO) + << "[call_state] FinishPullServerToClientMessage: " + << GRPC_DUMP_ARGS(this, server_to_client_pull_state_, + server_to_client_push_state_); + switch (server_to_client_pull_state_) { + case ServerToClientPullState::kUnstarted: + case ServerToClientPullState::kUnstartedReading: + case ServerToClientPullState::kStarted: + case ServerToClientPullState::kStartedReading: + case ServerToClientPullState::kProcessingServerInitialMetadata: + case ServerToClientPullState::kProcessingServerInitialMetadataReading: + LOG(FATAL) + << "FinishPullServerToClientMessage called before metadata available"; + case ServerToClientPullState::kIdle: + LOG(FATAL) << "FinishPullServerToClientMessage called twice"; + case ServerToClientPullState::kReading: + LOG(FATAL) << "FinishPullServerToClientMessage called before " + << "PollPullServerToClientMessageAvailable"; + case ServerToClientPullState::kProcessingServerToClientMessage: + server_to_client_pull_state_ = ServerToClientPullState::kIdle; + server_to_client_pull_waiter_.Wake(); + break; + case ServerToClientPullState::kProcessingServerTrailingMetadata: + LOG(FATAL) << "FinishPullServerToClientMessage called while processing " + "trailing metadata"; + case ServerToClientPullState::kTerminated: + break; + } + switch (server_to_client_push_state_) { + case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage: + case ServerToClientPushState::kPushedServerInitialMetadata: + case ServerToClientPushState::kStart: + LOG(FATAL) << "FinishPullServerToClientMessage called before initial " + "metadata consumed"; + case ServerToClientPushState::kTrailersOnly: + LOG(FATAL) << "FinishPullServerToClientMessage called after " + "PushServerTrailingMetadata"; + case ServerToClientPushState::kPushedMessage: + server_to_client_push_state_ = ServerToClientPushState::kIdle; + server_to_client_push_waiter_.Wake(); + break; + case ServerToClientPushState::kIdle: + LOG(FATAL) << "FinishPullServerToClientMessage called without a message"; + case ServerToClientPushState::kFinished: + break; + } +} + +GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll +CallState::PollServerTrailingMetadataAvailable() { + GRPC_TRACE_LOG(call_state, INFO) + << "[call_state] PollServerTrailingMetadataAvailable: " + << GRPC_DUMP_ARGS(this, server_to_client_pull_state_, + server_to_client_push_state_, + server_trailing_metadata_state_, + server_trailing_metadata_waiter_.DebugString()); + switch (server_to_client_pull_state_) { + case ServerToClientPullState::kProcessingServerInitialMetadata: + case ServerToClientPullState::kProcessingServerToClientMessage: + case ServerToClientPullState::kProcessingServerInitialMetadataReading: + case ServerToClientPullState::kUnstartedReading: + return server_to_client_pull_waiter_.pending(); + case ServerToClientPullState::kStartedReading: + case ServerToClientPullState::kReading: + switch (server_to_client_push_state_) { + case ServerToClientPushState::kTrailersOnly: + case ServerToClientPushState::kIdle: + case ServerToClientPushState::kStart: + case ServerToClientPushState::kFinished: + if (server_trailing_metadata_state_ != + ServerTrailingMetadataState::kNotPushed) { + server_to_client_pull_state_ = + ServerToClientPullState::kProcessingServerTrailingMetadata; + server_to_client_pull_waiter_.Wake(); + return Empty{}; + } + ABSL_FALLTHROUGH_INTENDED; + case ServerToClientPushState::kPushedServerInitialMetadata: + case ServerToClientPushState:: + kPushedServerInitialMetadataAndPushedMessage: + case ServerToClientPushState::kPushedMessage: + server_to_client_push_waiter_.pending(); + return server_to_client_pull_waiter_.pending(); + } + break; + case ServerToClientPullState::kStarted: + case ServerToClientPullState::kUnstarted: + case ServerToClientPullState::kIdle: + if (server_trailing_metadata_state_ != + ServerTrailingMetadataState::kNotPushed) { + server_to_client_pull_state_ = + ServerToClientPullState::kProcessingServerTrailingMetadata; + server_to_client_pull_waiter_.Wake(); + return Empty{}; + } + return server_trailing_metadata_waiter_.pending(); + case ServerToClientPullState::kProcessingServerTrailingMetadata: + LOG(FATAL) << "PollServerTrailingMetadataAvailable called twice"; + case ServerToClientPullState::kTerminated: + return Empty{}; + } + Crash("Unreachable"); +} + +GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void +CallState::FinishPullServerTrailingMetadata() { + GRPC_TRACE_LOG(call_state, INFO) + << "[call_state] FinishPullServerTrailingMetadata: " + << GRPC_DUMP_ARGS(this, server_trailing_metadata_state_, + server_trailing_metadata_waiter_.DebugString()); + switch (server_trailing_metadata_state_) { + case ServerTrailingMetadataState::kNotPushed: + LOG(FATAL) << "FinishPullServerTrailingMetadata called before " + "PollServerTrailingMetadataAvailable"; + case ServerTrailingMetadataState::kPushed: + server_trailing_metadata_state_ = ServerTrailingMetadataState::kPulled; + server_trailing_metadata_waiter_.Wake(); + break; + case ServerTrailingMetadataState::kPushedCancel: + server_trailing_metadata_state_ = + ServerTrailingMetadataState::kPulledCancel; + server_trailing_metadata_waiter_.Wake(); + break; + case ServerTrailingMetadataState::kPulled: + case ServerTrailingMetadataState::kPulledCancel: + LOG(FATAL) << "FinishPullServerTrailingMetadata called twice"; + } +} + +GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll +CallState::PollWasCancelled() { + GRPC_TRACE_LOG(call_state, INFO) + << "[call_state] PollWasCancelled: " + << GRPC_DUMP_ARGS(this, server_trailing_metadata_state_); + switch (server_trailing_metadata_state_) { + case ServerTrailingMetadataState::kNotPushed: + case ServerTrailingMetadataState::kPushed: + case ServerTrailingMetadataState::kPushedCancel: { + return server_trailing_metadata_waiter_.pending(); + } + case ServerTrailingMetadataState::kPulled: + return false; + case ServerTrailingMetadataState::kPulledCancel: + return true; + } + Crash("Unreachable"); +} + +} // namespace grpc_core + +#endif // GRPC_SRC_CORE_LIB_TRANSPORT_CALL_STATE_H diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 70c1bbfa63d..f25afdd0126 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -692,6 +692,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/transport/call_filters.cc', 'src/core/lib/transport/call_final_info.cc', 'src/core/lib/transport/call_spine.cc', + 'src/core/lib/transport/call_state.cc', 'src/core/lib/transport/connectivity_state.cc', 'src/core/lib/transport/error_utils.cc', 'src/core/lib/transport/interception_chain.cc', diff --git a/test/core/transport/BUILD b/test/core/transport/BUILD index bc2b65382b8..9a610419c91 100644 --- a/test/core/transport/BUILD +++ b/test/core/transport/BUILD @@ -87,6 +87,21 @@ grpc_cc_test( ], ) +grpc_cc_test( + name = "call_state_test", + srcs = ["call_state_test.cc"], + external_deps = [ + "gtest", + ], + language = "C++", + uses_event_engine = False, + uses_polling = False, + deps = [ + "//src/core:call_state", + "//test/core/promise:poll_matcher", + ], +) + grpc_cc_test( name = "connectivity_state_test", srcs = ["connectivity_state_test.cc"], diff --git a/test/core/transport/call_filters_test.cc b/test/core/transport/call_filters_test.cc index b443321dba9..ce2f8c81806 100644 --- a/test/core/transport/call_filters_test.cc +++ b/test/core/transport/call_filters_test.cc @@ -1102,246 +1102,6 @@ TEST(OperationExecutorTest, PromiseTwo) { gpr_free_aligned(call_data1); } -/////////////////////////////////////////////////////////////////////////////// -// CallState - -TEST(CallStateTest, NoOp) { CallState state; } - -TEST(CallStateTest, StartTwiceCrashes) { - CallState state; - state.Start(); - EXPECT_DEATH(state.Start(), ""); -} - -TEST(CallStateTest, PullServerInitialMetadataBlocksUntilStart) { - StrictMock activity; - activity.Activate(); - CallState state; - EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending()); - EXPECT_WAKEUP(activity, state.PushServerInitialMetadata()); - EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending()); - EXPECT_WAKEUP(activity, state.Start()); - EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady()); -} - -TEST(CallStateTest, PullClientInitialMetadata) { - StrictMock activity; - activity.Activate(); - CallState state; - EXPECT_DEATH(state.FinishPullClientInitialMetadata(), ""); - state.BeginPullClientInitialMetadata(); - state.FinishPullClientInitialMetadata(); -} - -TEST(CallStateTest, ClientToServerMessagesWaitForInitialMetadata) { - StrictMock activity; - activity.Activate(); - CallState state; - EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending()); - state.BeginPushClientToServerMessage(); - EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); - EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending()); - state.BeginPullClientInitialMetadata(); - EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); - EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending()); - EXPECT_WAKEUP(activity, state.FinishPullClientInitialMetadata()); - EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); - EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true)); - EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); - EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage()); - EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{})); -} - -TEST(CallStateTest, RepeatedClientToServerMessagesWithHalfClose) { - StrictMock activity; - activity.Activate(); - CallState state; - state.BeginPullClientInitialMetadata(); - state.FinishPullClientInitialMetadata(); - - // Message 0 - EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending()); - EXPECT_WAKEUP(activity, state.BeginPushClientToServerMessage()); - EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); - EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true)); - EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); - EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage()); - EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{})); - - // Message 1 - EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending()); - EXPECT_WAKEUP(activity, state.BeginPushClientToServerMessage()); - EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); - EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true)); - EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); - EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage()); - EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{})); - - // Message 2: push before polling - state.BeginPushClientToServerMessage(); - EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); - EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true)); - EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); - EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage()); - EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{})); - - // Message 3: push before polling and half close - state.BeginPushClientToServerMessage(); - state.ClientToServerHalfClose(); - EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); - EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true)); - EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); - EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage()); - EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{})); - - // ... and now we should see the half close - EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(false)); -} - -TEST(CallStateTest, ImmediateClientToServerHalfClose) { - StrictMock activity; - activity.Activate(); - CallState state; - state.BeginPullClientInitialMetadata(); - state.FinishPullClientInitialMetadata(); - state.ClientToServerHalfClose(); - EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(false)); -} - -TEST(CallStateTest, ServerToClientMessagesWaitForInitialMetadata) { - StrictMock activity; - activity.Activate(); - CallState state; - EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); - EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending()); - EXPECT_WAKEUP(activity, state.Start()); - EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); - EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending()); - EXPECT_WAKEUP(activity, state.PushServerInitialMetadata()); - state.BeginPushServerToClientMessage(); - EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); - EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); - EXPECT_WAKEUP(activity, - EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), - IsReady(true))); - EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); - EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); - EXPECT_WAKEUP(activity, state.FinishPullServerInitialMetadata()); - EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); - EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true)); - EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage()); - EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{})); -} - -TEST(CallStateTest, RepeatedServerToClientMessages) { - StrictMock activity; - activity.Activate(); - CallState state; - state.PushServerInitialMetadata(); - state.Start(); - EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(true)); - state.FinishPullServerInitialMetadata(); - - // Message 0 - EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); - EXPECT_WAKEUP(activity, state.BeginPushServerToClientMessage()); - EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); - EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true)); - EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); - EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage()); - EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{})); - - // Message 1 - EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); - EXPECT_WAKEUP(activity, state.BeginPushServerToClientMessage()); - EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); - EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true)); - EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); - EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage()); - EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{})); - - // Message 2: push before polling - state.BeginPushServerToClientMessage(); - EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); - EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true)); - EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); - EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage()); - EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{})); - - // Message 3: push before polling - state.BeginPushServerToClientMessage(); - EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); - EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true)); - EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); - EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage()); - EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{})); -} - -TEST(CallStateTest, ReceiveTrailersOnly) { - StrictMock activity; - activity.Activate(); - CallState state; - state.Start(); - state.PushServerTrailingMetadata(false); - EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false)); - state.FinishPullServerInitialMetadata(); - EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady()); - state.FinishPullServerTrailingMetadata(); -} - -TEST(CallStateTest, ReceiveTrailersOnlySkipsInitialMetadataOnUnstartedCalls) { - StrictMock activity; - activity.Activate(); - CallState state; - state.PushServerTrailingMetadata(false); - EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false)); - state.FinishPullServerInitialMetadata(); - EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady()); - state.FinishPullServerTrailingMetadata(); -} - -TEST(CallStateTest, RecallNoCancellation) { - StrictMock activity; - activity.Activate(); - CallState state; - state.Start(); - state.PushServerTrailingMetadata(false); - EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false)); - state.FinishPullServerInitialMetadata(); - EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady()); - EXPECT_THAT(state.PollWasCancelled(), IsPending()); - EXPECT_WAKEUP(activity, state.FinishPullServerTrailingMetadata()); - EXPECT_THAT(state.PollWasCancelled(), IsReady(false)); -} - -TEST(CallStateTest, RecallCancellation) { - StrictMock activity; - activity.Activate(); - CallState state; - state.Start(); - state.PushServerTrailingMetadata(true); - EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false)); - state.FinishPullServerInitialMetadata(); - EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady()); - EXPECT_THAT(state.PollWasCancelled(), IsPending()); - EXPECT_WAKEUP(activity, state.FinishPullServerTrailingMetadata()); - EXPECT_THAT(state.PollWasCancelled(), IsReady(true)); -} - -TEST(CallStateTest, ReceiveTrailingMetadataAfterMessageRead) { - StrictMock activity; - activity.Activate(); - CallState state; - state.Start(); - state.PushServerInitialMetadata(); - EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(true)); - state.FinishPullServerInitialMetadata(); - EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); - EXPECT_WAKEUP(activity, state.PushServerTrailingMetadata(false)); - EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(false)); - EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady()); -} - } // namespace filters_detail /////////////////////////////////////////////////////////////////////////////// diff --git a/test/core/transport/call_state_test.cc b/test/core/transport/call_state_test.cc new file mode 100644 index 00000000000..cc35b5b7ec5 --- /dev/null +++ b/test/core/transport/call_state_test.cc @@ -0,0 +1,310 @@ +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/core/lib/transport/call_state.h" + +#include + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include "test/core/promise/poll_matcher.h" + +using testing::Mock; +using testing::StrictMock; + +namespace grpc_core { + +namespace { + +// A mock activity that can be activated and deactivated. +class MockActivity : public Activity, public Wakeable { + public: + MOCK_METHOD(void, WakeupRequested, ()); + + void ForceImmediateRepoll(WakeupMask /*mask*/) override { WakeupRequested(); } + void Orphan() override {} + Waker MakeOwningWaker() override { return Waker(this, 0); } + Waker MakeNonOwningWaker() override { return Waker(this, 0); } + void Wakeup(WakeupMask /*mask*/) override { WakeupRequested(); } + void WakeupAsync(WakeupMask /*mask*/) override { WakeupRequested(); } + void Drop(WakeupMask /*mask*/) override {} + std::string DebugTag() const override { return "MockActivity"; } + std::string ActivityDebugTag(WakeupMask /*mask*/) const override { + return DebugTag(); + } + + void Activate() { + if (scoped_activity_ == nullptr) { + scoped_activity_ = std::make_unique(this); + } + } + + void Deactivate() { scoped_activity_.reset(); } + + private: + std::unique_ptr scoped_activity_; +}; + +#define EXPECT_WAKEUP(activity, statement) \ + EXPECT_CALL((activity), WakeupRequested()).Times(::testing::AtLeast(1)); \ + statement; \ + Mock::VerifyAndClearExpectations(&(activity)); + +} // namespace + +TEST(CallStateTest, NoOp) { CallState state; } + +TEST(CallStateTest, StartTwiceCrashes) { + CallState state; + state.Start(); + EXPECT_DEATH(state.Start(), ""); +} + +TEST(CallStateTest, PullServerInitialMetadataBlocksUntilStart) { + StrictMock activity; + activity.Activate(); + CallState state; + EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending()); + EXPECT_WAKEUP(activity, state.PushServerInitialMetadata()); + EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending()); + EXPECT_WAKEUP(activity, state.Start()); + EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady()); +} + +TEST(CallStateTest, PullClientInitialMetadata) { + StrictMock activity; + activity.Activate(); + CallState state; + EXPECT_DEATH(state.FinishPullClientInitialMetadata(), ""); + state.BeginPullClientInitialMetadata(); + state.FinishPullClientInitialMetadata(); +} + +TEST(CallStateTest, ClientToServerMessagesWaitForInitialMetadata) { + StrictMock activity; + activity.Activate(); + CallState state; + EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending()); + state.BeginPushClientToServerMessage(); + EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); + EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending()); + state.BeginPullClientInitialMetadata(); + EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); + EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending()); + EXPECT_WAKEUP(activity, state.FinishPullClientInitialMetadata()); + EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); + EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true)); + EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); + EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage()); + EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{})); +} + +TEST(CallStateTest, RepeatedClientToServerMessagesWithHalfClose) { + StrictMock activity; + activity.Activate(); + CallState state; + state.BeginPullClientInitialMetadata(); + state.FinishPullClientInitialMetadata(); + + // Message 0 + EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending()); + EXPECT_WAKEUP(activity, state.BeginPushClientToServerMessage()); + EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); + EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true)); + EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); + EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage()); + EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{})); + + // Message 1 + EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending()); + EXPECT_WAKEUP(activity, state.BeginPushClientToServerMessage()); + EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); + EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true)); + EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); + EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage()); + EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{})); + + // Message 2: push before polling + state.BeginPushClientToServerMessage(); + EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); + EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true)); + EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); + EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage()); + EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{})); + + // Message 3: push before polling and half close + state.BeginPushClientToServerMessage(); + state.ClientToServerHalfClose(); + EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); + EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(true)); + EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); + EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage()); + EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{})); + + // ... and now we should see the half close + EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(false)); +} + +TEST(CallStateTest, ImmediateClientToServerHalfClose) { + StrictMock activity; + activity.Activate(); + CallState state; + state.BeginPullClientInitialMetadata(); + state.FinishPullClientInitialMetadata(); + state.ClientToServerHalfClose(); + EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady(false)); +} + +TEST(CallStateTest, ServerToClientMessagesWaitForInitialMetadata) { + StrictMock activity; + activity.Activate(); + CallState state; + EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); + EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending()); + EXPECT_WAKEUP(activity, state.Start()); + EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); + EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsPending()); + EXPECT_WAKEUP(activity, state.PushServerInitialMetadata()); + state.BeginPushServerToClientMessage(); + EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); + EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); + EXPECT_WAKEUP(activity, + EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), + IsReady(true))); + EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); + EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); + EXPECT_WAKEUP(activity, state.FinishPullServerInitialMetadata()); + EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); + EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true)); + EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage()); + EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{})); +} + +TEST(CallStateTest, RepeatedServerToClientMessages) { + StrictMock activity; + activity.Activate(); + CallState state; + state.PushServerInitialMetadata(); + state.Start(); + EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(true)); + state.FinishPullServerInitialMetadata(); + + // Message 0 + EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); + EXPECT_WAKEUP(activity, state.BeginPushServerToClientMessage()); + EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); + EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true)); + EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); + EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage()); + EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{})); + + // Message 1 + EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); + EXPECT_WAKEUP(activity, state.BeginPushServerToClientMessage()); + EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); + EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true)); + EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); + EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage()); + EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{})); + + // Message 2: push before polling + state.BeginPushServerToClientMessage(); + EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); + EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true)); + EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); + EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage()); + EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{})); + + // Message 3: push before polling + state.BeginPushServerToClientMessage(); + EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); + EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(true)); + EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); + EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage()); + EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{})); +} + +TEST(CallStateTest, ReceiveTrailersOnly) { + StrictMock activity; + activity.Activate(); + CallState state; + state.Start(); + state.PushServerTrailingMetadata(false); + EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false)); + state.FinishPullServerInitialMetadata(); + EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady()); + state.FinishPullServerTrailingMetadata(); +} + +TEST(CallStateTest, ReceiveTrailersOnlySkipsInitialMetadataOnUnstartedCalls) { + StrictMock activity; + activity.Activate(); + CallState state; + state.PushServerTrailingMetadata(false); + EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false)); + state.FinishPullServerInitialMetadata(); + EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady()); + state.FinishPullServerTrailingMetadata(); +} + +TEST(CallStateTest, RecallNoCancellation) { + StrictMock activity; + activity.Activate(); + CallState state; + state.Start(); + state.PushServerTrailingMetadata(false); + EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false)); + state.FinishPullServerInitialMetadata(); + EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady()); + EXPECT_THAT(state.PollWasCancelled(), IsPending()); + EXPECT_WAKEUP(activity, state.FinishPullServerTrailingMetadata()); + EXPECT_THAT(state.PollWasCancelled(), IsReady(false)); +} + +TEST(CallStateTest, RecallCancellation) { + StrictMock activity; + activity.Activate(); + CallState state; + state.Start(); + state.PushServerTrailingMetadata(true); + EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false)); + state.FinishPullServerInitialMetadata(); + EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady()); + EXPECT_THAT(state.PollWasCancelled(), IsPending()); + EXPECT_WAKEUP(activity, state.FinishPullServerTrailingMetadata()); + EXPECT_THAT(state.PollWasCancelled(), IsReady(true)); +} + +TEST(CallStateTest, ReceiveTrailingMetadataAfterMessageRead) { + StrictMock activity; + activity.Activate(); + CallState state; + state.Start(); + state.PushServerInitialMetadata(); + EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(true)); + state.FinishPullServerInitialMetadata(); + EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); + EXPECT_WAKEUP(activity, state.PushServerTrailingMetadata(false)); + EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady(false)); + EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady()); +} + +} // namespace grpc_core + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc_tracer_init(); + return RUN_ALL_TESTS(); +} diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 7065c3262d7..ab12d2dc09e 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -2699,6 +2699,8 @@ src/core/lib/transport/call_final_info.cc \ src/core/lib/transport/call_final_info.h \ src/core/lib/transport/call_spine.cc \ src/core/lib/transport/call_spine.h \ +src/core/lib/transport/call_state.cc \ +src/core/lib/transport/call_state.h \ src/core/lib/transport/connectivity_state.cc \ src/core/lib/transport/connectivity_state.h \ src/core/lib/transport/custom_metadata.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index de0d4e57c86..e08c9737443 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -2472,6 +2472,8 @@ src/core/lib/transport/call_final_info.cc \ src/core/lib/transport/call_final_info.h \ src/core/lib/transport/call_spine.cc \ src/core/lib/transport/call_spine.h \ +src/core/lib/transport/call_state.cc \ +src/core/lib/transport/call_state.h \ src/core/lib/transport/connectivity_state.cc \ src/core/lib/transport/connectivity_state.h \ src/core/lib/transport/custom_metadata.h \ diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index eea7a46ad85..8c02d4c2bfb 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -1525,6 +1525,30 @@ ], "uses_polling": false }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "call_state_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false,