From 2f9ac46a5d479c14407a84bb9c0b0536fa711ef4 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 27 Oct 2022 12:42:13 -0700 Subject: [PATCH] Revert "Revert "[promise] CallPushPull -> more general TryConcurrently (#31429)" (#31480)" (#31481) * Revert "Revert "[promise] CallPushPull -> more general TryConcurrently (#31429)" (#31480)" This reverts commit 821443e9b566b173c7f50fdf26914982c3e36788. * fix --- BUILD | 3 +- CMakeLists.txt | 105 ++++-- build_autogenerated.yaml | 115 ++++-- gRPC-C++.podspec | 8 +- gRPC-Core.podspec | 8 +- grpc.gemspec | 4 +- package.xml | 4 +- src/core/BUILD | 32 +- .../filters/http/client/http_client_filter.cc | 32 +- .../filters/http/server/http_server_filter.cc | 32 +- src/core/lib/promise/arena_promise.h | 131 ++++--- src/core/lib/promise/call_push_pull.h | 148 -------- src/core/lib/promise/try_concurrently.h | 341 ++++++++++++++++++ test/core/promise/BUILD | 6 +- test/core/promise/arena_promise_test.cc | 23 ++ test/core/promise/call_push_pull_test.cc | 77 ---- test/core/promise/try_concurrently_test.cc | 160 ++++++++ tools/doxygen/Doxyfile.c++.internal | 4 +- tools/doxygen/Doxyfile.core.internal | 4 +- tools/run_tests/generated/tests.json | 48 +-- 20 files changed, 864 insertions(+), 421 deletions(-) delete mode 100644 src/core/lib/promise/call_push_pull.h create mode 100644 src/core/lib/promise/try_concurrently.h delete mode 100644 test/core/promise/call_push_pull_test.cc create mode 100644 test/core/promise/try_concurrently_test.cc diff --git a/BUILD b/BUILD index b052be2350a..277f21c512a 100644 --- a/BUILD +++ b/BUILD @@ -3116,8 +3116,6 @@ grpc_cc_library( "promise", "//src/core:arena", "//src/core:arena_promise", - "//src/core:basic_seq", - "//src/core:call_push_pull", "//src/core:channel_fwd", "//src/core:channel_init", "//src/core:channel_stack_type", @@ -3130,6 +3128,7 @@ grpc_cc_library( "//src/core:slice_buffer", "//src/core:status_helper", "//src/core:transport_fwd", + "//src/core:try_concurrently", ], ) diff --git a/CMakeLists.txt b/CMakeLists.txt index abf6aa74a24..73a34423991 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -847,7 +847,6 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx byte_buffer_test) add_dependencies(buildtests_cxx c_slice_buffer_test) add_dependencies(buildtests_cxx call_finalization_test) - add_dependencies(buildtests_cxx call_push_pull_test) add_dependencies(buildtests_cxx cancel_ares_query_test) add_dependencies(buildtests_cxx cel_authorization_engine_test) add_dependencies(buildtests_cxx certificate_provider_registry_test) @@ -1220,6 +1219,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx transport_security_common_api_test) add_dependencies(buildtests_cxx transport_security_test) add_dependencies(buildtests_cxx transport_stream_receiver_test) + add_dependencies(buildtests_cxx try_concurrently_test) add_dependencies(buildtests_cxx try_join_test) add_dependencies(buildtests_cxx try_seq_metadata_test) add_dependencies(buildtests_cxx try_seq_test) @@ -6779,45 +6779,6 @@ target_link_libraries(call_finalization_test ) -endif() -if(gRPC_BUILD_TESTS) - -add_executable(call_push_pull_test - test/core/promise/call_push_pull_test.cc - third_party/googletest/googletest/src/gtest-all.cc - third_party/googletest/googlemock/src/gmock-all.cc -) - -target_include_directories(call_push_pull_test - PRIVATE - ${CMAKE_CURRENT_SOURCE_DIR} - ${CMAKE_CURRENT_SOURCE_DIR}/include - ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} - ${_gRPC_RE2_INCLUDE_DIR} - ${_gRPC_SSL_INCLUDE_DIR} - ${_gRPC_UPB_GENERATED_DIR} - ${_gRPC_UPB_GRPC_GENERATED_DIR} - ${_gRPC_UPB_INCLUDE_DIR} - ${_gRPC_XXHASH_INCLUDE_DIR} - ${_gRPC_ZLIB_INCLUDE_DIR} - third_party/googletest/googletest/include - third_party/googletest/googletest - third_party/googletest/googlemock/include - third_party/googletest/googlemock - ${_gRPC_PROTO_GENS_DIR} -) - -target_link_libraries(call_push_pull_test - ${_gRPC_PROTOBUF_LIBRARIES} - ${_gRPC_ALLTARGETS_LIBRARIES} - absl::type_traits - absl::status - absl::statusor - absl::strings - absl::variant -) - - endif() if(gRPC_BUILD_TESTS) @@ -19603,6 +19564,70 @@ target_link_libraries(transport_stream_receiver_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(try_concurrently_test + src/core/ext/upb-generated/google/protobuf/any.upb.c + src/core/ext/upb-generated/google/rpc/status.upb.c + src/core/lib/debug/trace.cc + src/core/lib/event_engine/memory_allocator.cc + src/core/lib/experiments/config.cc + src/core/lib/experiments/experiments.cc + src/core/lib/gprpp/status_helper.cc + src/core/lib/gprpp/time.cc + src/core/lib/iomgr/combiner.cc + src/core/lib/iomgr/error.cc + src/core/lib/iomgr/exec_ctx.cc + src/core/lib/iomgr/executor.cc + src/core/lib/iomgr/iomgr_internal.cc + src/core/lib/promise/activity.cc + src/core/lib/promise/pipe.cc + src/core/lib/resource_quota/arena.cc + src/core/lib/resource_quota/memory_quota.cc + src/core/lib/resource_quota/periodic_update.cc + src/core/lib/resource_quota/trace.cc + src/core/lib/slice/percent_encoding.cc + src/core/lib/slice/slice.cc + src/core/lib/slice/slice_string_helpers.cc + test/core/promise/try_concurrently_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(try_concurrently_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(try_concurrently_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + absl::any_invocable + absl::function_ref + absl::hash + absl::type_traits + absl::statusor + absl::utility + gpr + upb +) + + endif() if(gRPC_BUILD_TESTS) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index e5243269f2d..29da9210ebf 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -879,23 +879,25 @@ libs: - src/core/lib/matchers/matchers.h - src/core/lib/promise/activity.h - src/core/lib/promise/arena_promise.h - - src/core/lib/promise/call_push_pull.h - src/core/lib/promise/context.h - src/core/lib/promise/detail/basic_seq.h - src/core/lib/promise/detail/promise_factory.h - src/core/lib/promise/detail/promise_like.h - src/core/lib/promise/detail/status.h - src/core/lib/promise/exec_ctx_wakeup_scheduler.h + - src/core/lib/promise/for_each.h - src/core/lib/promise/intra_activity_waiter.h - src/core/lib/promise/latch.h - src/core/lib/promise/loop.h - src/core/lib/promise/map.h + - src/core/lib/promise/map_pipe.h - src/core/lib/promise/pipe.h - src/core/lib/promise/poll.h - src/core/lib/promise/promise.h - src/core/lib/promise/race.h - src/core/lib/promise/seq.h - src/core/lib/promise/sleep.h + - src/core/lib/promise/try_concurrently.h - src/core/lib/promise/try_seq.h - src/core/lib/resolver/resolver.h - src/core/lib/resolver/resolver_factory.h @@ -2134,23 +2136,25 @@ libs: - src/core/lib/load_balancing/subchannel_interface.h - src/core/lib/promise/activity.h - src/core/lib/promise/arena_promise.h - - src/core/lib/promise/call_push_pull.h - src/core/lib/promise/context.h - src/core/lib/promise/detail/basic_seq.h - src/core/lib/promise/detail/promise_factory.h - src/core/lib/promise/detail/promise_like.h - src/core/lib/promise/detail/status.h - src/core/lib/promise/exec_ctx_wakeup_scheduler.h + - src/core/lib/promise/for_each.h - src/core/lib/promise/intra_activity_waiter.h - src/core/lib/promise/latch.h - src/core/lib/promise/loop.h - src/core/lib/promise/map.h + - src/core/lib/promise/map_pipe.h - src/core/lib/promise/pipe.h - src/core/lib/promise/poll.h - src/core/lib/promise/promise.h - src/core/lib/promise/race.h - src/core/lib/promise/seq.h - src/core/lib/promise/sleep.h + - src/core/lib/promise/try_concurrently.h - src/core/lib/promise/try_seq.h - src/core/lib/resolver/resolver.h - src/core/lib/resolver/resolver_factory.h @@ -4564,27 +4568,6 @@ targets: - test/core/channel/call_finalization_test.cc deps: - grpc_test_util -- name: call_push_pull_test - gtest: true - build: test - language: c++ - headers: - - src/core/lib/gpr/useful.h - - src/core/lib/gprpp/bitset.h - - src/core/lib/gprpp/construct_destruct.h - - src/core/lib/promise/call_push_pull.h - - src/core/lib/promise/detail/promise_like.h - - src/core/lib/promise/detail/status.h - - src/core/lib/promise/poll.h - src: - - test/core/promise/call_push_pull_test.cc - deps: - - absl/meta:type_traits - - absl/status:status - - absl/status:statusor - - absl/strings:strings - - absl/types:variant - uses_polling: false - name: cancel_ares_query_test gtest: true build: test @@ -10796,6 +10779,92 @@ targets: deps: - grpc_test_util uses_polling: false +- name: try_concurrently_test + gtest: true + build: test + language: c++ + headers: + - src/core/ext/upb-generated/google/protobuf/any.upb.h + - src/core/ext/upb-generated/google/rpc/status.upb.h + - src/core/lib/debug/trace.h + - src/core/lib/experiments/config.h + - src/core/lib/experiments/experiments.h + - src/core/lib/gpr/spinlock.h + - src/core/lib/gprpp/atomic_utils.h + - src/core/lib/gprpp/bitset.h + - src/core/lib/gprpp/debug_location.h + - src/core/lib/gprpp/manual_constructor.h + - src/core/lib/gprpp/orphanable.h + - src/core/lib/gprpp/ref_counted.h + - src/core/lib/gprpp/ref_counted_ptr.h + - src/core/lib/gprpp/status_helper.h + - src/core/lib/gprpp/time.h + - src/core/lib/iomgr/closure.h + - src/core/lib/iomgr/combiner.h + - src/core/lib/iomgr/error.h + - src/core/lib/iomgr/exec_ctx.h + - src/core/lib/iomgr/executor.h + - src/core/lib/iomgr/iomgr_internal.h + - src/core/lib/promise/activity.h + - src/core/lib/promise/context.h + - src/core/lib/promise/detail/basic_seq.h + - src/core/lib/promise/detail/promise_factory.h + - src/core/lib/promise/detail/promise_like.h + - src/core/lib/promise/detail/status.h + - src/core/lib/promise/exec_ctx_wakeup_scheduler.h + - src/core/lib/promise/for_each.h + - src/core/lib/promise/intra_activity_waiter.h + - src/core/lib/promise/loop.h + - src/core/lib/promise/map.h + - src/core/lib/promise/map_pipe.h + - src/core/lib/promise/pipe.h + - src/core/lib/promise/poll.h + - src/core/lib/promise/race.h + - src/core/lib/promise/seq.h + - src/core/lib/promise/try_concurrently.h + - src/core/lib/resource_quota/arena.h + - src/core/lib/resource_quota/memory_quota.h + - src/core/lib/resource_quota/periodic_update.h + - src/core/lib/resource_quota/trace.h + - src/core/lib/slice/percent_encoding.h + - src/core/lib/slice/slice.h + - src/core/lib/slice/slice_internal.h + - src/core/lib/slice/slice_refcount.h + - src/core/lib/slice/slice_string_helpers.h + src: + - src/core/ext/upb-generated/google/protobuf/any.upb.c + - src/core/ext/upb-generated/google/rpc/status.upb.c + - src/core/lib/debug/trace.cc + - src/core/lib/event_engine/memory_allocator.cc + - src/core/lib/experiments/config.cc + - src/core/lib/experiments/experiments.cc + - src/core/lib/gprpp/status_helper.cc + - src/core/lib/gprpp/time.cc + - src/core/lib/iomgr/combiner.cc + - src/core/lib/iomgr/error.cc + - src/core/lib/iomgr/exec_ctx.cc + - src/core/lib/iomgr/executor.cc + - src/core/lib/iomgr/iomgr_internal.cc + - src/core/lib/promise/activity.cc + - src/core/lib/promise/pipe.cc + - src/core/lib/resource_quota/arena.cc + - src/core/lib/resource_quota/memory_quota.cc + - src/core/lib/resource_quota/periodic_update.cc + - src/core/lib/resource_quota/trace.cc + - src/core/lib/slice/percent_encoding.cc + - src/core/lib/slice/slice.cc + - src/core/lib/slice/slice_string_helpers.cc + - test/core/promise/try_concurrently_test.cc + deps: + - absl/functional:any_invocable + - absl/functional:function_ref + - absl/hash:hash + - absl/meta:type_traits + - absl/status:statusor + - absl/utility:utility + - gpr + - upb + uses_polling: false - name: try_join_test gtest: true build: test diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index fb8609e99fc..66f2dc13a7a 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -842,23 +842,25 @@ Pod::Spec.new do |s| 'src/core/lib/matchers/matchers.h', 'src/core/lib/promise/activity.h', 'src/core/lib/promise/arena_promise.h', - 'src/core/lib/promise/call_push_pull.h', 'src/core/lib/promise/context.h', 'src/core/lib/promise/detail/basic_seq.h', 'src/core/lib/promise/detail/promise_factory.h', 'src/core/lib/promise/detail/promise_like.h', 'src/core/lib/promise/detail/status.h', 'src/core/lib/promise/exec_ctx_wakeup_scheduler.h', + 'src/core/lib/promise/for_each.h', 'src/core/lib/promise/intra_activity_waiter.h', 'src/core/lib/promise/latch.h', 'src/core/lib/promise/loop.h', 'src/core/lib/promise/map.h', + 'src/core/lib/promise/map_pipe.h', 'src/core/lib/promise/pipe.h', 'src/core/lib/promise/poll.h', 'src/core/lib/promise/promise.h', 'src/core/lib/promise/race.h', 'src/core/lib/promise/seq.h', 'src/core/lib/promise/sleep.h', + 'src/core/lib/promise/try_concurrently.h', 'src/core/lib/promise/try_seq.h', 'src/core/lib/resolver/resolver.h', 'src/core/lib/resolver/resolver_factory.h', @@ -1719,23 +1721,25 @@ Pod::Spec.new do |s| 'src/core/lib/matchers/matchers.h', 'src/core/lib/promise/activity.h', 'src/core/lib/promise/arena_promise.h', - 'src/core/lib/promise/call_push_pull.h', 'src/core/lib/promise/context.h', 'src/core/lib/promise/detail/basic_seq.h', 'src/core/lib/promise/detail/promise_factory.h', 'src/core/lib/promise/detail/promise_like.h', 'src/core/lib/promise/detail/status.h', 'src/core/lib/promise/exec_ctx_wakeup_scheduler.h', + 'src/core/lib/promise/for_each.h', 'src/core/lib/promise/intra_activity_waiter.h', 'src/core/lib/promise/latch.h', 'src/core/lib/promise/loop.h', 'src/core/lib/promise/map.h', + 'src/core/lib/promise/map_pipe.h', 'src/core/lib/promise/pipe.h', 'src/core/lib/promise/poll.h', 'src/core/lib/promise/promise.h', 'src/core/lib/promise/race.h', 'src/core/lib/promise/seq.h', 'src/core/lib/promise/sleep.h', + 'src/core/lib/promise/try_concurrently.h', 'src/core/lib/promise/try_seq.h', 'src/core/lib/resolver/resolver.h', 'src/core/lib/resolver/resolver_factory.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 48088f3b1b8..598a3fcf6c6 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -1371,17 +1371,18 @@ Pod::Spec.new do |s| 'src/core/lib/promise/activity.cc', 'src/core/lib/promise/activity.h', 'src/core/lib/promise/arena_promise.h', - 'src/core/lib/promise/call_push_pull.h', 'src/core/lib/promise/context.h', 'src/core/lib/promise/detail/basic_seq.h', 'src/core/lib/promise/detail/promise_factory.h', 'src/core/lib/promise/detail/promise_like.h', 'src/core/lib/promise/detail/status.h', 'src/core/lib/promise/exec_ctx_wakeup_scheduler.h', + 'src/core/lib/promise/for_each.h', 'src/core/lib/promise/intra_activity_waiter.h', 'src/core/lib/promise/latch.h', 'src/core/lib/promise/loop.h', 'src/core/lib/promise/map.h', + 'src/core/lib/promise/map_pipe.h', 'src/core/lib/promise/pipe.cc', 'src/core/lib/promise/pipe.h', 'src/core/lib/promise/poll.h', @@ -1390,6 +1391,7 @@ Pod::Spec.new do |s| 'src/core/lib/promise/seq.h', 'src/core/lib/promise/sleep.cc', 'src/core/lib/promise/sleep.h', + 'src/core/lib/promise/try_concurrently.h', 'src/core/lib/promise/try_seq.h', 'src/core/lib/resolver/resolver.cc', 'src/core/lib/resolver/resolver.h', @@ -2357,23 +2359,25 @@ Pod::Spec.new do |s| 'src/core/lib/matchers/matchers.h', 'src/core/lib/promise/activity.h', 'src/core/lib/promise/arena_promise.h', - 'src/core/lib/promise/call_push_pull.h', 'src/core/lib/promise/context.h', 'src/core/lib/promise/detail/basic_seq.h', 'src/core/lib/promise/detail/promise_factory.h', 'src/core/lib/promise/detail/promise_like.h', 'src/core/lib/promise/detail/status.h', 'src/core/lib/promise/exec_ctx_wakeup_scheduler.h', + 'src/core/lib/promise/for_each.h', 'src/core/lib/promise/intra_activity_waiter.h', 'src/core/lib/promise/latch.h', 'src/core/lib/promise/loop.h', 'src/core/lib/promise/map.h', + 'src/core/lib/promise/map_pipe.h', 'src/core/lib/promise/pipe.h', 'src/core/lib/promise/poll.h', 'src/core/lib/promise/promise.h', 'src/core/lib/promise/race.h', 'src/core/lib/promise/seq.h', 'src/core/lib/promise/sleep.h', + 'src/core/lib/promise/try_concurrently.h', 'src/core/lib/promise/try_seq.h', 'src/core/lib/resolver/resolver.h', 'src/core/lib/resolver/resolver_factory.h', diff --git a/grpc.gemspec b/grpc.gemspec index d142689dfb0..5f1d8c0b7d9 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -1282,17 +1282,18 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/promise/activity.cc ) s.files += %w( src/core/lib/promise/activity.h ) s.files += %w( src/core/lib/promise/arena_promise.h ) - s.files += %w( src/core/lib/promise/call_push_pull.h ) s.files += %w( src/core/lib/promise/context.h ) s.files += %w( src/core/lib/promise/detail/basic_seq.h ) s.files += %w( src/core/lib/promise/detail/promise_factory.h ) s.files += %w( src/core/lib/promise/detail/promise_like.h ) s.files += %w( src/core/lib/promise/detail/status.h ) s.files += %w( src/core/lib/promise/exec_ctx_wakeup_scheduler.h ) + s.files += %w( src/core/lib/promise/for_each.h ) s.files += %w( src/core/lib/promise/intra_activity_waiter.h ) s.files += %w( src/core/lib/promise/latch.h ) s.files += %w( src/core/lib/promise/loop.h ) s.files += %w( src/core/lib/promise/map.h ) + s.files += %w( src/core/lib/promise/map_pipe.h ) s.files += %w( src/core/lib/promise/pipe.cc ) s.files += %w( src/core/lib/promise/pipe.h ) s.files += %w( src/core/lib/promise/poll.h ) @@ -1301,6 +1302,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/promise/seq.h ) s.files += %w( src/core/lib/promise/sleep.cc ) s.files += %w( src/core/lib/promise/sleep.h ) + s.files += %w( src/core/lib/promise/try_concurrently.h ) s.files += %w( src/core/lib/promise/try_seq.h ) s.files += %w( src/core/lib/resolver/resolver.cc ) s.files += %w( src/core/lib/resolver/resolver.h ) diff --git a/package.xml b/package.xml index 99acefc790b..1dc8ab46392 100644 --- a/package.xml +++ b/package.xml @@ -1264,17 +1264,18 @@ - + + @@ -1283,6 +1284,7 @@ + diff --git a/src/core/BUILD b/src/core/BUILD index 99b4c3831be..7be287f76d2 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -362,32 +362,41 @@ grpc_cc_library( ) grpc_cc_library( - name = "map_pipe", - external_deps = ["absl/status"], + name = "try_concurrently", + hdrs = ["lib/promise/try_concurrently.h"], + external_deps = [ + "absl/status", + "absl/types:variant", + ], language = "c++", public_hdrs = [ "lib/promise/map_pipe.h", ], deps = [ + "construct_destruct", "for_each", "map", "pipe", + "poll", "promise_factory", - "//:gpr_platform", + "promise_like", + "promise_status", + "//:gpr", ], ) grpc_cc_library( - name = "call_push_pull", - hdrs = ["lib/promise/call_push_pull.h"], - external_deps = ["absl/types:variant"], + name = "map_pipe", + external_deps = ["absl/status"], language = "c++", + public_hdrs = [ + "lib/promise/map_pipe.h", + ], deps = [ - "bitset", - "construct_destruct", - "poll", - "promise_like", - "promise_status", + "for_each", + "map", + "pipe", + "promise_factory", "//:gpr_platform", ], ) @@ -443,6 +452,7 @@ grpc_cc_library( ], deps = [ "arena", + "construct_destruct", "context", "poll", "//:gpr_platform", diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc index 83ca6db4703..1833a7fefc7 100644 --- a/src/core/ext/filters/http/client/http_client_filter.cc +++ b/src/core/ext/filters/http/client/http_client_filter.cc @@ -39,11 +39,10 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/promise/call_push_pull.h" #include "src/core/lib/promise/context.h" -#include "src/core/lib/promise/detail/basic_seq.h" #include "src/core/lib/promise/latch.h" #include "src/core/lib/promise/seq.h" +#include "src/core/lib/promise/try_concurrently.h" #include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/slice/percent_encoding.h" #include "src/core/lib/transport/status_conversion.h" @@ -122,21 +121,20 @@ ArenaPromise HttpClientFilter::MakeCallPromise( auto* write_latch = std::exchange(call_args.server_initial_metadata, read_latch); - return CallPushPull( - Seq(next_promise_factory(std::move(call_args)), - [](ServerMetadataHandle md) -> ServerMetadataHandle { - auto r = CheckServerMetadata(md.get()); - if (!r.ok()) return ServerMetadataFromStatus(r); - return md; - }), - []() { return absl::OkStatus(); }, - Seq(read_latch->Wait(), - [write_latch](ServerMetadata** md) -> absl::Status { - auto r = - *md == nullptr ? absl::OkStatus() : CheckServerMetadata(*md); - write_latch->Set(*md); - return r; - })); + return TryConcurrently( + Seq(next_promise_factory(std::move(call_args)), + [](ServerMetadataHandle md) -> ServerMetadataHandle { + auto r = CheckServerMetadata(md.get()); + if (!r.ok()) return ServerMetadataFromStatus(r); + return md; + })) + .NecessaryPull(Seq(read_latch->Wait(), + [write_latch](ServerMetadata** md) -> absl::Status { + auto r = *md == nullptr ? absl::OkStatus() + : CheckServerMetadata(*md); + write_latch->Set(*md); + return r; + })); } HttpClientFilter::HttpClientFilter(HttpSchemeMetadata::ValueType scheme, diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc index f6314d61592..1add5957927 100644 --- a/src/core/ext/filters/http/server/http_server_filter.cc +++ b/src/core/ext/filters/http/server/http_server_filter.cc @@ -32,12 +32,11 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/promise/call_push_pull.h" #include "src/core/lib/promise/context.h" -#include "src/core/lib/promise/detail/basic_seq.h" #include "src/core/lib/promise/latch.h" #include "src/core/lib/promise/promise.h" #include "src/core/lib/promise/seq.h" +#include "src/core/lib/promise/try_concurrently.h" #include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/slice/percent_encoding.h" #include "src/core/lib/slice/slice.h" @@ -132,21 +131,20 @@ ArenaPromise HttpServerFilter::MakeCallPromise( auto* write_latch = std::exchange(call_args.server_initial_metadata, read_latch); - return CallPushPull(Seq(next_promise_factory(std::move(call_args)), - [](ServerMetadataHandle md) -> ServerMetadataHandle { - FilterOutgoingMetadata(md.get()); - return md; - }), - Seq(read_latch->Wait(), - [write_latch](ServerMetadata** md) { - FilterOutgoingMetadata(*md); - (*md)->Set(HttpStatusMetadata(), 200); - (*md)->Set(ContentTypeMetadata(), - ContentTypeMetadata::kApplicationGrpc); - write_latch->Set(*md); - return absl::OkStatus(); - }), - []() { return absl::OkStatus(); }); + return TryConcurrently( + Seq(next_promise_factory(std::move(call_args)), + [](ServerMetadataHandle md) -> ServerMetadataHandle { + FilterOutgoingMetadata(md.get()); + return md; + })) + .Push(Seq(read_latch->Wait(), [write_latch](ServerMetadata** md) { + FilterOutgoingMetadata(*md); + (*md)->Set(HttpStatusMetadata(), 200); + (*md)->Set(ContentTypeMetadata(), + ContentTypeMetadata::kApplicationGrpc); + write_latch->Set(*md); + return absl::OkStatus(); + })); } absl::StatusOr HttpServerFilter::Create( diff --git a/src/core/lib/promise/arena_promise.h b/src/core/lib/promise/arena_promise.h index 7acc6c0389a..6b43b91ee16 100644 --- a/src/core/lib/promise/arena_promise.h +++ b/src/core/lib/promise/arena_promise.h @@ -19,11 +19,12 @@ #include -#include +#include #include #include "absl/meta/type_traits.h" +#include "src/core/lib/gprpp/construct_destruct.h" #include "src/core/lib/promise/context.h" #include "src/core/lib/promise/poll.h" #include "src/core/lib/resource_quota/arena.h" @@ -32,20 +33,28 @@ namespace grpc_core { namespace arena_promise_detail { +using ArgType = std::aligned_storage_t; +template +T*& ArgAsPtr(ArgType* arg) { + static_assert(sizeof(ArgType) >= sizeof(T**), + "Must have ArgType of at least one pointer size"); + return *reinterpret_cast(arg); +} + template struct Vtable { // Poll the promise, once. - Poll (*poll_once)(void** arg); + Poll (*poll_once)(ArgType* arg); // Destroy the underlying callable object if there is one. // Since we don't delete (the arena owns the memory) but we may need to call a // destructor, we expose this for when the ArenaPromise object is destroyed. - void (*destroy)(void** arg); + void (*destroy)(ArgType* arg); }; template struct VtableAndArg { const Vtable* vtable; - void* arg; + ArgType arg; }; // Implementation of Vtable for an empty object. @@ -53,37 +62,52 @@ struct VtableAndArg { // from. Since in either case these objects should not be polled, we simply // crash if it is. template -inline const Vtable* null_impl() { - static const Vtable vtable = {[](void**) -> Poll { - abort(); - GPR_UNREACHABLE_CODE(return Pending{}); - }, - [](void**) {}}; - return &vtable; -} +struct Null { + static const Vtable vtable; + + static Poll PollOnce(ArgType*) { + abort(); + GPR_UNREACHABLE_CODE(return Pending{}); + } + + static void Destroy(ArgType*) {} +}; + +template +const Vtable Null::vtable = {PollOnce, Destroy}; // Implementation of ImplInterface for a callable object. template -inline const Vtable* allocated_callable_impl() { - static const Vtable vtable = { - [](void** arg) -> Poll { - return poll_cast((*static_cast(*arg))()); - }, - [](void** arg) { static_cast(*arg)->~Callable(); }}; - return &vtable; -} +struct AllocatedCallable { + static const Vtable vtable; + + static Poll PollOnce(ArgType* arg) { + return poll_cast((*ArgAsPtr(arg))()); + } + + static void Destroy(ArgType* arg) { Destruct(ArgAsPtr(arg)); } +}; + +template +const Vtable AllocatedCallable::vtable = {PollOnce, Destroy}; // Implementation of ImplInterface for a small callable object (one that fits -// within the void* arg) +// within the ArgType arg) template -inline const Vtable* inlined_callable_impl() { - static const Vtable vtable = { - [](void** arg) -> Poll { - return poll_cast((*reinterpret_cast(arg))()); - }, - [](void** arg) { reinterpret_cast(arg)->~Callable(); }}; - return &vtable; -} +struct Inlined { + static const Vtable vtable; + + static Poll PollOnce(ArgType* arg) { + return poll_cast((*reinterpret_cast(arg))()); + } + + static void Destroy(ArgType* arg) { + Destruct(reinterpret_cast(arg)); + } +}; + +template +const Vtable Inlined::vtable = {PollOnce, Destroy}; // If a callable object is empty we can substitute any instance of that callable // for the one we call (for how could we tell the difference)? @@ -93,12 +117,17 @@ inline const Vtable* inlined_callable_impl() { // (this comes up often when the promise only accesses context data from the // containing activity). template -inline const Vtable* shared_callable_impl(Callable&& callable) { - static Callable instance = std::forward(callable); - static const Vtable vtable = {[](void**) -> Poll { return instance(); }, - [](void**) {}}; - return &vtable; -} +struct SharedCallable { + static const Vtable vtable; + + static Poll PollOnce(ArgType* arg) { + return (*reinterpret_cast(arg))(); + } +}; + +template +const Vtable SharedCallable::vtable = {PollOnce, + Null::Destroy}; // Redirector type: given a callable type, expose a Make() function that creates // the appropriate underlying implementation. @@ -109,11 +138,11 @@ template struct ChooseImplForCallable< T, Callable, absl::enable_if_t::value && - (sizeof(Callable) > sizeof(void*))>> { + (sizeof(Callable) > sizeof(ArgType))>> { static void Make(Callable&& callable, VtableAndArg* out) { - *out = {allocated_callable_impl(), - GetContext()->template New( - std::forward(callable))}; + out->vtable = &AllocatedCallable::vtable; + ArgAsPtr(&out->arg) = GetContext()->template New( + std::forward(callable)); } }; @@ -121,19 +150,19 @@ template struct ChooseImplForCallable< T, Callable, absl::enable_if_t::value && - (sizeof(Callable) <= sizeof(void*))>> { + (sizeof(Callable) <= sizeof(ArgType))>> { static void Make(Callable&& callable, VtableAndArg* out) { - out->vtable = inlined_callable_impl(); - new (&out->arg) Callable(std::forward(callable)); + out->vtable = &Inlined::vtable; + Construct(reinterpret_cast(&out->arg), + std::forward(callable)); } }; template struct ChooseImplForCallable< T, Callable, absl::enable_if_t::value>> { - static void Make(Callable&& callable, VtableAndArg* out) { - out->vtable = - shared_callable_impl(std::forward(callable)); + static void Make(Callable&&, VtableAndArg* out) { + out->vtable = &SharedCallable::vtable; } }; @@ -159,8 +188,8 @@ class ArenaPromise { absl::enable_if_t::value>> // NOLINTNEXTLINE(google-explicit-constructor) ArenaPromise(Callable&& callable) { - arena_promise_detail::MakeImplForCallable( - std::forward(callable), &vtable_and_arg_); + arena_promise_detail::MakeImplForCallable(std::forward(callable), + &vtable_and_arg_); } // ArenaPromise is not copyable. @@ -169,12 +198,12 @@ class ArenaPromise { // ArenaPromise is movable. ArenaPromise(ArenaPromise&& other) noexcept : vtable_and_arg_(other.vtable_and_arg_) { - other.vtable_and_arg_.vtable = arena_promise_detail::null_impl(); + other.vtable_and_arg_.vtable = &arena_promise_detail::Null::vtable; } ArenaPromise& operator=(ArenaPromise&& other) noexcept { vtable_and_arg_.vtable->destroy(&vtable_and_arg_.arg); vtable_and_arg_ = other.vtable_and_arg_; - other.vtable_and_arg_.vtable = arena_promise_detail::null_impl(); + other.vtable_and_arg_.vtable = &arena_promise_detail::Null::vtable; return *this; } @@ -187,13 +216,13 @@ class ArenaPromise { } bool has_value() const { - return vtable_and_arg_.vtable != arena_promise_detail::null_impl(); + return vtable_and_arg_.vtable != &arena_promise_detail::Null::vtable; } private: // Underlying impl object. arena_promise_detail::VtableAndArg vtable_and_arg_ = { - arena_promise_detail::null_impl(), nullptr}; + &arena_promise_detail::Null::vtable, {}}; }; } // namespace grpc_core diff --git a/src/core/lib/promise/call_push_pull.h b/src/core/lib/promise/call_push_pull.h deleted file mode 100644 index c5672a00412..00000000000 --- a/src/core/lib/promise/call_push_pull.h +++ /dev/null @@ -1,148 +0,0 @@ -// Copyright 2022 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef GRPC_CORE_LIB_PROMISE_CALL_PUSH_PULL_H -#define GRPC_CORE_LIB_PROMISE_CALL_PUSH_PULL_H - -#include - -#include - -#include - -#include "absl/types/variant.h" - -#include "src/core/lib/gprpp/bitset.h" -#include "src/core/lib/gprpp/construct_destruct.h" -#include "src/core/lib/promise/detail/promise_like.h" -#include "src/core/lib/promise/detail/status.h" -#include "src/core/lib/promise/poll.h" - -namespace grpc_core { - -namespace promise_detail { - -template -class CallPushPull { - public: - CallPushPull(FMain f_main, FPush f_push, FPull f_pull) - : push_(std::move(f_push)), pull_(std::move(f_pull)) { - Construct(&main_, std::move(f_main)); - } - - CallPushPull(const CallPushPull&) = delete; - CallPushPull& operator=(const CallPushPull&) = delete; - CallPushPull(CallPushPull&& other) noexcept - : done_(other.done_), - push_(std::move(other.push_)), - pull_(std::move(other.pull_)) { - assert(!done_.is_set(kDoneMain)); - Construct(&main_, std::move(other.main_)); - } - - CallPushPull& operator=(CallPushPull&& other) noexcept { - assert(!done_.is_set(kDoneMain)); - done_ = other.done_; - assert(!done_.is_set(kDoneMain)); - push_ = std::move(other.push_); - main_ = std::move(other.main_); - pull_ = std::move(other.pull_); - return *this; - } - - ~CallPushPull() { - if (done_.is_set(kDoneMain)) { - Destruct(&result_); - } else { - Destruct(&main_); - } - } - - using Result = - typename PollTraits>()())>::Type; - - Poll operator()() { - if (!done_.is_set(kDonePush)) { - auto p = push_(); - if (auto* status = absl::get_if(&p)) { - if (IsStatusOk(*status)) { - done_.set(kDonePush); - } else { - return StatusCast(std::move(*status)); - } - } - } - if (!done_.is_set(kDoneMain)) { - auto p = main_(); - if (auto* status = absl::get_if(&p)) { - done_.set(kDoneMain); - Destruct(&main_); - Construct(&result_, std::move(*status)); - } - } - if (!done_.is_set(kDonePull)) { - auto p = pull_(); - if (auto* status = absl::get_if(&p)) { - if (IsStatusOk(*status)) { - done_.set(kDonePull); - } else { - return StatusCast(std::move(*status)); - } - } - } - if (done_.all()) return std::move(result_); - return Pending{}; - } - - private: - enum { kDonePull = 0, kDoneMain = 1, kDonePush = 2 }; - BitSet<3> done_; - GPR_NO_UNIQUE_ADDRESS PromiseLike push_; - union { - PromiseLike main_; - Result result_; - }; - GPR_NO_UNIQUE_ADDRESS PromiseLike pull_; -}; - -} // namespace promise_detail - -// For promises representing calls a common pattern emerges: -// There's a process pushing data down the stack, a process handling the main -// call part, and a process pulling data back up the stack. -// -// This can reasonably be represented by the right combinations of TryJoins and -// Maps, but since the structure is fundamental to the domain we introduce -// this simple helper to make it easier to write the common case. -// -// It takes three promises: the main call, the push and the pull. -// When polling, the push is polled first, then the main call (descending the -// stack), then the pull (as we ascend once more). -// -// If the push or the pull fail early, then the entire call fails. -// If the main part of the call fails, we wait until both push and pull are also -// done. -// -// This strategy minimizes repolls. -template -promise_detail::CallPushPull CallPushPull(FMain f_main, - FPush f_push, - FPull f_pull) { - return promise_detail::CallPushPull( - std::move(f_main), std::move(f_push), std::move(f_pull)); -} - -} // namespace grpc_core - -#endif // GRPC_CORE_LIB_PROMISE_CALL_PUSH_PULL_H diff --git a/src/core/lib/promise/try_concurrently.h b/src/core/lib/promise/try_concurrently.h new file mode 100644 index 00000000000..a6f19cf5d5f --- /dev/null +++ b/src/core/lib/promise/try_concurrently.h @@ -0,0 +1,341 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_CORE_LIB_PROMISE_TRY_CONCURRENTLY_H +#define GRPC_CORE_LIB_PROMISE_TRY_CONCURRENTLY_H + +#include + +#include + +#include +#include + +#include "absl/types/variant.h" + +#include + +#include "src/core/lib/gprpp/construct_destruct.h" +#include "src/core/lib/promise/detail/promise_like.h" +#include "src/core/lib/promise/detail/status.h" +#include "src/core/lib/promise/poll.h" + +namespace grpc_core { + +namespace promise_detail { + +template +struct Necessary { + PromiseLike promise; + static constexpr bool must_complete() { return true; } +}; + +template +struct Helper { + PromiseLike promise; + static constexpr bool must_complete() { return false; } +}; + +// A set of promises that can be polled concurrently. +// Fuses them when completed (that is, destroys the promise and records it +// completed). +// Relies on an external bit field to handle the recording: this saves a bunch +// of space, but means the implementation of this type is weird: it's really +// super tied to TryConcurrently and no attempt should be made to use this +// independently. +template +class FusedSet; + +template +class FusedSet : public FusedSet { + public: + explicit FusedSet(T&& x, Ts&&... xs) + : FusedSet(std::forward(xs)...) { + Construct(&wrapper_, std::forward(x)); + } + explicit FusedSet(T&& x, FusedSet&& xs) + : FusedSet(std::forward>(xs)) { + Construct(&wrapper_, std::forward(x)); + } + // Empty destructor: consumers must call Destroy() to ensure cleanup occurs + ~FusedSet() {} + + FusedSet(const FusedSet&) = delete; + FusedSet& operator=(const FusedSet&) = delete; + + // Assumes all 'done_bits' for other are 0 and will be set to 1 + FusedSet(FusedSet&& other) noexcept : FusedSet(std::move(other)) { + Construct(&wrapper_, std::move(other.wrapper_)); + Destruct(&other.wrapper_); + } + + static constexpr size_t Size() { return 1 + sizeof...(Ts); } + + static constexpr uint8_t NecessaryBits() { + return (T::must_complete() ? 1 : 0) | + (FusedSet::NecessaryBits() << 1); + } + + template + void Destroy(uint8_t done_bits) { + if ((done_bits & (1 << kDoneBit)) == 0) { + Destruct(&wrapper_); + } + FusedSet::template Destroy(done_bits); + } + + template + Poll Run(uint8_t& done_bits) { + if ((done_bits & (1 << kDoneBit)) == 0) { + auto p = wrapper_.promise(); + if (auto* status = absl::get_if(&p)) { + done_bits |= (1 << kDoneBit); + Destruct(&wrapper_); + if (!IsStatusOk(*status)) { + return StatusCast(std::move(*status)); + } + } + } + return FusedSet::template Run(done_bits); + } + + template + FusedSet With(P x) { + return FusedSet(std::move(x), std::move(*this)); + } + + private: + union { + T wrapper_; + }; +}; + +template <> +class FusedSet<> { + public: + static constexpr size_t Size() { return 0; } + static constexpr uint8_t NecessaryBits() { return 0; } + + template + Poll Run(uint8_t) { + return Pending{}; + } + template + void Destroy(uint8_t) {} + + template + FusedSet

With(P x) { + return FusedSet

(std::move(x)); + } +}; + +template +class TryConcurrently { + public: + TryConcurrently(Main main, PreMain pre_main, PostMain post_main) + : done_bits_(0), + pre_main_(std::move(pre_main)), + post_main_(std::move(post_main)) { + Construct(&main_, std::move(main)); + } + + TryConcurrently(const TryConcurrently&) = delete; + TryConcurrently& operator=(const TryConcurrently&) = delete; + TryConcurrently(TryConcurrently&& other) noexcept + : done_bits_(0), + pre_main_(std::move(other.pre_main_)), + post_main_(std::move(other.post_main_)) { + GPR_DEBUG_ASSERT(other.done_bits_ == 0); + other.done_bits_ = HelperBits(); + Construct(&main_, std::move(other.main_)); + } + TryConcurrently& operator=(TryConcurrently&& other) noexcept { + GPR_DEBUG_ASSERT(other.done_bits_ == 0); + done_bits_ = 0; + other.done_bits_ = HelperBits(); + pre_main_ = std::move(other.pre_main_); + post_main_ = std::move(other.post_main_); + Construct(&main_, std::move(other.main_)); + return *this; + } + + ~TryConcurrently() { + if (done_bits_ & 1) { + Destruct(&result_); + } else { + Destruct(&main_); + } + pre_main_.template Destroy<1>(done_bits_); + post_main_.template Destroy<1 + PreMain::Size()>(done_bits_); + } + + using Result = + typename PollTraits>()())>::Type; + + Poll operator()() { + auto r = pre_main_.template Run(done_bits_); + if (auto* status = absl::get_if(&r)) { + GPR_DEBUG_ASSERT(!IsStatusOk(*status)); + return std::move(*status); + } + if ((done_bits_ & 1) == 0) { + auto p = main_(); + if (auto* status = absl::get_if(&p)) { + done_bits_ |= 1; + Destruct(&main_); + Construct(&result_, std::move(*status)); + } + } + r = post_main_.template Run(done_bits_); + if (auto* status = absl::get_if(&r)) { + GPR_DEBUG_ASSERT(!IsStatusOk(*status)); + return std::move(*status); + } + if ((done_bits_ & NecessaryBits()) == NecessaryBits()) { + return std::move(result_); + } + return Pending{}; + } + + template + auto NecessaryPush(P p); + template + auto NecessaryPull(P p); + template + auto Push(P p); + template + auto Pull(P p); + + private: + // Bitmask for done_bits_ specifying which promises must be completed prior to + // returning ok. + constexpr uint8_t NecessaryBits() { + return 1 | (PreMain::NecessaryBits() << 1) | + (PostMain::NecessaryBits() << (1 + PreMain::Size())); + } + // Bitmask for done_bits_ specifying what all of the promises being complete + // would look like. + constexpr uint8_t AllBits() { + return (1 << (1 + PreMain::Size() + PostMain::Size())) - 1; + } + // Bitmask of done_bits_ specifying which bits correspond to helper promises - + // that is all promises that are not the main one. + constexpr uint8_t HelperBits() { return AllBits() ^ 1; } + + // done_bits signifies which operations have completed. + // Bit 0 is set if main_ has completed. + // The next higher bits correspond one per pre-main promise. + // The next higher bits correspond one per post-main promise. + // So, going from most significant bit to least significant: + // +--------------+-------------+--------+ + // |post_main bits|pre_main bits|main bit| + // +--------------+-------------+--------+ + uint8_t done_bits_; + PreMain pre_main_; + union { + PromiseLike

main_; + Result result_; + }; + PostMain post_main_; +}; + +template +auto MakeTryConcurrently(Main&& main, PreMain&& pre_main, + PostMain&& post_main) { + return TryConcurrently( + std::forward
(main), std::forward(pre_main), + std::forward(post_main)); +} + +template +template +auto TryConcurrently::NecessaryPush(P p) { + GPR_DEBUG_ASSERT(done_bits_ == 0); + done_bits_ = HelperBits(); + return MakeTryConcurrently(std::move(main_), + pre_main_.With(Necessary

{std::move(p)}), + std::move(post_main_)); +} + +template +template +auto TryConcurrently::NecessaryPull(P p) { + GPR_DEBUG_ASSERT(done_bits_ == 0); + done_bits_ = HelperBits(); + return MakeTryConcurrently(std::move(main_), std::move(pre_main_), + post_main_.With(Necessary

{std::move(p)})); +} + +template +template +auto TryConcurrently::Push(P p) { + GPR_DEBUG_ASSERT(done_bits_ == 0); + done_bits_ = HelperBits(); + return MakeTryConcurrently(std::move(main_), + pre_main_.With(Helper

{std::move(p)}), + std::move(post_main_)); +} + +template +template +auto TryConcurrently::Pull(P p) { + GPR_DEBUG_ASSERT(done_bits_ == 0); + done_bits_ = HelperBits(); + return MakeTryConcurrently(std::move(main_), std::move(pre_main_), + post_main_.With(Helper

{std::move(p)})); +} + +} // namespace promise_detail + +// TryConcurrently runs a set of promises concurrently. +// There is a structure to the promises: +// - A 'main' promise dominates the others - it must complete before the +// overall promise successfully completes. Its result is chosen in the event +// of successful completion. +// - A set of (optional) push and pull promises to aid main. Push promises are +// polled before main, pull promises are polled after. In this way we can +// avoid overall wakeup churn - sending a message will tend to push things +// down the promise tree as its polled, so that send should be in a push +// promise - then as the main promise is polled and it calls into things +// lower in the stack they'll already see things there (this reasoning holds +// for receiving things and the pull promises too!). +// - Each push and pull promise is either necessary or optional. +// Necessary promises must complete successfully before the overall promise +// completes. Optional promises will just be cancelled once the main promise +// completes and any necessary helpers. +// - If any of the promises fail, the overall promise fails immediately. +// API: +// This function, TryConcurrently, is used to create a TryConcurrently promise. +// It takes a single argument, being the main promise. That promise also has +// a set of methods for attaching push and pull promises. The act of attachment +// returns a new TryConcurrently promise with previous contained promises moved +// out. +// The methods exposed: +// - Push, NecessaryPush: attach a push promise (with the first variant being +// optional, the second necessary). +// - Pull, NecessaryPull: attach a pull promise, with variants as above. +// Example: +// TryConcurrently(call_next_filter(std::move(call_args))) +// .Push(send_messages_promise) +// .Pull(recv_messages_promise) +template +auto TryConcurrently(Main main) { + return promise_detail::MakeTryConcurrently(std::move(main), + promise_detail::FusedSet<>(), + promise_detail::FusedSet<>()); +} + +} // namespace grpc_core + +#endif // GRPC_CORE_LIB_PROMISE_TRY_CONCURRENTLY_H diff --git a/test/core/promise/BUILD b/test/core/promise/BUILD index 831b932f650..15f9b286ab3 100644 --- a/test/core/promise/BUILD +++ b/test/core/promise/BUILD @@ -406,8 +406,8 @@ grpc_cc_test( ) grpc_cc_test( - name = "call_push_pull_test", - srcs = ["call_push_pull_test.cc"], + name = "try_concurrently_test", + srcs = ["try_concurrently_test.cc"], external_deps = [ "gtest", "absl/status", @@ -417,6 +417,6 @@ grpc_cc_test( uses_event_engine = False, uses_polling = False, deps = [ - "//src/core:call_push_pull", + "//src/core:try_concurrently", ], ) diff --git a/test/core/promise/arena_promise_test.cc b/test/core/promise/arena_promise_test.cc index b7c37900d80..f5f8164b16b 100644 --- a/test/core/promise/arena_promise_test.cc +++ b/test/core/promise/arena_promise_test.cc @@ -14,6 +14,7 @@ #include "src/core/lib/promise/arena_promise.h" +#include #include #include "absl/types/variant.h" @@ -71,6 +72,28 @@ TEST(ArenaPromiseTest, MoveAssignmentWorks) { p = ArenaPromise(); } +TEST(ArenaPromiseTest, AllocatedUniquePtrWorks) { + ExecCtx exec_ctx; + auto arena = MakeScopedArena(1024, g_memory_allocator); + TestContext context(arena.get()); + std::array garbage = {0, 1, 2, 3, 4}; + auto freer = [garbage](int* p) { free(p + garbage[0]); }; + using Ptr = std::unique_ptr; + Ptr x(([] { + int* p = static_cast(malloc(sizeof(*p))); + *p = 42; + return p; + })(), + freer); + static_assert(sizeof(x) > sizeof(arena_promise_detail::ArgType), + "This test assumes the unique ptr will go down the allocated " + "path for ArenaPromise"); + ArenaPromise initial_promise( + [x = std::move(x)]() mutable { return Poll(std::move(x)); }); + ArenaPromise p(std::move(initial_promise)); + EXPECT_EQ(*absl::get(p()), 42); +} + } // namespace grpc_core int main(int argc, char** argv) { diff --git a/test/core/promise/call_push_pull_test.cc b/test/core/promise/call_push_pull_test.cc deleted file mode 100644 index 89d2ab8107f..00000000000 --- a/test/core/promise/call_push_pull_test.cc +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright 2022 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "src/core/lib/promise/call_push_pull.h" - -#include - -#include "absl/status/status.h" -#include "gtest/gtest.h" - -namespace grpc_core { - -TEST(CallPushPullTest, Empty) { - auto p = CallPushPull([] { return absl::OkStatus(); }, - [] { return absl::OkStatus(); }, - [] { return absl::OkStatus(); }); - EXPECT_EQ(p(), Poll(absl::OkStatus())); -} - -TEST(CallPushPullTest, Paused) { - auto p = CallPushPull([]() -> Poll { return Pending{}; }, - []() -> Poll { return Pending{}; }, - []() -> Poll { return Pending{}; }); - EXPECT_EQ(p(), Poll(Pending{})); -} - -TEST(CallPushPullTest, OneReady) { - auto a = CallPushPull([]() -> Poll { return absl::OkStatus(); }, - []() -> Poll { return Pending{}; }, - []() -> Poll { return Pending{}; }); - EXPECT_EQ(a(), Poll(Pending{})); - auto b = CallPushPull([]() -> Poll { return Pending{}; }, - []() -> Poll { return absl::OkStatus(); }, - []() -> Poll { return Pending{}; }); - EXPECT_EQ(b(), Poll(Pending{})); - auto c = - CallPushPull([]() -> Poll { return Pending{}; }, - []() -> Poll { return Pending{}; }, - []() -> Poll { return absl::OkStatus(); }); - EXPECT_EQ(c(), Poll(Pending{})); -} - -TEST(CallPushPullTest, OneFailed) { - auto a = CallPushPull( - []() -> Poll { return absl::UnknownError("bah"); }, - []() -> Poll { return absl::OkStatus(); }, - []() -> Poll { return absl::OkStatus(); }); - EXPECT_EQ(a(), Poll(absl::UnknownError("bah"))); - auto b = CallPushPull( - []() -> Poll { return Pending{}; }, - []() -> Poll { return absl::UnknownError("humbug"); }, - []() -> Poll { return Pending{}; }); - EXPECT_EQ(b(), Poll(absl::UnknownError("humbug"))); - auto c = CallPushPull( - []() -> Poll { return Pending{}; }, - []() -> Poll { return Pending{}; }, - []() -> Poll { return absl::UnknownError("wha"); }); - EXPECT_EQ(c(), Poll(absl::UnknownError("wha"))); -} - -} // namespace grpc_core - -int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} diff --git a/test/core/promise/try_concurrently_test.cc b/test/core/promise/try_concurrently_test.cc new file mode 100644 index 00000000000..7013e840017 --- /dev/null +++ b/test/core/promise/try_concurrently_test.cc @@ -0,0 +1,160 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/core/lib/promise/try_concurrently.h" + +#include +#include +#include +#include +#include + +#include "absl/status/status.h" +#include "gtest/gtest.h" + +namespace grpc_core { + +class PromiseFactory { + public: + // Create a promise that resolves to Ok but has a memory allocation (to verify + // destruction) + auto OkPromise(std::string tag) { + return [this, tag = std::move(tag), + p = std::make_unique(absl::OkStatus())]() mutable { + order_.push_back(tag); + return std::move(*p); + }; + } + + // Create a promise that never resolves and carries a memory allocation + auto NeverPromise(std::string tag) { + return [this, tag = std::move(tag), + p = std::make_unique()]() -> Poll { + order_.push_back(tag); + return *p; + }; + } + + // Create a promise that fails and carries a memory allocation + auto FailPromise(std::string tag) { + return [this, p = std::make_unique(absl::UnknownError(tag)), + tag = std::move(tag)]() mutable { + order_.push_back(tag); + return std::move(*p); + }; + } + + // Finish one round and return a vector of strings representing which promises + // were polled and in which order. + std::vector Finish() { return std::exchange(order_, {}); } + + private: + std::vector order_; +}; + +std::ostream& operator<<(std::ostream& out, const Poll& p) { + return out << PollToString( + p, [](const absl::Status& s) { return s.ToString(); }); +} + +TEST(TryConcurrentlyTest, Immediate) { + PromiseFactory pf; + auto a = TryConcurrently(pf.OkPromise("1")); + EXPECT_EQ(a(), Poll(absl::OkStatus())); + EXPECT_EQ(pf.Finish(), std::vector({"1"})); + auto b = TryConcurrently(pf.OkPromise("1")).NecessaryPush(pf.OkPromise("2")); + EXPECT_EQ(b(), Poll(absl::OkStatus())); + EXPECT_EQ(pf.Finish(), std::vector({"2", "1"})); + auto c = TryConcurrently(pf.OkPromise("1")).NecessaryPull(pf.OkPromise("2")); + EXPECT_EQ(c(), Poll(absl::OkStatus())); + EXPECT_EQ(pf.Finish(), std::vector({"1", "2"})); + auto d = TryConcurrently(pf.OkPromise("1")) + .NecessaryPull(pf.OkPromise("2")) + .NecessaryPush(pf.OkPromise("3")); + EXPECT_EQ(d(), Poll(absl::OkStatus())); + EXPECT_EQ(pf.Finish(), std::vector({"3", "1", "2"})); + auto e = TryConcurrently(pf.OkPromise("1")).Push(pf.NeverPromise("2")); + EXPECT_EQ(e(), Poll(absl::OkStatus())); + EXPECT_EQ(pf.Finish(), std::vector({"2", "1"})); + auto f = TryConcurrently(pf.OkPromise("1")).Pull(pf.NeverPromise("2")); + EXPECT_EQ(f(), Poll(absl::OkStatus())); + EXPECT_EQ(pf.Finish(), std::vector({"1", "2"})); +} + +TEST(TryConcurrentlyTest, Paused) { + PromiseFactory pf; + auto a = TryConcurrently(pf.NeverPromise("1")); + EXPECT_EQ(a(), Poll(Pending{})); + EXPECT_EQ(pf.Finish(), std::vector({"1"})); + auto b = + TryConcurrently(pf.OkPromise("1")).NecessaryPush(pf.NeverPromise("2")); + EXPECT_EQ(b(), Poll(Pending{})); + EXPECT_EQ(pf.Finish(), std::vector({"2", "1"})); + auto c = + TryConcurrently(pf.OkPromise("1")).NecessaryPull(pf.NeverPromise("2")); + EXPECT_EQ(c(), Poll(Pending{})); + EXPECT_EQ(pf.Finish(), std::vector({"1", "2"})); +} + +TEST(TryConcurrentlyTest, OneFailed) { + PromiseFactory pf; + auto a = TryConcurrently(pf.FailPromise("bah")); + EXPECT_EQ(a(), Poll(absl::UnknownError("bah"))); + EXPECT_EQ(pf.Finish(), std::vector({"bah"})); + auto b = TryConcurrently(pf.NeverPromise("1")) + .NecessaryPush(pf.FailPromise("humbug")); + EXPECT_EQ(b(), Poll(absl::UnknownError("humbug"))); + EXPECT_EQ(pf.Finish(), std::vector({"humbug"})); + auto c = TryConcurrently(pf.NeverPromise("1")) + .NecessaryPull(pf.FailPromise("wha")); + EXPECT_EQ(c(), Poll(absl::UnknownError("wha"))); + EXPECT_EQ(pf.Finish(), std::vector({"1", "wha"})); +} + +// A pointer to an int designed to cause a double free if it's double destructed +// (to flush out bugs) +class ProblematicPointer { + public: + ProblematicPointer() : p_(new int(0)) {} + ~ProblematicPointer() { delete p_; } + ProblematicPointer(const ProblematicPointer&) = delete; + ProblematicPointer& operator=(const ProblematicPointer&) = delete; + // NOLINTNEXTLINE: we want to allocate during move + ProblematicPointer(ProblematicPointer&& other) : p_(new int(*other.p_ + 1)) {} + ProblematicPointer& operator=(ProblematicPointer&& other) = delete; + + private: + int* p_; +}; + +TEST(TryConcurrentlyTest, MoveItMoveIt) { + auto a = + TryConcurrently([x = ProblematicPointer()]() { return absl::OkStatus(); }) + .NecessaryPull( + [x = ProblematicPointer()]() { return absl::OkStatus(); }) + .NecessaryPush( + [x = ProblematicPointer()]() { return absl::OkStatus(); }) + .Push([x = ProblematicPointer()]() { return absl::OkStatus(); }) + .Pull([x = ProblematicPointer()]() { return absl::OkStatus(); }); + auto b = std::move(a); + auto c = std::move(b); + c(); +} + +} // namespace grpc_core + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index dc22e0de296..c180ea013f7 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -2267,17 +2267,18 @@ src/core/lib/matchers/matchers.h \ src/core/lib/promise/activity.cc \ src/core/lib/promise/activity.h \ src/core/lib/promise/arena_promise.h \ -src/core/lib/promise/call_push_pull.h \ src/core/lib/promise/context.h \ src/core/lib/promise/detail/basic_seq.h \ src/core/lib/promise/detail/promise_factory.h \ src/core/lib/promise/detail/promise_like.h \ src/core/lib/promise/detail/status.h \ src/core/lib/promise/exec_ctx_wakeup_scheduler.h \ +src/core/lib/promise/for_each.h \ src/core/lib/promise/intra_activity_waiter.h \ src/core/lib/promise/latch.h \ src/core/lib/promise/loop.h \ src/core/lib/promise/map.h \ +src/core/lib/promise/map_pipe.h \ src/core/lib/promise/pipe.cc \ src/core/lib/promise/pipe.h \ src/core/lib/promise/poll.h \ @@ -2286,6 +2287,7 @@ src/core/lib/promise/race.h \ src/core/lib/promise/seq.h \ src/core/lib/promise/sleep.cc \ src/core/lib/promise/sleep.h \ +src/core/lib/promise/try_concurrently.h \ src/core/lib/promise/try_seq.h \ src/core/lib/resolver/resolver.cc \ src/core/lib/resolver/resolver.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 29ec195e306..a236f685f30 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -2058,17 +2058,18 @@ src/core/lib/matchers/matchers.h \ src/core/lib/promise/activity.cc \ src/core/lib/promise/activity.h \ src/core/lib/promise/arena_promise.h \ -src/core/lib/promise/call_push_pull.h \ src/core/lib/promise/context.h \ src/core/lib/promise/detail/basic_seq.h \ src/core/lib/promise/detail/promise_factory.h \ src/core/lib/promise/detail/promise_like.h \ src/core/lib/promise/detail/status.h \ src/core/lib/promise/exec_ctx_wakeup_scheduler.h \ +src/core/lib/promise/for_each.h \ src/core/lib/promise/intra_activity_waiter.h \ src/core/lib/promise/latch.h \ src/core/lib/promise/loop.h \ src/core/lib/promise/map.h \ +src/core/lib/promise/map_pipe.h \ src/core/lib/promise/pipe.cc \ src/core/lib/promise/pipe.h \ src/core/lib/promise/poll.h \ @@ -2077,6 +2078,7 @@ src/core/lib/promise/race.h \ src/core/lib/promise/seq.h \ src/core/lib/promise/sleep.cc \ src/core/lib/promise/sleep.h \ +src/core/lib/promise/try_concurrently.h \ src/core/lib/promise/try_seq.h \ src/core/lib/resolver/resolver.cc \ src/core/lib/resolver/resolver.h \ diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index b3bff15b8ae..6473637cac6 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -1415,30 +1415,6 @@ ], "uses_polling": true }, - { - "args": [], - "benchmark": false, - "ci_platforms": [ - "linux", - "mac", - "posix", - "windows" - ], - "cpu_cost": 1.0, - "exclude_configs": [], - "exclude_iomgrs": [], - "flaky": false, - "gtest": true, - "language": "c++", - "name": "call_push_pull_test", - "platforms": [ - "linux", - "mac", - "posix", - "windows" - ], - "uses_polling": false - }, { "args": [], "benchmark": false, @@ -7797,6 +7773,30 @@ ], "uses_polling": false }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "try_concurrently_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false,