From 3df2a4eceabbbcc5522c70ad3da837f3b46c285a Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Thu, 16 Dec 2021 16:57:07 +0100 Subject: [PATCH] Revert "Promise pipe redux (#28319)" (#28364) This reverts commit b9ea84ac9dcca0e257b42d3203a615c9de220e07. --- BUILD | 1 - CMakeLists.txt | 150 ++++++-- build_autogenerated.yaml | 267 +++++++++----- src/core/lib/promise/activity.h | 22 +- src/core/lib/promise/pipe.h | 572 +++++++++++++++++++++-------- test/core/promise/BUILD | 2 - test/core/promise/for_each_test.cc | 23 +- test/core/promise/pipe_test.cc | 97 +++-- 8 files changed, 786 insertions(+), 348 deletions(-) diff --git a/BUILD b/BUILD index b344d88a86e..164dfcfd468 100644 --- a/BUILD +++ b/BUILD @@ -1372,7 +1372,6 @@ grpc_cc_library( ], deps = [ "activity", - "arena", "gpr_platform", "intra_activity_waiter", ], diff --git a/CMakeLists.txt b/CMakeLists.txt index b89f2988815..166638421df 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10717,23 +10717,53 @@ endif() if(gRPC_BUILD_TESTS) add_executable(for_each_test - src/core/lib/debug/trace.cc - src/core/lib/event_engine/memory_allocator.cc - src/core/lib/iomgr/combiner.cc - src/core/lib/iomgr/error.cc - src/core/lib/iomgr/exec_ctx.cc - src/core/lib/iomgr/executor.cc - src/core/lib/iomgr/iomgr_internal.cc + src/core/ext/upb-generated/google/protobuf/any.upb.c + src/core/ext/upb-generated/google/rpc/status.upb.c + src/core/lib/gpr/alloc.cc + src/core/lib/gpr/atm.cc + src/core/lib/gpr/cpu_iphone.cc + src/core/lib/gpr/cpu_linux.cc + src/core/lib/gpr/cpu_posix.cc + src/core/lib/gpr/cpu_windows.cc + src/core/lib/gpr/env_linux.cc + src/core/lib/gpr/env_posix.cc + src/core/lib/gpr/env_windows.cc + src/core/lib/gpr/log.cc + src/core/lib/gpr/log_android.cc + src/core/lib/gpr/log_linux.cc + src/core/lib/gpr/log_posix.cc + src/core/lib/gpr/log_windows.cc + src/core/lib/gpr/murmur_hash.cc + src/core/lib/gpr/string.cc + src/core/lib/gpr/string_posix.cc + src/core/lib/gpr/string_util_windows.cc + src/core/lib/gpr/string_windows.cc + src/core/lib/gpr/sync.cc + src/core/lib/gpr/sync_abseil.cc + src/core/lib/gpr/sync_posix.cc + src/core/lib/gpr/sync_windows.cc + src/core/lib/gpr/time.cc + src/core/lib/gpr/time_posix.cc + src/core/lib/gpr/time_precise.cc + src/core/lib/gpr/time_windows.cc + src/core/lib/gpr/tmpfile_msys.cc + src/core/lib/gpr/tmpfile_posix.cc + src/core/lib/gpr/tmpfile_windows.cc + src/core/lib/gpr/wrap_memcpy.cc + src/core/lib/gprpp/examine_stack.cc + src/core/lib/gprpp/fork.cc + src/core/lib/gprpp/global_config_env.cc + src/core/lib/gprpp/host_port.cc + src/core/lib/gprpp/mpscq.cc + src/core/lib/gprpp/stat_posix.cc + src/core/lib/gprpp/stat_windows.cc + src/core/lib/gprpp/status_helper.cc + src/core/lib/gprpp/thd_posix.cc + src/core/lib/gprpp/thd_windows.cc + src/core/lib/gprpp/time_util.cc + src/core/lib/profiling/basic_timers.cc + src/core/lib/profiling/stap_timers.cc src/core/lib/promise/activity.cc - src/core/lib/resource_quota/arena.cc - src/core/lib/resource_quota/memory_quota.cc - src/core/lib/resource_quota/resource_quota.cc - src/core/lib/resource_quota/thread_quota.cc - src/core/lib/resource_quota/trace.cc - src/core/lib/slice/slice.cc - src/core/lib/slice/slice_refcount.cc - src/core/lib/slice/slice_string_helpers.cc - src/core/lib/slice/static_slice.cc test/core/promise/for_each_test.cc third_party/googletest/googletest/src/gtest-all.cc third_party/googletest/googlemock/src/gmock-all.cc @@ -10761,10 +10791,21 @@ target_include_directories(for_each_test target_link_libraries(for_each_test ${_gRPC_PROTOBUF_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES} + absl::base + absl::core_headers absl::flat_hash_set + absl::memory + absl::random_random + absl::status absl::statusor + absl::cord + absl::str_format + absl::strings + absl::synchronization + absl::time + absl::optional absl::variant - gpr + upb ) @@ -13353,23 +13394,53 @@ endif() if(gRPC_BUILD_TESTS) add_executable(pipe_test - src/core/lib/debug/trace.cc - src/core/lib/event_engine/memory_allocator.cc - src/core/lib/iomgr/combiner.cc - src/core/lib/iomgr/error.cc - src/core/lib/iomgr/exec_ctx.cc - src/core/lib/iomgr/executor.cc - src/core/lib/iomgr/iomgr_internal.cc + src/core/ext/upb-generated/google/protobuf/any.upb.c + src/core/ext/upb-generated/google/rpc/status.upb.c + src/core/lib/gpr/alloc.cc + src/core/lib/gpr/atm.cc + src/core/lib/gpr/cpu_iphone.cc + src/core/lib/gpr/cpu_linux.cc + src/core/lib/gpr/cpu_posix.cc + src/core/lib/gpr/cpu_windows.cc + src/core/lib/gpr/env_linux.cc + src/core/lib/gpr/env_posix.cc + src/core/lib/gpr/env_windows.cc + src/core/lib/gpr/log.cc + src/core/lib/gpr/log_android.cc + src/core/lib/gpr/log_linux.cc + src/core/lib/gpr/log_posix.cc + src/core/lib/gpr/log_windows.cc + src/core/lib/gpr/murmur_hash.cc + src/core/lib/gpr/string.cc + src/core/lib/gpr/string_posix.cc + src/core/lib/gpr/string_util_windows.cc + src/core/lib/gpr/string_windows.cc + src/core/lib/gpr/sync.cc + src/core/lib/gpr/sync_abseil.cc + src/core/lib/gpr/sync_posix.cc + src/core/lib/gpr/sync_windows.cc + src/core/lib/gpr/time.cc + src/core/lib/gpr/time_posix.cc + src/core/lib/gpr/time_precise.cc + src/core/lib/gpr/time_windows.cc + src/core/lib/gpr/tmpfile_msys.cc + src/core/lib/gpr/tmpfile_posix.cc + src/core/lib/gpr/tmpfile_windows.cc + src/core/lib/gpr/wrap_memcpy.cc + src/core/lib/gprpp/examine_stack.cc + src/core/lib/gprpp/fork.cc + src/core/lib/gprpp/global_config_env.cc + src/core/lib/gprpp/host_port.cc + src/core/lib/gprpp/mpscq.cc + src/core/lib/gprpp/stat_posix.cc + src/core/lib/gprpp/stat_windows.cc + src/core/lib/gprpp/status_helper.cc + src/core/lib/gprpp/thd_posix.cc + src/core/lib/gprpp/thd_windows.cc + src/core/lib/gprpp/time_util.cc + src/core/lib/profiling/basic_timers.cc + src/core/lib/profiling/stap_timers.cc src/core/lib/promise/activity.cc - src/core/lib/resource_quota/arena.cc - src/core/lib/resource_quota/memory_quota.cc - src/core/lib/resource_quota/resource_quota.cc - src/core/lib/resource_quota/thread_quota.cc - src/core/lib/resource_quota/trace.cc - src/core/lib/slice/slice.cc - src/core/lib/slice/slice_refcount.cc - src/core/lib/slice/slice_string_helpers.cc - src/core/lib/slice/static_slice.cc test/core/promise/pipe_test.cc third_party/googletest/googletest/src/gtest-all.cc third_party/googletest/googlemock/src/gmock-all.cc @@ -13397,9 +13468,20 @@ target_include_directories(pipe_test target_link_libraries(pipe_test ${_gRPC_PROTOBUF_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES} + absl::base + absl::core_headers + absl::memory + absl::random_random + absl::status absl::statusor + absl::cord + absl::str_format + absl::strings + absl::synchronization + absl::time + absl::optional absl::variant - gpr + upb ) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 93893fa0d4d..ee35213f99a 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -5853,21 +5853,38 @@ targets: build: test language: c++ headers: - - src/core/lib/debug/trace.h + - src/core/ext/upb-generated/google/protobuf/any.upb.h + - src/core/ext/upb-generated/google/rpc/status.upb.h + - src/core/lib/gpr/alloc.h + - src/core/lib/gpr/env.h + - src/core/lib/gpr/murmur_hash.h + - src/core/lib/gpr/spinlock.h + - src/core/lib/gpr/string.h + - src/core/lib/gpr/string_windows.h + - src/core/lib/gpr/time_precise.h + - src/core/lib/gpr/tls.h + - src/core/lib/gpr/tmpfile.h + - src/core/lib/gpr/useful.h - src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/bitset.h - - src/core/lib/gprpp/cpp_impl_of.h - - src/core/lib/gprpp/dual_ref_counted.h - - src/core/lib/gprpp/orphanable.h - - src/core/lib/gprpp/ref_counted.h - - src/core/lib/gprpp/ref_counted_ptr.h - - src/core/lib/iomgr/closure.h - - src/core/lib/iomgr/combiner.h - - src/core/lib/iomgr/error.h - - src/core/lib/iomgr/error_internal.h - - src/core/lib/iomgr/exec_ctx.h - - src/core/lib/iomgr/executor.h - - src/core/lib/iomgr/iomgr_internal.h + - src/core/lib/gprpp/construct_destruct.h + - src/core/lib/gprpp/debug_location.h + - src/core/lib/gprpp/examine_stack.h + - src/core/lib/gprpp/fork.h + - src/core/lib/gprpp/global_config.h + - src/core/lib/gprpp/global_config_custom.h + - src/core/lib/gprpp/global_config_env.h + - src/core/lib/gprpp/global_config_generic.h + - src/core/lib/gprpp/host_port.h + - src/core/lib/gprpp/manual_constructor.h + - src/core/lib/gprpp/memory.h + - src/core/lib/gprpp/mpscq.h + - src/core/lib/gprpp/stat.h + - src/core/lib/gprpp/status_helper.h + - src/core/lib/gprpp/sync.h + - src/core/lib/gprpp/thd.h + - src/core/lib/gprpp/time_util.h + - src/core/lib/profiling/timers.h - src/core/lib/promise/activity.h - src/core/lib/promise/context.h - src/core/lib/promise/detail/basic_join.h @@ -5876,55 +5893,81 @@ targets: - src/core/lib/promise/detail/promise_like.h - src/core/lib/promise/detail/status.h - src/core/lib/promise/detail/switch.h - - src/core/lib/promise/exec_ctx_wakeup_scheduler.h - src/core/lib/promise/for_each.h - src/core/lib/promise/intra_activity_waiter.h - src/core/lib/promise/join.h - - src/core/lib/promise/loop.h - src/core/lib/promise/map.h - src/core/lib/promise/observable.h - src/core/lib/promise/pipe.h - src/core/lib/promise/poll.h - - src/core/lib/promise/race.h - src/core/lib/promise/seq.h - src/core/lib/promise/wait_set.h - - src/core/lib/resource_quota/arena.h - - src/core/lib/resource_quota/memory_quota.h - - src/core/lib/resource_quota/resource_quota.h - - src/core/lib/resource_quota/thread_quota.h - - src/core/lib/resource_quota/trace.h - - src/core/lib/slice/slice.h - - src/core/lib/slice/slice_internal.h - - src/core/lib/slice/slice_refcount.h - - src/core/lib/slice/slice_refcount_base.h - - src/core/lib/slice/slice_string_helpers.h - - src/core/lib/slice/slice_utils.h - - src/core/lib/slice/static_slice.h - test/core/promise/test_wakeup_schedulers.h src: - - src/core/lib/debug/trace.cc - - src/core/lib/event_engine/memory_allocator.cc - - src/core/lib/iomgr/combiner.cc - - src/core/lib/iomgr/error.cc - - src/core/lib/iomgr/exec_ctx.cc - - src/core/lib/iomgr/executor.cc - - src/core/lib/iomgr/iomgr_internal.cc + - src/core/ext/upb-generated/google/protobuf/any.upb.c + - src/core/ext/upb-generated/google/rpc/status.upb.c + - src/core/lib/gpr/alloc.cc + - src/core/lib/gpr/atm.cc + - src/core/lib/gpr/cpu_iphone.cc + - src/core/lib/gpr/cpu_linux.cc + - src/core/lib/gpr/cpu_posix.cc + - src/core/lib/gpr/cpu_windows.cc + - src/core/lib/gpr/env_linux.cc + - src/core/lib/gpr/env_posix.cc + - src/core/lib/gpr/env_windows.cc + - src/core/lib/gpr/log.cc + - src/core/lib/gpr/log_android.cc + - src/core/lib/gpr/log_linux.cc + - src/core/lib/gpr/log_posix.cc + - src/core/lib/gpr/log_windows.cc + - src/core/lib/gpr/murmur_hash.cc + - src/core/lib/gpr/string.cc + - src/core/lib/gpr/string_posix.cc + - src/core/lib/gpr/string_util_windows.cc + - src/core/lib/gpr/string_windows.cc + - src/core/lib/gpr/sync.cc + - src/core/lib/gpr/sync_abseil.cc + - src/core/lib/gpr/sync_posix.cc + - src/core/lib/gpr/sync_windows.cc + - src/core/lib/gpr/time.cc + - src/core/lib/gpr/time_posix.cc + - src/core/lib/gpr/time_precise.cc + - src/core/lib/gpr/time_windows.cc + - src/core/lib/gpr/tmpfile_msys.cc + - src/core/lib/gpr/tmpfile_posix.cc + - src/core/lib/gpr/tmpfile_windows.cc + - src/core/lib/gpr/wrap_memcpy.cc + - src/core/lib/gprpp/examine_stack.cc + - src/core/lib/gprpp/fork.cc + - src/core/lib/gprpp/global_config_env.cc + - src/core/lib/gprpp/host_port.cc + - src/core/lib/gprpp/mpscq.cc + - src/core/lib/gprpp/stat_posix.cc + - src/core/lib/gprpp/stat_windows.cc + - src/core/lib/gprpp/status_helper.cc + - src/core/lib/gprpp/thd_posix.cc + - src/core/lib/gprpp/thd_windows.cc + - src/core/lib/gprpp/time_util.cc + - src/core/lib/profiling/basic_timers.cc + - src/core/lib/profiling/stap_timers.cc - src/core/lib/promise/activity.cc - - src/core/lib/resource_quota/arena.cc - - src/core/lib/resource_quota/memory_quota.cc - - src/core/lib/resource_quota/resource_quota.cc - - src/core/lib/resource_quota/thread_quota.cc - - src/core/lib/resource_quota/trace.cc - - src/core/lib/slice/slice.cc - - src/core/lib/slice/slice_refcount.cc - - src/core/lib/slice/slice_string_helpers.cc - - src/core/lib/slice/static_slice.cc - test/core/promise/for_each_test.cc deps: + - absl/base:base + - absl/base:core_headers - absl/container:flat_hash_set + - absl/memory:memory + - absl/random:random + - absl/status:status - absl/status:statusor + - absl/strings:cord + - absl/strings:str_format + - absl/strings:strings + - absl/synchronization:synchronization + - absl/time:time + - absl/types:optional - absl/types:variant - - gpr + - upb uses_polling: false - name: generic_end2end_test gtest: true @@ -6952,21 +6995,38 @@ targets: build: test language: c++ headers: - - src/core/lib/debug/trace.h + - src/core/ext/upb-generated/google/protobuf/any.upb.h + - src/core/ext/upb-generated/google/rpc/status.upb.h + - src/core/lib/gpr/alloc.h + - src/core/lib/gpr/env.h + - src/core/lib/gpr/murmur_hash.h + - src/core/lib/gpr/spinlock.h + - src/core/lib/gpr/string.h + - src/core/lib/gpr/string_windows.h + - src/core/lib/gpr/time_precise.h + - src/core/lib/gpr/tls.h + - src/core/lib/gpr/tmpfile.h + - src/core/lib/gpr/useful.h - src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/bitset.h - - src/core/lib/gprpp/cpp_impl_of.h - - src/core/lib/gprpp/dual_ref_counted.h - - src/core/lib/gprpp/orphanable.h - - src/core/lib/gprpp/ref_counted.h - - src/core/lib/gprpp/ref_counted_ptr.h - - src/core/lib/iomgr/closure.h - - src/core/lib/iomgr/combiner.h - - src/core/lib/iomgr/error.h - - src/core/lib/iomgr/error_internal.h - - src/core/lib/iomgr/exec_ctx.h - - src/core/lib/iomgr/executor.h - - src/core/lib/iomgr/iomgr_internal.h + - src/core/lib/gprpp/construct_destruct.h + - src/core/lib/gprpp/debug_location.h + - src/core/lib/gprpp/examine_stack.h + - src/core/lib/gprpp/fork.h + - src/core/lib/gprpp/global_config.h + - src/core/lib/gprpp/global_config_custom.h + - src/core/lib/gprpp/global_config_env.h + - src/core/lib/gprpp/global_config_generic.h + - src/core/lib/gprpp/host_port.h + - src/core/lib/gprpp/manual_constructor.h + - src/core/lib/gprpp/memory.h + - src/core/lib/gprpp/mpscq.h + - src/core/lib/gprpp/stat.h + - src/core/lib/gprpp/status_helper.h + - src/core/lib/gprpp/sync.h + - src/core/lib/gprpp/thd.h + - src/core/lib/gprpp/time_util.h + - src/core/lib/profiling/timers.h - src/core/lib/promise/activity.h - src/core/lib/promise/context.h - src/core/lib/promise/detail/basic_join.h @@ -6975,52 +7035,77 @@ targets: - src/core/lib/promise/detail/promise_like.h - src/core/lib/promise/detail/status.h - src/core/lib/promise/detail/switch.h - - src/core/lib/promise/exec_ctx_wakeup_scheduler.h - src/core/lib/promise/intra_activity_waiter.h - src/core/lib/promise/join.h - - src/core/lib/promise/loop.h - - src/core/lib/promise/map.h - src/core/lib/promise/pipe.h - src/core/lib/promise/poll.h - src/core/lib/promise/promise.h - - src/core/lib/promise/race.h - src/core/lib/promise/seq.h - - src/core/lib/resource_quota/arena.h - - src/core/lib/resource_quota/memory_quota.h - - src/core/lib/resource_quota/resource_quota.h - - src/core/lib/resource_quota/thread_quota.h - - src/core/lib/resource_quota/trace.h - - src/core/lib/slice/slice.h - - src/core/lib/slice/slice_internal.h - - src/core/lib/slice/slice_refcount.h - - src/core/lib/slice/slice_refcount_base.h - - src/core/lib/slice/slice_string_helpers.h - - src/core/lib/slice/slice_utils.h - - src/core/lib/slice/static_slice.h - test/core/promise/test_wakeup_schedulers.h src: - - src/core/lib/debug/trace.cc - - src/core/lib/event_engine/memory_allocator.cc - - src/core/lib/iomgr/combiner.cc - - src/core/lib/iomgr/error.cc - - src/core/lib/iomgr/exec_ctx.cc - - src/core/lib/iomgr/executor.cc - - src/core/lib/iomgr/iomgr_internal.cc + - src/core/ext/upb-generated/google/protobuf/any.upb.c + - src/core/ext/upb-generated/google/rpc/status.upb.c + - src/core/lib/gpr/alloc.cc + - src/core/lib/gpr/atm.cc + - src/core/lib/gpr/cpu_iphone.cc + - src/core/lib/gpr/cpu_linux.cc + - src/core/lib/gpr/cpu_posix.cc + - src/core/lib/gpr/cpu_windows.cc + - src/core/lib/gpr/env_linux.cc + - src/core/lib/gpr/env_posix.cc + - src/core/lib/gpr/env_windows.cc + - src/core/lib/gpr/log.cc + - src/core/lib/gpr/log_android.cc + - src/core/lib/gpr/log_linux.cc + - src/core/lib/gpr/log_posix.cc + - src/core/lib/gpr/log_windows.cc + - src/core/lib/gpr/murmur_hash.cc + - src/core/lib/gpr/string.cc + - src/core/lib/gpr/string_posix.cc + - src/core/lib/gpr/string_util_windows.cc + - src/core/lib/gpr/string_windows.cc + - src/core/lib/gpr/sync.cc + - src/core/lib/gpr/sync_abseil.cc + - src/core/lib/gpr/sync_posix.cc + - src/core/lib/gpr/sync_windows.cc + - src/core/lib/gpr/time.cc + - src/core/lib/gpr/time_posix.cc + - src/core/lib/gpr/time_precise.cc + - src/core/lib/gpr/time_windows.cc + - src/core/lib/gpr/tmpfile_msys.cc + - src/core/lib/gpr/tmpfile_posix.cc + - src/core/lib/gpr/tmpfile_windows.cc + - src/core/lib/gpr/wrap_memcpy.cc + - src/core/lib/gprpp/examine_stack.cc + - src/core/lib/gprpp/fork.cc + - src/core/lib/gprpp/global_config_env.cc + - src/core/lib/gprpp/host_port.cc + - src/core/lib/gprpp/mpscq.cc + - src/core/lib/gprpp/stat_posix.cc + - src/core/lib/gprpp/stat_windows.cc + - src/core/lib/gprpp/status_helper.cc + - src/core/lib/gprpp/thd_posix.cc + - src/core/lib/gprpp/thd_windows.cc + - src/core/lib/gprpp/time_util.cc + - src/core/lib/profiling/basic_timers.cc + - src/core/lib/profiling/stap_timers.cc - src/core/lib/promise/activity.cc - - src/core/lib/resource_quota/arena.cc - - src/core/lib/resource_quota/memory_quota.cc - - src/core/lib/resource_quota/resource_quota.cc - - src/core/lib/resource_quota/thread_quota.cc - - src/core/lib/resource_quota/trace.cc - - src/core/lib/slice/slice.cc - - src/core/lib/slice/slice_refcount.cc - - src/core/lib/slice/slice_string_helpers.cc - - src/core/lib/slice/static_slice.cc - test/core/promise/pipe_test.cc deps: + - absl/base:base + - absl/base:core_headers + - absl/memory:memory + - absl/random:random + - absl/status:status - absl/status:statusor + - absl/strings:cord + - absl/strings:str_format + - absl/strings:strings + - absl/synchronization:synchronization + - absl/time:time + - absl/types:optional - absl/types:variant - - gpr + - upb uses_polling: false - name: poll_test gtest: true diff --git a/src/core/lib/promise/activity.h b/src/core/lib/promise/activity.h index 1499b9921e7..5403aa8d368 100644 --- a/src/core/lib/promise/activity.h +++ b/src/core/lib/promise/activity.h @@ -24,7 +24,6 @@ #include #include #include -#include #include #include "absl/base/thread_annotations.h" @@ -285,17 +284,6 @@ class ContextHolder { Context* value_; }; -template -class ContextHolder> { - public: - explicit ContextHolder(std::unique_ptr value) - : value_(std::move(value)) {} - Context* GetContext() { return value_.get(); } - - private: - std::unique_ptr value_; -}; - template class EnterContexts : public promise_detail::Context... { public: @@ -428,9 +416,8 @@ class PromiseActivity final // to keep the scoping rules a little easier in Step(). absl::optional RunStep() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { ScopedActivity scoped_activity(this); - EnterContexts*>(this)->GetContext())>::type...> - contexts(static_cast*>(this)->GetContext()...); + EnterContexts contexts( + static_cast*>(this)->GetContext()...); return StepLoop(); } @@ -439,9 +426,8 @@ class PromiseActivity final absl::optional Start(Factory promise_factory) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { ScopedActivity scoped_activity(this); - EnterContexts*>(this)->GetContext())>::type...> - contexts(static_cast*>(this)->GetContext()...); + EnterContexts contexts( + static_cast*>(this)->GetContext()...); Construct(&promise_holder_.promise, promise_factory.Once()); return StepLoop(); } diff --git a/src/core/lib/promise/pipe.h b/src/core/lib/promise/pipe.h index c2de5da4161..fb53ff17476 100644 --- a/src/core/lib/promise/pipe.h +++ b/src/core/lib/promise/pipe.h @@ -31,13 +31,10 @@ #include "absl/types/optional.h" #include "absl/types/variant.h" -#include - #include "src/core/lib/promise/activity.h" #include "src/core/lib/promise/detail/promise_factory.h" #include "src/core/lib/promise/intra_activity_waiter.h" #include "src/core/lib/promise/poll.h" -#include "src/core/lib/resource_quota/arena.h" namespace grpc_core { @@ -55,114 +52,39 @@ class Push; template class Next; -// Center sits between a sender and a receiver to provide a one-deep buffer of -// Ts -template -class Center { +template +class Promise { public: - // Initialize with one send ref (held by PipeSender) and one recv ref (held by - // PipeReceiver) - Center() { - send_refs_ = 1; - recv_refs_ = 1; - has_value_ = false; - } - - // Add one ref to the send side of this object, and return this. - Center* RefSend() { - send_refs_++; - return this; - } - - // Add one ref to the recv side of this object, and return this. - Center* RefRecv() { - recv_refs_++; - return this; - } - - // Drop a send side ref - // If no send refs remain, wake due to send closure - // If no refs remain, destroy this object - void UnrefSend() { - GPR_DEBUG_ASSERT(send_refs_ > 0); - send_refs_--; - if (0 == send_refs_) { - on_full_.Wake(); - on_empty_.Wake(); - if (0 == recv_refs_) { - this->~Center(); - } - } - } - - // Drop a recv side ref - // If no recv refs remain, wake due to recv closure - // If no refs remain, destroy this object - void UnrefRecv() { - GPR_DEBUG_ASSERT(recv_refs_ > 0); - recv_refs_--; - if (0 == recv_refs_) { - on_full_.Wake(); - on_empty_.Wake(); - if (0 == send_refs_) { - this->~Center(); - } else if (has_value_) { - ResetValue(); - } - } - } + virtual Poll Step(T* output) = 0; + virtual void Stop() = 0; - // Try to push *value into the pipe. - // Return Pending if there is no space. - // Return true if the value was pushed. - // Return false if the recv end is closed. - Poll Push(T* value) { - GPR_DEBUG_ASSERT(send_refs_ != 0); - if (recv_refs_ == 0) return false; - if (has_value_) return on_empty_.pending(); - has_value_ = true; - value_ = std::move(*value); - on_full_.Wake(); - return true; - } + protected: + inline virtual ~Promise() = default; +}; - // Try to receive a value from the pipe. - // Return Pending if there is no value. - // Return the value if one was retrieved. - // Return nullopt if the send end is closed and no value had been pushed. - Poll> Next() { - GPR_DEBUG_ASSERT(recv_refs_ != 0); - if (!has_value_) { - if (send_refs_ == 0) return absl::nullopt; - return on_full_.pending(); - } - has_value_ = false; - on_empty_.Wake(); - return std::move(value_); - } +struct alignas(alignof(void*)) Scratch { + uint8_t scratch[32]; +}; - private: - void ResetValue() { - // Fancy dance to move out of value in the off chance that we reclaim some - // memory earlier. - [](T) {}(std::move(value_)); - has_value_ = false; - } - T value_; - // Number of sending objects. - // 0 => send is closed. - // 1 ref each for PipeSender and Push. - uint8_t send_refs_ : 2; - // Number of receiving objects. - // 0 => recv is closed. - // 1 ref each for PipeReceiver and Next. - uint8_t recv_refs_ : 2; - // True iff there is a value in the pipe. - bool has_value_ : 1; - IntraActivityWaiter on_empty_; - IntraActivityWaiter on_full_; +template +class FilterInterface { + public: + FilterInterface() = default; + FilterInterface(const FilterInterface&) = delete; + FilterInterface& operator=(const FilterInterface&) = delete; + virtual Promise* Step(T* p, Scratch* scratch_space) = 0; + virtual void UpdateReceiver(PipeReceiver* receiver) = 0; + + protected: + inline virtual ~FilterInterface() {} + static void SetReceiverIndex(PipeReceiver* receiver, int idx, + FilterInterface* p); + char AllocIndex(PipeReceiver* receiver); }; +template +class Filter; + } // namespace pipe_detail // Send end of a Pipe. @@ -172,18 +94,43 @@ class PipeSender { PipeSender(const PipeSender&) = delete; PipeSender& operator=(const PipeSender&) = delete; - PipeSender(PipeSender&& other) noexcept : center_(other.center_) { - other.center_ = nullptr; + PipeSender(PipeSender&& other) noexcept + : receiver_(other.receiver_), push_(other.push_) { + if (receiver_ != nullptr) { + receiver_->sender_ = this; + other.receiver_ = nullptr; + } + if (push_ != nullptr) { + push_->sender_ = this; + other.push_ = nullptr; + } } PipeSender& operator=(PipeSender&& other) noexcept { - if (center_ != nullptr) center_->UnrefSend(); - center_ = other.center_; - other.center_ = nullptr; + if (receiver_ != nullptr) { + receiver_->sender_ = nullptr; + } + if (push_ != nullptr) { + push_->sender_ = nullptr; + } + receiver_ = other.receiver_; + if (receiver_ != nullptr) { + receiver_->sender_ = this; + other.receiver_ = nullptr; + } + if (push_ != nullptr) { + push_->sender_ = this; + other.push_ = nullptr; + } return *this; } ~PipeSender() { - if (center_ != nullptr) center_->UnrefSend(); + if (receiver_ != nullptr) { + receiver_->MarkClosed(); + } + if (push_ != nullptr) { + push_->sender_ = nullptr; + } } // Send a single message along the pipe. @@ -192,10 +139,21 @@ class PipeSender { // receiver is either closed or able to receive another message. pipe_detail::Push Push(T value); + // Attach a promise factory based filter to this pipe. + // The overall promise returned from this will be active until the pipe is + // closed. If this promise is cancelled before the pipe is closed, the pipe + // will close. The filter will be run _after_ any other registered filters. + template + pipe_detail::Filter Filter(F f); + private: friend struct Pipe; - explicit PipeSender(pipe_detail::Center* center) : center_(center) {} - pipe_detail::Center* center_; + friend class PipeReceiver; + friend class pipe_detail::Next; + friend class pipe_detail::Push; + explicit PipeSender(PipeReceiver* receiver) : receiver_(receiver) {} + PipeReceiver* receiver_; + pipe_detail::Push* push_ = nullptr; }; // Receive end of a Pipe. @@ -205,17 +163,56 @@ class PipeReceiver { PipeReceiver(const PipeReceiver&) = delete; PipeReceiver& operator=(const PipeReceiver&) = delete; - PipeReceiver(PipeReceiver&& other) noexcept : center_(other.center_) { - other.center_ = nullptr; + PipeReceiver(PipeReceiver&& other) noexcept + : sender_(other.sender_), + next_(other.next_), + filters_(std::move(other.filters_)), + pending_(std::move(other.pending_)), + waiting_to_send_(std::move(other.waiting_to_send_)), + waiting_to_receive_(other.waiting_to_receive_) { + if (sender_ != nullptr) { + sender_->receiver_ = this; + other.sender_ = nullptr; + } + if (next_ != nullptr) { + next_->receiver_ = this; + other.next_ = nullptr; + } + for (auto filter : filters_) { + filter->UpdateReceiver(this); + } } PipeReceiver& operator=(PipeReceiver&& other) noexcept { - if (center_ != nullptr) center_->UnrefRecv(); - center_ = other.center_; - other.center_ = nullptr; + if (sender_ != nullptr) { + sender_->receiver_ = nullptr; + } + if (next_ != nullptr) { + next_->receiver_ = nullptr; + } + sender_ = other.sender_; + next_ = other.next_; + filters_ = std::move(other.filters_); + for (auto filter : filters_) { + filter->UpdateReceiver(this); + } + pending_ = std::move(other.pending_); + waiting_to_send_ = std::move(other.waiting_to_send_); + waiting_to_receive_ = std::move(other.waiting_to_receive_); + if (sender_ != nullptr) { + sender_->receiver_ = this; + other.sender_ = nullptr; + } + if (next_ != nullptr) { + next_->receiver_ = this; + other.next_ = nullptr; + } return *this; } ~PipeReceiver() { - if (center_ != nullptr) center_->UnrefRecv(); + MarkClosed(); + if (next_ != nullptr) { + next_->receiver_ = nullptr; + } } // Receive a single message from the pipe. @@ -225,10 +222,44 @@ class PipeReceiver { // available. pipe_detail::Next Next(); + // Attach a promise factory based filter to this pipe. + // The overall promise returned from this will be active until the pipe is + // closed. If this promise is cancelled before the pipe is closed, the pipe + // will close. The filter will be run _after_ any other registered filters. + template + pipe_detail::Filter Filter(F f); + private: friend struct Pipe; - explicit PipeReceiver(pipe_detail::Center* center) : center_(center) {} - pipe_detail::Center* center_; + friend class PipeSender; + friend class pipe_detail::Next; + friend class pipe_detail::Push; + friend class pipe_detail::FilterInterface; + explicit PipeReceiver(PipeSender* sender) : sender_(sender) {} + PipeSender* sender_; + pipe_detail::Next* next_ = nullptr; + absl::InlinedVector*, 12> filters_; + absl::optional pending_; + IntraActivityWaiter waiting_to_send_; + IntraActivityWaiter waiting_to_receive_; + + void MarkClosed() { + if (sender_ == nullptr) { + return; + } + + sender_->receiver_ = nullptr; + + waiting_to_receive_.Wake(); + waiting_to_send_.Wake(); + sender_ = nullptr; + + for (auto* filter : filters_) { + if (filter != nullptr) { + filter->UpdateReceiver(nullptr); + } + } + } }; namespace pipe_detail { @@ -240,28 +271,55 @@ class Push { Push(const Push&) = delete; Push& operator=(const Push&) = delete; Push(Push&& other) noexcept - : center_(other.center_), push_(std::move(other.push_)) { - other.center_ = nullptr; + : sender_(other.sender_), push_(std::move(other.push_)) { + if (sender_ != nullptr) { + sender_->push_ = this; + other.sender_ = nullptr; + } } Push& operator=(Push&& other) noexcept { - if (center_ != nullptr) center_->UnrefSend(); - center_ = other.center_; - other.center_ = nullptr; + if (sender_ != nullptr) { + sender_->push_ = nullptr; + } + sender_ = other.sender_; push_ = std::move(other.push_); + if (sender_ != nullptr) { + sender_->push_ = this; + other.sender_ = nullptr; + } return *this; } ~Push() { - if (center_ != nullptr) center_->UnrefSend(); + if (sender_ != nullptr) { + assert(sender_->push_ == this); + sender_->push_ = nullptr; + } } - Poll operator()() { return center_->Push(&push_); } + Poll operator()() { + auto* receiver = sender_->receiver_; + if (receiver == nullptr) { + return false; + } + if (receiver->pending_.has_value()) { + return receiver->waiting_to_send_.pending(); + } + receiver->pending_ = std::move(push_); + receiver->waiting_to_receive_.Wake(); + sender_->push_ = nullptr; + sender_ = nullptr; + return true; + } private: friend class PipeSender; - explicit Push(pipe_detail::Center* center, T push) - : center_(center), push_(std::move(push)) {} - Center* center_; + Push(PipeSender* sender, T push) + : sender_(sender), push_(std::move(push)) { + assert(sender_->push_ == nullptr); + sender_->push_ = this; + } + PipeSender* sender_; T push_; }; @@ -271,56 +329,262 @@ class Next { public: Next(const Next&) = delete; Next& operator=(const Next&) = delete; - Next(Next&& other) noexcept : center_(other.center_) { - other.center_ = nullptr; + Next(Next&& other) noexcept + : receiver_(other.receiver_), + next_filter_(other.next_filter_), + current_promise_(nullptr) { + assert(other.current_promise_ == nullptr); + if (receiver_ != nullptr) { + receiver_->next_ = this; + other.receiver_ = nullptr; + } } Next& operator=(Next&& other) noexcept { - if (center_ != nullptr) center_->UnrefRecv(); - center_ = other.center_; - other.center_ = nullptr; + assert(current_promise_ == nullptr); + assert(other.current_promise_ == nullptr); + if (receiver_ != nullptr) { + receiver_->next_ = nullptr; + } + receiver_ = other.receiver_; + next_filter_ = other.next_filter_; + if (receiver_ != nullptr) { + receiver_->next_ = this; + other.receiver_ = nullptr; + } return *this; } ~Next() { - if (center_ != nullptr) center_->UnrefRecv(); + if (receiver_ != nullptr) { + assert(receiver_->next_ == this); + receiver_->next_ = nullptr; + } + if (current_promise_ != nullptr) { + current_promise_->Stop(); + } } - Poll> operator()() { return center_->Next(); } + Poll> operator()() { + if (receiver_->pending_.has_value()) { + auto* pending = &*receiver_->pending_; + if (current_promise_ != nullptr) { + auto r = current_promise_->Step(pending); + if (auto* p = absl::get_if(&r)) { + current_promise_->Stop(); + current_promise_ = nullptr; + if (!*p) { + receiver_->MarkClosed(); + return absl::optional(); + } + } else { + return Pending(); + } + } + while (true) { + if (next_filter_ >= receiver_->filters_.size()) { + auto result = absl::optional(std::move(*pending)); + receiver_->pending_.reset(); + receiver_->waiting_to_send_.Wake(); + receiver_->next_ = nullptr; + receiver_ = nullptr; + return result; + } + auto* filter = receiver_->filters_[next_filter_]; + current_promise_ = filter ? filter->Step(pending, &scratch_) : nullptr; + next_filter_++; + if (current_promise_ == + reinterpret_cast*>(uintptr_t(false))) { + current_promise_ = nullptr; + receiver_->MarkClosed(); + return absl::optional(); + } else if (current_promise_ == + reinterpret_cast*>(uintptr_t(true))) { + current_promise_ = nullptr; + } else { + return Pending(); + } + } + } + if (receiver_->sender_ == nullptr) { + return absl::optional(); + } + return receiver_->waiting_to_receive_.pending(); + } private: friend class PipeReceiver; - explicit Next(pipe_detail::Center* center) : center_(center) {} - Center* center_; + explicit Next(PipeReceiver* receiver) : receiver_(receiver) { + assert(receiver_->next_ == nullptr); + receiver_->next_ = this; + } + PipeReceiver* receiver_; + size_t next_filter_ = 0; + Promise* current_promise_ = nullptr; + Scratch scratch_; +}; + +template +class Filter final : private FilterInterface { + public: + Filter(PipeReceiver* receiver, F f) + : active_{receiver, promise_detail::PromiseFactory(std::move(f))}, + index_(this->AllocIndex(receiver)){}; + explicit Filter(absl::Status already_finished) + : done_(std::move(already_finished)) {} + ~Filter() { + if (index_ != kTombstoneIndex) { + this->SetReceiverIndex(active_.receiver, index_, nullptr); + active_.~Active(); + } else { + done_.~Status(); + } + } + Filter(Filter&& other) noexcept : index_(other.index_) { + if (index_ != kTombstoneIndex) { + new (&active_) Active(std::move(other.active_)); + other.active_.~Active(); + new (&other.done_) absl::Status(absl::OkStatus()); + other.index_ = kTombstoneIndex; + this->SetReceiverIndex(active_.receiver, index_, this); + } else { + new (&done_) absl::Status(std::move(other.done_)); + } + } + + Filter(const Filter&) = delete; + Filter& operator=(const Filter&) = delete; + + Poll operator()() { + if (index_ == kTombstoneIndex) { + return std::move(done_); + } + return Pending(); + } + + private: + static constexpr char kTombstoneIndex = -1; + struct Active { + GPR_NO_UNIQUE_ADDRESS PipeReceiver* receiver; + GPR_NO_UNIQUE_ADDRESS promise_detail::PromiseFactory factory; + }; + union { + GPR_NO_UNIQUE_ADDRESS Active active_; + GPR_NO_UNIQUE_ADDRESS absl::Status done_; + }; + GPR_NO_UNIQUE_ADDRESS char index_; + + class PromiseImpl final : public ::grpc_core::pipe_detail::Promise { + using PF = typename promise_detail::PromiseFactory::Promise; + + public: + PromiseImpl(PF f, Filter* filter) : f_(std::move(f)), filter_(filter) {} + + Poll Step(T* output) final { + auto r = f_(); + if (auto* p = absl::get_if(&r)) { + if (p->ok()) { + *output = std::move(**p); + return true; + } else { + filter_->SetReceiverIndex(filter_->active_.receiver, filter_->index_, + nullptr); + filter_->active_.~Active(); + filter_->index_ = kTombstoneIndex; + new (&filter_->done_) absl::Status(std::move(p->status())); + Activity::WakeupCurrent(); + return false; + } + } else { + return Pending(); + } + } + + void Stop() final { this->~PromiseImpl(); } + + private: + PF f_; + Filter* filter_; + }; + + Promise* Step(T* p, Scratch* scratch) final { + if (index_ != kTombstoneIndex) { + PromiseImpl promise(active_.factory.Repeated(std::move(*p)), this); + auto r = promise.Step(p); + if (auto* result = absl::get_if(&r)) { + return reinterpret_cast*>(uintptr_t(*result)); + } + static_assert(sizeof(promise) <= sizeof(Scratch), + "scratch size too small"); + static_assert(alignof(decltype(promise)) <= alignof(Scratch), + "bad alignment"); + return new (scratch) decltype(promise)(std::move(promise)); + } else { + return nullptr; + } + } + + void UpdateReceiver(PipeReceiver* receiver) final { + if (index_ != kTombstoneIndex) { + if (receiver == nullptr) { + active_.~Active(); + index_ = kTombstoneIndex; + new (&done_) absl::Status(absl::OkStatus()); + } else { + active_.receiver = receiver; + } + Activity::WakeupCurrent(); + } + } }; +template +void FilterInterface::SetReceiverIndex(PipeReceiver* receiver, int idx, + FilterInterface* p) { + receiver->filters_[idx] = p; +} + +template +char FilterInterface::AllocIndex(PipeReceiver* receiver) { + auto r = receiver->filters_.size(); + receiver->filters_.push_back(this); + return r; +} + } // namespace pipe_detail template pipe_detail::Push PipeSender::Push(T value) { - return pipe_detail::Push(center_->RefSend(), std::move(value)); + return pipe_detail::Push(this, std::move(value)); } template pipe_detail::Next PipeReceiver::Next() { - return pipe_detail::Next(center_->RefRecv()); + return pipe_detail::Next(this); +} + +template +template +pipe_detail::Filter PipeSender::Filter(F f) { + if (receiver_) { + return pipe_detail::Filter(receiver_, std::move(f)); + } else { + return pipe_detail::Filter(absl::OkStatus()); + } +} + +template +template +pipe_detail::Filter PipeReceiver::Filter(F f) { + return pipe_detail::Filter(this, std::move(f)); } // A Pipe is an intra-Activity communications channel that transmits T's from // one end to the other. // It is only safe to use a Pipe within the context of a single Activity. // No synchronization is performed internally. -// The primary Pipe data structure is allocated from an arena, so the activity -// must have an arena as part of its context. -// By performing that allocation we can ensure stable pointer to shared data -// allowing PipeSender/PipeReceiver/Push/Next to be relatively simple in their -// implementation. -// This type has been optimized with the expectation that there are relatively -// few pipes per activity. If this assumption does not hold then a design -// allowing inline filtering of pipe contents (instead of connecting pipes with -// polling code) would likely be more appropriate. template struct Pipe { - Pipe() : Pipe(GetContext()->New>()) {} + Pipe() : sender(&receiver), receiver(&sender) {} Pipe(const Pipe&) = delete; Pipe& operator=(const Pipe&) = delete; Pipe(Pipe&&) noexcept = default; @@ -328,10 +592,6 @@ struct Pipe { PipeSender sender; PipeReceiver receiver; - - private: - explicit Pipe(pipe_detail::Center* center) - : sender(center), receiver(center) {} }; } // namespace grpc_core diff --git a/test/core/promise/BUILD b/test/core/promise/BUILD index 9f363859907..41061142e3e 100644 --- a/test/core/promise/BUILD +++ b/test/core/promise/BUILD @@ -260,7 +260,6 @@ grpc_cc_test( "//:map", "//:observable", "//:pipe", - "//:resource_quota", "//:seq", "//test/core/util:grpc_suppressions", ], @@ -277,7 +276,6 @@ grpc_cc_test( "//:join", "//:pipe", "//:promise", - "//:resource_quota", "//:seq", "//test/core/util:grpc_suppressions", ], diff --git a/test/core/promise/for_each_test.cc b/test/core/promise/for_each_test.cc index ebf1477f3fd..d668c3d4e26 100644 --- a/test/core/promise/for_each_test.cc +++ b/test/core/promise/for_each_test.cc @@ -22,7 +22,6 @@ #include "src/core/lib/promise/observable.h" #include "src/core/lib/promise/pipe.h" #include "src/core/lib/promise/seq.h" -#include "src/core/lib/resource_quota/resource_quota.h" #include "test/core/promise/test_wakeup_schedulers.h" using testing::Mock; @@ -31,25 +30,22 @@ using testing::StrictMock; namespace grpc_core { -static auto* g_memory_allocator = new MemoryAllocator( - ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator("test")); - TEST(ForEachTest, SendThriceWithPipe) { + Pipe pipe; int num_received = 0; StrictMock> on_done; EXPECT_CALL(on_done, Call(absl::OkStatus())); MakeActivity( - [&num_received] { - Pipe pipe; - auto sender = std::make_shared>>( - absl::make_unique>(std::move(pipe.sender))); + [&pipe, &num_received] { return Map( Join( // Push 3 things into a pipe -- 1, 2, then 3 -- then close. - Seq((*sender)->Push(1), [sender] { return (*sender)->Push(2); }, - [sender] { return (*sender)->Push(3); }, - [sender] { - sender->reset(); + Seq( + pipe.sender.Push(1), + [&pipe] { return pipe.sender.Push(2); }, + [&pipe] { return pipe.sender.Push(3); }, + [&pipe] { + auto drop = std::move(pipe.sender); return absl::OkStatus(); }), // Use a ForEach loop to read them out and verify all values are @@ -63,8 +59,7 @@ TEST(ForEachTest, SendThriceWithPipe) { JustElem<1>()); }, NoWakeupScheduler(), - [&on_done](absl::Status status) { on_done.Call(std::move(status)); }, - MakeScopedArena(1024, g_memory_allocator)); + [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); Mock::VerifyAndClearExpectations(&on_done); EXPECT_EQ(num_received, 3); } diff --git a/test/core/promise/pipe_test.cc b/test/core/promise/pipe_test.cc index a2025809e66..ba38fbef666 100644 --- a/test/core/promise/pipe_test.cc +++ b/test/core/promise/pipe_test.cc @@ -22,7 +22,6 @@ #include "src/core/lib/promise/join.h" #include "src/core/lib/promise/promise.h" #include "src/core/lib/promise/seq.h" -#include "src/core/lib/resource_quota/resource_quota.h" #include "test/core/promise/test_wakeup_schedulers.h" using testing::MockFunction; @@ -30,15 +29,12 @@ using testing::StrictMock; namespace grpc_core { -static auto* g_memory_allocator = new MemoryAllocator( - ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator("test")); - TEST(PipeTest, CanSendAndReceive) { + Pipe pipe; StrictMock> on_done; EXPECT_CALL(on_done, Call(absl::OkStatus())); MakeActivity( - [] { - Pipe pipe; + [&pipe] { return Seq( // Concurrently: send 42 into the pipe, and receive from the pipe. Join(pipe.sender.Push(42), pipe.receiver.Next()), @@ -50,16 +46,15 @@ TEST(PipeTest, CanSendAndReceive) { }); }, NoWakeupScheduler(), - [&on_done](absl::Status status) { on_done.Call(std::move(status)); }, - MakeScopedArena(1024, g_memory_allocator)); + [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); } TEST(PipeTest, CanReceiveAndSend) { + Pipe pipe; StrictMock> on_done; EXPECT_CALL(on_done, Call(absl::OkStatus())); MakeActivity( - [] { - Pipe pipe; + [&pipe] { return Seq( // Concurrently: receive from the pipe, and send 42 into the pipe. Join(pipe.receiver.Next(), pipe.sender.Push(42)), @@ -71,29 +66,28 @@ TEST(PipeTest, CanReceiveAndSend) { }); }, NoWakeupScheduler(), - [&on_done](absl::Status status) { on_done.Call(std::move(status)); }, - MakeScopedArena(1024, g_memory_allocator)); + [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); } TEST(PipeTest, CanSeeClosedOnSend) { + Pipe pipe; StrictMock> on_done; + auto sender = std::move(pipe.sender); + auto receiver = + absl::make_unique>(std::move(pipe.receiver)); EXPECT_CALL(on_done, Call(absl::OkStatus())); + // Push 42 onto the pipe - this will the pipe's one-deep send buffer. + EXPECT_TRUE(NowOrNever(sender.Push(42)).has_value()); MakeActivity( - [] { - Pipe pipe; - auto sender = std::move(pipe.sender); - // Push 42 onto the pipe - this will the pipe's one-deep send buffer. - EXPECT_TRUE(NowOrNever(sender.Push(42)).has_value()); - auto receiver = std::make_shared>>( - absl::make_unique>(std::move(pipe.receiver))); + [&sender, &receiver] { return Seq( // Concurrently: // - push 43 into the sender, which will stall because the buffer is // full // - and close the receiver, which will fail the pending send. Join(sender.Push(43), - [receiver] { - receiver->reset(); + [&receiver] { + receiver.reset(); return absl::OkStatus(); }), // Verify both that the send failed and that we executed the close. @@ -103,19 +97,17 @@ TEST(PipeTest, CanSeeClosedOnSend) { }); }, NoWakeupScheduler(), - [&on_done](absl::Status status) { on_done.Call(std::move(status)); }, - MakeScopedArena(1024, g_memory_allocator)); + [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); } TEST(PipeTest, CanSeeClosedOnReceive) { + Pipe pipe; StrictMock> on_done; + auto sender = absl::make_unique>(std::move(pipe.sender)); + auto receiver = std::move(pipe.receiver); EXPECT_CALL(on_done, Call(absl::OkStatus())); MakeActivity( - [] { - Pipe pipe; - auto sender = std::make_shared>>( - absl::make_unique>(std::move(pipe.sender))); - auto receiver = std::move(pipe.receiver); + [&sender, &receiver] { return Seq( // Concurrently: // - wait for a received value (will stall forever since we push @@ -123,8 +115,8 @@ TEST(PipeTest, CanSeeClosedOnReceive) { // - close the sender, which will signal the receiver to return an // end-of-stream. Join(receiver.Next(), - [sender] { - sender->reset(); + [&sender] { + sender.reset(); return absl::OkStatus(); }), // Verify we received end-of-stream and closed the sender. @@ -135,8 +127,49 @@ TEST(PipeTest, CanSeeClosedOnReceive) { }); }, NoWakeupScheduler(), - [&on_done](absl::Status status) { on_done.Call(std::move(status)); }, - MakeScopedArena(1024, g_memory_allocator)); + [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); +} + +TEST(PipeTest, CanFilter) { + Pipe pipe; + StrictMock> on_done; + EXPECT_CALL(on_done, Call(absl::OkStatus())); + MakeActivity( + [&pipe] { + // Setup some filters here, carefully getting ordering correct by doing + // so outside of the Join() since C++ does not define execution order + // between arguments. + // TODO(ctiller): A future change to Pipe will specify an ordering + // between filters added to sender and receiver, at which point these + // should move back. + auto doubler = pipe.receiver.Filter( + [](int p) { return absl::StatusOr(p * 2); }); + auto adder = pipe.sender.Filter( + [](int p) { return absl::StatusOr(p + 1); }); + return Seq( + // Concurrently: + // - push 42 into the pipe + // - wait for a value to be received, and filter it by doubling it + // - wait for a value to be received, and filter it by adding one to + // it + // - wait for a value to be received and close the pipe. + Join(pipe.sender.Push(42), std::move(doubler), std::move(adder), + Seq(pipe.receiver.Next(), + [&pipe](absl::optional i) { + auto x = std::move(pipe.receiver); + return i; + })), + // Verify all of the above happened correctly. + [](std::tuple> + result) { + EXPECT_EQ(result, std::make_tuple(true, absl::OkStatus(), + absl::OkStatus(), + absl::optional(85))); + return absl::OkStatus(); + }); + }, + NoWakeupScheduler(), + [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); } } // namespace grpc_core