[promise] Add a facility to delay promise execution until after the current poll (#35413)

Mirrors the facility we had with the combiner, but allows it to occur at arbitrary points.

We'll use this in chaotic-good to:
1. combine fragments into a single frame
2. combine writes from different calls into a single syscall

Closes #35413
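
As an illustration (not part of this change), a spawn that wants to batch work might look roughly like the sketch below; `writer` and `FlushBufferedWrites` are hypothetical stand-ins for a chaotic-good transport hook:

    // Hypothetical sketch: defer the flush until every other participant in
    // this party has been polled in the current iteration, so that their
    // writes can be coalesced into a single syscall.
    party->Spawn(
        "flush",
        [party, writer]() {
          return Seq(party->AfterCurrentPoll(), [writer]() {
            writer->FlushBufferedWrites();  // hypothetical batching hook
            return Empty{};
          });
        },
        [](Empty) {});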

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35413 from ctiller:group-on 9f20f34523
PiperOrigin-RevId: 596004767
Craig Tiller, committed by Copybara-Service
parent a3e24ed692
commit f82f8966f0
3 changed files:
  1. src/core/lib/promise/party.cc (64 lines changed)
  2. src/core/lib/promise/party.h (45 lines changed)
  3. test/core/promise/party_test.cc (48 lines changed)

src/core/lib/promise/party.cc

@@ -227,40 +227,42 @@ void Party::RunLocked() {
 bool Party::RunParty() {
   ScopedActivity activity(this);
   promise_detail::Context<Arena> arena_ctx(arena_);
-  return sync_.RunParty([this](int i) {
-    // If the participant is null, skip.
-    // This allows participants to complete whilst wakers still exist
-    // somewhere.
-    auto* participant = participants_[i].load(std::memory_order_acquire);
-    if (participant == nullptr) {
-      if (grpc_trace_promise_primitives.enabled()) {
-        gpr_log(GPR_DEBUG, "%s[party] wakeup %d already complete",
-                DebugTag().c_str(), i);
-      }
-      return false;
-    }
-    absl::string_view name;
-    if (grpc_trace_promise_primitives.enabled()) {
-      name = participant->name();
-      gpr_log(GPR_DEBUG, "%s[%s] begin job %d", DebugTag().c_str(),
-              std::string(name).c_str(), i);
-    }
-    // Poll the participant.
-    currently_polling_ = i;
-    bool done = participant->PollParticipantPromise();
-    currently_polling_ = kNotPolling;
-    if (done) {
-      if (!name.empty()) {
-        gpr_log(GPR_DEBUG, "%s[%s] end poll and finish job %d",
-                DebugTag().c_str(), std::string(name).c_str(), i);
-      }
-      participants_[i].store(nullptr, std::memory_order_relaxed);
-    } else if (!name.empty()) {
-      gpr_log(GPR_DEBUG, "%s[%s] end poll", DebugTag().c_str(),
-              std::string(name).c_str());
-    }
-    return done;
-  });
+  return sync_.RunParty([this](int i) { return RunOneParticipant(i); });
+}
+
+bool Party::RunOneParticipant(int i) {
+  // If the participant is null, skip.
+  // This allows participants to complete whilst wakers still exist
+  // somewhere.
+  auto* participant = participants_[i].load(std::memory_order_acquire);
+  if (participant == nullptr) {
+    if (grpc_trace_promise_primitives.enabled()) {
+      gpr_log(GPR_DEBUG, "%s[party] wakeup %d already complete",
+              DebugTag().c_str(), i);
+    }
+    return false;
+  }
+  absl::string_view name;
+  if (grpc_trace_promise_primitives.enabled()) {
+    name = participant->name();
+    gpr_log(GPR_DEBUG, "%s[%s] begin job %d", DebugTag().c_str(),
+            std::string(name).c_str(), i);
+  }
+  // Poll the participant.
+  currently_polling_ = i;
+  bool done = participant->PollParticipantPromise();
+  currently_polling_ = kNotPolling;
+  if (done) {
+    if (!name.empty()) {
+      gpr_log(GPR_DEBUG, "%s[%s] end poll and finish job %d",
+              DebugTag().c_str(), std::string(name).c_str(), i);
+    }
+    participants_[i].store(nullptr, std::memory_order_relaxed);
+  } else if (!name.empty()) {
+    gpr_log(GPR_DEBUG, "%s[%s] end poll", DebugTag().c_str(),
+            std::string(name).c_str());
+  }
+  return done;
 }
 
 void Party::AddParticipants(Participant** participants, size_t count) {

src/core/lib/promise/party.h

@@ -102,7 +102,8 @@ class PartySyncUsingAtomics {
   template <typename F>
   GRPC_MUST_USE_RESULT bool RunParty(F poll_one_participant) {
     uint64_t prev_state;
-    do {
+    iteration_.fetch_add(1, std::memory_order_relaxed);
+    for (;;) {
       // Grab the current state, and clear the wakeup bits & add flag.
       prev_state = state_.fetch_and(kRefMask | kLocked | kAllocatedMask,
                                     std::memory_order_acquire);

@@ -133,9 +134,23 @@ class PartySyncUsingAtomics {
       // TODO(ctiller): consider mitigations for the accidental wakeup on owning
       // waker creation case -- I currently expect this will be more expensive
       // than this quick loop.
-    } while (!state_.compare_exchange_weak(
-        prev_state, (prev_state & (kRefMask | kAllocatedMask)),
-        std::memory_order_acq_rel, std::memory_order_acquire));
+      if (wake_after_poll_ == 0) {
+        if (state_.compare_exchange_weak(
+                prev_state, (prev_state & (kRefMask | kAllocatedMask)),
+                std::memory_order_acq_rel, std::memory_order_acquire)) {
+          return false;
+        }
+      } else {
+        if (state_.compare_exchange_weak(
+                prev_state,
+                (prev_state & (kRefMask | kAllocatedMask | kLocked)) |
+                    wake_after_poll_,
+                std::memory_order_acq_rel, std::memory_order_acquire)) {
+          iteration_.fetch_add(1, std::memory_order_relaxed);
+          wake_after_poll_ = 0;
+        }
+      }
+    }
     return false;
   }
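For intuition, here is a much-simplified standalone sketch of the exit protocol above, with the ref-count and allocation bits omitted and all names invented; it shows the two CAS outcomes (drop the lock, or keep it and publish the deferred wakeups so another poll pass runs):

    #include <atomic>
    #include <cstdint>
    #include <cstdio>

    constexpr uint64_t kLocked = 1;  // low bit: the poll loop holds the lock

    // Try to finish a poll pass. With no deferred wakeups, release the lock
    // and stop; otherwise keep the lock, publish the deferred wakeups as
    // wakeup bits (here shifted above the lock bit), and poll again.
    // Returns true once the lock has actually been dropped.
    bool TryFinishPass(std::atomic<uint64_t>& state, uint64_t deferred) {
      uint64_t prev = state.load(std::memory_order_relaxed);
      for (;;) {
        const uint64_t next = deferred == 0 ? 0 : (kLocked | (deferred << 1));
        if (state.compare_exchange_weak(prev, next, std::memory_order_acq_rel,
                                        std::memory_order_acquire)) {
          return deferred == 0;
        }
        // A failed CAS reloaded `prev`; retry against the fresh state.
      }
    }

    int main() {
      std::atomic<uint64_t> state{kLocked};
      std::printf("released: %d\n", TryFinishPass(state, 0));  // released: 1
      state.store(kLocked);
      std::printf("released: %d\n", TryFinishPass(state, 1));  // released: 0
    }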
@@ -186,6 +201,11 @@ class PartySyncUsingAtomics {
   // Returns true if the caller should run the party.
   GRPC_MUST_USE_RESULT bool ScheduleWakeup(WakeupMask mask);
 
+  void WakeAfterPoll(WakeupMask mask) { wake_after_poll_ |= mask; }
+  uint32_t iteration() const {
+    return iteration_.load(std::memory_order_relaxed);
+  }
+
  private:
   bool UnreffedLast();
@@ -225,6 +245,8 @@ class PartySyncUsingAtomics {
   static constexpr uint64_t kOneRef = 1ull << kRefShift;
 
   std::atomic<uint64_t> state_;
+  std::atomic<uint32_t> iteration_{0};
+  WakeupMask wake_after_poll_ = 0;
 };
 
 class PartySyncUsingMutex {
@@ -358,6 +380,20 @@ class Party : public Activity, private Wakeable {
   Arena* arena() const { return arena_; }
 
+  // Return a promise that resolves to Empty{} when the current party poll is
+  // complete.
+  // This is useful for implementing batching and the like: we can hold some
+  // action until the rest of the party resolves itself.
+  auto AfterCurrentPoll() {
+    GPR_DEBUG_ASSERT(Activity::current() == this);
+    sync_.WakeAfterPoll(CurrentParticipant());
+    return [this, iteration = sync_.iteration()]() -> Poll<Empty> {
+      GPR_DEBUG_ASSERT(Activity::current() == this);
+      if (iteration == sync_.iteration()) return Pending{};
+      return Empty{};
+    };
+  }
+
   class BulkSpawner {
    public:
     explicit BulkSpawner(Party* party) : party_(party) {}
@@ -548,6 +584,7 @@ class Party : public Activity, private Wakeable {
   // Add a participant (backs Spawn, after type erasure to ParticipantFactory).
   void AddParticipants(Participant** participant, size_t count);
+  bool RunOneParticipant(int i);
 
   virtual grpc_event_engine::experimental::EventEngine* event_engine()
       const = 0;
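
The `iteration_` counter is what `AfterCurrentPoll()` compares against: the returned promise captures the counter at creation and stays pending while it is unchanged. A minimal standalone illustration of that epoch pattern (not grpc code; all names invented):

    #include <atomic>
    #include <cstdint>
    #include <cstdio>

    // Each full pass of a poll loop bumps an epoch counter. A deferred task
    // records the epoch at creation and reports ready only once a newer pass
    // has begun.
    class PollEpoch {
     public:
      void BeginPass() { iteration_.fetch_add(1, std::memory_order_relaxed); }
      uint32_t iteration() const {
        return iteration_.load(std::memory_order_relaxed);
      }

     private:
      std::atomic<uint32_t> iteration_{0};
    };

    int main() {
      PollEpoch epoch;
      epoch.BeginPass();                              // pass 1 begins
      const uint32_t created_at = epoch.iteration();  // deferred task registers
      std::printf("ready: %d\n", created_at != epoch.iteration());  // ready: 0
      epoch.BeginPass();                              // pass 2 begins
      std::printf("ready: %d\n", created_at != epoch.iteration());  // ready: 1
    }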

test/core/promise/party_test.cc

@@ -490,6 +490,54 @@ TEST_F(PartyTest, CanBulkSpawn) {
   n2.WaitForNotification();
 }
 
+TEST_F(PartyTest, AfterCurrentPollWorks) {
+  auto party = MakeRefCounted<TestParty>();
+  Notification n;
+  int state = 0;
+  {
+    Party::BulkSpawner spawner(party.get());
+    // BulkSpawner will schedule and poll this promise first, but the
+    // `AfterCurrentPoll` will pause it.
+    // Then spawn1, spawn2, and spawn3 will run in order (with EXPECT_EQ checks
+    // demonstrating this), at which point the poll will complete, causing
+    // spawn_final to be awoken and scheduled and see the final state.
+    spawner.Spawn(
+        "spawn_final",
+        [&state, &party]() {
+          return Seq(party->AfterCurrentPoll(), [&state]() {
+            EXPECT_EQ(state, 3);
+            return Empty{};
+          });
+        },
+        [&n](Empty) { n.Notify(); });
+    spawner.Spawn(
+        "spawn1",
+        [&state]() {
+          EXPECT_EQ(state, 0);
+          state = 1;
+          return Empty{};
+        },
+        [](Empty) {});
+    spawner.Spawn(
+        "spawn2",
+        [&state]() {
+          EXPECT_EQ(state, 1);
+          state = 2;
+          return Empty{};
+        },
+        [](Empty) {});
+    spawner.Spawn(
+        "spawn3",
+        [&state]() {
+          EXPECT_EQ(state, 2);
+          state = 3;
+          return Empty{};
+        },
+        [](Empty) {});
+  }
+  n.WaitForNotification();
+}
+
 TEST_F(PartyTest, ThreadStressTest) {
   auto party = MakeRefCounted<TestParty>();
   std::vector<std::thread> threads;
