|
|
|
@ -22,6 +22,9 @@ |
|
|
|
|
#include "absl/status/status.h" |
|
|
|
|
#include "absl/types/variant.h" |
|
|
|
|
|
|
|
|
|
#include <grpc/support/log.h> |
|
|
|
|
|
|
|
|
|
#include "src/core/lib/gprpp/construct_destruct.h" |
|
|
|
|
#include "src/core/lib/promise/detail/promise_factory.h" |
|
|
|
|
#include "src/core/lib/promise/poll.h" |
|
|
|
|
|
|
|
|
@ -55,20 +58,39 @@ class ForEach { |
|
|
|
|
using Result = |
|
|
|
|
typename PollTraits<decltype(std::declval<ActionPromise>()())>::Type; |
|
|
|
|
ForEach(Reader reader, Action action) |
|
|
|
|
: reader_(std::move(reader)), |
|
|
|
|
action_factory_(std::move(action)), |
|
|
|
|
state_(reader_.Next()) {} |
|
|
|
|
: reader_(std::move(reader)), action_factory_(std::move(action)) { |
|
|
|
|
Construct(&reader_next_, reader_.Next()); |
|
|
|
|
} |
|
|
|
|
~ForEach() { |
|
|
|
|
if (reading_next_) { |
|
|
|
|
Destruct(&reader_next_); |
|
|
|
|
} else { |
|
|
|
|
Destruct(&in_action_); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ForEach(const ForEach&) = delete; |
|
|
|
|
ForEach& operator=(const ForEach&) = delete; |
|
|
|
|
// noexcept causes compiler errors on older gcc's
|
|
|
|
|
// NOLINTNEXTLINE(performance-noexcept-move-constructor)
|
|
|
|
|
ForEach(ForEach&&) = default; |
|
|
|
|
// noexcept causes compiler errors on older gcc's
|
|
|
|
|
// NOLINTNEXTLINE(performance-noexcept-move-constructor)
|
|
|
|
|
ForEach& operator=(ForEach&&) = default; |
|
|
|
|
|
|
|
|
|
Poll<Result> operator()() { return absl::visit(CallPoll{this}, state_); } |
|
|
|
|
ForEach(ForEach&& other) noexcept |
|
|
|
|
: reader_(std::move(other.reader_)), |
|
|
|
|
action_factory_(std::move(other.action_factory_)) { |
|
|
|
|
GPR_DEBUG_ASSERT(reading_next_); |
|
|
|
|
GPR_DEBUG_ASSERT(other.reading_next_); |
|
|
|
|
Construct(&reader_next_, std::move(other.reader_next_)); |
|
|
|
|
} |
|
|
|
|
ForEach& operator=(ForEach&& other) noexcept { |
|
|
|
|
GPR_DEBUG_ASSERT(reading_next_); |
|
|
|
|
GPR_DEBUG_ASSERT(other.reading_next_); |
|
|
|
|
reader_ = std::move(other.reader_); |
|
|
|
|
action_factory_ = std::move(other.action_factory_); |
|
|
|
|
reader_next_ = std::move(other.reader_next_); |
|
|
|
|
return *this; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Poll<Result> operator()() { |
|
|
|
|
if (reading_next_) return PollReaderNext(); |
|
|
|
|
return PollAction(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
struct InAction { |
|
|
|
@ -77,41 +99,44 @@ class ForEach { |
|
|
|
|
ActionPromise promise; |
|
|
|
|
ReaderResult result; |
|
|
|
|
}; |
|
|
|
|
Reader reader_; |
|
|
|
|
ActionFactory action_factory_; |
|
|
|
|
absl::variant<ReaderNext, InAction> state_; |
|
|
|
|
|
|
|
|
|
// Call the inner poll function, and if it's finished, start the next
|
|
|
|
|
// iteration.
|
|
|
|
|
struct CallPoll { |
|
|
|
|
ForEach* const self; |
|
|
|
|
|
|
|
|
|
Poll<Result> operator()(ReaderNext& reader_next) { |
|
|
|
|
auto r = reader_next(); |
|
|
|
|
if (auto* p = absl::get_if<kPollReadyIdx>(&r)) { |
|
|
|
|
if (p->has_value()) { |
|
|
|
|
auto action = self->action_factory_.Make(std::move(**p)); |
|
|
|
|
return (*this)(self->state_.template emplace<InAction>( |
|
|
|
|
std::move(action), std::move(*p))); |
|
|
|
|
} else { |
|
|
|
|
return Done<Result>::Make(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Poll<Result> PollReaderNext() { |
|
|
|
|
auto r = reader_next_(); |
|
|
|
|
if (auto* p = absl::get_if<kPollReadyIdx>(&r)) { |
|
|
|
|
if (p->has_value()) { |
|
|
|
|
Destruct(&reader_next_); |
|
|
|
|
auto action = action_factory_.Make(std::move(**p)); |
|
|
|
|
Construct(&in_action_, std::move(action), std::move(*p)); |
|
|
|
|
reading_next_ = false; |
|
|
|
|
return PollAction(); |
|
|
|
|
} else { |
|
|
|
|
return Done<Result>::Make(); |
|
|
|
|
} |
|
|
|
|
return Pending(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Poll<Result> operator()(InAction& in_action) { |
|
|
|
|
auto r = in_action.promise(); |
|
|
|
|
if (auto* p = absl::get_if<kPollReadyIdx>(&r)) { |
|
|
|
|
if (p->ok()) { |
|
|
|
|
return (*this)( |
|
|
|
|
self->state_.template emplace<ReaderNext>(self->reader_.Next())); |
|
|
|
|
} else { |
|
|
|
|
return std::move(*p); |
|
|
|
|
} |
|
|
|
|
return Pending(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Poll<Result> PollAction() { |
|
|
|
|
auto r = in_action_.promise(); |
|
|
|
|
if (auto* p = absl::get_if<kPollReadyIdx>(&r)) { |
|
|
|
|
if (p->ok()) { |
|
|
|
|
Destruct(&in_action_); |
|
|
|
|
Construct(&reader_next_, reader_.Next()); |
|
|
|
|
reading_next_ = true; |
|
|
|
|
return PollReaderNext(); |
|
|
|
|
} else { |
|
|
|
|
return std::move(*p); |
|
|
|
|
} |
|
|
|
|
return Pending(); |
|
|
|
|
} |
|
|
|
|
return Pending(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
GPR_NO_UNIQUE_ADDRESS Reader reader_; |
|
|
|
|
GPR_NO_UNIQUE_ADDRESS ActionFactory action_factory_; |
|
|
|
|
bool reading_next_ = true; |
|
|
|
|
union { |
|
|
|
|
ReaderNext reader_next_; |
|
|
|
|
InAction in_action_; |
|
|
|
|
}; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|