Revert "Revert "Promise pipe redux (#28319)" (#28364)" (#28400)

* Revert "Revert "Promise pipe redux (#28319)" (#28364)"

This reverts commit 3df2a4ecea.

* rephrase to be hopefully more readable to msvc

* golfing
pull/28484/head
Craig Tiller 3 years ago committed by GitHub
parent e520fc137c
commit 0bd1ab364b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 150
      CMakeLists.txt
  3. 267
      build_autogenerated.yaml
  4. 69
      src/core/lib/promise/activity.h
  5. 572
      src/core/lib/promise/pipe.h
  6. 2
      test/core/promise/BUILD
  7. 23
      test/core/promise/for_each_test.cc
  8. 97
      test/core/promise/pipe_test.cc

@ -1372,6 +1372,7 @@ grpc_cc_library(
],
deps = [
"activity",
"arena",
"gpr_platform",
"intra_activity_waiter",
],

150
CMakeLists.txt generated

@ -10678,53 +10678,23 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(for_each_test
src/core/ext/upb-generated/google/protobuf/any.upb.c
src/core/ext/upb-generated/google/rpc/status.upb.c
src/core/lib/gpr/alloc.cc
src/core/lib/gpr/atm.cc
src/core/lib/gpr/cpu_iphone.cc
src/core/lib/gpr/cpu_linux.cc
src/core/lib/gpr/cpu_posix.cc
src/core/lib/gpr/cpu_windows.cc
src/core/lib/gpr/env_linux.cc
src/core/lib/gpr/env_posix.cc
src/core/lib/gpr/env_windows.cc
src/core/lib/gpr/log.cc
src/core/lib/gpr/log_android.cc
src/core/lib/gpr/log_linux.cc
src/core/lib/gpr/log_posix.cc
src/core/lib/gpr/log_windows.cc
src/core/lib/gpr/murmur_hash.cc
src/core/lib/gpr/string.cc
src/core/lib/gpr/string_posix.cc
src/core/lib/gpr/string_util_windows.cc
src/core/lib/gpr/string_windows.cc
src/core/lib/gpr/sync.cc
src/core/lib/gpr/sync_abseil.cc
src/core/lib/gpr/sync_posix.cc
src/core/lib/gpr/sync_windows.cc
src/core/lib/gpr/time.cc
src/core/lib/gpr/time_posix.cc
src/core/lib/gpr/time_precise.cc
src/core/lib/gpr/time_windows.cc
src/core/lib/gpr/tmpfile_msys.cc
src/core/lib/gpr/tmpfile_posix.cc
src/core/lib/gpr/tmpfile_windows.cc
src/core/lib/gpr/wrap_memcpy.cc
src/core/lib/gprpp/examine_stack.cc
src/core/lib/gprpp/fork.cc
src/core/lib/gprpp/global_config_env.cc
src/core/lib/gprpp/host_port.cc
src/core/lib/gprpp/mpscq.cc
src/core/lib/gprpp/stat_posix.cc
src/core/lib/gprpp/stat_windows.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/thd_posix.cc
src/core/lib/gprpp/thd_windows.cc
src/core/lib/gprpp/time_util.cc
src/core/lib/profiling/basic_timers.cc
src/core/lib/profiling/stap_timers.cc
src/core/lib/debug/trace.cc
src/core/lib/event_engine/memory_allocator.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/error.cc
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/iomgr_internal.cc
src/core/lib/promise/activity.cc
src/core/lib/resource_quota/arena.cc
src/core/lib/resource_quota/memory_quota.cc
src/core/lib/resource_quota/resource_quota.cc
src/core/lib/resource_quota/thread_quota.cc
src/core/lib/resource_quota/trace.cc
src/core/lib/slice/slice.cc
src/core/lib/slice/slice_refcount.cc
src/core/lib/slice/slice_string_helpers.cc
src/core/lib/slice/static_slice.cc
test/core/promise/for_each_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
@ -10752,21 +10722,10 @@ target_include_directories(for_each_test
target_link_libraries(for_each_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::base
absl::core_headers
absl::flat_hash_set
absl::memory
absl::random_random
absl::status
absl::statusor
absl::cord
absl::str_format
absl::strings
absl::synchronization
absl::time
absl::optional
absl::variant
upb
gpr
)
@ -13349,53 +13308,23 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(pipe_test
src/core/ext/upb-generated/google/protobuf/any.upb.c
src/core/ext/upb-generated/google/rpc/status.upb.c
src/core/lib/gpr/alloc.cc
src/core/lib/gpr/atm.cc
src/core/lib/gpr/cpu_iphone.cc
src/core/lib/gpr/cpu_linux.cc
src/core/lib/gpr/cpu_posix.cc
src/core/lib/gpr/cpu_windows.cc
src/core/lib/gpr/env_linux.cc
src/core/lib/gpr/env_posix.cc
src/core/lib/gpr/env_windows.cc
src/core/lib/gpr/log.cc
src/core/lib/gpr/log_android.cc
src/core/lib/gpr/log_linux.cc
src/core/lib/gpr/log_posix.cc
src/core/lib/gpr/log_windows.cc
src/core/lib/gpr/murmur_hash.cc
src/core/lib/gpr/string.cc
src/core/lib/gpr/string_posix.cc
src/core/lib/gpr/string_util_windows.cc
src/core/lib/gpr/string_windows.cc
src/core/lib/gpr/sync.cc
src/core/lib/gpr/sync_abseil.cc
src/core/lib/gpr/sync_posix.cc
src/core/lib/gpr/sync_windows.cc
src/core/lib/gpr/time.cc
src/core/lib/gpr/time_posix.cc
src/core/lib/gpr/time_precise.cc
src/core/lib/gpr/time_windows.cc
src/core/lib/gpr/tmpfile_msys.cc
src/core/lib/gpr/tmpfile_posix.cc
src/core/lib/gpr/tmpfile_windows.cc
src/core/lib/gpr/wrap_memcpy.cc
src/core/lib/gprpp/examine_stack.cc
src/core/lib/gprpp/fork.cc
src/core/lib/gprpp/global_config_env.cc
src/core/lib/gprpp/host_port.cc
src/core/lib/gprpp/mpscq.cc
src/core/lib/gprpp/stat_posix.cc
src/core/lib/gprpp/stat_windows.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/thd_posix.cc
src/core/lib/gprpp/thd_windows.cc
src/core/lib/gprpp/time_util.cc
src/core/lib/profiling/basic_timers.cc
src/core/lib/profiling/stap_timers.cc
src/core/lib/debug/trace.cc
src/core/lib/event_engine/memory_allocator.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/error.cc
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/iomgr_internal.cc
src/core/lib/promise/activity.cc
src/core/lib/resource_quota/arena.cc
src/core/lib/resource_quota/memory_quota.cc
src/core/lib/resource_quota/resource_quota.cc
src/core/lib/resource_quota/thread_quota.cc
src/core/lib/resource_quota/trace.cc
src/core/lib/slice/slice.cc
src/core/lib/slice/slice_refcount.cc
src/core/lib/slice/slice_string_helpers.cc
src/core/lib/slice/static_slice.cc
test/core/promise/pipe_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
@ -13423,20 +13352,9 @@ target_include_directories(pipe_test
target_link_libraries(pipe_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::base
absl::core_headers
absl::memory
absl::random_random
absl::status
absl::statusor
absl::cord
absl::str_format
absl::strings
absl::synchronization
absl::time
absl::optional
absl::variant
upb
gpr
)

@ -5839,38 +5839,21 @@ targets:
build: test
language: c++
headers:
- src/core/ext/upb-generated/google/protobuf/any.upb.h
- src/core/ext/upb-generated/google/rpc/status.upb.h
- src/core/lib/gpr/alloc.h
- src/core/lib/gpr/env.h
- src/core/lib/gpr/murmur_hash.h
- src/core/lib/gpr/spinlock.h
- src/core/lib/gpr/string.h
- src/core/lib/gpr/string_windows.h
- src/core/lib/gpr/time_precise.h
- src/core/lib/gpr/tls.h
- src/core/lib/gpr/tmpfile.h
- src/core/lib/gpr/useful.h
- src/core/lib/debug/trace.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/construct_destruct.h
- src/core/lib/gprpp/debug_location.h
- src/core/lib/gprpp/examine_stack.h
- src/core/lib/gprpp/fork.h
- src/core/lib/gprpp/global_config.h
- src/core/lib/gprpp/global_config_custom.h
- src/core/lib/gprpp/global_config_env.h
- src/core/lib/gprpp/global_config_generic.h
- src/core/lib/gprpp/host_port.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/memory.h
- src/core/lib/gprpp/mpscq.h
- src/core/lib/gprpp/stat.h
- src/core/lib/gprpp/status_helper.h
- src/core/lib/gprpp/sync.h
- src/core/lib/gprpp/thd.h
- src/core/lib/gprpp/time_util.h
- src/core/lib/profiling/timers.h
- src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/dual_ref_counted.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h
- src/core/lib/iomgr/closure.h
- src/core/lib/iomgr/combiner.h
- src/core/lib/iomgr/error.h
- src/core/lib/iomgr/error_internal.h
- src/core/lib/iomgr/exec_ctx.h
- src/core/lib/iomgr/executor.h
- src/core/lib/iomgr/iomgr_internal.h
- src/core/lib/promise/activity.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_join.h
@ -5879,81 +5862,55 @@ targets:
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/detail/switch.h
- src/core/lib/promise/exec_ctx_wakeup_scheduler.h
- src/core/lib/promise/for_each.h
- src/core/lib/promise/intra_activity_waiter.h
- src/core/lib/promise/join.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
- src/core/lib/promise/observable.h
- src/core/lib/promise/pipe.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
- src/core/lib/promise/wait_set.h
- src/core/lib/resource_quota/arena.h
- src/core/lib/resource_quota/memory_quota.h
- src/core/lib/resource_quota/resource_quota.h
- src/core/lib/resource_quota/thread_quota.h
- src/core/lib/resource_quota/trace.h
- src/core/lib/slice/slice.h
- src/core/lib/slice/slice_internal.h
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_refcount_base.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/lib/slice/slice_utils.h
- src/core/lib/slice/static_slice.h
- test/core/promise/test_wakeup_schedulers.h
src:
- src/core/ext/upb-generated/google/protobuf/any.upb.c
- src/core/ext/upb-generated/google/rpc/status.upb.c
- src/core/lib/gpr/alloc.cc
- src/core/lib/gpr/atm.cc
- src/core/lib/gpr/cpu_iphone.cc
- src/core/lib/gpr/cpu_linux.cc
- src/core/lib/gpr/cpu_posix.cc
- src/core/lib/gpr/cpu_windows.cc
- src/core/lib/gpr/env_linux.cc
- src/core/lib/gpr/env_posix.cc
- src/core/lib/gpr/env_windows.cc
- src/core/lib/gpr/log.cc
- src/core/lib/gpr/log_android.cc
- src/core/lib/gpr/log_linux.cc
- src/core/lib/gpr/log_posix.cc
- src/core/lib/gpr/log_windows.cc
- src/core/lib/gpr/murmur_hash.cc
- src/core/lib/gpr/string.cc
- src/core/lib/gpr/string_posix.cc
- src/core/lib/gpr/string_util_windows.cc
- src/core/lib/gpr/string_windows.cc
- src/core/lib/gpr/sync.cc
- src/core/lib/gpr/sync_abseil.cc
- src/core/lib/gpr/sync_posix.cc
- src/core/lib/gpr/sync_windows.cc
- src/core/lib/gpr/time.cc
- src/core/lib/gpr/time_posix.cc
- src/core/lib/gpr/time_precise.cc
- src/core/lib/gpr/time_windows.cc
- src/core/lib/gpr/tmpfile_msys.cc
- src/core/lib/gpr/tmpfile_posix.cc
- src/core/lib/gpr/tmpfile_windows.cc
- src/core/lib/gpr/wrap_memcpy.cc
- src/core/lib/gprpp/examine_stack.cc
- src/core/lib/gprpp/fork.cc
- src/core/lib/gprpp/global_config_env.cc
- src/core/lib/gprpp/host_port.cc
- src/core/lib/gprpp/mpscq.cc
- src/core/lib/gprpp/stat_posix.cc
- src/core/lib/gprpp/stat_windows.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/thd_posix.cc
- src/core/lib/gprpp/thd_windows.cc
- src/core/lib/gprpp/time_util.cc
- src/core/lib/profiling/basic_timers.cc
- src/core/lib/profiling/stap_timers.cc
- src/core/lib/debug/trace.cc
- src/core/lib/event_engine/memory_allocator.cc
- src/core/lib/iomgr/combiner.cc
- src/core/lib/iomgr/error.cc
- src/core/lib/iomgr/exec_ctx.cc
- src/core/lib/iomgr/executor.cc
- src/core/lib/iomgr/iomgr_internal.cc
- src/core/lib/promise/activity.cc
- src/core/lib/resource_quota/arena.cc
- src/core/lib/resource_quota/memory_quota.cc
- src/core/lib/resource_quota/resource_quota.cc
- src/core/lib/resource_quota/thread_quota.cc
- src/core/lib/resource_quota/trace.cc
- src/core/lib/slice/slice.cc
- src/core/lib/slice/slice_refcount.cc
- src/core/lib/slice/slice_string_helpers.cc
- src/core/lib/slice/static_slice.cc
- test/core/promise/for_each_test.cc
deps:
- absl/base:base
- absl/base:core_headers
- absl/container:flat_hash_set
- absl/memory:memory
- absl/random:random
- absl/status:status
- absl/status:statusor
- absl/strings:cord
- absl/strings:str_format
- absl/strings:strings
- absl/synchronization:synchronization
- absl/time:time
- absl/types:optional
- absl/types:variant
- upb
- gpr
uses_polling: false
- name: generic_end2end_test
gtest: true
@ -6969,38 +6926,21 @@ targets:
build: test
language: c++
headers:
- src/core/ext/upb-generated/google/protobuf/any.upb.h
- src/core/ext/upb-generated/google/rpc/status.upb.h
- src/core/lib/gpr/alloc.h
- src/core/lib/gpr/env.h
- src/core/lib/gpr/murmur_hash.h
- src/core/lib/gpr/spinlock.h
- src/core/lib/gpr/string.h
- src/core/lib/gpr/string_windows.h
- src/core/lib/gpr/time_precise.h
- src/core/lib/gpr/tls.h
- src/core/lib/gpr/tmpfile.h
- src/core/lib/gpr/useful.h
- src/core/lib/debug/trace.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/construct_destruct.h
- src/core/lib/gprpp/debug_location.h
- src/core/lib/gprpp/examine_stack.h
- src/core/lib/gprpp/fork.h
- src/core/lib/gprpp/global_config.h
- src/core/lib/gprpp/global_config_custom.h
- src/core/lib/gprpp/global_config_env.h
- src/core/lib/gprpp/global_config_generic.h
- src/core/lib/gprpp/host_port.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/memory.h
- src/core/lib/gprpp/mpscq.h
- src/core/lib/gprpp/stat.h
- src/core/lib/gprpp/status_helper.h
- src/core/lib/gprpp/sync.h
- src/core/lib/gprpp/thd.h
- src/core/lib/gprpp/time_util.h
- src/core/lib/profiling/timers.h
- src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/dual_ref_counted.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h
- src/core/lib/iomgr/closure.h
- src/core/lib/iomgr/combiner.h
- src/core/lib/iomgr/error.h
- src/core/lib/iomgr/error_internal.h
- src/core/lib/iomgr/exec_ctx.h
- src/core/lib/iomgr/executor.h
- src/core/lib/iomgr/iomgr_internal.h
- src/core/lib/promise/activity.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_join.h
@ -7009,77 +6949,52 @@ targets:
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/detail/switch.h
- src/core/lib/promise/exec_ctx_wakeup_scheduler.h
- src/core/lib/promise/intra_activity_waiter.h
- src/core/lib/promise/join.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
- src/core/lib/promise/pipe.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/promise.h
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
- src/core/lib/resource_quota/arena.h
- src/core/lib/resource_quota/memory_quota.h
- src/core/lib/resource_quota/resource_quota.h
- src/core/lib/resource_quota/thread_quota.h
- src/core/lib/resource_quota/trace.h
- src/core/lib/slice/slice.h
- src/core/lib/slice/slice_internal.h
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_refcount_base.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/lib/slice/slice_utils.h
- src/core/lib/slice/static_slice.h
- test/core/promise/test_wakeup_schedulers.h
src:
- src/core/ext/upb-generated/google/protobuf/any.upb.c
- src/core/ext/upb-generated/google/rpc/status.upb.c
- src/core/lib/gpr/alloc.cc
- src/core/lib/gpr/atm.cc
- src/core/lib/gpr/cpu_iphone.cc
- src/core/lib/gpr/cpu_linux.cc
- src/core/lib/gpr/cpu_posix.cc
- src/core/lib/gpr/cpu_windows.cc
- src/core/lib/gpr/env_linux.cc
- src/core/lib/gpr/env_posix.cc
- src/core/lib/gpr/env_windows.cc
- src/core/lib/gpr/log.cc
- src/core/lib/gpr/log_android.cc
- src/core/lib/gpr/log_linux.cc
- src/core/lib/gpr/log_posix.cc
- src/core/lib/gpr/log_windows.cc
- src/core/lib/gpr/murmur_hash.cc
- src/core/lib/gpr/string.cc
- src/core/lib/gpr/string_posix.cc
- src/core/lib/gpr/string_util_windows.cc
- src/core/lib/gpr/string_windows.cc
- src/core/lib/gpr/sync.cc
- src/core/lib/gpr/sync_abseil.cc
- src/core/lib/gpr/sync_posix.cc
- src/core/lib/gpr/sync_windows.cc
- src/core/lib/gpr/time.cc
- src/core/lib/gpr/time_posix.cc
- src/core/lib/gpr/time_precise.cc
- src/core/lib/gpr/time_windows.cc
- src/core/lib/gpr/tmpfile_msys.cc
- src/core/lib/gpr/tmpfile_posix.cc
- src/core/lib/gpr/tmpfile_windows.cc
- src/core/lib/gpr/wrap_memcpy.cc
- src/core/lib/gprpp/examine_stack.cc
- src/core/lib/gprpp/fork.cc
- src/core/lib/gprpp/global_config_env.cc
- src/core/lib/gprpp/host_port.cc
- src/core/lib/gprpp/mpscq.cc
- src/core/lib/gprpp/stat_posix.cc
- src/core/lib/gprpp/stat_windows.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/thd_posix.cc
- src/core/lib/gprpp/thd_windows.cc
- src/core/lib/gprpp/time_util.cc
- src/core/lib/profiling/basic_timers.cc
- src/core/lib/profiling/stap_timers.cc
- src/core/lib/debug/trace.cc
- src/core/lib/event_engine/memory_allocator.cc
- src/core/lib/iomgr/combiner.cc
- src/core/lib/iomgr/error.cc
- src/core/lib/iomgr/exec_ctx.cc
- src/core/lib/iomgr/executor.cc
- src/core/lib/iomgr/iomgr_internal.cc
- src/core/lib/promise/activity.cc
- src/core/lib/resource_quota/arena.cc
- src/core/lib/resource_quota/memory_quota.cc
- src/core/lib/resource_quota/resource_quota.cc
- src/core/lib/resource_quota/thread_quota.cc
- src/core/lib/resource_quota/trace.cc
- src/core/lib/slice/slice.cc
- src/core/lib/slice/slice_refcount.cc
- src/core/lib/slice/slice_string_helpers.cc
- src/core/lib/slice/static_slice.cc
- test/core/promise/pipe_test.cc
deps:
- absl/base:base
- absl/base:core_headers
- absl/memory:memory
- absl/random:random
- absl/status:status
- absl/status:statusor
- absl/strings:cord
- absl/strings:str_format
- absl/strings:strings
- absl/synchronization:synchronization
- absl/time:time
- absl/types:optional
- absl/types:variant
- upb
- gpr
uses_polling: false
- name: poll_test
gtest: true

@ -24,6 +24,7 @@
#include <atomic>
#include <functional>
#include <memory>
#include <type_traits>
#include <utility>
#include "absl/base/thread_annotations.h"
@ -267,6 +268,8 @@ namespace promise_detail {
template <typename Context>
class ContextHolder {
public:
using ContextType = Context;
explicit ContextHolder(Context value) : value_(std::move(value)) {}
Context* GetContext() { return &value_; }
@ -277,6 +280,8 @@ class ContextHolder {
template <typename Context>
class ContextHolder<Context*> {
public:
using ContextType = Context;
explicit ContextHolder(Context* value) : value_(value) {}
Context* GetContext() { return value_; }
@ -284,11 +289,35 @@ class ContextHolder<Context*> {
Context* value_;
};
template <typename Context, typename Deleter>
class ContextHolder<std::unique_ptr<Context, Deleter>> {
public:
using ContextType = Context;
explicit ContextHolder(std::unique_ptr<Context, Deleter> value)
: value_(std::move(value)) {}
Context* GetContext() { return value_.get(); }
private:
std::unique_ptr<Context, Deleter> value_;
};
template <typename HeldContext>
using ContextTypeFromHeld = typename ContextHolder<HeldContext>::ContextType;
template <typename... Contexts>
class EnterContexts : public promise_detail::Context<Contexts>... {
class ActivityContexts : public ContextHolder<Contexts>... {
public:
explicit EnterContexts(Contexts*... contexts)
: promise_detail::Context<Contexts>(contexts)... {}
explicit ActivityContexts(Contexts&&... contexts)
: ContextHolder<Contexts>(std::forward<Contexts>(contexts))... {}
class ScopedContext : public Context<ContextTypeFromHeld<Contexts>>... {
public:
explicit ScopedContext(ActivityContexts* contexts)
: Context<ContextTypeFromHeld<Contexts>>(
static_cast<ContextHolder<Contexts>*>(contexts)
->GetContext())... {}
};
};
// Implementation details for an Activity of an arbitrary type of promise.
@ -303,15 +332,14 @@ class EnterContexts : public promise_detail::Context<Contexts>... {
// invoked, and that a given activity will not be concurrently scheduled again
// until its RunScheduledWakeup() has been invoked.
template <class F, class WakeupScheduler, class OnDone, typename... Contexts>
class PromiseActivity final
: public Activity,
private promise_detail::ContextHolder<Contexts>... {
class PromiseActivity final : public Activity,
private ActivityContexts<Contexts...> {
public:
using Factory = PromiseFactory<void, F>;
PromiseActivity(F promise_factory, WakeupScheduler wakeup_scheduler,
OnDone on_done, Contexts... contexts)
OnDone on_done, Contexts&&... contexts)
: Activity(),
ContextHolder<Contexts>(std::move(contexts))...,
ActivityContexts<Contexts...>(std::forward<Contexts>(contexts)...),
wakeup_scheduler_(std::move(wakeup_scheduler)),
on_done_(std::move(on_done)) {
// Lock, construct an initial promise from the factory, and step it.
@ -361,6 +389,8 @@ class PromiseActivity final
}
private:
using typename ActivityContexts<Contexts...>::ScopedContext;
// Wakeup this activity. Arrange to poll the activity again at a convenient
// time: this could be inline if it's deemed safe, or it could be by passing
// the activity to an external threadpool to run. If the activity is already
@ -412,28 +442,27 @@ class PromiseActivity final
}
// The main body of a step: set the current activity, and any contexts, and
// then run the main polling loop. Contained in a function by itself in order
// to keep the scoping rules a little easier in Step().
// then run the main polling loop. Contained in a function by itself in
// order to keep the scoping rules a little easier in Step().
absl::optional<absl::Status> RunStep() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
ScopedActivity scoped_activity(this);
EnterContexts<Contexts...> contexts(
static_cast<ContextHolder<Contexts>*>(this)->GetContext()...);
ScopedContext contexts(this);
return StepLoop();
}
// Similarly to RunStep, but additionally construct the promise from a promise
// factory before entering the main loop. Called once from the constructor.
// Similarly to RunStep, but additionally construct the promise from a
// promise factory before entering the main loop. Called once from the
// constructor.
absl::optional<absl::Status> Start(Factory promise_factory)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
ScopedActivity scoped_activity(this);
EnterContexts<Contexts...> contexts(
static_cast<ContextHolder<Contexts>*>(this)->GetContext()...);
ScopedContext contexts(this);
Construct(&promise_holder_.promise, promise_factory.Once());
return StepLoop();
}
// Until there are no wakeups from within and the promise is incomplete: poll
// the promise.
// Until there are no wakeups from within and the promise is incomplete:
// poll the promise.
absl::optional<absl::Status> StepLoop() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
GPR_ASSERT(is_current());
while (true) {
@ -486,12 +515,12 @@ template <typename Factory, typename WakeupScheduler, typename OnDone,
typename... Contexts>
ActivityPtr MakeActivity(Factory promise_factory,
WakeupScheduler wakeup_scheduler, OnDone on_done,
Contexts... contexts) {
Contexts&&... contexts) {
return ActivityPtr(
new promise_detail::PromiseActivity<Factory, WakeupScheduler, OnDone,
Contexts...>(
std::move(promise_factory), std::move(wakeup_scheduler),
std::move(on_done), std::move(contexts)...));
std::move(on_done), std::forward<Contexts>(contexts)...));
}
} // namespace grpc_core

@ -31,10 +31,13 @@
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include <grpc/support/log.h>
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/detail/promise_factory.h"
#include "src/core/lib/promise/intra_activity_waiter.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/resource_quota/arena.h"
namespace grpc_core {
@ -52,38 +55,113 @@ class Push;
template <typename T>
class Next;
template <class T>
class Promise {
// Center sits between a sender and a receiver to provide a one-deep buffer of
// Ts
template <typename T>
class Center {
public:
virtual Poll<bool> Step(T* output) = 0;
virtual void Stop() = 0;
// Initialize with one send ref (held by PipeSender) and one recv ref (held by
// PipeReceiver)
Center() {
send_refs_ = 1;
recv_refs_ = 1;
has_value_ = false;
}
protected:
inline virtual ~Promise() = default;
};
// Add one ref to the send side of this object, and return this.
Center* RefSend() {
send_refs_++;
return this;
}
struct alignas(alignof(void*)) Scratch {
uint8_t scratch[32];
};
// Add one ref to the recv side of this object, and return this.
Center* RefRecv() {
recv_refs_++;
return this;
}
template <typename T>
class FilterInterface {
public:
FilterInterface() = default;
FilterInterface(const FilterInterface&) = delete;
FilterInterface& operator=(const FilterInterface&) = delete;
virtual Promise<T>* Step(T* p, Scratch* scratch_space) = 0;
virtual void UpdateReceiver(PipeReceiver<T>* receiver) = 0;
protected:
inline virtual ~FilterInterface() {}
static void SetReceiverIndex(PipeReceiver<T>* receiver, int idx,
FilterInterface* p);
char AllocIndex(PipeReceiver<T>* receiver);
};
// Drop a send side ref
// If no send refs remain, wake due to send closure
// If no refs remain, destroy this object
void UnrefSend() {
GPR_DEBUG_ASSERT(send_refs_ > 0);
send_refs_--;
if (0 == send_refs_) {
on_full_.Wake();
on_empty_.Wake();
if (0 == recv_refs_) {
this->~Center();
}
}
}
template <typename T, typename F>
class Filter;
// Drop a recv side ref
// If no recv refs remain, wake due to recv closure
// If no refs remain, destroy this object
void UnrefRecv() {
GPR_DEBUG_ASSERT(recv_refs_ > 0);
recv_refs_--;
if (0 == recv_refs_) {
on_full_.Wake();
on_empty_.Wake();
if (0 == send_refs_) {
this->~Center();
} else if (has_value_) {
ResetValue();
}
}
}
// Try to push *value into the pipe.
// Return Pending if there is no space.
// Return true if the value was pushed.
// Return false if the recv end is closed.
Poll<bool> Push(T* value) {
GPR_DEBUG_ASSERT(send_refs_ != 0);
if (recv_refs_ == 0) return false;
if (has_value_) return on_empty_.pending();
has_value_ = true;
value_ = std::move(*value);
on_full_.Wake();
return true;
}
// Try to receive a value from the pipe.
// Return Pending if there is no value.
// Return the value if one was retrieved.
// Return nullopt if the send end is closed and no value had been pushed.
Poll<absl::optional<T>> Next() {
GPR_DEBUG_ASSERT(recv_refs_ != 0);
if (!has_value_) {
if (send_refs_ == 0) return absl::nullopt;
return on_full_.pending();
}
has_value_ = false;
on_empty_.Wake();
return std::move(value_);
}
private:
void ResetValue() {
// Fancy dance to move out of value in the off chance that we reclaim some
// memory earlier.
[](T) {}(std::move(value_));
has_value_ = false;
}
T value_;
// Number of sending objects.
// 0 => send is closed.
// 1 ref each for PipeSender and Push.
uint8_t send_refs_ : 2;
// Number of receiving objects.
// 0 => recv is closed.
// 1 ref each for PipeReceiver and Next.
uint8_t recv_refs_ : 2;
// True iff there is a value in the pipe.
bool has_value_ : 1;
IntraActivityWaiter on_empty_;
IntraActivityWaiter on_full_;
};
} // namespace pipe_detail
@ -94,43 +172,18 @@ class PipeSender {
PipeSender(const PipeSender&) = delete;
PipeSender& operator=(const PipeSender&) = delete;
PipeSender(PipeSender&& other) noexcept
: receiver_(other.receiver_), push_(other.push_) {
if (receiver_ != nullptr) {
receiver_->sender_ = this;
other.receiver_ = nullptr;
}
if (push_ != nullptr) {
push_->sender_ = this;
other.push_ = nullptr;
}
PipeSender(PipeSender&& other) noexcept : center_(other.center_) {
other.center_ = nullptr;
}
PipeSender& operator=(PipeSender&& other) noexcept {
if (receiver_ != nullptr) {
receiver_->sender_ = nullptr;
}
if (push_ != nullptr) {
push_->sender_ = nullptr;
}
receiver_ = other.receiver_;
if (receiver_ != nullptr) {
receiver_->sender_ = this;
other.receiver_ = nullptr;
}
if (push_ != nullptr) {
push_->sender_ = this;
other.push_ = nullptr;
}
if (center_ != nullptr) center_->UnrefSend();
center_ = other.center_;
other.center_ = nullptr;
return *this;
}
~PipeSender() {
if (receiver_ != nullptr) {
receiver_->MarkClosed();
}
if (push_ != nullptr) {
push_->sender_ = nullptr;
}
if (center_ != nullptr) center_->UnrefSend();
}
// Send a single message along the pipe.
@ -139,21 +192,10 @@ class PipeSender {
// receiver is either closed or able to receive another message.
pipe_detail::Push<T> Push(T value);
// Attach a promise factory based filter to this pipe.
// The overall promise returned from this will be active until the pipe is
// closed. If this promise is cancelled before the pipe is closed, the pipe
// will close. The filter will be run _after_ any other registered filters.
template <typename F>
pipe_detail::Filter<T, F> Filter(F f);
private:
friend struct Pipe<T>;
friend class PipeReceiver<T>;
friend class pipe_detail::Next<T>;
friend class pipe_detail::Push<T>;
explicit PipeSender(PipeReceiver<T>* receiver) : receiver_(receiver) {}
PipeReceiver<T>* receiver_;
pipe_detail::Push<T>* push_ = nullptr;
explicit PipeSender(pipe_detail::Center<T>* center) : center_(center) {}
pipe_detail::Center<T>* center_;
};
// Receive end of a Pipe.
@ -163,56 +205,17 @@ class PipeReceiver {
PipeReceiver(const PipeReceiver&) = delete;
PipeReceiver& operator=(const PipeReceiver&) = delete;
PipeReceiver(PipeReceiver&& other) noexcept
: sender_(other.sender_),
next_(other.next_),
filters_(std::move(other.filters_)),
pending_(std::move(other.pending_)),
waiting_to_send_(std::move(other.waiting_to_send_)),
waiting_to_receive_(other.waiting_to_receive_) {
if (sender_ != nullptr) {
sender_->receiver_ = this;
other.sender_ = nullptr;
}
if (next_ != nullptr) {
next_->receiver_ = this;
other.next_ = nullptr;
}
for (auto filter : filters_) {
filter->UpdateReceiver(this);
}
PipeReceiver(PipeReceiver&& other) noexcept : center_(other.center_) {
other.center_ = nullptr;
}
PipeReceiver& operator=(PipeReceiver&& other) noexcept {
if (sender_ != nullptr) {
sender_->receiver_ = nullptr;
}
if (next_ != nullptr) {
next_->receiver_ = nullptr;
}
sender_ = other.sender_;
next_ = other.next_;
filters_ = std::move(other.filters_);
for (auto filter : filters_) {
filter->UpdateReceiver(this);
}
pending_ = std::move(other.pending_);
waiting_to_send_ = std::move(other.waiting_to_send_);
waiting_to_receive_ = std::move(other.waiting_to_receive_);
if (sender_ != nullptr) {
sender_->receiver_ = this;
other.sender_ = nullptr;
}
if (next_ != nullptr) {
next_->receiver_ = this;
other.next_ = nullptr;
}
if (center_ != nullptr) center_->UnrefRecv();
center_ = other.center_;
other.center_ = nullptr;
return *this;
}
~PipeReceiver() {
MarkClosed();
if (next_ != nullptr) {
next_->receiver_ = nullptr;
}
if (center_ != nullptr) center_->UnrefRecv();
}
// Receive a single message from the pipe.
@ -222,44 +225,10 @@ class PipeReceiver {
// available.
pipe_detail::Next<T> Next();
// Attach a promise factory based filter to this pipe.
// The overall promise returned from this will be active until the pipe is
// closed. If this promise is cancelled before the pipe is closed, the pipe
// will close. The filter will be run _after_ any other registered filters.
template <typename F>
pipe_detail::Filter<T, F> Filter(F f);
private:
friend struct Pipe<T>;
friend class PipeSender<T>;
friend class pipe_detail::Next<T>;
friend class pipe_detail::Push<T>;
friend class pipe_detail::FilterInterface<T>;
explicit PipeReceiver(PipeSender<T>* sender) : sender_(sender) {}
PipeSender<T>* sender_;
pipe_detail::Next<T>* next_ = nullptr;
absl::InlinedVector<pipe_detail::FilterInterface<T>*, 12> filters_;
absl::optional<T> pending_;
IntraActivityWaiter waiting_to_send_;
IntraActivityWaiter waiting_to_receive_;
void MarkClosed() {
if (sender_ == nullptr) {
return;
}
sender_->receiver_ = nullptr;
waiting_to_receive_.Wake();
waiting_to_send_.Wake();
sender_ = nullptr;
for (auto* filter : filters_) {
if (filter != nullptr) {
filter->UpdateReceiver(nullptr);
}
}
}
explicit PipeReceiver(pipe_detail::Center<T>* center) : center_(center) {}
pipe_detail::Center<T>* center_;
};
namespace pipe_detail {
@ -271,55 +240,28 @@ class Push {
Push(const Push&) = delete;
Push& operator=(const Push&) = delete;
Push(Push&& other) noexcept
: sender_(other.sender_), push_(std::move(other.push_)) {
if (sender_ != nullptr) {
sender_->push_ = this;
other.sender_ = nullptr;
}
: center_(other.center_), push_(std::move(other.push_)) {
other.center_ = nullptr;
}
Push& operator=(Push&& other) noexcept {
if (sender_ != nullptr) {
sender_->push_ = nullptr;
}
sender_ = other.sender_;
if (center_ != nullptr) center_->UnrefSend();
center_ = other.center_;
other.center_ = nullptr;
push_ = std::move(other.push_);
if (sender_ != nullptr) {
sender_->push_ = this;
other.sender_ = nullptr;
}
return *this;
}
~Push() {
if (sender_ != nullptr) {
assert(sender_->push_ == this);
sender_->push_ = nullptr;
}
if (center_ != nullptr) center_->UnrefSend();
}
Poll<bool> operator()() {
auto* receiver = sender_->receiver_;
if (receiver == nullptr) {
return false;
}
if (receiver->pending_.has_value()) {
return receiver->waiting_to_send_.pending();
}
receiver->pending_ = std::move(push_);
receiver->waiting_to_receive_.Wake();
sender_->push_ = nullptr;
sender_ = nullptr;
return true;
}
Poll<bool> operator()() { return center_->Push(&push_); }
private:
friend class PipeSender<T>;
Push(PipeSender<T>* sender, T push)
: sender_(sender), push_(std::move(push)) {
assert(sender_->push_ == nullptr);
sender_->push_ = this;
}
PipeSender<T>* sender_;
explicit Push(pipe_detail::Center<T>* center, T push)
: center_(center), push_(std::move(push)) {}
Center<T>* center_;
T push_;
};
@ -329,262 +271,56 @@ class Next {
public:
Next(const Next&) = delete;
Next& operator=(const Next&) = delete;
Next(Next&& other) noexcept
: receiver_(other.receiver_),
next_filter_(other.next_filter_),
current_promise_(nullptr) {
assert(other.current_promise_ == nullptr);
if (receiver_ != nullptr) {
receiver_->next_ = this;
other.receiver_ = nullptr;
}
Next(Next&& other) noexcept : center_(other.center_) {
other.center_ = nullptr;
}
Next& operator=(Next&& other) noexcept {
assert(current_promise_ == nullptr);
assert(other.current_promise_ == nullptr);
if (receiver_ != nullptr) {
receiver_->next_ = nullptr;
}
receiver_ = other.receiver_;
next_filter_ = other.next_filter_;
if (receiver_ != nullptr) {
receiver_->next_ = this;
other.receiver_ = nullptr;
}
if (center_ != nullptr) center_->UnrefRecv();
center_ = other.center_;
other.center_ = nullptr;
return *this;
}
~Next() {
if (receiver_ != nullptr) {
assert(receiver_->next_ == this);
receiver_->next_ = nullptr;
}
if (current_promise_ != nullptr) {
current_promise_->Stop();
}
if (center_ != nullptr) center_->UnrefRecv();
}
Poll<absl::optional<T>> operator()() {
if (receiver_->pending_.has_value()) {
auto* pending = &*receiver_->pending_;
if (current_promise_ != nullptr) {
auto r = current_promise_->Step(pending);
if (auto* p = absl::get_if<kPollReadyIdx>(&r)) {
current_promise_->Stop();
current_promise_ = nullptr;
if (!*p) {
receiver_->MarkClosed();
return absl::optional<T>();
}
} else {
return Pending();
}
}
while (true) {
if (next_filter_ >= receiver_->filters_.size()) {
auto result = absl::optional<T>(std::move(*pending));
receiver_->pending_.reset();
receiver_->waiting_to_send_.Wake();
receiver_->next_ = nullptr;
receiver_ = nullptr;
return result;
}
auto* filter = receiver_->filters_[next_filter_];
current_promise_ = filter ? filter->Step(pending, &scratch_) : nullptr;
next_filter_++;
if (current_promise_ ==
reinterpret_cast<Promise<T>*>(uintptr_t(false))) {
current_promise_ = nullptr;
receiver_->MarkClosed();
return absl::optional<T>();
} else if (current_promise_ ==
reinterpret_cast<Promise<T>*>(uintptr_t(true))) {
current_promise_ = nullptr;
} else {
return Pending();
}
}
}
if (receiver_->sender_ == nullptr) {
return absl::optional<T>();
}
return receiver_->waiting_to_receive_.pending();
}
Poll<absl::optional<T>> operator()() { return center_->Next(); }
private:
friend class PipeReceiver<T>;
explicit Next(PipeReceiver<T>* receiver) : receiver_(receiver) {
assert(receiver_->next_ == nullptr);
receiver_->next_ = this;
}
PipeReceiver<T>* receiver_;
size_t next_filter_ = 0;
Promise<T>* current_promise_ = nullptr;
Scratch scratch_;
};
template <typename T, typename F>
class Filter final : private FilterInterface<T> {
public:
Filter(PipeReceiver<T>* receiver, F f)
: active_{receiver, promise_detail::PromiseFactory<T, F>(std::move(f))},
index_(this->AllocIndex(receiver)){};
explicit Filter(absl::Status already_finished)
: done_(std::move(already_finished)) {}
~Filter() {
if (index_ != kTombstoneIndex) {
this->SetReceiverIndex(active_.receiver, index_, nullptr);
active_.~Active();
} else {
done_.~Status();
}
}
Filter(Filter&& other) noexcept : index_(other.index_) {
if (index_ != kTombstoneIndex) {
new (&active_) Active(std::move(other.active_));
other.active_.~Active();
new (&other.done_) absl::Status(absl::OkStatus());
other.index_ = kTombstoneIndex;
this->SetReceiverIndex(active_.receiver, index_, this);
} else {
new (&done_) absl::Status(std::move(other.done_));
}
}
Filter(const Filter&) = delete;
Filter& operator=(const Filter&) = delete;
Poll<absl::Status> operator()() {
if (index_ == kTombstoneIndex) {
return std::move(done_);
}
return Pending();
}
private:
static constexpr char kTombstoneIndex = -1;
struct Active {
GPR_NO_UNIQUE_ADDRESS PipeReceiver<T>* receiver;
GPR_NO_UNIQUE_ADDRESS promise_detail::PromiseFactory<T, F> factory;
};
union {
GPR_NO_UNIQUE_ADDRESS Active active_;
GPR_NO_UNIQUE_ADDRESS absl::Status done_;
};
GPR_NO_UNIQUE_ADDRESS char index_;
class PromiseImpl final : public ::grpc_core::pipe_detail::Promise<T> {
using PF = typename promise_detail::PromiseFactory<T, F>::Promise;
public:
PromiseImpl(PF f, Filter* filter) : f_(std::move(f)), filter_(filter) {}
Poll<bool> Step(T* output) final {
auto r = f_();
if (auto* p = absl::get_if<kPollReadyIdx>(&r)) {
if (p->ok()) {
*output = std::move(**p);
return true;
} else {
filter_->SetReceiverIndex(filter_->active_.receiver, filter_->index_,
nullptr);
filter_->active_.~Active();
filter_->index_ = kTombstoneIndex;
new (&filter_->done_) absl::Status(std::move(p->status()));
Activity::WakeupCurrent();
return false;
}
} else {
return Pending();
}
}
void Stop() final { this->~PromiseImpl(); }
private:
PF f_;
Filter* filter_;
};
Promise<T>* Step(T* p, Scratch* scratch) final {
if (index_ != kTombstoneIndex) {
PromiseImpl promise(active_.factory.Repeated(std::move(*p)), this);
auto r = promise.Step(p);
if (auto* result = absl::get_if<kPollReadyIdx>(&r)) {
return reinterpret_cast<Promise<T>*>(uintptr_t(*result));
}
static_assert(sizeof(promise) <= sizeof(Scratch),
"scratch size too small");
static_assert(alignof(decltype(promise)) <= alignof(Scratch),
"bad alignment");
return new (scratch) decltype(promise)(std::move(promise));
} else {
return nullptr;
}
}
void UpdateReceiver(PipeReceiver<T>* receiver) final {
if (index_ != kTombstoneIndex) {
if (receiver == nullptr) {
active_.~Active();
index_ = kTombstoneIndex;
new (&done_) absl::Status(absl::OkStatus());
} else {
active_.receiver = receiver;
}
Activity::WakeupCurrent();
}
}
explicit Next(pipe_detail::Center<T>* center) : center_(center) {}
Center<T>* center_;
};
template <typename T>
void FilterInterface<T>::SetReceiverIndex(PipeReceiver<T>* receiver, int idx,
FilterInterface* p) {
receiver->filters_[idx] = p;
}
template <typename T>
char FilterInterface<T>::AllocIndex(PipeReceiver<T>* receiver) {
auto r = receiver->filters_.size();
receiver->filters_.push_back(this);
return r;
}
} // namespace pipe_detail
template <typename T>
pipe_detail::Push<T> PipeSender<T>::Push(T value) {
return pipe_detail::Push<T>(this, std::move(value));
return pipe_detail::Push<T>(center_->RefSend(), std::move(value));
}
template <typename T>
pipe_detail::Next<T> PipeReceiver<T>::Next() {
return pipe_detail::Next<T>(this);
}
template <typename T>
template <typename F>
pipe_detail::Filter<T, F> PipeSender<T>::Filter(F f) {
if (receiver_) {
return pipe_detail::Filter<T, F>(receiver_, std::move(f));
} else {
return pipe_detail::Filter<T, F>(absl::OkStatus());
}
}
template <typename T>
template <typename F>
pipe_detail::Filter<T, F> PipeReceiver<T>::Filter(F f) {
return pipe_detail::Filter<T, F>(this, std::move(f));
return pipe_detail::Next<T>(center_->RefRecv());
}
// A Pipe is an intra-Activity communications channel that transmits T's from
// one end to the other.
// It is only safe to use a Pipe within the context of a single Activity.
// No synchronization is performed internally.
// The primary Pipe data structure is allocated from an arena, so the activity
// must have an arena as part of its context.
// By performing that allocation we can ensure stable pointer to shared data
// allowing PipeSender/PipeReceiver/Push/Next to be relatively simple in their
// implementation.
// This type has been optimized with the expectation that there are relatively
// few pipes per activity. If this assumption does not hold then a design
// allowing inline filtering of pipe contents (instead of connecting pipes with
// polling code) would likely be more appropriate.
template <typename T>
struct Pipe {
Pipe() : sender(&receiver), receiver(&sender) {}
Pipe() : Pipe(GetContext<Arena>()->New<pipe_detail::Center<T>>()) {}
Pipe(const Pipe&) = delete;
Pipe& operator=(const Pipe&) = delete;
Pipe(Pipe&&) noexcept = default;
@ -592,6 +328,10 @@ struct Pipe {
PipeSender<T> sender;
PipeReceiver<T> receiver;
private:
explicit Pipe(pipe_detail::Center<T>* center)
: sender(center), receiver(center) {}
};
} // namespace grpc_core

@ -260,6 +260,7 @@ grpc_cc_test(
"//:map",
"//:observable",
"//:pipe",
"//:resource_quota",
"//:seq",
"//test/core/util:grpc_suppressions",
],
@ -276,6 +277,7 @@ grpc_cc_test(
"//:join",
"//:pipe",
"//:promise",
"//:resource_quota",
"//:seq",
"//test/core/util:grpc_suppressions",
],

@ -22,6 +22,7 @@
#include "src/core/lib/promise/observable.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "test/core/promise/test_wakeup_schedulers.h"
using testing::Mock;
@ -30,22 +31,25 @@ using testing::StrictMock;
namespace grpc_core {
static auto* g_memory_allocator = new MemoryAllocator(
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator("test"));
TEST(ForEachTest, SendThriceWithPipe) {
Pipe<int> pipe;
int num_received = 0;
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
MakeActivity(
[&pipe, &num_received] {
[&num_received] {
Pipe<int> pipe;
auto sender = std::make_shared<std::unique_ptr<PipeSender<int>>>(
absl::make_unique<PipeSender<int>>(std::move(pipe.sender)));
return Map(
Join(
// Push 3 things into a pipe -- 1, 2, then 3 -- then close.
Seq(
pipe.sender.Push(1),
[&pipe] { return pipe.sender.Push(2); },
[&pipe] { return pipe.sender.Push(3); },
[&pipe] {
auto drop = std::move(pipe.sender);
Seq((*sender)->Push(1), [sender] { return (*sender)->Push(2); },
[sender] { return (*sender)->Push(3); },
[sender] {
sender->reset();
return absl::OkStatus();
}),
// Use a ForEach loop to read them out and verify all values are
@ -59,7 +63,8 @@ TEST(ForEachTest, SendThriceWithPipe) {
JustElem<1>());
},
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
[&on_done](absl::Status status) { on_done.Call(std::move(status)); },
MakeScopedArena(1024, g_memory_allocator));
Mock::VerifyAndClearExpectations(&on_done);
EXPECT_EQ(num_received, 3);
}

@ -22,6 +22,7 @@
#include "src/core/lib/promise/join.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "test/core/promise/test_wakeup_schedulers.h"
using testing::MockFunction;
@ -29,12 +30,15 @@ using testing::StrictMock;
namespace grpc_core {
static auto* g_memory_allocator = new MemoryAllocator(
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator("test"));
TEST(PipeTest, CanSendAndReceive) {
Pipe<int> pipe;
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
MakeActivity(
[&pipe] {
[] {
Pipe<int> pipe;
return Seq(
// Concurrently: send 42 into the pipe, and receive from the pipe.
Join(pipe.sender.Push(42), pipe.receiver.Next()),
@ -46,15 +50,16 @@ TEST(PipeTest, CanSendAndReceive) {
});
},
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
[&on_done](absl::Status status) { on_done.Call(std::move(status)); },
MakeScopedArena(1024, g_memory_allocator));
}
TEST(PipeTest, CanReceiveAndSend) {
Pipe<int> pipe;
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
MakeActivity(
[&pipe] {
[] {
Pipe<int> pipe;
return Seq(
// Concurrently: receive from the pipe, and send 42 into the pipe.
Join(pipe.receiver.Next(), pipe.sender.Push(42)),
@ -66,28 +71,29 @@ TEST(PipeTest, CanReceiveAndSend) {
});
},
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
[&on_done](absl::Status status) { on_done.Call(std::move(status)); },
MakeScopedArena(1024, g_memory_allocator));
}
TEST(PipeTest, CanSeeClosedOnSend) {
Pipe<int> pipe;
StrictMock<MockFunction<void(absl::Status)>> on_done;
auto sender = std::move(pipe.sender);
auto receiver =
absl::make_unique<PipeReceiver<int>>(std::move(pipe.receiver));
EXPECT_CALL(on_done, Call(absl::OkStatus()));
// Push 42 onto the pipe - this will the pipe's one-deep send buffer.
EXPECT_TRUE(NowOrNever(sender.Push(42)).has_value());
MakeActivity(
[&sender, &receiver] {
[] {
Pipe<int> pipe;
auto sender = std::move(pipe.sender);
// Push 42 onto the pipe - this will the pipe's one-deep send buffer.
EXPECT_TRUE(NowOrNever(sender.Push(42)).has_value());
auto receiver = std::make_shared<std::unique_ptr<PipeReceiver<int>>>(
absl::make_unique<PipeReceiver<int>>(std::move(pipe.receiver)));
return Seq(
// Concurrently:
// - push 43 into the sender, which will stall because the buffer is
// full
// - and close the receiver, which will fail the pending send.
Join(sender.Push(43),
[&receiver] {
receiver.reset();
[receiver] {
receiver->reset();
return absl::OkStatus();
}),
// Verify both that the send failed and that we executed the close.
@ -97,17 +103,19 @@ TEST(PipeTest, CanSeeClosedOnSend) {
});
},
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
[&on_done](absl::Status status) { on_done.Call(std::move(status)); },
MakeScopedArena(1024, g_memory_allocator));
}
TEST(PipeTest, CanSeeClosedOnReceive) {
Pipe<int> pipe;
StrictMock<MockFunction<void(absl::Status)>> on_done;
auto sender = absl::make_unique<PipeSender<int>>(std::move(pipe.sender));
auto receiver = std::move(pipe.receiver);
EXPECT_CALL(on_done, Call(absl::OkStatus()));
MakeActivity(
[&sender, &receiver] {
[] {
Pipe<int> pipe;
auto sender = std::make_shared<std::unique_ptr<PipeSender<int>>>(
absl::make_unique<PipeSender<int>>(std::move(pipe.sender)));
auto receiver = std::move(pipe.receiver);
return Seq(
// Concurrently:
// - wait for a received value (will stall forever since we push
@ -115,8 +123,8 @@ TEST(PipeTest, CanSeeClosedOnReceive) {
// - close the sender, which will signal the receiver to return an
// end-of-stream.
Join(receiver.Next(),
[&sender] {
sender.reset();
[sender] {
sender->reset();
return absl::OkStatus();
}),
// Verify we received end-of-stream and closed the sender.
@ -127,49 +135,8 @@ TEST(PipeTest, CanSeeClosedOnReceive) {
});
},
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
TEST(PipeTest, CanFilter) {
Pipe<int> pipe;
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
MakeActivity(
[&pipe] {
// Setup some filters here, carefully getting ordering correct by doing
// so outside of the Join() since C++ does not define execution order
// between arguments.
// TODO(ctiller): A future change to Pipe will specify an ordering
// between filters added to sender and receiver, at which point these
// should move back.
auto doubler = pipe.receiver.Filter(
[](int p) { return absl::StatusOr<int>(p * 2); });
auto adder = pipe.sender.Filter(
[](int p) { return absl::StatusOr<int>(p + 1); });
return Seq(
// Concurrently:
// - push 42 into the pipe
// - wait for a value to be received, and filter it by doubling it
// - wait for a value to be received, and filter it by adding one to
// it
// - wait for a value to be received and close the pipe.
Join(pipe.sender.Push(42), std::move(doubler), std::move(adder),
Seq(pipe.receiver.Next(),
[&pipe](absl::optional<int> i) {
auto x = std::move(pipe.receiver);
return i;
})),
// Verify all of the above happened correctly.
[](std::tuple<bool, absl::Status, absl::Status, absl::optional<int>>
result) {
EXPECT_EQ(result, std::make_tuple(true, absl::OkStatus(),
absl::OkStatus(),
absl::optional<int>(85)));
return absl::OkStatus();
});
},
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
[&on_done](absl::Status status) { on_done.Call(std::move(status)); },
MakeScopedArena(1024, g_memory_allocator));
}
} // namespace grpc_core

Loading…
Cancel
Save