Revert "Promise pipe redux (#28319)" (#28364)

This reverts commit b9ea84ac9d.
pull/28365/head
Jan Tattermusch 3 years ago committed by GitHub
parent 56f85ba308
commit 3df2a4ecea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 150
      CMakeLists.txt
  3. 267
      build_autogenerated.yaml
  4. 22
      src/core/lib/promise/activity.h
  5. 572
      src/core/lib/promise/pipe.h
  6. 2
      test/core/promise/BUILD
  7. 23
      test/core/promise/for_each_test.cc
  8. 97
      test/core/promise/pipe_test.cc

@ -1372,7 +1372,6 @@ grpc_cc_library(
],
deps = [
"activity",
"arena",
"gpr_platform",
"intra_activity_waiter",
],

150
CMakeLists.txt generated

@ -10717,23 +10717,53 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(for_each_test
src/core/lib/debug/trace.cc
src/core/lib/event_engine/memory_allocator.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/error.cc
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/iomgr_internal.cc
src/core/ext/upb-generated/google/protobuf/any.upb.c
src/core/ext/upb-generated/google/rpc/status.upb.c
src/core/lib/gpr/alloc.cc
src/core/lib/gpr/atm.cc
src/core/lib/gpr/cpu_iphone.cc
src/core/lib/gpr/cpu_linux.cc
src/core/lib/gpr/cpu_posix.cc
src/core/lib/gpr/cpu_windows.cc
src/core/lib/gpr/env_linux.cc
src/core/lib/gpr/env_posix.cc
src/core/lib/gpr/env_windows.cc
src/core/lib/gpr/log.cc
src/core/lib/gpr/log_android.cc
src/core/lib/gpr/log_linux.cc
src/core/lib/gpr/log_posix.cc
src/core/lib/gpr/log_windows.cc
src/core/lib/gpr/murmur_hash.cc
src/core/lib/gpr/string.cc
src/core/lib/gpr/string_posix.cc
src/core/lib/gpr/string_util_windows.cc
src/core/lib/gpr/string_windows.cc
src/core/lib/gpr/sync.cc
src/core/lib/gpr/sync_abseil.cc
src/core/lib/gpr/sync_posix.cc
src/core/lib/gpr/sync_windows.cc
src/core/lib/gpr/time.cc
src/core/lib/gpr/time_posix.cc
src/core/lib/gpr/time_precise.cc
src/core/lib/gpr/time_windows.cc
src/core/lib/gpr/tmpfile_msys.cc
src/core/lib/gpr/tmpfile_posix.cc
src/core/lib/gpr/tmpfile_windows.cc
src/core/lib/gpr/wrap_memcpy.cc
src/core/lib/gprpp/examine_stack.cc
src/core/lib/gprpp/fork.cc
src/core/lib/gprpp/global_config_env.cc
src/core/lib/gprpp/host_port.cc
src/core/lib/gprpp/mpscq.cc
src/core/lib/gprpp/stat_posix.cc
src/core/lib/gprpp/stat_windows.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/thd_posix.cc
src/core/lib/gprpp/thd_windows.cc
src/core/lib/gprpp/time_util.cc
src/core/lib/profiling/basic_timers.cc
src/core/lib/profiling/stap_timers.cc
src/core/lib/promise/activity.cc
src/core/lib/resource_quota/arena.cc
src/core/lib/resource_quota/memory_quota.cc
src/core/lib/resource_quota/resource_quota.cc
src/core/lib/resource_quota/thread_quota.cc
src/core/lib/resource_quota/trace.cc
src/core/lib/slice/slice.cc
src/core/lib/slice/slice_refcount.cc
src/core/lib/slice/slice_string_helpers.cc
src/core/lib/slice/static_slice.cc
test/core/promise/for_each_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
@ -10761,10 +10791,21 @@ target_include_directories(for_each_test
target_link_libraries(for_each_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::base
absl::core_headers
absl::flat_hash_set
absl::memory
absl::random_random
absl::status
absl::statusor
absl::cord
absl::str_format
absl::strings
absl::synchronization
absl::time
absl::optional
absl::variant
gpr
upb
)
@ -13353,23 +13394,53 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(pipe_test
src/core/lib/debug/trace.cc
src/core/lib/event_engine/memory_allocator.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/error.cc
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/iomgr_internal.cc
src/core/ext/upb-generated/google/protobuf/any.upb.c
src/core/ext/upb-generated/google/rpc/status.upb.c
src/core/lib/gpr/alloc.cc
src/core/lib/gpr/atm.cc
src/core/lib/gpr/cpu_iphone.cc
src/core/lib/gpr/cpu_linux.cc
src/core/lib/gpr/cpu_posix.cc
src/core/lib/gpr/cpu_windows.cc
src/core/lib/gpr/env_linux.cc
src/core/lib/gpr/env_posix.cc
src/core/lib/gpr/env_windows.cc
src/core/lib/gpr/log.cc
src/core/lib/gpr/log_android.cc
src/core/lib/gpr/log_linux.cc
src/core/lib/gpr/log_posix.cc
src/core/lib/gpr/log_windows.cc
src/core/lib/gpr/murmur_hash.cc
src/core/lib/gpr/string.cc
src/core/lib/gpr/string_posix.cc
src/core/lib/gpr/string_util_windows.cc
src/core/lib/gpr/string_windows.cc
src/core/lib/gpr/sync.cc
src/core/lib/gpr/sync_abseil.cc
src/core/lib/gpr/sync_posix.cc
src/core/lib/gpr/sync_windows.cc
src/core/lib/gpr/time.cc
src/core/lib/gpr/time_posix.cc
src/core/lib/gpr/time_precise.cc
src/core/lib/gpr/time_windows.cc
src/core/lib/gpr/tmpfile_msys.cc
src/core/lib/gpr/tmpfile_posix.cc
src/core/lib/gpr/tmpfile_windows.cc
src/core/lib/gpr/wrap_memcpy.cc
src/core/lib/gprpp/examine_stack.cc
src/core/lib/gprpp/fork.cc
src/core/lib/gprpp/global_config_env.cc
src/core/lib/gprpp/host_port.cc
src/core/lib/gprpp/mpscq.cc
src/core/lib/gprpp/stat_posix.cc
src/core/lib/gprpp/stat_windows.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/thd_posix.cc
src/core/lib/gprpp/thd_windows.cc
src/core/lib/gprpp/time_util.cc
src/core/lib/profiling/basic_timers.cc
src/core/lib/profiling/stap_timers.cc
src/core/lib/promise/activity.cc
src/core/lib/resource_quota/arena.cc
src/core/lib/resource_quota/memory_quota.cc
src/core/lib/resource_quota/resource_quota.cc
src/core/lib/resource_quota/thread_quota.cc
src/core/lib/resource_quota/trace.cc
src/core/lib/slice/slice.cc
src/core/lib/slice/slice_refcount.cc
src/core/lib/slice/slice_string_helpers.cc
src/core/lib/slice/static_slice.cc
test/core/promise/pipe_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
@ -13397,9 +13468,20 @@ target_include_directories(pipe_test
target_link_libraries(pipe_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::base
absl::core_headers
absl::memory
absl::random_random
absl::status
absl::statusor
absl::cord
absl::str_format
absl::strings
absl::synchronization
absl::time
absl::optional
absl::variant
gpr
upb
)

@ -5853,21 +5853,38 @@ targets:
build: test
language: c++
headers:
- src/core/lib/debug/trace.h
- src/core/ext/upb-generated/google/protobuf/any.upb.h
- src/core/ext/upb-generated/google/rpc/status.upb.h
- src/core/lib/gpr/alloc.h
- src/core/lib/gpr/env.h
- src/core/lib/gpr/murmur_hash.h
- src/core/lib/gpr/spinlock.h
- src/core/lib/gpr/string.h
- src/core/lib/gpr/string_windows.h
- src/core/lib/gpr/time_precise.h
- src/core/lib/gpr/tls.h
- src/core/lib/gpr/tmpfile.h
- src/core/lib/gpr/useful.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/dual_ref_counted.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h
- src/core/lib/iomgr/closure.h
- src/core/lib/iomgr/combiner.h
- src/core/lib/iomgr/error.h
- src/core/lib/iomgr/error_internal.h
- src/core/lib/iomgr/exec_ctx.h
- src/core/lib/iomgr/executor.h
- src/core/lib/iomgr/iomgr_internal.h
- src/core/lib/gprpp/construct_destruct.h
- src/core/lib/gprpp/debug_location.h
- src/core/lib/gprpp/examine_stack.h
- src/core/lib/gprpp/fork.h
- src/core/lib/gprpp/global_config.h
- src/core/lib/gprpp/global_config_custom.h
- src/core/lib/gprpp/global_config_env.h
- src/core/lib/gprpp/global_config_generic.h
- src/core/lib/gprpp/host_port.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/memory.h
- src/core/lib/gprpp/mpscq.h
- src/core/lib/gprpp/stat.h
- src/core/lib/gprpp/status_helper.h
- src/core/lib/gprpp/sync.h
- src/core/lib/gprpp/thd.h
- src/core/lib/gprpp/time_util.h
- src/core/lib/profiling/timers.h
- src/core/lib/promise/activity.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_join.h
@ -5876,55 +5893,81 @@ targets:
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/detail/switch.h
- src/core/lib/promise/exec_ctx_wakeup_scheduler.h
- src/core/lib/promise/for_each.h
- src/core/lib/promise/intra_activity_waiter.h
- src/core/lib/promise/join.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
- src/core/lib/promise/observable.h
- src/core/lib/promise/pipe.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
- src/core/lib/promise/wait_set.h
- src/core/lib/resource_quota/arena.h
- src/core/lib/resource_quota/memory_quota.h
- src/core/lib/resource_quota/resource_quota.h
- src/core/lib/resource_quota/thread_quota.h
- src/core/lib/resource_quota/trace.h
- src/core/lib/slice/slice.h
- src/core/lib/slice/slice_internal.h
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_refcount_base.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/lib/slice/slice_utils.h
- src/core/lib/slice/static_slice.h
- test/core/promise/test_wakeup_schedulers.h
src:
- src/core/lib/debug/trace.cc
- src/core/lib/event_engine/memory_allocator.cc
- src/core/lib/iomgr/combiner.cc
- src/core/lib/iomgr/error.cc
- src/core/lib/iomgr/exec_ctx.cc
- src/core/lib/iomgr/executor.cc
- src/core/lib/iomgr/iomgr_internal.cc
- src/core/ext/upb-generated/google/protobuf/any.upb.c
- src/core/ext/upb-generated/google/rpc/status.upb.c
- src/core/lib/gpr/alloc.cc
- src/core/lib/gpr/atm.cc
- src/core/lib/gpr/cpu_iphone.cc
- src/core/lib/gpr/cpu_linux.cc
- src/core/lib/gpr/cpu_posix.cc
- src/core/lib/gpr/cpu_windows.cc
- src/core/lib/gpr/env_linux.cc
- src/core/lib/gpr/env_posix.cc
- src/core/lib/gpr/env_windows.cc
- src/core/lib/gpr/log.cc
- src/core/lib/gpr/log_android.cc
- src/core/lib/gpr/log_linux.cc
- src/core/lib/gpr/log_posix.cc
- src/core/lib/gpr/log_windows.cc
- src/core/lib/gpr/murmur_hash.cc
- src/core/lib/gpr/string.cc
- src/core/lib/gpr/string_posix.cc
- src/core/lib/gpr/string_util_windows.cc
- src/core/lib/gpr/string_windows.cc
- src/core/lib/gpr/sync.cc
- src/core/lib/gpr/sync_abseil.cc
- src/core/lib/gpr/sync_posix.cc
- src/core/lib/gpr/sync_windows.cc
- src/core/lib/gpr/time.cc
- src/core/lib/gpr/time_posix.cc
- src/core/lib/gpr/time_precise.cc
- src/core/lib/gpr/time_windows.cc
- src/core/lib/gpr/tmpfile_msys.cc
- src/core/lib/gpr/tmpfile_posix.cc
- src/core/lib/gpr/tmpfile_windows.cc
- src/core/lib/gpr/wrap_memcpy.cc
- src/core/lib/gprpp/examine_stack.cc
- src/core/lib/gprpp/fork.cc
- src/core/lib/gprpp/global_config_env.cc
- src/core/lib/gprpp/host_port.cc
- src/core/lib/gprpp/mpscq.cc
- src/core/lib/gprpp/stat_posix.cc
- src/core/lib/gprpp/stat_windows.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/thd_posix.cc
- src/core/lib/gprpp/thd_windows.cc
- src/core/lib/gprpp/time_util.cc
- src/core/lib/profiling/basic_timers.cc
- src/core/lib/profiling/stap_timers.cc
- src/core/lib/promise/activity.cc
- src/core/lib/resource_quota/arena.cc
- src/core/lib/resource_quota/memory_quota.cc
- src/core/lib/resource_quota/resource_quota.cc
- src/core/lib/resource_quota/thread_quota.cc
- src/core/lib/resource_quota/trace.cc
- src/core/lib/slice/slice.cc
- src/core/lib/slice/slice_refcount.cc
- src/core/lib/slice/slice_string_helpers.cc
- src/core/lib/slice/static_slice.cc
- test/core/promise/for_each_test.cc
deps:
- absl/base:base
- absl/base:core_headers
- absl/container:flat_hash_set
- absl/memory:memory
- absl/random:random
- absl/status:status
- absl/status:statusor
- absl/strings:cord
- absl/strings:str_format
- absl/strings:strings
- absl/synchronization:synchronization
- absl/time:time
- absl/types:optional
- absl/types:variant
- gpr
- upb
uses_polling: false
- name: generic_end2end_test
gtest: true
@ -6952,21 +6995,38 @@ targets:
build: test
language: c++
headers:
- src/core/lib/debug/trace.h
- src/core/ext/upb-generated/google/protobuf/any.upb.h
- src/core/ext/upb-generated/google/rpc/status.upb.h
- src/core/lib/gpr/alloc.h
- src/core/lib/gpr/env.h
- src/core/lib/gpr/murmur_hash.h
- src/core/lib/gpr/spinlock.h
- src/core/lib/gpr/string.h
- src/core/lib/gpr/string_windows.h
- src/core/lib/gpr/time_precise.h
- src/core/lib/gpr/tls.h
- src/core/lib/gpr/tmpfile.h
- src/core/lib/gpr/useful.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/dual_ref_counted.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h
- src/core/lib/iomgr/closure.h
- src/core/lib/iomgr/combiner.h
- src/core/lib/iomgr/error.h
- src/core/lib/iomgr/error_internal.h
- src/core/lib/iomgr/exec_ctx.h
- src/core/lib/iomgr/executor.h
- src/core/lib/iomgr/iomgr_internal.h
- src/core/lib/gprpp/construct_destruct.h
- src/core/lib/gprpp/debug_location.h
- src/core/lib/gprpp/examine_stack.h
- src/core/lib/gprpp/fork.h
- src/core/lib/gprpp/global_config.h
- src/core/lib/gprpp/global_config_custom.h
- src/core/lib/gprpp/global_config_env.h
- src/core/lib/gprpp/global_config_generic.h
- src/core/lib/gprpp/host_port.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/memory.h
- src/core/lib/gprpp/mpscq.h
- src/core/lib/gprpp/stat.h
- src/core/lib/gprpp/status_helper.h
- src/core/lib/gprpp/sync.h
- src/core/lib/gprpp/thd.h
- src/core/lib/gprpp/time_util.h
- src/core/lib/profiling/timers.h
- src/core/lib/promise/activity.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_join.h
@ -6975,52 +7035,77 @@ targets:
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/detail/switch.h
- src/core/lib/promise/exec_ctx_wakeup_scheduler.h
- src/core/lib/promise/intra_activity_waiter.h
- src/core/lib/promise/join.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
- src/core/lib/promise/pipe.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/promise.h
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
- src/core/lib/resource_quota/arena.h
- src/core/lib/resource_quota/memory_quota.h
- src/core/lib/resource_quota/resource_quota.h
- src/core/lib/resource_quota/thread_quota.h
- src/core/lib/resource_quota/trace.h
- src/core/lib/slice/slice.h
- src/core/lib/slice/slice_internal.h
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_refcount_base.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/lib/slice/slice_utils.h
- src/core/lib/slice/static_slice.h
- test/core/promise/test_wakeup_schedulers.h
src:
- src/core/lib/debug/trace.cc
- src/core/lib/event_engine/memory_allocator.cc
- src/core/lib/iomgr/combiner.cc
- src/core/lib/iomgr/error.cc
- src/core/lib/iomgr/exec_ctx.cc
- src/core/lib/iomgr/executor.cc
- src/core/lib/iomgr/iomgr_internal.cc
- src/core/ext/upb-generated/google/protobuf/any.upb.c
- src/core/ext/upb-generated/google/rpc/status.upb.c
- src/core/lib/gpr/alloc.cc
- src/core/lib/gpr/atm.cc
- src/core/lib/gpr/cpu_iphone.cc
- src/core/lib/gpr/cpu_linux.cc
- src/core/lib/gpr/cpu_posix.cc
- src/core/lib/gpr/cpu_windows.cc
- src/core/lib/gpr/env_linux.cc
- src/core/lib/gpr/env_posix.cc
- src/core/lib/gpr/env_windows.cc
- src/core/lib/gpr/log.cc
- src/core/lib/gpr/log_android.cc
- src/core/lib/gpr/log_linux.cc
- src/core/lib/gpr/log_posix.cc
- src/core/lib/gpr/log_windows.cc
- src/core/lib/gpr/murmur_hash.cc
- src/core/lib/gpr/string.cc
- src/core/lib/gpr/string_posix.cc
- src/core/lib/gpr/string_util_windows.cc
- src/core/lib/gpr/string_windows.cc
- src/core/lib/gpr/sync.cc
- src/core/lib/gpr/sync_abseil.cc
- src/core/lib/gpr/sync_posix.cc
- src/core/lib/gpr/sync_windows.cc
- src/core/lib/gpr/time.cc
- src/core/lib/gpr/time_posix.cc
- src/core/lib/gpr/time_precise.cc
- src/core/lib/gpr/time_windows.cc
- src/core/lib/gpr/tmpfile_msys.cc
- src/core/lib/gpr/tmpfile_posix.cc
- src/core/lib/gpr/tmpfile_windows.cc
- src/core/lib/gpr/wrap_memcpy.cc
- src/core/lib/gprpp/examine_stack.cc
- src/core/lib/gprpp/fork.cc
- src/core/lib/gprpp/global_config_env.cc
- src/core/lib/gprpp/host_port.cc
- src/core/lib/gprpp/mpscq.cc
- src/core/lib/gprpp/stat_posix.cc
- src/core/lib/gprpp/stat_windows.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/thd_posix.cc
- src/core/lib/gprpp/thd_windows.cc
- src/core/lib/gprpp/time_util.cc
- src/core/lib/profiling/basic_timers.cc
- src/core/lib/profiling/stap_timers.cc
- src/core/lib/promise/activity.cc
- src/core/lib/resource_quota/arena.cc
- src/core/lib/resource_quota/memory_quota.cc
- src/core/lib/resource_quota/resource_quota.cc
- src/core/lib/resource_quota/thread_quota.cc
- src/core/lib/resource_quota/trace.cc
- src/core/lib/slice/slice.cc
- src/core/lib/slice/slice_refcount.cc
- src/core/lib/slice/slice_string_helpers.cc
- src/core/lib/slice/static_slice.cc
- test/core/promise/pipe_test.cc
deps:
- absl/base:base
- absl/base:core_headers
- absl/memory:memory
- absl/random:random
- absl/status:status
- absl/status:statusor
- absl/strings:cord
- absl/strings:str_format
- absl/strings:strings
- absl/synchronization:synchronization
- absl/time:time
- absl/types:optional
- absl/types:variant
- gpr
- upb
uses_polling: false
- name: poll_test
gtest: true

@ -24,7 +24,6 @@
#include <atomic>
#include <functional>
#include <memory>
#include <type_traits>
#include <utility>
#include "absl/base/thread_annotations.h"
@ -285,17 +284,6 @@ class ContextHolder<Context*> {
Context* value_;
};
template <typename Context, typename Deleter>
class ContextHolder<std::unique_ptr<Context, Deleter>> {
public:
explicit ContextHolder(std::unique_ptr<Context, Deleter> value)
: value_(std::move(value)) {}
Context* GetContext() { return value_.get(); }
private:
std::unique_ptr<Context, Deleter> value_;
};
template <typename... Contexts>
class EnterContexts : public promise_detail::Context<Contexts>... {
public:
@ -428,9 +416,8 @@ class PromiseActivity final
// to keep the scoping rules a little easier in Step().
absl::optional<absl::Status> RunStep() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
ScopedActivity scoped_activity(this);
EnterContexts<typename std::remove_reference<decltype(
*static_cast<ContextHolder<Contexts>*>(this)->GetContext())>::type...>
contexts(static_cast<ContextHolder<Contexts>*>(this)->GetContext()...);
EnterContexts<Contexts...> contexts(
static_cast<ContextHolder<Contexts>*>(this)->GetContext()...);
return StepLoop();
}
@ -439,9 +426,8 @@ class PromiseActivity final
absl::optional<absl::Status> Start(Factory promise_factory)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
ScopedActivity scoped_activity(this);
EnterContexts<typename std::remove_reference<decltype(
*static_cast<ContextHolder<Contexts>*>(this)->GetContext())>::type...>
contexts(static_cast<ContextHolder<Contexts>*>(this)->GetContext()...);
EnterContexts<Contexts...> contexts(
static_cast<ContextHolder<Contexts>*>(this)->GetContext()...);
Construct(&promise_holder_.promise, promise_factory.Once());
return StepLoop();
}

@ -31,13 +31,10 @@
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include <grpc/support/log.h>
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/detail/promise_factory.h"
#include "src/core/lib/promise/intra_activity_waiter.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/resource_quota/arena.h"
namespace grpc_core {
@ -55,114 +52,39 @@ class Push;
template <typename T>
class Next;
// Center sits between a sender and a receiver to provide a one-deep buffer of
// Ts
template <typename T>
class Center {
template <class T>
class Promise {
public:
// Initialize with one send ref (held by PipeSender) and one recv ref (held by
// PipeReceiver)
Center() {
send_refs_ = 1;
recv_refs_ = 1;
has_value_ = false;
}
// Add one ref to the send side of this object, and return this.
Center* RefSend() {
send_refs_++;
return this;
}
// Add one ref to the recv side of this object, and return this.
Center* RefRecv() {
recv_refs_++;
return this;
}
// Drop a send side ref
// If no send refs remain, wake due to send closure
// If no refs remain, destroy this object
void UnrefSend() {
GPR_DEBUG_ASSERT(send_refs_ > 0);
send_refs_--;
if (0 == send_refs_) {
on_full_.Wake();
on_empty_.Wake();
if (0 == recv_refs_) {
this->~Center();
}
}
}
// Drop a recv side ref
// If no recv refs remain, wake due to recv closure
// If no refs remain, destroy this object
void UnrefRecv() {
GPR_DEBUG_ASSERT(recv_refs_ > 0);
recv_refs_--;
if (0 == recv_refs_) {
on_full_.Wake();
on_empty_.Wake();
if (0 == send_refs_) {
this->~Center();
} else if (has_value_) {
ResetValue();
}
}
}
virtual Poll<bool> Step(T* output) = 0;
virtual void Stop() = 0;
// Try to push *value into the pipe.
// Return Pending if there is no space.
// Return true if the value was pushed.
// Return false if the recv end is closed.
Poll<bool> Push(T* value) {
GPR_DEBUG_ASSERT(send_refs_ != 0);
if (recv_refs_ == 0) return false;
if (has_value_) return on_empty_.pending();
has_value_ = true;
value_ = std::move(*value);
on_full_.Wake();
return true;
}
protected:
inline virtual ~Promise() = default;
};
// Try to receive a value from the pipe.
// Return Pending if there is no value.
// Return the value if one was retrieved.
// Return nullopt if the send end is closed and no value had been pushed.
Poll<absl::optional<T>> Next() {
GPR_DEBUG_ASSERT(recv_refs_ != 0);
if (!has_value_) {
if (send_refs_ == 0) return absl::nullopt;
return on_full_.pending();
}
has_value_ = false;
on_empty_.Wake();
return std::move(value_);
}
struct alignas(alignof(void*)) Scratch {
uint8_t scratch[32];
};
private:
void ResetValue() {
// Fancy dance to move out of value in the off chance that we reclaim some
// memory earlier.
[](T) {}(std::move(value_));
has_value_ = false;
}
T value_;
// Number of sending objects.
// 0 => send is closed.
// 1 ref each for PipeSender and Push.
uint8_t send_refs_ : 2;
// Number of receiving objects.
// 0 => recv is closed.
// 1 ref each for PipeReceiver and Next.
uint8_t recv_refs_ : 2;
// True iff there is a value in the pipe.
bool has_value_ : 1;
IntraActivityWaiter on_empty_;
IntraActivityWaiter on_full_;
template <typename T>
class FilterInterface {
public:
FilterInterface() = default;
FilterInterface(const FilterInterface&) = delete;
FilterInterface& operator=(const FilterInterface&) = delete;
virtual Promise<T>* Step(T* p, Scratch* scratch_space) = 0;
virtual void UpdateReceiver(PipeReceiver<T>* receiver) = 0;
protected:
inline virtual ~FilterInterface() {}
static void SetReceiverIndex(PipeReceiver<T>* receiver, int idx,
FilterInterface* p);
char AllocIndex(PipeReceiver<T>* receiver);
};
template <typename T, typename F>
class Filter;
} // namespace pipe_detail
// Send end of a Pipe.
@ -172,18 +94,43 @@ class PipeSender {
PipeSender(const PipeSender&) = delete;
PipeSender& operator=(const PipeSender&) = delete;
PipeSender(PipeSender&& other) noexcept : center_(other.center_) {
other.center_ = nullptr;
PipeSender(PipeSender&& other) noexcept
: receiver_(other.receiver_), push_(other.push_) {
if (receiver_ != nullptr) {
receiver_->sender_ = this;
other.receiver_ = nullptr;
}
if (push_ != nullptr) {
push_->sender_ = this;
other.push_ = nullptr;
}
}
PipeSender& operator=(PipeSender&& other) noexcept {
if (center_ != nullptr) center_->UnrefSend();
center_ = other.center_;
other.center_ = nullptr;
if (receiver_ != nullptr) {
receiver_->sender_ = nullptr;
}
if (push_ != nullptr) {
push_->sender_ = nullptr;
}
receiver_ = other.receiver_;
if (receiver_ != nullptr) {
receiver_->sender_ = this;
other.receiver_ = nullptr;
}
if (push_ != nullptr) {
push_->sender_ = this;
other.push_ = nullptr;
}
return *this;
}
~PipeSender() {
if (center_ != nullptr) center_->UnrefSend();
if (receiver_ != nullptr) {
receiver_->MarkClosed();
}
if (push_ != nullptr) {
push_->sender_ = nullptr;
}
}
// Send a single message along the pipe.
@ -192,10 +139,21 @@ class PipeSender {
// receiver is either closed or able to receive another message.
pipe_detail::Push<T> Push(T value);
// Attach a promise factory based filter to this pipe.
// The overall promise returned from this will be active until the pipe is
// closed. If this promise is cancelled before the pipe is closed, the pipe
// will close. The filter will be run _after_ any other registered filters.
template <typename F>
pipe_detail::Filter<T, F> Filter(F f);
private:
friend struct Pipe<T>;
explicit PipeSender(pipe_detail::Center<T>* center) : center_(center) {}
pipe_detail::Center<T>* center_;
friend class PipeReceiver<T>;
friend class pipe_detail::Next<T>;
friend class pipe_detail::Push<T>;
explicit PipeSender(PipeReceiver<T>* receiver) : receiver_(receiver) {}
PipeReceiver<T>* receiver_;
pipe_detail::Push<T>* push_ = nullptr;
};
// Receive end of a Pipe.
@ -205,17 +163,56 @@ class PipeReceiver {
PipeReceiver(const PipeReceiver&) = delete;
PipeReceiver& operator=(const PipeReceiver&) = delete;
PipeReceiver(PipeReceiver&& other) noexcept : center_(other.center_) {
other.center_ = nullptr;
PipeReceiver(PipeReceiver&& other) noexcept
: sender_(other.sender_),
next_(other.next_),
filters_(std::move(other.filters_)),
pending_(std::move(other.pending_)),
waiting_to_send_(std::move(other.waiting_to_send_)),
waiting_to_receive_(other.waiting_to_receive_) {
if (sender_ != nullptr) {
sender_->receiver_ = this;
other.sender_ = nullptr;
}
if (next_ != nullptr) {
next_->receiver_ = this;
other.next_ = nullptr;
}
for (auto filter : filters_) {
filter->UpdateReceiver(this);
}
}
PipeReceiver& operator=(PipeReceiver&& other) noexcept {
if (center_ != nullptr) center_->UnrefRecv();
center_ = other.center_;
other.center_ = nullptr;
if (sender_ != nullptr) {
sender_->receiver_ = nullptr;
}
if (next_ != nullptr) {
next_->receiver_ = nullptr;
}
sender_ = other.sender_;
next_ = other.next_;
filters_ = std::move(other.filters_);
for (auto filter : filters_) {
filter->UpdateReceiver(this);
}
pending_ = std::move(other.pending_);
waiting_to_send_ = std::move(other.waiting_to_send_);
waiting_to_receive_ = std::move(other.waiting_to_receive_);
if (sender_ != nullptr) {
sender_->receiver_ = this;
other.sender_ = nullptr;
}
if (next_ != nullptr) {
next_->receiver_ = this;
other.next_ = nullptr;
}
return *this;
}
~PipeReceiver() {
if (center_ != nullptr) center_->UnrefRecv();
MarkClosed();
if (next_ != nullptr) {
next_->receiver_ = nullptr;
}
}
// Receive a single message from the pipe.
@ -225,10 +222,44 @@ class PipeReceiver {
// available.
pipe_detail::Next<T> Next();
// Attach a promise factory based filter to this pipe.
// The overall promise returned from this will be active until the pipe is
// closed. If this promise is cancelled before the pipe is closed, the pipe
// will close. The filter will be run _after_ any other registered filters.
template <typename F>
pipe_detail::Filter<T, F> Filter(F f);
private:
friend struct Pipe<T>;
explicit PipeReceiver(pipe_detail::Center<T>* center) : center_(center) {}
pipe_detail::Center<T>* center_;
friend class PipeSender<T>;
friend class pipe_detail::Next<T>;
friend class pipe_detail::Push<T>;
friend class pipe_detail::FilterInterface<T>;
explicit PipeReceiver(PipeSender<T>* sender) : sender_(sender) {}
PipeSender<T>* sender_;
pipe_detail::Next<T>* next_ = nullptr;
absl::InlinedVector<pipe_detail::FilterInterface<T>*, 12> filters_;
absl::optional<T> pending_;
IntraActivityWaiter waiting_to_send_;
IntraActivityWaiter waiting_to_receive_;
void MarkClosed() {
if (sender_ == nullptr) {
return;
}
sender_->receiver_ = nullptr;
waiting_to_receive_.Wake();
waiting_to_send_.Wake();
sender_ = nullptr;
for (auto* filter : filters_) {
if (filter != nullptr) {
filter->UpdateReceiver(nullptr);
}
}
}
};
namespace pipe_detail {
@ -240,28 +271,55 @@ class Push {
Push(const Push&) = delete;
Push& operator=(const Push&) = delete;
Push(Push&& other) noexcept
: center_(other.center_), push_(std::move(other.push_)) {
other.center_ = nullptr;
: sender_(other.sender_), push_(std::move(other.push_)) {
if (sender_ != nullptr) {
sender_->push_ = this;
other.sender_ = nullptr;
}
}
Push& operator=(Push&& other) noexcept {
if (center_ != nullptr) center_->UnrefSend();
center_ = other.center_;
other.center_ = nullptr;
if (sender_ != nullptr) {
sender_->push_ = nullptr;
}
sender_ = other.sender_;
push_ = std::move(other.push_);
if (sender_ != nullptr) {
sender_->push_ = this;
other.sender_ = nullptr;
}
return *this;
}
~Push() {
if (center_ != nullptr) center_->UnrefSend();
if (sender_ != nullptr) {
assert(sender_->push_ == this);
sender_->push_ = nullptr;
}
}
Poll<bool> operator()() { return center_->Push(&push_); }
Poll<bool> operator()() {
auto* receiver = sender_->receiver_;
if (receiver == nullptr) {
return false;
}
if (receiver->pending_.has_value()) {
return receiver->waiting_to_send_.pending();
}
receiver->pending_ = std::move(push_);
receiver->waiting_to_receive_.Wake();
sender_->push_ = nullptr;
sender_ = nullptr;
return true;
}
private:
friend class PipeSender<T>;
explicit Push(pipe_detail::Center<T>* center, T push)
: center_(center), push_(std::move(push)) {}
Center<T>* center_;
Push(PipeSender<T>* sender, T push)
: sender_(sender), push_(std::move(push)) {
assert(sender_->push_ == nullptr);
sender_->push_ = this;
}
PipeSender<T>* sender_;
T push_;
};
@ -271,56 +329,262 @@ class Next {
public:
Next(const Next&) = delete;
Next& operator=(const Next&) = delete;
Next(Next&& other) noexcept : center_(other.center_) {
other.center_ = nullptr;
Next(Next&& other) noexcept
: receiver_(other.receiver_),
next_filter_(other.next_filter_),
current_promise_(nullptr) {
assert(other.current_promise_ == nullptr);
if (receiver_ != nullptr) {
receiver_->next_ = this;
other.receiver_ = nullptr;
}
}
Next& operator=(Next&& other) noexcept {
if (center_ != nullptr) center_->UnrefRecv();
center_ = other.center_;
other.center_ = nullptr;
assert(current_promise_ == nullptr);
assert(other.current_promise_ == nullptr);
if (receiver_ != nullptr) {
receiver_->next_ = nullptr;
}
receiver_ = other.receiver_;
next_filter_ = other.next_filter_;
if (receiver_ != nullptr) {
receiver_->next_ = this;
other.receiver_ = nullptr;
}
return *this;
}
~Next() {
if (center_ != nullptr) center_->UnrefRecv();
if (receiver_ != nullptr) {
assert(receiver_->next_ == this);
receiver_->next_ = nullptr;
}
if (current_promise_ != nullptr) {
current_promise_->Stop();
}
}
Poll<absl::optional<T>> operator()() { return center_->Next(); }
Poll<absl::optional<T>> operator()() {
if (receiver_->pending_.has_value()) {
auto* pending = &*receiver_->pending_;
if (current_promise_ != nullptr) {
auto r = current_promise_->Step(pending);
if (auto* p = absl::get_if<kPollReadyIdx>(&r)) {
current_promise_->Stop();
current_promise_ = nullptr;
if (!*p) {
receiver_->MarkClosed();
return absl::optional<T>();
}
} else {
return Pending();
}
}
while (true) {
if (next_filter_ >= receiver_->filters_.size()) {
auto result = absl::optional<T>(std::move(*pending));
receiver_->pending_.reset();
receiver_->waiting_to_send_.Wake();
receiver_->next_ = nullptr;
receiver_ = nullptr;
return result;
}
auto* filter = receiver_->filters_[next_filter_];
current_promise_ = filter ? filter->Step(pending, &scratch_) : nullptr;
next_filter_++;
if (current_promise_ ==
reinterpret_cast<Promise<T>*>(uintptr_t(false))) {
current_promise_ = nullptr;
receiver_->MarkClosed();
return absl::optional<T>();
} else if (current_promise_ ==
reinterpret_cast<Promise<T>*>(uintptr_t(true))) {
current_promise_ = nullptr;
} else {
return Pending();
}
}
}
if (receiver_->sender_ == nullptr) {
return absl::optional<T>();
}
return receiver_->waiting_to_receive_.pending();
}
private:
friend class PipeReceiver<T>;
explicit Next(pipe_detail::Center<T>* center) : center_(center) {}
Center<T>* center_;
explicit Next(PipeReceiver<T>* receiver) : receiver_(receiver) {
assert(receiver_->next_ == nullptr);
receiver_->next_ = this;
}
PipeReceiver<T>* receiver_;
size_t next_filter_ = 0;
Promise<T>* current_promise_ = nullptr;
Scratch scratch_;
};
template <typename T, typename F>
class Filter final : private FilterInterface<T> {
public:
Filter(PipeReceiver<T>* receiver, F f)
: active_{receiver, promise_detail::PromiseFactory<T, F>(std::move(f))},
index_(this->AllocIndex(receiver)){};
explicit Filter(absl::Status already_finished)
: done_(std::move(already_finished)) {}
~Filter() {
if (index_ != kTombstoneIndex) {
this->SetReceiverIndex(active_.receiver, index_, nullptr);
active_.~Active();
} else {
done_.~Status();
}
}
Filter(Filter&& other) noexcept : index_(other.index_) {
if (index_ != kTombstoneIndex) {
new (&active_) Active(std::move(other.active_));
other.active_.~Active();
new (&other.done_) absl::Status(absl::OkStatus());
other.index_ = kTombstoneIndex;
this->SetReceiverIndex(active_.receiver, index_, this);
} else {
new (&done_) absl::Status(std::move(other.done_));
}
}
Filter(const Filter&) = delete;
Filter& operator=(const Filter&) = delete;
Poll<absl::Status> operator()() {
if (index_ == kTombstoneIndex) {
return std::move(done_);
}
return Pending();
}
private:
static constexpr char kTombstoneIndex = -1;
struct Active {
GPR_NO_UNIQUE_ADDRESS PipeReceiver<T>* receiver;
GPR_NO_UNIQUE_ADDRESS promise_detail::PromiseFactory<T, F> factory;
};
union {
GPR_NO_UNIQUE_ADDRESS Active active_;
GPR_NO_UNIQUE_ADDRESS absl::Status done_;
};
GPR_NO_UNIQUE_ADDRESS char index_;
class PromiseImpl final : public ::grpc_core::pipe_detail::Promise<T> {
using PF = typename promise_detail::PromiseFactory<T, F>::Promise;
public:
PromiseImpl(PF f, Filter* filter) : f_(std::move(f)), filter_(filter) {}
Poll<bool> Step(T* output) final {
auto r = f_();
if (auto* p = absl::get_if<kPollReadyIdx>(&r)) {
if (p->ok()) {
*output = std::move(**p);
return true;
} else {
filter_->SetReceiverIndex(filter_->active_.receiver, filter_->index_,
nullptr);
filter_->active_.~Active();
filter_->index_ = kTombstoneIndex;
new (&filter_->done_) absl::Status(std::move(p->status()));
Activity::WakeupCurrent();
return false;
}
} else {
return Pending();
}
}
void Stop() final { this->~PromiseImpl(); }
private:
PF f_;
Filter* filter_;
};
Promise<T>* Step(T* p, Scratch* scratch) final {
if (index_ != kTombstoneIndex) {
PromiseImpl promise(active_.factory.Repeated(std::move(*p)), this);
auto r = promise.Step(p);
if (auto* result = absl::get_if<kPollReadyIdx>(&r)) {
return reinterpret_cast<Promise<T>*>(uintptr_t(*result));
}
static_assert(sizeof(promise) <= sizeof(Scratch),
"scratch size too small");
static_assert(alignof(decltype(promise)) <= alignof(Scratch),
"bad alignment");
return new (scratch) decltype(promise)(std::move(promise));
} else {
return nullptr;
}
}
void UpdateReceiver(PipeReceiver<T>* receiver) final {
if (index_ != kTombstoneIndex) {
if (receiver == nullptr) {
active_.~Active();
index_ = kTombstoneIndex;
new (&done_) absl::Status(absl::OkStatus());
} else {
active_.receiver = receiver;
}
Activity::WakeupCurrent();
}
}
};
template <typename T>
void FilterInterface<T>::SetReceiverIndex(PipeReceiver<T>* receiver, int idx,
FilterInterface* p) {
receiver->filters_[idx] = p;
}
template <typename T>
char FilterInterface<T>::AllocIndex(PipeReceiver<T>* receiver) {
auto r = receiver->filters_.size();
receiver->filters_.push_back(this);
return r;
}
} // namespace pipe_detail
template <typename T>
pipe_detail::Push<T> PipeSender<T>::Push(T value) {
return pipe_detail::Push<T>(center_->RefSend(), std::move(value));
return pipe_detail::Push<T>(this, std::move(value));
}
template <typename T>
pipe_detail::Next<T> PipeReceiver<T>::Next() {
return pipe_detail::Next<T>(center_->RefRecv());
return pipe_detail::Next<T>(this);
}
template <typename T>
template <typename F>
pipe_detail::Filter<T, F> PipeSender<T>::Filter(F f) {
if (receiver_) {
return pipe_detail::Filter<T, F>(receiver_, std::move(f));
} else {
return pipe_detail::Filter<T, F>(absl::OkStatus());
}
}
template <typename T>
template <typename F>
pipe_detail::Filter<T, F> PipeReceiver<T>::Filter(F f) {
return pipe_detail::Filter<T, F>(this, std::move(f));
}
// A Pipe is an intra-Activity communications channel that transmits T's from
// one end to the other.
// It is only safe to use a Pipe within the context of a single Activity.
// No synchronization is performed internally.
// The primary Pipe data structure is allocated from an arena, so the activity
// must have an arena as part of its context.
// By performing that allocation we can ensure stable pointer to shared data
// allowing PipeSender/PipeReceiver/Push/Next to be relatively simple in their
// implementation.
// This type has been optimized with the expectation that there are relatively
// few pipes per activity. If this assumption does not hold then a design
// allowing inline filtering of pipe contents (instead of connecting pipes with
// polling code) would likely be more appropriate.
template <typename T>
struct Pipe {
Pipe() : Pipe(GetContext<Arena>()->New<pipe_detail::Center<T>>()) {}
Pipe() : sender(&receiver), receiver(&sender) {}
Pipe(const Pipe&) = delete;
Pipe& operator=(const Pipe&) = delete;
Pipe(Pipe&&) noexcept = default;
@ -328,10 +592,6 @@ struct Pipe {
PipeSender<T> sender;
PipeReceiver<T> receiver;
private:
explicit Pipe(pipe_detail::Center<T>* center)
: sender(center), receiver(center) {}
};
} // namespace grpc_core

@ -260,7 +260,6 @@ grpc_cc_test(
"//:map",
"//:observable",
"//:pipe",
"//:resource_quota",
"//:seq",
"//test/core/util:grpc_suppressions",
],
@ -277,7 +276,6 @@ grpc_cc_test(
"//:join",
"//:pipe",
"//:promise",
"//:resource_quota",
"//:seq",
"//test/core/util:grpc_suppressions",
],

@ -22,7 +22,6 @@
#include "src/core/lib/promise/observable.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "test/core/promise/test_wakeup_schedulers.h"
using testing::Mock;
@ -31,25 +30,22 @@ using testing::StrictMock;
namespace grpc_core {
static auto* g_memory_allocator = new MemoryAllocator(
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator("test"));
TEST(ForEachTest, SendThriceWithPipe) {
Pipe<int> pipe;
int num_received = 0;
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
MakeActivity(
[&num_received] {
Pipe<int> pipe;
auto sender = std::make_shared<std::unique_ptr<PipeSender<int>>>(
absl::make_unique<PipeSender<int>>(std::move(pipe.sender)));
[&pipe, &num_received] {
return Map(
Join(
// Push 3 things into a pipe -- 1, 2, then 3 -- then close.
Seq((*sender)->Push(1), [sender] { return (*sender)->Push(2); },
[sender] { return (*sender)->Push(3); },
[sender] {
sender->reset();
Seq(
pipe.sender.Push(1),
[&pipe] { return pipe.sender.Push(2); },
[&pipe] { return pipe.sender.Push(3); },
[&pipe] {
auto drop = std::move(pipe.sender);
return absl::OkStatus();
}),
// Use a ForEach loop to read them out and verify all values are
@ -63,8 +59,7 @@ TEST(ForEachTest, SendThriceWithPipe) {
JustElem<1>());
},
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); },
MakeScopedArena(1024, g_memory_allocator));
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
Mock::VerifyAndClearExpectations(&on_done);
EXPECT_EQ(num_received, 3);
}

@ -22,7 +22,6 @@
#include "src/core/lib/promise/join.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "test/core/promise/test_wakeup_schedulers.h"
using testing::MockFunction;
@ -30,15 +29,12 @@ using testing::StrictMock;
namespace grpc_core {
static auto* g_memory_allocator = new MemoryAllocator(
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator("test"));
TEST(PipeTest, CanSendAndReceive) {
Pipe<int> pipe;
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
MakeActivity(
[] {
Pipe<int> pipe;
[&pipe] {
return Seq(
// Concurrently: send 42 into the pipe, and receive from the pipe.
Join(pipe.sender.Push(42), pipe.receiver.Next()),
@ -50,16 +46,15 @@ TEST(PipeTest, CanSendAndReceive) {
});
},
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); },
MakeScopedArena(1024, g_memory_allocator));
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
TEST(PipeTest, CanReceiveAndSend) {
Pipe<int> pipe;
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
MakeActivity(
[] {
Pipe<int> pipe;
[&pipe] {
return Seq(
// Concurrently: receive from the pipe, and send 42 into the pipe.
Join(pipe.receiver.Next(), pipe.sender.Push(42)),
@ -71,29 +66,28 @@ TEST(PipeTest, CanReceiveAndSend) {
});
},
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); },
MakeScopedArena(1024, g_memory_allocator));
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
TEST(PipeTest, CanSeeClosedOnSend) {
Pipe<int> pipe;
StrictMock<MockFunction<void(absl::Status)>> on_done;
auto sender = std::move(pipe.sender);
auto receiver =
absl::make_unique<PipeReceiver<int>>(std::move(pipe.receiver));
EXPECT_CALL(on_done, Call(absl::OkStatus()));
// Push 42 onto the pipe - this will the pipe's one-deep send buffer.
EXPECT_TRUE(NowOrNever(sender.Push(42)).has_value());
MakeActivity(
[] {
Pipe<int> pipe;
auto sender = std::move(pipe.sender);
// Push 42 onto the pipe - this will the pipe's one-deep send buffer.
EXPECT_TRUE(NowOrNever(sender.Push(42)).has_value());
auto receiver = std::make_shared<std::unique_ptr<PipeReceiver<int>>>(
absl::make_unique<PipeReceiver<int>>(std::move(pipe.receiver)));
[&sender, &receiver] {
return Seq(
// Concurrently:
// - push 43 into the sender, which will stall because the buffer is
// full
// - and close the receiver, which will fail the pending send.
Join(sender.Push(43),
[receiver] {
receiver->reset();
[&receiver] {
receiver.reset();
return absl::OkStatus();
}),
// Verify both that the send failed and that we executed the close.
@ -103,19 +97,17 @@ TEST(PipeTest, CanSeeClosedOnSend) {
});
},
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); },
MakeScopedArena(1024, g_memory_allocator));
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
TEST(PipeTest, CanSeeClosedOnReceive) {
Pipe<int> pipe;
StrictMock<MockFunction<void(absl::Status)>> on_done;
auto sender = absl::make_unique<PipeSender<int>>(std::move(pipe.sender));
auto receiver = std::move(pipe.receiver);
EXPECT_CALL(on_done, Call(absl::OkStatus()));
MakeActivity(
[] {
Pipe<int> pipe;
auto sender = std::make_shared<std::unique_ptr<PipeSender<int>>>(
absl::make_unique<PipeSender<int>>(std::move(pipe.sender)));
auto receiver = std::move(pipe.receiver);
[&sender, &receiver] {
return Seq(
// Concurrently:
// - wait for a received value (will stall forever since we push
@ -123,8 +115,8 @@ TEST(PipeTest, CanSeeClosedOnReceive) {
// - close the sender, which will signal the receiver to return an
// end-of-stream.
Join(receiver.Next(),
[sender] {
sender->reset();
[&sender] {
sender.reset();
return absl::OkStatus();
}),
// Verify we received end-of-stream and closed the sender.
@ -135,8 +127,49 @@ TEST(PipeTest, CanSeeClosedOnReceive) {
});
},
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); },
MakeScopedArena(1024, g_memory_allocator));
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
TEST(PipeTest, CanFilter) {
Pipe<int> pipe;
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
MakeActivity(
[&pipe] {
// Setup some filters here, carefully getting ordering correct by doing
// so outside of the Join() since C++ does not define execution order
// between arguments.
// TODO(ctiller): A future change to Pipe will specify an ordering
// between filters added to sender and receiver, at which point these
// should move back.
auto doubler = pipe.receiver.Filter(
[](int p) { return absl::StatusOr<int>(p * 2); });
auto adder = pipe.sender.Filter(
[](int p) { return absl::StatusOr<int>(p + 1); });
return Seq(
// Concurrently:
// - push 42 into the pipe
// - wait for a value to be received, and filter it by doubling it
// - wait for a value to be received, and filter it by adding one to
// it
// - wait for a value to be received and close the pipe.
Join(pipe.sender.Push(42), std::move(doubler), std::move(adder),
Seq(pipe.receiver.Next(),
[&pipe](absl::optional<int> i) {
auto x = std::move(pipe.receiver);
return i;
})),
// Verify all of the above happened correctly.
[](std::tuple<bool, absl::Status, absl::Status, absl::optional<int>>
result) {
EXPECT_EQ(result, std::make_tuple(true, absl::OkStatus(),
absl::OkStatus(),
absl::optional<int>(85)));
return absl::OkStatus();
});
},
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
} // namespace grpc_core

Loading…
Cancel
Save