[promise] Introduce map_pipe, cleanup factories (#31430)

* [promise] Introduce map_pipe, cleanup factories

* Automated change: Fix sanity tests

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/31462/head
Craig Tiller 2 years ago committed by GitHub
parent ab3d62ae8f
commit 58c628a7ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 72
      CMakeLists.txt
  2. 2
      Makefile
  3. 101
      build_autogenerated.yaml
  4. 1
      config.m4
  5. 1
      config.w32
  6. 1
      gRPC-Core.podspec
  7. 1
      grpc.gemspec
  8. 2
      grpc.gyp
  9. 1
      package.xml
  10. 28
      src/core/BUILD
  11. 4
      src/core/lib/promise/activity.h
  12. 2
      src/core/lib/promise/detail/basic_seq.h
  13. 68
      src/core/lib/promise/detail/promise_factory.h
  14. 5
      src/core/lib/promise/detail/status.h
  15. 4
      src/core/lib/promise/for_each.h
  16. 12
      src/core/lib/promise/if.h
  17. 8
      src/core/lib/promise/loop.h
  18. 87
      src/core/lib/promise/map_pipe.h
  19. 19
      src/core/lib/promise/pipe.cc
  20. 43
      src/core/lib/promise/pipe.h
  21. 8
      src/core/lib/promise/seq.h
  22. 19
      src/core/lib/promise/try_seq.h
  23. 1
      src/python/grpcio/grpc_core_dependencies.py
  24. 22
      test/core/promise/BUILD
  25. 153
      test/core/promise/map_pipe_test.cc
  26. 29
      test/core/promise/promise_factory_test.cc
  27. 1
      tools/doxygen/Doxyfile.c++.internal
  28. 1
      tools/doxygen/Doxyfile.core.internal
  29. 24
      tools/run_tests/generated/tests.json

72
CMakeLists.txt generated

@ -1024,6 +1024,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx lock_free_event_test)
add_dependencies(buildtests_cxx log_test)
add_dependencies(buildtests_cxx loop_test)
add_dependencies(buildtests_cxx map_pipe_test)
add_dependencies(buildtests_cxx match_test)
add_dependencies(buildtests_cxx matchers_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
@ -2192,6 +2193,7 @@ add_library(grpc
src/core/lib/load_balancing/lb_policy_registry.cc
src/core/lib/matchers/matchers.cc
src/core/lib/promise/activity.cc
src/core/lib/promise/pipe.cc
src/core/lib/promise/sleep.cc
src/core/lib/resolver/resolver.cc
src/core/lib/resolver/resolver_registry.cc
@ -2841,6 +2843,7 @@ add_library(grpc_unsecure
src/core/lib/load_balancing/lb_policy.cc
src/core/lib/load_balancing/lb_policy_registry.cc
src/core/lib/promise/activity.cc
src/core/lib/promise/pipe.cc
src/core/lib/promise/sleep.cc
src/core/lib/resolver/resolver.cc
src/core/lib/resolver/resolver_registry.cc
@ -10244,6 +10247,7 @@ add_executable(for_each_test
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/iomgr_internal.cc
src/core/lib/promise/activity.cc
src/core/lib/promise/pipe.cc
src/core/lib/resource_quota/arena.cc
src/core/lib/resource_quota/memory_quota.cc
src/core/lib/resource_quota/periodic_update.cc
@ -13295,6 +13299,73 @@ target_link_libraries(loop_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(map_pipe_test
src/core/ext/upb-generated/google/protobuf/any.upb.c
src/core/ext/upb-generated/google/rpc/status.upb.c
src/core/lib/debug/trace.cc
src/core/lib/event_engine/memory_allocator.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/time.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/error.cc
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/iomgr_internal.cc
src/core/lib/promise/activity.cc
src/core/lib/promise/pipe.cc
src/core/lib/resource_quota/arena.cc
src/core/lib/resource_quota/memory_quota.cc
src/core/lib/resource_quota/periodic_update.cc
src/core/lib/resource_quota/resource_quota.cc
src/core/lib/resource_quota/thread_quota.cc
src/core/lib/resource_quota/trace.cc
src/core/lib/slice/percent_encoding.cc
src/core/lib/slice/slice.cc
src/core/lib/slice/slice_string_helpers.cc
test/core/promise/map_pipe_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(map_pipe_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(map_pipe_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::flat_hash_set
absl::any_invocable
absl::function_ref
absl::hash
absl::type_traits
absl::statusor
absl::utility
gpr
upb
)
endif()
if(gRPC_BUILD_TESTS)
@ -14578,6 +14649,7 @@ add_executable(pipe_test
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/iomgr_internal.cc
src/core/lib/promise/activity.cc
src/core/lib/promise/pipe.cc
src/core/lib/resource_quota/arena.cc
src/core/lib/resource_quota/memory_quota.cc
src/core/lib/resource_quota/periodic_update.cc

2
Makefile generated

@ -1508,6 +1508,7 @@ LIBGRPC_SRC = \
src/core/lib/load_balancing/lb_policy_registry.cc \
src/core/lib/matchers/matchers.cc \
src/core/lib/promise/activity.cc \
src/core/lib/promise/pipe.cc \
src/core/lib/promise/sleep.cc \
src/core/lib/resolver/resolver.cc \
src/core/lib/resolver/resolver_registry.cc \
@ -2020,6 +2021,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/load_balancing/lb_policy.cc \
src/core/lib/load_balancing/lb_policy_registry.cc \
src/core/lib/promise/activity.cc \
src/core/lib/promise/pipe.cc \
src/core/lib/promise/sleep.cc \
src/core/lib/resolver/resolver.cc \
src/core/lib/resolver/resolver_registry.cc \

@ -1588,6 +1588,7 @@ libs:
- src/core/lib/load_balancing/lb_policy_registry.cc
- src/core/lib/matchers/matchers.cc
- src/core/lib/promise/activity.cc
- src/core/lib/promise/pipe.cc
- src/core/lib/promise/sleep.cc
- src/core/lib/resolver/resolver.cc
- src/core/lib/resolver/resolver_registry.cc
@ -2488,6 +2489,7 @@ libs:
- src/core/lib/load_balancing/lb_policy.cc
- src/core/lib/load_balancing/lb_policy_registry.cc
- src/core/lib/promise/activity.cc
- src/core/lib/promise/pipe.cc
- src/core/lib/promise/sleep.cc
- src/core/lib/resolver/resolver.cc
- src/core/lib/resolver/resolver_registry.cc
@ -6458,6 +6460,7 @@ targets:
- src/core/lib/iomgr/executor.cc
- src/core/lib/iomgr/iomgr_internal.cc
- src/core/lib/promise/activity.cc
- src/core/lib/promise/pipe.cc
- src/core/lib/resource_quota/arena.cc
- src/core/lib/resource_quota/memory_quota.cc
- src/core/lib/resource_quota/periodic_update.cc
@ -7822,6 +7825,103 @@ targets:
- absl/types:variant
- absl/utility:utility
uses_polling: false
- name: map_pipe_test
gtest: true
build: test
language: c++
headers:
- src/core/ext/upb-generated/google/protobuf/any.upb.h
- src/core/ext/upb-generated/google/rpc/status.upb.h
- src/core/lib/debug/trace.h
- src/core/lib/experiments/config.h
- src/core/lib/experiments/experiments.h
- src/core/lib/gpr/spinlock.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/debug_location.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h
- src/core/lib/gprpp/status_helper.h
- src/core/lib/gprpp/time.h
- src/core/lib/iomgr/closure.h
- src/core/lib/iomgr/combiner.h
- src/core/lib/iomgr/error.h
- src/core/lib/iomgr/exec_ctx.h
- src/core/lib/iomgr/executor.h
- src/core/lib/iomgr/iomgr_internal.h
- src/core/lib/promise/activity.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_join.h
- src/core/lib/promise/detail/basic_seq.h
- src/core/lib/promise/detail/promise_factory.h
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/exec_ctx_wakeup_scheduler.h
- src/core/lib/promise/for_each.h
- src/core/lib/promise/intra_activity_waiter.h
- src/core/lib/promise/join.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
- src/core/lib/promise/map_pipe.h
- src/core/lib/promise/observable.h
- src/core/lib/promise/pipe.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
- src/core/lib/promise/try_seq.h
- src/core/lib/promise/wait_set.h
- src/core/lib/resource_quota/arena.h
- src/core/lib/resource_quota/memory_quota.h
- src/core/lib/resource_quota/periodic_update.h
- src/core/lib/resource_quota/resource_quota.h
- src/core/lib/resource_quota/thread_quota.h
- src/core/lib/resource_quota/trace.h
- src/core/lib/slice/percent_encoding.h
- src/core/lib/slice/slice.h
- src/core/lib/slice/slice_internal.h
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_string_helpers.h
- test/core/promise/test_wakeup_schedulers.h
src:
- src/core/ext/upb-generated/google/protobuf/any.upb.c
- src/core/ext/upb-generated/google/rpc/status.upb.c
- src/core/lib/debug/trace.cc
- src/core/lib/event_engine/memory_allocator.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/time.cc
- src/core/lib/iomgr/combiner.cc
- src/core/lib/iomgr/error.cc
- src/core/lib/iomgr/exec_ctx.cc
- src/core/lib/iomgr/executor.cc
- src/core/lib/iomgr/iomgr_internal.cc
- src/core/lib/promise/activity.cc
- src/core/lib/promise/pipe.cc
- src/core/lib/resource_quota/arena.cc
- src/core/lib/resource_quota/memory_quota.cc
- src/core/lib/resource_quota/periodic_update.cc
- src/core/lib/resource_quota/resource_quota.cc
- src/core/lib/resource_quota/thread_quota.cc
- src/core/lib/resource_quota/trace.cc
- src/core/lib/slice/percent_encoding.cc
- src/core/lib/slice/slice.cc
- src/core/lib/slice/slice_string_helpers.cc
- test/core/promise/map_pipe_test.cc
deps:
- absl/container:flat_hash_set
- absl/functional:any_invocable
- absl/functional:function_ref
- absl/hash:hash
- absl/meta:type_traits
- absl/status:statusor
- absl/utility:utility
- gpr
- upb
uses_polling: false
- name: match_test
gtest: true
build: test
@ -8437,6 +8537,7 @@ targets:
- src/core/lib/iomgr/executor.cc
- src/core/lib/iomgr/iomgr_internal.cc
- src/core/lib/promise/activity.cc
- src/core/lib/promise/pipe.cc
- src/core/lib/resource_quota/arena.cc
- src/core/lib/resource_quota/memory_quota.cc
- src/core/lib/resource_quota/periodic_update.cc

1
config.m4 generated

@ -632,6 +632,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/load_balancing/lb_policy_registry.cc \
src/core/lib/matchers/matchers.cc \
src/core/lib/promise/activity.cc \
src/core/lib/promise/pipe.cc \
src/core/lib/promise/sleep.cc \
src/core/lib/resolver/resolver.cc \
src/core/lib/resolver/resolver_registry.cc \

1
config.w32 generated

@ -598,6 +598,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\load_balancing\\lb_policy_registry.cc " +
"src\\core\\lib\\matchers\\matchers.cc " +
"src\\core\\lib\\promise\\activity.cc " +
"src\\core\\lib\\promise\\pipe.cc " +
"src\\core\\lib\\promise\\sleep.cc " +
"src\\core\\lib\\resolver\\resolver.cc " +
"src\\core\\lib\\resolver\\resolver_registry.cc " +

1
gRPC-Core.podspec generated

@ -1381,6 +1381,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/latch.h',
'src/core/lib/promise/loop.h',
'src/core/lib/promise/map.h',
'src/core/lib/promise/pipe.cc',
'src/core/lib/promise/pipe.h',
'src/core/lib/promise/poll.h',
'src/core/lib/promise/promise.h',

1
grpc.gemspec generated

@ -1292,6 +1292,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/promise/latch.h )
s.files += %w( src/core/lib/promise/loop.h )
s.files += %w( src/core/lib/promise/map.h )
s.files += %w( src/core/lib/promise/pipe.cc )
s.files += %w( src/core/lib/promise/pipe.h )
s.files += %w( src/core/lib/promise/poll.h )
s.files += %w( src/core/lib/promise/promise.h )

2
grpc.gyp generated

@ -922,6 +922,7 @@
'src/core/lib/load_balancing/lb_policy_registry.cc',
'src/core/lib/matchers/matchers.cc',
'src/core/lib/promise/activity.cc',
'src/core/lib/promise/pipe.cc',
'src/core/lib/promise/sleep.cc',
'src/core/lib/resolver/resolver.cc',
'src/core/lib/resolver/resolver_registry.cc',
@ -1381,6 +1382,7 @@
'src/core/lib/load_balancing/lb_policy.cc',
'src/core/lib/load_balancing/lb_policy_registry.cc',
'src/core/lib/promise/activity.cc',
'src/core/lib/promise/pipe.cc',
'src/core/lib/promise/sleep.cc',
'src/core/lib/resolver/resolver.cc',
'src/core/lib/resolver/resolver_registry.cc',

1
package.xml generated

@ -1274,6 +1274,7 @@
<file baseinstalldir="/" name="src/core/lib/promise/latch.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/loop.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/map.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/pipe.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/pipe.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/poll.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/promise.h" role="src" />

@ -361,6 +361,22 @@ grpc_cc_library(
deps = ["//:gpr_platform"],
)
grpc_cc_library(
name = "map_pipe",
external_deps = ["absl/status"],
language = "c++",
public_hdrs = [
"lib/promise/map_pipe.h",
],
deps = [
"for_each",
"map",
"pipe",
"promise_factory",
"//:gpr_platform",
],
)
grpc_cc_library(
name = "call_push_pull",
hdrs = ["lib/promise/call_push_pull.h"],
@ -733,20 +749,26 @@ grpc_cc_library(
grpc_cc_library(
name = "pipe",
srcs = [
"lib/promise/pipe.cc",
],
hdrs = [
"lib/promise/pipe.h",
],
external_deps = [
"absl/strings",
"absl/types:optional",
"absl/types:variant",
],
language = "c++",
public_hdrs = [
"lib/promise/pipe.h",
],
deps = [
"activity",
"arena",
"context",
"intra_activity_waiter",
"poll",
"//:gpr",
"//:grpc_trace",
],
)

@ -405,7 +405,7 @@ template <class F, class WakeupScheduler, class OnDone, typename... Contexts>
class PromiseActivity final : public FreestandingActivity,
private ActivityContexts<Contexts...> {
public:
using Factory = PromiseFactory<void, F>;
using Factory = OncePromiseFactory<void, F>;
using ResultType = typename Factory::Promise::Result;
PromiseActivity(F promise_factory, WakeupScheduler wakeup_scheduler,
@ -533,7 +533,7 @@ class PromiseActivity final : public FreestandingActivity,
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
ScopedActivity scoped_activity(this);
ScopedContext contexts(this);
Construct(&promise_holder_.promise, promise_factory.Once());
Construct(&promise_holder_.promise, promise_factory.Make());
return StepLoop();
}

@ -68,7 +68,7 @@ struct SeqStateTypes {
// Wrap the factory callable in our factory wrapper to deal with common edge
// cases. We use the 'unwrapped type' from the traits, so for instance, TrySeq
// can pass back a T from a StatusOr<T>.
using Next = promise_detail::PromiseFactory<
using Next = promise_detail::OncePromiseFactory<
typename PromiseResultTraits::UnwrappedType, FNext>;
};

@ -74,6 +74,20 @@ struct ResultOfT<F(Args...),
using T = decltype(std::declval<RemoveCVRef<F>>()(std::declval<Args>()...));
};
template <typename F, typename... Args>
struct ResultOfT<F(Args...)&,
absl::void_t<decltype(std::declval<RemoveCVRef<F>>()(
std::declval<Args>()...))>> {
using T = decltype(std::declval<RemoveCVRef<F>>()(std::declval<Args>()...));
};
template <typename F, typename... Args>
struct ResultOfT<const F(Args...)&,
absl::void_t<decltype(std::declval<RemoveCVRef<F>>()(
std::declval<Args>()...))>> {
using T = decltype(std::declval<RemoveCVRef<F>>()(std::declval<Args>()...));
};
template <typename T>
using ResultOf = typename ResultOfT<T>::T;
@ -127,6 +141,14 @@ PromiseFactoryImpl(F&& f, A&& arg) {
return f(std::forward<A>(arg));
}
// Given a callable(A) -> Promise<T>, name it a PromiseFactory and use it.
template <typename A, typename F>
absl::enable_if_t<IsVoidCallable<ResultOf<F(A)>>::value,
PromiseLike<decltype(std::declval<F>()(std::declval<A>()))>>
PromiseFactoryImpl(F& f, A&& arg) {
return f(std::forward<A>(arg));
}
// Given a callable() -> Promise<T>, promote it to a
// PromiseFactory(A) -> Promise<T> by dropping the first argument.
template <typename A, typename F>
@ -145,7 +167,7 @@ PromiseFactoryImpl(F&& f) {
};
template <typename A, typename F>
class PromiseFactory {
class OncePromiseFactory {
private:
GPR_NO_UNIQUE_ADDRESS F f_;
@ -154,31 +176,57 @@ class PromiseFactory {
using Promise =
decltype(PromiseFactoryImpl(std::move(f_), std::declval<A>()));
explicit PromiseFactory(F f) : f_(std::move(f)) {}
explicit OncePromiseFactory(F f) : f_(std::move(f)) {}
Promise Once(Arg&& a) {
Promise Make(Arg&& a) {
return PromiseFactoryImpl(std::move(f_), std::forward<Arg>(a));
}
};
Promise Repeated(Arg&& a) const {
template <typename F>
class OncePromiseFactory<void, F> {
private:
GPR_NO_UNIQUE_ADDRESS F f_;
public:
using Arg = void;
using Promise = decltype(PromiseFactoryImpl(std::move(f_)));
explicit OncePromiseFactory(F f) : f_(std::move(f)) {}
Promise Make() { return PromiseFactoryImpl(std::move(f_)); }
};
template <typename A, typename F>
class RepeatedPromiseFactory {
private:
GPR_NO_UNIQUE_ADDRESS F f_;
public:
using Arg = A;
using Promise = decltype(PromiseFactoryImpl(f_, std::declval<A>()));
explicit RepeatedPromiseFactory(F f) : f_(std::move(f)) {}
Promise Make(Arg&& a) const {
return PromiseFactoryImpl(f_, std::forward<Arg>(a));
}
Promise Make(Arg&& a) { return PromiseFactoryImpl(f_, std::forward<Arg>(a)); }
};
template <typename F>
class PromiseFactory<void, F> {
class RepeatedPromiseFactory<void, F> {
private:
GPR_NO_UNIQUE_ADDRESS F f_;
public:
using Arg = void;
using Promise = decltype(PromiseFactoryImpl(std::move(f_)));
explicit PromiseFactory(F f) : f_(std::move(f)) {}
using Promise = decltype(PromiseFactoryImpl(f_));
Promise Once() { return PromiseFactoryImpl(std::move(f_)); }
explicit RepeatedPromiseFactory(F f) : f_(std::move(f)) {}
Promise Repeated() const { return PromiseFactoryImpl(f_); }
Promise Make() const { return PromiseFactoryImpl(f_); }
Promise Make() { return PromiseFactoryImpl(f_); }
};
} // namespace promise_detail

@ -53,6 +53,11 @@ struct StatusCastImpl<To, To> {
static To Cast(To&& t) { return std::move(t); }
};
template <typename To>
struct StatusCastImpl<To, const To&> {
static To Cast(const To& t) { return t; }
};
template <typename T>
struct StatusCastImpl<absl::StatusOr<T>, absl::Status> {
static absl::StatusOr<T> Cast(absl::Status&& t) { return std::move(t); }

@ -48,7 +48,7 @@ class ForEach {
typename PollTraits<decltype(std::declval<ReaderNext>()())>::Type;
using ReaderResultValue = typename ReaderResult::value_type;
using ActionFactory =
promise_detail::PromiseFactory<ReaderResultValue, Action>;
promise_detail::RepeatedPromiseFactory<ReaderResultValue, Action>;
using ActionPromise = typename ActionFactory::Promise;
public:
@ -90,7 +90,7 @@ class ForEach {
auto r = reader_next();
if (auto* p = absl::get_if<kPollReadyIdx>(&r)) {
if (p->has_value()) {
auto action = self->action_factory_.Repeated(std::move(**p));
auto action = self->action_factory_.Make(std::move(**p));
return (*this)(self->state_.template emplace<InAction>(
std::move(action), std::move(*p)));
} else {

@ -34,10 +34,10 @@ template <typename CallPoll, typename T, typename F>
typename CallPoll::PollResult ChooseIf(CallPoll call_poll, bool result,
T* if_true, F* if_false) {
if (result) {
auto promise = if_true->Once();
auto promise = if_true->Make();
return call_poll(promise);
} else {
auto promise = if_false->Once();
auto promise = if_false->Make();
return call_poll(promise);
}
}
@ -49,10 +49,10 @@ typename CallPoll::PollResult ChooseIf(CallPoll call_poll,
if (!result.ok()) {
return typename CallPoll::PollResult(result.status());
} else if (*result) {
auto promise = if_true->Once();
auto promise = if_true->Make();
return call_poll(promise);
} else {
auto promise = if_false->Once();
auto promise = if_false->Make();
return call_poll(promise);
}
}
@ -60,8 +60,8 @@ typename CallPoll::PollResult ChooseIf(CallPoll call_poll,
template <typename C, typename T, typename F>
class If {
private:
using TrueFactory = promise_detail::PromiseFactory<void, T>;
using FalseFactory = promise_detail::PromiseFactory<void, F>;
using TrueFactory = promise_detail::OncePromiseFactory<void, T>;
using FalseFactory = promise_detail::OncePromiseFactory<void, F>;
using ConditionPromise = PromiseLike<C>;
using TruePromise = typename TrueFactory::Promise;
using FalsePromise = typename FalseFactory::Promise;

@ -76,14 +76,14 @@ struct LoopTraits<absl::StatusOr<LoopCtl<absl::Status>>> {
template <typename F>
class Loop {
private:
using Factory = promise_detail::PromiseFactory<void, F>;
using PromiseType = decltype(std::declval<Factory>().Repeated());
using Factory = promise_detail::RepeatedPromiseFactory<void, F>;
using PromiseType = decltype(std::declval<Factory>().Make());
using PromiseResult = typename PromiseType::Result;
public:
using Result = typename LoopTraits<PromiseResult>::Result;
explicit Loop(F f) : factory_(std::move(f)), promise_(factory_.Repeated()) {}
explicit Loop(F f) : factory_(std::move(f)), promise_(factory_.Make()) {}
~Loop() { promise_.~PromiseType(); }
Loop(Loop&& loop) noexcept
@ -104,7 +104,7 @@ class Loop {
auto lc = LoopTraits<PromiseResult>::ToLoopCtl(*p);
if (absl::holds_alternative<Continue>(lc)) {
promise_.~PromiseType();
new (&promise_) PromiseType(factory_.Repeated());
new (&promise_) PromiseType(factory_.Make());
continue;
}
// - otherwise there's our result... return it out.

@ -0,0 +1,87 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_PROMISE_MAP_PIPE_H
#define GRPC_CORE_LIB_PROMISE_MAP_PIPE_H
#include <grpc/support/port_platform.h>
#include "absl/status/status.h"
#include "src/core/lib/promise/detail/promise_factory.h"
#include "src/core/lib/promise/for_each.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/pipe.h"
namespace grpc_core {
// Apply a (possibly async) mapping function to src, and output into dst.
//
// In psuedo-code:
// for each element in wait_for src.Next:
// x = wait_for filter_factory(element)
// wait_for dst.Push(x)
template <typename T, typename Filter>
auto MapPipe(PipeReceiver<T> src, PipeSender<T> dst, Filter filter_factory) {
return ForEach(
std::move(src),
[filter_factory = promise_detail::RepeatedPromiseFactory<T, Filter>(
std::move(filter_factory)),
dst = std::move(dst)](T t) mutable {
return TrySeq(filter_factory.Make(std::move(t)), [&dst](T t) {
return Map(dst.Push(std::move(t)), [](bool successful_push) {
if (successful_push) {
return absl::OkStatus();
}
return absl::CancelledError();
});
});
});
}
// Helper to intecept a pipe and apply a mapping function.
// Each of the `Intercept` constructors will take a PipeSender or PipeReceiver,
// construct a new pipe, and then replace the passed in pipe with its new end.
// In this way it can interject logic per-element.
// Next, the TakeAndRun function will return a promise that can be run to apply
// a mapping promise to each element of the pipe.
template <typename T>
class PipeMapper {
public:
static PipeMapper Intercept(PipeSender<T>& intercept_sender) {
PipeMapper<T> r;
r.interceptor_.sender.Swap(&intercept_sender);
return r;
}
static PipeMapper Intercept(PipeReceiver<T>& intercept_receiver) {
PipeMapper<T> r;
r.interceptor_.receiver.Swap(&intercept_receiver);
return r;
}
template <typename Filter>
auto TakeAndRun(Filter filter) {
return MapPipe(std::move(interceptor_.receiver),
std::move(interceptor_.sender), std::move(filter));
}
private:
PipeMapper() = default;
Pipe<T> interceptor_;
};
} // namespace grpc_core
#endif // GRPC_CORE_LIB_PROMISE_MAP_PIPE_H

@ -0,0 +1,19 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/promise/pipe.h"
grpc_core::DebugOnlyTraceFlag grpc_trace_promise_pipe(false, "promise_pipe");

@ -19,18 +19,24 @@
#include <stdint.h>
#include <string>
#include <utility>
#include "absl/strings/str_cat.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include <grpc/support/log.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/intra_activity_waiter.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/resource_quota/arena.h"
extern grpc_core::DebugOnlyTraceFlag grpc_trace_promise_pipe;
namespace grpc_core {
namespace pipe_detail {
@ -102,6 +108,9 @@ class Center {
// Add one ref to the send side of this object, and return this.
Center* RefSend() {
if (grpc_trace_promise_pipe.enabled()) {
gpr_log(GPR_INFO, "%s", DebugOpString("RefSend").c_str());
}
send_refs_++;
GPR_ASSERT(send_refs_ != 0);
return this;
@ -109,6 +118,9 @@ class Center {
// Add one ref to the recv side of this object, and return this.
Center* RefRecv() {
if (grpc_trace_promise_pipe.enabled()) {
gpr_log(GPR_INFO, "%s", DebugOpString("RefRecv").c_str());
}
recv_refs_++;
GPR_ASSERT(recv_refs_ != 0);
return this;
@ -118,6 +130,9 @@ class Center {
// If no send refs remain, wake due to send closure
// If no refs remain, destroy this object
void UnrefSend() {
if (grpc_trace_promise_pipe.enabled()) {
gpr_log(GPR_INFO, "%s", DebugOpString("UnrefSend").c_str());
}
GPR_DEBUG_ASSERT(send_refs_ > 0);
send_refs_--;
if (0 == send_refs_) {
@ -133,6 +148,9 @@ class Center {
// If no recv refs remain, wake due to recv closure
// If no refs remain, destroy this object
void UnrefRecv() {
if (grpc_trace_promise_pipe.enabled()) {
gpr_log(GPR_INFO, "%s", DebugOpString("UnrefRecv").c_str());
}
GPR_DEBUG_ASSERT(recv_refs_ > 0);
recv_refs_--;
if (0 == recv_refs_) {
@ -151,6 +169,9 @@ class Center {
// Return true if the value was pushed.
// Return false if the recv end is closed.
Poll<bool> Push(T* value) {
if (grpc_trace_promise_pipe.enabled()) {
gpr_log(GPR_INFO, "%s", DebugOpString("Push").c_str());
}
GPR_DEBUG_ASSERT(send_refs_ != 0);
if (recv_refs_ == 0) return false;
if (value_state_ != ValueState::kEmpty) return on_empty_.pending();
@ -161,6 +182,9 @@ class Center {
}
Poll<bool> PollAck() {
if (grpc_trace_promise_pipe.enabled()) {
gpr_log(GPR_INFO, "%s", DebugOpString("PollAck").c_str());
}
GPR_DEBUG_ASSERT(send_refs_ != 0);
if (recv_refs_ == 0) return value_state_ == ValueState::kAcked;
if (value_state_ != ValueState::kAcked) return on_empty_.pending();
@ -173,6 +197,9 @@ class Center {
// Return the value if one was retrieved.
// Return nullopt if the send end is closed and no value had been pushed.
Poll<NextResult<T>> Next() {
if (grpc_trace_promise_pipe.enabled()) {
gpr_log(GPR_INFO, "%s", DebugOpString("Next").c_str());
}
GPR_DEBUG_ASSERT(recv_refs_ != 0);
if (value_state_ != ValueState::kReady) {
if (send_refs_ == 0) return NextResult<T>(nullptr);
@ -182,6 +209,9 @@ class Center {
}
void AckNext() {
if (grpc_trace_promise_pipe.enabled()) {
gpr_log(GPR_INFO, "%s", DebugOpString("AckNext").c_str());
}
GPR_DEBUG_ASSERT(value_state_ == ValueState::kReady);
value_state_ = ValueState::kAcked;
on_empty_.Wake();
@ -192,6 +222,15 @@ class Center {
const T& value() const { return value_; }
private:
std::string DebugTag() {
return absl::StrCat(Activity::current()->DebugTag(), "PIPE[0x",
reinterpret_cast<uintptr_t>(this), "]: ");
}
std::string DebugOpString(std::string op) {
return absl::StrCat(DebugTag(), op, " send_refs=", send_refs_,
" recv_refs=", recv_refs_,
" value_state=", ValueStateName(value_state_));
}
void ResetValue() {
// Fancy dance to move out of value in the off chance that we reclaim some
// memory earlier.
@ -263,6 +302,8 @@ class PipeSender {
if (auto* center = std::exchange(center_, nullptr)) center->UnrefSend();
}
void Swap(PipeSender<T>* other) { std::swap(center_, other->center_); }
// Send a single message along the pipe.
// Returns a promise that will resolve to a bool - true if the message was
// sent, false if it could never be sent. Blocks the promise until the
@ -297,6 +338,8 @@ class PipeReceiver {
if (center_ != nullptr) center_->UnrefRecv();
}
void Swap(PipeReceiver<T>* other) { std::swap(center_, other->center_); }
// Receive a single message from the pipe.
// Returns a promise that will resolve to an optional<T> - with a value if a
// message was received, or no value if the other end of the pipe was closed.

@ -32,13 +32,11 @@ struct SeqTraits {
using UnwrappedType = T;
using WrappedType = T;
template <typename Next>
static auto CallFactory(Next* next, T&& value)
-> decltype(next->Once(std::forward<T>(value))) {
return next->Once(std::forward<T>(value));
static auto CallFactory(Next* next, T&& value) {
return next->Make(std::forward<T>(value));
}
template <typename F, typename Elem>
static auto CallSeqFactory(F& f, Elem&& elem, T&& value)
-> decltype(f(std::forward<Elem>(elem), std::forward<T>(value))) {
static auto CallSeqFactory(F& f, Elem&& elem, T&& value) {
return f(std::forward<Elem>(elem), std::forward<T>(value));
}
template <typename Result, typename PriorResult, typename RunNext>

@ -37,9 +37,8 @@ struct TrySeqTraitsWithSfinae {
using UnwrappedType = T;
using WrappedType = absl::StatusOr<T>;
template <typename Next>
static auto CallFactory(Next* next, T&& value)
-> decltype(next->Once(std::forward<T>(value))) {
return next->Once(std::forward<T>(value));
static auto CallFactory(Next* next, T&& value) {
return next->Make(std::forward<T>(value));
}
template <typename F, typename Elem>
static auto CallSeqFactory(F& f, Elem&& elem, T&& value)
@ -57,9 +56,8 @@ struct TrySeqTraitsWithSfinae<absl::StatusOr<T>> {
using UnwrappedType = T;
using WrappedType = absl::StatusOr<T>;
template <typename Next>
static auto CallFactory(Next* next, absl::StatusOr<T>&& status)
-> decltype(next->Once(std::move(*status))) {
return next->Once(std::move(*status));
static auto CallFactory(Next* next, absl::StatusOr<T>&& status) {
return next->Make(std::move(*status));
}
template <typename F, typename Elem>
static auto CallSeqFactory(F& f, Elem&& elem, absl::StatusOr<T> value)
@ -84,8 +82,8 @@ struct TrySeqTraitsWithSfinae<
using UnwrappedType = void;
using WrappedType = T;
template <typename Next>
static auto CallFactory(Next* next, T&&) -> decltype(next->Once()) {
return next->Once();
static auto CallFactory(Next* next, T&&) {
return next->Make();
}
template <typename Result, typename RunNext>
static Poll<Result> CheckResultAndRunNext(T prior, RunNext run_next) {
@ -98,9 +96,8 @@ struct TrySeqTraitsWithSfinae<absl::Status> {
using UnwrappedType = void;
using WrappedType = absl::Status;
template <typename Next>
static auto CallFactory(Next* next, absl::Status&&)
-> decltype(next->Once()) {
return next->Once();
static auto CallFactory(Next* next, absl::Status&&) {
return next->Make();
}
template <typename Result, typename RunNext>
static Poll<Result> CheckResultAndRunNext(absl::Status prior,

@ -607,6 +607,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/load_balancing/lb_policy_registry.cc',
'src/core/lib/matchers/matchers.cc',
'src/core/lib/promise/activity.cc',
'src/core/lib/promise/pipe.cc',
'src/core/lib/promise/sleep.cc',
'src/core/lib/resolver/resolver.cc',
'src/core/lib/resolver/resolver_registry.cc',

@ -311,6 +311,28 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "map_pipe_test",
srcs = ["map_pipe_test.cc"],
external_deps = ["gtest"],
language = "c++",
tags = ["promise_test"],
uses_event_engine = False,
uses_polling = False,
deps = [
"test_wakeup_schedulers",
"//src/core:for_each",
"//src/core:join",
"//src/core:map",
"//src/core:map_pipe",
"//src/core:observable",
"//src/core:pipe",
"//src/core:resource_quota",
"//src/core:seq",
"//src/core:try_seq",
],
)
grpc_cc_test(
name = "pipe_test",
srcs = ["pipe_test.cc"],

@ -0,0 +1,153 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/promise/map_pipe.h"
#include <stdint.h>
#include <memory>
#include <utility>
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include <grpc/event_engine/memory_allocator.h>
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/detail/basic_seq.h"
#include "src/core/lib/promise/for_each.h"
#include "src/core/lib/promise/join.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "test/core/promise/test_wakeup_schedulers.h"
using testing::Mock;
using testing::MockFunction;
using testing::StrictMock;
namespace grpc_core {
static auto* g_memory_allocator = new MemoryAllocator(
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator("test"));
template <typename T>
class Delayed {
public:
explicit Delayed(T x) : x_(x) {}
Poll<T> operator()() {
Activity::current()->ForceImmediateRepoll();
++polls_;
if (polls_ == 10) return std::move(x_);
return Pending();
}
private:
int polls_ = 0;
T x_;
};
TEST(MapPipeTest, SendThriceWithPipeInterceptingReceive) {
int num_received = 0;
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
MakeActivity(
[&num_received] {
Pipe<int> pipe;
auto filter =
PipeMapper<int>::Intercept(pipe.receiver).TakeAndRun([](int x) {
return Delayed<int>(x + 1);
});
auto sender = std::make_shared<std::unique_ptr<PipeSender<int>>>(
std::make_unique<PipeSender<int>>(std::move(pipe.sender)));
return Map(
Join(
std::move(filter),
// Push 3 things into a pipe -- 0, 1, then 2 -- then close.
Seq((*sender)->Push(0), [sender] { return (*sender)->Push(1); },
[sender] { return (*sender)->Push(2); },
[sender] {
sender->reset();
return absl::OkStatus();
}),
// Use a ForEach loop to read them out and verify all values are
// seen (but with 1 added).
ForEach(std::move(pipe.receiver),
[&num_received](int i) {
num_received++;
EXPECT_EQ(num_received, i);
return absl::OkStatus();
})),
JustElem<2>());
},
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); },
MakeScopedArena(1024, g_memory_allocator));
Mock::VerifyAndClearExpectations(&on_done);
EXPECT_EQ(num_received, 3);
}
TEST(MapPipeTest, SendThriceWithPipeInterceptingSend) {
int num_received = 0;
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
MakeActivity(
[&num_received] {
Pipe<int> pipe;
auto filter =
PipeMapper<int>::Intercept(pipe.sender).TakeAndRun([](int x) {
return Delayed<int>(x + 1);
});
auto sender = std::make_shared<std::unique_ptr<PipeSender<int>>>(
std::make_unique<PipeSender<int>>(std::move(pipe.sender)));
return Map(
Join(
std::move(filter),
// Push 3 things into a pipe -- 0, 1, then 2 -- then close.
Seq((*sender)->Push(0), [sender] { return (*sender)->Push(1); },
[sender] { return (*sender)->Push(2); },
[sender] {
sender->reset();
return absl::OkStatus();
}),
// Use a ForEach loop to read them out and verify all values are
// seen (but with 1 added).
ForEach(std::move(pipe.receiver),
[&num_received](int i) {
num_received++;
EXPECT_EQ(num_received, i);
return absl::OkStatus();
})),
JustElem<2>());
},
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); },
MakeScopedArena(1024, g_memory_allocator));
Mock::VerifyAndClearExpectations(&on_done);
EXPECT_EQ(num_received, 3);
}
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -28,31 +28,36 @@ namespace promise_detail {
namespace testing {
template <typename Arg, typename F>
PromiseFactory<Arg, F> MakeFactory(F f) {
return PromiseFactory<Arg, F>(std::move(f));
auto MakeOnceFactory(F f) {
return OncePromiseFactory<Arg, F>(std::move(f));
}
template <typename Arg, typename F>
auto MakeRepeatedFactory(F f) {
return RepeatedPromiseFactory<Arg, F>(std::move(f));
}
TEST(AdaptorTest, FactoryFromPromise) {
EXPECT_EQ(
MakeFactory<void>([]() { return Poll<int>(Poll<int>(42)); }).Once()(),
Poll<int>(42));
EXPECT_EQ(
MakeFactory<void>([]() { return Poll<int>(Poll<int>(42)); }).Repeated()(),
MakeOnceFactory<void>([]() { return Poll<int>(Poll<int>(42)); }).Make()(),
Poll<int>(42));
EXPECT_EQ(MakeFactory<void>(Promise<int>([]() {
EXPECT_EQ(MakeRepeatedFactory<void>([]() {
return Poll<int>(Poll<int>(42));
}).Make()(),
Poll<int>(42));
EXPECT_EQ(MakeOnceFactory<void>(Promise<int>([]() {
return Poll<int>(Poll<int>(42));
})).Once()(),
})).Make()(),
Poll<int>(42));
EXPECT_EQ(MakeFactory<void>(Promise<int>([]() {
EXPECT_EQ(MakeRepeatedFactory<void>(Promise<int>([]() {
return Poll<int>(Poll<int>(42));
})).Repeated()(),
})).Make()(),
Poll<int>(42));
}
TEST(AdaptorTest, FactoryFromBindFrontPromise) {
EXPECT_EQ(MakeFactory<void>(
EXPECT_EQ(MakeOnceFactory<void>(
absl::bind_front([](int i) { return Poll<int>(i); }, 42))
.Once()(),
.Make()(),
Poll<int>(42));
}

@ -2277,6 +2277,7 @@ src/core/lib/promise/intra_activity_waiter.h \
src/core/lib/promise/latch.h \
src/core/lib/promise/loop.h \
src/core/lib/promise/map.h \
src/core/lib/promise/pipe.cc \
src/core/lib/promise/pipe.h \
src/core/lib/promise/poll.h \
src/core/lib/promise/promise.h \

@ -2068,6 +2068,7 @@ src/core/lib/promise/intra_activity_waiter.h \
src/core/lib/promise/latch.h \
src/core/lib/promise/loop.h \
src/core/lib/promise/map.h \
src/core/lib/promise/pipe.cc \
src/core/lib/promise/pipe.h \
src/core/lib/promise/poll.h \
src/core/lib/promise/promise.h \

@ -4481,6 +4481,30 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "map_pipe_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save