diff --git a/BUILD b/BUILD
index 044350b4d8c..b751f1d5fc5 100644
--- a/BUILD
+++ b/BUILD
@@ -701,6 +701,7 @@ grpc_cc_library(
external_deps = [
"absl/base",
"absl/base:core_headers",
+ "absl/functional:any_invocable",
"absl/memory",
"absl/random",
"absl/status",
@@ -1311,6 +1312,7 @@ grpc_cc_library(
"//src/core:lib/transport/timeout_encoding.cc",
"//src/core:lib/transport/transport.cc",
"//src/core:lib/transport/transport_op_string.cc",
+ "//src/core:lib/transport/batch_builder.cc",
] +
# TODO(vigneshbabu): remove these
# These headers used to be vended by this target, but they have to be
@@ -1402,6 +1404,7 @@ grpc_cc_library(
"//src/core:lib/transport/timeout_encoding.h",
"//src/core:lib/transport/transport.h",
"//src/core:lib/transport/transport_impl.h",
+ "//src/core:lib/transport/batch_builder.h",
] +
# TODO(vigneshbabu): remove these
# These headers used to be vended by this target, but they have to be
@@ -1458,6 +1461,7 @@ grpc_cc_library(
"stats",
"uri_parser",
"work_serializer",
+ "//src/core:1999",
"//src/core:activity",
"//src/core:arena",
"//src/core:arena_promise",
@@ -1486,15 +1490,19 @@ grpc_cc_library(
"//src/core:event_engine_trace",
"//src/core:event_log",
"//src/core:experiments",
+ "//src/core:for_each",
"//src/core:gpr_atm",
"//src/core:gpr_manual_constructor",
"//src/core:gpr_spinlock",
"//src/core:grpc_sockaddr",
"//src/core:http2_errors",
+ "//src/core:if",
"//src/core:init_internally",
"//src/core:iomgr_fwd",
"//src/core:iomgr_port",
"//src/core:json",
+ "//src/core:latch",
+ "//src/core:loop",
"//src/core:map",
"//src/core:match",
"//src/core:memory_quota",
@@ -1506,10 +1514,12 @@ grpc_cc_library(
"//src/core:pollset_set",
"//src/core:posix_event_engine_base_hdrs",
"//src/core:promise_status",
+ "//src/core:race",
"//src/core:ref_counted",
"//src/core:resolved_address",
"//src/core:resource_quota",
"//src/core:resource_quota_trace",
+ "//src/core:seq",
"//src/core:slice",
"//src/core:slice_buffer",
"//src/core:slice_cast",
@@ -2345,6 +2355,7 @@ grpc_cc_library(
grpc_cc_library(
name = "promise",
external_deps = [
+ "absl/functional:any_invocable",
"absl/status",
"absl/types:optional",
],
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 371d2e2c149..44c28879312 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1070,7 +1070,6 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx nonblocking_test)
add_dependencies(buildtests_cxx notification_test)
add_dependencies(buildtests_cxx num_external_connectivity_watchers_test)
- add_dependencies(buildtests_cxx observable_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx oracle_event_engine_posix_test)
endif()
@@ -1644,6 +1643,7 @@ target_link_libraries(gpr
${_gRPC_ALLTARGETS_LIBRARIES}
absl::base
absl::core_headers
+ absl::any_invocable
absl::memory
absl::random_random
absl::status
@@ -2317,6 +2317,7 @@ add_library(grpc
src/core/lib/load_balancing/lb_policy_registry.cc
src/core/lib/matchers/matchers.cc
src/core/lib/promise/activity.cc
+ src/core/lib/promise/party.cc
src/core/lib/promise/sleep.cc
src/core/lib/promise/trace.cc
src/core/lib/resolver/resolver.cc
@@ -2420,6 +2421,7 @@ add_library(grpc
src/core/lib/surface/server.cc
src/core/lib/surface/validate_metadata.cc
src/core/lib/surface/version.cc
+ src/core/lib/transport/batch_builder.cc
src/core/lib/transport/bdp_estimator.cc
src/core/lib/transport/connectivity_state.cc
src/core/lib/transport/error_utils.cc
@@ -2511,7 +2513,6 @@ target_link_libraries(grpc
absl::flat_hash_map
absl::flat_hash_set
absl::inlined_vector
- absl::any_invocable
absl::bind_front
absl::function_ref
absl::hash
@@ -3004,6 +3005,7 @@ add_library(grpc_unsecure
src/core/lib/load_balancing/lb_policy.cc
src/core/lib/load_balancing/lb_policy_registry.cc
src/core/lib/promise/activity.cc
+ src/core/lib/promise/party.cc
src/core/lib/promise/sleep.cc
src/core/lib/promise/trace.cc
src/core/lib/resolver/resolver.cc
@@ -3076,6 +3078,7 @@ add_library(grpc_unsecure
src/core/lib/surface/server.cc
src/core/lib/surface/validate_metadata.cc
src/core/lib/surface/version.cc
+ src/core/lib/transport/batch_builder.cc
src/core/lib/transport/bdp_estimator.cc
src/core/lib/transport/connectivity_state.cc
src/core/lib/transport/error_utils.cc
@@ -3143,7 +3146,6 @@ target_link_libraries(grpc_unsecure
absl::flat_hash_map
absl::flat_hash_set
absl::inlined_vector
- absl::any_invocable
absl::bind_front
absl::function_ref
absl::hash
@@ -4523,6 +4525,7 @@ add_library(grpc_authorization_provider
src/core/lib/load_balancing/lb_policy_registry.cc
src/core/lib/matchers/matchers.cc
src/core/lib/promise/activity.cc
+ src/core/lib/promise/party.cc
src/core/lib/promise/trace.cc
src/core/lib/resolver/resolver.cc
src/core/lib/resolver/resolver_registry.cc
@@ -4593,6 +4596,7 @@ add_library(grpc_authorization_provider
src/core/lib/surface/server.cc
src/core/lib/surface/validate_metadata.cc
src/core/lib/surface/version.cc
+ src/core/lib/transport/batch_builder.cc
src/core/lib/transport/connectivity_state.cc
src/core/lib/transport/error_utils.cc
src/core/lib/transport/handshaker.cc
@@ -4651,7 +4655,6 @@ target_link_libraries(grpc_authorization_provider
absl::flat_hash_map
absl::flat_hash_set
absl::inlined_vector
- absl::any_invocable
absl::function_ref
absl::hash
absl::type_traits
@@ -5403,7 +5406,6 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::algorithm_container
- absl::any_invocable
absl::span
${_gRPC_BENCHMARK_LIBRARIES}
gpr
@@ -8350,7 +8352,6 @@ target_link_libraries(chunked_vector_test
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::flat_hash_set
- absl::any_invocable
absl::function_ref
absl::hash
absl::type_traits
@@ -9099,7 +9100,6 @@ target_link_libraries(common_closures_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
- absl::any_invocable
absl::statusor
gpr
)
@@ -10128,7 +10128,6 @@ target_link_libraries(endpoint_config_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
- absl::any_invocable
absl::type_traits
absl::statusor
gpr
@@ -10638,7 +10637,6 @@ target_link_libraries(exec_ctx_wakeup_scheduler_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
- absl::any_invocable
absl::hash
absl::type_traits
absl::statusor
@@ -11121,7 +11119,6 @@ target_link_libraries(flow_control_test
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::flat_hash_set
- absl::any_invocable
absl::function_ref
absl::hash
absl::type_traits
@@ -11192,7 +11189,6 @@ target_link_libraries(for_each_test
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::flat_hash_set
- absl::any_invocable
absl::function_ref
absl::hash
absl::type_traits
@@ -11277,7 +11273,6 @@ target_link_libraries(forkable_test
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::flat_hash_set
- absl::any_invocable
absl::statusor
gpr
)
@@ -11587,6 +11582,7 @@ add_executable(frame_test
src/core/lib/load_balancing/lb_policy.cc
src/core/lib/load_balancing/lb_policy_registry.cc
src/core/lib/promise/activity.cc
+ src/core/lib/promise/party.cc
src/core/lib/promise/trace.cc
src/core/lib/resolver/resolver.cc
src/core/lib/resolver/resolver_registry.cc
@@ -11634,6 +11630,7 @@ add_executable(frame_test
src/core/lib/surface/server.cc
src/core/lib/surface/validate_metadata.cc
src/core/lib/surface/version.cc
+ src/core/lib/transport/batch_builder.cc
src/core/lib/transport/connectivity_state.cc
src/core/lib/transport/error_utils.cc
src/core/lib/transport/handshaker_registry.cc
@@ -11678,7 +11675,6 @@ target_link_libraries(frame_test
absl::flat_hash_map
absl::flat_hash_set
absl::inlined_vector
- absl::any_invocable
absl::function_ref
absl::hash
absl::type_traits
@@ -14127,7 +14123,6 @@ target_link_libraries(interceptor_list_test
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::flat_hash_set
- absl::any_invocable
absl::function_ref
absl::hash
absl::type_traits
@@ -14917,7 +14912,6 @@ target_link_libraries(map_pipe_test
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::flat_hash_set
- absl::any_invocable
absl::function_ref
absl::hash
absl::type_traits
@@ -15708,49 +15702,6 @@ target_link_libraries(num_external_connectivity_watchers_test
)
-endif()
-if(gRPC_BUILD_TESTS)
-
-add_executable(observable_test
- src/core/lib/promise/activity.cc
- test/core/promise/observable_test.cc
- third_party/googletest/googletest/src/gtest-all.cc
- third_party/googletest/googlemock/src/gmock-all.cc
-)
-target_compile_features(observable_test PUBLIC cxx_std_14)
-target_include_directories(observable_test
- PRIVATE
- ${CMAKE_CURRENT_SOURCE_DIR}
- ${CMAKE_CURRENT_SOURCE_DIR}/include
- ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
- ${_gRPC_RE2_INCLUDE_DIR}
- ${_gRPC_SSL_INCLUDE_DIR}
- ${_gRPC_UPB_GENERATED_DIR}
- ${_gRPC_UPB_GRPC_GENERATED_DIR}
- ${_gRPC_UPB_INCLUDE_DIR}
- ${_gRPC_XXHASH_INCLUDE_DIR}
- ${_gRPC_ZLIB_INCLUDE_DIR}
- third_party/googletest/googletest/include
- third_party/googletest/googletest
- third_party/googletest/googlemock/include
- third_party/googletest/googlemock
- ${_gRPC_PROTO_GENS_DIR}
-)
-
-target_link_libraries(observable_test
- ${_gRPC_BASELIB_LIBRARIES}
- ${_gRPC_PROTOBUF_LIBRARIES}
- ${_gRPC_ZLIB_LIBRARIES}
- ${_gRPC_ALLTARGETS_LIBRARIES}
- absl::flat_hash_set
- absl::hash
- absl::type_traits
- absl::statusor
- absl::utility
- gpr
-)
-
-
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
@@ -16198,7 +16149,6 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(party_test
- src/core/lib/promise/party.cc
test/core/promise/party_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
@@ -16318,7 +16268,6 @@ target_link_libraries(periodic_update_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
- absl::any_invocable
absl::function_ref
absl::hash
absl::statusor
@@ -19102,7 +19051,6 @@ target_link_libraries(slice_string_helpers_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
- absl::any_invocable
absl::hash
absl::statusor
gpr
@@ -19582,7 +19530,6 @@ target_link_libraries(static_stride_scheduler_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
- absl::any_invocable
absl::span
gpr
)
@@ -20412,7 +20359,6 @@ target_link_libraries(test_core_event_engine_posix_timer_heap_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
- absl::any_invocable
absl::statusor
gpr
)
@@ -20455,7 +20401,6 @@ target_link_libraries(test_core_event_engine_posix_timer_list_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
- absl::any_invocable
absl::statusor
gpr
)
@@ -20504,7 +20449,6 @@ target_link_libraries(test_core_event_engine_slice_buffer_test
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::flat_hash_set
- absl::any_invocable
absl::hash
absl::statusor
absl::utility
@@ -20620,7 +20564,6 @@ target_link_libraries(test_core_gprpp_time_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
- absl::any_invocable
absl::statusor
gpr
)
@@ -21112,7 +21055,6 @@ target_link_libraries(thread_pool_test
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::flat_hash_set
- absl::any_invocable
absl::statusor
gpr
)
@@ -26153,7 +26095,7 @@ generate_pkgconfig(
"gpr"
"gRPC platform support library"
"${gRPC_CORE_VERSION}"
- "absl_base absl_cord absl_core_headers absl_memory absl_optional absl_random_random absl_status absl_str_format absl_strings absl_synchronization absl_time absl_variant"
+ "absl_any_invocable absl_base absl_cord absl_core_headers absl_memory absl_optional absl_random_random absl_status absl_str_format absl_strings absl_synchronization absl_time absl_variant"
""
"-lgpr"
""
diff --git a/Makefile b/Makefile
index f5710676a33..a3560aa12a7 100644
--- a/Makefile
+++ b/Makefile
@@ -1561,6 +1561,7 @@ LIBGRPC_SRC = \
src/core/lib/load_balancing/lb_policy_registry.cc \
src/core/lib/matchers/matchers.cc \
src/core/lib/promise/activity.cc \
+ src/core/lib/promise/party.cc \
src/core/lib/promise/sleep.cc \
src/core/lib/promise/trace.cc \
src/core/lib/resolver/resolver.cc \
@@ -1664,6 +1665,7 @@ LIBGRPC_SRC = \
src/core/lib/surface/server.cc \
src/core/lib/surface/validate_metadata.cc \
src/core/lib/surface/version.cc \
+ src/core/lib/transport/batch_builder.cc \
src/core/lib/transport/bdp_estimator.cc \
src/core/lib/transport/connectivity_state.cc \
src/core/lib/transport/error_utils.cc \
@@ -2101,6 +2103,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/load_balancing/lb_policy.cc \
src/core/lib/load_balancing/lb_policy_registry.cc \
src/core/lib/promise/activity.cc \
+ src/core/lib/promise/party.cc \
src/core/lib/promise/sleep.cc \
src/core/lib/promise/trace.cc \
src/core/lib/resolver/resolver.cc \
@@ -2173,6 +2176,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/surface/server.cc \
src/core/lib/surface/validate_metadata.cc \
src/core/lib/surface/version.cc \
+ src/core/lib/transport/batch_builder.cc \
src/core/lib/transport/bdp_estimator.cc \
src/core/lib/transport/connectivity_state.cc \
src/core/lib/transport/error_utils.cc \
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index 8533b98e880..8d556f6c62b 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -251,6 +251,7 @@ libs:
deps:
- absl/base:base
- absl/base:core_headers
+ - absl/functional:any_invocable
- absl/memory:memory
- absl/random:random
- absl/status:status
@@ -951,12 +952,13 @@ libs:
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/detail/switch.h
- src/core/lib/promise/exec_ctx_wakeup_scheduler.h
+ - src/core/lib/promise/for_each.h
- src/core/lib/promise/if.h
- src/core/lib/promise/interceptor_list.h
- - src/core/lib/promise/intra_activity_waiter.h
- src/core/lib/promise/latch.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
+ - src/core/lib/promise/party.h
- src/core/lib/promise/pipe.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/promise.h
@@ -1060,6 +1062,7 @@ libs:
- src/core/lib/surface/lame_client.h
- src/core/lib/surface/server.h
- src/core/lib/surface/validate_metadata.h
+ - src/core/lib/transport/batch_builder.h
- src/core/lib/transport/bdp_estimator.h
- src/core/lib/transport/connectivity_state.h
- src/core/lib/transport/error_utils.h
@@ -1711,6 +1714,7 @@ libs:
- src/core/lib/load_balancing/lb_policy_registry.cc
- src/core/lib/matchers/matchers.cc
- src/core/lib/promise/activity.cc
+ - src/core/lib/promise/party.cc
- src/core/lib/promise/sleep.cc
- src/core/lib/promise/trace.cc
- src/core/lib/resolver/resolver.cc
@@ -1814,6 +1818,7 @@ libs:
- src/core/lib/surface/server.cc
- src/core/lib/surface/validate_metadata.cc
- src/core/lib/surface/version.cc
+ - src/core/lib/transport/batch_builder.cc
- src/core/lib/transport/bdp_estimator.cc
- src/core/lib/transport/connectivity_state.cc
- src/core/lib/transport/error_utils.cc
@@ -1865,7 +1870,6 @@ libs:
- absl/container:flat_hash_map
- absl/container:flat_hash_set
- absl/container:inlined_vector
- - absl/functional:any_invocable
- absl/functional:bind_front
- absl/functional:function_ref
- absl/hash:hash
@@ -2292,12 +2296,13 @@ libs:
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/detail/switch.h
- src/core/lib/promise/exec_ctx_wakeup_scheduler.h
+ - src/core/lib/promise/for_each.h
- src/core/lib/promise/if.h
- src/core/lib/promise/interceptor_list.h
- - src/core/lib/promise/intra_activity_waiter.h
- src/core/lib/promise/latch.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
+ - src/core/lib/promise/party.h
- src/core/lib/promise/pipe.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/promise.h
@@ -2372,6 +2377,7 @@ libs:
- src/core/lib/surface/lame_client.h
- src/core/lib/surface/server.h
- src/core/lib/surface/validate_metadata.h
+ - src/core/lib/transport/batch_builder.h
- src/core/lib/transport/bdp_estimator.h
- src/core/lib/transport/connectivity_state.h
- src/core/lib/transport/error_utils.h
@@ -2665,6 +2671,7 @@ libs:
- src/core/lib/load_balancing/lb_policy.cc
- src/core/lib/load_balancing/lb_policy_registry.cc
- src/core/lib/promise/activity.cc
+ - src/core/lib/promise/party.cc
- src/core/lib/promise/sleep.cc
- src/core/lib/promise/trace.cc
- src/core/lib/resolver/resolver.cc
@@ -2737,6 +2744,7 @@ libs:
- src/core/lib/surface/server.cc
- src/core/lib/surface/validate_metadata.cc
- src/core/lib/surface/version.cc
+ - src/core/lib/transport/batch_builder.cc
- src/core/lib/transport/bdp_estimator.cc
- src/core/lib/transport/connectivity_state.cc
- src/core/lib/transport/error_utils.cc
@@ -2764,7 +2772,6 @@ libs:
- absl/container:flat_hash_map
- absl/container:flat_hash_set
- absl/container:inlined_vector
- - absl/functional:any_invocable
- absl/functional:bind_front
- absl/functional:function_ref
- absl/hash:hash
@@ -3755,11 +3762,13 @@ libs:
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/detail/switch.h
- src/core/lib/promise/exec_ctx_wakeup_scheduler.h
+ - src/core/lib/promise/for_each.h
- src/core/lib/promise/if.h
- src/core/lib/promise/interceptor_list.h
- - src/core/lib/promise/intra_activity_waiter.h
+ - src/core/lib/promise/latch.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
+ - src/core/lib/promise/party.h
- src/core/lib/promise/pipe.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/promise.h
@@ -3833,6 +3842,7 @@ libs:
- src/core/lib/surface/lame_client.h
- src/core/lib/surface/server.h
- src/core/lib/surface/validate_metadata.h
+ - src/core/lib/transport/batch_builder.h
- src/core/lib/transport/connectivity_state.h
- src/core/lib/transport/error_utils.h
- src/core/lib/transport/handshaker.h
@@ -4010,6 +4020,7 @@ libs:
- src/core/lib/load_balancing/lb_policy_registry.cc
- src/core/lib/matchers/matchers.cc
- src/core/lib/promise/activity.cc
+ - src/core/lib/promise/party.cc
- src/core/lib/promise/trace.cc
- src/core/lib/resolver/resolver.cc
- src/core/lib/resolver/resolver_registry.cc
@@ -4080,6 +4091,7 @@ libs:
- src/core/lib/surface/server.cc
- src/core/lib/surface/validate_metadata.cc
- src/core/lib/surface/version.cc
+ - src/core/lib/transport/batch_builder.cc
- src/core/lib/transport/connectivity_state.cc
- src/core/lib/transport/error_utils.cc
- src/core/lib/transport/handshaker.cc
@@ -4099,7 +4111,6 @@ libs:
- absl/container:flat_hash_map
- absl/container:flat_hash_set
- absl/container:inlined_vector
- - absl/functional:any_invocable
- absl/functional:function_ref
- absl/hash:hash
- absl/meta:type_traits
@@ -4381,7 +4392,6 @@ targets:
- test/core/client_channel/lb_policy/static_stride_scheduler_benchmark.cc
deps:
- absl/algorithm:container
- - absl/functional:any_invocable
- absl/types:span
- benchmark
- gpr
@@ -5866,7 +5876,6 @@ targets:
- test/core/gprpp/chunked_vector_test.cc
deps:
- absl/container:flat_hash_set
- - absl/functional:any_invocable
- absl/functional:function_ref
- absl/hash:hash
- absl/meta:type_traits
@@ -6154,7 +6163,6 @@ targets:
src:
- test/core/event_engine/common_closures_test.cc
deps:
- - absl/functional:any_invocable
- absl/status:statusor
- gpr
- name: completion_queue_threading_test
@@ -6585,7 +6593,6 @@ targets:
- src/core/lib/surface/channel_stack_type.cc
- test/core/event_engine/endpoint_config_test.cc
deps:
- - absl/functional:any_invocable
- absl/meta:type_traits
- absl/status:statusor
- gpr
@@ -6861,7 +6868,6 @@ targets:
- src/core/lib/slice/slice_string_helpers.cc
- test/core/promise/exec_ctx_wakeup_scheduler_test.cc
deps:
- - absl/functional:any_invocable
- absl/hash:hash
- absl/meta:type_traits
- absl/status:statusor
@@ -7168,7 +7174,6 @@ targets:
- test/core/transport/chttp2/flow_control_test.cc
deps:
- absl/container:flat_hash_set
- - absl/functional:any_invocable
- absl/functional:function_ref
- absl/hash:hash
- absl/meta:type_traits
@@ -7215,7 +7220,6 @@ targets:
- src/core/lib/promise/for_each.h
- src/core/lib/promise/if.h
- src/core/lib/promise/interceptor_list.h
- - src/core/lib/promise/intra_activity_waiter.h
- src/core/lib/promise/join.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
@@ -7267,7 +7271,6 @@ targets:
- test/core/promise/for_each_test.cc
deps:
- absl/container:flat_hash_set
- - absl/functional:any_invocable
- absl/functional:function_ref
- absl/hash:hash
- absl/meta:type_traits
@@ -7301,7 +7304,6 @@ targets:
- test/core/event_engine/forkable_test.cc
deps:
- absl/container:flat_hash_set
- - absl/functional:any_invocable
- absl/status:statusor
- gpr
- name: format_request_test
@@ -7571,11 +7573,13 @@ targets:
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/detail/switch.h
- src/core/lib/promise/exec_ctx_wakeup_scheduler.h
+ - src/core/lib/promise/for_each.h
- src/core/lib/promise/if.h
- src/core/lib/promise/interceptor_list.h
- - src/core/lib/promise/intra_activity_waiter.h
+ - src/core/lib/promise/latch.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
+ - src/core/lib/promise/party.h
- src/core/lib/promise/pipe.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/promise.h
@@ -7626,6 +7630,7 @@ targets:
- src/core/lib/surface/lame_client.h
- src/core/lib/surface/server.h
- src/core/lib/surface/validate_metadata.h
+ - src/core/lib/transport/batch_builder.h
- src/core/lib/transport/connectivity_state.h
- src/core/lib/transport/error_utils.h
- src/core/lib/transport/handshaker_factory.h
@@ -7809,6 +7814,7 @@ targets:
- src/core/lib/load_balancing/lb_policy.cc
- src/core/lib/load_balancing/lb_policy_registry.cc
- src/core/lib/promise/activity.cc
+ - src/core/lib/promise/party.cc
- src/core/lib/promise/trace.cc
- src/core/lib/resolver/resolver.cc
- src/core/lib/resolver/resolver_registry.cc
@@ -7856,6 +7862,7 @@ targets:
- src/core/lib/surface/server.cc
- src/core/lib/surface/validate_metadata.cc
- src/core/lib/surface/version.cc
+ - src/core/lib/transport/batch_builder.cc
- src/core/lib/transport/connectivity_state.cc
- src/core/lib/transport/error_utils.cc
- src/core/lib/transport/handshaker_registry.cc
@@ -7873,7 +7880,6 @@ targets:
- absl/container:flat_hash_map
- absl/container:flat_hash_set
- absl/container:inlined_vector
- - absl/functional:any_invocable
- absl/functional:function_ref
- absl/hash:hash
- absl/meta:type_traits
@@ -8973,7 +8979,6 @@ targets:
- test/core/promise/interceptor_list_test.cc
deps:
- absl/container:flat_hash_set
- - absl/functional:any_invocable
- absl/functional:function_ref
- absl/hash:hash
- absl/meta:type_traits
@@ -9187,7 +9192,6 @@ targets:
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/detail/switch.h
- - src/core/lib/promise/intra_activity_waiter.h
- src/core/lib/promise/join.h
- src/core/lib/promise/latch.h
- src/core/lib/promise/poll.h
@@ -9330,7 +9334,6 @@ targets:
- src/core/lib/promise/for_each.h
- src/core/lib/promise/if.h
- src/core/lib/promise/interceptor_list.h
- - src/core/lib/promise/intra_activity_waiter.h
- src/core/lib/promise/join.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
@@ -9383,7 +9386,6 @@ targets:
- test/core/promise/map_pipe_test.cc
deps:
- absl/container:flat_hash_set
- - absl/functional:any_invocable
- absl/functional:function_ref
- absl/hash:hash
- absl/meta:type_traits
@@ -9689,39 +9691,6 @@ targets:
- test/core/surface/num_external_connectivity_watchers_test.cc
deps:
- grpc_test_util
-- name: observable_test
- gtest: true
- build: test
- language: c++
- headers:
- - src/core/lib/gprpp/atomic_utils.h
- - src/core/lib/gprpp/orphanable.h
- - src/core/lib/gprpp/ref_counted.h
- - src/core/lib/gprpp/ref_counted_ptr.h
- - src/core/lib/promise/activity.h
- - src/core/lib/promise/context.h
- - src/core/lib/promise/detail/basic_seq.h
- - src/core/lib/promise/detail/promise_factory.h
- - src/core/lib/promise/detail/promise_like.h
- - src/core/lib/promise/detail/status.h
- - src/core/lib/promise/detail/switch.h
- - src/core/lib/promise/observable.h
- - src/core/lib/promise/poll.h
- - src/core/lib/promise/promise.h
- - src/core/lib/promise/seq.h
- - src/core/lib/promise/wait_set.h
- - test/core/promise/test_wakeup_schedulers.h
- src:
- - src/core/lib/promise/activity.cc
- - test/core/promise/observable_test.cc
- deps:
- - absl/container:flat_hash_set
- - absl/hash:hash
- - absl/meta:type_traits
- - absl/status:statusor
- - absl/utility:utility
- - gpr
- uses_polling: false
- name: oracle_event_engine_posix_test
gtest: true
build: test
@@ -9888,10 +9857,8 @@ targets:
gtest: true
build: test
language: c++
- headers:
- - src/core/lib/promise/party.h
+ headers: []
src:
- - src/core/lib/promise/party.cc
- test/core/promise/party_test.cc
deps:
- grpc_unsecure
@@ -9951,7 +9918,6 @@ targets:
- src/core/lib/slice/slice_string_helpers.cc
- test/core/resource_quota/periodic_update_test.cc
deps:
- - absl/functional:any_invocable
- absl/functional:function_ref
- absl/hash:hash
- absl/status:statusor
@@ -10130,7 +10096,6 @@ targets:
- src/core/lib/promise/detail/promise_factory.h
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/poll.h
- - src/core/lib/promise/promise.h
src:
- test/core/promise/promise_factory_test.cc
deps:
@@ -11113,7 +11078,6 @@ targets:
- src/core/lib/slice/slice_string_helpers.cc
- test/core/slice/slice_string_helpers_test.cc
deps:
- - absl/functional:any_invocable
- absl/hash:hash
- absl/status:statusor
- gpr
@@ -11299,7 +11263,6 @@ targets:
- src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/static_stride_scheduler.cc
- test/core/client_channel/lb_policy/static_stride_scheduler_test.cc
deps:
- - absl/functional:any_invocable
- absl/types:span
- gpr
uses_polling: false
@@ -11704,7 +11667,6 @@ targets:
- src/core/lib/gprpp/time_averaged_stats.cc
- test/core/event_engine/posix/timer_heap_test.cc
deps:
- - absl/functional:any_invocable
- absl/status:statusor
- gpr
uses_polling: false
@@ -11724,7 +11686,6 @@ targets:
- src/core/lib/gprpp/time_averaged_stats.cc
- test/core/event_engine/posix/timer_list_test.cc
deps:
- - absl/functional:any_invocable
- absl/status:statusor
- gpr
uses_polling: false
@@ -11756,7 +11717,6 @@ targets:
- test/core/event_engine/slice_buffer_test.cc
deps:
- absl/container:flat_hash_set
- - absl/functional:any_invocable
- absl/hash:hash
- absl/status:statusor
- absl/utility:utility
@@ -11791,7 +11751,6 @@ targets:
- src/core/lib/gprpp/time.cc
- test/core/gprpp/time_test.cc
deps:
- - absl/functional:any_invocable
- absl/status:statusor
- gpr
uses_polling: false
@@ -12002,7 +11961,6 @@ targets:
- test/core/event_engine/thread_pool_test.cc
deps:
- absl/container:flat_hash_set
- - absl/functional:any_invocable
- absl/status:statusor
- gpr
- name: thread_quota_test
diff --git a/config.m4 b/config.m4
index 92612e073b6..410626e3323 100644
--- a/config.m4
+++ b/config.m4
@@ -686,6 +686,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/load_balancing/lb_policy_registry.cc \
src/core/lib/matchers/matchers.cc \
src/core/lib/promise/activity.cc \
+ src/core/lib/promise/party.cc \
src/core/lib/promise/sleep.cc \
src/core/lib/promise/trace.cc \
src/core/lib/resolver/resolver.cc \
@@ -789,6 +790,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/surface/server.cc \
src/core/lib/surface/validate_metadata.cc \
src/core/lib/surface/version.cc \
+ src/core/lib/transport/batch_builder.cc \
src/core/lib/transport/bdp_estimator.cc \
src/core/lib/transport/connectivity_state.cc \
src/core/lib/transport/error_utils.cc \
diff --git a/config.w32 b/config.w32
index 6da67e3c94c..d20340afe8a 100644
--- a/config.w32
+++ b/config.w32
@@ -652,6 +652,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\load_balancing\\lb_policy_registry.cc " +
"src\\core\\lib\\matchers\\matchers.cc " +
"src\\core\\lib\\promise\\activity.cc " +
+ "src\\core\\lib\\promise\\party.cc " +
"src\\core\\lib\\promise\\sleep.cc " +
"src\\core\\lib\\promise\\trace.cc " +
"src\\core\\lib\\resolver\\resolver.cc " +
@@ -755,6 +756,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\surface\\server.cc " +
"src\\core\\lib\\surface\\validate_metadata.cc " +
"src\\core\\lib\\surface\\version.cc " +
+ "src\\core\\lib\\transport\\batch_builder.cc " +
"src\\core\\lib\\transport\\bdp_estimator.cc " +
"src\\core\\lib\\transport\\connectivity_state.cc " +
"src\\core\\lib\\transport\\error_utils.cc " +
diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec
index d03714be789..baee59d5aaa 100644
--- a/gRPC-C++.podspec
+++ b/gRPC-C++.podspec
@@ -921,12 +921,13 @@ Pod::Spec.new do |s|
'src/core/lib/promise/detail/status.h',
'src/core/lib/promise/detail/switch.h',
'src/core/lib/promise/exec_ctx_wakeup_scheduler.h',
+ 'src/core/lib/promise/for_each.h',
'src/core/lib/promise/if.h',
'src/core/lib/promise/interceptor_list.h',
- 'src/core/lib/promise/intra_activity_waiter.h',
'src/core/lib/promise/latch.h',
'src/core/lib/promise/loop.h',
'src/core/lib/promise/map.h',
+ 'src/core/lib/promise/party.h',
'src/core/lib/promise/pipe.h',
'src/core/lib/promise/poll.h',
'src/core/lib/promise/promise.h',
@@ -1030,6 +1031,7 @@ Pod::Spec.new do |s|
'src/core/lib/surface/lame_client.h',
'src/core/lib/surface/server.h',
'src/core/lib/surface/validate_metadata.h',
+ 'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/connectivity_state.h',
'src/core/lib/transport/error_utils.h',
@@ -1859,12 +1861,13 @@ Pod::Spec.new do |s|
'src/core/lib/promise/detail/status.h',
'src/core/lib/promise/detail/switch.h',
'src/core/lib/promise/exec_ctx_wakeup_scheduler.h',
+ 'src/core/lib/promise/for_each.h',
'src/core/lib/promise/if.h',
'src/core/lib/promise/interceptor_list.h',
- 'src/core/lib/promise/intra_activity_waiter.h',
'src/core/lib/promise/latch.h',
'src/core/lib/promise/loop.h',
'src/core/lib/promise/map.h',
+ 'src/core/lib/promise/party.h',
'src/core/lib/promise/pipe.h',
'src/core/lib/promise/poll.h',
'src/core/lib/promise/promise.h',
@@ -1968,6 +1971,7 @@ Pod::Spec.new do |s|
'src/core/lib/surface/lame_client.h',
'src/core/lib/surface/server.h',
'src/core/lib/surface/validate_metadata.h',
+ 'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/connectivity_state.h',
'src/core/lib/transport/error_utils.h',
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index 4ec4870a7e0..ef365e45e78 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -1496,12 +1496,14 @@ Pod::Spec.new do |s|
'src/core/lib/promise/detail/status.h',
'src/core/lib/promise/detail/switch.h',
'src/core/lib/promise/exec_ctx_wakeup_scheduler.h',
+ 'src/core/lib/promise/for_each.h',
'src/core/lib/promise/if.h',
'src/core/lib/promise/interceptor_list.h',
- 'src/core/lib/promise/intra_activity_waiter.h',
'src/core/lib/promise/latch.h',
'src/core/lib/promise/loop.h',
'src/core/lib/promise/map.h',
+ 'src/core/lib/promise/party.cc',
+ 'src/core/lib/promise/party.h',
'src/core/lib/promise/pipe.h',
'src/core/lib/promise/poll.h',
'src/core/lib/promise/promise.h',
@@ -1708,6 +1710,8 @@ Pod::Spec.new do |s|
'src/core/lib/surface/validate_metadata.cc',
'src/core/lib/surface/validate_metadata.h',
'src/core/lib/surface/version.cc',
+ 'src/core/lib/transport/batch_builder.cc',
+ 'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.cc',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/connectivity_state.cc',
@@ -2546,12 +2550,13 @@ Pod::Spec.new do |s|
'src/core/lib/promise/detail/status.h',
'src/core/lib/promise/detail/switch.h',
'src/core/lib/promise/exec_ctx_wakeup_scheduler.h',
+ 'src/core/lib/promise/for_each.h',
'src/core/lib/promise/if.h',
'src/core/lib/promise/interceptor_list.h',
- 'src/core/lib/promise/intra_activity_waiter.h',
'src/core/lib/promise/latch.h',
'src/core/lib/promise/loop.h',
'src/core/lib/promise/map.h',
+ 'src/core/lib/promise/party.h',
'src/core/lib/promise/pipe.h',
'src/core/lib/promise/poll.h',
'src/core/lib/promise/promise.h',
@@ -2655,6 +2660,7 @@ Pod::Spec.new do |s|
'src/core/lib/surface/lame_client.h',
'src/core/lib/surface/server.h',
'src/core/lib/surface/validate_metadata.h',
+ 'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/connectivity_state.h',
'src/core/lib/transport/error_utils.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index 9426def7f7c..6facb0a1066 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -1405,12 +1405,14 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/promise/detail/status.h )
s.files += %w( src/core/lib/promise/detail/switch.h )
s.files += %w( src/core/lib/promise/exec_ctx_wakeup_scheduler.h )
+ s.files += %w( src/core/lib/promise/for_each.h )
s.files += %w( src/core/lib/promise/if.h )
s.files += %w( src/core/lib/promise/interceptor_list.h )
- s.files += %w( src/core/lib/promise/intra_activity_waiter.h )
s.files += %w( src/core/lib/promise/latch.h )
s.files += %w( src/core/lib/promise/loop.h )
s.files += %w( src/core/lib/promise/map.h )
+ s.files += %w( src/core/lib/promise/party.cc )
+ s.files += %w( src/core/lib/promise/party.h )
s.files += %w( src/core/lib/promise/pipe.h )
s.files += %w( src/core/lib/promise/poll.h )
s.files += %w( src/core/lib/promise/promise.h )
@@ -1617,6 +1619,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/surface/validate_metadata.cc )
s.files += %w( src/core/lib/surface/validate_metadata.h )
s.files += %w( src/core/lib/surface/version.cc )
+ s.files += %w( src/core/lib/transport/batch_builder.cc )
+ s.files += %w( src/core/lib/transport/batch_builder.h )
s.files += %w( src/core/lib/transport/bdp_estimator.cc )
s.files += %w( src/core/lib/transport/bdp_estimator.h )
s.files += %w( src/core/lib/transport/connectivity_state.cc )
diff --git a/grpc.gyp b/grpc.gyp
index 9acf4bee5b8..c2d2ab0fcd0 100644
--- a/grpc.gyp
+++ b/grpc.gyp
@@ -293,6 +293,7 @@
'dependencies': [
'absl/base:base',
'absl/base:core_headers',
+ 'absl/functional:any_invocable',
'absl/memory:memory',
'absl/random:random',
'absl/status:status',
@@ -359,7 +360,6 @@
'absl/container:flat_hash_map',
'absl/container:flat_hash_set',
'absl/container:inlined_vector',
- 'absl/functional:any_invocable',
'absl/functional:bind_front',
'absl/functional:function_ref',
'absl/hash:hash',
@@ -974,6 +974,7 @@
'src/core/lib/load_balancing/lb_policy_registry.cc',
'src/core/lib/matchers/matchers.cc',
'src/core/lib/promise/activity.cc',
+ 'src/core/lib/promise/party.cc',
'src/core/lib/promise/sleep.cc',
'src/core/lib/promise/trace.cc',
'src/core/lib/resolver/resolver.cc',
@@ -1077,6 +1078,7 @@
'src/core/lib/surface/server.cc',
'src/core/lib/surface/validate_metadata.cc',
'src/core/lib/surface/version.cc',
+ 'src/core/lib/transport/batch_builder.cc',
'src/core/lib/transport/bdp_estimator.cc',
'src/core/lib/transport/connectivity_state.cc',
'src/core/lib/transport/error_utils.cc',
@@ -1176,7 +1178,6 @@
'absl/container:flat_hash_map',
'absl/container:flat_hash_set',
'absl/container:inlined_vector',
- 'absl/functional:any_invocable',
'absl/functional:bind_front',
'absl/functional:function_ref',
'absl/hash:hash',
@@ -1456,6 +1457,7 @@
'src/core/lib/load_balancing/lb_policy.cc',
'src/core/lib/load_balancing/lb_policy_registry.cc',
'src/core/lib/promise/activity.cc',
+ 'src/core/lib/promise/party.cc',
'src/core/lib/promise/sleep.cc',
'src/core/lib/promise/trace.cc',
'src/core/lib/resolver/resolver.cc',
@@ -1528,6 +1530,7 @@
'src/core/lib/surface/server.cc',
'src/core/lib/surface/validate_metadata.cc',
'src/core/lib/surface/version.cc',
+ 'src/core/lib/transport/batch_builder.cc',
'src/core/lib/transport/bdp_estimator.cc',
'src/core/lib/transport/connectivity_state.cc',
'src/core/lib/transport/error_utils.cc',
@@ -1795,7 +1798,6 @@
'absl/container:flat_hash_map',
'absl/container:flat_hash_set',
'absl/container:inlined_vector',
- 'absl/functional:any_invocable',
'absl/functional:function_ref',
'absl/hash:hash',
'absl/meta:type_traits',
@@ -1964,6 +1966,7 @@
'src/core/lib/load_balancing/lb_policy_registry.cc',
'src/core/lib/matchers/matchers.cc',
'src/core/lib/promise/activity.cc',
+ 'src/core/lib/promise/party.cc',
'src/core/lib/promise/trace.cc',
'src/core/lib/resolver/resolver.cc',
'src/core/lib/resolver/resolver_registry.cc',
@@ -2034,6 +2037,7 @@
'src/core/lib/surface/server.cc',
'src/core/lib/surface/validate_metadata.cc',
'src/core/lib/surface/version.cc',
+ 'src/core/lib/transport/batch_builder.cc',
'src/core/lib/transport/connectivity_state.cc',
'src/core/lib/transport/error_utils.cc',
'src/core/lib/transport/handshaker.cc',
diff --git a/package.xml b/package.xml
index abbbad4d4d1..3846ae4eb4b 100644
--- a/package.xml
+++ b/package.xml
@@ -1387,12 +1387,14 @@
+
-
+
+
@@ -1599,6 +1601,8 @@
+
+
diff --git a/src/core/BUILD b/src/core/BUILD
index b67320be39f..9adb0182026 100644
--- a/src/core/BUILD
+++ b/src/core/BUILD
@@ -413,7 +413,6 @@ grpc_cc_library(
],
external_deps = [
"absl/base:core_headers",
- "absl/container:inlined_vector",
"absl/strings",
"absl/strings:str_format",
],
@@ -421,9 +420,15 @@ grpc_cc_library(
deps = [
"activity",
"arena",
+ "construct_destruct",
+ "context",
+ "promise_factory",
"promise_trace",
+ "ref_counted",
+ "//:exec_ctx",
"//:gpr",
"//:grpc_trace",
+ "//:ref_counted_ptr",
],
)
@@ -571,6 +576,7 @@ grpc_cc_library(
"lib/promise/loop.h",
],
deps = [
+ "construct_destruct",
"poll",
"promise_factory",
"//:gpr_platform",
@@ -696,6 +702,7 @@ grpc_cc_library(
external_deps = [
"absl/base:core_headers",
"absl/status",
+ "absl/strings",
"absl/strings:str_format",
"absl/types:optional",
],
@@ -708,6 +715,7 @@ grpc_cc_library(
"construct_destruct",
"context",
"no_destruct",
+ "poll",
"promise_factory",
"promise_status",
"//:gpr",
@@ -761,19 +769,6 @@ grpc_cc_library(
],
)
-grpc_cc_library(
- name = "intra_activity_waiter",
- language = "c++",
- public_hdrs = [
- "lib/promise/intra_activity_waiter.h",
- ],
- deps = [
- "activity",
- "poll",
- "//:gpr_platform",
- ],
-)
-
grpc_cc_library(
name = "latch",
external_deps = ["absl/strings"],
@@ -783,7 +778,6 @@ grpc_cc_library(
],
deps = [
"activity",
- "intra_activity_waiter",
"poll",
"promise_trace",
"//:gpr",
@@ -791,25 +785,6 @@ grpc_cc_library(
],
)
-grpc_cc_library(
- name = "observable",
- external_deps = [
- "absl/base:core_headers",
- "absl/types:optional",
- ],
- language = "c++",
- public_hdrs = [
- "lib/promise/observable.h",
- ],
- deps = [
- "activity",
- "poll",
- "promise_like",
- "wait_set",
- "//:gpr",
- ],
-)
-
grpc_cc_library(
name = "interceptor_list",
hdrs = [
@@ -839,7 +814,6 @@ grpc_cc_library(
"lib/promise/pipe.h",
],
external_deps = [
- "absl/base:core_headers",
"absl/strings",
"absl/types:optional",
"absl/types:variant",
@@ -851,7 +825,6 @@ grpc_cc_library(
"context",
"if",
"interceptor_list",
- "intra_activity_waiter",
"map",
"poll",
"promise_trace",
@@ -3502,33 +3475,38 @@ grpc_cc_library(
"ext/filters/message_size/message_size_filter.h",
],
external_deps = [
- "absl/status",
+ "absl/status:statusor",
"absl/strings",
"absl/strings:str_format",
"absl/types:optional",
],
language = "c++",
deps = [
+ "activity",
+ "arena",
+ "arena_promise",
"channel_args",
"channel_fwd",
"channel_init",
"channel_stack_type",
- "closure",
- "error",
+ "context",
"grpc_service_config",
"json",
"json_args",
"json_object_loader",
+ "latch",
+ "poll",
+ "race",
"service_config_parser",
+ "slice",
"slice_buffer",
- "status_helper",
"validation_errors",
"//:channel_stack_builder",
"//:config",
- "//:debug_location",
"//:gpr",
"//:grpc_base",
"//:grpc_public_hdrs",
+ "//:grpc_trace",
],
)
diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h
index bfc35c27b4d..957a4748c3b 100644
--- a/src/core/ext/filters/client_channel/client_channel.h
+++ b/src/core/ext/filters/client_channel/client_channel.h
@@ -363,7 +363,7 @@ class ClientChannel {
// TODO(roth): As part of simplifying cancellation in the filter stack,
// this should no longer need to be ref-counted.
class ClientChannel::LoadBalancedCall
- : public InternallyRefCounted {
+ : public InternallyRefCounted {
public:
LoadBalancedCall(
ClientChannel* chand, grpc_call_context_element* call_context,
diff --git a/src/core/ext/filters/client_channel/retry_filter.cc b/src/core/ext/filters/client_channel/retry_filter.cc
index c52b6300208..a92a1837bbf 100644
--- a/src/core/ext/filters/client_channel/retry_filter.cc
+++ b/src/core/ext/filters/client_channel/retry_filter.cc
@@ -272,7 +272,7 @@ class RetryFilter::CallData {
// We allocate one struct on the arena for each attempt at starting a
// batch on a given LB call.
class BatchData
- : public RefCounted {
+ : public RefCounted {
public:
BatchData(RefCountedPtr call_attempt, int refcount,
bool set_on_complete);
@@ -649,7 +649,7 @@ class RetryFilter::CallData {
// on_call_stack_destruction closure from the surface.
class RetryFilter::CallData::CallStackDestructionBarrier
: public RefCounted {
+ UnrefCallDtor> {
public:
CallStackDestructionBarrier() {}
diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc
index 58f9a709024..89ff0356cce 100644
--- a/src/core/ext/filters/http/client/http_client_filter.cc
+++ b/src/core/ext/filters/http/client/http_client_filter.cc
@@ -133,13 +133,13 @@ ArenaPromise HttpClientFilter::MakeCallPromise(
return std::move(md);
});
- return Race(Map(next_promise_factory(std::move(call_args)),
+ return Race(initial_metadata_err->Wait(),
+ Map(next_promise_factory(std::move(call_args)),
[](ServerMetadataHandle md) -> ServerMetadataHandle {
auto r = CheckServerMetadata(md.get());
if (!r.ok()) return ServerMetadataFromStatus(r);
return md;
- }),
- initial_metadata_err->Wait());
+ }));
}
HttpClientFilter::HttpClientFilter(HttpSchemeMetadata::ValueType scheme,
diff --git a/src/core/ext/filters/http/message_compress/compression_filter.cc b/src/core/ext/filters/http/message_compress/compression_filter.cc
index a719f5b9132..aea371dbe6d 100644
--- a/src/core/ext/filters/http/message_compress/compression_filter.cc
+++ b/src/core/ext/filters/http/message_compress/compression_filter.cc
@@ -252,7 +252,7 @@ ArenaPromise ClientCompressionFilter::MakeCallPromise(
return CompressMessage(std::move(message), compression_algorithm);
});
auto* decompress_args = GetContext()->New(
- DecompressArgs{GRPC_COMPRESS_NONE, absl::nullopt});
+ DecompressArgs{GRPC_COMPRESS_ALGORITHMS_COUNT, absl::nullopt});
auto* decompress_err =
GetContext()->New>();
call_args.server_initial_metadata->InterceptAndMap(
@@ -273,8 +273,8 @@ ArenaPromise ClientCompressionFilter::MakeCallPromise(
return std::move(*r);
});
// Run the next filter, and race it with getting an error from decompression.
- return Race(next_promise_factory(std::move(call_args)),
- decompress_err->Wait());
+ return Race(decompress_err->Wait(),
+ next_promise_factory(std::move(call_args)));
}
ArenaPromise ServerCompressionFilter::MakeCallPromise(
@@ -288,7 +288,8 @@ ArenaPromise ServerCompressionFilter::MakeCallPromise(
this](MessageHandle message) -> absl::optional {
auto r = DecompressMessage(std::move(message), decompress_args);
if (grpc_call_trace.enabled()) {
- gpr_log(GPR_DEBUG, "DecompressMessage returned %s",
+ gpr_log(GPR_DEBUG, "%s[compression] DecompressMessage returned %s",
+ Activity::current()->DebugTag().c_str(),
r.status().ToString().c_str());
}
if (!r.ok()) {
@@ -314,13 +315,9 @@ ArenaPromise ServerCompressionFilter::MakeCallPromise(
this](MessageHandle message) -> absl::optional {
return CompressMessage(std::move(message), *compression_algorithm);
});
- // Concurrently:
- // - call the next filter
- // - decompress incoming messages
- // - wait for initial metadata to be sent, and then commence compression of
- // outgoing messages
- return Race(next_promise_factory(std::move(call_args)),
- decompress_err->Wait());
+ // Run the next filter, and race it with getting an error from decompression.
+ return Race(decompress_err->Wait(),
+ next_promise_factory(std::move(call_args)));
}
} // namespace grpc_core
diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc
index 33ff178e5a9..6143239c0c0 100644
--- a/src/core/ext/filters/message_size/message_size_filter.cc
+++ b/src/core/ext/filters/message_size/message_size_filter.cc
@@ -18,10 +18,13 @@
#include "src/core/ext/filters/message_size/message_size_filter.h"
+#include
+
+#include
#include
-#include
+#include
+#include
-#include "absl/status/status.h"
#include "absl/strings/str_format.h"
#include
@@ -32,21 +35,22 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
-#include "src/core/lib/gprpp/debug_location.h"
-#include "src/core/lib/gprpp/status_helper.h"
-#include "src/core/lib/iomgr/call_combiner.h"
-#include "src/core/lib/iomgr/closure.h"
-#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/promise/activity.h"
+#include "src/core/lib/promise/context.h"
+#include "src/core/lib/promise/latch.h"
+#include "src/core/lib/promise/poll.h"
+#include "src/core/lib/promise/race.h"
+#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/service_config/service_config_call_data.h"
+#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
+#include "src/core/lib/surface/call_trace.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/surface/channel_stack_type.h"
+#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
-static void recv_message_ready(void* user_data, grpc_error_handle error);
-static void recv_trailing_metadata_ready(void* user_data,
- grpc_error_handle error);
-
namespace grpc_core {
//
@@ -124,251 +128,164 @@ size_t MessageSizeParser::ParserIndex() {
parser_name());
}
-} // namespace grpc_core
-
-namespace {
-struct channel_data {
- grpc_core::MessageSizeParsedConfig limits;
- const size_t service_config_parser_index{
- grpc_core::MessageSizeParser::ParserIndex()};
-};
+//
+// MessageSizeFilter
+//
-struct call_data {
- call_data(grpc_call_element* elem, const channel_data& chand,
- const grpc_call_element_args& args)
- : call_combiner(args.call_combiner), limits(chand.limits) {
- GRPC_CLOSURE_INIT(&recv_message_ready, ::recv_message_ready, elem,
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready,
- ::recv_trailing_metadata_ready, elem,
- grpc_schedule_on_exec_ctx);
- // Get max sizes from channel data, then merge in per-method config values.
- // Note: Per-method config is only available on the client, so we
- // apply the max request size to the send limit and the max response
- // size to the receive limit.
- const grpc_core::MessageSizeParsedConfig* config_from_call_context =
- grpc_core::MessageSizeParsedConfig::GetFromCallContext(
- args.context, chand.service_config_parser_index);
- if (config_from_call_context != nullptr) {
- absl::optional max_send_size = limits.max_send_size();
- absl::optional max_recv_size = limits.max_recv_size();
- if (config_from_call_context->max_send_size().has_value() &&
- (!max_send_size.has_value() ||
- *config_from_call_context->max_send_size() < *max_send_size)) {
- max_send_size = *config_from_call_context->max_send_size();
+const grpc_channel_filter ClientMessageSizeFilter::kFilter =
+ MakePromiseBasedFilter("message_size");
+const grpc_channel_filter ServerMessageSizeFilter::kFilter =
+ MakePromiseBasedFilter("message_size");
+
+class MessageSizeFilter::CallBuilder {
+ private:
+ auto Interceptor(uint32_t max_length, bool is_send) {
+ return [max_length, is_send,
+ err = err_](MessageHandle msg) -> absl::optional {
+ if (grpc_call_trace.enabled()) {
+ gpr_log(GPR_INFO, "%s[message_size] %s len:%" PRIdPTR " max:%d",
+ Activity::current()->DebugTag().c_str(),
+ is_send ? "send" : "recv", msg->payload()->Length(),
+ max_length);
}
- if (config_from_call_context->max_recv_size().has_value() &&
- (!max_recv_size.has_value() ||
- *config_from_call_context->max_recv_size() < *max_recv_size)) {
- max_recv_size = *config_from_call_context->max_recv_size();
+ if (msg->payload()->Length() > max_length) {
+ if (err->is_set()) return std::move(msg);
+ auto r = GetContext()->MakePooled(
+ GetContext());
+ r->Set(GrpcStatusMetadata(), GRPC_STATUS_RESOURCE_EXHAUSTED);
+ r->Set(GrpcMessageMetadata(),
+ Slice::FromCopiedString(
+ absl::StrFormat("%s message larger than max (%u vs. %d)",
+ is_send ? "Sent" : "Received",
+ msg->payload()->Length(), max_length)));
+ err->Set(std::move(r));
+ return absl::nullopt;
}
- limits = grpc_core::MessageSizeParsedConfig(max_send_size, max_recv_size);
- }
+ return std::move(msg);
+ };
}
- ~call_data() {}
-
- grpc_core::CallCombiner* call_combiner;
- grpc_core::MessageSizeParsedConfig limits;
- // Receive closures are chained: we inject this closure as the
- // recv_message_ready up-call on transport_stream_op, and remember to
- // call our next_recv_message_ready member after handling it.
- grpc_closure recv_message_ready;
- grpc_closure recv_trailing_metadata_ready;
- // The error caused by a message that is too large, or absl::OkStatus()
- grpc_error_handle error;
- // Used by recv_message_ready.
- absl::optional* recv_message = nullptr;
- // Original recv_message_ready callback, invoked after our own.
- grpc_closure* next_recv_message_ready = nullptr;
- // Original recv_trailing_metadata callback, invoked after our own.
- grpc_closure* original_recv_trailing_metadata_ready;
- bool seen_recv_trailing_metadata = false;
- grpc_error_handle recv_trailing_metadata_error;
-};
-
-} // namespace
+ public:
+ explicit CallBuilder(const MessageSizeParsedConfig& limits)
+ : limits_(limits) {}
-// Callback invoked when we receive a message. Here we check the max
-// receive message size.
-static void recv_message_ready(void* user_data, grpc_error_handle error) {
- grpc_call_element* elem = static_cast(user_data);
- call_data* calld = static_cast(elem->call_data);
- if (calld->recv_message->has_value() &&
- calld->limits.max_recv_size().has_value() &&
- (*calld->recv_message)->Length() >
- static_cast(*calld->limits.max_recv_size())) {
- grpc_error_handle new_error = grpc_error_set_int(
- GRPC_ERROR_CREATE(absl::StrFormat(
- "Received message larger than max (%u vs. %d)",
- (*calld->recv_message)->Length(), *calld->limits.max_recv_size())),
- grpc_core::StatusIntProperty::kRpcStatus,
- GRPC_STATUS_RESOURCE_EXHAUSTED);
- error = grpc_error_add_child(error, new_error);
- calld->error = error;
+ template
+ void AddSend(T* pipe_end) {
+ if (!limits_.max_send_size().has_value()) return;
+ pipe_end->InterceptAndMap(Interceptor(*limits_.max_send_size(), true));
}
- // Invoke the next callback.
- grpc_closure* closure = calld->next_recv_message_ready;
- calld->next_recv_message_ready = nullptr;
- if (calld->seen_recv_trailing_metadata) {
- // We might potentially see another RECV_MESSAGE op. In that case, we do not
- // want to run the recv_trailing_metadata_ready closure again. The newer
- // RECV_MESSAGE op cannot cause any errors since the transport has already
- // invoked the recv_trailing_metadata_ready closure and all further
- // RECV_MESSAGE ops will get null payloads.
- calld->seen_recv_trailing_metadata = false;
- GRPC_CALL_COMBINER_START(calld->call_combiner,
- &calld->recv_trailing_metadata_ready,
- calld->recv_trailing_metadata_error,
- "continue recv_trailing_metadata_ready");
+ template
+ void AddRecv(T* pipe_end) {
+ if (!limits_.max_recv_size().has_value()) return;
+ pipe_end->InterceptAndMap(Interceptor(*limits_.max_recv_size(), false));
}
- grpc_core::Closure::Run(DEBUG_LOCATION, closure, error);
-}
-// Callback invoked on completion of recv_trailing_metadata
-// Notifies the recv_trailing_metadata batch of any message size failures
-static void recv_trailing_metadata_ready(void* user_data,
- grpc_error_handle error) {
- grpc_call_element* elem = static_cast(user_data);
- call_data* calld = static_cast(elem->call_data);
- if (calld->next_recv_message_ready != nullptr) {
- calld->seen_recv_trailing_metadata = true;
- calld->recv_trailing_metadata_error = error;
- GRPC_CALL_COMBINER_STOP(calld->call_combiner,
- "deferring recv_trailing_metadata_ready until "
- "after recv_message_ready");
- return;
+ ArenaPromise Run(
+ CallArgs call_args, NextPromiseFactory next_promise_factory) {
+ return Race(err_->Wait(), next_promise_factory(std::move(call_args)));
}
- error = grpc_error_add_child(error, calld->error);
- // Invoke the next callback.
- grpc_core::Closure::Run(DEBUG_LOCATION,
- calld->original_recv_trailing_metadata_ready, error);
-}
-// Start transport stream op.
-static void message_size_start_transport_stream_op_batch(
- grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
- call_data* calld = static_cast(elem->call_data);
- // Check max send message size.
- if (op->send_message && calld->limits.max_send_size().has_value() &&
- op->payload->send_message.send_message->Length() >
- static_cast(*calld->limits.max_send_size())) {
- grpc_transport_stream_op_batch_finish_with_failure(
- op,
- grpc_error_set_int(GRPC_ERROR_CREATE(absl::StrFormat(
- "Sent message larger than max (%u vs. %d)",
- op->payload->send_message.send_message->Length(),
- *calld->limits.max_send_size())),
- grpc_core::StatusIntProperty::kRpcStatus,
- GRPC_STATUS_RESOURCE_EXHAUSTED),
- calld->call_combiner);
- return;
- }
- // Inject callback for receiving a message.
- if (op->recv_message) {
- calld->next_recv_message_ready =
- op->payload->recv_message.recv_message_ready;
- calld->recv_message = op->payload->recv_message.recv_message;
- op->payload->recv_message.recv_message_ready = &calld->recv_message_ready;
- }
- // Inject callback for receiving trailing metadata.
- if (op->recv_trailing_metadata) {
- calld->original_recv_trailing_metadata_ready =
- op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
- op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
- &calld->recv_trailing_metadata_ready;
- }
- // Chain to the next filter.
- grpc_call_next_op(elem, op);
-}
+ private:
+ Latch* const err_ =
+ GetContext()->ManagedNew>();
+ MessageSizeParsedConfig limits_;
+};
-// Constructor for call_data.
-static grpc_error_handle message_size_init_call_elem(
- grpc_call_element* elem, const grpc_call_element_args* args) {
- channel_data* chand = static_cast(elem->channel_data);
- new (elem->call_data) call_data(elem, *chand, *args);
- return absl::OkStatus();
+absl::StatusOr ClientMessageSizeFilter::Create(
+ const ChannelArgs& args, ChannelFilter::Args) {
+ return ClientMessageSizeFilter(args);
}
-// Destructor for call_data.
-static void message_size_destroy_call_elem(
- grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
- grpc_closure* /*ignored*/) {
- call_data* calld = static_cast(elem->call_data);
- calld->~call_data();
+absl::StatusOr ServerMessageSizeFilter::Create(
+ const ChannelArgs& args, ChannelFilter::Args) {
+ return ServerMessageSizeFilter(args);
}
-// Constructor for channel_data.
-static grpc_error_handle message_size_init_channel_elem(
- grpc_channel_element* elem, grpc_channel_element_args* args) {
- GPR_ASSERT(!args->is_last);
- channel_data* chand = static_cast(elem->channel_data);
- new (chand) channel_data();
- chand->limits = grpc_core::MessageSizeParsedConfig::GetFromChannelArgs(
- args->channel_args);
- return absl::OkStatus();
-}
+ArenaPromise ClientMessageSizeFilter::MakeCallPromise(
+ CallArgs call_args, NextPromiseFactory next_promise_factory) {
+ // Get max sizes from channel data, then merge in per-method config values.
+ // Note: Per-method config is only available on the client, so we
+ // apply the max request size to the send limit and the max response
+ // size to the receive limit.
+ MessageSizeParsedConfig limits = this->limits();
+ const MessageSizeParsedConfig* config_from_call_context =
+ MessageSizeParsedConfig::GetFromCallContext(
+ GetContext(),
+ service_config_parser_index_);
+ if (config_from_call_context != nullptr) {
+ absl::optional max_send_size = limits.max_send_size();
+ absl::optional max_recv_size = limits.max_recv_size();
+ if (config_from_call_context->max_send_size().has_value() &&
+ (!max_send_size.has_value() ||
+ *config_from_call_context->max_send_size() < *max_send_size)) {
+ max_send_size = *config_from_call_context->max_send_size();
+ }
+ if (config_from_call_context->max_recv_size().has_value() &&
+ (!max_recv_size.has_value() ||
+ *config_from_call_context->max_recv_size() < *max_recv_size)) {
+ max_recv_size = *config_from_call_context->max_recv_size();
+ }
+ limits = MessageSizeParsedConfig(max_send_size, max_recv_size);
+ }
-// Destructor for channel_data.
-static void message_size_destroy_channel_elem(grpc_channel_element* elem) {
- channel_data* chand = static_cast(elem->channel_data);
- chand->~channel_data();
+ CallBuilder b(limits);
+ b.AddSend(call_args.client_to_server_messages);
+ b.AddRecv(call_args.server_to_client_messages);
+ return b.Run(std::move(call_args), std::move(next_promise_factory));
}
-const grpc_channel_filter grpc_message_size_filter = {
- message_size_start_transport_stream_op_batch,
- nullptr,
- grpc_channel_next_op,
- sizeof(call_data),
- message_size_init_call_elem,
- grpc_call_stack_ignore_set_pollset_or_pollset_set,
- message_size_destroy_call_elem,
- sizeof(channel_data),
- message_size_init_channel_elem,
- grpc_channel_stack_no_post_init,
- message_size_destroy_channel_elem,
- grpc_channel_next_get_info,
- "message_size"};
+ArenaPromise ServerMessageSizeFilter::MakeCallPromise(
+ CallArgs call_args, NextPromiseFactory next_promise_factory) {
+ CallBuilder b(limits());
+ b.AddSend(call_args.server_to_client_messages);
+ b.AddRecv(call_args.client_to_server_messages);
+ return b.Run(std::move(call_args), std::move(next_promise_factory));
+}
+namespace {
// Used for GRPC_CLIENT_SUBCHANNEL
-static bool maybe_add_message_size_filter_subchannel(
- grpc_core::ChannelStackBuilder* builder) {
+bool MaybeAddMessageSizeFilterToSubchannel(ChannelStackBuilder* builder) {
if (builder->channel_args().WantMinimalStack()) {
return true;
}
- builder->PrependFilter(&grpc_message_size_filter);
+ builder->PrependFilter(&ClientMessageSizeFilter::kFilter);
return true;
}
-// Used for GRPC_CLIENT_DIRECT_CHANNEL and GRPC_SERVER_CHANNEL. Adds the filter
-// only if message size limits or service config is specified.
-static bool maybe_add_message_size_filter(
- grpc_core::ChannelStackBuilder* builder) {
- auto channel_args = builder->channel_args();
- if (channel_args.WantMinimalStack()) {
+// Used for GRPC_CLIENT_DIRECT_CHANNEL and GRPC_SERVER_CHANNEL. Adds the
+// filter only if message size limits or service config is specified.
+auto MaybeAddMessageSizeFilter(const grpc_channel_filter* filter) {
+ return [filter](ChannelStackBuilder* builder) {
+ auto channel_args = builder->channel_args();
+ if (channel_args.WantMinimalStack()) {
+ return true;
+ }
+ MessageSizeParsedConfig limits =
+ MessageSizeParsedConfig::GetFromChannelArgs(channel_args);
+ const bool enable =
+ limits.max_send_size().has_value() ||
+ limits.max_recv_size().has_value() ||
+ channel_args.GetString(GRPC_ARG_SERVICE_CONFIG).has_value();
+ if (enable) builder->PrependFilter(filter);
return true;
- }
- grpc_core::MessageSizeParsedConfig limits =
- grpc_core::MessageSizeParsedConfig::GetFromChannelArgs(channel_args);
- const bool enable =
- limits.max_send_size().has_value() ||
- limits.max_recv_size().has_value() ||
- channel_args.GetString(GRPC_ARG_SERVICE_CONFIG).has_value();
- if (enable) builder->PrependFilter(&grpc_message_size_filter);
- return true;
+ };
}
-namespace grpc_core {
+} // namespace
void RegisterMessageSizeFilter(CoreConfiguration::Builder* builder) {
MessageSizeParser::Register(builder);
- builder->channel_init()->RegisterStage(
- GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- maybe_add_message_size_filter_subchannel);
- builder->channel_init()->RegisterStage(GRPC_CLIENT_DIRECT_CHANNEL,
- GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- maybe_add_message_size_filter);
- builder->channel_init()->RegisterStage(GRPC_SERVER_CHANNEL,
+ builder->channel_init()->RegisterStage(GRPC_CLIENT_SUBCHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- maybe_add_message_size_filter);
+ MaybeAddMessageSizeFilterToSubchannel);
+ builder->channel_init()->RegisterStage(
+ GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ MaybeAddMessageSizeFilter(&ClientMessageSizeFilter::kFilter));
+ builder->channel_init()->RegisterStage(
+ GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ MaybeAddMessageSizeFilter(&ServerMessageSizeFilter::kFilter));
}
} // namespace grpc_core
diff --git a/src/core/ext/filters/message_size/message_size_filter.h b/src/core/ext/filters/message_size/message_size_filter.h
index e47485a8950..75135a1b75e 100644
--- a/src/core/ext/filters/message_size/message_size_filter.h
+++ b/src/core/ext/filters/message_size/message_size_filter.h
@@ -24,21 +24,22 @@
#include
+#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
-#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
+#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
+#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/service_config/service_config_parser.h"
-
-extern const grpc_channel_filter grpc_message_size_filter;
+#include "src/core/lib/transport/transport.h"
namespace grpc_core {
@@ -85,6 +86,50 @@ class MessageSizeParser : public ServiceConfigParser::Parser {
absl::optional GetMaxRecvSizeFromChannelArgs(const ChannelArgs& args);
absl::optional GetMaxSendSizeFromChannelArgs(const ChannelArgs& args);
+class MessageSizeFilter : public ChannelFilter {
+ protected:
+ explicit MessageSizeFilter(const ChannelArgs& args)
+ : limits_(MessageSizeParsedConfig::GetFromChannelArgs(args)) {}
+
+ class CallBuilder;
+
+ const MessageSizeParsedConfig& limits() const { return limits_; }
+
+ private:
+ MessageSizeParsedConfig limits_;
+};
+
+class ServerMessageSizeFilter final : public MessageSizeFilter {
+ public:
+ static const grpc_channel_filter kFilter;
+
+ static absl::StatusOr Create(
+ const ChannelArgs& args, ChannelFilter::Args filter_args);
+
+ // Construct a promise for one call.
+ ArenaPromise MakeCallPromise(
+ CallArgs call_args, NextPromiseFactory next_promise_factory) override;
+
+ private:
+ using MessageSizeFilter::MessageSizeFilter;
+};
+
+class ClientMessageSizeFilter final : public MessageSizeFilter {
+ public:
+ static const grpc_channel_filter kFilter;
+
+ static absl::StatusOr Create(
+ const ChannelArgs& args, ChannelFilter::Args filter_args);
+
+ // Construct a promise for one call.
+ ArenaPromise MakeCallPromise(
+ CallArgs call_args, NextPromiseFactory next_promise_factory) override;
+
+ private:
+ const size_t service_config_parser_index_{MessageSizeParser::ParserIndex()};
+ using MessageSizeFilter::MessageSizeFilter;
+};
+
} // namespace grpc_core
#endif // GRPC_SRC_CORE_EXT_FILTERS_MESSAGE_SIZE_MESSAGE_SIZE_FILTER_H
diff --git a/src/core/ext/transport/binder/transport/binder_transport.cc b/src/core/ext/transport/binder/transport/binder_transport.cc
index 38ccbc6ef8f..0420e96b184 100644
--- a/src/core/ext/transport/binder/transport/binder_transport.cc
+++ b/src/core/ext/transport/binder/transport/binder_transport.cc
@@ -694,6 +694,7 @@ static grpc_endpoint* get_endpoint(grpc_transport*) {
// See grpc_transport_vtable declaration for meaning of each field
static const grpc_transport_vtable vtable = {sizeof(grpc_binder_stream),
+ false,
"binder",
init_stream,
nullptr,
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index df7adaf667f..0b55f73f67c 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -1204,7 +1204,8 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
grpc_chttp2_stream* s,
grpc_closure** pclosure,
grpc_error_handle error,
- const char* desc) {
+ const char* desc,
+ grpc_core::DebugLocation whence) {
grpc_closure* closure = *pclosure;
*pclosure = nullptr;
if (closure == nullptr) {
@@ -1215,14 +1216,14 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
gpr_log(
GPR_INFO,
"complete_closure_step: t=%p %p refs=%d flags=0x%04x desc=%s err=%s "
- "write_state=%s",
+ "write_state=%s whence=%s:%d",
t, closure,
static_cast(closure->next_data.scratch /
CLOSURE_BARRIER_FIRST_REF_BIT),
static_cast(closure->next_data.scratch %
CLOSURE_BARRIER_FIRST_REF_BIT),
desc, grpc_core::StatusToString(error).c_str(),
- write_state_name(t->write_state));
+ write_state_name(t->write_state), whence.file(), whence.line());
}
auto* tracer = CallTracerIfEnabled(s);
@@ -3073,6 +3074,7 @@ static grpc_endpoint* chttp2_get_endpoint(grpc_transport* t) {
}
static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
+ false,
"chttp2",
init_stream,
nullptr,
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 8ad1a17a101..a89b003fcdb 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -709,7 +709,8 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
grpc_chttp2_stream* s,
grpc_closure** pclosure,
grpc_error_handle error,
- const char* desc);
+ const char* desc,
+ grpc_core::DebugLocation whence = {});
#define GRPC_HEADER_SIZE_IN_BYTES 5
#define MAX_SIZE_T (~(size_t)0)
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc
index 77cf88b17de..e1dfbb33b71 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.cc
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc
@@ -1462,6 +1462,7 @@ static void perform_op(grpc_transport* /*gt*/, grpc_transport_op* /*op*/) {}
static const grpc_transport_vtable grpc_cronet_vtable = {
sizeof(stream_obj),
+ false,
"cronet_http",
init_stream,
nullptr,
diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc
index dc6b4804f0a..b4185e454c0 100644
--- a/src/core/ext/transport/inproc/inproc_transport.cc
+++ b/src/core/ext/transport/inproc/inproc_transport.cc
@@ -408,7 +408,7 @@ void complete_if_batch_end_locked(inproc_stream* s, grpc_error_handle error,
int is_rtm = static_cast(op == s->recv_trailing_md_op);
if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) {
- INPROC_LOG(GPR_INFO, "%s %p %p %s", msg, s, op,
+ INPROC_LOG(GPR_INFO, "%s %p %p %p %s", msg, s, op, op->on_complete,
grpc_core::StatusToString(error).c_str());
grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete, error);
}
@@ -697,8 +697,9 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
s->to_read_initial_md_filled = false;
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
- s->recv_initial_md_op->payload->recv_initial_metadata
- .recv_initial_metadata_ready,
+ std::exchange(s->recv_initial_md_op->payload->recv_initial_metadata
+ .recv_initial_metadata_ready,
+ nullptr),
absl::OkStatus());
complete_if_batch_end_locked(
s, absl::OkStatus(), s->recv_initial_md_op,
@@ -766,6 +767,8 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
nullptr);
s->to_read_trailing_md.Clear();
s->to_read_trailing_md_filled = false;
+ s->recv_trailing_md_op->payload->recv_trailing_metadata
+ .recv_trailing_metadata->Set(grpc_core::GrpcStatusFromWire(), true);
// We should schedule the recv_trailing_md_op completion if
// 1. this stream is the client-side
@@ -906,8 +909,6 @@ bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error) {
return ret;
}
-void do_nothing(void* /*arg*/, grpc_error_handle /*error*/) {}
-
void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
grpc_transport_stream_op_batch* op) {
INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op);
@@ -933,8 +934,8 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
// completed). This can go away once we move to a new C++ closure API
// that provides the ability to create a barrier closure.
if (on_complete == nullptr) {
- on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
- nullptr, grpc_schedule_on_exec_ctx);
+ on_complete = op->on_complete =
+ grpc_core::NewClosure([](grpc_error_handle) {});
}
if (op->cancel_stream) {
@@ -1177,13 +1178,18 @@ void set_pollset_set(grpc_transport* /*gt*/, grpc_stream* /*gs*/,
grpc_endpoint* get_endpoint(grpc_transport* /*t*/) { return nullptr; }
-const grpc_transport_vtable inproc_vtable = {
- sizeof(inproc_stream), "inproc",
- init_stream, nullptr,
- set_pollset, set_pollset_set,
- perform_stream_op, perform_transport_op,
- destroy_stream, destroy_transport,
- get_endpoint};
+const grpc_transport_vtable inproc_vtable = {sizeof(inproc_stream),
+ true,
+ "inproc",
+ init_stream,
+ nullptr,
+ set_pollset,
+ set_pollset_set,
+ perform_stream_op,
+ perform_transport_op,
+ destroy_stream,
+ destroy_transport,
+ get_endpoint};
//******************************************************************************
// Main inproc transport functions
diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc
index 40582f99e44..fa66027557f 100644
--- a/src/core/lib/channel/connected_channel.cc
+++ b/src/core/lib/channel/connected_channel.cc
@@ -21,21 +21,16 @@
#include "src/core/lib/channel/connected_channel.h"
#include
-#include
-#include
#include
#include
#include
#include
+#include
#include
-#include
-#include "absl/base/thread_annotations.h"
-#include "absl/container/inlined_vector.h"
#include "absl/status/status.h"
-#include "absl/strings/str_cat.h"
-#include "absl/strings/str_join.h"
+#include "absl/status/statusor.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
@@ -47,39 +42,48 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/channel/context.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/gprpp/debug_location.h"
-#include "src/core/lib/gprpp/match.h"
#include "src/core/lib/gprpp/orphanable.h"
-#include "src/core/lib/gprpp/status_helper.h"
-#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/promise/context.h"
+#include "src/core/lib/promise/detail/basic_join.h"
#include "src/core/lib/promise/detail/basic_seq.h"
+#include "src/core/lib/promise/for_each.h"
+#include "src/core/lib/promise/if.h"
+#include "src/core/lib/promise/latch.h"
+#include "src/core/lib/promise/loop.h"
+#include "src/core/lib/promise/map.h"
+#include "src/core/lib/promise/party.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/poll.h"
+#include "src/core/lib/promise/promise.h"
+#include "src/core/lib/promise/race.h"
+#include "src/core/lib/promise/seq.h"
+#include "src/core/lib/promise/try_join.h"
+#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/call_trace.h"
#include "src/core/lib/surface/channel_stack_type.h"
+#include "src/core/lib/transport/batch_builder.h"
+#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/transport/transport_fwd.h"
#include "src/core/lib/transport/transport_impl.h"
-#define MAX_BUFFER_LENGTH 8192
-
typedef struct connected_channel_channel_data {
grpc_transport* transport;
} channel_data;
@@ -252,10 +256,24 @@ namespace {
defined(GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL)
class ConnectedChannelStream : public Orphanable {
public:
+ explicit ConnectedChannelStream(grpc_transport* transport)
+ : transport_(transport), stream_(nullptr, StreamDeleter(this)) {
+ GRPC_STREAM_REF_INIT(
+ &stream_refcount_, 1,
+ [](void* p, grpc_error_handle) {
+ static_cast(p)->BeginDestroy();
+ },
+ this, "ConnectedChannelStream");
+ }
+
grpc_transport* transport() { return transport_; }
grpc_closure* stream_destroyed_closure() { return &stream_destroyed_; }
- void IncrementRefCount(const char* reason) {
+ BatchBuilder::Target batch_target() {
+ return BatchBuilder::Target{transport_, stream_.get(), &stream_refcount_};
+ }
+
+ void IncrementRefCount(const char* reason = "smartptr") {
#ifndef NDEBUG
grpc_stream_ref(&stream_refcount_, reason);
#else
@@ -264,7 +282,7 @@ class ConnectedChannelStream : public Orphanable {
#endif
}
- void Unref(const char* reason) {
+ void Unref(const char* reason = "smartptr") {
#ifndef NDEBUG
grpc_stream_unref(&stream_refcount_, reason);
#else
@@ -273,235 +291,48 @@ class ConnectedChannelStream : public Orphanable {
#endif
}
+ RefCountedPtr InternalRef() {
+ IncrementRefCount("smartptr");
+ return RefCountedPtr(this);
+ }
+
void Orphan() final {
- bool finished;
- {
- MutexLock lock(mu());
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_INFO, "%s[connected] DropStream: %s finished=%s",
- Activity::current()->DebugTag().c_str(),
- ActiveOpsString().c_str(), finished_ ? "true" : "false");
- }
- finished = finished_;
+ bool finished = finished_.IsSet();
+ if (grpc_call_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "%s[connected] Orphan stream, finished: %d",
+ party_->DebugTag().c_str(), finished);
}
// If we hadn't already observed the stream to be finished, we need to
// cancel it at the transport.
if (!finished) {
- IncrementRefCount("shutdown client stream");
- auto* cancel_op =
- GetContext()->New();
- cancel_op->cancel_stream = true;
- cancel_op->payload = batch_payload();
- auto* s = stream();
- cancel_op->on_complete = NewClosure(
- [this](grpc_error_handle) { Unref("shutdown client stream"); });
- batch_payload()->cancel_stream.cancel_error = absl::CancelledError();
- grpc_transport_perform_stream_op(transport(), s, cancel_op);
+ party_->Spawn(
+ "finish",
+ [self = InternalRef()]() {
+ if (!self->finished_.IsSet()) {
+ self->finished_.Set();
+ }
+ return Empty{};
+ },
+ [](Empty) {});
+ GetContext()->Cancel(batch_target(),
+ absl::CancelledError());
}
- Unref("orphan client stream");
+ Unref("orphan connected stream");
}
- protected:
- explicit ConnectedChannelStream(grpc_transport* transport)
- : transport_(transport), stream_(nullptr, StreamDeleter(this)) {
- call_context_->IncrementRefCount("connected_channel_stream");
- GRPC_STREAM_REF_INIT(
- &stream_refcount_, 1,
- [](void* p, grpc_error_handle) {
- static_cast(p)->BeginDestroy();
- },
- this, "client_stream");
- }
+ // Returns a promise that implements the receive message loop.
+ auto RecvMessages(PipeSender* incoming_messages);
+ // Returns a promise that implements the send message loop.
+ auto SendMessages(PipeReceiver* outgoing_messages);
- grpc_stream* stream() { return stream_.get(); }
void SetStream(grpc_stream* stream) { stream_.reset(stream); }
+ grpc_stream* stream() { return stream_.get(); }
grpc_stream_refcount* stream_refcount() { return &stream_refcount_; }
- Mutex* mu() const ABSL_LOCK_RETURNED(mu_) { return &mu_; }
- grpc_transport_stream_op_batch_payload* batch_payload() {
- return &batch_payload_;
- }
- bool finished() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { return finished_; }
- void set_finished() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { finished_ = true; }
- virtual std::string ActiveOpsString() const
- ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) = 0;
-
- void SchedulePush(grpc_transport_stream_op_batch* batch)
- ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
- batch->is_traced = GetContext()->traced();
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_DEBUG, "%s[connected] Push batch to transport: %s",
- Activity::current()->DebugTag().c_str(),
- grpc_transport_stream_op_batch_string(batch, false).c_str());
- }
- if (push_batches_.empty()) {
- IncrementRefCount("push");
- ExecCtx::Run(DEBUG_LOCATION, &push_, absl::OkStatus());
- }
- push_batches_.push_back(batch);
- }
- void PollSendMessage(PipeReceiver* outgoing_messages,
- ClientMetadataHandle* client_trailing_metadata)
- ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
- if (absl::holds_alternative(send_message_state_)) {
- message_to_send_.reset();
- }
- if (absl::holds_alternative(send_message_state_)) {
- message_to_send_.reset();
- send_message_state_.emplace>(
- outgoing_messages->Next());
- }
- if (auto* next = absl::get_if>(
- &send_message_state_)) {
- auto r = (*next)();
- if (auto* p = r.value_if_ready()) {
- memset(&send_message_, 0, sizeof(send_message_));
- send_message_.payload = batch_payload();
- send_message_.on_complete = &send_message_batch_done_;
- // No value => half close from above.
- if (p->has_value()) {
- message_to_send_ = std::move(*p);
- send_message_state_ = SendMessageToTransport{};
- send_message_.send_message = true;
- batch_payload()->send_message.send_message =
- (*message_to_send_)->payload();
- batch_payload()->send_message.flags = (*message_to_send_)->flags();
- } else {
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_INFO, "%s[connected] PollConnectedChannel: half close",
- Activity::current()->DebugTag().c_str());
- }
- GPR_ASSERT(!absl::holds_alternative(send_message_state_));
- send_message_state_ = Closed{};
- send_message_.send_trailing_metadata = true;
- if (client_trailing_metadata != nullptr) {
- *client_trailing_metadata =
- GetContext()->MakePooled(
- GetContext());
- batch_payload()->send_trailing_metadata.send_trailing_metadata =
- client_trailing_metadata->get();
- batch_payload()->send_trailing_metadata.sent = nullptr;
- } else {
- return; // Skip rest of function for server
- }
- }
- IncrementRefCount("send_message");
- send_message_waker_ = Activity::current()->MakeOwningWaker();
- SchedulePush(&send_message_);
- }
- }
- }
-
- void PollRecvMessage(PipeSender*& incoming_messages)
- ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
- if (auto* pending =
- absl::get_if(&recv_message_state_)) {
- if (pending->received) {
- if (pending->payload.has_value()) {
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_INFO,
- "%s[connected] PollRecvMessage: received payload of "
- "%" PRIdPTR " bytes",
- recv_message_waker_.ActivityDebugTag().c_str(),
- pending->payload->Length());
- }
- recv_message_state_ =
- incoming_messages->Push(GetContext()->MakePooled(
- std::move(*pending->payload), pending->flags));
- } else {
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_INFO,
- "%s[connected] PollRecvMessage: received no payload",
- recv_message_waker_.ActivityDebugTag().c_str());
- }
- recv_message_state_ = Closed{};
- std::exchange(incoming_messages, nullptr)->Close();
- }
- }
- }
- if (absl::holds_alternative(recv_message_state_)) {
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_INFO, "%s[connected] PollRecvMessage: requesting message",
- Activity::current()->DebugTag().c_str());
- }
- PushRecvMessage();
- }
- if (auto* push = absl::get_if::PushType>(
- &recv_message_state_)) {
- auto r = (*push)();
- if (bool* result = r.value_if_ready()) {
- if (*result) {
- if (!finished_) {
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_INFO,
- "%s[connected] PollRecvMessage: pushed message; "
- "requesting next",
- Activity::current()->DebugTag().c_str());
- }
- PushRecvMessage();
- } else {
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_INFO,
- "%s[connected] PollRecvMessage: pushed message "
- "and finished; "
- "marking closed",
- Activity::current()->DebugTag().c_str());
- }
- recv_message_state_ = Closed{};
- std::exchange(incoming_messages, nullptr)->Close();
- }
- } else {
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_INFO,
- "%s[connected] PollRecvMessage: failed to push "
- "message; marking "
- "closed",
- Activity::current()->DebugTag().c_str());
- }
- recv_message_state_ = Closed{};
- std::exchange(incoming_messages, nullptr)->Close();
- }
- }
- }
- }
-
- std::string SendMessageString() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
- return Match(
- send_message_state_, [](Idle) -> std::string { return "IDLE"; },
- [](Closed) -> std::string { return "CLOSED"; },
- [](const PipeReceiverNextType&) -> std::string {
- return "WAITING";
- },
- [](SendMessageToTransport) -> std::string { return "SENDING"; });
- }
-
- std::string RecvMessageString() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
- return Match(
- recv_message_state_, [](Idle) -> std::string { return "IDLE"; },
- [](Closed) -> std::string { return "CLOSED"; },
- [](const PendingReceiveMessage&) -> std::string { return "WAITING"; },
- [](const absl::optional& message) -> std::string {
- return absl::StrCat(
- "READY:", message.has_value()
- ? absl::StrCat((*message)->payload()->Length(), "b")
- : "EOS");
- },
- [](const PipeSender::PushType&) -> std::string {
- return "PUSHING";
- });
- }
-
- bool IsPromiseReceiving() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
- return absl::holds_alternative::PushType>(
- recv_message_state_) ||
- absl::holds_alternative(recv_message_state_);
- }
+ void set_finished() { finished_.Set(); }
+ auto WaitFinished() { return finished_.Wait(); }
private:
- struct SendMessageToTransport {};
- struct Idle {};
- struct Closed {};
-
class StreamDeleter {
public:
explicit StreamDeleter(ConnectedChannelStream* impl) : impl_(impl) {}
@@ -517,11 +348,7 @@ class ConnectedChannelStream : public Orphanable {
using StreamPtr = std::unique_ptr;
void StreamDestroyed() {
- call_context_->RunInContext([this] {
- auto* cc = call_context_;
- this->~ConnectedChannelStream();
- cc->Unref("child_stream");
- });
+ call_context_->RunInContext([this] { this->~ConnectedChannelStream(); });
}
void BeginDestroy() {
@@ -532,824 +359,434 @@ class ConnectedChannelStream : public Orphanable {
}
}
- // Called from outside the activity to push work down to the transport.
- void Push() {
- PushBatches push_batches;
- {
- MutexLock lock(&mu_);
- push_batches.swap(push_batches_);
- }
- for (auto* batch : push_batches) {
- if (stream() != nullptr) {
- grpc_transport_perform_stream_op(transport(), stream(), batch);
- } else {
- grpc_transport_stream_op_batch_finish_with_failure_from_transport(
- batch, absl::CancelledError());
- }
- }
- Unref("push");
- }
-
- void SendMessageBatchDone(grpc_error_handle error) {
- {
- MutexLock lock(&mu_);
- if (error != absl::OkStatus()) {
- // Note that we're in error here, the call will be closed by the
- // transport in a moment, and we'll return from the promise with an
- // error - so we don't need to do any extra work to close out pipes or
- // the like.
- send_message_state_ = Closed{};
- }
- if (!absl::holds_alternative(send_message_state_)) {
- send_message_state_ = Idle{};
- }
- send_message_waker_.Wakeup();
- }
- Unref("send_message");
- }
-
- void RecvMessageBatchDone(grpc_error_handle error) {
- {
- MutexLock lock(mu());
- if (error != absl::OkStatus()) {
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_INFO, "%s[connected] RecvMessageBatchDone: error=%s",
- recv_message_waker_.ActivityDebugTag().c_str(),
- StatusToString(error).c_str());
- }
- } else if (absl::holds_alternative(recv_message_state_)) {
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_INFO,
- "%s[connected] RecvMessageBatchDone: already closed, "
- "ignoring",
- recv_message_waker_.ActivityDebugTag().c_str());
- }
- } else {
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_INFO,
- "%s[connected] RecvMessageBatchDone: received message",
- recv_message_waker_.ActivityDebugTag().c_str());
- }
- auto pending =
- absl::get_if(&recv_message_state_);
- GPR_ASSERT(pending != nullptr);
- GPR_ASSERT(pending->received == false);
- pending->received = true;
- }
- recv_message_waker_.Wakeup();
- }
- Unref("recv_message");
- }
-
- void PushRecvMessage() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
- recv_message_state_ = PendingReceiveMessage{};
- auto& pending_recv_message =
- absl::get(recv_message_state_);
- memset(&recv_message_, 0, sizeof(recv_message_));
- recv_message_.payload = batch_payload();
- recv_message_.on_complete = nullptr;
- recv_message_.recv_message = true;
- batch_payload()->recv_message.recv_message = &pending_recv_message.payload;
- batch_payload()->recv_message.flags = &pending_recv_message.flags;
- batch_payload()->recv_message.call_failed_before_recv_message = nullptr;
- batch_payload()->recv_message.recv_message_ready =
- &recv_message_batch_done_;
- IncrementRefCount("recv_message");
- recv_message_waker_ = Activity::current()->MakeOwningWaker();
- SchedulePush(&recv_message_);
- }
-
- mutable Mutex mu_;
grpc_transport* const transport_;
- CallContext* const call_context_{GetContext()};
+ RefCountedPtr const call_context_{
+ GetContext()->Ref()};
grpc_closure stream_destroyed_ =
MakeMemberClosure(
this, DEBUG_LOCATION);
grpc_stream_refcount stream_refcount_;
StreamPtr stream_;
- using PushBatches = absl::InlinedVector;
- PushBatches push_batches_ ABSL_GUARDED_BY(mu_);
- grpc_closure push_ =
- MakeMemberClosure(
- this, DEBUG_LOCATION);
-
- NextResult message_to_send_ ABSL_GUARDED_BY(mu_);
- absl::variant,
- SendMessageToTransport>
- send_message_state_ ABSL_GUARDED_BY(mu_);
- grpc_transport_stream_op_batch send_message_;
- grpc_closure send_message_batch_done_ =
- MakeMemberClosure(
- this, DEBUG_LOCATION);
-
- struct PendingReceiveMessage {
- absl::optional payload;
- uint32_t flags;
- bool received = false;
- };
- absl::variant::PushType>
- recv_message_state_ ABSL_GUARDED_BY(mu_);
- grpc_closure recv_message_batch_done_ =
- MakeMemberClosure(
- this, DEBUG_LOCATION);
- grpc_transport_stream_op_batch recv_message_;
-
- Waker send_message_waker_ ABSL_GUARDED_BY(mu_);
- Waker recv_message_waker_ ABSL_GUARDED_BY(mu_);
- bool finished_ ABSL_GUARDED_BY(mu_) = false;
-
- grpc_transport_stream_op_batch_payload batch_payload_{
- GetContext()};
+ Arena* arena_ = GetContext();
+ Party* const party_ = static_cast(Activity::current());
+ ExternallyObservableLatch finished_;
};
-#endif
-
-#ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_CLIENT_CALL
-class ClientStream : public ConnectedChannelStream {
- public:
- ClientStream(grpc_transport* transport, CallArgs call_args)
- : ConnectedChannelStream(transport),
- server_initial_metadata_pipe_(call_args.server_initial_metadata),
- client_to_server_messages_(call_args.client_to_server_messages),
- server_to_client_messages_(call_args.server_to_client_messages),
- client_initial_metadata_(std::move(call_args.client_initial_metadata)) {
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_INFO, "%s[connected] InitImpl: intitial_metadata=%s",
- Activity::current()->DebugTag().c_str(),
- client_initial_metadata_->DebugString().c_str());
- }
- }
-
- Poll PollOnce() {
- MutexLock lock(mu());
- GPR_ASSERT(!finished());
-
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_INFO, "%s[connected] PollConnectedChannel: %s",
- Activity::current()->DebugTag().c_str(),
- ActiveOpsString().c_str());
- }
-
- if (!std::exchange(requested_metadata_, true)) {
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_INFO,
- "%s[connected] PollConnectedChannel: requesting metadata",
- Activity::current()->DebugTag().c_str());
- }
- SetStream(static_cast(
- GetContext()->Alloc(transport()->vtable->sizeof_stream)));
- grpc_transport_init_stream(transport(), stream(), stream_refcount(),
- nullptr, GetContext());
- grpc_transport_set_pops(transport(), stream(),
- GetContext()->polling_entity());
- memset(&metadata_, 0, sizeof(metadata_));
- metadata_.send_initial_metadata = true;
- metadata_.recv_initial_metadata = true;
- metadata_.recv_trailing_metadata = true;
- metadata_.payload = batch_payload();
- metadata_.on_complete = &metadata_batch_done_;
- batch_payload()->send_initial_metadata.send_initial_metadata =
- client_initial_metadata_.get();
- server_initial_metadata_ =
- GetContext()->MakePooled(GetContext());
- batch_payload()->recv_initial_metadata.recv_initial_metadata =
- server_initial_metadata_.get();
- batch_payload()->recv_initial_metadata.recv_initial_metadata_ready =
- &recv_initial_metadata_ready_;
- batch_payload()->recv_initial_metadata.trailing_metadata_available =
- nullptr;
- server_trailing_metadata_ =
- GetContext()->MakePooled(GetContext());
- batch_payload()->recv_trailing_metadata.recv_trailing_metadata =
- server_trailing_metadata_.get();
- batch_payload()->recv_trailing_metadata.collect_stats =
- &GetContext()->call_stats()->transport_stream_stats;
- batch_payload()->recv_trailing_metadata.recv_trailing_metadata_ready =
- &recv_trailing_metadata_ready_;
- IncrementRefCount("metadata_batch_done");
- IncrementRefCount("initial_metadata_ready");
- IncrementRefCount("trailing_metadata_ready");
- initial_metadata_waker_ = Activity::current()->MakeOwningWaker();
- trailing_metadata_waker_ = Activity::current()->MakeOwningWaker();
- SchedulePush(&metadata_);
- }
- if (server_initial_metadata_state_ ==
- ServerInitialMetadataState::kReceivedButNotPushed) {
- server_initial_metadata_state_ = ServerInitialMetadataState::kPushing;
- server_initial_metadata_push_promise_ =
- server_initial_metadata_pipe_->Push(
- std::move(server_initial_metadata_));
- }
- if (server_initial_metadata_state_ ==
- ServerInitialMetadataState::kPushing) {
- auto r = (*server_initial_metadata_push_promise_)();
- if (r.ready()) {
- server_initial_metadata_state_ = ServerInitialMetadataState::kPushed;
- server_initial_metadata_push_promise_.reset();
- }
- }
- PollSendMessage(client_to_server_messages_, &client_trailing_metadata_);
- PollRecvMessage(server_to_client_messages_);
- if (server_initial_metadata_state_ == ServerInitialMetadataState::kPushed &&
- !IsPromiseReceiving() &&
- std::exchange(queued_trailing_metadata_, false)) {
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_INFO,
- "%s[connected] PollConnectedChannel: finished request, "
- "returning: {%s}; "
- "active_ops: %s",
- Activity::current()->DebugTag().c_str(),
- server_trailing_metadata_->DebugString().c_str(),
- ActiveOpsString().c_str());
- }
- set_finished();
- return ServerMetadataHandle(std::move(server_trailing_metadata_));
- }
- return Pending{};
- }
-
- void RecvInitialMetadataReady(grpc_error_handle error) {
- GPR_ASSERT(error == absl::OkStatus());
- {
- MutexLock lock(mu());
- server_initial_metadata_state_ =
- ServerInitialMetadataState::kReceivedButNotPushed;
- initial_metadata_waker_.Wakeup();
- }
- Unref("initial_metadata_ready");
- }
-
- void RecvTrailingMetadataReady(grpc_error_handle error) {
- GPR_ASSERT(error == absl::OkStatus());
- {
- MutexLock lock(mu());
- queued_trailing_metadata_ = true;
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_DEBUG,
- "%s[connected] RecvTrailingMetadataReady: "
- "queued_trailing_metadata_ "
- "set to true; active_ops: %s",
- trailing_metadata_waker_.ActivityDebugTag().c_str(),
- ActiveOpsString().c_str());
- }
- trailing_metadata_waker_.Wakeup();
- }
- Unref("trailing_metadata_ready");
- }
-
- void MetadataBatchDone(grpc_error_handle error) {
- GPR_ASSERT(error == absl::OkStatus());
- Unref("metadata_batch_done");
- }
- private:
- enum class ServerInitialMetadataState : uint8_t {
- // Initial metadata has not been received from the server.
- kNotReceived,
- // Initial metadata has been received from the server via the transport, but
- // has not yet been pushed onto the pipe to publish it up the call stack.
- kReceivedButNotPushed,
- // Initial metadata has been received from the server via the transport and
- // has been pushed on the pipe to publish it up the call stack.
- // It's still in the pipe and has not been removed by the call at the top
- // yet.
- kPushing,
- // Initial metadata has been received from the server via the transport and
- // has been pushed on the pipe to publish it up the call stack AND removed
- // by the call at the top.
- kPushed,
- };
-
- std::string ActiveOpsString() const override
- ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
- std::vector ops;
- if (finished()) ops.push_back("FINISHED");
- // Outstanding Operations on Transport
- std::vector waiting;
- if (initial_metadata_waker_ != Waker()) {
- waiting.push_back("initial_metadata");
- }
- if (trailing_metadata_waker_ != Waker()) {
- waiting.push_back("trailing_metadata");
- }
- if (!waiting.empty()) {
- ops.push_back(absl::StrCat("waiting:", absl::StrJoin(waiting, ",")));
- }
- // Results from transport
- std::vector queued;
- if (server_initial_metadata_state_ ==
- ServerInitialMetadataState::kReceivedButNotPushed) {
- queued.push_back("initial_metadata");
- }
- if (queued_trailing_metadata_) queued.push_back("trailing_metadata");
- if (!queued.empty()) {
- ops.push_back(absl::StrCat("queued:", absl::StrJoin(queued, ",")));
- }
- // Send message
- std::string send_message_state = SendMessageString();
- if (send_message_state != "WAITING") {
- ops.push_back(absl::StrCat("send_message:", send_message_state));
- }
- // Receive message
- std::string recv_message_state = RecvMessageString();
- if (recv_message_state != "IDLE") {
- ops.push_back(absl::StrCat("recv_message:", recv_message_state));
- }
- return absl::StrJoin(ops, " ");
- }
-
- bool requested_metadata_ = false;
- ServerInitialMetadataState server_initial_metadata_state_
- ABSL_GUARDED_BY(mu()) = ServerInitialMetadataState::kNotReceived;
- bool queued_trailing_metadata_ ABSL_GUARDED_BY(mu()) = false;
- Waker initial_metadata_waker_ ABSL_GUARDED_BY(mu());
- Waker trailing_metadata_waker_ ABSL_GUARDED_BY(mu());
- PipeSender* server_initial_metadata_pipe_;
- PipeReceiver* client_to_server_messages_;
- PipeSender* server_to_client_messages_;
- grpc_closure recv_initial_metadata_ready_ =
- MakeMemberClosure(
- this, DEBUG_LOCATION);
- grpc_closure recv_trailing_metadata_ready_ =
- MakeMemberClosure(
- this, DEBUG_LOCATION);
- ClientMetadataHandle client_initial_metadata_;
- ClientMetadataHandle client_trailing_metadata_;
- ServerMetadataHandle server_initial_metadata_;
- ServerMetadataHandle server_trailing_metadata_;
- absl::optional::PushType>
- server_initial_metadata_push_promise_;
- grpc_transport_stream_op_batch metadata_;
- grpc_closure metadata_batch_done_ =
- MakeMemberClosure(
- this, DEBUG_LOCATION);
-};
-
-class ClientConnectedCallPromise {
- public:
- ClientConnectedCallPromise(grpc_transport* transport, CallArgs call_args)
- : impl_(GetContext()->New(transport,
- std::move(call_args))) {}
-
- ClientConnectedCallPromise(const ClientConnectedCallPromise&) = delete;
- ClientConnectedCallPromise& operator=(const ClientConnectedCallPromise&) =
- delete;
- ClientConnectedCallPromise(ClientConnectedCallPromise&& other) noexcept
- : impl_(std::exchange(other.impl_, nullptr)) {}
- ClientConnectedCallPromise& operator=(
- ClientConnectedCallPromise&& other) noexcept {
- impl_ = std::move(other.impl_);
- return *this;
- }
-
- static ArenaPromise Make(grpc_transport* transport,
- CallArgs call_args,
- NextPromiseFactory) {
- return ClientConnectedCallPromise(transport, std::move(call_args));
- }
+auto ConnectedChannelStream::RecvMessages(
+ PipeSender* incoming_messages) {
+ return Loop([self = InternalRef(),
+ incoming_messages = std::move(*incoming_messages)]() mutable {
+ return Seq(
+ GetContext()->ReceiveMessage(self->batch_target()),
+ [&incoming_messages](
+ absl::StatusOr> status) mutable {
+ bool has_message = status.ok() && status->has_value();
+ auto publish_message = [&incoming_messages, &status]() {
+ auto pending_message = std::move(**status);
+ if (grpc_call_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "%s[connected] RecvMessage: received payload of %" PRIdPTR
+ " bytes",
+ Activity::current()->DebugTag().c_str(),
+ pending_message->payload()->Length());
+ }
+ return Map(incoming_messages.Push(std::move(pending_message)),
+ [](bool ok) -> LoopCtl {
+ if (!ok) {
+ if (grpc_call_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "%s[connected] RecvMessage: failed to "
+ "push message towards the application",
+ Activity::current()->DebugTag().c_str());
+ }
+ return absl::OkStatus();
+ }
+ return Continue{};
+ });
+ };
+ auto publish_close = [&status]() mutable {
+ if (grpc_call_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "%s[connected] RecvMessage: reached end of stream with "
+ "status:%s",
+ Activity::current()->DebugTag().c_str(),
+ status.status().ToString().c_str());
+ }
+ return Immediate(LoopCtl(status.status()));
+ };
+ return If(has_message, std::move(publish_message),
+ std::move(publish_close));
+ });
+ });
+}
- Poll operator()() { return impl_->PollOnce(); }
+auto ConnectedChannelStream::SendMessages(
+ PipeReceiver* outgoing_messages) {
+ return ForEach(std::move(*outgoing_messages),
+ [self = InternalRef()](MessageHandle message) {
+ return GetContext()->SendMessage(
+ self->batch_target(), std::move(message));
+ });
+}
+#endif // defined(GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_CLIENT_CALL) ||
+ // defined(GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL)
- private:
- OrphanablePtr impl_;
-};
+#ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_CLIENT_CALL
+ArenaPromise MakeClientCallPromise(
+ grpc_transport* transport, CallArgs call_args, NextPromiseFactory) {
+ OrphanablePtr stream(
+ GetContext()->New(transport));
+ stream->SetStream(static_cast(
+ GetContext()->Alloc(transport->vtable->sizeof_stream)));
+ grpc_transport_init_stream(transport, stream->stream(),
+ stream->stream_refcount(), nullptr,
+ GetContext());
+ grpc_transport_set_pops(transport, stream->stream(),
+ GetContext()->polling_entity());
+ auto* party = static_cast(Activity::current());
+ // Start a loop to send messages from client_to_server_messages to the
+ // transport. When the pipe closes and the loop completes, send a trailing
+ // metadata batch to close the stream.
+ party->Spawn(
+ "send_messages",
+ TrySeq(stream->SendMessages(call_args.client_to_server_messages),
+ [stream = stream->InternalRef()]() {
+ return GetContext()->SendClientTrailingMetadata(
+ stream->batch_target());
+ }),
+ [](absl::Status) {});
+ // Start a promise to receive server initial metadata and then forward it up
+ // through the receiving pipe.
+ auto server_initial_metadata =
+ GetContext()->MakePooled(GetContext());
+ party->Spawn(
+ "recv_initial_metadata",
+ TrySeq(GetContext()->ReceiveServerInitialMetadata(
+ stream->batch_target()),
+ [pipe = call_args.server_initial_metadata](
+ ServerMetadataHandle server_initial_metadata) {
+ if (grpc_call_trace.enabled()) {
+ gpr_log(GPR_DEBUG,
+ "%s[connected] Publish client initial metadata: %s",
+ Activity::current()->DebugTag().c_str(),
+ server_initial_metadata->DebugString().c_str());
+ }
+ return Map(pipe->Push(std::move(server_initial_metadata)),
+ [](bool r) {
+ if (r) return absl::OkStatus();
+ return absl::CancelledError();
+ });
+ }),
+ [](absl::Status) {});
+
+ // Build up the rest of the main call promise:
+
+ // Create a promise that will send initial metadata and then signal completion
+ // of that via the token.
+ auto send_initial_metadata = Seq(
+ GetContext()->SendClientInitialMetadata(
+ stream->batch_target(), std::move(call_args.client_initial_metadata)),
+ [sent_initial_metadata_token =
+ std::move(call_args.client_initial_metadata_outstanding)](
+ absl::Status status) mutable {
+ sent_initial_metadata_token.Complete(status.ok());
+ return status;
+ });
+ // Create a promise that will receive server trailing metadata.
+ // If this fails, we massage the error into metadata that we can report
+ // upwards.
+ auto server_trailing_metadata =
+ GetContext()->MakePooled(GetContext());
+ auto recv_trailing_metadata =
+ Map(GetContext()->ReceiveServerTrailingMetadata(
+ stream->batch_target()),
+ [](absl::StatusOr status) mutable {
+ if (!status.ok()) {
+ auto server_trailing_metadata =
+ GetContext()->MakePooled(
+ GetContext());
+ grpc_status_code status_code = GRPC_STATUS_UNKNOWN;
+ std::string message;
+ grpc_error_get_status(status.status(), Timestamp::InfFuture(),
+ &status_code, &message, nullptr, nullptr);
+ server_trailing_metadata->Set(GrpcStatusMetadata(), status_code);
+ server_trailing_metadata->Set(GrpcMessageMetadata(),
+ Slice::FromCopiedString(message));
+ return server_trailing_metadata;
+ } else {
+ return std::move(*status);
+ }
+ });
+ // Finally the main call promise.
+ // Concurrently: send initial metadata and receive messages, until BOTH
+ // complete (or one fails).
+ // Next: receive trailing metadata, and return that up the stack.
+ auto recv_messages =
+ stream->RecvMessages(call_args.server_to_client_messages);
+ return Map(TrySeq(TryJoin(std::move(send_initial_metadata),
+ std::move(recv_messages)),
+ std::move(recv_trailing_metadata)),
+ [stream = std::move(stream)](ServerMetadataHandle result) {
+ stream->set_finished();
+ return result;
+ });
+}
#endif
#ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL
-class ServerStream final : public ConnectedChannelStream {
- public:
- ServerStream(grpc_transport* transport,
- NextPromiseFactory next_promise_factory)
- : ConnectedChannelStream(transport) {
- SetStream(static_cast(
- GetContext()->Alloc(transport->vtable->sizeof_stream)));
- grpc_transport_init_stream(
- transport, stream(), stream_refcount(),
- GetContext()->server_call_context()->server_stream_data(),
- GetContext());
- grpc_transport_set_pops(transport, stream(),
- GetContext()->polling_entity());
-
- // Fetch initial metadata
- auto& gim = call_state_.emplace(this);
- gim.recv_initial_metadata_ready_waker =
- Activity::current()->MakeOwningWaker();
- memset(&gim.recv_initial_metadata, 0, sizeof(gim.recv_initial_metadata));
- gim.recv_initial_metadata.payload = batch_payload();
- gim.recv_initial_metadata.on_complete = nullptr;
- gim.recv_initial_metadata.recv_initial_metadata = true;
- gim.next_promise_factory = std::move(next_promise_factory);
- batch_payload()->recv_initial_metadata.recv_initial_metadata =
- gim.client_initial_metadata.get();
- batch_payload()->recv_initial_metadata.recv_initial_metadata_ready =
- &gim.recv_initial_metadata_ready;
- SchedulePush(&gim.recv_initial_metadata);
-
- // Fetch trailing metadata (to catch cancellations)
- auto& gtm =
- client_trailing_metadata_state_.emplace();
- gtm.recv_trailing_metadata_ready =
- MakeMemberClosure(this);
- memset(>m.recv_trailing_metadata, 0, sizeof(gtm.recv_trailing_metadata));
- gtm.recv_trailing_metadata.payload = batch_payload();
- gtm.recv_trailing_metadata.recv_trailing_metadata = true;
- batch_payload()->recv_trailing_metadata.recv_trailing_metadata =
- gtm.result.get();
- batch_payload()->recv_trailing_metadata.collect_stats =
- &GetContext()->call_stats()->transport_stream_stats;
- batch_payload()->recv_trailing_metadata.recv_trailing_metadata_ready =
- >m.recv_trailing_metadata_ready;
- SchedulePush(>m.recv_trailing_metadata);
- gtm.waker = Activity::current()->MakeOwningWaker();
- }
-
- Poll PollOnce() {
- MutexLock lock(mu());
-
- auto poll_send_initial_metadata = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
- mu()) {
- if (auto* promise =
- absl::get_if>(
- &server_initial_metadata_)) {
- auto r = (*promise)();
- if (auto* md = r.value_if_ready()) {
- if (grpc_call_trace.enabled()) {
- gpr_log(
- GPR_INFO, "%s[connected] got initial metadata %s",
- Activity::current()->DebugTag().c_str(),
- (md->has_value() ? (**md)->DebugString() : "")
- .c_str());
- }
- memset(&send_initial_metadata_, 0, sizeof(send_initial_metadata_));
- send_initial_metadata_.send_initial_metadata = true;
- send_initial_metadata_.payload = batch_payload();
- send_initial_metadata_.on_complete = &send_initial_metadata_done_;
- batch_payload()->send_initial_metadata.send_initial_metadata =
- server_initial_metadata_
- .emplace(std::move(**md))
- .get();
- SchedulePush(&send_initial_metadata_);
- return true;
- } else {
- return false;
- }
- } else {
- return true;
- }
- };
-
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_INFO, "%s[connected] PollConnectedChannel: %s",
- Activity::current()->DebugTag().c_str(),
- ActiveOpsString().c_str());
- }
-
- poll_send_initial_metadata();
-
- if (auto* p = absl::get_if(
- &client_trailing_metadata_state_)) {
- pipes_.client_to_server.sender.Close();
- if (!p->result.ok()) {
- // client cancelled, we should cancel too
- if (absl::holds_alternative(call_state_) ||
- absl::holds_alternative(call_state_) ||
- absl::holds_alternative(call_state_)) {
- if (!absl::holds_alternative(
- server_initial_metadata_)) {
- // pretend we've sent initial metadata to stop that op from
- // progressing if it's stuck somewhere above us in the stack
- server_initial_metadata_.emplace();
- }
- // cancel the call - this status will be returned to the server bottom
- // promise
- call_state_.emplace(
- Complete{ServerMetadataFromStatus(p->result)});
- }
- }
- }
-
- if (auto* p = absl::get_if(&call_state_)) {
- incoming_messages_ = &pipes_.client_to_server.sender;
- auto promise = p->next_promise_factory(CallArgs{
- std::move(p->client_initial_metadata),
- &pipes_.server_initial_metadata.sender,
- &pipes_.client_to_server.receiver, &pipes_.server_to_client.sender});
- call_state_.emplace(
- MessageLoop{&pipes_.server_to_client.receiver, std::move(promise)});
- server_initial_metadata_
- .emplace>(
- pipes_.server_initial_metadata.receiver.Next());
- }
- if (incoming_messages_ != nullptr) {
- PollRecvMessage(incoming_messages_);
- }
- if (auto* p = absl::get_if(&call_state_)) {
- if (absl::holds_alternative(
- server_initial_metadata_)) {
- PollSendMessage(p->outgoing_messages, nullptr);
- }
- auto poll = p->promise();
- if (auto* r = poll.value_if_ready()) {
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_INFO, "%s[connected] got trailing metadata %s; %s",
- Activity::current()->DebugTag().c_str(),
- (*r)->DebugString().c_str(), ActiveOpsString().c_str());
- }
- auto& completing = call_state_.emplace();
- completing.server_trailing_metadata = std::move(*r);
- completing.on_complete =
- MakeMemberClosure(this);
- completing.waker = Activity::current()->MakeOwningWaker();
- auto& op = completing.send_trailing_metadata;
- memset(&op, 0, sizeof(op));
- op.payload = batch_payload();
- op.on_complete = &completing.on_complete;
- // If we've gotten initial server metadata, we can send trailing
- // metadata.
- // Otherwise we need to cancel the call.
- // There could be an unlucky ordering, so we poll here to make sure.
- if (poll_send_initial_metadata()) {
- op.send_trailing_metadata = true;
- batch_payload()->send_trailing_metadata.send_trailing_metadata =
- completing.server_trailing_metadata.get();
- batch_payload()->send_trailing_metadata.sent = &completing.sent;
- } else {
- op.cancel_stream = true;
- const auto status_code =
- completing.server_trailing_metadata->get(GrpcStatusMetadata())
- .value_or(GRPC_STATUS_UNKNOWN);
- batch_payload()->cancel_stream.cancel_error = grpc_error_set_int(
- absl::Status(static_cast(status_code),
- completing.server_trailing_metadata
- ->GetOrCreatePointer(GrpcMessageMetadata())
- ->as_string_view()),
- StatusIntProperty::kRpcStatus, status_code);
- }
- SchedulePush(&op);
- }
- }
- if (auto* p = absl::get_if(&call_state_)) {
- set_finished();
- return std::move(p->result);
- }
- return Pending{};
- }
-
- private:
- // Call state: we've asked the transport for initial metadata and are
- // waiting for it before proceeding.
- struct GettingInitialMetadata {
- explicit GettingInitialMetadata(ServerStream* stream)
- : recv_initial_metadata_ready(
- MakeMemberClosure(
- stream)) {}
- // The batch we're using to get initial metadata.
- grpc_transport_stream_op_batch recv_initial_metadata;
- // Waker to re-enter the activity once the transport returns.
- Waker recv_initial_metadata_ready_waker;
- // Initial metadata storage for the transport.
- ClientMetadataHandle client_initial_metadata =
- GetContext()->MakePooled(GetContext());
- // Closure for the transport to call when it's ready.
- grpc_closure recv_initial_metadata_ready;
- // Next promise factory to use once we have initial metadata.
- NextPromiseFactory next_promise_factory;
- };
-
- // Call state: transport has returned initial metadata, we're waiting to
- // re-enter the activity to process it.
- struct GotInitialMetadata {
- ClientMetadataHandle client_initial_metadata;
- NextPromiseFactory next_promise_factory;
- };
-
- // Call state: we're sending/receiving messages and processing the filter
- // stack.
- struct MessageLoop {
- PipeReceiver* outgoing_messages;
- ArenaPromise promise;
- };
-
- // Call state: promise stack has returned trailing metadata, we're sending it
- // to the transport to communicate.
- struct Completing {
- ServerMetadataHandle server_trailing_metadata;
- grpc_transport_stream_op_batch send_trailing_metadata;
- grpc_closure on_complete;
- bool sent = false;
- Waker waker;
- };
-
- // Call state: server metadata has been communicated to the transport and sent
- // to the client.
- // The metadata will be returned down to the server call to tick the
- // cancellation bit or not on the originating batch.
- struct Complete {
- ServerMetadataHandle result;
- };
-
- // Trailing metadata state: we've asked the transport for trailing metadata
- // and are waiting for it before proceeding.
- struct WaitingForTrailingMetadata {
- ClientMetadataHandle result =
- GetContext()->MakePooled(GetContext());
- grpc_transport_stream_op_batch recv_trailing_metadata;
- grpc_closure recv_trailing_metadata_ready;
- Waker waker;
- };
-
- // We've received trailing metadata from the transport - which indicates reads
- // are closed.
- // We convert to an absl::Status here and use that to drive a decision to
- // cancel the call (on error) or not.
- struct GotClientHalfClose {
- absl::Status result;
- };
-
- void RecvInitialMetadataReady(absl::Status status) {
- MutexLock lock(mu());
- auto& getting = absl::get(call_state_);
- auto waker = std::move(getting.recv_initial_metadata_ready_waker);
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_DEBUG, "%sGOT INITIAL METADATA: err=%s %s",
- waker.ActivityDebugTag().c_str(), status.ToString().c_str(),
- getting.client_initial_metadata->DebugString().c_str());
- }
- GotInitialMetadata got{std::move(getting.client_initial_metadata),
- std::move(getting.next_promise_factory)};
- call_state_.emplace(std::move(got));
- waker.Wakeup();
- }
-
- void SendTrailingMetadataDone(absl::Status result) {
- MutexLock lock(mu());
- auto& completing = absl::get(call_state_);
- auto md = std::move(completing.server_trailing_metadata);
- auto waker = std::move(completing.waker);
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_DEBUG, "%sSEND TRAILING METADATA DONE: err=%s sent=%s %s",
- waker.ActivityDebugTag().c_str(), result.ToString().c_str(),
- completing.sent ? "true" : "false", md->DebugString().c_str());
- }
- md->Set(GrpcStatusFromWire(), completing.sent);
- if (!result.ok()) {
- md->Clear();
- md->Set(GrpcStatusMetadata(),
- static_cast(result.code()));
- md->Set(GrpcMessageMetadata(), Slice::FromCopiedString(result.message()));
- md->Set(GrpcStatusFromWire(), false);
- }
- call_state_.emplace(Complete{std::move(md)});
- waker.Wakeup();
- }
-
- std::string ActiveOpsString() const override
- ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
- std::vector ops;
- ops.push_back(absl::StrCat(
- "call_state:",
- Match(
- call_state_,
- [](const absl::monostate&) { return "absl::monostate"; },
- [](const GettingInitialMetadata&) { return "GETTING"; },
- [](const GotInitialMetadata&) { return "GOT"; },
- [](const MessageLoop&) { return "RUNNING"; },
- [](const Completing&) { return "COMPLETING"; },
- [](const Complete&) { return "COMPLETE"; })));
- ops.push_back(
- absl::StrCat("client_trailing_metadata_state:",
- Match(
- client_trailing_metadata_state_,
- [](const absl::monostate&) -> std::string {
- return "absl::monostate";
- },
- [](const WaitingForTrailingMetadata&) -> std::string {
- return "WAITING";
- },
- [](const GotClientHalfClose& got) -> std::string {
- return absl::StrCat("GOT:", got.result.ToString());
- })));
- // Send initial metadata
- ops.push_back(absl::StrCat(
- "server_initial_metadata_state:",
- Match(
- server_initial_metadata_,
- [](const absl::monostate&) { return "absl::monostate"; },
- [](const PipeReceiverNextType&) {
- return "WAITING";
- },
- [](const ServerMetadataHandle&) { return "GOT"; })));
- // Send message
- std::string send_message_state = SendMessageString();
- if (send_message_state != "WAITING") {
- ops.push_back(absl::StrCat("send_message:", send_message_state));
- }
- // Receive message
- std::string recv_message_state = RecvMessageString();
- if (recv_message_state != "IDLE") {
- ops.push_back(absl::StrCat("recv_message:", recv_message_state));
- }
- return absl::StrJoin(ops, " ");
- }
-
- void SendInitialMetadataDone() {}
-
- void RecvTrailingMetadataReady(absl::Status error) {
- MutexLock lock(mu());
- auto& state =
- absl::get(client_trailing_metadata_state_);
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_INFO,
- "%sRecvTrailingMetadataReady: error:%s metadata:%s state:%s",
- state.waker.ActivityDebugTag().c_str(), error.ToString().c_str(),
- state.result->DebugString().c_str(), ActiveOpsString().c_str());
- }
- auto waker = std::move(state.waker);
- ServerMetadataHandle result = std::move(state.result);
- if (error.ok()) {
- auto* message = result->get_pointer(GrpcMessageMetadata());
- error = absl::Status(
- static_cast(
- result->get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN)),
- message == nullptr ? "" : message->as_string_view());
- }
- client_trailing_metadata_state_.emplace(
- GotClientHalfClose{error});
- waker.Wakeup();
- }
-
- struct Pipes {
+ArenaPromise MakeServerCallPromise(
+ grpc_transport* transport, CallArgs,
+ NextPromiseFactory next_promise_factory) {
+ OrphanablePtr stream(
+ GetContext()->New(transport));
+
+ stream->SetStream(static_cast(
+ GetContext()->Alloc(transport->vtable->sizeof_stream)));
+ grpc_transport_init_stream(
+ transport, stream->stream(), stream->stream_refcount(),
+ GetContext()->server_call_context()->server_stream_data(),
+ GetContext());
+ grpc_transport_set_pops(transport, stream->stream(),
+ GetContext()->polling_entity());
+
+ auto* party = static_cast(Activity::current());
+
+ // Arifacts we need for the lifetime of the call.
+ struct CallData {
Pipe server_to_client;
Pipe client_to_server;
Pipe server_initial_metadata;
+ Latch failure_latch;
+ bool sent_initial_metadata = false;
+ bool sent_trailing_metadata = false;
};
-
- using CallState =
- absl::variant;
- CallState call_state_ ABSL_GUARDED_BY(mu()) = absl::monostate{};
- using ClientTrailingMetadataState =
- absl::variant;
- ClientTrailingMetadataState client_trailing_metadata_state_
- ABSL_GUARDED_BY(mu()) = absl::monostate{};
- absl::variant,
- ServerMetadataHandle>
- ABSL_GUARDED_BY(mu()) server_initial_metadata_ = absl::monostate{};
- PipeSender* incoming_messages_ = nullptr;
- grpc_transport_stream_op_batch send_initial_metadata_;
- grpc_closure send_initial_metadata_done_ =
- MakeMemberClosure(
- this);
- Pipes pipes_ ABSL_GUARDED_BY(mu());
-};
-
-class ServerConnectedCallPromise {
- public:
- ServerConnectedCallPromise(grpc_transport* transport,
- NextPromiseFactory next_promise_factory)
- : impl_(GetContext()->New