mirror of https://github.com/grpc/grpc.git
Promise join combinator (#26918)
* Add construct/destruct helper functions Will be used in upcoming promise implementation * Poll type for promises library * Library to talk about things that look like promises if you squint * Helper code for promises to deal with status types generically * Library to talk about things that make promises * build * Join combinator for promise library * Changes to sync required for promise activities * sanitized * Automated change: Fix sanity tests * Automated change: Fix sanity tests * Update basic_join.h * Automated change: Fix sanity tests Co-authored-by: ctiller <ctiller@users.noreply.github.com>pull/27069/head
parent
59da7bc42a
commit
f292f001ee
11 changed files with 663 additions and 1 deletions
@ -0,0 +1,191 @@ |
||||
// Copyright 2021 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_CORE_LIB_PROMISE_DETAIL_BASIC_JOIN_H |
||||
#define GRPC_CORE_LIB_PROMISE_DETAIL_BASIC_JOIN_H |
||||
|
||||
#include <grpc/impl/codegen/port_platform.h> |
||||
|
||||
#include <tuple> |
||||
#include <utility> |
||||
|
||||
#include "absl/utility/utility.h" |
||||
#include "src/core/lib/gprpp/bitset.h" |
||||
#include "src/core/lib/gprpp/construct_destruct.h" |
||||
#include "src/core/lib/promise/detail/promise_factory.h" |
||||
#include "src/core/lib/promise/poll.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace promise_detail { |
||||
|
||||
// This union can either be a functor, or the result of the functor (after
|
||||
// mapping via a trait). Allows us to remember the result of one joined functor
|
||||
// until the rest are ready.
|
||||
template <typename Traits, typename F> |
||||
union Fused { |
||||
explicit Fused(F&& f) : f(std::forward<F>(f)) {} |
||||
explicit Fused(PromiseLike<F>&& f) : f(std::forward<PromiseLike<F>>(f)) {} |
||||
~Fused() {} |
||||
// Wrap the functor in a PromiseLike to handle immediately returning functors
|
||||
// and the like.
|
||||
using Promise = PromiseLike<F>; |
||||
GPR_NO_UNIQUE_ADDRESS Promise f; |
||||
// Compute the result type: We take the result of the promise, and pass it via
|
||||
// our traits, so that, for example, TryJoin and take a StatusOr<T> and just
|
||||
// store a T.
|
||||
using Result = typename Traits::template ResultType<typename Promise::Result>; |
||||
GPR_NO_UNIQUE_ADDRESS Result result; |
||||
}; |
||||
|
||||
// A join gets composed of joints... these are just wrappers around a Fused for
|
||||
// their data, with some machinery as methods to get the system working.
|
||||
template <typename Traits, size_t kRemaining, typename... Fs> |
||||
struct Joint : public Joint<Traits, kRemaining - 1, Fs...> { |
||||
// The index into Fs for this Joint
|
||||
static constexpr size_t kIdx = sizeof...(Fs) - kRemaining; |
||||
// The next join (the one we derive from)
|
||||
using NextJoint = Joint<Traits, kRemaining - 1, Fs...>; |
||||
// From Fs, extract the functor for this joint.
|
||||
using F = typename std::tuple_element<kIdx, std::tuple<Fs...>>::type; |
||||
// Generate the Fused type for this functor.
|
||||
using Fsd = Fused<Traits, F>; |
||||
GPR_NO_UNIQUE_ADDRESS Fsd fused; |
||||
// Figure out what kind of bitmask will be used by the outer join.
|
||||
using Bits = BitSet<sizeof...(Fs)>; |
||||
// Initialize from a tuple of pointers to Fs
|
||||
explicit Joint(std::tuple<Fs*...> fs) |
||||
: NextJoint(fs), fused(std::move(*std::get<kIdx>(fs))) {} |
||||
// Copy: assume that the Fuse is still in the promise state (since it's not
|
||||
// legal to copy after the first poll!)
|
||||
Joint(const Joint& j) : NextJoint(j), fused(j.fused.f) {} |
||||
// Move: assume that the Fuse is still in the promise state (since it's not
|
||||
// legal to move after the first poll!)
|
||||
Joint(Joint&& j) noexcept |
||||
: NextJoint(std::forward<NextJoint>(j)), fused(std::move(j.fused.f)) {} |
||||
// Destruct: check bits to see if we're in promise or result state, and call
|
||||
// the appropriate destructor. Recursively, call up through the join.
|
||||
void DestructAll(const Bits& bits) { |
||||
if (!bits.is_set(kIdx)) { |
||||
Destruct(&fused.f); |
||||
} else { |
||||
Destruct(&fused.result); |
||||
} |
||||
NextJoint::DestructAll(bits); |
||||
} |
||||
// Poll all joints up, and then call finally.
|
||||
template <typename F> |
||||
auto Run(Bits* bits, F finally) -> decltype(finally()) { |
||||
// If we're still in the promise state...
|
||||
if (!bits->is_set(kIdx)) { |
||||
// Poll the promise
|
||||
auto r = fused.f(); |
||||
if (auto* p = absl::get_if<kPollReadyIdx>(&r)) { |
||||
// If it's done, then ask the trait to unwrap it and store that result
|
||||
// in the Fused, and continue the iteration. Note that OnResult could
|
||||
// instead choose to return a value instead of recursing through the
|
||||
// iteration, in that case we continue returning the same result up.
|
||||
// Here is where TryJoin can escape out.
|
||||
return Traits::OnResult( |
||||
std::move(*p), [this, bits, &finally](typename Fsd::Result result) { |
||||
bits->set(kIdx); |
||||
Destruct(&fused.f); |
||||
Construct(&fused.result, std::move(result)); |
||||
return NextJoint::Run(bits, std::move(finally)); |
||||
}); |
||||
} |
||||
} |
||||
// That joint is still pending... we'll still poll the result of the joints.
|
||||
return NextJoint::Run(bits, std::move(finally)); |
||||
} |
||||
}; |
||||
|
||||
// Terminating joint... for each of the recursions, do the thing we're supposed
|
||||
// to do at the end.
|
||||
template <typename Traits, typename... Fs> |
||||
struct Joint<Traits, 0, Fs...> { |
||||
explicit Joint(std::tuple<Fs*...>) {} |
||||
Joint(const Joint&) {} |
||||
Joint(Joint&&) noexcept {} |
||||
template <typename T> |
||||
void DestructAll(const T&) {} |
||||
template <typename F> |
||||
auto Run(BitSet<sizeof...(Fs)>*, F finally) -> decltype(finally()) { |
||||
return finally(); |
||||
} |
||||
}; |
||||
|
||||
template <typename Traits, typename... Fs> |
||||
class BasicJoin { |
||||
private: |
||||
// How many things are we joining?
|
||||
static constexpr size_t N = sizeof...(Fs); |
||||
// Bitset: if a bit is 0, that joint is still in promise state. If it's 1,
|
||||
// then the joint has a result.
|
||||
GPR_NO_UNIQUE_ADDRESS BitSet<N> state_; |
||||
// The actual joints, wrapped in an anonymous union to give us control of
|
||||
// construction/destruction.
|
||||
union { |
||||
GPR_NO_UNIQUE_ADDRESS Joint<Traits, sizeof...(Fs), Fs...> joints_; |
||||
}; |
||||
|
||||
// Access joint index I
|
||||
template <size_t I> |
||||
Joint<Traits, sizeof...(Fs) - I, Fs...>* GetJoint() { |
||||
return static_cast<Joint<Traits, sizeof...(Fs) - I, Fs...>*>(&joints_); |
||||
} |
||||
|
||||
// The tuple of results of all our promises
|
||||
using Tuple = std::tuple<typename Fused<Traits, Fs>::Result...>; |
||||
|
||||
// Collect up all the results and construct a tuple.
|
||||
template <size_t... I> |
||||
Tuple Finish(absl::index_sequence<I...>) { |
||||
return Tuple(std::move(GetJoint<I>()->fused.result)...); |
||||
} |
||||
|
||||
public: |
||||
explicit BasicJoin(Fs&&... fs) : joints_(std::tuple<Fs*...>(&fs...)) {} |
||||
BasicJoin& operator=(const BasicJoin&) = delete; |
||||
// Copy a join - only available before polling.
|
||||
BasicJoin(const BasicJoin& other) { |
||||
assert(other.state_.none()); |
||||
Construct(&joints_, other.joints_); |
||||
} |
||||
// Move a join - only available before polling.
|
||||
BasicJoin(BasicJoin&& other) noexcept { |
||||
assert(other.state_.none()); |
||||
Construct(&joints_, std::move(other.joints_)); |
||||
} |
||||
~BasicJoin() { joints_.DestructAll(state_); } |
||||
using Result = decltype(Traits::Wrap(std::declval<Tuple>())); |
||||
// Poll the join
|
||||
Poll<Result> operator()() { |
||||
// Poll the joints...
|
||||
return joints_.Run(&state_, [this]() -> Poll<Result> { |
||||
// If all of them are completed, collect the results, and then ask our
|
||||
// traits to wrap them - allowing for example TryJoin to turn tuple<A,B,C>
|
||||
// into StatusOr<tuple<A,B,C>>.
|
||||
if (state_.all()) { |
||||
return Traits::Wrap(Finish(absl::make_index_sequence<N>())); |
||||
} else { |
||||
return Pending(); |
||||
} |
||||
}); |
||||
} |
||||
}; |
||||
|
||||
} // namespace promise_detail
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_CORE_LIB_PROMISE_DETAIL_BASIC_JOIN_H
|
@ -0,0 +1,53 @@ |
||||
// Copyright 2021 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_CORE_LIB_PROMISE_JOIN_H |
||||
#define GRPC_CORE_LIB_PROMISE_JOIN_H |
||||
|
||||
#include <grpc/impl/codegen/port_platform.h> |
||||
|
||||
#include "src/core/lib/promise/detail/basic_join.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace promise_detail { |
||||
|
||||
struct JoinTraits { |
||||
template <typename T> |
||||
using ResultType = absl::remove_reference_t<T>; |
||||
template <typename T, typename F> |
||||
static auto OnResult(T result, F kontinue) |
||||
-> decltype(kontinue(std::move(result))) { |
||||
return kontinue(std::move(result)); |
||||
} |
||||
template <typename T> |
||||
static T Wrap(T x) { |
||||
return x; |
||||
} |
||||
}; |
||||
|
||||
template <typename... Promises> |
||||
using Join = BasicJoin<JoinTraits, Promises...>; |
||||
|
||||
} // namespace promise_detail
|
||||
|
||||
/// Combinator to run all promises to completion, and return a tuple
|
||||
/// of their results.
|
||||
template <typename... Promise> |
||||
promise_detail::Join<Promise...> Join(Promise... promises) { |
||||
return promise_detail::Join<Promise...>(std::move(promises)...); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_CORE_LIB_PROMISE_JOIN_H
|
@ -0,0 +1,76 @@ |
||||
// Copyright 2021 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_CORE_LIB_PROMISE_TRY_JOIN_H |
||||
#define GRPC_CORE_LIB_PROMISE_TRY_JOIN_H |
||||
|
||||
#include <grpc/impl/codegen/port_platform.h> |
||||
|
||||
#include "src/core/lib/promise/detail/basic_join.h" |
||||
#include "src/core/lib/promise/detail/status.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
namespace promise_detail { |
||||
|
||||
// Extract the T from a StatusOr<T>
|
||||
template <typename T> |
||||
T IntoResult(absl::StatusOr<T>* status) { |
||||
return std::move(**status); |
||||
} |
||||
|
||||
// TryJoin returns a StatusOr<tuple<A,B,C>> for f()->Poll<StatusOr<A>>,
|
||||
// g()->Poll<StatusOr<B>>, h()->Poll<StatusOr<C>>. If one of those should be a
|
||||
// Status instead, we need a placeholder type to return, and this is it.
|
||||
struct Empty {}; |
||||
inline Empty IntoResult(absl::Status*) { return Empty{}; } |
||||
|
||||
// Traits object to pass to BasicJoin
|
||||
struct TryJoinTraits { |
||||
template <typename T> |
||||
using ResultType = |
||||
decltype(IntoResult(std::declval<absl::remove_reference_t<T>*>())); |
||||
template <typename T, typename F> |
||||
static auto OnResult(T result, F kontinue) |
||||
-> decltype(kontinue(IntoResult(&result))) { |
||||
using Result = |
||||
typename PollTraits<decltype(kontinue(IntoResult(&result)))>::Type; |
||||
if (!result.ok()) { |
||||
return Result(IntoStatus(&result)); |
||||
} |
||||
return kontinue(IntoResult(&result)); |
||||
} |
||||
template <typename T> |
||||
static absl::StatusOr<T> Wrap(T x) { |
||||
return absl::StatusOr<T>(std::move(x)); |
||||
} |
||||
}; |
||||
|
||||
// Implementation of TryJoin combinator.
|
||||
template <typename... Promises> |
||||
using TryJoin = BasicJoin<TryJoinTraits, Promises...>; |
||||
|
||||
} // namespace promise_detail
|
||||
|
||||
// Run all promises.
|
||||
// If any fail, cancel the rest and return the failure.
|
||||
// If all succeed, return Ok(tuple-of-results).
|
||||
template <typename... Promises> |
||||
promise_detail::TryJoin<Promises...> TryJoin(Promises... promises) { |
||||
return promise_detail::TryJoin<Promises...>(std::move(promises)...); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_CORE_LIB_PROMISE_TRY_JOIN_H
|
@ -0,0 +1,40 @@ |
||||
// Copyright 2021 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "src/core/lib/promise/join.h" |
||||
#include <gtest/gtest.h> |
||||
|
||||
namespace grpc_core { |
||||
|
||||
TEST(JoinTest, Join1) { |
||||
EXPECT_EQ(Join([] { return 3; })(), |
||||
(Poll<std::tuple<int>>(std::make_tuple(3)))); |
||||
} |
||||
|
||||
TEST(JoinTest, Join2) { |
||||
EXPECT_EQ(Join([] { return 3; }, [] { return 4; })(), |
||||
(Poll<std::tuple<int, int>>(std::make_tuple(3, 4)))); |
||||
} |
||||
|
||||
TEST(JoinTest, Join3) { |
||||
EXPECT_EQ(Join([] { return 3; }, [] { return 4; }, [] { return 5; })(), |
||||
(Poll<std::tuple<int, int, int>>(std::make_tuple(3, 4, 5)))); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
return RUN_ALL_TESTS(); |
||||
} |
@ -0,0 +1,79 @@ |
||||
// Copyright 2021 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "src/core/lib/promise/try_join.h" |
||||
#include <gtest/gtest.h> |
||||
|
||||
namespace grpc_core { |
||||
|
||||
template <typename T> |
||||
using P = std::function<Poll<absl::StatusOr<T>>()>; |
||||
|
||||
template <typename T> |
||||
P<T> instant_ok(T x) { |
||||
return [x] { return absl::StatusOr<T>(x); }; |
||||
} |
||||
|
||||
template <typename T> |
||||
P<T> instant_fail() { |
||||
return [] { return absl::StatusOr<T>(); }; |
||||
} |
||||
|
||||
template <typename... T> |
||||
Poll<absl::StatusOr<std::tuple<T...>>> ok(T... x) { |
||||
return absl::StatusOr<std::tuple<T...>>(absl::in_place, x...); |
||||
} |
||||
|
||||
template <typename... T> |
||||
Poll<absl::StatusOr<std::tuple<T...>>> fail() { |
||||
return absl::StatusOr<std::tuple<T...>>(); |
||||
} |
||||
|
||||
template <typename T> |
||||
P<T> pending() { |
||||
return []() -> Poll<absl::StatusOr<T>> { return Pending(); }; |
||||
} |
||||
|
||||
TEST(TryJoinTest, Join1) { EXPECT_EQ(TryJoin(instant_ok(1))(), ok(1)); } |
||||
|
||||
TEST(TryJoinTest, Join1Fail) { |
||||
EXPECT_EQ(TryJoin(instant_fail<int>())(), fail<int>()); |
||||
} |
||||
|
||||
TEST(TryJoinTest, Join2Success) { |
||||
EXPECT_EQ(TryJoin(instant_ok(1), instant_ok(2))(), ok(1, 2)); |
||||
} |
||||
|
||||
TEST(TryJoinTest, Join2Fail1) { |
||||
EXPECT_EQ(TryJoin(instant_ok(1), instant_fail<int>())(), (fail<int, int>())); |
||||
} |
||||
|
||||
TEST(TryJoinTest, Join2Fail2) { |
||||
EXPECT_EQ(TryJoin(instant_fail<int>(), instant_ok(2))(), (fail<int, int>())); |
||||
} |
||||
|
||||
TEST(TryJoinTest, Join2Fail1P) { |
||||
EXPECT_EQ(TryJoin(pending<int>(), instant_fail<int>())(), (fail<int, int>())); |
||||
} |
||||
|
||||
TEST(TryJoinTest, Join2Fail2P) { |
||||
EXPECT_EQ(TryJoin(instant_fail<int>(), pending<int>())(), (fail<int, int>())); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
return RUN_ALL_TESTS(); |
||||
} |
Loading…
Reference in new issue