[promises] ForEach fixes (#31300)

* [promises] ForEach fixes

* Automated change: Fix sanity tests

* first pass fixes

* fix

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/31307/head
Craig Tiller 2 years ago committed by GitHub
parent 1267b0c0f4
commit b0a0e8983f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      build_autogenerated.yaml
  2. 59
      src/core/lib/promise/for_each.h
  3. 1
      test/core/promise/BUILD
  4. 119
      test/core/promise/for_each_test.cc

@ -6211,6 +6211,7 @@ targets:
- src/core/lib/promise/poll.h
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
- src/core/lib/promise/try_seq.h
- src/core/lib/promise/wait_set.h
- src/core/lib/resource_quota/arena.h
- src/core/lib/resource_quota/memory_quota.h

@ -29,19 +29,6 @@ namespace grpc_core {
namespace for_each_detail {
// Helper function: at the end of each iteration of a for-each loop, this is
// called. If the iteration failed, return failure. If the iteration succeeded,
// then call the next iteration.
template <typename Reader, typename CallPoll>
Poll<absl::Status> FinishIteration(absl::Status* r, Reader* reader,
CallPoll call_poll) {
if (r->ok()) {
auto next = reader->Next();
return call_poll(next);
}
return std::move(*r);
}
// Done creates statuses for the end of the iteration. It's templated on the
// type of the result of the ForEach loop, so that we can introduce new types
// easily.
@ -57,9 +44,11 @@ template <typename Reader, typename Action>
class ForEach {
private:
using ReaderNext = decltype(std::declval<Reader>().Next());
using ReaderResult = typename PollTraits<
decltype(std::declval<ReaderNext>()())>::Type::value_type;
using ActionFactory = promise_detail::PromiseFactory<ReaderResult, Action>;
using ReaderResult =
typename PollTraits<decltype(std::declval<ReaderNext>()())>::Type;
using ReaderResultValue = typename ReaderResult::value_type;
using ActionFactory =
promise_detail::PromiseFactory<ReaderResultValue, Action>;
using ActionPromise = typename ActionFactory::Promise;
public:
@ -79,20 +68,21 @@ class ForEach {
// NOLINTNEXTLINE(performance-noexcept-move-constructor)
ForEach& operator=(ForEach&&) = default;
Poll<Result> operator()() {
return absl::visit(CallPoll<false>{this}, state_);
}
Poll<Result> operator()() { return absl::visit(CallPoll{this}, state_); }
private:
struct InAction {
InAction(ActionPromise promise, ReaderResult result)
: promise(std::move(promise)), result(std::move(result)) {}
ActionPromise promise;
ReaderResult result;
};
Reader reader_;
ActionFactory action_factory_;
absl::variant<ReaderNext, ActionPromise> state_;
absl::variant<ReaderNext, InAction> state_;
// Call the inner poll function, and if it's finished, start the next
// iteration. If kSetState==true, also set the current state in self->state_.
// We omit that on the first iteration because it's common to poll once and
// not change state, which saves us some work.
template <bool kSetState>
// iteration.
struct CallPoll {
ForEach* const self;
@ -101,25 +91,24 @@ class ForEach {
if (auto* p = absl::get_if<kPollReadyIdx>(&r)) {
if (p->has_value()) {
auto action = self->action_factory_.Repeated(std::move(**p));
p->reset();
return CallPoll<true>{self}(action);
return (*this)(self->state_.template emplace<InAction>(
std::move(action), std::move(*p)));
} else {
return Done<Result>::Make();
}
}
if (kSetState) {
self->state_.template emplace<ReaderNext>(std::move(reader_next));
}
return Pending();
}
Poll<Result> operator()(ActionPromise& promise) {
auto r = promise();
Poll<Result> operator()(InAction& in_action) {
auto r = in_action.promise();
if (auto* p = absl::get_if<kPollReadyIdx>(&r)) {
return FinishIteration(p, &self->reader_, CallPoll<true>{self});
}
if (kSetState) {
self->state_.template emplace<ActionPromise>(std::move(promise));
if (p->ok()) {
return (*this)(
self->state_.template emplace<ReaderNext>(self->reader_.Next()));
} else {
return std::move(*p);
}
}
return Pending();
}

@ -307,6 +307,7 @@ grpc_cc_test(
"//:pipe",
"//:resource_quota",
"//:seq",
"//:try_seq",
],
)

@ -30,6 +30,7 @@
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/resource_quota/resource_quota.h"
@ -79,6 +80,124 @@ TEST(ForEachTest, SendThriceWithPipe) {
EXPECT_EQ(num_received, 3);
}
// Pollable type that stays movable until it's polled, then causes the test to
// fail if it's moved again.
// Promises have the property that they can be moved until polled, and this
// helps us check that the internals of ForEach respect this rule.
class MoveableUntilPolled {
public:
MoveableUntilPolled() = default;
MoveableUntilPolled(const MoveableUntilPolled&) = delete;
MoveableUntilPolled& operator=(const MoveableUntilPolled&) = delete;
MoveableUntilPolled(MoveableUntilPolled&& other) noexcept : polls_(0) {
EXPECT_EQ(other.polls_, 0);
}
MoveableUntilPolled& operator=(MoveableUntilPolled&& other) noexcept {
EXPECT_EQ(other.polls_, 0);
polls_ = 0;
return *this;
}
Poll<absl::Status> operator()() {
Activity::current()->ForceImmediateRepoll();
++polls_;
if (polls_ == 10) return absl::OkStatus();
return Pending();
}
private:
int polls_ = 0;
};
TEST(ForEachTest, NoMoveAfterPoll) {
int num_received = 0;
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
MakeActivity(
[&num_received] {
Pipe<int> pipe;
auto sender = std::make_shared<std::unique_ptr<PipeSender<int>>>(
std::make_unique<PipeSender<int>>(std::move(pipe.sender)));
return Map(
Join(
// Push one things into a pipe, then close.
Seq((*sender)->Push(1),
[sender] {
sender->reset();
return absl::OkStatus();
}),
// Use a ForEach loop to read them out and verify all
// values are seen.
// Inject a MoveableUntilPolled into the loop to ensure that
// ForEach doesn't internally move a promise post-polling.
ForEach(std::move(pipe.receiver),
[&num_received](int i) {
num_received++;
EXPECT_EQ(num_received, i);
return MoveableUntilPolled();
})),
JustElem<1>());
},
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); },
MakeScopedArena(1024, g_memory_allocator));
Mock::VerifyAndClearExpectations(&on_done);
EXPECT_EQ(num_received, 1);
}
TEST(ForEachTest, NextResultHeldThroughCallback) {
int num_received = 0;
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
MakeActivity(
[&num_received] {
Pipe<int> pipe;
auto sender = std::make_shared<std::unique_ptr<PipeSender<int>>>(
std::make_unique<PipeSender<int>>(std::move(pipe.sender)));
return Map(
Join(
// Push one things into a pipe, then close.
Seq((*sender)->Push(1),
[sender] {
sender->reset();
return absl::OkStatus();
}),
// Use a ForEach loop to read them out and verify all
// values are seen.
ForEach(std::move(pipe.receiver),
[&num_received, sender](int i) {
// While we're processing a value NextResult
// should be held disallowing new items to be
// pushed.
// We also should not have reached the
// sender->reset() line above yet either, as
// the Push() should block until this code
// completes.
EXPECT_TRUE(absl::holds_alternative<Pending>(
(*sender)->Push(2)()));
num_received++;
EXPECT_EQ(num_received, i);
return TrySeq(
// has the side effect of stalling for some
// iterations
MoveableUntilPolled(), [sender] {
// Perform the same test verifying the same
// properties for NextResult holding: all should
// still be true.
EXPECT_TRUE(absl::holds_alternative<Pending>(
(*sender)->Push(2)()));
return absl::OkStatus();
});
})),
JustElem<1>());
},
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); },
MakeScopedArena(1024, g_memory_allocator));
Mock::VerifyAndClearExpectations(&on_done);
EXPECT_EQ(num_received, 1);
}
} // namespace grpc_core
int main(int argc, char** argv) {

Loading…
Cancel
Save