diff --git a/BUILD b/BUILD
index f412be759a6..53c84945f50 100644
--- a/BUILD
+++ b/BUILD
@@ -1871,6 +1871,7 @@ grpc_cc_library(
"//src/core:arena",
"//src/core:channel_args",
"//src/core:channel_stack_type",
+ "//src/core:direct_channel",
"//src/core:experiments",
"//src/core:iomgr_fwd",
"//src/core:ref_counted",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index d14de5ef625..280ae4514d9 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -967,6 +967,9 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx call_filters_test)
add_dependencies(buildtests_cxx call_finalization_test)
add_dependencies(buildtests_cxx call_host_override_test)
+ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
+ add_dependencies(buildtests_cxx call_spine_test)
+ endif()
add_dependencies(buildtests_cxx call_tracer_test)
add_dependencies(buildtests_cxx call_utils_test)
add_dependencies(buildtests_cxx cancel_after_accept_test)
@@ -1853,6 +1856,7 @@ add_library(grpc
src/core/client_channel/client_channel_filter.cc
src/core/client_channel/client_channel_plugin.cc
src/core/client_channel/client_channel_service_config.cc
+ src/core/client_channel/direct_channel.cc
src/core/client_channel/dynamic_filters.cc
src/core/client_channel/global_subchannel_pool.cc
src/core/client_channel/load_balanced_call_destination.cc
@@ -2945,6 +2949,7 @@ add_library(grpc_unsecure
src/core/client_channel/client_channel_filter.cc
src/core/client_channel/client_channel_plugin.cc
src/core/client_channel/client_channel_service_config.cc
+ src/core/client_channel/direct_channel.cc
src/core/client_channel/dynamic_filters.cc
src/core/client_channel/global_subchannel_pool.cc
src/core/client_channel/load_balanced_call_destination.cc
@@ -6012,6 +6017,7 @@ if(gRPC_BUILD_TESTS)
add_executable(activity_test
src/core/lib/debug/trace.cc
src/core/lib/debug/trace_flags.cc
+ src/core/lib/gprpp/dump_args.cc
src/core/lib/gprpp/glob.cc
src/core/lib/promise/activity.cc
test/core/promise/activity_test.cc
@@ -8724,6 +8730,59 @@ target_link_libraries(call_host_override_test
)
+endif()
+if(gRPC_BUILD_TESTS)
+if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
+
+ add_executable(call_spine_test
+ ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc
+ ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc
+ ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h
+ ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h
+ test/core/call/yodel/test_main.cc
+ test/core/call/yodel/yodel_test.cc
+ test/core/event_engine/event_engine_test_utils.cc
+ test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
+ test/core/transport/call_spine_test.cc
+ )
+ if(WIN32 AND MSVC)
+ if(BUILD_SHARED_LIBS)
+ target_compile_definitions(call_spine_test
+ PRIVATE
+ "GPR_DLL_IMPORTS"
+ "GRPC_DLL_IMPORTS"
+ )
+ endif()
+ endif()
+ target_compile_features(call_spine_test PUBLIC cxx_std_14)
+ target_include_directories(call_spine_test
+ PRIVATE
+ ${CMAKE_CURRENT_SOURCE_DIR}
+ ${CMAKE_CURRENT_SOURCE_DIR}/include
+ ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
+ ${_gRPC_RE2_INCLUDE_DIR}
+ ${_gRPC_SSL_INCLUDE_DIR}
+ ${_gRPC_UPB_GENERATED_DIR}
+ ${_gRPC_UPB_GRPC_GENERATED_DIR}
+ ${_gRPC_UPB_INCLUDE_DIR}
+ ${_gRPC_XXHASH_INCLUDE_DIR}
+ ${_gRPC_ZLIB_INCLUDE_DIR}
+ third_party/googletest/googletest/include
+ third_party/googletest/googletest
+ third_party/googletest/googlemock/include
+ third_party/googletest/googlemock
+ ${_gRPC_PROTO_GENS_DIR}
+ )
+
+ target_link_libraries(call_spine_test
+ ${_gRPC_ALLTARGETS_LIBRARIES}
+ gtest
+ ${_gRPC_PROTOBUF_LIBRARIES}
+ grpc_test_util
+ )
+
+
+endif()
endif()
if(gRPC_BUILD_TESTS)
@@ -9438,6 +9497,7 @@ add_executable(cancel_callback_test
src/core/lib/debug/trace_flags.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
+ src/core/lib/gprpp/dump_args.cc
src/core/lib/gprpp/glob.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/time.cc
@@ -10545,6 +10605,7 @@ add_executable(chunked_vector_test
src/core/lib/debug/trace_flags.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
+ src/core/lib/gprpp/dump_args.cc
src/core/lib/gprpp/glob.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/time.cc
@@ -13853,6 +13914,7 @@ add_executable(exec_ctx_wakeup_scheduler_test
src/core/lib/debug/trace_flags.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
+ src/core/lib/gprpp/dump_args.cc
src/core/lib/gprpp/glob.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/time.cc
@@ -14653,6 +14715,7 @@ add_executable(flow_control_test
src/core/lib/debug/trace_flags.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
+ src/core/lib/gprpp/dump_args.cc
src/core/lib/gprpp/glob.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/time.cc
@@ -14739,6 +14802,7 @@ add_executable(for_each_test
src/core/lib/debug/trace_flags.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
+ src/core/lib/gprpp/dump_args.cc
src/core/lib/gprpp/glob.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/time.cc
@@ -18186,6 +18250,7 @@ if(gRPC_BUILD_TESTS)
add_executable(inter_activity_pipe_test
src/core/lib/debug/trace.cc
src/core/lib/debug/trace_flags.cc
+ src/core/lib/gprpp/dump_args.cc
src/core/lib/gprpp/glob.cc
src/core/lib/promise/activity.cc
test/core/promise/inter_activity_pipe_test.cc
@@ -18539,6 +18604,7 @@ add_executable(interceptor_list_test
src/core/lib/debug/trace_flags.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
+ src/core/lib/gprpp/dump_args.cc
src/core/lib/gprpp/glob.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/time.cc
@@ -19331,6 +19397,7 @@ if(gRPC_BUILD_TESTS)
add_executable(latch_test
src/core/lib/debug/trace.cc
src/core/lib/debug/trace_flags.cc
+ src/core/lib/gprpp/dump_args.cc
src/core/lib/gprpp/glob.cc
src/core/lib/promise/activity.cc
test/core/promise/latch_test.cc
@@ -19749,6 +19816,7 @@ add_executable(map_pipe_test
src/core/lib/debug/trace_flags.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
+ src/core/lib/gprpp/dump_args.cc
src/core/lib/gprpp/glob.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/time.cc
@@ -20733,6 +20801,10 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(mpsc_test
+ src/core/lib/debug/trace.cc
+ src/core/lib/debug/trace_flags.cc
+ src/core/lib/gprpp/dump_args.cc
+ src/core/lib/gprpp/glob.cc
src/core/lib/promise/activity.cc
test/core/promise/mpsc_test.cc
)
@@ -20768,6 +20840,7 @@ target_link_libraries(mpsc_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
absl::config
+ absl::flat_hash_map
absl::hash
absl::type_traits
absl::statusor
@@ -21229,6 +21302,10 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(observable_test
+ src/core/lib/debug/trace.cc
+ src/core/lib/debug/trace_flags.cc
+ src/core/lib/gprpp/dump_args.cc
+ src/core/lib/gprpp/glob.cc
src/core/lib/promise/activity.cc
test/core/promise/observable_test.cc
)
@@ -21264,6 +21341,7 @@ target_link_libraries(observable_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
absl::config
+ absl::flat_hash_map
absl::hash
absl::type_traits
absl::statusor
@@ -23109,6 +23187,7 @@ if(gRPC_BUILD_TESTS)
add_executable(promise_mutex_test
src/core/lib/debug/trace.cc
src/core/lib/debug/trace_flags.cc
+ src/core/lib/gprpp/dump_args.cc
src/core/lib/gprpp/glob.cc
src/core/lib/promise/activity.cc
test/core/promise/promise_mutex_test.cc
@@ -32870,6 +32949,10 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(wait_for_callback_test
+ src/core/lib/debug/trace.cc
+ src/core/lib/debug/trace_flags.cc
+ src/core/lib/gprpp/dump_args.cc
+ src/core/lib/gprpp/glob.cc
src/core/lib/promise/activity.cc
test/core/promise/wait_for_callback_test.cc
)
@@ -32905,6 +32988,7 @@ target_link_libraries(wait_for_callback_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
absl::config
+ absl::flat_hash_map
absl::hash
absl::type_traits
absl::statusor
diff --git a/Makefile b/Makefile
index 6d05e7aeeda..d4e982f903c 100644
--- a/Makefile
+++ b/Makefile
@@ -675,6 +675,7 @@ LIBGRPC_SRC = \
src/core/client_channel/client_channel_filter.cc \
src/core/client_channel/client_channel_plugin.cc \
src/core/client_channel/client_channel_service_config.cc \
+ src/core/client_channel/direct_channel.cc \
src/core/client_channel/dynamic_filters.cc \
src/core/client_channel/global_subchannel_pool.cc \
src/core/client_channel/load_balanced_call_destination.cc \
diff --git a/Package.swift b/Package.swift
index 903fceaa748..367dc96f994 100644
--- a/Package.swift
+++ b/Package.swift
@@ -138,6 +138,8 @@ let package = Package(
"src/core/client_channel/client_channel_service_config.h",
"src/core/client_channel/config_selector.h",
"src/core/client_channel/connector.h",
+ "src/core/client_channel/direct_channel.cc",
+ "src/core/client_channel/direct_channel.h",
"src/core/client_channel/dynamic_filters.cc",
"src/core/client_channel/dynamic_filters.h",
"src/core/client_channel/global_subchannel_pool.cc",
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index 39a48a8a44d..f900554412c 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -234,6 +234,7 @@ libs:
- src/core/client_channel/client_channel_service_config.h
- src/core/client_channel/config_selector.h
- src/core/client_channel/connector.h
+ - src/core/client_channel/direct_channel.h
- src/core/client_channel/dynamic_filters.h
- src/core/client_channel/global_subchannel_pool.h
- src/core/client_channel/load_balanced_call_destination.h
@@ -1258,6 +1259,7 @@ libs:
- src/core/client_channel/client_channel_filter.cc
- src/core/client_channel/client_channel_plugin.cc
- src/core/client_channel/client_channel_service_config.cc
+ - src/core/client_channel/direct_channel.cc
- src/core/client_channel/dynamic_filters.cc
- src/core/client_channel/global_subchannel_pool.cc
- src/core/client_channel/load_balanced_call_destination.cc
@@ -2219,6 +2221,7 @@ libs:
- src/core/client_channel/client_channel_service_config.h
- src/core/client_channel/config_selector.h
- src/core/client_channel/connector.h
+ - src/core/client_channel/direct_channel.h
- src/core/client_channel/dynamic_filters.h
- src/core/client_channel/global_subchannel_pool.h
- src/core/client_channel/load_balanced_call_destination.h
@@ -2713,6 +2716,7 @@ libs:
- src/core/client_channel/client_channel_filter.cc
- src/core/client_channel/client_channel_plugin.cc
- src/core/client_channel/client_channel_service_config.cc
+ - src/core/client_channel/direct_channel.cc
- src/core/client_channel/dynamic_filters.cc
- src/core/client_channel/global_subchannel_pool.cc
- src/core/client_channel/load_balanced_call_destination.cc
@@ -5238,6 +5242,7 @@ targets:
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/down_cast.h
+ - src/core/lib/gprpp/dump_args.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
@@ -5260,6 +5265,7 @@ targets:
src:
- src/core/lib/debug/trace.cc
- src/core/lib/debug/trace_flags.cc
+ - src/core/lib/gprpp/dump_args.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/promise/activity.cc
- test/core/promise/activity_test.cc
@@ -6428,6 +6434,7 @@ targets:
- src/core/lib/promise/detail/seq_state.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/exec_ctx_wakeup_scheduler.h
+ - src/core/lib/promise/if.h
- src/core/lib/promise/latch.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
@@ -6436,6 +6443,7 @@ targets:
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
- src/core/lib/promise/status_flag.h
+ - src/core/lib/promise/try_seq.h
- src/core/lib/resource_quota/arena.h
- src/core/lib/resource_quota/connection_quota.h
- src/core/lib/resource_quota/memory_quota.h
@@ -6627,6 +6635,29 @@ targets:
- grpc_authorization_provider
- grpc_unsecure
- grpc_test_util
+- name: call_spine_test
+ gtest: true
+ build: test
+ language: c++
+ headers:
+ - test/core/call/yodel/yodel_test.h
+ - test/core/event_engine/event_engine_test_utils.h
+ - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
+ src:
+ - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
+ - test/core/call/yodel/test_main.cc
+ - test/core/call/yodel/yodel_test.cc
+ - test/core/event_engine/event_engine_test_utils.cc
+ - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
+ - test/core/transport/call_spine_test.cc
+ deps:
+ - gtest
+ - protobuf
+ - grpc_test_util
+ platforms:
+ - linux
+ - posix
+ uses_polling: false
- name: call_tracer_test
gtest: true
build: test
@@ -7590,6 +7621,7 @@ targets:
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/down_cast.h
+ - src/core/lib/gprpp/dump_args.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/orphanable.h
@@ -7655,6 +7687,7 @@ targets:
- src/core/lib/debug/trace_flags.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
+ - src/core/lib/gprpp/dump_args.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/time.cc
@@ -8409,6 +8442,7 @@ targets:
- src/core/lib/gprpp/chunked_vector.h
- src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/down_cast.h
+ - src/core/lib/gprpp/dump_args.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/orphanable.h
@@ -8473,6 +8507,7 @@ targets:
- src/core/lib/debug/trace_flags.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
+ - src/core/lib/gprpp/dump_args.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/time.cc
@@ -9991,6 +10026,7 @@ targets:
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/down_cast.h
+ - src/core/lib/gprpp/dump_args.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/orphanable.h
@@ -10043,6 +10079,7 @@ targets:
- src/core/lib/debug/trace_flags.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
+ - src/core/lib/gprpp/dump_args.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/time.cc
@@ -10550,6 +10587,7 @@ targets:
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/down_cast.h
+ - src/core/lib/gprpp/dump_args.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/orphanable.h
@@ -10619,6 +10657,7 @@ targets:
- src/core/lib/debug/trace_flags.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
+ - src/core/lib/gprpp/dump_args.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/time.cc
@@ -10682,6 +10721,7 @@ targets:
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/down_cast.h
+ - src/core/lib/gprpp/dump_args.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/orphanable.h
@@ -10756,6 +10796,7 @@ targets:
- src/core/lib/debug/trace_flags.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
+ - src/core/lib/gprpp/dump_args.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/time.cc
@@ -12321,6 +12362,7 @@ targets:
- src/core/lib/event_engine/event_engine_context.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/down_cast.h
+ - src/core/lib/gprpp/dump_args.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
@@ -12339,6 +12381,7 @@ targets:
src:
- src/core/lib/debug/trace.cc
- src/core/lib/debug/trace_flags.cc
+ - src/core/lib/gprpp/dump_args.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/promise/activity.cc
- test/core/promise/inter_activity_pipe_test.cc
@@ -12966,6 +13009,7 @@ targets:
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/down_cast.h
+ - src/core/lib/gprpp/dump_args.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/orphanable.h
@@ -13032,6 +13076,7 @@ targets:
- src/core/lib/debug/trace_flags.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
+ - src/core/lib/gprpp/dump_args.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/time.cc
@@ -13482,6 +13527,7 @@ targets:
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/down_cast.h
+ - src/core/lib/gprpp/dump_args.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
@@ -13503,6 +13549,7 @@ targets:
src:
- src/core/lib/debug/trace.cc
- src/core/lib/debug/trace_flags.cc
+ - src/core/lib/gprpp/dump_args.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/promise/activity.cc
- test/core/promise/latch_test.cc
@@ -13668,6 +13715,7 @@ targets:
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/down_cast.h
+ - src/core/lib/gprpp/dump_args.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/orphanable.h
@@ -13742,6 +13790,7 @@ targets:
- src/core/lib/debug/trace_flags.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
+ - src/core/lib/gprpp/dump_args.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/time.cc
@@ -14272,9 +14321,14 @@ targets:
build: test
language: c++
headers:
+ - src/core/lib/debug/trace.h
+ - src/core/lib/debug/trace_flags.h
+ - src/core/lib/debug/trace_impl.h
- src/core/lib/event_engine/event_engine_context.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/down_cast.h
+ - src/core/lib/gprpp/dump_args.h
+ - src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h
@@ -14289,11 +14343,16 @@ targets:
- src/core/lib/promise/wait_set.h
- test/core/promise/poll_matcher.h
src:
+ - src/core/lib/debug/trace.cc
+ - src/core/lib/debug/trace_flags.cc
+ - src/core/lib/gprpp/dump_args.cc
+ - src/core/lib/gprpp/glob.cc
- src/core/lib/promise/activity.cc
- test/core/promise/mpsc_test.cc
deps:
- gtest
- absl/base:config
+ - absl/container:flat_hash_map
- absl/hash:hash
- absl/meta:type_traits
- absl/status:statusor
@@ -14571,9 +14630,14 @@ targets:
build: test
language: c++
headers:
+ - src/core/lib/debug/trace.h
+ - src/core/lib/debug/trace_flags.h
+ - src/core/lib/debug/trace_impl.h
- src/core/lib/event_engine/event_engine_context.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/down_cast.h
+ - src/core/lib/gprpp/dump_args.h
+ - src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/notification.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
@@ -14589,11 +14653,16 @@ targets:
- src/core/lib/promise/poll.h
- test/core/promise/poll_matcher.h
src:
+ - src/core/lib/debug/trace.cc
+ - src/core/lib/debug/trace_flags.cc
+ - src/core/lib/gprpp/dump_args.cc
+ - src/core/lib/gprpp/glob.cc
- src/core/lib/promise/activity.cc
- test/core/promise/observable_test.cc
deps:
- gtest
- absl/base:config
+ - absl/container:flat_hash_map
- absl/hash:hash
- absl/meta:type_traits
- absl/status:statusor
@@ -15503,6 +15572,7 @@ targets:
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/down_cast.h
+ - src/core/lib/gprpp/dump_args.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
@@ -15525,6 +15595,7 @@ targets:
src:
- src/core/lib/debug/trace.cc
- src/core/lib/debug/trace_flags.cc
+ - src/core/lib/gprpp/dump_args.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/promise/activity.cc
- test/core/promise/promise_mutex_test.cc
@@ -21279,9 +21350,14 @@ targets:
build: test
language: c++
headers:
+ - src/core/lib/debug/trace.h
+ - src/core/lib/debug/trace_flags.h
+ - src/core/lib/debug/trace_impl.h
- src/core/lib/event_engine/event_engine_context.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/down_cast.h
+ - src/core/lib/gprpp/dump_args.h
+ - src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/notification.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
@@ -21296,11 +21372,16 @@ targets:
- src/core/lib/promise/wait_for_callback.h
- test/core/promise/test_wakeup_schedulers.h
src:
+ - src/core/lib/debug/trace.cc
+ - src/core/lib/debug/trace_flags.cc
+ - src/core/lib/gprpp/dump_args.cc
+ - src/core/lib/gprpp/glob.cc
- src/core/lib/promise/activity.cc
- test/core/promise/wait_for_callback_test.cc
deps:
- gtest
- absl/base:config
+ - absl/container:flat_hash_map
- absl/hash:hash
- absl/meta:type_traits
- absl/status:statusor
diff --git a/config.m4 b/config.m4
index 1678bbe1fce..d3c633249d3 100644
--- a/config.m4
+++ b/config.m4
@@ -50,6 +50,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/client_channel/client_channel_filter.cc \
src/core/client_channel/client_channel_plugin.cc \
src/core/client_channel/client_channel_service_config.cc \
+ src/core/client_channel/direct_channel.cc \
src/core/client_channel/dynamic_filters.cc \
src/core/client_channel/global_subchannel_pool.cc \
src/core/client_channel/load_balanced_call_destination.cc \
diff --git a/config.w32 b/config.w32
index 030bffccc0b..ade25276f75 100644
--- a/config.w32
+++ b/config.w32
@@ -15,6 +15,7 @@ if (PHP_GRPC != "no") {
"src\\core\\client_channel\\client_channel_filter.cc " +
"src\\core\\client_channel\\client_channel_plugin.cc " +
"src\\core\\client_channel\\client_channel_service_config.cc " +
+ "src\\core\\client_channel\\direct_channel.cc " +
"src\\core\\client_channel\\dynamic_filters.cc " +
"src\\core\\client_channel\\global_subchannel_pool.cc " +
"src\\core\\client_channel\\load_balanced_call_destination.cc " +
diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec
index 25ded36b344..a22b5704b0d 100644
--- a/gRPC-C++.podspec
+++ b/gRPC-C++.podspec
@@ -280,6 +280,7 @@ Pod::Spec.new do |s|
'src/core/client_channel/client_channel_service_config.h',
'src/core/client_channel/config_selector.h',
'src/core/client_channel/connector.h',
+ 'src/core/client_channel/direct_channel.h',
'src/core/client_channel/dynamic_filters.h',
'src/core/client_channel/global_subchannel_pool.h',
'src/core/client_channel/load_balanced_call_destination.h',
@@ -1571,6 +1572,7 @@ Pod::Spec.new do |s|
'src/core/client_channel/client_channel_service_config.h',
'src/core/client_channel/config_selector.h',
'src/core/client_channel/connector.h',
+ 'src/core/client_channel/direct_channel.h',
'src/core/client_channel/dynamic_filters.h',
'src/core/client_channel/global_subchannel_pool.h',
'src/core/client_channel/load_balanced_call_destination.h',
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index cd72fa90e8c..256ddb0ffde 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -257,6 +257,8 @@ Pod::Spec.new do |s|
'src/core/client_channel/client_channel_service_config.h',
'src/core/client_channel/config_selector.h',
'src/core/client_channel/connector.h',
+ 'src/core/client_channel/direct_channel.cc',
+ 'src/core/client_channel/direct_channel.h',
'src/core/client_channel/dynamic_filters.cc',
'src/core/client_channel/dynamic_filters.h',
'src/core/client_channel/global_subchannel_pool.cc',
@@ -2361,6 +2363,7 @@ Pod::Spec.new do |s|
'src/core/client_channel/client_channel_service_config.h',
'src/core/client_channel/config_selector.h',
'src/core/client_channel/connector.h',
+ 'src/core/client_channel/direct_channel.h',
'src/core/client_channel/dynamic_filters.h',
'src/core/client_channel/global_subchannel_pool.h',
'src/core/client_channel/load_balanced_call_destination.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index 92494c37b7d..79fe97f48fd 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -144,6 +144,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/client_channel/client_channel_service_config.h )
s.files += %w( src/core/client_channel/config_selector.h )
s.files += %w( src/core/client_channel/connector.h )
+ s.files += %w( src/core/client_channel/direct_channel.cc )
+ s.files += %w( src/core/client_channel/direct_channel.h )
s.files += %w( src/core/client_channel/dynamic_filters.cc )
s.files += %w( src/core/client_channel/dynamic_filters.h )
s.files += %w( src/core/client_channel/global_subchannel_pool.cc )
diff --git a/package.xml b/package.xml
index f1aa91cd4c4..d86081bdb75 100644
--- a/package.xml
+++ b/package.xml
@@ -126,6 +126,8 @@
+
+
diff --git a/src/core/BUILD b/src/core/BUILD
index 1b40042d771..c3366364c39 100644
--- a/src/core/BUILD
+++ b/src/core/BUILD
@@ -955,12 +955,14 @@ grpc_cc_library(
"atomic_utils",
"construct_destruct",
"context",
+ "dump_args",
"event_engine_context",
"no_destruct",
"poll",
"promise_factory",
"promise_status",
"//:gpr",
+ "//:grpc_trace",
"//:orphanable",
],
)
@@ -7012,8 +7014,8 @@ grpc_cc_library(
"ext/transport/inproc/legacy_inproc_transport.h",
],
external_deps = [
+ "absl/log",
"absl/log:check",
- "absl/log:log",
"absl/status",
"absl/status:statusor",
"absl/strings",
@@ -7030,6 +7032,7 @@ grpc_cc_library(
"error",
"experiments",
"iomgr_fwd",
+ "metadata",
"metadata_batch",
"resource_quota",
"slice",
@@ -7448,12 +7451,15 @@ grpc_cc_library(
deps = [
"call_final_info",
"dump_args",
+ "if",
"latch",
"map",
"message",
"metadata",
"ref_counted",
+ "seq",
"status_flag",
+ "try_seq",
"//:gpr",
"//:promise",
"//:ref_counted_ptr",
@@ -7553,7 +7559,10 @@ grpc_cc_library(
hdrs = [
"lib/transport/call_spine.h",
],
- external_deps = ["absl/log:check"],
+ external_deps = [
+ "absl/functional:any_invocable",
+ "absl/log:check",
+ ],
deps = [
"1999",
"call_arena_allocator",
@@ -7574,6 +7583,24 @@ grpc_cc_library(
],
)
+grpc_cc_library(
+ name = "direct_channel",
+ srcs = [
+ "client_channel/direct_channel.cc",
+ ],
+ hdrs = [
+ "client_channel/direct_channel.h",
+ ],
+ deps = [
+ "channel_stack_type",
+ "interception_chain",
+ "//:channel",
+ "//:config",
+ "//:grpc_base",
+ "//:orphanable",
+ ],
+)
+
grpc_cc_library(
name = "metadata_batch",
srcs = [
diff --git a/src/core/client_channel/direct_channel.cc b/src/core/client_channel/direct_channel.cc
new file mode 100644
index 00000000000..d6d843ed7f3
--- /dev/null
+++ b/src/core/client_channel/direct_channel.cc
@@ -0,0 +1,80 @@
+// Copyright 2024 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "src/core/client_channel/direct_channel.h"
+
+#include "src/core/lib/config/core_configuration.h"
+#include "src/core/lib/gprpp/orphanable.h"
+#include "src/core/lib/surface/channel_stack_type.h"
+#include "src/core/lib/surface/client_call.h"
+#include "src/core/lib/transport/interception_chain.h"
+
+namespace grpc_core {
+
+absl::StatusOr> DirectChannel::Create(
+ std::string target, const ChannelArgs& args) {
+ auto* transport = args.GetObject();
+ if (transport == nullptr) {
+ return absl::InvalidArgumentError("Transport not set in ChannelArgs");
+ }
+ if (transport->client_transport() == nullptr) {
+ return absl::InvalidArgumentError("Transport is not a client transport");
+ }
+ auto transport_call_destination = MakeRefCounted(
+ OrphanablePtr(transport->client_transport()));
+ auto event_engine =
+ args.GetObjectRef();
+ if (event_engine == nullptr) {
+ return absl::InvalidArgumentError("EventEngine not set in ChannelArgs");
+ }
+ InterceptionChainBuilder builder(args);
+ CoreConfiguration::Get().channel_init().AddToInterceptionChainBuilder(
+ GRPC_CLIENT_DIRECT_CHANNEL, builder);
+ auto interception_chain = builder.Build(transport_call_destination);
+ if (!interception_chain.ok()) return interception_chain.status();
+ return MakeRefCounted(
+ std::move(target), args, std::move(event_engine),
+ std::move(transport_call_destination), std::move(*interception_chain));
+}
+
+void DirectChannel::Orphaned() {
+ transport_call_destination_.reset();
+ interception_chain_.reset();
+}
+
+void DirectChannel::StartCall(UnstartedCallHandler unstarted_handler) {
+ unstarted_handler.SpawnInfallible(
+ "start",
+ [interception_chain = interception_chain_, unstarted_handler]() mutable {
+ interception_chain->StartCall(std::move(unstarted_handler));
+ return []() { return Empty{}; };
+ });
+}
+
+void DirectChannel::GetInfo(const grpc_channel_info*) {
+ // TODO(roth): Implement this.
+}
+
+grpc_call* DirectChannel::CreateCall(
+ grpc_call* parent_call, uint32_t propagation_mask,
+ grpc_completion_queue* cq, grpc_pollset_set* /*pollset_set_alternative*/,
+ Slice path, absl::optional authority, Timestamp deadline,
+ bool /*registered_method*/) {
+ return MakeClientCall(parent_call, propagation_mask, cq, std::move(path),
+ std::move(authority), false, deadline,
+ compression_options(), event_engine_.get(),
+ call_arena_allocator()->MakeArena(), Ref());
+}
+
+} // namespace grpc_core
diff --git a/src/core/client_channel/direct_channel.h b/src/core/client_channel/direct_channel.h
new file mode 100644
index 00000000000..e7b0ca60949
--- /dev/null
+++ b/src/core/client_channel/direct_channel.h
@@ -0,0 +1,101 @@
+// Copyright 2024 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef GRPC_SRC_CORE_CLIENT_CHANNEL_DIRECT_CHANNEL_H
+#define GRPC_SRC_CORE_CLIENT_CHANNEL_DIRECT_CHANNEL_H
+
+#include
+
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/transport.h"
+
+namespace grpc_core {
+
+class DirectChannel final : public Channel {
+ public:
+ class TransportCallDestination final : public CallDestination {
+ public:
+ explicit TransportCallDestination(OrphanablePtr transport)
+ : transport_(std::move(transport)) {}
+
+ ClientTransport* transport() { return transport_.get(); }
+
+ void HandleCall(CallHandler handler) override {
+ transport_->StartCall(std::move(handler));
+ }
+
+ void Orphaned() override { transport_.reset(); }
+
+ private:
+ OrphanablePtr transport_;
+ };
+
+ static absl::StatusOr> Create(
+ std::string target, const ChannelArgs& args);
+
+ DirectChannel(
+ std::string target, const ChannelArgs& args,
+ std::shared_ptr
+ event_engine,
+ RefCountedPtr transport_call_destination,
+ RefCountedPtr interception_chain)
+ : Channel(std::move(target), args),
+ transport_call_destination_(std::move(transport_call_destination)),
+ interception_chain_(std::move(interception_chain)),
+ event_engine_(std::move(event_engine)) {}
+
+ void Orphaned() override;
+ void StartCall(UnstartedCallHandler unstarted_handler) override;
+ bool IsLame() const override { return false; }
+ grpc_call* CreateCall(grpc_call* parent_call, uint32_t propagation_mask,
+ grpc_completion_queue* cq,
+ grpc_pollset_set* pollset_set_alternative, Slice path,
+ absl::optional authority, Timestamp deadline,
+ bool registered_method) override;
+ grpc_event_engine::experimental::EventEngine* event_engine() const override {
+ return event_engine_.get();
+ }
+ bool SupportsConnectivityWatcher() const override { return false; }
+ grpc_connectivity_state CheckConnectivityState(bool) override {
+ Crash("CheckConnectivityState not supported");
+ }
+ void WatchConnectivityState(grpc_connectivity_state, Timestamp,
+ grpc_completion_queue*, void*) override {
+ Crash("WatchConnectivityState not supported");
+ }
+ void AddConnectivityWatcher(
+ grpc_connectivity_state,
+ OrphanablePtr) override {
+ Crash("AddConnectivityWatcher not supported");
+ }
+ void RemoveConnectivityWatcher(
+ AsyncConnectivityStateWatcherInterface*) override {
+ Crash("RemoveConnectivityWatcher not supported");
+ }
+ void GetInfo(const grpc_channel_info* channel_info) override;
+ void ResetConnectionBackoff() override {}
+ void Ping(grpc_completion_queue*, void*) override {
+ Crash("Ping not supported");
+ }
+
+ private:
+ RefCountedPtr transport_call_destination_;
+ RefCountedPtr interception_chain_;
+ const std::shared_ptr
+ event_engine_;
+};
+
+} // namespace grpc_core
+
+#endif // GRPC_SRC_CORE_CLIENT_CHANNEL_DIRECT_CHANNEL_H
diff --git a/src/core/ext/transport/chaotic_good/client_transport.cc b/src/core/ext/transport/chaotic_good/client_transport.cc
index 5d68d93cf64..616ed7e587f 100644
--- a/src/core/ext/transport/chaotic_good/client_transport.cc
+++ b/src/core/ext/transport/chaotic_good/client_transport.cc
@@ -24,6 +24,7 @@
#include "absl/log/check.h"
#include "absl/random/bit_gen_ref.h"
#include "absl/random/random.h"
+#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include
@@ -102,11 +103,11 @@ absl::optional ChaoticGoodClientTransport::LookupStream(
auto ChaoticGoodClientTransport::PushFrameIntoCall(ServerFragmentFrame frame,
CallHandler call_handler) {
- auto& headers = frame.headers;
+ const bool has_headers = frame.headers != nullptr;
auto push = TrySeq(
If(
- headers != nullptr,
- [call_handler, &headers]() mutable {
+ has_headers,
+ [call_handler, headers = std::move(frame.headers)]() mutable {
return call_handler.PushServerInitialMetadata(std::move(headers));
},
[]() -> StatusFlag { return Success{}; }),
@@ -173,9 +174,7 @@ auto ChaoticGoodClientTransport::TransportReadLoop(
frame = std::move(frame)]() mutable {
return Map(call_handler.CancelIfFails(PushFrameIntoCall(
std::move(frame), call_handler)),
- [](StatusFlag f) {
- return StatusCast(f);
- });
+ [](StatusFlag) { return absl::OkStatus(); });
});
},
[&deserialize_status]() {
@@ -190,8 +189,13 @@ auto ChaoticGoodClientTransport::TransportReadLoop(
});
}
-auto ChaoticGoodClientTransport::OnTransportActivityDone() {
- return [self = RefAsSubclass()](absl::Status) {
+auto ChaoticGoodClientTransport::OnTransportActivityDone(
+ absl::string_view what) {
+ return [self = RefAsSubclass(),
+ what](absl::Status status) {
+ GRPC_TRACE_LOG(chaotic_good, INFO)
+ << "CHAOTIC_GOOD: Client transport " << self.get() << " closed (via "
+ << what << "): " << status;
self->AbortWithError();
};
}
@@ -211,11 +215,12 @@ ChaoticGoodClientTransport::ChaoticGoodClientTransport(
writer_ = MakeActivity(
// Continuously write next outgoing frames to promise endpoints.
TransportWriteLoop(transport), EventEngineWakeupScheduler(event_engine),
- OnTransportActivityDone());
+ OnTransportActivityDone("write_loop"));
reader_ = MakeActivity(
// Continuously read next incoming frames from promise endpoints.
TransportReadLoop(std::move(transport)),
- EventEngineWakeupScheduler(event_engine), OnTransportActivityDone());
+ EventEngineWakeupScheduler(event_engine),
+ OnTransportActivityDone("read_loop"));
}
ChaoticGoodClientTransport::~ChaoticGoodClientTransport() {
@@ -314,16 +319,20 @@ void ChaoticGoodClientTransport::StartCall(CallHandler call_handler) {
call_handler.SpawnGuarded("outbound_loop", [this, call_handler]() mutable {
const uint32_t stream_id = MakeStream(call_handler);
return Map(CallOutboundLoop(stream_id, call_handler),
- [stream_id, this](absl::Status result) {
- if (GRPC_TRACE_FLAG_ENABLED(chaotic_good)) {
- gpr_log(GPR_INFO, "CHAOTIC_GOOD: Call %d finished with %s",
- stream_id, result.ToString().c_str());
- }
+ [stream_id, sender = outgoing_frames_.MakeSender()](
+ absl::Status result) mutable {
+ GRPC_TRACE_LOG(chaotic_good, INFO)
+ << "CHAOTIC_GOOD: Call " << stream_id << " finished with "
+ << result.ToString();
if (!result.ok()) {
+ GRPC_TRACE_LOG(chaotic_good, INFO)
+ << "CHAOTIC_GOOD: Send cancel";
CancelFrame frame;
frame.stream_id = stream_id;
- outgoing_frames_.MakeSender().UnbufferedImmediateSend(
- std::move(frame));
+ if (!sender.UnbufferedImmediateSend(std::move(frame))) {
+ GRPC_TRACE_LOG(chaotic_good, INFO)
+ << "CHAOTIC_GOOD: Send cancel failed";
+ }
}
return result;
});
diff --git a/src/core/ext/transport/chaotic_good/client_transport.h b/src/core/ext/transport/chaotic_good/client_transport.h
index de37b8d7952..47dacc23bc7 100644
--- a/src/core/ext/transport/chaotic_good/client_transport.h
+++ b/src/core/ext/transport/chaotic_good/client_transport.h
@@ -96,7 +96,7 @@ class ChaoticGoodClientTransport final : public ClientTransport {
uint32_t MakeStream(CallHandler call_handler);
absl::optional LookupStream(uint32_t stream_id);
auto CallOutboundLoop(uint32_t stream_id, CallHandler call_handler);
- auto OnTransportActivityDone();
+ auto OnTransportActivityDone(absl::string_view what);
auto TransportWriteLoop(RefCountedPtr transport);
auto TransportReadLoop(RefCountedPtr transport);
// Push one frame into a call
diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc
index 5d4d13abab5..3de78e61cf2 100644
--- a/src/core/ext/transport/inproc/inproc_transport.cc
+++ b/src/core/ext/transport/inproc/inproc_transport.cc
@@ -15,8 +15,10 @@
#include "src/core/ext/transport/inproc/inproc_transport.h"
#include
+#include
#include "absl/log/check.h"
+#include "absl/status/status.h"
#include
#include
@@ -26,10 +28,12 @@
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/crash.h"
+#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/surface/channel_create.h"
+#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/server/server.h"
@@ -56,12 +60,14 @@ class InprocServerTransport final : public ServerTransport {
state_.compare_exchange_strong(expect, ConnectionState::kReady,
std::memory_order_acq_rel,
std::memory_order_acquire);
- MutexLock lock(&state_tracker_mu_);
- state_tracker_.SetState(GRPC_CHANNEL_READY, absl::OkStatus(),
- "accept function set");
+ connected_state()->SetReady();
}
- void Orphan() override { Unref(); }
+ void Orphan() override {
+ GRPC_TRACE_LOG(inproc, INFO) << "InprocServerTransport::Orphan(): " << this;
+ Disconnect(absl::UnavailableError("Server transport closed"));
+ Unref();
+ }
FilterStackTransport* filter_stack_transport() override { return nullptr; }
ClientTransport* client_transport() override { return nullptr; }
@@ -73,26 +79,27 @@ class InprocServerTransport final : public ServerTransport {
gpr_log(GPR_INFO, "inproc server op: %s",
grpc_transport_op_string(op).c_str());
if (op->start_connectivity_watch != nullptr) {
- MutexLock lock(&state_tracker_mu_);
- state_tracker_.AddWatcher(op->start_connectivity_watch_state,
- std::move(op->start_connectivity_watch));
+ connected_state()->AddWatcher(op->start_connectivity_watch_state,
+ std::move(op->start_connectivity_watch));
}
if (op->stop_connectivity_watch != nullptr) {
- MutexLock lock(&state_tracker_mu_);
- state_tracker_.RemoveWatcher(op->stop_connectivity_watch);
+ connected_state()->RemoveWatcher(op->stop_connectivity_watch);
}
if (op->set_accept_stream) {
Crash("set_accept_stream not supported on inproc transport");
}
+ ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus());
}
void Disconnect(absl::Status error) {
- if (disconnecting_.exchange(true, std::memory_order_relaxed)) return;
- disconnect_error_ = std::move(error);
+ RefCountedPtr connected_state;
+ {
+ MutexLock lock(&connected_state_mu_);
+ connected_state = std::move(connected_state_);
+ }
+ if (connected_state == nullptr) return;
+ connected_state->Disconnect(std::move(error));
state_.store(ConnectionState::kDisconnected, std::memory_order_relaxed);
- MutexLock lock(&state_tracker_mu_);
- state_tracker_.SetState(GRPC_CHANNEL_SHUTDOWN, disconnect_error_,
- "inproc transport disconnected");
}
absl::StatusOr AcceptCall(ClientMetadataHandle md) {
@@ -113,16 +120,54 @@ class InprocServerTransport final : public ServerTransport {
OrphanablePtr MakeClientTransport();
+ class ConnectedState : public RefCounted {
+ public:
+ ~ConnectedState() override {
+ state_tracker_.SetState(GRPC_CHANNEL_SHUTDOWN, disconnect_error_,
+ "inproc transport disconnected");
+ }
+
+ void SetReady() {
+ MutexLock lock(&state_tracker_mu_);
+ state_tracker_.SetState(GRPC_CHANNEL_READY, absl::OkStatus(),
+ "accept function set");
+ }
+
+ void Disconnect(absl::Status error) {
+ disconnect_error_ = std::move(error);
+ }
+
+ void AddWatcher(grpc_connectivity_state initial_state,
+ OrphanablePtr watcher) {
+ MutexLock lock(&state_tracker_mu_);
+ state_tracker_.AddWatcher(initial_state, std::move(watcher));
+ }
+
+ void RemoveWatcher(ConnectivityStateWatcherInterface* watcher) {
+ MutexLock lock(&state_tracker_mu_);
+ state_tracker_.RemoveWatcher(watcher);
+ }
+
+ private:
+ absl::Status disconnect_error_;
+ Mutex state_tracker_mu_;
+ ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(state_tracker_mu_){
+ "inproc_server_transport", GRPC_CHANNEL_CONNECTING};
+ };
+
+ RefCountedPtr connected_state() {
+ MutexLock lock(&connected_state_mu_);
+ return connected_state_;
+ }
+
private:
enum class ConnectionState : uint8_t { kInitial, kReady, kDisconnected };
std::atomic state_{ConnectionState::kInitial};
- std::atomic disconnecting_{false};
RefCountedPtr unstarted_call_handler_;
- absl::Status disconnect_error_;
- Mutex state_tracker_mu_;
- ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(state_tracker_mu_){
- "inproc_server_transport", GRPC_CHANNEL_CONNECTING};
+ Mutex connected_state_mu_;
+ RefCountedPtr connected_state_
+ ABSL_GUARDED_BY(connected_state_mu_) = MakeRefCounted();
const std::shared_ptr
event_engine_;
const RefCountedPtr call_arena_allocator_;
@@ -139,19 +184,27 @@ class InprocClientTransport final : public ClientTransport {
"pull_initial_metadata",
TrySeq(child_call_handler.PullClientInitialMetadata(),
[server_transport = server_transport_,
- child_call_handler](ClientMetadataHandle md) {
+ connected_state = server_transport_->connected_state(),
+ child_call_handler](ClientMetadataHandle md) mutable {
auto server_call_initiator =
server_transport->AcceptCall(std::move(md));
if (!server_call_initiator.ok()) {
return server_call_initiator.status();
}
- ForwardCall(child_call_handler,
- std::move(*server_call_initiator));
+ ForwardCall(
+ child_call_handler, std::move(*server_call_initiator),
+ [connected_state =
+ std::move(connected_state)](ServerMetadata& md) {
+ md.Set(GrpcStatusFromWire(), true);
+ });
return absl::OkStatus();
}));
}
- void Orphan() override { delete this; }
+ void Orphan() override {
+ GRPC_TRACE_LOG(inproc, INFO) << "InprocClientTransport::Orphan(): " << this;
+ Unref();
+ }
FilterStackTransport* filter_stack_transport() override { return nullptr; }
ClientTransport* client_transport() override { return this; }
@@ -170,8 +223,10 @@ class InprocClientTransport final : public ClientTransport {
const RefCountedPtr server_transport_;
};
-bool UsePromiseBasedTransport() {
- return IsPromiseBasedInprocTransportEnabled();
+bool UsePromiseBasedTransport(const ChannelArgs& channel_args) {
+ return channel_args
+ .GetBool("grpc.experimental.promise_based_inproc_transport")
+ .value_or(IsPromiseBasedInprocTransportEnabled());
}
OrphanablePtr
@@ -210,7 +265,8 @@ RefCountedPtr MakeInprocChannel(Server* server,
std::ignore = server_transport.release(); // consumed by SetupTransport
auto channel = ChannelCreate(
"inproc",
- client_channel_args.Set(GRPC_ARG_DEFAULT_AUTHORITY, "inproc.authority"),
+ client_channel_args.Set(GRPC_ARG_DEFAULT_AUTHORITY, "inproc.authority")
+ .Set(GRPC_ARG_USE_V3_STACK, true),
GRPC_CLIENT_DIRECT_CHANNEL, client_transport.release());
if (!channel.ok()) {
return MakeLameChannel("Failed to create client channel", channel.status());
@@ -235,13 +291,14 @@ grpc_channel* grpc_inproc_channel_create(grpc_server* server,
void* reserved) {
grpc_core::ApplicationCallbackExecCtx app_exec_ctx;
grpc_core::ExecCtx exec_ctx;
- if (!grpc_core::UsePromiseBasedTransport()) {
+ const auto channel_args = grpc_core::CoreConfiguration::Get()
+ .channel_args_preconditioning()
+ .PreconditionChannelArgs(args);
+ if (!grpc_core::UsePromiseBasedTransport(channel_args)) {
return grpc_legacy_inproc_channel_create(server, args, reserved);
}
return grpc_core::MakeInprocChannel(grpc_core::Server::FromC(server),
- grpc_core::CoreConfiguration::Get()
- .channel_args_preconditioning()
- .PreconditionChannelArgs(args))
+ channel_args)
.release()
->c_ptr();
}
diff --git a/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc b/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
index 7c0e421c4f7..0bc7e22ab11 100644
--- a/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
+++ b/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
@@ -266,7 +266,6 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::StartThread() {
}
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Quiesce() {
- LOG(INFO) << "WorkStealingThreadPoolImpl::Quiesce";
SetShutdown(true);
// Wait until all threads have exited.
// Note that if this is a threadpool thread then we won't exit this thread
diff --git a/src/core/lib/promise/activity.h b/src/core/lib/promise/activity.h
index 715de5317f2..5c2a6ebfeac 100644
--- a/src/core/lib/promise/activity.h
+++ b/src/core/lib/promise/activity.h
@@ -32,8 +32,10 @@
#include
#include
+#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/event_engine_context.h"
#include "src/core/lib/gprpp/construct_destruct.h"
+#include "src/core/lib/gprpp/dump_args.h"
#include "src/core/lib/gprpp/no_destruct.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/sync.h"
@@ -662,11 +664,17 @@ ActivityPtr MakeActivity(Factory promise_factory,
}
inline Pending IntraActivityWaiter::pending() {
- wakeups_ |= GetContext()->CurrentParticipant();
+ const auto new_wakeups = GetContext()->CurrentParticipant();
+ GRPC_TRACE_LOG(promise_primitives, INFO)
+ << "IntraActivityWaiter::pending: "
+ << GRPC_DUMP_ARGS(this, new_wakeups, wakeups_);
+ wakeups_ |= new_wakeups;
return Pending();
}
inline void IntraActivityWaiter::Wake() {
+ GRPC_TRACE_LOG(promise_primitives, INFO)
+ << "IntraActivityWaiter::Wake: " << GRPC_DUMP_ARGS(this, wakeups_);
if (wakeups_ == 0) return;
GetContext()->ForceImmediateRepoll(std::exchange(wakeups_, 0));
}
diff --git a/src/core/lib/promise/status_flag.h b/src/core/lib/promise/status_flag.h
index 132b2079c07..7a845b0ca70 100644
--- a/src/core/lib/promise/status_flag.h
+++ b/src/core/lib/promise/status_flag.h
@@ -15,6 +15,8 @@
#ifndef GRPC_SRC_CORE_LIB_PROMISE_STATUS_FLAG_H
#define GRPC_SRC_CORE_LIB_PROMISE_STATUS_FLAG_H
+#include
+
#include "absl/log/check.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
@@ -190,6 +192,16 @@ class ValueOrFailure {
absl::optional value_;
};
+template
+inline std::ostream& operator<<(std::ostream& os,
+ const ValueOrFailure& value) {
+ if (value.ok()) {
+ return os << "Success(" << *value << ")";
+ } else {
+ return os << "Failure";
+ }
+}
+
template
inline bool IsStatusOk(const ValueOrFailure& value) {
return value.ok();
diff --git a/src/core/lib/surface/call_utils.cc b/src/core/lib/surface/call_utils.cc
index 4ffcf367316..afc2b34e5d4 100644
--- a/src/core/lib/surface/call_utils.cc
+++ b/src/core/lib/surface/call_utils.cc
@@ -219,7 +219,7 @@ std::string MakeErrorString(const ServerMetadata* trailing_metadata) {
trailing_metadata->get(GrpcStatusFromWire()).value_or(false)
? "Error received from peer"
: "Error generated by client",
- "grpc_status: ",
+ " grpc_status: ",
grpc_status_code_to_string(trailing_metadata->get(GrpcStatusMetadata())
.value_or(GRPC_STATUS_UNKNOWN)));
if (const Slice* message =
diff --git a/src/core/lib/surface/channel_create.cc b/src/core/lib/surface/channel_create.cc
index 5a10e783523..d95dc9c553e 100644
--- a/src/core/lib/surface/channel_create.cc
+++ b/src/core/lib/surface/channel_create.cc
@@ -25,10 +25,12 @@
#include "src/core/channelz/channelz.h"
#include "src/core/client_channel/client_channel.h"
+#include "src/core/client_channel/direct_channel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/surface/lame_client.h"
#include "src/core/lib/surface/legacy_channel.h"
#include "src/core/telemetry/stats.h"
@@ -86,8 +88,15 @@ absl::StatusOr> ChannelCreate(
return LegacyChannel::Create(std::move(target), std::move(args),
channel_stack_type);
}
- CHECK_EQ(channel_stack_type, GRPC_CLIENT_CHANNEL);
- return ClientChannel::Create(std::move(target), std::move(args));
+ switch (channel_stack_type) {
+ case GRPC_CLIENT_CHANNEL:
+ return ClientChannel::Create(std::move(target), std::move(args));
+ case GRPC_CLIENT_DIRECT_CHANNEL:
+ return DirectChannel::Create(std::move(target), args);
+ default:
+ Crash(absl::StrCat("Invalid channel stack type for ChannelCreate: ",
+ grpc_channel_stack_type_string(channel_stack_type)));
+ }
}
} // namespace grpc_core
diff --git a/src/core/lib/surface/client_call.cc b/src/core/lib/surface/client_call.cc
index 6ca22f87b4d..a67f12371e2 100644
--- a/src/core/lib/surface/client_call.cc
+++ b/src/core/lib/surface/client_call.cc
@@ -108,6 +108,7 @@ ClientCall::ClientCall(
RefCountedPtr arena,
RefCountedPtr destination)
: Call(false, deadline, std::move(arena), event_engine),
+ DualRefCounted("ClientCall"),
cq_(cq),
call_destination_(std::move(destination)),
compression_options_(compression_options) {
@@ -153,6 +154,7 @@ void ClientCall::CancelWithError(grpc_error_handle error) {
if (call_state_.compare_exchange_strong(cur_state, kCancelled,
std::memory_order_acq_rel,
std::memory_order_acquire)) {
+ ResetDeadline();
return;
}
break;
@@ -168,6 +170,7 @@ void ClientCall::CancelWithError(grpc_error_handle error) {
if (call_state_.compare_exchange_strong(cur_state, kCancelled,
std::memory_order_acq_rel,
std::memory_order_acquire)) {
+ ResetDeadline();
auto* unordered_start = reinterpret_cast(cur_state);
while (unordered_start != nullptr) {
auto next = unordered_start->next;
@@ -338,6 +341,8 @@ void ClientCall::CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag,
[this, out_status, out_status_details, out_error_string,
out_trailing_metadata](
ServerMetadataHandle server_trailing_metadata) {
+ saw_trailing_metadata_.store(true, std::memory_order_relaxed);
+ ResetDeadline();
GRPC_TRACE_LOG(call, INFO)
<< DebugTag() << "RecvStatusOnClient "
<< server_trailing_metadata->DebugString();
diff --git a/src/core/lib/surface/client_call.h b/src/core/lib/surface/client_call.h
index 26f27d26b5b..2d97ab9b731 100644
--- a/src/core/lib/surface/client_call.h
+++ b/src/core/lib/surface/client_call.h
@@ -82,8 +82,9 @@ class ClientCall final
void InternalUnref(const char*) override { WeakUnref(); }
void Orphaned() override {
- // TODO(ctiller): only when we're not already finished
- CancelWithError(absl::CancelledError());
+ if (!saw_trailing_metadata_.load(std::memory_order_relaxed)) {
+ CancelWithError(absl::CancelledError());
+ }
}
void SetCompletionQueue(grpc_completion_queue*) override {
@@ -164,6 +165,7 @@ class ClientCall final
ServerMetadataHandle received_initial_metadata_;
ServerMetadataHandle received_trailing_metadata_;
bool is_trailers_only_;
+ std::atomic saw_trailing_metadata_{false};
};
grpc_call* MakeClientCall(
diff --git a/src/core/lib/surface/server_call.cc b/src/core/lib/surface/server_call.cc
index 92f7db547d1..3eeddfca866 100644
--- a/src/core/lib/surface/server_call.cc
+++ b/src/core/lib/surface/server_call.cc
@@ -188,6 +188,8 @@ void ServerCall::CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag,
[this, cancelled = op->data.recv_close_on_server.cancelled]() {
return Map(call_handler_.WasCancelled(),
[cancelled, this](bool result) -> Success {
+ saw_was_cancelled_.store(true,
+ std::memory_order_relaxed);
ResetDeadline();
*cancelled = result ? 1 : 0;
return Success{};
diff --git a/src/core/lib/surface/server_call.h b/src/core/lib/surface/server_call.h
index 9d571d111ac..728c42bfcd3 100644
--- a/src/core/lib/surface/server_call.h
+++ b/src/core/lib/surface/server_call.h
@@ -20,6 +20,7 @@
#include
#include
+#include
#include
#include
#include
@@ -80,9 +81,8 @@ class ServerCall final : public Call, public DualRefCounted {
call_handler_.SpawnInfallible(
"CancelWithError",
[self = WeakRefAsSubclass(), error = std::move(error)] {
- auto status = ServerMetadataFromStatus(error);
- status->Set(GrpcCallWasCancelled(), true);
- self->call_handler_.PushServerTrailingMetadata(std::move(status));
+ self->call_handler_.PushServerTrailingMetadata(
+ CancelledServerMetadataFromStatus(error));
return Empty{};
});
}
@@ -101,8 +101,9 @@ class ServerCall final : public Call, public DualRefCounted {
void InternalUnref(const char*) override { WeakUnref(); }
void Orphaned() override {
- // TODO(ctiller): only when we're not already finished
- CancelWithError(absl::CancelledError());
+ if (!saw_was_cancelled_.load(std::memory_order_relaxed)) {
+ CancelWithError(absl::CancelledError());
+ }
}
void SetCompletionQueue(grpc_completion_queue*) override {
@@ -155,6 +156,7 @@ class ServerCall final : public Call, public DualRefCounted {
ClientMetadataHandle client_initial_metadata_stored_;
grpc_completion_queue* const cq_;
ServerInterface* const server_;
+ std::atomic saw_was_cancelled_{false};
};
grpc_call* MakeServerCall(CallHandler call_handler,
diff --git a/src/core/lib/transport/call_filters.cc b/src/core/lib/transport/call_filters.cc
index 26c6b3e7bbd..627ebd6303a 100644
--- a/src/core/lib/transport/call_filters.cc
+++ b/src/core/lib/transport/call_filters.cc
@@ -185,7 +185,7 @@ char g_empty_call_data;
CallFilters::CallFilters(ClientMetadataHandle client_initial_metadata)
: stack_(nullptr),
call_data_(nullptr),
- client_initial_metadata_(std::move(client_initial_metadata)) {}
+ push_client_initial_metadata_(std::move(client_initial_metadata)) {}
CallFilters::~CallFilters() {
if (call_data_ != nullptr && call_data_ != &g_empty_call_data) {
@@ -209,10 +209,7 @@ void CallFilters::SetStack(RefCountedPtr stack) {
constructor.call_init(Offset(call_data_, constructor.call_offset),
constructor.channel_data);
}
- client_initial_metadata_state_.Start();
- client_to_server_message_state_.Start();
- server_initial_metadata_state_.Start();
- server_to_client_message_state_.Start();
+ call_state_.Start();
}
void CallFilters::Finalize(const grpc_call_final_info* final_info) {
@@ -224,50 +221,40 @@ void CallFilters::Finalize(const grpc_call_final_info* final_info) {
void CallFilters::CancelDueToFailedPipeOperation(SourceLocation but_where) {
// We expect something cancelled before now
- if (server_trailing_metadata_ == nullptr) return;
+ if (push_server_trailing_metadata_ == nullptr) return;
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
gpr_log(but_where.file(), but_where.line(), GPR_LOG_SEVERITY_DEBUG,
"Cancelling due to failed pipe operation: %s",
DebugString().c_str());
}
- PushServerTrailingMetadata(
- ServerMetadataFromStatus(absl::CancelledError("Failed pipe operation")));
- server_trailing_metadata_waiter_.Wake();
+ auto status =
+ ServerMetadataFromStatus(absl::CancelledError("Failed pipe operation"));
+ status->Set(GrpcCallWasCancelled(), true);
+ PushServerTrailingMetadata(std::move(status));
}
void CallFilters::PushServerTrailingMetadata(ServerMetadataHandle md) {
CHECK(md != nullptr);
- if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
+ if (GRPC_TRACE_FLAG_ENABLED(call)) {
gpr_log(GPR_INFO, "%s PushServerTrailingMetadata[%p]: %s into %s",
GetContext()->DebugTag().c_str(), this,
md->DebugString().c_str(), DebugString().c_str());
}
CHECK(md != nullptr);
- if (cancelled_.is_set()) return;
- cancelled_.Set(md->get(GrpcCallWasCancelled()).value_or(false));
- server_trailing_metadata_ = std::move(md);
- client_initial_metadata_state_.CloseWithError();
- server_initial_metadata_state_.CloseSending();
- client_to_server_message_state_.CloseWithError();
- server_to_client_message_state_.CloseSending();
- server_trailing_metadata_waiter_.Wake();
+ if (call_state_.PushServerTrailingMetadata(
+ md->get(GrpcCallWasCancelled()).value_or(false))) {
+ push_server_trailing_metadata_ = std::move(md);
+ }
}
std::string CallFilters::DebugString() const {
std::vector components = {
absl::StrFormat("this:%p", this),
- absl::StrCat("client_initial_metadata:",
- client_initial_metadata_state_.DebugString()),
- ServerInitialMetadataPromises::DebugString("server_initial_metadata",
- this),
- ClientToServerMessagePromises::DebugString("client_to_server_message",
- this),
- ServerToClientMessagePromises::DebugString("server_to_client_message",
- this),
+ absl::StrCat("state:", call_state_.DebugString()),
absl::StrCat("server_trailing_metadata:",
- server_trailing_metadata_ == nullptr
+ push_server_trailing_metadata_ == nullptr
? "not-set"
- : server_trailing_metadata_->DebugString())};
+ : push_server_trailing_metadata_->DebugString())};
return absl::StrCat("CallFilters{", absl::StrJoin(components, ", "), "}");
};
@@ -303,202 +290,634 @@ RefCountedPtr CallFilters::StackBuilder::Build() {
}
///////////////////////////////////////////////////////////////////////////////
-// CallFilters::PipeState
-
-void filters_detail::PipeState::Start() {
- DCHECK(!started_);
- started_ = true;
- wait_recv_.Wake();
-}
-
-void filters_detail::PipeState::CloseWithError() {
- if (state_ == ValueState::kClosed) return;
- state_ = ValueState::kError;
- wait_recv_.Wake();
- wait_send_.Wake();
-}
-
-Poll filters_detail::PipeState::PollClosed() {
- switch (state_) {
- case ValueState::kIdle:
- case ValueState::kWaiting:
- case ValueState::kQueued:
- case ValueState::kReady:
- case ValueState::kProcessing:
- return wait_recv_.pending();
- case ValueState::kClosed:
- return false;
- case ValueState::kError:
- return true;
+// CallState
+
+namespace filters_detail {
+
+CallState::CallState()
+ : client_to_server_pull_state_(ClientToServerPullState::kBegin),
+ client_to_server_push_state_(ClientToServerPushState::kIdle),
+ server_to_client_pull_state_(ServerToClientPullState::kUnstarted),
+ server_to_client_push_state_(ServerToClientPushState::kStart),
+ server_trailing_metadata_state_(ServerTrailingMetadataState::kNotPushed) {
+}
+
+void CallState::Start() {
+ GRPC_TRACE_LOG(call, INFO)
+ << "[call_state] Start: "
+ << GRPC_DUMP_ARGS(this, server_to_client_pull_state_);
+ switch (server_to_client_pull_state_) {
+ case ServerToClientPullState::kUnstarted:
+ server_to_client_pull_state_ = ServerToClientPullState::kStarted;
+ server_to_client_pull_waiter_.Wake();
+ break;
+ case ServerToClientPullState::kStarted:
+ case ServerToClientPullState::kProcessingServerInitialMetadata:
+ case ServerToClientPullState::kIdle:
+ case ServerToClientPullState::kReading:
+ case ServerToClientPullState::kProcessingServerToClientMessage:
+ LOG(FATAL) << "Start called twice";
+ case ServerToClientPullState::kProcessingServerTrailingMetadata:
+ case ServerToClientPullState::kTerminated:
+ break;
}
- GPR_UNREACHABLE_CODE(return Pending{});
}
-void filters_detail::PipeState::CloseSending() {
- switch (state_) {
- case ValueState::kIdle:
- state_ = ValueState::kClosed;
+void CallState::BeginPushClientToServerMessage() {
+ GRPC_TRACE_LOG(call, INFO)
+ << "[call_state] BeginPushClientToServerMessage: "
+ << GRPC_DUMP_ARGS(this, client_to_server_push_state_);
+ switch (client_to_server_push_state_) {
+ case ClientToServerPushState::kIdle:
+ client_to_server_push_state_ = ClientToServerPushState::kPushedMessage;
+ client_to_server_push_waiter_.Wake();
break;
- case ValueState::kWaiting:
- state_ = ValueState::kClosed;
- wait_recv_.Wake();
+ case ClientToServerPushState::kPushedMessage:
+ case ClientToServerPushState::kPushedMessageAndHalfClosed:
+ LOG(FATAL) << "PushClientToServerMessage called twice concurrently";
break;
- case ValueState::kClosed:
- case ValueState::kError:
+ case ClientToServerPushState::kPushedHalfClose:
+ LOG(FATAL) << "PushClientToServerMessage called after half-close";
break;
- case ValueState::kQueued:
- case ValueState::kReady:
- case ValueState::kProcessing:
- Crash("Only one push allowed to be outstanding");
+ case ClientToServerPushState::kFinished:
break;
}
}
-void filters_detail::PipeState::BeginPush() {
- switch (state_) {
- case ValueState::kIdle:
- state_ = ValueState::kQueued;
+Poll CallState::PollPushClientToServerMessage() {
+ GRPC_TRACE_LOG(call, INFO)
+ << "[call_state] PollPushClientToServerMessage: "
+ << GRPC_DUMP_ARGS(this, client_to_server_push_state_);
+ switch (client_to_server_push_state_) {
+ case ClientToServerPushState::kIdle:
+ case ClientToServerPushState::kPushedHalfClose:
+ return Success{};
+ case ClientToServerPushState::kPushedMessage:
+ case ClientToServerPushState::kPushedMessageAndHalfClosed:
+ return client_to_server_push_waiter_.pending();
+ case ClientToServerPushState::kFinished:
+ return Failure{};
+ }
+ Crash("Unreachable");
+}
+
+void CallState::ClientToServerHalfClose() {
+ GRPC_TRACE_LOG(call, INFO)
+ << "[call_state] ClientToServerHalfClose: "
+ << GRPC_DUMP_ARGS(this, client_to_server_push_state_);
+ switch (client_to_server_push_state_) {
+ case ClientToServerPushState::kIdle:
+ client_to_server_push_state_ = ClientToServerPushState::kPushedHalfClose;
+ client_to_server_push_waiter_.Wake();
break;
- case ValueState::kWaiting:
- state_ = ValueState::kReady;
- wait_recv_.Wake();
+ case ClientToServerPushState::kPushedMessage:
+ client_to_server_push_state_ =
+ ClientToServerPushState::kPushedMessageAndHalfClosed;
break;
- case ValueState::kClosed:
- case ValueState::kError:
+ case ClientToServerPushState::kPushedHalfClose:
+ case ClientToServerPushState::kPushedMessageAndHalfClosed:
+ LOG(FATAL) << "ClientToServerHalfClose called twice";
break;
- case ValueState::kQueued:
- case ValueState::kReady:
- case ValueState::kProcessing:
- Crash("Only one push allowed to be outstanding");
+ case ClientToServerPushState::kFinished:
break;
}
}
-void filters_detail::PipeState::DropPush() {
- switch (state_) {
- case ValueState::kQueued:
- case ValueState::kReady:
- case ValueState::kProcessing:
- case ValueState::kWaiting:
- if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
- gpr_log(GPR_INFO, "%p drop push in state %s", this,
- DebugString().c_str());
- }
- state_ = ValueState::kError;
- wait_recv_.Wake();
+void CallState::BeginPullClientInitialMetadata() {
+ GRPC_TRACE_LOG(call, INFO)
+ << "[call_state] BeginPullClientInitialMetadata: "
+ << GRPC_DUMP_ARGS(this, client_to_server_pull_state_);
+ switch (client_to_server_pull_state_) {
+ case ClientToServerPullState::kBegin:
+ client_to_server_pull_state_ =
+ ClientToServerPullState::kProcessingClientInitialMetadata;
+ break;
+ case ClientToServerPullState::kProcessingClientInitialMetadata:
+ case ClientToServerPullState::kIdle:
+ case ClientToServerPullState::kReading:
+ case ClientToServerPullState::kProcessingClientToServerMessage:
+ LOG(FATAL) << "BeginPullClientInitialMetadata called twice";
break;
- case ValueState::kIdle:
- case ValueState::kClosed:
- case ValueState::kError:
+ case ClientToServerPullState::kTerminated:
break;
}
}
-void filters_detail::PipeState::DropPull() {
- switch (state_) {
- case ValueState::kQueued:
- case ValueState::kReady:
- case ValueState::kProcessing:
- case ValueState::kWaiting:
- if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
- gpr_log(GPR_INFO, "%p drop pull in state %s", this,
- DebugString().c_str());
- }
- state_ = ValueState::kError;
- wait_send_.Wake();
+void CallState::FinishPullClientInitialMetadata() {
+ GRPC_TRACE_LOG(call, INFO)
+ << "[call_state] FinishPullClientInitialMetadata: "
+ << GRPC_DUMP_ARGS(this, client_to_server_pull_state_);
+ switch (client_to_server_pull_state_) {
+ case ClientToServerPullState::kBegin:
+ LOG(FATAL) << "FinishPullClientInitialMetadata called before Begin";
+ break;
+ case ClientToServerPullState::kProcessingClientInitialMetadata:
+ client_to_server_pull_state_ = ClientToServerPullState::kIdle;
+ client_to_server_pull_waiter_.Wake();
+ break;
+ case ClientToServerPullState::kIdle:
+ case ClientToServerPullState::kReading:
+ case ClientToServerPullState::kProcessingClientToServerMessage:
+ LOG(FATAL) << "Out of order FinishPullClientInitialMetadata";
+ break;
+ case ClientToServerPullState::kTerminated:
+ break;
+ }
+}
+
+Poll> CallState::PollPullClientToServerMessageAvailable() {
+ GRPC_TRACE_LOG(call, INFO)
+ << "[call_state] PollPullClientToServerMessageAvailable: "
+ << GRPC_DUMP_ARGS(this, client_to_server_pull_state_,
+ client_to_server_push_state_);
+ switch (client_to_server_pull_state_) {
+ case ClientToServerPullState::kBegin:
+ case ClientToServerPullState::kProcessingClientInitialMetadata:
+ return client_to_server_pull_waiter_.pending();
+ case ClientToServerPullState::kIdle:
+ client_to_server_pull_state_ = ClientToServerPullState::kReading;
+ ABSL_FALLTHROUGH_INTENDED;
+ case ClientToServerPullState::kReading:
+ break;
+ case ClientToServerPullState::kProcessingClientToServerMessage:
+ LOG(FATAL) << "PollPullClientToServerMessageAvailable called while "
+ "processing a message";
+ break;
+ case ClientToServerPullState::kTerminated:
+ return Failure{};
+ }
+ DCHECK_EQ(client_to_server_pull_state_, ClientToServerPullState::kReading);
+ switch (client_to_server_push_state_) {
+ case ClientToServerPushState::kIdle:
+ return client_to_server_push_waiter_.pending();
+ case ClientToServerPushState::kPushedMessage:
+ case ClientToServerPushState::kPushedMessageAndHalfClosed:
+ client_to_server_pull_state_ =
+ ClientToServerPullState::kProcessingClientToServerMessage;
+ return true;
+ case ClientToServerPushState::kPushedHalfClose:
+ return false;
+ case ClientToServerPushState::kFinished:
+ client_to_server_pull_state_ = ClientToServerPullState::kTerminated;
+ return Failure{};
+ }
+ Crash("Unreachable");
+}
+
+void CallState::FinishPullClientToServerMessage() {
+ GRPC_TRACE_LOG(call, INFO)
+ << "[call_state] FinishPullClientToServerMessage: "
+ << GRPC_DUMP_ARGS(this, client_to_server_pull_state_,
+ client_to_server_push_state_);
+ switch (client_to_server_pull_state_) {
+ case ClientToServerPullState::kBegin:
+ case ClientToServerPullState::kProcessingClientInitialMetadata:
+ LOG(FATAL) << "FinishPullClientToServerMessage called before Begin";
+ break;
+ case ClientToServerPullState::kIdle:
+ LOG(FATAL) << "FinishPullClientToServerMessage called twice";
+ break;
+ case ClientToServerPullState::kReading:
+ LOG(FATAL) << "FinishPullClientToServerMessage called before "
+ "PollPullClientToServerMessageAvailable";
+ break;
+ case ClientToServerPullState::kProcessingClientToServerMessage:
+ client_to_server_pull_state_ = ClientToServerPullState::kIdle;
+ client_to_server_pull_waiter_.Wake();
+ break;
+ case ClientToServerPullState::kTerminated:
+ break;
+ }
+ switch (client_to_server_push_state_) {
+ case ClientToServerPushState::kPushedMessage:
+ client_to_server_push_state_ = ClientToServerPushState::kIdle;
+ client_to_server_push_waiter_.Wake();
+ break;
+ case ClientToServerPushState::kIdle:
+ case ClientToServerPushState::kPushedHalfClose:
+ LOG(FATAL) << "FinishPullClientToServerMessage called without a message";
+ break;
+ case ClientToServerPushState::kPushedMessageAndHalfClosed:
+ client_to_server_push_state_ = ClientToServerPushState::kPushedHalfClose;
+ client_to_server_push_waiter_.Wake();
+ break;
+ case ClientToServerPushState::kFinished:
+ break;
+ }
+}
+
+StatusFlag CallState::PushServerInitialMetadata() {
+ GRPC_TRACE_LOG(call, INFO)
+ << "[call_state] PushServerInitialMetadata: "
+ << GRPC_DUMP_ARGS(this, server_to_client_push_state_,
+ server_trailing_metadata_state_);
+ if (server_trailing_metadata_state_ !=
+ ServerTrailingMetadataState::kNotPushed) {
+ return Failure{};
+ }
+ CHECK_EQ(server_to_client_push_state_, ServerToClientPushState::kStart);
+ server_to_client_push_state_ =
+ ServerToClientPushState::kPushedServerInitialMetadata;
+ server_to_client_push_waiter_.Wake();
+ return Success{};
+}
+
+void CallState::BeginPushServerToClientMessage() {
+ GRPC_TRACE_LOG(call, INFO)
+ << "[call_state] BeginPushServerToClientMessage: "
+ << GRPC_DUMP_ARGS(this, server_to_client_push_state_);
+ switch (server_to_client_push_state_) {
+ case ServerToClientPushState::kStart:
+ LOG(FATAL) << "BeginPushServerToClientMessage called before "
+ "PushServerInitialMetadata";
+ break;
+ case ServerToClientPushState::kPushedServerInitialMetadata:
+ server_to_client_push_state_ =
+ ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage;
break;
- case ValueState::kIdle:
- case ValueState::kClosed:
- case ValueState::kError:
+ case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
+ case ServerToClientPushState::kPushedMessage:
+ LOG(FATAL) << "BeginPushServerToClientMessage called twice concurrently";
+ break;
+ case ServerToClientPushState::kTrailersOnly:
+ // Will fail in poll.
+ break;
+ case ServerToClientPushState::kIdle:
+ server_to_client_push_state_ = ServerToClientPushState::kPushedMessage;
+ server_to_client_push_waiter_.Wake();
+ break;
+ case ServerToClientPushState::kFinished:
break;
}
}
-Poll filters_detail::PipeState::PollPush() {
- switch (state_) {
- // Read completed and new read started => we see waiting here
- case ValueState::kWaiting:
- state_ = ValueState::kReady;
- wait_recv_.Wake();
- return wait_send_.pending();
- case ValueState::kIdle:
- case ValueState::kClosed:
+Poll CallState::PollPushServerToClientMessage() {
+ GRPC_TRACE_LOG(call, INFO)
+ << "[call_state] PollPushServerToClientMessage: "
+ << GRPC_DUMP_ARGS(this, server_to_client_push_state_);
+ switch (server_to_client_push_state_) {
+ case ServerToClientPushState::kStart:
+ case ServerToClientPushState::kPushedServerInitialMetadata:
+ LOG(FATAL) << "PollPushServerToClientMessage called before "
+ << "PushServerInitialMetadata";
+ case ServerToClientPushState::kTrailersOnly:
+ return false;
+ case ServerToClientPushState::kPushedMessage:
+ case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
+ return server_to_client_push_waiter_.pending();
+ case ServerToClientPushState::kIdle:
return Success{};
- case ValueState::kQueued:
- case ValueState::kReady:
- case ValueState::kProcessing:
- return wait_send_.pending();
- case ValueState::kError:
+ case ServerToClientPushState::kFinished:
return Failure{};
}
- GPR_UNREACHABLE_CODE(return Pending{});
+ Crash("Unreachable");
+}
+
+bool CallState::PushServerTrailingMetadata(bool cancel) {
+ GRPC_TRACE_LOG(call, INFO)
+ << "[call_state] PushServerTrailingMetadata: "
+ << GRPC_DUMP_ARGS(this, cancel, server_trailing_metadata_state_,
+ server_to_client_push_state_,
+ client_to_server_push_state_,
+ server_trailing_metadata_waiter_.DebugString());
+ if (server_trailing_metadata_state_ !=
+ ServerTrailingMetadataState::kNotPushed) {
+ return false;
+ }
+ server_trailing_metadata_state_ =
+ cancel ? ServerTrailingMetadataState::kPushedCancel
+ : ServerTrailingMetadataState::kPushed;
+ server_trailing_metadata_waiter_.Wake();
+ switch (server_to_client_push_state_) {
+ case ServerToClientPushState::kStart:
+ server_to_client_push_state_ = ServerToClientPushState::kTrailersOnly;
+ server_to_client_push_waiter_.Wake();
+ break;
+ case ServerToClientPushState::kPushedServerInitialMetadata:
+ case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
+ case ServerToClientPushState::kPushedMessage:
+ if (cancel) {
+ server_to_client_push_state_ = ServerToClientPushState::kFinished;
+ server_to_client_push_waiter_.Wake();
+ }
+ break;
+ case ServerToClientPushState::kIdle:
+ if (cancel) {
+ server_to_client_push_state_ = ServerToClientPushState::kFinished;
+ server_to_client_push_waiter_.Wake();
+ }
+ break;
+ case ServerToClientPushState::kFinished:
+ case ServerToClientPushState::kTrailersOnly:
+ break;
+ }
+ switch (client_to_server_push_state_) {
+ case ClientToServerPushState::kIdle:
+ client_to_server_push_state_ = ClientToServerPushState::kFinished;
+ client_to_server_push_waiter_.Wake();
+ break;
+ case ClientToServerPushState::kPushedMessage:
+ case ClientToServerPushState::kPushedMessageAndHalfClosed:
+ client_to_server_push_state_ = ClientToServerPushState::kFinished;
+ client_to_server_push_waiter_.Wake();
+ break;
+ case ClientToServerPushState::kPushedHalfClose:
+ case ClientToServerPushState::kFinished:
+ break;
+ }
+ return true;
}
-Poll> filters_detail::PipeState::PollPull() {
- switch (state_) {
- case ValueState::kWaiting:
- return wait_recv_.pending();
- case ValueState::kIdle:
- state_ = ValueState::kWaiting;
- return wait_recv_.pending();
- case ValueState::kReady:
- case ValueState::kQueued:
- if (!started_) return wait_recv_.pending();
- state_ = ValueState::kProcessing;
+Poll CallState::PollPullServerInitialMetadataAvailable() {
+ GRPC_TRACE_LOG(call, INFO)
+ << "[call_state] PollPullServerInitialMetadataAvailable: "
+ << GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
+ server_to_client_push_state_);
+ switch (server_to_client_pull_state_) {
+ case ServerToClientPullState::kUnstarted:
+ if (server_to_client_push_state_ ==
+ ServerToClientPushState::kTrailersOnly) {
+ server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
+ return false;
+ }
+ server_to_client_push_waiter_.pending();
+ return server_to_client_pull_waiter_.pending();
+ case ServerToClientPullState::kStarted:
+ break;
+ case ServerToClientPullState::kProcessingServerInitialMetadata:
+ case ServerToClientPullState::kIdle:
+ case ServerToClientPullState::kReading:
+ case ServerToClientPullState::kProcessingServerToClientMessage:
+ case ServerToClientPullState::kProcessingServerTrailingMetadata:
+ LOG(FATAL) << "PollPullServerInitialMetadataAvailable called twice";
+ case ServerToClientPullState::kTerminated:
+ return false;
+ }
+ DCHECK_EQ(server_to_client_pull_state_, ServerToClientPullState::kStarted);
+ switch (server_to_client_push_state_) {
+ case ServerToClientPushState::kStart:
+ return server_to_client_push_waiter_.pending();
+ case ServerToClientPushState::kPushedServerInitialMetadata:
+ case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
+ server_to_client_pull_state_ =
+ ServerToClientPullState::kProcessingServerInitialMetadata;
+ server_to_client_pull_waiter_.Wake();
return true;
- case ValueState::kProcessing:
- Crash("Only one pull allowed to be outstanding");
- case ValueState::kClosed:
+ case ServerToClientPushState::kIdle:
+ case ServerToClientPushState::kPushedMessage:
+ LOG(FATAL)
+ << "PollPullServerInitialMetadataAvailable after metadata processed";
+ case ServerToClientPushState::kFinished:
+ server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
+ server_to_client_pull_waiter_.Wake();
+ return false;
+ case ServerToClientPushState::kTrailersOnly:
return false;
- case ValueState::kError:
- return Failure{};
}
- GPR_UNREACHABLE_CODE(return Pending{});
+ Crash("Unreachable");
}
-void filters_detail::PipeState::AckPull() {
- switch (state_) {
- case ValueState::kProcessing:
- state_ = ValueState::kIdle;
- wait_send_.Wake();
+void CallState::FinishPullServerInitialMetadata() {
+ GRPC_TRACE_LOG(call, INFO)
+ << "[call_state] FinishPullServerInitialMetadata: "
+ << GRPC_DUMP_ARGS(this, server_to_client_pull_state_);
+ switch (server_to_client_pull_state_) {
+ case ServerToClientPullState::kUnstarted:
+ LOG(FATAL) << "FinishPullServerInitialMetadata called before Start";
+ case ServerToClientPullState::kStarted:
+ CHECK_EQ(server_to_client_push_state_,
+ ServerToClientPushState::kTrailersOnly);
+ return;
+ case ServerToClientPullState::kProcessingServerInitialMetadata:
+ server_to_client_pull_state_ = ServerToClientPullState::kIdle;
+ server_to_client_pull_waiter_.Wake();
break;
- case ValueState::kWaiting:
- case ValueState::kIdle:
- case ValueState::kQueued:
- case ValueState::kReady:
- case ValueState::kClosed:
- Crash("AckPullValue called in invalid state");
- case ValueState::kError:
+ case ServerToClientPullState::kIdle:
+ case ServerToClientPullState::kReading:
+ case ServerToClientPullState::kProcessingServerToClientMessage:
+ case ServerToClientPullState::kProcessingServerTrailingMetadata:
+ LOG(FATAL) << "Out of order FinishPullServerInitialMetadata";
+ case ServerToClientPullState::kTerminated:
+ return;
+ }
+ DCHECK_EQ(server_to_client_pull_state_, ServerToClientPullState::kIdle);
+ switch (server_to_client_push_state_) {
+ case ServerToClientPushState::kStart:
+ LOG(FATAL) << "FinishPullServerInitialMetadata called before initial "
+ "metadata consumed";
+ case ServerToClientPushState::kPushedServerInitialMetadata:
+ server_to_client_push_state_ = ServerToClientPushState::kIdle;
+ server_to_client_push_waiter_.Wake();
+ break;
+ case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
+ server_to_client_push_state_ = ServerToClientPushState::kPushedMessage;
+ server_to_client_pull_waiter_.Wake();
break;
+ case ServerToClientPushState::kIdle:
+ case ServerToClientPushState::kPushedMessage:
+ case ServerToClientPushState::kTrailersOnly:
+ case ServerToClientPushState::kFinished:
+ LOG(FATAL) << "FinishPullServerInitialMetadata called twice";
}
}
-std::string filters_detail::PipeState::DebugString() const {
- const char* state_str = "<>";
- switch (state_) {
- case ValueState::kIdle:
- state_str = "Idle";
+Poll> CallState::PollPullServerToClientMessageAvailable() {
+ GRPC_TRACE_LOG(call, INFO)
+ << "[call_state] PollPullServerToClientMessageAvailable: "
+ << GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
+ server_to_client_push_state_,
+ server_trailing_metadata_state_);
+ switch (server_to_client_pull_state_) {
+ case ServerToClientPullState::kUnstarted:
+ case ServerToClientPullState::kProcessingServerInitialMetadata:
+ return server_to_client_pull_waiter_.pending();
+ case ServerToClientPullState::kStarted:
+ if (server_to_client_push_state_ ==
+ ServerToClientPushState::kTrailersOnly) {
+ return false;
+ }
+ return server_to_client_pull_waiter_.pending();
+ case ServerToClientPullState::kIdle:
+ server_to_client_pull_state_ = ServerToClientPullState::kReading;
+ ABSL_FALLTHROUGH_INTENDED;
+ case ServerToClientPullState::kReading:
break;
- case ValueState::kWaiting:
- state_str = "Waiting";
+ case ServerToClientPullState::kProcessingServerToClientMessage:
+ LOG(FATAL) << "PollPullServerToClientMessageAvailable called while "
+ "processing a message";
+ case ServerToClientPullState::kProcessingServerTrailingMetadata:
+ LOG(FATAL) << "PollPullServerToClientMessageAvailable called while "
+ "processing trailing metadata";
+ case ServerToClientPullState::kTerminated:
+ return Failure{};
+ }
+ DCHECK_EQ(server_to_client_pull_state_, ServerToClientPullState::kReading);
+ switch (server_to_client_push_state_) {
+ case ServerToClientPushState::kStart:
+ case ServerToClientPushState::kPushedServerInitialMetadata:
+ case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
+ return server_to_client_push_waiter_.pending();
+ case ServerToClientPushState::kIdle:
+ if (server_trailing_metadata_state_ !=
+ ServerTrailingMetadataState::kNotPushed) {
+ return false;
+ }
+ server_trailing_metadata_waiter_.pending();
+ return server_to_client_push_waiter_.pending();
+ case ServerToClientPushState::kTrailersOnly:
+ DCHECK_NE(server_trailing_metadata_state_,
+ ServerTrailingMetadataState::kNotPushed);
+ return false;
+ case ServerToClientPushState::kPushedMessage:
+ server_to_client_pull_state_ =
+ ServerToClientPullState::kProcessingServerToClientMessage;
+ server_to_client_pull_waiter_.Wake();
+ return true;
+ case ServerToClientPushState::kFinished:
+ server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
+ server_to_client_pull_waiter_.Wake();
+ return Failure{};
+ }
+ Crash("Unreachable");
+}
+
+void CallState::FinishPullServerToClientMessage() {
+ GRPC_TRACE_LOG(call, INFO)
+ << "[call_state] FinishPullServerToClientMessage: "
+ << GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
+ server_to_client_push_state_);
+ switch (server_to_client_pull_state_) {
+ case ServerToClientPullState::kUnstarted:
+ case ServerToClientPullState::kStarted:
+ case ServerToClientPullState::kProcessingServerInitialMetadata:
+ LOG(FATAL)
+ << "FinishPullServerToClientMessage called before metadata available";
+ case ServerToClientPullState::kIdle:
+ LOG(FATAL) << "FinishPullServerToClientMessage called twice";
+ case ServerToClientPullState::kReading:
+ LOG(FATAL) << "FinishPullServerToClientMessage called before "
+ << "PollPullServerToClientMessageAvailable";
+ case ServerToClientPullState::kProcessingServerToClientMessage:
+ server_to_client_pull_state_ = ServerToClientPullState::kIdle;
+ server_to_client_pull_waiter_.Wake();
break;
- case ValueState::kQueued:
- state_str = "Queued";
+ case ServerToClientPullState::kProcessingServerTrailingMetadata:
+ LOG(FATAL) << "FinishPullServerToClientMessage called while processing "
+ "trailing metadata";
+ case ServerToClientPullState::kTerminated:
break;
- case ValueState::kReady:
- state_str = "Ready";
+ }
+ switch (server_to_client_push_state_) {
+ case ServerToClientPushState::kPushedServerInitialMetadataAndPushedMessage:
+ case ServerToClientPushState::kPushedServerInitialMetadata:
+ case ServerToClientPushState::kStart:
+ LOG(FATAL) << "FinishPullServerToClientMessage called before initial "
+ "metadata consumed";
+ case ServerToClientPushState::kTrailersOnly:
+ LOG(FATAL) << "FinishPullServerToClientMessage called after "
+ "PushServerTrailingMetadata";
+ case ServerToClientPushState::kPushedMessage:
+ server_to_client_push_state_ = ServerToClientPushState::kIdle;
+ server_to_client_push_waiter_.Wake();
break;
- case ValueState::kProcessing:
- state_str = "Processing";
+ case ServerToClientPushState::kIdle:
+ LOG(FATAL) << "FinishPullServerToClientMessage called without a message";
+ case ServerToClientPushState::kFinished:
break;
- case ValueState::kClosed:
- state_str = "Closed";
+ }
+}
+
+Poll CallState::PollServerTrailingMetadataAvailable() {
+ GRPC_TRACE_LOG(call, INFO)
+ << "[call_state] PollServerTrailingMetadataAvailable: "
+ << GRPC_DUMP_ARGS(this, server_to_client_pull_state_,
+ server_trailing_metadata_state_,
+ server_trailing_metadata_waiter_.DebugString());
+ switch (server_to_client_pull_state_) {
+ case ServerToClientPullState::kProcessingServerInitialMetadata:
+ case ServerToClientPullState::kProcessingServerToClientMessage:
+ return server_to_client_pull_waiter_.pending();
+ case ServerToClientPullState::kStarted:
+ case ServerToClientPullState::kUnstarted:
+ case ServerToClientPullState::kIdle:
+ case ServerToClientPullState::kReading:
+ if (server_trailing_metadata_state_ !=
+ ServerTrailingMetadataState::kNotPushed) {
+ server_to_client_pull_state_ =
+ ServerToClientPullState::kProcessingServerTrailingMetadata;
+ server_to_client_pull_waiter_.Wake();
+ return Empty{};
+ }
+ return server_trailing_metadata_waiter_.pending();
+ case ServerToClientPullState::kProcessingServerTrailingMetadata:
+ LOG(FATAL) << "PollServerTrailingMetadataAvailable called twice";
+ case ServerToClientPullState::kTerminated:
+ return Empty{};
+ }
+ Crash("Unreachable");
+}
+
+void CallState::FinishPullServerTrailingMetadata() {
+ GRPC_TRACE_LOG(call, INFO)
+ << "[call_state] FinishPullServerTrailingMetadata: "
+ << GRPC_DUMP_ARGS(this, server_trailing_metadata_state_,
+ server_trailing_metadata_waiter_.DebugString());
+ switch (server_trailing_metadata_state_) {
+ case ServerTrailingMetadataState::kNotPushed:
+ LOG(FATAL) << "FinishPullServerTrailingMetadata called before "
+ "PollServerTrailingMetadataAvailable";
+ case ServerTrailingMetadataState::kPushed:
+ server_trailing_metadata_state_ = ServerTrailingMetadataState::kPulled;
+ server_trailing_metadata_waiter_.Wake();
break;
- case ValueState::kError:
- state_str = "Error";
+ case ServerTrailingMetadataState::kPushedCancel:
+ server_trailing_metadata_state_ =
+ ServerTrailingMetadataState::kPulledCancel;
+ server_trailing_metadata_waiter_.Wake();
break;
+ case ServerTrailingMetadataState::kPulled:
+ case ServerTrailingMetadataState::kPulledCancel:
+ LOG(FATAL) << "FinishPullServerTrailingMetadata called twice";
}
- return absl::StrCat(state_str, started_ ? "" : " (not started)");
}
+Poll CallState::PollWasCancelled() {
+ GRPC_TRACE_LOG(call, INFO)
+ << "[call_state] PollWasCancelled: "
+ << GRPC_DUMP_ARGS(this, server_trailing_metadata_state_);
+ switch (server_trailing_metadata_state_) {
+ case ServerTrailingMetadataState::kNotPushed:
+ case ServerTrailingMetadataState::kPushed:
+ case ServerTrailingMetadataState::kPushedCancel: {
+ return server_trailing_metadata_waiter_.pending();
+ }
+ case ServerTrailingMetadataState::kPulled:
+ return false;
+ case ServerTrailingMetadataState::kPulledCancel:
+ return true;
+ }
+ Crash("Unreachable");
+}
+
+std::string CallState::DebugString() const {
+ return absl::StrCat(
+ "client_to_server_pull_state:", client_to_server_pull_state_,
+ " client_to_server_push_state:", client_to_server_push_state_,
+ " server_to_client_pull_state:", server_to_client_pull_state_,
+ " server_to_client_message_push_state:", server_to_client_push_state_,
+ " server_trailing_metadata_state:", server_trailing_metadata_state_,
+ client_to_server_push_waiter_.DebugString(),
+ " server_to_client_push_waiter:",
+ server_to_client_push_waiter_.DebugString(),
+ " client_to_server_pull_waiter:",
+ client_to_server_pull_waiter_.DebugString(),
+ " server_to_client_pull_waiter:",
+ server_to_client_pull_waiter_.DebugString(),
+ " server_trailing_metadata_waiter:",
+ server_trailing_metadata_waiter_.DebugString());
+}
+
+static_assert(sizeof(CallState) <= 16, "CallState too large");
+
+} // namespace filters_detail
} // namespace grpc_core
diff --git a/src/core/lib/transport/call_filters.h b/src/core/lib/transport/call_filters.h
index 04296316fe8..b4f98dbe4f3 100644
--- a/src/core/lib/transport/call_filters.h
+++ b/src/core/lib/transport/call_filters.h
@@ -17,6 +17,7 @@
#include
#include
+#include
#include
#include "absl/log/check.h"
@@ -26,10 +27,13 @@
#include "src/core/lib/gprpp/dump_args.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/promise/if.h"
#include "src/core/lib/promise/latch.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/promise.h"
+#include "src/core/lib/promise/seq.h"
#include "src/core/lib/promise/status_flag.h"
+#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/transport/call_final_info.h"
#include "src/core/lib/transport/message.h"
#include "src/core/lib/transport/metadata.h"
@@ -733,7 +737,7 @@ struct AddOpImpl<
if (r->ok()) {
return ResultOr{std::move(value), nullptr};
}
- return ResultOr{nullptr, ServerMetadataFromStatus(*r)};
+ return ResultOr{nullptr, CancelledServerMetadataFromStatus(*r)};
}
private:
@@ -788,7 +792,7 @@ struct AddOpImpl<
if (r->ok()) {
return ResultOr{std::move(value), nullptr};
}
- return ResultOr{nullptr, ServerMetadataFromStatus(*r)};
+ return ResultOr{nullptr, CancelledServerMetadataFromStatus(*r)};
}
private:
@@ -838,7 +842,8 @@ struct AddOpImpl~Promise();
if (r->ok()) return ResultOr{std::move(**r), nullptr};
- return ResultOr{nullptr, ServerMetadataFromStatus(r->status())};
+ return ResultOr{nullptr,
+ CancelledServerMetadataFromStatus(r->status())};
}
private:
@@ -1186,74 +1191,213 @@ class InfallibleOperationExecutor {
const InfallibleOperator* end_ops_;
};
-// The current state of a pipe.
-// CallFilters expose a set of pipe like objects for client & server initial
-// metadata and for messages.
-// This class tracks the state of one of those pipes.
-// Size matters here: this state is kept for the lifetime of a call, and we keep
-// multiple of them.
-// This class encapsulates the untyped work of the state machine; there are
-// typed wrappers around this class as private members of CallFilters that
-// augment it to provide all the functionality that we must.
-class PipeState {
+class CallState {
public:
- struct StartPushed {};
- PipeState() = default;
- explicit PipeState(StartPushed) : state_(ValueState::kQueued) {}
- // Start the pipe: allows pulls to proceed
+ CallState();
+ // Start the call: allows pulls to proceed
void Start();
- // A push operation is beginning
- void BeginPush();
- // A previously started push operation has completed
- void DropPush();
- // Poll for push completion: occurs after the corresponding Pull()
- Poll PollPush();
- // Poll for pull completion; returns Failure{} if closed with error,
- // true if a value is available, or false if the pipe was closed without
- // error.
- Poll> PollPull();
- // A pulled value has been consumed: we can unblock the push
- void AckPull();
- // A previously started pull operation has completed
- void DropPull();
- // Close sending
- void CloseSending();
- // Close sending with error
- void CloseWithError();
- // Poll for closedness - if true, closed with error
- Poll PollClosed();
-
- bool holds_error() const { return state_ == ValueState::kError; }
-
+ // PUSH: client -> server
+ void BeginPushClientToServerMessage();
+ Poll PollPushClientToServerMessage();
+ void ClientToServerHalfClose();
+ // PULL: client -> server
+ void BeginPullClientInitialMetadata();
+ void FinishPullClientInitialMetadata();
+ Poll> PollPullClientToServerMessageAvailable();
+ void FinishPullClientToServerMessage();
+ // PUSH: server -> client
+ StatusFlag PushServerInitialMetadata();
+ void BeginPushServerToClientMessage();
+ Poll PollPushServerToClientMessage();
+ bool PushServerTrailingMetadata(bool cancel);
+ // PULL: server -> client
+ Poll PollPullServerInitialMetadataAvailable();
+ void FinishPullServerInitialMetadata();
+ Poll> PollPullServerToClientMessageAvailable();
+ void FinishPullServerToClientMessage();
+ Poll PollServerTrailingMetadataAvailable();
+ void FinishPullServerTrailingMetadata();
+ Poll PollWasCancelled();
+ // Debug
std::string DebugString() const;
+ friend std::ostream& operator<<(std::ostream& out,
+ const CallState& call_state) {
+ return out << call_state.DebugString();
+ }
+
private:
- enum class ValueState : uint8_t {
- // Nothing sending nor receiving
+ enum class ClientToServerPullState : uint16_t {
+ // Ready to read: client initial metadata is there, but not yet processed
+ kBegin,
+ // Processing client initial metadata
+ kProcessingClientInitialMetadata,
+ // Main call loop: not reading
kIdle,
- // Sent, but not yet received
- kQueued,
- // Trying to receive, but not yet sent
- kWaiting,
- // Ready to start processing, but not yet started
- // (we have the value to send through the pipe, the reader is waiting,
- // but it's not yet been polled)
- kReady,
- // Processing through filters
- kProcessing,
- // Closed sending
- kClosed,
- // Closed due to failure
- kError
+ // Main call loop: reading but no message available
+ kReading,
+ // Main call loop: processing one message
+ kProcessingClientToServerMessage,
+ // Processing complete
+ kTerminated,
+ };
+ static const char* ClientToServerPullStateString(
+ ClientToServerPullState state) {
+ switch (state) {
+ case ClientToServerPullState::kBegin:
+ return "Begin";
+ case ClientToServerPullState::kProcessingClientInitialMetadata:
+ return "ProcessingClientInitialMetadata";
+ case ClientToServerPullState::kIdle:
+ return "Idle";
+ case ClientToServerPullState::kReading:
+ return "Reading";
+ case ClientToServerPullState::kProcessingClientToServerMessage:
+ return "ProcessingClientToServerMessage";
+ case ClientToServerPullState::kTerminated:
+ return "Terminated";
+ }
+ }
+ friend std::ostream& operator<<(std::ostream& out,
+ ClientToServerPullState state) {
+ return out << ClientToServerPullStateString(state);
+ }
+ enum class ClientToServerPushState : uint16_t {
+ kIdle,
+ kPushedMessage,
+ kPushedHalfClose,
+ kPushedMessageAndHalfClosed,
+ kFinished,
+ };
+ static const char* ClientToServerPushStateString(
+ ClientToServerPushState state) {
+ switch (state) {
+ case ClientToServerPushState::kIdle:
+ return "Idle";
+ case ClientToServerPushState::kPushedMessage:
+ return "PushedMessage";
+ case ClientToServerPushState::kPushedHalfClose:
+ return "PushedHalfClose";
+ case ClientToServerPushState::kPushedMessageAndHalfClosed:
+ return "PushedMessageAndHalfClosed";
+ case ClientToServerPushState::kFinished:
+ return "Finished";
+ }
+ }
+ friend std::ostream& operator<<(std::ostream& out,
+ ClientToServerPushState state) {
+ return out << ClientToServerPushStateString(state);
+ }
+ enum class ServerToClientPullState : uint16_t {
+ // Not yet started: cannot read
+ kUnstarted,
+ kStarted,
+ // Processing server initial metadata
+ kProcessingServerInitialMetadata,
+ // Main call loop: not reading
+ kIdle,
+ // Main call loop: reading but no message available
+ kReading,
+ // Main call loop: processing one message
+ kProcessingServerToClientMessage,
+ // Processing server trailing metadata
+ kProcessingServerTrailingMetadata,
+ kTerminated,
+ };
+ static const char* ServerToClientPullStateString(
+ ServerToClientPullState state) {
+ switch (state) {
+ case ServerToClientPullState::kUnstarted:
+ return "Unstarted";
+ case ServerToClientPullState::kStarted:
+ return "Started";
+ case ServerToClientPullState::kProcessingServerInitialMetadata:
+ return "ProcessingServerInitialMetadata";
+ case ServerToClientPullState::kIdle:
+ return "Idle";
+ case ServerToClientPullState::kReading:
+ return "Reading";
+ case ServerToClientPullState::kProcessingServerToClientMessage:
+ return "ProcessingServerToClientMessage";
+ case ServerToClientPullState::kProcessingServerTrailingMetadata:
+ return "ProcessingServerTrailingMetadata";
+ case ServerToClientPullState::kTerminated:
+ return "Terminated";
+ }
+ }
+ friend std::ostream& operator<<(std::ostream& out,
+ ServerToClientPullState state) {
+ return out << ServerToClientPullStateString(state);
+ }
+ enum class ServerToClientPushState : uint16_t {
+ kStart,
+ kPushedServerInitialMetadata,
+ kPushedServerInitialMetadataAndPushedMessage,
+ kTrailersOnly,
+ kIdle,
+ kPushedMessage,
+ kFinished,
+ };
+ static const char* ServerToClientPushStateString(
+ ServerToClientPushState state) {
+ switch (state) {
+ case ServerToClientPushState::kStart:
+ return "Start";
+ case ServerToClientPushState::kPushedServerInitialMetadata:
+ return "PushedServerInitialMetadata";
+ case ServerToClientPushState::
+ kPushedServerInitialMetadataAndPushedMessage:
+ return "PushedServerInitialMetadataAndPushedMessage";
+ case ServerToClientPushState::kTrailersOnly:
+ return "TrailersOnly";
+ case ServerToClientPushState::kIdle:
+ return "Idle";
+ case ServerToClientPushState::kPushedMessage:
+ return "PushedMessage";
+ case ServerToClientPushState::kFinished:
+ return "Finished";
+ }
+ }
+ friend std::ostream& operator<<(std::ostream& out,
+ ServerToClientPushState state) {
+ return out << ServerToClientPushStateString(state);
+ }
+ enum class ServerTrailingMetadataState : uint16_t {
+ kNotPushed,
+ kPushed,
+ kPushedCancel,
+ kPulled,
+ kPulledCancel,
};
- // Waiter for a promise blocked waiting to send.
- IntraActivityWaiter wait_send_;
- // Waiter for a promise blocked waiting to receive.
- IntraActivityWaiter wait_recv_;
- // Current state.
- ValueState state_ = ValueState::kIdle;
- // Has the pipe been started?
- bool started_ = false;
+ static const char* ServerTrailingMetadataStateString(
+ ServerTrailingMetadataState state) {
+ switch (state) {
+ case ServerTrailingMetadataState::kNotPushed:
+ return "NotPushed";
+ case ServerTrailingMetadataState::kPushed:
+ return "Pushed";
+ case ServerTrailingMetadataState::kPushedCancel:
+ return "PushedCancel";
+ case ServerTrailingMetadataState::kPulled:
+ return "Pulled";
+ case ServerTrailingMetadataState::kPulledCancel:
+ return "PulledCancel";
+ }
+ }
+ friend std::ostream& operator<<(std::ostream& out,
+ ServerTrailingMetadataState state) {
+ return out << ServerTrailingMetadataStateString(state);
+ }
+ ClientToServerPullState client_to_server_pull_state_ : 3;
+ ClientToServerPushState client_to_server_push_state_ : 3;
+ ServerToClientPullState server_to_client_pull_state_ : 4;
+ ServerToClientPushState server_to_client_push_state_ : 3;
+ ServerTrailingMetadataState server_trailing_metadata_state_ : 3;
+ IntraActivityWaiter client_to_server_pull_waiter_;
+ IntraActivityWaiter server_to_client_pull_waiter_;
+ IntraActivityWaiter client_to_server_push_waiter_;
+ IntraActivityWaiter server_to_client_push_waiter_;
+ IntraActivityWaiter server_trailing_metadata_waiter_;
};
template
@@ -1428,38 +1572,152 @@ class CallFilters {
// Access client initial metadata before it's processed
ClientMetadata* unprocessed_client_initial_metadata() {
- return client_initial_metadata_.get();
+ return push_client_initial_metadata_.get();
}
+ private:
+ template
+ Poll> FinishStep(
+ Poll> p) {
+ auto* r = p.value_if_ready();
+ if (r == nullptr) return Pending{};
+ (call_state_.*on_done)();
+ if (r->ok != nullptr) {
+ return ValueOrFailure