From d5fa1814b6b2b8ad755efdb5b8b35b184d320f3d Mon Sep 17 00:00:00 2001
From: Craig Tiller
Date: Wed, 26 Jun 2024 10:23:49 -0700
Subject: [PATCH] [party] Eliminate the `AfterCurrentPoll` mechanism (#37036)

We're not using this right now and so let's eliminate it until we've got a
solid use-case.
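For reference, a condensed sketch of the API being removed, adapted from the
`AfterCurrentPollWorks` test deleted below: `AfterCurrentPoll()` returned a
promise that stayed pending until the party's current poll had finished, so a
participant could hold an action until the rest of the party resolved itself.

```
// Condensed from the deleted test: "spawn_final" parks on AfterCurrentPoll()
// until the other participants spawned in the same batch have been polled,
// then observes their combined side effects (state was set to 3 by spawn3).
Party::BulkSpawner spawner(party.get());
spawner.Spawn(
    "spawn_final",
    [&state, &party]() {
      return Seq(party->AfterCurrentPoll(), [&state]() {
        EXPECT_EQ(state, 3);
        return Empty{};
      });
    },
    [&n](Empty) { n.Notify(); });
```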
`bm_party` results -- before:

```
---------------------------------------------------------------
Benchmark                     Time             CPU   Iterations
---------------------------------------------------------------
BM_PartyCreate             39.4 ns         39.4 ns    106472775
BM_AddParticipant          44.2 ns         44.2 ns     94802899
BM_WakeupParticipant       17.4 ns         17.4 ns    241990117
```

and after:

```
---------------------------------------------------------------
Benchmark                     Time             CPU   Iterations
---------------------------------------------------------------
BM_PartyCreate             37.6 ns         37.6 ns    111332125
BM_AddParticipant          40.1 ns         40.1 ns    104740937
BM_WakeupParticipant       17.3 ns         17.3 ns    242484270
```

Closes #37036

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37036 from ctiller:no-after 42f396900fe125725cad5da0c8ce146f5c192a38
PiperOrigin-RevId: 646994419
---
 src/core/lib/promise/party.h    | 47 +++++---------
 test/core/promise/party_test.cc | 48 ---------------------------------
 2 files changed, 6 insertions(+), 89 deletions(-)

diff --git a/src/core/lib/promise/party.h b/src/core/lib/promise/party.h
index ce953f5468d..012e6dcfba9 100644
--- a/src/core/lib/promise/party.h
+++ b/src/core/lib/promise/party.h
@@ -108,7 +108,6 @@ class PartySyncUsingAtomics {
   template <typename F>
   GRPC_MUST_USE_RESULT bool RunParty(F poll_one_participant) {
     uint64_t prev_state;
-    iteration_.fetch_add(1, std::memory_order_relaxed);
     for (;;) {
       // Grab the current state, and clear the wakeup bits & add flag.
       prev_state = state_.fetch_and(kRefMask | kLocked | kAllocatedMask,
@@ -145,25 +144,12 @@ class PartySyncUsingAtomics {
       // TODO(ctiller): consider mitigations for the accidental wakeup on owning
       // waker creation case -- I currently expect this will be more expensive
       // than this quick loop.
-      if (wake_after_poll_ == 0) {
-        if (state_.compare_exchange_weak(
-                prev_state, (prev_state & (kRefMask | kAllocatedMask)),
-                std::memory_order_acq_rel, std::memory_order_acquire)) {
-          LogStateChange("Run:End", prev_state,
-                         prev_state & (kRefMask | kAllocatedMask));
-          return false;
-        }
-      } else {
-        if (state_.compare_exchange_weak(
-                prev_state,
-                (prev_state & (kRefMask | kAllocatedMask | kLocked)) |
-                    wake_after_poll_,
-                std::memory_order_acq_rel, std::memory_order_acquire)) {
-          LogStateChange("Run:EndIteration", prev_state,
-                         prev_state & (kRefMask | kAllocatedMask));
-          iteration_.fetch_add(1, std::memory_order_relaxed);
-          wake_after_poll_ = 0;
-        }
+      if (state_.compare_exchange_weak(
+              prev_state, (prev_state & (kRefMask | kAllocatedMask)),
+              std::memory_order_acq_rel, std::memory_order_acquire)) {
+        LogStateChange("Run:End", prev_state,
+                       prev_state & (kRefMask | kAllocatedMask));
+        return false;
       }
     }
     return false;
@@ -220,11 +206,6 @@ class PartySyncUsingAtomics {
   // Returns true if the caller should run the party.
   GRPC_MUST_USE_RESULT bool ScheduleWakeup(WakeupMask mask);
 
-  void WakeAfterPoll(WakeupMask mask) { wake_after_poll_ |= mask; }
-  uint32_t iteration() const {
-    return iteration_.load(std::memory_order_relaxed);
-  }
-
   bool has_participants() const {
     return (state_.load(std::memory_order_relaxed) & kAllocatedMask) != 0;
   }
@@ -277,8 +258,6 @@ class PartySyncUsingAtomics {
   static constexpr uint64_t kOneRef = 1ull << kRefShift;
 
   std::atomic<uint64_t> state_;
-  std::atomic<uint32_t> iteration_{0};
-  WakeupMask wake_after_poll_ = 0;
 };
 
 class PartySyncUsingMutex {
@@ -422,20 +401,6 @@ class Party : public Activity, private Wakeable {
 
   Arena* arena() { return arena_.get(); }
 
-  // Return a promise that resolves to Empty{} when the current party poll is
-  // complete.
-  // This is useful for implementing batching and the like: we can hold some
-  // action until the rest of the party resolves itself.
-  auto AfterCurrentPoll() {
-    DCHECK(GetContext<Activity>() == this);
-    sync_.WakeAfterPoll(CurrentParticipant());
-    return [this, iteration = sync_.iteration()]() -> Poll<Empty> {
-      DCHECK(GetContext<Activity>() == this);
-      if (iteration == sync_.iteration()) return Pending{};
-      return Empty{};
-    };
-  }
-
   class BulkSpawner {
    public:
     explicit BulkSpawner(Party* party) : party_(party) {}
diff --git a/test/core/promise/party_test.cc b/test/core/promise/party_test.cc
index 6f0212596c7..110377d4982 100644
--- a/test/core/promise/party_test.cc
+++ b/test/core/promise/party_test.cc
@@ -451,54 +451,6 @@ TEST_F(PartyTest, CanBulkSpawn) {
   n2.WaitForNotification();
 }
 
-TEST_F(PartyTest, AfterCurrentPollWorks) {
-  auto party = MakeParty();
-  Notification n;
-  int state = 0;
-  {
-    Party::BulkSpawner spawner(party.get());
-    // BulkSpawner will schedule and poll this promise first, but the
-    // `AfterCurrentPoll` will pause it.
-    // Then spawn1, spawn2, and spawn3 will run in order (with EXPECT_EQ checks
-    // demonstrating this), at which point the poll will complete, causing
-    // spawn_final to be awoken and scheduled and see the final state.
-    spawner.Spawn(
-        "spawn_final",
-        [&state, &party]() {
-          return Seq(party->AfterCurrentPoll(), [&state]() {
-            EXPECT_EQ(state, 3);
-            return Empty{};
-          });
-        },
-        [&n](Empty) { n.Notify(); });
-    spawner.Spawn(
-        "spawn1",
-        [&state]() {
-          EXPECT_EQ(state, 0);
-          state = 1;
-          return Empty{};
-        },
-        [](Empty) {});
-    spawner.Spawn(
-        "spawn2",
-        [&state]() {
-          EXPECT_EQ(state, 1);
-          state = 2;
-          return Empty{};
-        },
-        [](Empty) {});
-    spawner.Spawn(
-        "spawn3",
-        [&state]() {
-          EXPECT_EQ(state, 2);
-          state = 3;
-          return Empty{};
-        },
-        [](Empty) {});
-  }
-  n.WaitForNotification();
-}
-
 TEST_F(PartyTest, ThreadStressTest) {
   auto party = MakeParty();
   std::vector<std::thread> threads;