[party] Eliminate the `AfterCurrentPoll` mechanism (#37036)

We're not using this right now and so let's eliminate it until we've got a solid use-case.
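
For context, the removed `Party::AfterCurrentPoll()` returned a promise that stayed `Pending{}` until the party's current poll loop had finished; the deleted comment pitched it as a building block for batching. A compressed sketch of the pattern it enabled, adapted from the removed `AfterCurrentPollWorks` test further down (it assumes the surrounding promise is already running on `party`):

```cpp
// Sketch only (pre-removal API). Inside a promise spawned on `party`:
// resolve this step only after every other participant in the current
// poll has been polled, so work they queued up can be handled in one go.
auto deferred = Seq(party->AfterCurrentPoll(), []() {
  // ... act on whatever the earlier participants queued up ...
  return Empty{};
});
```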

`bm_party` results -- before:
```
---------------------------------------------------------------
Benchmark                     Time             CPU   Iterations
---------------------------------------------------------------
BM_PartyCreate             39.4 ns         39.4 ns    106472775
BM_AddParticipant          44.2 ns         44.2 ns     94802899
BM_WakeupParticipant       17.4 ns         17.4 ns    241990117
```
and after:
```
---------------------------------------------------------------
Benchmark                     Time             CPU   Iterations
---------------------------------------------------------------
BM_PartyCreate             37.6 ns         37.6 ns    111332125
BM_AddParticipant          40.1 ns         40.1 ns    104740937
BM_WakeupParticipant       17.3 ns         17.3 ns    242484270
```

Closes #37036

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37036 from ctiller:no-after 42f396900f
PiperOrigin-RevId: 646994419
Branch: pull/37064/head
Author: Craig Tiller (committed by Copybara-Service)
Parent: 678ee0a45d
Commit: d5fa1814b6

Files changed:
- src/core/lib/promise/party.h (35 lines)
- test/core/promise/party_test.cc (48 lines)

```diff
--- a/src/core/lib/promise/party.h
+++ b/src/core/lib/promise/party.h
@@ -108,7 +108,6 @@ class PartySyncUsingAtomics {
   template <typename F>
   GRPC_MUST_USE_RESULT bool RunParty(F poll_one_participant) {
     uint64_t prev_state;
-    iteration_.fetch_add(1, std::memory_order_relaxed);
     for (;;) {
       // Grab the current state, and clear the wakeup bits & add flag.
       prev_state = state_.fetch_and(kRefMask | kLocked | kAllocatedMask,
@@ -145,7 +144,6 @@ class PartySyncUsingAtomics {
       // TODO(ctiller): consider mitigations for the accidental wakeup on owning
       // waker creation case -- I currently expect this will be more expensive
       // than this quick loop.
-      if (wake_after_poll_ == 0) {
       if (state_.compare_exchange_weak(
               prev_state, (prev_state & (kRefMask | kAllocatedMask)),
               std::memory_order_acq_rel, std::memory_order_acquire)) {
@@ -153,18 +151,6 @@
                        prev_state & (kRefMask | kAllocatedMask));
         return false;
       }
-      } else {
-        if (state_.compare_exchange_weak(
-                prev_state,
-                (prev_state & (kRefMask | kAllocatedMask | kLocked)) |
-                    wake_after_poll_,
-                std::memory_order_acq_rel, std::memory_order_acquire)) {
-          LogStateChange("Run:EndIteration", prev_state,
-                         prev_state & (kRefMask | kAllocatedMask));
-          iteration_.fetch_add(1, std::memory_order_relaxed);
-          wake_after_poll_ = 0;
-        }
-      }
     }
     return false;
   }
@@ -220,11 +206,6 @@ class PartySyncUsingAtomics {
   // Returns true if the caller should run the party.
   GRPC_MUST_USE_RESULT bool ScheduleWakeup(WakeupMask mask);
-  void WakeAfterPoll(WakeupMask mask) { wake_after_poll_ |= mask; }
-  uint32_t iteration() const {
-    return iteration_.load(std::memory_order_relaxed);
-  }
   bool has_participants() const {
     return (state_.load(std::memory_order_relaxed) & kAllocatedMask) != 0;
   }
@@ -277,8 +258,6 @@ class PartySyncUsingAtomics {
   static constexpr uint64_t kOneRef = 1ull << kRefShift;
   std::atomic<uint64_t> state_;
-  std::atomic<uint32_t> iteration_{0};
-  WakeupMask wake_after_poll_ = 0;
 };

 class PartySyncUsingMutex {
@@ -422,20 +401,6 @@ class Party : public Activity, private Wakeable {
   Arena* arena() { return arena_.get(); }

-  // Return a promise that resolves to Empty{} when the current party poll is
-  // complete.
-  // This is useful for implementing batching and the like: we can hold some
-  // action until the rest of the party resolves itself.
-  auto AfterCurrentPoll() {
-    DCHECK(GetContext<Activity>() == this);
-    sync_.WakeAfterPoll(CurrentParticipant());
-    return [this, iteration = sync_.iteration()]() -> Poll<Empty> {
-      DCHECK(GetContext<Activity>() == this);
-      if (iteration == sync_.iteration()) return Pending{};
-      return Empty{};
-    };
-  }

   class BulkSpawner {
    public:
     explicit BulkSpawner(Party* party) : party_(party) {}
```

```diff
--- a/test/core/promise/party_test.cc
+++ b/test/core/promise/party_test.cc
@@ -451,54 +451,6 @@ TEST_F(PartyTest, CanBulkSpawn) {
   n2.WaitForNotification();
 }

-TEST_F(PartyTest, AfterCurrentPollWorks) {
-  auto party = MakeParty();
-  Notification n;
-  int state = 0;
-  {
-    Party::BulkSpawner spawner(party.get());
-    // BulkSpawner will schedule and poll this promise first, but the
-    // `AfterCurrentPoll` will pause it.
-    // Then spawn1, spawn2, and spawn3 will run in order (with EXPECT_EQ checks
-    // demonstrating this), at which point the poll will complete, causing
-    // spawn_final to be awoken and scheduled and see the final state.
-    spawner.Spawn(
-        "spawn_final",
-        [&state, &party]() {
-          return Seq(party->AfterCurrentPoll(), [&state]() {
-            EXPECT_EQ(state, 3);
-            return Empty{};
-          });
-        },
-        [&n](Empty) { n.Notify(); });
-    spawner.Spawn(
-        "spawn1",
-        [&state]() {
-          EXPECT_EQ(state, 0);
-          state = 1;
-          return Empty{};
-        },
-        [](Empty) {});
-    spawner.Spawn(
-        "spawn2",
-        [&state]() {
-          EXPECT_EQ(state, 1);
-          state = 2;
-          return Empty{};
-        },
-        [](Empty) {});
-    spawner.Spawn(
-        "spawn3",
-        [&state]() {
-          EXPECT_EQ(state, 2);
-          state = 3;
-          return Empty{};
-        },
-        [](Empty) {});
-  }
-  n.WaitForNotification();
-}

 TEST_F(PartyTest, ThreadStressTest) {
   auto party = MakeParty();
   std::vector<std::thread> threads;
```
