diff --git a/CMakeLists.txt b/CMakeLists.txt index c966ec332c8..b9e7e45885e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1024,6 +1024,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx lock_free_event_test) add_dependencies(buildtests_cxx log_test) add_dependencies(buildtests_cxx loop_test) + add_dependencies(buildtests_cxx map_pipe_test) add_dependencies(buildtests_cxx match_test) add_dependencies(buildtests_cxx matchers_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX) @@ -2192,6 +2193,7 @@ add_library(grpc src/core/lib/load_balancing/lb_policy_registry.cc src/core/lib/matchers/matchers.cc src/core/lib/promise/activity.cc + src/core/lib/promise/pipe.cc src/core/lib/promise/sleep.cc src/core/lib/resolver/resolver.cc src/core/lib/resolver/resolver_registry.cc @@ -2841,6 +2843,7 @@ add_library(grpc_unsecure src/core/lib/load_balancing/lb_policy.cc src/core/lib/load_balancing/lb_policy_registry.cc src/core/lib/promise/activity.cc + src/core/lib/promise/pipe.cc src/core/lib/promise/sleep.cc src/core/lib/resolver/resolver.cc src/core/lib/resolver/resolver_registry.cc @@ -10244,6 +10247,7 @@ add_executable(for_each_test src/core/lib/iomgr/executor.cc src/core/lib/iomgr/iomgr_internal.cc src/core/lib/promise/activity.cc + src/core/lib/promise/pipe.cc src/core/lib/resource_quota/arena.cc src/core/lib/resource_quota/memory_quota.cc src/core/lib/resource_quota/periodic_update.cc @@ -13295,6 +13299,73 @@ target_link_libraries(loop_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(map_pipe_test + src/core/ext/upb-generated/google/protobuf/any.upb.c + src/core/ext/upb-generated/google/rpc/status.upb.c + src/core/lib/debug/trace.cc + src/core/lib/event_engine/memory_allocator.cc + src/core/lib/experiments/config.cc + src/core/lib/experiments/experiments.cc + src/core/lib/gprpp/status_helper.cc + src/core/lib/gprpp/time.cc + src/core/lib/iomgr/combiner.cc + src/core/lib/iomgr/error.cc + src/core/lib/iomgr/exec_ctx.cc + src/core/lib/iomgr/executor.cc + src/core/lib/iomgr/iomgr_internal.cc + src/core/lib/promise/activity.cc + src/core/lib/promise/pipe.cc + src/core/lib/resource_quota/arena.cc + src/core/lib/resource_quota/memory_quota.cc + src/core/lib/resource_quota/periodic_update.cc + src/core/lib/resource_quota/resource_quota.cc + src/core/lib/resource_quota/thread_quota.cc + src/core/lib/resource_quota/trace.cc + src/core/lib/slice/percent_encoding.cc + src/core/lib/slice/slice.cc + src/core/lib/slice/slice_string_helpers.cc + test/core/promise/map_pipe_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(map_pipe_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(map_pipe_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + absl::flat_hash_set + absl::any_invocable + absl::function_ref + absl::hash + absl::type_traits + absl::statusor + absl::utility + gpr + upb +) + + endif() if(gRPC_BUILD_TESTS) @@ -14578,6 +14649,7 @@ add_executable(pipe_test src/core/lib/iomgr/executor.cc src/core/lib/iomgr/iomgr_internal.cc src/core/lib/promise/activity.cc + src/core/lib/promise/pipe.cc src/core/lib/resource_quota/arena.cc src/core/lib/resource_quota/memory_quota.cc src/core/lib/resource_quota/periodic_update.cc diff --git a/Makefile b/Makefile index bb8731ea6b8..31eedc9e018 100644 --- a/Makefile +++ b/Makefile @@ -1508,6 +1508,7 @@ LIBGRPC_SRC = \ src/core/lib/load_balancing/lb_policy_registry.cc \ src/core/lib/matchers/matchers.cc \ src/core/lib/promise/activity.cc \ + src/core/lib/promise/pipe.cc \ src/core/lib/promise/sleep.cc \ src/core/lib/resolver/resolver.cc \ src/core/lib/resolver/resolver_registry.cc \ @@ -2020,6 +2021,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/load_balancing/lb_policy.cc \ src/core/lib/load_balancing/lb_policy_registry.cc \ src/core/lib/promise/activity.cc \ + src/core/lib/promise/pipe.cc \ src/core/lib/promise/sleep.cc \ src/core/lib/resolver/resolver.cc \ src/core/lib/resolver/resolver_registry.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index a97eb85ddae..8da80c4a287 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -1588,6 +1588,7 @@ libs: - src/core/lib/load_balancing/lb_policy_registry.cc - src/core/lib/matchers/matchers.cc - src/core/lib/promise/activity.cc + - src/core/lib/promise/pipe.cc - src/core/lib/promise/sleep.cc - src/core/lib/resolver/resolver.cc - src/core/lib/resolver/resolver_registry.cc @@ -2488,6 +2489,7 @@ libs: - src/core/lib/load_balancing/lb_policy.cc - src/core/lib/load_balancing/lb_policy_registry.cc - src/core/lib/promise/activity.cc + - src/core/lib/promise/pipe.cc - src/core/lib/promise/sleep.cc - src/core/lib/resolver/resolver.cc - src/core/lib/resolver/resolver_registry.cc @@ -6458,6 +6460,7 @@ targets: - src/core/lib/iomgr/executor.cc - src/core/lib/iomgr/iomgr_internal.cc - src/core/lib/promise/activity.cc + - src/core/lib/promise/pipe.cc - src/core/lib/resource_quota/arena.cc - src/core/lib/resource_quota/memory_quota.cc - src/core/lib/resource_quota/periodic_update.cc @@ -7822,6 +7825,103 @@ targets: - absl/types:variant - absl/utility:utility uses_polling: false +- name: map_pipe_test + gtest: true + build: test + language: c++ + headers: + - src/core/ext/upb-generated/google/protobuf/any.upb.h + - src/core/ext/upb-generated/google/rpc/status.upb.h + - src/core/lib/debug/trace.h + - src/core/lib/experiments/config.h + - src/core/lib/experiments/experiments.h + - src/core/lib/gpr/spinlock.h + - src/core/lib/gprpp/atomic_utils.h + - src/core/lib/gprpp/bitset.h + - src/core/lib/gprpp/cpp_impl_of.h + - src/core/lib/gprpp/debug_location.h + - src/core/lib/gprpp/manual_constructor.h + - src/core/lib/gprpp/orphanable.h + - src/core/lib/gprpp/ref_counted.h + - src/core/lib/gprpp/ref_counted_ptr.h + - src/core/lib/gprpp/status_helper.h + - src/core/lib/gprpp/time.h + - src/core/lib/iomgr/closure.h + - src/core/lib/iomgr/combiner.h + - src/core/lib/iomgr/error.h + - src/core/lib/iomgr/exec_ctx.h + - src/core/lib/iomgr/executor.h + - src/core/lib/iomgr/iomgr_internal.h + - src/core/lib/promise/activity.h + - src/core/lib/promise/context.h + - src/core/lib/promise/detail/basic_join.h + - src/core/lib/promise/detail/basic_seq.h + - src/core/lib/promise/detail/promise_factory.h + - src/core/lib/promise/detail/promise_like.h + - src/core/lib/promise/detail/status.h + - src/core/lib/promise/exec_ctx_wakeup_scheduler.h + - src/core/lib/promise/for_each.h + - src/core/lib/promise/intra_activity_waiter.h + - src/core/lib/promise/join.h + - src/core/lib/promise/loop.h + - src/core/lib/promise/map.h + - src/core/lib/promise/map_pipe.h + - src/core/lib/promise/observable.h + - src/core/lib/promise/pipe.h + - src/core/lib/promise/poll.h + - src/core/lib/promise/race.h + - src/core/lib/promise/seq.h + - src/core/lib/promise/try_seq.h + - src/core/lib/promise/wait_set.h + - src/core/lib/resource_quota/arena.h + - src/core/lib/resource_quota/memory_quota.h + - src/core/lib/resource_quota/periodic_update.h + - src/core/lib/resource_quota/resource_quota.h + - src/core/lib/resource_quota/thread_quota.h + - src/core/lib/resource_quota/trace.h + - src/core/lib/slice/percent_encoding.h + - src/core/lib/slice/slice.h + - src/core/lib/slice/slice_internal.h + - src/core/lib/slice/slice_refcount.h + - src/core/lib/slice/slice_string_helpers.h + - test/core/promise/test_wakeup_schedulers.h + src: + - src/core/ext/upb-generated/google/protobuf/any.upb.c + - src/core/ext/upb-generated/google/rpc/status.upb.c + - src/core/lib/debug/trace.cc + - src/core/lib/event_engine/memory_allocator.cc + - src/core/lib/experiments/config.cc + - src/core/lib/experiments/experiments.cc + - src/core/lib/gprpp/status_helper.cc + - src/core/lib/gprpp/time.cc + - src/core/lib/iomgr/combiner.cc + - src/core/lib/iomgr/error.cc + - src/core/lib/iomgr/exec_ctx.cc + - src/core/lib/iomgr/executor.cc + - src/core/lib/iomgr/iomgr_internal.cc + - src/core/lib/promise/activity.cc + - src/core/lib/promise/pipe.cc + - src/core/lib/resource_quota/arena.cc + - src/core/lib/resource_quota/memory_quota.cc + - src/core/lib/resource_quota/periodic_update.cc + - src/core/lib/resource_quota/resource_quota.cc + - src/core/lib/resource_quota/thread_quota.cc + - src/core/lib/resource_quota/trace.cc + - src/core/lib/slice/percent_encoding.cc + - src/core/lib/slice/slice.cc + - src/core/lib/slice/slice_string_helpers.cc + - test/core/promise/map_pipe_test.cc + deps: + - absl/container:flat_hash_set + - absl/functional:any_invocable + - absl/functional:function_ref + - absl/hash:hash + - absl/meta:type_traits + - absl/status:statusor + - absl/utility:utility + - gpr + - upb + uses_polling: false - name: match_test gtest: true build: test @@ -8437,6 +8537,7 @@ targets: - src/core/lib/iomgr/executor.cc - src/core/lib/iomgr/iomgr_internal.cc - src/core/lib/promise/activity.cc + - src/core/lib/promise/pipe.cc - src/core/lib/resource_quota/arena.cc - src/core/lib/resource_quota/memory_quota.cc - src/core/lib/resource_quota/periodic_update.cc diff --git a/config.m4 b/config.m4 index a174c65f06d..d95d861b2fc 100644 --- a/config.m4 +++ b/config.m4 @@ -632,6 +632,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/load_balancing/lb_policy_registry.cc \ src/core/lib/matchers/matchers.cc \ src/core/lib/promise/activity.cc \ + src/core/lib/promise/pipe.cc \ src/core/lib/promise/sleep.cc \ src/core/lib/resolver/resolver.cc \ src/core/lib/resolver/resolver_registry.cc \ diff --git a/config.w32 b/config.w32 index 761adf04f16..a9e5bfc8790 100644 --- a/config.w32 +++ b/config.w32 @@ -598,6 +598,7 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\load_balancing\\lb_policy_registry.cc " + "src\\core\\lib\\matchers\\matchers.cc " + "src\\core\\lib\\promise\\activity.cc " + + "src\\core\\lib\\promise\\pipe.cc " + "src\\core\\lib\\promise\\sleep.cc " + "src\\core\\lib\\resolver\\resolver.cc " + "src\\core\\lib\\resolver\\resolver_registry.cc " + diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 8e81682d138..62b53d25d7e 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -1381,6 +1381,7 @@ Pod::Spec.new do |s| 'src/core/lib/promise/latch.h', 'src/core/lib/promise/loop.h', 'src/core/lib/promise/map.h', + 'src/core/lib/promise/pipe.cc', 'src/core/lib/promise/pipe.h', 'src/core/lib/promise/poll.h', 'src/core/lib/promise/promise.h', diff --git a/grpc.gemspec b/grpc.gemspec index 389d6772292..8a3f357dea7 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -1292,6 +1292,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/promise/latch.h ) s.files += %w( src/core/lib/promise/loop.h ) s.files += %w( src/core/lib/promise/map.h ) + s.files += %w( src/core/lib/promise/pipe.cc ) s.files += %w( src/core/lib/promise/pipe.h ) s.files += %w( src/core/lib/promise/poll.h ) s.files += %w( src/core/lib/promise/promise.h ) diff --git a/grpc.gyp b/grpc.gyp index 3e07040d2cf..beb5bf85d39 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -922,6 +922,7 @@ 'src/core/lib/load_balancing/lb_policy_registry.cc', 'src/core/lib/matchers/matchers.cc', 'src/core/lib/promise/activity.cc', + 'src/core/lib/promise/pipe.cc', 'src/core/lib/promise/sleep.cc', 'src/core/lib/resolver/resolver.cc', 'src/core/lib/resolver/resolver_registry.cc', @@ -1381,6 +1382,7 @@ 'src/core/lib/load_balancing/lb_policy.cc', 'src/core/lib/load_balancing/lb_policy_registry.cc', 'src/core/lib/promise/activity.cc', + 'src/core/lib/promise/pipe.cc', 'src/core/lib/promise/sleep.cc', 'src/core/lib/resolver/resolver.cc', 'src/core/lib/resolver/resolver_registry.cc', diff --git a/package.xml b/package.xml index d8acf73610e..5c1b2e85df0 100644 --- a/package.xml +++ b/package.xml @@ -1274,6 +1274,7 @@ + diff --git a/src/core/BUILD b/src/core/BUILD index c7f59b0c728..9d59c5938eb 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -361,6 +361,22 @@ grpc_cc_library( deps = ["//:gpr_platform"], ) +grpc_cc_library( + name = "map_pipe", + external_deps = ["absl/status"], + language = "c++", + public_hdrs = [ + "lib/promise/map_pipe.h", + ], + deps = [ + "for_each", + "map", + "pipe", + "promise_factory", + "//:gpr_platform", + ], +) + grpc_cc_library( name = "call_push_pull", hdrs = ["lib/promise/call_push_pull.h"], @@ -733,20 +749,26 @@ grpc_cc_library( grpc_cc_library( name = "pipe", + srcs = [ + "lib/promise/pipe.cc", + ], + hdrs = [ + "lib/promise/pipe.h", + ], external_deps = [ + "absl/strings", "absl/types:optional", "absl/types:variant", ], language = "c++", - public_hdrs = [ - "lib/promise/pipe.h", - ], deps = [ + "activity", "arena", "context", "intra_activity_waiter", "poll", "//:gpr", + "//:grpc_trace", ], ) diff --git a/src/core/lib/promise/activity.h b/src/core/lib/promise/activity.h index 090f89a93f5..d0825c59b55 100644 --- a/src/core/lib/promise/activity.h +++ b/src/core/lib/promise/activity.h @@ -405,7 +405,7 @@ template class PromiseActivity final : public FreestandingActivity, private ActivityContexts { public: - using Factory = PromiseFactory; + using Factory = OncePromiseFactory; using ResultType = typename Factory::Promise::Result; PromiseActivity(F promise_factory, WakeupScheduler wakeup_scheduler, @@ -533,7 +533,7 @@ class PromiseActivity final : public FreestandingActivity, ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) { ScopedActivity scoped_activity(this); ScopedContext contexts(this); - Construct(&promise_holder_.promise, promise_factory.Once()); + Construct(&promise_holder_.promise, promise_factory.Make()); return StepLoop(); } diff --git a/src/core/lib/promise/detail/basic_seq.h b/src/core/lib/promise/detail/basic_seq.h index bf9d992bb8b..2c8487ca963 100644 --- a/src/core/lib/promise/detail/basic_seq.h +++ b/src/core/lib/promise/detail/basic_seq.h @@ -68,7 +68,7 @@ struct SeqStateTypes { // Wrap the factory callable in our factory wrapper to deal with common edge // cases. We use the 'unwrapped type' from the traits, so for instance, TrySeq // can pass back a T from a StatusOr. - using Next = promise_detail::PromiseFactory< + using Next = promise_detail::OncePromiseFactory< typename PromiseResultTraits::UnwrappedType, FNext>; }; diff --git a/src/core/lib/promise/detail/promise_factory.h b/src/core/lib/promise/detail/promise_factory.h index dd333670c96..f7a3f223a0d 100644 --- a/src/core/lib/promise/detail/promise_factory.h +++ b/src/core/lib/promise/detail/promise_factory.h @@ -74,6 +74,20 @@ struct ResultOfT>()(std::declval()...)); }; +template +struct ResultOfT>()( + std::declval()...))>> { + using T = decltype(std::declval>()(std::declval()...)); +}; + +template +struct ResultOfT>()( + std::declval()...))>> { + using T = decltype(std::declval>()(std::declval()...)); +}; + template using ResultOf = typename ResultOfT::T; @@ -127,6 +141,14 @@ PromiseFactoryImpl(F&& f, A&& arg) { return f(std::forward(arg)); } +// Given a callable(A) -> Promise, name it a PromiseFactory and use it. +template +absl::enable_if_t>::value, + PromiseLike()(std::declval()))>> +PromiseFactoryImpl(F& f, A&& arg) { + return f(std::forward(arg)); +} + // Given a callable() -> Promise, promote it to a // PromiseFactory(A) -> Promise by dropping the first argument. template @@ -145,7 +167,7 @@ PromiseFactoryImpl(F&& f) { }; template -class PromiseFactory { +class OncePromiseFactory { private: GPR_NO_UNIQUE_ADDRESS F f_; @@ -154,31 +176,57 @@ class PromiseFactory { using Promise = decltype(PromiseFactoryImpl(std::move(f_), std::declval())); - explicit PromiseFactory(F f) : f_(std::move(f)) {} + explicit OncePromiseFactory(F f) : f_(std::move(f)) {} - Promise Once(Arg&& a) { + Promise Make(Arg&& a) { return PromiseFactoryImpl(std::move(f_), std::forward(a)); } +}; - Promise Repeated(Arg&& a) const { +template +class OncePromiseFactory { + private: + GPR_NO_UNIQUE_ADDRESS F f_; + + public: + using Arg = void; + using Promise = decltype(PromiseFactoryImpl(std::move(f_))); + + explicit OncePromiseFactory(F f) : f_(std::move(f)) {} + + Promise Make() { return PromiseFactoryImpl(std::move(f_)); } +}; + +template +class RepeatedPromiseFactory { + private: + GPR_NO_UNIQUE_ADDRESS F f_; + + public: + using Arg = A; + using Promise = decltype(PromiseFactoryImpl(f_, std::declval())); + + explicit RepeatedPromiseFactory(F f) : f_(std::move(f)) {} + + Promise Make(Arg&& a) const { return PromiseFactoryImpl(f_, std::forward(a)); } + Promise Make(Arg&& a) { return PromiseFactoryImpl(f_, std::forward(a)); } }; template -class PromiseFactory { +class RepeatedPromiseFactory { private: GPR_NO_UNIQUE_ADDRESS F f_; public: using Arg = void; - using Promise = decltype(PromiseFactoryImpl(std::move(f_))); - - explicit PromiseFactory(F f) : f_(std::move(f)) {} + using Promise = decltype(PromiseFactoryImpl(f_)); - Promise Once() { return PromiseFactoryImpl(std::move(f_)); } + explicit RepeatedPromiseFactory(F f) : f_(std::move(f)) {} - Promise Repeated() const { return PromiseFactoryImpl(f_); } + Promise Make() const { return PromiseFactoryImpl(f_); } + Promise Make() { return PromiseFactoryImpl(f_); } }; } // namespace promise_detail diff --git a/src/core/lib/promise/detail/status.h b/src/core/lib/promise/detail/status.h index 6b71feb3e07..cfd969a0d38 100644 --- a/src/core/lib/promise/detail/status.h +++ b/src/core/lib/promise/detail/status.h @@ -53,6 +53,11 @@ struct StatusCastImpl { static To Cast(To&& t) { return std::move(t); } }; +template +struct StatusCastImpl { + static To Cast(const To& t) { return t; } +}; + template struct StatusCastImpl, absl::Status> { static absl::StatusOr Cast(absl::Status&& t) { return std::move(t); } diff --git a/src/core/lib/promise/for_each.h b/src/core/lib/promise/for_each.h index 6d59afb51d5..6ab06a68bbf 100644 --- a/src/core/lib/promise/for_each.h +++ b/src/core/lib/promise/for_each.h @@ -48,7 +48,7 @@ class ForEach { typename PollTraits()())>::Type; using ReaderResultValue = typename ReaderResult::value_type; using ActionFactory = - promise_detail::PromiseFactory; + promise_detail::RepeatedPromiseFactory; using ActionPromise = typename ActionFactory::Promise; public: @@ -90,7 +90,7 @@ class ForEach { auto r = reader_next(); if (auto* p = absl::get_if(&r)) { if (p->has_value()) { - auto action = self->action_factory_.Repeated(std::move(**p)); + auto action = self->action_factory_.Make(std::move(**p)); return (*this)(self->state_.template emplace( std::move(action), std::move(*p))); } else { diff --git a/src/core/lib/promise/if.h b/src/core/lib/promise/if.h index 89f11479b7a..d39fdfba0be 100644 --- a/src/core/lib/promise/if.h +++ b/src/core/lib/promise/if.h @@ -34,10 +34,10 @@ template typename CallPoll::PollResult ChooseIf(CallPoll call_poll, bool result, T* if_true, F* if_false) { if (result) { - auto promise = if_true->Once(); + auto promise = if_true->Make(); return call_poll(promise); } else { - auto promise = if_false->Once(); + auto promise = if_false->Make(); return call_poll(promise); } } @@ -49,10 +49,10 @@ typename CallPoll::PollResult ChooseIf(CallPoll call_poll, if (!result.ok()) { return typename CallPoll::PollResult(result.status()); } else if (*result) { - auto promise = if_true->Once(); + auto promise = if_true->Make(); return call_poll(promise); } else { - auto promise = if_false->Once(); + auto promise = if_false->Make(); return call_poll(promise); } } @@ -60,8 +60,8 @@ typename CallPoll::PollResult ChooseIf(CallPoll call_poll, template class If { private: - using TrueFactory = promise_detail::PromiseFactory; - using FalseFactory = promise_detail::PromiseFactory; + using TrueFactory = promise_detail::OncePromiseFactory; + using FalseFactory = promise_detail::OncePromiseFactory; using ConditionPromise = PromiseLike; using TruePromise = typename TrueFactory::Promise; using FalsePromise = typename FalseFactory::Promise; diff --git a/src/core/lib/promise/loop.h b/src/core/lib/promise/loop.h index 4a1c379354e..84874e13e63 100644 --- a/src/core/lib/promise/loop.h +++ b/src/core/lib/promise/loop.h @@ -76,14 +76,14 @@ struct LoopTraits>> { template class Loop { private: - using Factory = promise_detail::PromiseFactory; - using PromiseType = decltype(std::declval().Repeated()); + using Factory = promise_detail::RepeatedPromiseFactory; + using PromiseType = decltype(std::declval().Make()); using PromiseResult = typename PromiseType::Result; public: using Result = typename LoopTraits::Result; - explicit Loop(F f) : factory_(std::move(f)), promise_(factory_.Repeated()) {} + explicit Loop(F f) : factory_(std::move(f)), promise_(factory_.Make()) {} ~Loop() { promise_.~PromiseType(); } Loop(Loop&& loop) noexcept @@ -104,7 +104,7 @@ class Loop { auto lc = LoopTraits::ToLoopCtl(*p); if (absl::holds_alternative(lc)) { promise_.~PromiseType(); - new (&promise_) PromiseType(factory_.Repeated()); + new (&promise_) PromiseType(factory_.Make()); continue; } // - otherwise there's our result... return it out. diff --git a/src/core/lib/promise/map_pipe.h b/src/core/lib/promise/map_pipe.h new file mode 100644 index 00000000000..e7ef679d6ee --- /dev/null +++ b/src/core/lib/promise/map_pipe.h @@ -0,0 +1,87 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_CORE_LIB_PROMISE_MAP_PIPE_H +#define GRPC_CORE_LIB_PROMISE_MAP_PIPE_H + +#include + +#include "absl/status/status.h" + +#include "src/core/lib/promise/detail/promise_factory.h" +#include "src/core/lib/promise/for_each.h" +#include "src/core/lib/promise/map.h" +#include "src/core/lib/promise/pipe.h" + +namespace grpc_core { + +// Apply a (possibly async) mapping function to src, and output into dst. +// +// In psuedo-code: +// for each element in wait_for src.Next: +// x = wait_for filter_factory(element) +// wait_for dst.Push(x) +template +auto MapPipe(PipeReceiver src, PipeSender dst, Filter filter_factory) { + return ForEach( + std::move(src), + [filter_factory = promise_detail::RepeatedPromiseFactory( + std::move(filter_factory)), + dst = std::move(dst)](T t) mutable { + return TrySeq(filter_factory.Make(std::move(t)), [&dst](T t) { + return Map(dst.Push(std::move(t)), [](bool successful_push) { + if (successful_push) { + return absl::OkStatus(); + } + return absl::CancelledError(); + }); + }); + }); +} + +// Helper to intecept a pipe and apply a mapping function. +// Each of the `Intercept` constructors will take a PipeSender or PipeReceiver, +// construct a new pipe, and then replace the passed in pipe with its new end. +// In this way it can interject logic per-element. +// Next, the TakeAndRun function will return a promise that can be run to apply +// a mapping promise to each element of the pipe. +template +class PipeMapper { + public: + static PipeMapper Intercept(PipeSender& intercept_sender) { + PipeMapper r; + r.interceptor_.sender.Swap(&intercept_sender); + return r; + } + + static PipeMapper Intercept(PipeReceiver& intercept_receiver) { + PipeMapper r; + r.interceptor_.receiver.Swap(&intercept_receiver); + return r; + } + + template + auto TakeAndRun(Filter filter) { + return MapPipe(std::move(interceptor_.receiver), + std::move(interceptor_.sender), std::move(filter)); + } + + private: + PipeMapper() = default; + Pipe interceptor_; +}; + +} // namespace grpc_core + +#endif // GRPC_CORE_LIB_PROMISE_MAP_PIPE_H diff --git a/src/core/lib/promise/pipe.cc b/src/core/lib/promise/pipe.cc new file mode 100644 index 00000000000..cbb408bea19 --- /dev/null +++ b/src/core/lib/promise/pipe.cc @@ -0,0 +1,19 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "src/core/lib/promise/pipe.h" + +grpc_core::DebugOnlyTraceFlag grpc_trace_promise_pipe(false, "promise_pipe"); diff --git a/src/core/lib/promise/pipe.h b/src/core/lib/promise/pipe.h index d54e4b9ac9a..1a2067848b1 100644 --- a/src/core/lib/promise/pipe.h +++ b/src/core/lib/promise/pipe.h @@ -19,18 +19,24 @@ #include +#include #include +#include "absl/strings/str_cat.h" #include "absl/types/optional.h" #include "absl/types/variant.h" #include +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/promise/activity.h" #include "src/core/lib/promise/context.h" #include "src/core/lib/promise/intra_activity_waiter.h" #include "src/core/lib/promise/poll.h" #include "src/core/lib/resource_quota/arena.h" +extern grpc_core::DebugOnlyTraceFlag grpc_trace_promise_pipe; + namespace grpc_core { namespace pipe_detail { @@ -102,6 +108,9 @@ class Center { // Add one ref to the send side of this object, and return this. Center* RefSend() { + if (grpc_trace_promise_pipe.enabled()) { + gpr_log(GPR_INFO, "%s", DebugOpString("RefSend").c_str()); + } send_refs_++; GPR_ASSERT(send_refs_ != 0); return this; @@ -109,6 +118,9 @@ class Center { // Add one ref to the recv side of this object, and return this. Center* RefRecv() { + if (grpc_trace_promise_pipe.enabled()) { + gpr_log(GPR_INFO, "%s", DebugOpString("RefRecv").c_str()); + } recv_refs_++; GPR_ASSERT(recv_refs_ != 0); return this; @@ -118,6 +130,9 @@ class Center { // If no send refs remain, wake due to send closure // If no refs remain, destroy this object void UnrefSend() { + if (grpc_trace_promise_pipe.enabled()) { + gpr_log(GPR_INFO, "%s", DebugOpString("UnrefSend").c_str()); + } GPR_DEBUG_ASSERT(send_refs_ > 0); send_refs_--; if (0 == send_refs_) { @@ -133,6 +148,9 @@ class Center { // If no recv refs remain, wake due to recv closure // If no refs remain, destroy this object void UnrefRecv() { + if (grpc_trace_promise_pipe.enabled()) { + gpr_log(GPR_INFO, "%s", DebugOpString("UnrefRecv").c_str()); + } GPR_DEBUG_ASSERT(recv_refs_ > 0); recv_refs_--; if (0 == recv_refs_) { @@ -151,6 +169,9 @@ class Center { // Return true if the value was pushed. // Return false if the recv end is closed. Poll Push(T* value) { + if (grpc_trace_promise_pipe.enabled()) { + gpr_log(GPR_INFO, "%s", DebugOpString("Push").c_str()); + } GPR_DEBUG_ASSERT(send_refs_ != 0); if (recv_refs_ == 0) return false; if (value_state_ != ValueState::kEmpty) return on_empty_.pending(); @@ -161,6 +182,9 @@ class Center { } Poll PollAck() { + if (grpc_trace_promise_pipe.enabled()) { + gpr_log(GPR_INFO, "%s", DebugOpString("PollAck").c_str()); + } GPR_DEBUG_ASSERT(send_refs_ != 0); if (recv_refs_ == 0) return value_state_ == ValueState::kAcked; if (value_state_ != ValueState::kAcked) return on_empty_.pending(); @@ -173,6 +197,9 @@ class Center { // Return the value if one was retrieved. // Return nullopt if the send end is closed and no value had been pushed. Poll> Next() { + if (grpc_trace_promise_pipe.enabled()) { + gpr_log(GPR_INFO, "%s", DebugOpString("Next").c_str()); + } GPR_DEBUG_ASSERT(recv_refs_ != 0); if (value_state_ != ValueState::kReady) { if (send_refs_ == 0) return NextResult(nullptr); @@ -182,6 +209,9 @@ class Center { } void AckNext() { + if (grpc_trace_promise_pipe.enabled()) { + gpr_log(GPR_INFO, "%s", DebugOpString("AckNext").c_str()); + } GPR_DEBUG_ASSERT(value_state_ == ValueState::kReady); value_state_ = ValueState::kAcked; on_empty_.Wake(); @@ -192,6 +222,15 @@ class Center { const T& value() const { return value_; } private: + std::string DebugTag() { + return absl::StrCat(Activity::current()->DebugTag(), "PIPE[0x", + reinterpret_cast(this), "]: "); + } + std::string DebugOpString(std::string op) { + return absl::StrCat(DebugTag(), op, " send_refs=", send_refs_, + " recv_refs=", recv_refs_, + " value_state=", ValueStateName(value_state_)); + } void ResetValue() { // Fancy dance to move out of value in the off chance that we reclaim some // memory earlier. @@ -263,6 +302,8 @@ class PipeSender { if (auto* center = std::exchange(center_, nullptr)) center->UnrefSend(); } + void Swap(PipeSender* other) { std::swap(center_, other->center_); } + // Send a single message along the pipe. // Returns a promise that will resolve to a bool - true if the message was // sent, false if it could never be sent. Blocks the promise until the @@ -297,6 +338,8 @@ class PipeReceiver { if (center_ != nullptr) center_->UnrefRecv(); } + void Swap(PipeReceiver* other) { std::swap(center_, other->center_); } + // Receive a single message from the pipe. // Returns a promise that will resolve to an optional - with a value if a // message was received, or no value if the other end of the pipe was closed. diff --git a/src/core/lib/promise/seq.h b/src/core/lib/promise/seq.h index c2b757d02d1..ea2cefc5b61 100644 --- a/src/core/lib/promise/seq.h +++ b/src/core/lib/promise/seq.h @@ -32,13 +32,11 @@ struct SeqTraits { using UnwrappedType = T; using WrappedType = T; template - static auto CallFactory(Next* next, T&& value) - -> decltype(next->Once(std::forward(value))) { - return next->Once(std::forward(value)); + static auto CallFactory(Next* next, T&& value) { + return next->Make(std::forward(value)); } template - static auto CallSeqFactory(F& f, Elem&& elem, T&& value) - -> decltype(f(std::forward(elem), std::forward(value))) { + static auto CallSeqFactory(F& f, Elem&& elem, T&& value) { return f(std::forward(elem), std::forward(value)); } template diff --git a/src/core/lib/promise/try_seq.h b/src/core/lib/promise/try_seq.h index 32e657627f2..6409c445b91 100644 --- a/src/core/lib/promise/try_seq.h +++ b/src/core/lib/promise/try_seq.h @@ -37,9 +37,8 @@ struct TrySeqTraitsWithSfinae { using UnwrappedType = T; using WrappedType = absl::StatusOr; template - static auto CallFactory(Next* next, T&& value) - -> decltype(next->Once(std::forward(value))) { - return next->Once(std::forward(value)); + static auto CallFactory(Next* next, T&& value) { + return next->Make(std::forward(value)); } template static auto CallSeqFactory(F& f, Elem&& elem, T&& value) @@ -57,9 +56,8 @@ struct TrySeqTraitsWithSfinae> { using UnwrappedType = T; using WrappedType = absl::StatusOr; template - static auto CallFactory(Next* next, absl::StatusOr&& status) - -> decltype(next->Once(std::move(*status))) { - return next->Once(std::move(*status)); + static auto CallFactory(Next* next, absl::StatusOr&& status) { + return next->Make(std::move(*status)); } template static auto CallSeqFactory(F& f, Elem&& elem, absl::StatusOr value) @@ -84,8 +82,8 @@ struct TrySeqTraitsWithSfinae< using UnwrappedType = void; using WrappedType = T; template - static auto CallFactory(Next* next, T&&) -> decltype(next->Once()) { - return next->Once(); + static auto CallFactory(Next* next, T&&) { + return next->Make(); } template static Poll CheckResultAndRunNext(T prior, RunNext run_next) { @@ -98,9 +96,8 @@ struct TrySeqTraitsWithSfinae { using UnwrappedType = void; using WrappedType = absl::Status; template - static auto CallFactory(Next* next, absl::Status&&) - -> decltype(next->Once()) { - return next->Once(); + static auto CallFactory(Next* next, absl::Status&&) { + return next->Make(); } template static Poll CheckResultAndRunNext(absl::Status prior, diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 208c545d1f6..ab797e7460f 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -607,6 +607,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/load_balancing/lb_policy_registry.cc', 'src/core/lib/matchers/matchers.cc', 'src/core/lib/promise/activity.cc', + 'src/core/lib/promise/pipe.cc', 'src/core/lib/promise/sleep.cc', 'src/core/lib/resolver/resolver.cc', 'src/core/lib/resolver/resolver_registry.cc', diff --git a/test/core/promise/BUILD b/test/core/promise/BUILD index 3e0b4cb73b0..831b932f650 100644 --- a/test/core/promise/BUILD +++ b/test/core/promise/BUILD @@ -311,6 +311,28 @@ grpc_cc_test( ], ) +grpc_cc_test( + name = "map_pipe_test", + srcs = ["map_pipe_test.cc"], + external_deps = ["gtest"], + language = "c++", + tags = ["promise_test"], + uses_event_engine = False, + uses_polling = False, + deps = [ + "test_wakeup_schedulers", + "//src/core:for_each", + "//src/core:join", + "//src/core:map", + "//src/core:map_pipe", + "//src/core:observable", + "//src/core:pipe", + "//src/core:resource_quota", + "//src/core:seq", + "//src/core:try_seq", + ], +) + grpc_cc_test( name = "pipe_test", srcs = ["pipe_test.cc"], diff --git a/test/core/promise/map_pipe_test.cc b/test/core/promise/map_pipe_test.cc new file mode 100644 index 00000000000..d05ebd23ab0 --- /dev/null +++ b/test/core/promise/map_pipe_test.cc @@ -0,0 +1,153 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/core/lib/promise/map_pipe.h" + +#include + +#include +#include + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include + +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/promise/activity.h" +#include "src/core/lib/promise/detail/basic_seq.h" +#include "src/core/lib/promise/for_each.h" +#include "src/core/lib/promise/join.h" +#include "src/core/lib/promise/map.h" +#include "src/core/lib/promise/pipe.h" +#include "src/core/lib/promise/poll.h" +#include "src/core/lib/promise/seq.h" +#include "src/core/lib/promise/try_seq.h" +#include "src/core/lib/resource_quota/arena.h" +#include "src/core/lib/resource_quota/memory_quota.h" +#include "src/core/lib/resource_quota/resource_quota.h" +#include "test/core/promise/test_wakeup_schedulers.h" + +using testing::Mock; +using testing::MockFunction; +using testing::StrictMock; + +namespace grpc_core { + +static auto* g_memory_allocator = new MemoryAllocator( + ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator("test")); + +template +class Delayed { + public: + explicit Delayed(T x) : x_(x) {} + + Poll operator()() { + Activity::current()->ForceImmediateRepoll(); + ++polls_; + if (polls_ == 10) return std::move(x_); + return Pending(); + } + + private: + int polls_ = 0; + T x_; +}; + +TEST(MapPipeTest, SendThriceWithPipeInterceptingReceive) { + int num_received = 0; + StrictMock> on_done; + EXPECT_CALL(on_done, Call(absl::OkStatus())); + MakeActivity( + [&num_received] { + Pipe pipe; + auto filter = + PipeMapper::Intercept(pipe.receiver).TakeAndRun([](int x) { + return Delayed(x + 1); + }); + auto sender = std::make_shared>>( + std::make_unique>(std::move(pipe.sender))); + return Map( + Join( + std::move(filter), + // Push 3 things into a pipe -- 0, 1, then 2 -- then close. + Seq((*sender)->Push(0), [sender] { return (*sender)->Push(1); }, + [sender] { return (*sender)->Push(2); }, + [sender] { + sender->reset(); + return absl::OkStatus(); + }), + // Use a ForEach loop to read them out and verify all values are + // seen (but with 1 added). + ForEach(std::move(pipe.receiver), + [&num_received](int i) { + num_received++; + EXPECT_EQ(num_received, i); + return absl::OkStatus(); + })), + JustElem<2>()); + }, + NoWakeupScheduler(), + [&on_done](absl::Status status) { on_done.Call(std::move(status)); }, + MakeScopedArena(1024, g_memory_allocator)); + Mock::VerifyAndClearExpectations(&on_done); + EXPECT_EQ(num_received, 3); +} + +TEST(MapPipeTest, SendThriceWithPipeInterceptingSend) { + int num_received = 0; + StrictMock> on_done; + EXPECT_CALL(on_done, Call(absl::OkStatus())); + MakeActivity( + [&num_received] { + Pipe pipe; + auto filter = + PipeMapper::Intercept(pipe.sender).TakeAndRun([](int x) { + return Delayed(x + 1); + }); + auto sender = std::make_shared>>( + std::make_unique>(std::move(pipe.sender))); + return Map( + Join( + std::move(filter), + // Push 3 things into a pipe -- 0, 1, then 2 -- then close. + Seq((*sender)->Push(0), [sender] { return (*sender)->Push(1); }, + [sender] { return (*sender)->Push(2); }, + [sender] { + sender->reset(); + return absl::OkStatus(); + }), + // Use a ForEach loop to read them out and verify all values are + // seen (but with 1 added). + ForEach(std::move(pipe.receiver), + [&num_received](int i) { + num_received++; + EXPECT_EQ(num_received, i); + return absl::OkStatus(); + })), + JustElem<2>()); + }, + NoWakeupScheduler(), + [&on_done](absl::Status status) { on_done.Call(std::move(status)); }, + MakeScopedArena(1024, g_memory_allocator)); + Mock::VerifyAndClearExpectations(&on_done); + EXPECT_EQ(num_received, 3); +} + +} // namespace grpc_core + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/core/promise/promise_factory_test.cc b/test/core/promise/promise_factory_test.cc index e1ffee836c5..47d8a3a855b 100644 --- a/test/core/promise/promise_factory_test.cc +++ b/test/core/promise/promise_factory_test.cc @@ -28,31 +28,36 @@ namespace promise_detail { namespace testing { template -PromiseFactory MakeFactory(F f) { - return PromiseFactory(std::move(f)); +auto MakeOnceFactory(F f) { + return OncePromiseFactory(std::move(f)); +} +template +auto MakeRepeatedFactory(F f) { + return RepeatedPromiseFactory(std::move(f)); } TEST(AdaptorTest, FactoryFromPromise) { EXPECT_EQ( - MakeFactory([]() { return Poll(Poll(42)); }).Once()(), - Poll(42)); - EXPECT_EQ( - MakeFactory([]() { return Poll(Poll(42)); }).Repeated()(), + MakeOnceFactory([]() { return Poll(Poll(42)); }).Make()(), Poll(42)); - EXPECT_EQ(MakeFactory(Promise([]() { + EXPECT_EQ(MakeRepeatedFactory([]() { + return Poll(Poll(42)); + }).Make()(), + Poll(42)); + EXPECT_EQ(MakeOnceFactory(Promise([]() { return Poll(Poll(42)); - })).Once()(), + })).Make()(), Poll(42)); - EXPECT_EQ(MakeFactory(Promise([]() { + EXPECT_EQ(MakeRepeatedFactory(Promise([]() { return Poll(Poll(42)); - })).Repeated()(), + })).Make()(), Poll(42)); } TEST(AdaptorTest, FactoryFromBindFrontPromise) { - EXPECT_EQ(MakeFactory( + EXPECT_EQ(MakeOnceFactory( absl::bind_front([](int i) { return Poll(i); }, 42)) - .Once()(), + .Make()(), Poll(42)); } diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index d9798161d26..11117bf5316 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -2277,6 +2277,7 @@ src/core/lib/promise/intra_activity_waiter.h \ src/core/lib/promise/latch.h \ src/core/lib/promise/loop.h \ src/core/lib/promise/map.h \ +src/core/lib/promise/pipe.cc \ src/core/lib/promise/pipe.h \ src/core/lib/promise/poll.h \ src/core/lib/promise/promise.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index f78238d8063..bc0ebd3b1f0 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -2068,6 +2068,7 @@ src/core/lib/promise/intra_activity_waiter.h \ src/core/lib/promise/latch.h \ src/core/lib/promise/loop.h \ src/core/lib/promise/map.h \ +src/core/lib/promise/pipe.cc \ src/core/lib/promise/pipe.h \ src/core/lib/promise/poll.h \ src/core/lib/promise/promise.h \ diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index fc286d3d5c7..f2b594569bd 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -4481,6 +4481,30 @@ ], "uses_polling": false }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "map_pipe_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false,