From daf6220d4244fead8d2992aa4f8a2c5e42ebb45d Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 4 Mar 2024 17:14:03 +0000 Subject: [PATCH] improvements --- src/core/lib/promise/party.cc | 28 +++++++++++++++++----------- src/core/lib/promise/party.h | 1 - test/core/promise/party_test.cc | 27 ++++++++++++++++++++++++--- 3 files changed, 41 insertions(+), 15 deletions(-) diff --git a/src/core/lib/promise/party.cc b/src/core/lib/promise/party.cc index 77e06b6ec49..59006a0490c 100644 --- a/src/core/lib/promise/party.cc +++ b/src/core/lib/promise/party.cc @@ -39,7 +39,7 @@ namespace grpc_core { namespace { // TODO(ctiller): Once all activities are parties we can remove this. -thread_local Party* g_current_party_ = nullptr; +thread_local Party** g_current_party_run_next = nullptr; } // namespace /////////////////////////////////////////////////////////////////////////////// @@ -219,21 +219,27 @@ void Party::RunLocked() { // but instead add it to the end of the list of parties to run. // This enables a fairly straightforward batching of work from a // call to a transport (or back again). - if (g_current_party_ != nullptr) { - Party* after = g_current_party_; - while (after->run_next_ != nullptr) { - after = after->run_next_; + if (g_current_party_run_next != nullptr) { + if (*g_current_party_run_next == nullptr) { + *g_current_party_run_next = this; + } else { + // But if there's already a party queued, we're better off asking event + // engine to run it so we can spread load. + event_engine()->Run([this]() { + ApplicationCallbackExecCtx app_exec_ctx; + ExecCtx exec_ctx; + RunLocked(); + }); } - after->run_next_ = this; return; } auto body = [this]() { - GPR_DEBUG_ASSERT(g_current_party_ == nullptr); - g_current_party_ = this; + GPR_DEBUG_ASSERT(g_current_party_run_next == nullptr); + Party* run_next = nullptr; + g_current_party_run_next = &run_next; const bool done = RunParty(); - GPR_DEBUG_ASSERT(g_current_party_ == this); - Party* run_next = std::exchange(run_next_, nullptr); - g_current_party_ = nullptr; + GPR_DEBUG_ASSERT(g_current_party_run_next == &run_next); + g_current_party_run_next = nullptr; if (done) { ScopedActivity activity(this); PartyOver(); diff --git a/src/core/lib/promise/party.h b/src/core/lib/promise/party.h index 7a4bcab14dc..76c76f22990 100644 --- a/src/core/lib/promise/party.h +++ b/src/core/lib/promise/party.h @@ -628,7 +628,6 @@ class Party : public Activity, private Wakeable { Arena* const arena_; uint8_t currently_polling_ = kNotPolling; - Party* run_next_ = nullptr; // All current participants, using a tagged format. // If the lower bit is unset, then this is a Participant*. // If the lower bit is set, then this is a ParticipantFactory*. diff --git a/test/core/promise/party_test.cc b/test/core/promise/party_test.cc index de9c88ce46b..02a93a038f3 100644 --- a/test/core/promise/party_test.cc +++ b/test/core/promise/party_test.cc @@ -788,8 +788,12 @@ TEST_F(PartyTest, ThreadStressTestWithInnerSpawn) { TEST_F(PartyTest, NestedWakeup) { auto party1 = MakeRefCounted(); auto party2 = MakeRefCounted(); + auto party3 = MakeRefCounted(); int whats_going_on = 0; - Notification n; + Notification started2; + Notification done2; + Notification started3; + Notification notify_done; party1->Spawn( "p1", [&]() { @@ -798,6 +802,8 @@ TEST_F(PartyTest, NestedWakeup) { party2->Spawn( "p2", [&]() { + started2.Notify(); + started3.WaitForNotification(); EXPECT_EQ(whats_going_on, 3); whats_going_on = 4; return Empty{}; @@ -805,7 +811,22 @@ TEST_F(PartyTest, NestedWakeup) { [&](Empty) { EXPECT_EQ(whats_going_on, 4); whats_going_on = 5; - n.Notify(); + done2.Notify(); + }); + party3->Spawn( + "p3", + [&]() { + started2.WaitForNotification(); + started3.Notify(); + done2.WaitForNotification(); + EXPECT_EQ(whats_going_on, 5); + whats_going_on = 6; + return Empty{}; + }, + [&](Empty) { + EXPECT_EQ(whats_going_on, 6); + whats_going_on = 7; + notify_done.Notify(); }); EXPECT_EQ(whats_going_on, 1); whats_going_on = 2; @@ -815,7 +836,7 @@ TEST_F(PartyTest, NestedWakeup) { EXPECT_EQ(whats_going_on, 2); whats_going_on = 3; }); - n.WaitForNotification(); + notify_done.WaitForNotification(); } } // namespace grpc_core