Revert "[promise] CallPushPull -> more general TryConcurrently (#31429)" (#31480)

This reverts commit 11a8f66cca.
pull/31481/head
AJ Heller 2 years ago committed by GitHub
parent 27206c981c
commit 821443e9b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      BUILD
  2. 105
      CMakeLists.txt
  3. 115
      build_autogenerated.yaml
  4. 8
      gRPC-C++.podspec
  5. 8
      gRPC-Core.podspec
  6. 4
      grpc.gemspec
  7. 4
      package.xml
  8. 32
      src/core/BUILD
  9. 14
      src/core/ext/filters/http/client/http_client_filter.cc
  10. 14
      src/core/ext/filters/http/server/http_server_filter.cc
  11. 125
      src/core/lib/promise/arena_promise.h
  12. 148
      src/core/lib/promise/call_push_pull.h
  13. 341
      src/core/lib/promise/try_concurrently.h
  14. 6
      test/core/promise/BUILD
  15. 18
      test/core/promise/arena_promise_test.cc
  16. 77
      test/core/promise/call_push_pull_test.cc
  17. 160
      test/core/promise/try_concurrently_test.cc
  18. 4
      tools/doxygen/Doxyfile.c++.internal
  19. 4
      tools/doxygen/Doxyfile.core.internal
  20. 48
      tools/run_tests/generated/tests.json

@ -3110,6 +3110,8 @@ grpc_cc_library(
"promise",
"//src/core:arena",
"//src/core:arena_promise",
"//src/core:basic_seq",
"//src/core:call_push_pull",
"//src/core:channel_fwd",
"//src/core:channel_init",
"//src/core:channel_stack_type",
@ -3122,7 +3124,6 @@ grpc_cc_library(
"//src/core:slice_buffer",
"//src/core:status_helper",
"//src/core:transport_fwd",
"//src/core:try_concurrently",
],
)

105
CMakeLists.txt generated

@ -847,6 +847,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx byte_buffer_test)
add_dependencies(buildtests_cxx c_slice_buffer_test)
add_dependencies(buildtests_cxx call_finalization_test)
add_dependencies(buildtests_cxx call_push_pull_test)
add_dependencies(buildtests_cxx cancel_ares_query_test)
add_dependencies(buildtests_cxx cel_authorization_engine_test)
add_dependencies(buildtests_cxx certificate_provider_registry_test)
@ -1216,7 +1217,6 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx transport_security_common_api_test)
add_dependencies(buildtests_cxx transport_security_test)
add_dependencies(buildtests_cxx transport_stream_receiver_test)
add_dependencies(buildtests_cxx try_concurrently_test)
add_dependencies(buildtests_cxx try_join_test)
add_dependencies(buildtests_cxx try_seq_metadata_test)
add_dependencies(buildtests_cxx try_seq_test)
@ -6777,6 +6777,45 @@ target_link_libraries(call_finalization_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(call_push_pull_test
test/core/promise/call_push_pull_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(call_push_pull_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(call_push_pull_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::type_traits
absl::status
absl::statusor
absl::strings
absl::variant
)
endif()
if(gRPC_BUILD_TESTS)
@ -19457,70 +19496,6 @@ target_link_libraries(transport_stream_receiver_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(try_concurrently_test
src/core/ext/upb-generated/google/protobuf/any.upb.c
src/core/ext/upb-generated/google/rpc/status.upb.c
src/core/lib/debug/trace.cc
src/core/lib/event_engine/memory_allocator.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/time.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/error.cc
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/iomgr_internal.cc
src/core/lib/promise/activity.cc
src/core/lib/promise/pipe.cc
src/core/lib/resource_quota/arena.cc
src/core/lib/resource_quota/memory_quota.cc
src/core/lib/resource_quota/periodic_update.cc
src/core/lib/resource_quota/trace.cc
src/core/lib/slice/percent_encoding.cc
src/core/lib/slice/slice.cc
src/core/lib/slice/slice_string_helpers.cc
test/core/promise/try_concurrently_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(try_concurrently_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(try_concurrently_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::any_invocable
absl::function_ref
absl::hash
absl::type_traits
absl::statusor
absl::utility
gpr
upb
)
endif()
if(gRPC_BUILD_TESTS)

@ -878,25 +878,23 @@ libs:
- src/core/lib/matchers/matchers.h
- src/core/lib/promise/activity.h
- src/core/lib/promise/arena_promise.h
- src/core/lib/promise/call_push_pull.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_seq.h
- src/core/lib/promise/detail/promise_factory.h
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/exec_ctx_wakeup_scheduler.h
- src/core/lib/promise/for_each.h
- src/core/lib/promise/intra_activity_waiter.h
- src/core/lib/promise/latch.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
- src/core/lib/promise/map_pipe.h
- src/core/lib/promise/pipe.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/promise.h
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
- src/core/lib/promise/sleep.h
- src/core/lib/promise/try_concurrently.h
- src/core/lib/promise/try_seq.h
- src/core/lib/resolver/resolver.h
- src/core/lib/resolver/resolver_factory.h
@ -2135,25 +2133,23 @@ libs:
- src/core/lib/load_balancing/subchannel_interface.h
- src/core/lib/promise/activity.h
- src/core/lib/promise/arena_promise.h
- src/core/lib/promise/call_push_pull.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_seq.h
- src/core/lib/promise/detail/promise_factory.h
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/exec_ctx_wakeup_scheduler.h
- src/core/lib/promise/for_each.h
- src/core/lib/promise/intra_activity_waiter.h
- src/core/lib/promise/latch.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
- src/core/lib/promise/map_pipe.h
- src/core/lib/promise/pipe.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/promise.h
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
- src/core/lib/promise/sleep.h
- src/core/lib/promise/try_concurrently.h
- src/core/lib/promise/try_seq.h
- src/core/lib/resolver/resolver.h
- src/core/lib/resolver/resolver_factory.h
@ -4568,6 +4564,27 @@ targets:
- test/core/channel/call_finalization_test.cc
deps:
- grpc_test_util
- name: call_push_pull_test
gtest: true
build: test
language: c++
headers:
- src/core/lib/gpr/useful.h
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/construct_destruct.h
- src/core/lib/promise/call_push_pull.h
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/poll.h
src:
- test/core/promise/call_push_pull_test.cc
deps:
- absl/meta:type_traits
- absl/status:status
- absl/status:statusor
- absl/strings:strings
- absl/types:variant
uses_polling: false
- name: cancel_ares_query_test
gtest: true
build: test
@ -10752,92 +10769,6 @@ targets:
deps:
- grpc_test_util
uses_polling: false
- name: try_concurrently_test
gtest: true
build: test
language: c++
headers:
- src/core/ext/upb-generated/google/protobuf/any.upb.h
- src/core/ext/upb-generated/google/rpc/status.upb.h
- src/core/lib/debug/trace.h
- src/core/lib/experiments/config.h
- src/core/lib/experiments/experiments.h
- src/core/lib/gpr/spinlock.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/debug_location.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h
- src/core/lib/gprpp/status_helper.h
- src/core/lib/gprpp/time.h
- src/core/lib/iomgr/closure.h
- src/core/lib/iomgr/combiner.h
- src/core/lib/iomgr/error.h
- src/core/lib/iomgr/exec_ctx.h
- src/core/lib/iomgr/executor.h
- src/core/lib/iomgr/iomgr_internal.h
- src/core/lib/promise/activity.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_seq.h
- src/core/lib/promise/detail/promise_factory.h
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/exec_ctx_wakeup_scheduler.h
- src/core/lib/promise/for_each.h
- src/core/lib/promise/intra_activity_waiter.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
- src/core/lib/promise/map_pipe.h
- src/core/lib/promise/pipe.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
- src/core/lib/promise/try_concurrently.h
- src/core/lib/resource_quota/arena.h
- src/core/lib/resource_quota/memory_quota.h
- src/core/lib/resource_quota/periodic_update.h
- src/core/lib/resource_quota/trace.h
- src/core/lib/slice/percent_encoding.h
- src/core/lib/slice/slice.h
- src/core/lib/slice/slice_internal.h
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_string_helpers.h
src:
- src/core/ext/upb-generated/google/protobuf/any.upb.c
- src/core/ext/upb-generated/google/rpc/status.upb.c
- src/core/lib/debug/trace.cc
- src/core/lib/event_engine/memory_allocator.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/time.cc
- src/core/lib/iomgr/combiner.cc
- src/core/lib/iomgr/error.cc
- src/core/lib/iomgr/exec_ctx.cc
- src/core/lib/iomgr/executor.cc
- src/core/lib/iomgr/iomgr_internal.cc
- src/core/lib/promise/activity.cc
- src/core/lib/promise/pipe.cc
- src/core/lib/resource_quota/arena.cc
- src/core/lib/resource_quota/memory_quota.cc
- src/core/lib/resource_quota/periodic_update.cc
- src/core/lib/resource_quota/trace.cc
- src/core/lib/slice/percent_encoding.cc
- src/core/lib/slice/slice.cc
- src/core/lib/slice/slice_string_helpers.cc
- test/core/promise/try_concurrently_test.cc
deps:
- absl/functional:any_invocable
- absl/functional:function_ref
- absl/hash:hash
- absl/meta:type_traits
- absl/status:statusor
- absl/utility:utility
- gpr
- upb
uses_polling: false
- name: try_join_test
gtest: true
build: test

8
gRPC-C++.podspec generated

@ -841,25 +841,23 @@ Pod::Spec.new do |s|
'src/core/lib/matchers/matchers.h',
'src/core/lib/promise/activity.h',
'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/call_push_pull.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_seq.h',
'src/core/lib/promise/detail/promise_factory.h',
'src/core/lib/promise/detail/promise_like.h',
'src/core/lib/promise/detail/status.h',
'src/core/lib/promise/exec_ctx_wakeup_scheduler.h',
'src/core/lib/promise/for_each.h',
'src/core/lib/promise/intra_activity_waiter.h',
'src/core/lib/promise/latch.h',
'src/core/lib/promise/loop.h',
'src/core/lib/promise/map.h',
'src/core/lib/promise/map_pipe.h',
'src/core/lib/promise/pipe.h',
'src/core/lib/promise/poll.h',
'src/core/lib/promise/promise.h',
'src/core/lib/promise/race.h',
'src/core/lib/promise/seq.h',
'src/core/lib/promise/sleep.h',
'src/core/lib/promise/try_concurrently.h',
'src/core/lib/promise/try_seq.h',
'src/core/lib/resolver/resolver.h',
'src/core/lib/resolver/resolver_factory.h',
@ -1719,25 +1717,23 @@ Pod::Spec.new do |s|
'src/core/lib/matchers/matchers.h',
'src/core/lib/promise/activity.h',
'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/call_push_pull.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_seq.h',
'src/core/lib/promise/detail/promise_factory.h',
'src/core/lib/promise/detail/promise_like.h',
'src/core/lib/promise/detail/status.h',
'src/core/lib/promise/exec_ctx_wakeup_scheduler.h',
'src/core/lib/promise/for_each.h',
'src/core/lib/promise/intra_activity_waiter.h',
'src/core/lib/promise/latch.h',
'src/core/lib/promise/loop.h',
'src/core/lib/promise/map.h',
'src/core/lib/promise/map_pipe.h',
'src/core/lib/promise/pipe.h',
'src/core/lib/promise/poll.h',
'src/core/lib/promise/promise.h',
'src/core/lib/promise/race.h',
'src/core/lib/promise/seq.h',
'src/core/lib/promise/sleep.h',
'src/core/lib/promise/try_concurrently.h',
'src/core/lib/promise/try_seq.h',
'src/core/lib/resolver/resolver.h',
'src/core/lib/resolver/resolver_factory.h',

8
gRPC-Core.podspec generated

@ -1370,18 +1370,17 @@ Pod::Spec.new do |s|
'src/core/lib/promise/activity.cc',
'src/core/lib/promise/activity.h',
'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/call_push_pull.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_seq.h',
'src/core/lib/promise/detail/promise_factory.h',
'src/core/lib/promise/detail/promise_like.h',
'src/core/lib/promise/detail/status.h',
'src/core/lib/promise/exec_ctx_wakeup_scheduler.h',
'src/core/lib/promise/for_each.h',
'src/core/lib/promise/intra_activity_waiter.h',
'src/core/lib/promise/latch.h',
'src/core/lib/promise/loop.h',
'src/core/lib/promise/map.h',
'src/core/lib/promise/map_pipe.h',
'src/core/lib/promise/pipe.cc',
'src/core/lib/promise/pipe.h',
'src/core/lib/promise/poll.h',
@ -1390,7 +1389,6 @@ Pod::Spec.new do |s|
'src/core/lib/promise/seq.h',
'src/core/lib/promise/sleep.cc',
'src/core/lib/promise/sleep.h',
'src/core/lib/promise/try_concurrently.h',
'src/core/lib/promise/try_seq.h',
'src/core/lib/resolver/resolver.cc',
'src/core/lib/resolver/resolver.h',
@ -2357,25 +2355,23 @@ Pod::Spec.new do |s|
'src/core/lib/matchers/matchers.h',
'src/core/lib/promise/activity.h',
'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/call_push_pull.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_seq.h',
'src/core/lib/promise/detail/promise_factory.h',
'src/core/lib/promise/detail/promise_like.h',
'src/core/lib/promise/detail/status.h',
'src/core/lib/promise/exec_ctx_wakeup_scheduler.h',
'src/core/lib/promise/for_each.h',
'src/core/lib/promise/intra_activity_waiter.h',
'src/core/lib/promise/latch.h',
'src/core/lib/promise/loop.h',
'src/core/lib/promise/map.h',
'src/core/lib/promise/map_pipe.h',
'src/core/lib/promise/pipe.h',
'src/core/lib/promise/poll.h',
'src/core/lib/promise/promise.h',
'src/core/lib/promise/race.h',
'src/core/lib/promise/seq.h',
'src/core/lib/promise/sleep.h',
'src/core/lib/promise/try_concurrently.h',
'src/core/lib/promise/try_seq.h',
'src/core/lib/resolver/resolver.h',
'src/core/lib/resolver/resolver_factory.h',

4
grpc.gemspec generated

@ -1281,18 +1281,17 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/promise/activity.cc )
s.files += %w( src/core/lib/promise/activity.h )
s.files += %w( src/core/lib/promise/arena_promise.h )
s.files += %w( src/core/lib/promise/call_push_pull.h )
s.files += %w( src/core/lib/promise/context.h )
s.files += %w( src/core/lib/promise/detail/basic_seq.h )
s.files += %w( src/core/lib/promise/detail/promise_factory.h )
s.files += %w( src/core/lib/promise/detail/promise_like.h )
s.files += %w( src/core/lib/promise/detail/status.h )
s.files += %w( src/core/lib/promise/exec_ctx_wakeup_scheduler.h )
s.files += %w( src/core/lib/promise/for_each.h )
s.files += %w( src/core/lib/promise/intra_activity_waiter.h )
s.files += %w( src/core/lib/promise/latch.h )
s.files += %w( src/core/lib/promise/loop.h )
s.files += %w( src/core/lib/promise/map.h )
s.files += %w( src/core/lib/promise/map_pipe.h )
s.files += %w( src/core/lib/promise/pipe.cc )
s.files += %w( src/core/lib/promise/pipe.h )
s.files += %w( src/core/lib/promise/poll.h )
@ -1301,7 +1300,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/promise/seq.h )
s.files += %w( src/core/lib/promise/sleep.cc )
s.files += %w( src/core/lib/promise/sleep.h )
s.files += %w( src/core/lib/promise/try_concurrently.h )
s.files += %w( src/core/lib/promise/try_seq.h )
s.files += %w( src/core/lib/resolver/resolver.cc )
s.files += %w( src/core/lib/resolver/resolver.h )

4
package.xml generated

@ -1263,18 +1263,17 @@
<file baseinstalldir="/" name="src/core/lib/promise/activity.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/activity.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/arena_promise.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/call_push_pull.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/context.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/detail/basic_seq.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/detail/promise_factory.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/detail/promise_like.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/detail/status.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/exec_ctx_wakeup_scheduler.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/for_each.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/intra_activity_waiter.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/latch.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/loop.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/map.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/map_pipe.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/pipe.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/pipe.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/poll.h" role="src" />
@ -1283,7 +1282,6 @@
<file baseinstalldir="/" name="src/core/lib/promise/seq.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/sleep.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/sleep.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/try_concurrently.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/try_seq.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/resolver/resolver.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/resolver/resolver.h" role="src" />

@ -362,41 +362,32 @@ grpc_cc_library(
)
grpc_cc_library(
name = "try_concurrently",
hdrs = ["lib/promise/try_concurrently.h"],
external_deps = [
"absl/status",
"absl/types:variant",
],
name = "map_pipe",
external_deps = ["absl/status"],
language = "c++",
public_hdrs = [
"lib/promise/map_pipe.h",
],
deps = [
"construct_destruct",
"for_each",
"map",
"pipe",
"poll",
"promise_factory",
"promise_like",
"promise_status",
"//:gpr",
"//:gpr_platform",
],
)
grpc_cc_library(
name = "map_pipe",
external_deps = ["absl/status"],
name = "call_push_pull",
hdrs = ["lib/promise/call_push_pull.h"],
external_deps = ["absl/types:variant"],
language = "c++",
public_hdrs = [
"lib/promise/map_pipe.h",
],
deps = [
"for_each",
"map",
"pipe",
"promise_factory",
"bitset",
"construct_destruct",
"poll",
"promise_like",
"promise_status",
"//:gpr_platform",
],
)
@ -452,7 +443,6 @@ grpc_cc_library(
],
deps = [
"arena",
"construct_destruct",
"context",
"poll",
"//:gpr_platform",

@ -39,10 +39,11 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/promise/call_push_pull.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/detail/basic_seq.h"
#include "src/core/lib/promise/latch.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/promise/try_concurrently.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/percent_encoding.h"
#include "src/core/lib/transport/status_conversion.h"
@ -121,17 +122,18 @@ ArenaPromise<ServerMetadataHandle> HttpClientFilter::MakeCallPromise(
auto* write_latch =
std::exchange(call_args.server_initial_metadata, read_latch);
return TryConcurrently(
return CallPushPull(
Seq(next_promise_factory(std::move(call_args)),
[](ServerMetadataHandle md) -> ServerMetadataHandle {
auto r = CheckServerMetadata(md.get());
if (!r.ok()) return ServerMetadataFromStatus(r);
return md;
}))
.NecessaryPull(Seq(read_latch->Wait(),
}),
[]() { return absl::OkStatus(); },
Seq(read_latch->Wait(),
[write_latch](ServerMetadata** md) -> absl::Status {
auto r = *md == nullptr ? absl::OkStatus()
: CheckServerMetadata(*md);
auto r =
*md == nullptr ? absl::OkStatus() : CheckServerMetadata(*md);
write_latch->Set(*md);
return r;
}));

@ -32,11 +32,12 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/promise/call_push_pull.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/detail/basic_seq.h"
#include "src/core/lib/promise/latch.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/promise/try_concurrently.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/percent_encoding.h"
#include "src/core/lib/slice/slice.h"
@ -131,20 +132,21 @@ ArenaPromise<ServerMetadataHandle> HttpServerFilter::MakeCallPromise(
auto* write_latch =
std::exchange(call_args.server_initial_metadata, read_latch);
return TryConcurrently(
Seq(next_promise_factory(std::move(call_args)),
return CallPushPull(Seq(next_promise_factory(std::move(call_args)),
[](ServerMetadataHandle md) -> ServerMetadataHandle {
FilterOutgoingMetadata(md.get());
return md;
}))
.Push(Seq(read_latch->Wait(), [write_latch](ServerMetadata** md) {
}),
Seq(read_latch->Wait(),
[write_latch](ServerMetadata** md) {
FilterOutgoingMetadata(*md);
(*md)->Set(HttpStatusMetadata(), 200);
(*md)->Set(ContentTypeMetadata(),
ContentTypeMetadata::kApplicationGrpc);
write_latch->Set(*md);
return absl::OkStatus();
}));
}),
[]() { return absl::OkStatus(); });
}
absl::StatusOr<HttpServerFilter> HttpServerFilter::Create(

@ -19,12 +19,11 @@
#include <stdlib.h>
#include <memory>
#include <new>
#include <utility>
#include "absl/meta/type_traits.h"
#include "src/core/lib/gprpp/construct_destruct.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/resource_quota/arena.h"
@ -33,28 +32,20 @@ namespace grpc_core {
namespace arena_promise_detail {
using ArgType = std::aligned_storage_t<sizeof(void*)>;
template <typename T>
T*& ArgAsPtr(ArgType* arg) {
static_assert(sizeof(ArgType) >= sizeof(T**),
"Must have ArgType of at least one pointer size");
return *reinterpret_cast<T**>(arg);
}
template <typename T>
struct Vtable {
// Poll the promise, once.
Poll<T> (*poll_once)(ArgType* arg);
Poll<T> (*poll_once)(void** arg);
// Destroy the underlying callable object if there is one.
// Since we don't delete (the arena owns the memory) but we may need to call a
// destructor, we expose this for when the ArenaPromise object is destroyed.
void (*destroy)(ArgType* arg);
void (*destroy)(void** arg);
};
template <typename T>
struct VtableAndArg {
const Vtable<T>* vtable;
ArgType arg;
void* arg;
};
// Implementation of Vtable for an empty object.
@ -62,52 +53,37 @@ struct VtableAndArg {
// from. Since in either case these objects should not be polled, we simply
// crash if it is.
template <typename T>
struct Null {
static const Vtable<T> vtable;
static Poll<T> PollOnce(ArgType*) {
inline const Vtable<T>* null_impl() {
static const Vtable<T> vtable = {[](void**) -> Poll<T> {
abort();
GPR_UNREACHABLE_CODE(return Pending{});
}
static void Destroy(ArgType*) {}
};
template <typename T>
const Vtable<T> Null<T>::vtable = {PollOnce, Destroy};
},
[](void**) {}};
return &vtable;
}
// Implementation of ImplInterface for a callable object.
template <typename T, typename Callable>
struct AllocatedCallable {
static const Vtable<T> vtable;
static Poll<T> PollOnce(ArgType* arg) {
return poll_cast<T>((*ArgAsPtr<Callable>(arg))());
}
static void Destroy(ArgType* arg) { Destruct(ArgAsPtr<Callable>(arg)); }
};
template <typename T, typename Callable>
const Vtable<T> AllocatedCallable<T, Callable>::vtable = {PollOnce, Destroy};
inline const Vtable<T>* allocated_callable_impl() {
static const Vtable<T> vtable = {
[](void** arg) -> Poll<T> {
return poll_cast<T>((*static_cast<Callable*>(*arg))());
},
[](void** arg) { static_cast<Callable*>(*arg)->~Callable(); }};
return &vtable;
}
// Implementation of ImplInterface for a small callable object (one that fits
// within the ArgType arg)
// within the void* arg)
template <typename T, typename Callable>
struct Inlined {
static const Vtable<T> vtable;
static Poll<T> PollOnce(ArgType* arg) {
inline const Vtable<T>* inlined_callable_impl() {
static const Vtable<T> vtable = {
[](void** arg) -> Poll<T> {
return poll_cast<T>((*reinterpret_cast<Callable*>(arg))());
}
static void Destroy(ArgType* arg) {
Destruct(reinterpret_cast<Callable*>(arg));
}
};
template <typename T, typename Callable>
const Vtable<T> Inlined<T, Callable>::vtable = {PollOnce, Destroy};
},
[](void** arg) { reinterpret_cast<Callable*>(arg)->~Callable(); }};
return &vtable;
}
// If a callable object is empty we can substitute any instance of that callable
// for the one we call (for how could we tell the difference)?
@ -117,17 +93,12 @@ const Vtable<T> Inlined<T, Callable>::vtable = {PollOnce, Destroy};
// (this comes up often when the promise only accesses context data from the
// containing activity).
template <typename T, typename Callable>
struct SharedCallable {
static const Vtable<T> vtable;
static Poll<T> PollOnce(ArgType* arg) {
return (*reinterpret_cast<Callable*>(arg))();
}
};
template <typename T, typename Callable>
const Vtable<T> SharedCallable<T, Callable>::vtable = {PollOnce,
Null<T>::Destroy};
inline const Vtable<T>* shared_callable_impl(Callable&& callable) {
static Callable instance = std::forward<Callable>(callable);
static const Vtable<T> vtable = {[](void**) -> Poll<T> { return instance(); },
[](void**) {}};
return &vtable;
}
// Redirector type: given a callable type, expose a Make() function that creates
// the appropriate underlying implementation.
@ -138,11 +109,11 @@ template <typename T, typename Callable>
struct ChooseImplForCallable<
T, Callable,
absl::enable_if_t<!std::is_empty<Callable>::value &&
(sizeof(Callable) > sizeof(ArgType))>> {
(sizeof(Callable) > sizeof(void*))>> {
static void Make(Callable&& callable, VtableAndArg<T>* out) {
out->vtable = &AllocatedCallable<T, Callable>::vtable;
ArgAsPtr<Callable>(&out->arg) = GetContext<Arena>()->template New<Callable>(
std::forward<Callable>(callable));
*out = {allocated_callable_impl<T, Callable>(),
GetContext<Arena>()->template New<Callable>(
std::forward<Callable>(callable))};
}
};
@ -150,19 +121,19 @@ template <typename T, typename Callable>
struct ChooseImplForCallable<
T, Callable,
absl::enable_if_t<!std::is_empty<Callable>::value &&
(sizeof(Callable) <= sizeof(ArgType))>> {
(sizeof(Callable) <= sizeof(void*))>> {
static void Make(Callable&& callable, VtableAndArg<T>* out) {
out->vtable = &Inlined<T, Callable>::vtable;
Construct(reinterpret_cast<Callable*>(&out->arg),
std::forward<Callable>(callable));
out->vtable = inlined_callable_impl<T, Callable>();
new (&out->arg) Callable(std::forward<Callable>(callable));
}
};
template <typename T, typename Callable>
struct ChooseImplForCallable<
T, Callable, absl::enable_if_t<std::is_empty<Callable>::value>> {
static void Make(Callable&&, VtableAndArg<T>* out) {
out->vtable = &SharedCallable<T, Callable>::vtable;
static void Make(Callable&& callable, VtableAndArg<T>* out) {
out->vtable =
shared_callable_impl<T, Callable>(std::forward<Callable>(callable));
}
};
@ -188,8 +159,8 @@ class ArenaPromise {
absl::enable_if_t<!std::is_same<Callable, ArenaPromise>::value>>
// NOLINTNEXTLINE(google-explicit-constructor)
ArenaPromise(Callable&& callable) {
arena_promise_detail::MakeImplForCallable(std::forward<Callable>(callable),
&vtable_and_arg_);
arena_promise_detail::MakeImplForCallable<T>(
std::forward<Callable>(callable), &vtable_and_arg_);
}
// ArenaPromise is not copyable.
@ -198,12 +169,12 @@ class ArenaPromise {
// ArenaPromise is movable.
ArenaPromise(ArenaPromise&& other) noexcept
: vtable_and_arg_(other.vtable_and_arg_) {
other.vtable_and_arg_.vtable = &arena_promise_detail::Null<T>::vtable;
other.vtable_and_arg_.vtable = arena_promise_detail::null_impl<T>();
}
ArenaPromise& operator=(ArenaPromise&& other) noexcept {
vtable_and_arg_.vtable->destroy(&vtable_and_arg_.arg);
vtable_and_arg_ = other.vtable_and_arg_;
other.vtable_and_arg_.vtable = &arena_promise_detail::Null<T>::vtable;
other.vtable_and_arg_.vtable = arena_promise_detail::null_impl<T>();
return *this;
}
@ -216,13 +187,13 @@ class ArenaPromise {
}
bool has_value() const {
return vtable_and_arg_.vtable != &arena_promise_detail::Null<T>::vtable;
return vtable_and_arg_.vtable != arena_promise_detail::null_impl<T>();
}
private:
// Underlying impl object.
arena_promise_detail::VtableAndArg<T> vtable_and_arg_ = {
&arena_promise_detail::Null<T>::vtable, {}};
arena_promise_detail::null_impl<T>(), nullptr};
};
} // namespace grpc_core

@ -0,0 +1,148 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_PROMISE_CALL_PUSH_PULL_H
#define GRPC_CORE_LIB_PROMISE_CALL_PUSH_PULL_H
#include <grpc/support/port_platform.h>
#include <assert.h>
#include <type_traits>
#include "absl/types/variant.h"
#include "src/core/lib/gprpp/bitset.h"
#include "src/core/lib/gprpp/construct_destruct.h"
#include "src/core/lib/promise/detail/promise_like.h"
#include "src/core/lib/promise/detail/status.h"
#include "src/core/lib/promise/poll.h"
namespace grpc_core {
namespace promise_detail {
template <typename FMain, typename FPush, typename FPull>
class CallPushPull {
public:
CallPushPull(FMain f_main, FPush f_push, FPull f_pull)
: push_(std::move(f_push)), pull_(std::move(f_pull)) {
Construct(&main_, std::move(f_main));
}
CallPushPull(const CallPushPull&) = delete;
CallPushPull& operator=(const CallPushPull&) = delete;
CallPushPull(CallPushPull&& other) noexcept
: done_(other.done_),
push_(std::move(other.push_)),
pull_(std::move(other.pull_)) {
assert(!done_.is_set(kDoneMain));
Construct(&main_, std::move(other.main_));
}
CallPushPull& operator=(CallPushPull&& other) noexcept {
assert(!done_.is_set(kDoneMain));
done_ = other.done_;
assert(!done_.is_set(kDoneMain));
push_ = std::move(other.push_);
main_ = std::move(other.main_);
pull_ = std::move(other.pull_);
return *this;
}
~CallPushPull() {
if (done_.is_set(kDoneMain)) {
Destruct(&result_);
} else {
Destruct(&main_);
}
}
using Result =
typename PollTraits<decltype(std::declval<PromiseLike<FMain>>()())>::Type;
Poll<Result> operator()() {
if (!done_.is_set(kDonePush)) {
auto p = push_();
if (auto* status = absl::get_if<kPollReadyIdx>(&p)) {
if (IsStatusOk(*status)) {
done_.set(kDonePush);
} else {
return StatusCast<Result>(std::move(*status));
}
}
}
if (!done_.is_set(kDoneMain)) {
auto p = main_();
if (auto* status = absl::get_if<kPollReadyIdx>(&p)) {
done_.set(kDoneMain);
Destruct(&main_);
Construct(&result_, std::move(*status));
}
}
if (!done_.is_set(kDonePull)) {
auto p = pull_();
if (auto* status = absl::get_if<kPollReadyIdx>(&p)) {
if (IsStatusOk(*status)) {
done_.set(kDonePull);
} else {
return StatusCast<Result>(std::move(*status));
}
}
}
if (done_.all()) return std::move(result_);
return Pending{};
}
private:
enum { kDonePull = 0, kDoneMain = 1, kDonePush = 2 };
BitSet<3> done_;
GPR_NO_UNIQUE_ADDRESS PromiseLike<FPush> push_;
union {
PromiseLike<FMain> main_;
Result result_;
};
GPR_NO_UNIQUE_ADDRESS PromiseLike<FPull> pull_;
};
} // namespace promise_detail
// For promises representing calls a common pattern emerges:
// There's a process pushing data down the stack, a process handling the main
// call part, and a process pulling data back up the stack.
//
// This can reasonably be represented by the right combinations of TryJoins and
// Maps, but since the structure is fundamental to the domain we introduce
// this simple helper to make it easier to write the common case.
//
// It takes three promises: the main call, the push and the pull.
// When polling, the push is polled first, then the main call (descending the
// stack), then the pull (as we ascend once more).
//
// If the push or the pull fail early, then the entire call fails.
// If the main part of the call fails, we wait until both push and pull are also
// done.
//
// This strategy minimizes repolls.
template <typename FMain, typename FPush, typename FPull>
promise_detail::CallPushPull<FMain, FPush, FPull> CallPushPull(FMain f_main,
FPush f_push,
FPull f_pull) {
return promise_detail::CallPushPull<FMain, FPush, FPull>(
std::move(f_main), std::move(f_push), std::move(f_pull));
}
} // namespace grpc_core
#endif // GRPC_CORE_LIB_PROMISE_CALL_PUSH_PULL_H

@ -1,341 +0,0 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_PROMISE_TRY_CONCURRENTLY_H
#define GRPC_CORE_LIB_PROMISE_TRY_CONCURRENTLY_H
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <cstdint>
#include <utility>
#include "absl/types/variant.h"
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/construct_destruct.h"
#include "src/core/lib/promise/detail/promise_like.h"
#include "src/core/lib/promise/detail/status.h"
#include "src/core/lib/promise/poll.h"
namespace grpc_core {
namespace promise_detail {
template <typename Promise>
struct Necessary {
PromiseLike<Promise> promise;
static constexpr bool must_complete() { return true; }
};
template <typename Promise>
struct Helper {
PromiseLike<Promise> promise;
static constexpr bool must_complete() { return false; }
};
// A set of promises that can be polled concurrently.
// Fuses them when completed (that is, destroys the promise and records it
// completed).
// Relies on an external bit field to handle the recording: this saves a bunch
// of space, but means the implementation of this type is weird: it's really
// super tied to TryConcurrently and no attempt should be made to use this
// independently.
template <typename... Ts>
class FusedSet;
template <typename T, typename... Ts>
class FusedSet<T, Ts...> : public FusedSet<Ts...> {
public:
explicit FusedSet(T&& x, Ts&&... xs)
: FusedSet<Ts...>(std::forward<T>(xs)...) {
Construct(&wrapper_, std::forward<T>(x));
}
explicit FusedSet(T&& x, FusedSet<Ts...>&& xs)
: FusedSet<Ts...>(std::forward<FusedSet<Ts...>>(xs)) {
Construct(&wrapper_, std::forward<T>(x));
}
// Empty destructor: consumers must call Destroy() to ensure cleanup occurs
~FusedSet() {}
FusedSet(const FusedSet&) = delete;
FusedSet& operator=(const FusedSet&) = delete;
// Assumes all 'done_bits' for other are 0 and will be set to 1
FusedSet(FusedSet&& other) noexcept : FusedSet<Ts...>(std::move(other)) {
Construct(&wrapper_, std::move(other.wrapper_));
Destruct(&other.wrapper_);
}
static constexpr size_t Size() { return 1 + sizeof...(Ts); }
static constexpr uint8_t NecessaryBits() {
return (T::must_complete() ? 1 : 0) |
(FusedSet<Ts...>::NecessaryBits() << 1);
}
template <int kDoneBit>
void Destroy(uint8_t done_bits) {
if ((done_bits & (1 << kDoneBit)) == 0) {
Destruct(&wrapper_);
}
FusedSet<Ts...>::template Destroy<kDoneBit + 1>(done_bits);
}
template <typename Result, int kDoneBit>
Poll<Result> Run(uint8_t& done_bits) {
if ((done_bits & (1 << kDoneBit)) == 0) {
auto p = wrapper_.promise();
if (auto* status = absl::get_if<kPollReadyIdx>(&p)) {
done_bits |= (1 << kDoneBit);
Destruct(&wrapper_);
if (!IsStatusOk(*status)) {
return StatusCast<Result>(std::move(*status));
}
}
}
return FusedSet<Ts...>::template Run<Result, kDoneBit + 1>(done_bits);
}
template <typename P>
FusedSet<P, T, Ts...> With(P x) {
return FusedSet<P, T, Ts...>(std::move(x), std::move(*this));
}
private:
union {
T wrapper_;
};
};
template <>
class FusedSet<> {
public:
static constexpr size_t Size() { return 0; }
static constexpr uint8_t NecessaryBits() { return 0; }
template <typename Result, int kDoneBit>
Poll<Result> Run(uint8_t) {
return Pending{};
}
template <int kDoneBit>
void Destroy(uint8_t) {}
template <typename P>
FusedSet<P> With(P x) {
return FusedSet<P>(std::move(x));
}
};
template <typename Main, typename PreMain, typename PostMain>
class TryConcurrently {
public:
TryConcurrently(Main main, PreMain pre_main, PostMain post_main)
: done_bits_(0),
pre_main_(std::move(pre_main)),
post_main_(std::move(post_main)) {
Construct(&main_, std::move(main));
}
TryConcurrently(const TryConcurrently&) = delete;
TryConcurrently& operator=(const TryConcurrently&) = delete;
TryConcurrently(TryConcurrently&& other) noexcept
: done_bits_(0),
pre_main_(std::move(other.pre_main_)),
post_main_(std::move(other.post_main_)) {
GPR_DEBUG_ASSERT(other.done_bits_ == 0);
other.done_bits_ = HelperBits();
Construct(&main_, std::move(other.main_));
}
TryConcurrently& operator=(TryConcurrently&& other) noexcept {
GPR_DEBUG_ASSERT(other.done_bits_ == 0);
done_bits_ = 0;
other.done_bits_ = HelperBits();
pre_main_ = std::move(other.pre_main_);
post_main_ = std::move(other.post_main_);
Construct(&main_, std::move(other.main_));
return *this;
}
~TryConcurrently() {
if (done_bits_ & 1) {
Destruct(&result_);
} else {
Destruct(&main_);
}
pre_main_.template Destroy<1>(done_bits_);
post_main_.template Destroy<1 + PreMain::Size()>(done_bits_);
}
using Result =
typename PollTraits<decltype(std::declval<PromiseLike<Main>>()())>::Type;
Poll<Result> operator()() {
auto r = pre_main_.template Run<Result, 1>(done_bits_);
if (auto* status = absl::get_if<Result>(&r)) {
GPR_DEBUG_ASSERT(!IsStatusOk(*status));
return std::move(*status);
}
if ((done_bits_ & 1) == 0) {
auto p = main_();
if (auto* status = absl::get_if<kPollReadyIdx>(&p)) {
done_bits_ |= 1;
Destruct(&main_);
Construct(&result_, std::move(*status));
}
}
r = post_main_.template Run<Result, 1 + PreMain::Size()>(done_bits_);
if (auto* status = absl::get_if<Result>(&r)) {
GPR_DEBUG_ASSERT(!IsStatusOk(*status));
return std::move(*status);
}
if ((done_bits_ & NecessaryBits()) == NecessaryBits()) {
return std::move(result_);
}
return Pending{};
}
template <typename P>
auto NecessaryPush(P p);
template <typename P>
auto NecessaryPull(P p);
template <typename P>
auto Push(P p);
template <typename P>
auto Pull(P p);
private:
// Bitmask for done_bits_ specifying which promises must be completed prior to
// returning ok.
constexpr uint8_t NecessaryBits() {
return 1 | (PreMain::NecessaryBits() << 1) |
(PostMain::NecessaryBits() << (1 + PreMain::Size()));
}
// Bitmask for done_bits_ specifying what all of the promises being complete
// would look like.
constexpr uint8_t AllBits() {
return (1 << (1 + PreMain::Size() + PostMain::Size())) - 1;
}
// Bitmask of done_bits_ specifying which bits correspond to helper promises -
// that is all promises that are not the main one.
constexpr uint8_t HelperBits() { return AllBits() ^ 1; }
// done_bits signifies which operations have completed.
// Bit 0 is set if main_ has completed.
// The next higher bits correspond one per pre-main promise.
// The next higher bits correspond one per post-main promise.
// So, going from most significant bit to least significant:
// +--------------+-------------+--------+
// |post_main bits|pre_main bits|main bit|
// +--------------+-------------+--------+
uint8_t done_bits_;
PreMain pre_main_;
union {
PromiseLike<Main> main_;
Result result_;
};
PostMain post_main_;
};
template <typename Main, typename PreMain, typename PostMain>
auto MakeTryConcurrently(Main&& main, PreMain&& pre_main,
PostMain&& post_main) {
return TryConcurrently<Main, PreMain, PostMain>(
std::forward<Main>(main), std::forward<PreMain>(pre_main),
std::forward<PostMain>(post_main));
}
template <typename Main, typename PreMain, typename PostMain>
template <typename P>
auto TryConcurrently<Main, PreMain, PostMain>::NecessaryPush(P p) {
GPR_DEBUG_ASSERT(done_bits_ == 0);
done_bits_ = HelperBits();
return MakeTryConcurrently(std::move(main_),
pre_main_.With(Necessary<P>{std::move(p)}),
std::move(post_main_));
}
template <typename Main, typename PreMain, typename PostMain>
template <typename P>
auto TryConcurrently<Main, PreMain, PostMain>::NecessaryPull(P p) {
GPR_DEBUG_ASSERT(done_bits_ == 0);
done_bits_ = HelperBits();
return MakeTryConcurrently(std::move(main_), std::move(pre_main_),
post_main_.With(Necessary<P>{std::move(p)}));
}
template <typename Main, typename PreMain, typename PostMain>
template <typename P>
auto TryConcurrently<Main, PreMain, PostMain>::Push(P p) {
GPR_DEBUG_ASSERT(done_bits_ == 0);
done_bits_ = HelperBits();
return MakeTryConcurrently(std::move(main_),
pre_main_.With(Helper<P>{std::move(p)}),
std::move(post_main_));
}
template <typename Main, typename PreMain, typename PostMain>
template <typename P>
auto TryConcurrently<Main, PreMain, PostMain>::Pull(P p) {
GPR_DEBUG_ASSERT(done_bits_ == 0);
done_bits_ = HelperBits();
return MakeTryConcurrently(std::move(main_), std::move(pre_main_),
post_main_.With(Helper<P>{std::move(p)}));
}
} // namespace promise_detail
// TryConcurrently runs a set of promises concurrently.
// There is a structure to the promises:
// - A 'main' promise dominates the others - it must complete before the
// overall promise successfully completes. Its result is chosen in the event
// of successful completion.
// - A set of (optional) push and pull promises to aid main. Push promises are
// polled before main, pull promises are polled after. In this way we can
// avoid overall wakeup churn - sending a message will tend to push things
// down the promise tree as its polled, so that send should be in a push
// promise - then as the main promise is polled and it calls into things
// lower in the stack they'll already see things there (this reasoning holds
// for receiving things and the pull promises too!).
// - Each push and pull promise is either necessary or optional.
// Necessary promises must complete successfully before the overall promise
// completes. Optional promises will just be cancelled once the main promise
// completes and any necessary helpers.
// - If any of the promises fail, the overall promise fails immediately.
// API:
// This function, TryConcurrently, is used to create a TryConcurrently promise.
// It takes a single argument, being the main promise. That promise also has
// a set of methods for attaching push and pull promises. The act of attachment
// returns a new TryConcurrently promise with previous contained promises moved
// out.
// The methods exposed:
// - Push, NecessaryPush: attach a push promise (with the first variant being
// optional, the second necessary).
// - Pull, NecessaryPull: attach a pull promise, with variants as above.
// Example:
// TryConcurrently(call_next_filter(std::move(call_args)))
// .Push(send_messages_promise)
// .Pull(recv_messages_promise)
template <typename Main>
auto TryConcurrently(Main main) {
return promise_detail::MakeTryConcurrently(std::move(main),
promise_detail::FusedSet<>(),
promise_detail::FusedSet<>());
}
} // namespace grpc_core
#endif // GRPC_CORE_LIB_PROMISE_TRY_CONCURRENTLY_H

@ -406,8 +406,8 @@ grpc_cc_test(
)
grpc_cc_test(
name = "try_concurrently_test",
srcs = ["try_concurrently_test.cc"],
name = "call_push_pull_test",
srcs = ["call_push_pull_test.cc"],
external_deps = [
"gtest",
"absl/status",
@ -417,6 +417,6 @@ grpc_cc_test(
uses_event_engine = False,
uses_polling = False,
deps = [
"//src/core:try_concurrently",
"//src/core:call_push_pull",
],
)

@ -14,7 +14,6 @@
#include "src/core/lib/promise/arena_promise.h"
#include <array>
#include <memory>
#include "absl/types/variant.h"
@ -72,23 +71,6 @@ TEST(ArenaPromiseTest, MoveAssignmentWorks) {
p = ArenaPromise<int>();
}
TEST(ArenaPromiseTest, AllocatedUniquePtrWorks) {
ExecCtx exec_ctx;
auto arena = MakeScopedArena(1024, g_memory_allocator);
TestContext<Arena> context(arena.get());
std::array<int, 5> garbage = {0, 1, 2, 3, 4};
auto freer = [garbage](int* p) { free(p + garbage[0]); };
using Ptr = std::unique_ptr<int, decltype(freer)>;
Ptr x(new int(42), freer);
static_assert(sizeof(x) > sizeof(arena_promise_detail::ArgType),
"This test assumes the unique ptr will go down the allocated "
"path for ArenaPromise");
ArenaPromise<Ptr> initial_promise(
[x = std::move(x)]() mutable { return Poll<Ptr>(std::move(x)); });
ArenaPromise<Ptr> p(std::move(initial_promise));
EXPECT_EQ(*absl::get<Ptr>(p()), 42);
}
} // namespace grpc_core
int main(int argc, char** argv) {

@ -0,0 +1,77 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/promise/call_push_pull.h"
#include <utility>
#include "absl/status/status.h"
#include "gtest/gtest.h"
namespace grpc_core {
TEST(CallPushPullTest, Empty) {
auto p = CallPushPull([] { return absl::OkStatus(); },
[] { return absl::OkStatus(); },
[] { return absl::OkStatus(); });
EXPECT_EQ(p(), Poll<absl::Status>(absl::OkStatus()));
}
TEST(CallPushPullTest, Paused) {
auto p = CallPushPull([]() -> Poll<absl::Status> { return Pending{}; },
[]() -> Poll<absl::Status> { return Pending{}; },
[]() -> Poll<absl::Status> { return Pending{}; });
EXPECT_EQ(p(), Poll<absl::Status>(Pending{}));
}
TEST(CallPushPullTest, OneReady) {
auto a = CallPushPull([]() -> Poll<absl::Status> { return absl::OkStatus(); },
[]() -> Poll<absl::Status> { return Pending{}; },
[]() -> Poll<absl::Status> { return Pending{}; });
EXPECT_EQ(a(), Poll<absl::Status>(Pending{}));
auto b = CallPushPull([]() -> Poll<absl::Status> { return Pending{}; },
[]() -> Poll<absl::Status> { return absl::OkStatus(); },
[]() -> Poll<absl::Status> { return Pending{}; });
EXPECT_EQ(b(), Poll<absl::Status>(Pending{}));
auto c =
CallPushPull([]() -> Poll<absl::Status> { return Pending{}; },
[]() -> Poll<absl::Status> { return Pending{}; },
[]() -> Poll<absl::Status> { return absl::OkStatus(); });
EXPECT_EQ(c(), Poll<absl::Status>(Pending{}));
}
TEST(CallPushPullTest, OneFailed) {
auto a = CallPushPull(
[]() -> Poll<absl::Status> { return absl::UnknownError("bah"); },
[]() -> Poll<absl::Status> { return absl::OkStatus(); },
[]() -> Poll<absl::Status> { return absl::OkStatus(); });
EXPECT_EQ(a(), Poll<absl::Status>(absl::UnknownError("bah")));
auto b = CallPushPull(
[]() -> Poll<absl::Status> { return Pending{}; },
[]() -> Poll<absl::Status> { return absl::UnknownError("humbug"); },
[]() -> Poll<absl::Status> { return Pending{}; });
EXPECT_EQ(b(), Poll<absl::Status>(absl::UnknownError("humbug")));
auto c = CallPushPull(
[]() -> Poll<absl::Status> { return Pending{}; },
[]() -> Poll<absl::Status> { return Pending{}; },
[]() -> Poll<absl::Status> { return absl::UnknownError("wha"); });
EXPECT_EQ(c(), Poll<absl::Status>(absl::UnknownError("wha")));
}
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -1,160 +0,0 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/promise/try_concurrently.h"
#include <algorithm>
#include <iosfwd>
#include <memory>
#include <string>
#include <vector>
#include "absl/status/status.h"
#include "gtest/gtest.h"
namespace grpc_core {
class PromiseFactory {
public:
// Create a promise that resolves to Ok but has a memory allocation (to verify
// destruction)
auto OkPromise(std::string tag) {
return [this, tag = std::move(tag),
p = std::make_unique<absl::Status>(absl::OkStatus())]() mutable {
order_.push_back(tag);
return std::move(*p);
};
}
// Create a promise that never resolves and carries a memory allocation
auto NeverPromise(std::string tag) {
return [this, tag = std::move(tag),
p = std::make_unique<Pending>()]() -> Poll<absl::Status> {
order_.push_back(tag);
return *p;
};
}
// Create a promise that fails and carries a memory allocation
auto FailPromise(std::string tag) {
return [this, p = std::make_unique<absl::Status>(absl::UnknownError(tag)),
tag = std::move(tag)]() mutable {
order_.push_back(tag);
return std::move(*p);
};
}
// Finish one round and return a vector of strings representing which promises
// were polled and in which order.
std::vector<std::string> Finish() { return std::exchange(order_, {}); }
private:
std::vector<std::string> order_;
};
std::ostream& operator<<(std::ostream& out, const Poll<absl::Status>& p) {
return out << PollToString(
p, [](const absl::Status& s) { return s.ToString(); });
}
TEST(TryConcurrentlyTest, Immediate) {
PromiseFactory pf;
auto a = TryConcurrently(pf.OkPromise("1"));
EXPECT_EQ(a(), Poll<absl::Status>(absl::OkStatus()));
EXPECT_EQ(pf.Finish(), std::vector<std::string>({"1"}));
auto b = TryConcurrently(pf.OkPromise("1")).NecessaryPush(pf.OkPromise("2"));
EXPECT_EQ(b(), Poll<absl::Status>(absl::OkStatus()));
EXPECT_EQ(pf.Finish(), std::vector<std::string>({"2", "1"}));
auto c = TryConcurrently(pf.OkPromise("1")).NecessaryPull(pf.OkPromise("2"));
EXPECT_EQ(c(), Poll<absl::Status>(absl::OkStatus()));
EXPECT_EQ(pf.Finish(), std::vector<std::string>({"1", "2"}));
auto d = TryConcurrently(pf.OkPromise("1"))
.NecessaryPull(pf.OkPromise("2"))
.NecessaryPush(pf.OkPromise("3"));
EXPECT_EQ(d(), Poll<absl::Status>(absl::OkStatus()));
EXPECT_EQ(pf.Finish(), std::vector<std::string>({"3", "1", "2"}));
auto e = TryConcurrently(pf.OkPromise("1")).Push(pf.NeverPromise("2"));
EXPECT_EQ(e(), Poll<absl::Status>(absl::OkStatus()));
EXPECT_EQ(pf.Finish(), std::vector<std::string>({"2", "1"}));
auto f = TryConcurrently(pf.OkPromise("1")).Pull(pf.NeverPromise("2"));
EXPECT_EQ(f(), Poll<absl::Status>(absl::OkStatus()));
EXPECT_EQ(pf.Finish(), std::vector<std::string>({"1", "2"}));
}
TEST(TryConcurrentlyTest, Paused) {
PromiseFactory pf;
auto a = TryConcurrently(pf.NeverPromise("1"));
EXPECT_EQ(a(), Poll<absl::Status>(Pending{}));
EXPECT_EQ(pf.Finish(), std::vector<std::string>({"1"}));
auto b =
TryConcurrently(pf.OkPromise("1")).NecessaryPush(pf.NeverPromise("2"));
EXPECT_EQ(b(), Poll<absl::Status>(Pending{}));
EXPECT_EQ(pf.Finish(), std::vector<std::string>({"2", "1"}));
auto c =
TryConcurrently(pf.OkPromise("1")).NecessaryPull(pf.NeverPromise("2"));
EXPECT_EQ(c(), Poll<absl::Status>(Pending{}));
EXPECT_EQ(pf.Finish(), std::vector<std::string>({"1", "2"}));
}
TEST(TryConcurrentlyTest, OneFailed) {
PromiseFactory pf;
auto a = TryConcurrently(pf.FailPromise("bah"));
EXPECT_EQ(a(), Poll<absl::Status>(absl::UnknownError("bah")));
EXPECT_EQ(pf.Finish(), std::vector<std::string>({"bah"}));
auto b = TryConcurrently(pf.NeverPromise("1"))
.NecessaryPush(pf.FailPromise("humbug"));
EXPECT_EQ(b(), Poll<absl::Status>(absl::UnknownError("humbug")));
EXPECT_EQ(pf.Finish(), std::vector<std::string>({"humbug"}));
auto c = TryConcurrently(pf.NeverPromise("1"))
.NecessaryPull(pf.FailPromise("wha"));
EXPECT_EQ(c(), Poll<absl::Status>(absl::UnknownError("wha")));
EXPECT_EQ(pf.Finish(), std::vector<std::string>({"1", "wha"}));
}
// A pointer to an int designed to cause a double free if it's double destructed
// (to flush out bugs)
class ProblematicPointer {
public:
ProblematicPointer() : p_(new int(0)) {}
~ProblematicPointer() { delete p_; }
ProblematicPointer(const ProblematicPointer&) = delete;
ProblematicPointer& operator=(const ProblematicPointer&) = delete;
// NOLINTNEXTLINE: we want to allocate during move
ProblematicPointer(ProblematicPointer&& other) : p_(new int(*other.p_ + 1)) {}
ProblematicPointer& operator=(ProblematicPointer&& other) = delete;
private:
int* p_;
};
TEST(TryConcurrentlyTest, MoveItMoveIt) {
auto a =
TryConcurrently([x = ProblematicPointer()]() { return absl::OkStatus(); })
.NecessaryPull(
[x = ProblematicPointer()]() { return absl::OkStatus(); })
.NecessaryPush(
[x = ProblematicPointer()]() { return absl::OkStatus(); })
.Push([x = ProblematicPointer()]() { return absl::OkStatus(); })
.Pull([x = ProblematicPointer()]() { return absl::OkStatus(); });
auto b = std::move(a);
auto c = std::move(b);
c();
}
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -2266,18 +2266,17 @@ src/core/lib/matchers/matchers.h \
src/core/lib/promise/activity.cc \
src/core/lib/promise/activity.h \
src/core/lib/promise/arena_promise.h \
src/core/lib/promise/call_push_pull.h \
src/core/lib/promise/context.h \
src/core/lib/promise/detail/basic_seq.h \
src/core/lib/promise/detail/promise_factory.h \
src/core/lib/promise/detail/promise_like.h \
src/core/lib/promise/detail/status.h \
src/core/lib/promise/exec_ctx_wakeup_scheduler.h \
src/core/lib/promise/for_each.h \
src/core/lib/promise/intra_activity_waiter.h \
src/core/lib/promise/latch.h \
src/core/lib/promise/loop.h \
src/core/lib/promise/map.h \
src/core/lib/promise/map_pipe.h \
src/core/lib/promise/pipe.cc \
src/core/lib/promise/pipe.h \
src/core/lib/promise/poll.h \
@ -2286,7 +2285,6 @@ src/core/lib/promise/race.h \
src/core/lib/promise/seq.h \
src/core/lib/promise/sleep.cc \
src/core/lib/promise/sleep.h \
src/core/lib/promise/try_concurrently.h \
src/core/lib/promise/try_seq.h \
src/core/lib/resolver/resolver.cc \
src/core/lib/resolver/resolver.h \

@ -2057,18 +2057,17 @@ src/core/lib/matchers/matchers.h \
src/core/lib/promise/activity.cc \
src/core/lib/promise/activity.h \
src/core/lib/promise/arena_promise.h \
src/core/lib/promise/call_push_pull.h \
src/core/lib/promise/context.h \
src/core/lib/promise/detail/basic_seq.h \
src/core/lib/promise/detail/promise_factory.h \
src/core/lib/promise/detail/promise_like.h \
src/core/lib/promise/detail/status.h \
src/core/lib/promise/exec_ctx_wakeup_scheduler.h \
src/core/lib/promise/for_each.h \
src/core/lib/promise/intra_activity_waiter.h \
src/core/lib/promise/latch.h \
src/core/lib/promise/loop.h \
src/core/lib/promise/map.h \
src/core/lib/promise/map_pipe.h \
src/core/lib/promise/pipe.cc \
src/core/lib/promise/pipe.h \
src/core/lib/promise/poll.h \
@ -2077,7 +2076,6 @@ src/core/lib/promise/race.h \
src/core/lib/promise/seq.h \
src/core/lib/promise/sleep.cc \
src/core/lib/promise/sleep.h \
src/core/lib/promise/try_concurrently.h \
src/core/lib/promise/try_seq.h \
src/core/lib/resolver/resolver.cc \
src/core/lib/resolver/resolver.h \

@ -1415,6 +1415,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "call_push_pull_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
@ -7701,30 +7725,6 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "try_concurrently_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save