[promises] End-to-end flow control for a series of pipes (#30994)

* fix promise

* prototype

* progress

* implement new api

* Revert "fix promise"

This reverts commit ded85e7d19.

* Revert "Revert "fix promise""

This reverts commit c2acef1958.

* progress

* done

* Automated change: Fix sanity tests

* fix

* fix

* fix

* Automated change: Fix sanity tests

* comments

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/31115/head
Craig Tiller 2 years ago committed by GitHub
parent 3439cc29d2
commit 1402e974f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      BUILD
  2. 6
      src/core/lib/promise/activity.h
  3. 1
      src/core/lib/promise/for_each.h
  4. 161
      src/core/lib/promise/pipe.h
  5. 2
      test/core/promise/for_each_test.cc
  6. 86
      test/core/promise/pipe_test.cc

@ -1753,7 +1753,10 @@ grpc_cc_library(
grpc_cc_library(
name = "pipe",
external_deps = ["absl/types:optional"],
external_deps = [
"absl/types:optional",
"absl/types:variant",
],
language = "c++",
public_hdrs = [
"src/core/lib/promise/pipe.h",

@ -437,7 +437,11 @@ class PromiseActivity final : public FreestandingActivity,
MutexLock lock(mu());
// Check if we were done, and flag done.
was_done = done_;
if (!done_) MarkDone();
if (!done_) {
ScopedActivity scoped_activity(this);
ScopedContext contexts(this);
MarkDone();
}
}
// If we were not done, then call the on_done callback.
if (!was_done) {

@ -101,6 +101,7 @@ class ForEach {
if (auto* p = absl::get_if<kPollReadyIdx>(&r)) {
if (p->has_value()) {
auto action = self->action_factory_.Repeated(std::move(**p));
p->reset();
return CallPoll<true>{self}(action);
} else {
return Done<Result>::Make();

@ -19,7 +19,10 @@
#include <stdint.h>
#include <utility>
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include <grpc/support/log.h>
@ -30,9 +33,53 @@
namespace grpc_core {
namespace pipe_detail {
template <typename T>
class Center;
}
template <typename T>
struct Pipe;
// Result of Pipe::Next - represents a received value.
// If has_value() is false, the pipe was closed by the time we polled for the
// next value. No value was received, nor will there ever be.
// This type is movable but not copyable.
// Once the final move is destroyed the pipe will ack the read and unblock the
// send.
template <typename T>
class NextResult final {
public:
explicit NextResult(pipe_detail::Center<T>* center) : center_(center) {}
~NextResult();
NextResult(const NextResult&) = delete;
NextResult& operator=(const NextResult&) = delete;
NextResult(NextResult&& other) noexcept
: center_(std::exchange(other.center_, nullptr)) {}
NextResult& operator=(NextResult&& other) noexcept {
center_ = std::exchange(other.center_, nullptr);
return *this;
}
using value_type = T;
void reset();
bool has_value() const;
const T& value() const {
GPR_ASSERT(has_value());
return **this;
}
T& value() {
GPR_ASSERT(has_value());
return **this;
}
const T& operator*() const;
T& operator*();
private:
pipe_detail::Center<T>* center_;
};
namespace pipe_detail {
template <typename T>
@ -50,18 +97,20 @@ class Center {
Center() {
send_refs_ = 1;
recv_refs_ = 1;
has_value_ = false;
value_state_ = ValueState::kEmpty;
}
// Add one ref to the send side of this object, and return this.
Center* RefSend() {
send_refs_++;
GPR_ASSERT(send_refs_ != 0);
return this;
}
// Add one ref to the recv side of this object, and return this.
Center* RefRecv() {
recv_refs_++;
GPR_ASSERT(recv_refs_ != 0);
return this;
}
@ -91,7 +140,7 @@ class Center {
on_empty_.Wake();
if (0 == send_refs_) {
this->~Center();
} else if (has_value_) {
} else if (value_state_ == ValueState::kReady) {
ResetValue();
}
}
@ -104,34 +153,71 @@ class Center {
Poll<bool> Push(T* value) {
GPR_DEBUG_ASSERT(send_refs_ != 0);
if (recv_refs_ == 0) return false;
if (has_value_) return on_empty_.pending();
has_value_ = true;
if (value_state_ != ValueState::kEmpty) return on_empty_.pending();
value_state_ = ValueState::kReady;
value_ = std::move(*value);
on_full_.Wake();
return true;
}
Poll<bool> PollAck() {
GPR_DEBUG_ASSERT(send_refs_ != 0);
if (recv_refs_ == 0) return value_state_ == ValueState::kAcked;
if (value_state_ != ValueState::kAcked) return on_empty_.pending();
value_state_ = ValueState::kEmpty;
return true;
}
// Try to receive a value from the pipe.
// Return Pending if there is no value.
// Return the value if one was retrieved.
// Return nullopt if the send end is closed and no value had been pushed.
Poll<absl::optional<T>> Next() {
Poll<NextResult<T>> Next() {
GPR_DEBUG_ASSERT(recv_refs_ != 0);
if (!has_value_) {
if (send_refs_ == 0) return absl::nullopt;
if (value_state_ != ValueState::kReady) {
if (send_refs_ == 0) return NextResult<T>(nullptr);
return on_full_.pending();
}
has_value_ = false;
return NextResult<T>(RefRecv());
}
void AckNext() {
GPR_DEBUG_ASSERT(value_state_ == ValueState::kReady);
value_state_ = ValueState::kAcked;
on_empty_.Wake();
return std::move(value_);
UnrefRecv();
}
T& value() { return value_; }
const T& value() const { return value_; }
private:
void ResetValue() {
// Fancy dance to move out of value in the off chance that we reclaim some
// memory earlier.
[](T) {}(std::move(value_));
has_value_ = false;
value_state_ = ValueState::kEmpty;
}
// State of value_.
enum class ValueState : uint8_t {
// No value is set, it's possible to send.
kEmpty,
// Value has been pushed but not acked, it's possible to receive.
kReady,
// Value has been received and acked, we can unblock senders and transition
// to empty.
kAcked,
};
static const char* ValueStateName(ValueState state) {
switch (state) {
case ValueState::kEmpty:
return "kEmpty";
case ValueState::kReady:
return "kReady";
case ValueState::kAcked:
return "kAcked";
}
GPR_UNREACHABLE_CODE(return "unknown");
}
T value_;
// Number of sending objects.
@ -140,10 +226,10 @@ class Center {
uint8_t send_refs_ : 2;
// Number of receiving objects.
// 0 => recv is closed.
// 1 ref each for PipeReceiver and Next.
// 1 ref each for PipeReceiver, Next, and NextResult.
uint8_t recv_refs_ : 2;
// True iff there is a value in the pipe.
bool has_value_ : 1;
// Current state of the value.
ValueState value_state_ : 2;
IntraActivityWaiter on_empty_;
IntraActivityWaiter on_full_;
};
@ -240,14 +326,26 @@ class Push {
if (center_ != nullptr) center_->UnrefSend();
}
Poll<bool> operator()() { return center_->Push(&push_); }
Poll<bool> operator()() {
if (push_.has_value()) {
auto r = center_->Push(&*push_);
if (auto* ok = absl::get_if<bool>(&r)) {
push_.reset();
if (!*ok) return false;
} else {
return Pending{};
}
}
GPR_DEBUG_ASSERT(!push_.has_value());
return center_->PollAck();
}
private:
friend class PipeSender<T>;
explicit Push(pipe_detail::Center<T>* center, T push)
: center_(center), push_(std::move(push)) {}
Center<T>* center_;
T push_;
absl::optional<T> push_;
};
// Implementation of PipeReceiver::Next promise.
@ -270,7 +368,13 @@ class Next {
if (center_ != nullptr) center_->UnrefRecv();
}
Poll<absl::optional<T>> operator()() { return center_->Next(); }
Poll<NextResult<T>> operator()() {
auto r = center_->Next();
if (!absl::holds_alternative<Pending>(r)) {
std::exchange(center_, nullptr)->UnrefRecv();
}
return r;
}
private:
friend class PipeReceiver<T>;
@ -290,6 +394,31 @@ pipe_detail::Next<T> PipeReceiver<T>::Next() {
return pipe_detail::Next<T>(center_->RefRecv());
}
template <typename T>
bool NextResult<T>::has_value() const {
return center_ != nullptr;
}
template <typename T>
T& NextResult<T>::operator*() {
return center_->value();
}
template <typename T>
const T& NextResult<T>::operator*() const {
return center_->value();
}
template <typename T>
NextResult<T>::~NextResult() {
if (center_ != nullptr) center_->AckNext();
}
template <typename T>
void NextResult<T>::reset() {
if (auto* p = std::exchange(center_, nullptr)) p->AckNext();
}
// A Pipe is an intra-Activity communications channel that transmits T's from
// one end to the other.
// It is only safe to use a Pipe within the context of a single Activity.

@ -14,6 +14,8 @@
#include "src/core/lib/promise/for_each.h"
#include <stdint.h>
#include <memory>
#include "absl/memory/memory.h"

@ -24,12 +24,13 @@
#include "gtest/gtest.h"
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/detail/basic_seq.h"
#include "src/core/lib/promise/join.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/resource_quota/resource_quota.h"
@ -51,11 +52,14 @@ TEST(PipeTest, CanSendAndReceive) {
Pipe<int> pipe;
return Seq(
// Concurrently: send 42 into the pipe, and receive from the pipe.
Join(pipe.sender.Push(42), pipe.receiver.Next()),
Join(pipe.sender.Push(42),
Map(pipe.receiver.Next(),
[](NextResult<int> r) { return r.value(); })),
// Once complete, verify successful sending and the received value
// is 42.
[](std::tuple<bool, absl::optional<int>> result) {
EXPECT_EQ(result, std::make_tuple(true, absl::optional<int>(42)));
[](std::tuple<bool, int> result) {
EXPECT_TRUE(std::get<0>(result));
EXPECT_EQ(42, std::get<1>(result));
return absl::OkStatus();
});
},
@ -72,11 +76,14 @@ TEST(PipeTest, CanReceiveAndSend) {
Pipe<int> pipe;
return Seq(
// Concurrently: receive from the pipe, and send 42 into the pipe.
Join(pipe.receiver.Next(), pipe.sender.Push(42)),
Join(Map(pipe.receiver.Next(),
[](NextResult<int> r) { return r.value(); }),
pipe.sender.Push(42)),
// Once complete, verify the received value is 42 and successful
// sending.
[](std::tuple<absl::optional<int>, bool> result) {
EXPECT_EQ(result, std::make_tuple(absl::optional<int>(42), true));
[](std::tuple<int, bool> result) {
EXPECT_EQ(std::get<0>(result), 42);
EXPECT_TRUE(std::get<1>(result));
return absl::OkStatus();
});
},
@ -92,14 +99,12 @@ TEST(PipeTest, CanSeeClosedOnSend) {
[] {
Pipe<int> pipe;
auto sender = std::move(pipe.sender);
// Push 42 onto the pipe - this will the pipe's one-deep send buffer.
EXPECT_TRUE(NowOrNever(sender.Push(42)).has_value());
auto receiver = std::make_shared<std::unique_ptr<PipeReceiver<int>>>(
absl::make_unique<PipeReceiver<int>>(std::move(pipe.receiver)));
return Seq(
// Concurrently:
// - push 43 into the sender, which will stall because the buffer is
// full
// - push 43 into the sender, which will stall because there is no
// reader
// - and close the receiver, which will fail the pending send.
Join(sender.Push(43),
[receiver] {
@ -107,7 +112,7 @@ TEST(PipeTest, CanSeeClosedOnSend) {
return absl::OkStatus();
}),
// Verify both that the send failed and that we executed the close.
[](std::tuple<bool, absl::Status> result) {
[](const std::tuple<bool, absl::Status>& result) {
EXPECT_EQ(result, std::make_tuple(false, absl::OkStatus()));
return absl::OkStatus();
});
@ -138,9 +143,9 @@ TEST(PipeTest, CanSeeClosedOnReceive) {
return absl::OkStatus();
}),
// Verify we received end-of-stream and closed the sender.
[](std::tuple<absl::optional<int>, absl::Status> result) {
EXPECT_EQ(result, std::make_tuple(absl::optional<int>(),
absl::OkStatus()));
[](std::tuple<NextResult<int>, absl::Status> result) {
EXPECT_FALSE(std::get<0>(result).has_value());
EXPECT_EQ(std::get<1>(result), absl::OkStatus());
return absl::OkStatus();
});
},
@ -149,9 +154,60 @@ TEST(PipeTest, CanSeeClosedOnReceive) {
MakeScopedArena(1024, g_memory_allocator));
}
TEST(PipeTest, CanFlowControlThroughManyStages) {
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
auto done = std::make_shared<bool>(false);
// Push a value through multiple pipes.
// Ensure that it's possible to do so and get flow control throughout the
// entire pipe: ie that the push down does not complete until the last pipe
// completes.
MakeActivity(
[done] {
Pipe<int> pipe1;
Pipe<int> pipe2;
Pipe<int> pipe3;
auto sender1 = std::move(pipe1.sender);
auto receiver1 = std::move(pipe1.receiver);
auto sender2 = std::move(pipe2.sender);
auto receiver2 = std::move(pipe2.receiver);
auto sender3 = std::move(pipe3.sender);
auto receiver3 = std::move(pipe3.receiver);
return Seq(
Join(Seq(sender1.Push(1),
[done] {
*done = true;
return 1;
}),
Seq(receiver1.Next(),
[sender2 = std::move(sender2)](NextResult<int> r) mutable {
return sender2.Push(r.value());
}),
Seq(receiver2.Next(),
[sender3 = std::move(sender3)](NextResult<int> r) mutable {
return sender3.Push(r.value());
}),
Seq(receiver3.Next(),
[done](NextResult<int> r) {
EXPECT_EQ(r.value(), 1);
EXPECT_FALSE(*done);
return 2;
})),
[](std::tuple<int, bool, bool, int> result) {
EXPECT_EQ(result, std::make_tuple(1, true, true, 2));
return absl::OkStatus();
});
},
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); },
MakeScopedArena(1024, g_memory_allocator));
ASSERT_TRUE(*done);
}
} // namespace grpc_core
int main(int argc, char** argv) {
gpr_log_verbosity_init();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

Loading…
Cancel
Save