[party] Auto-batch cross party wakeups (#36048)

If party1 wakes party2, we'd prefer that party1 finish all of its activations before party2 runs.
The intuition: party1 might wake party2 for several other things too, and coalescing all of those wakeups into a single one significantly helps performance.
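
To make the mechanism concrete, here is a minimal, self-contained sketch of the coalescing idea. It is illustration only, not the gRPC implementation: `Party`, `RunBatched`, and `g_run_next` are stand-in names. While a party is running on a thread, wakeups for another party are parked in a single thread_local slot, and the parked party runs exactly once, after the current party has drained all of its activations.

#include <functional>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Each "party" accumulates pending activations; running it drains them all.
struct Party {
  std::string name;
  std::vector<std::function<void()>> pending;
};

// While some party is running on this thread, points at a single slot that
// records the next party to run once the current one finishes.
thread_local Party** g_run_next = nullptr;

void RunBatched(Party* p) {
  if (g_run_next != nullptr) {
    // Some party is already running on this thread: defer p rather than
    // running it immediately. If the slot is empty, claim it; if p already
    // holds it, the wakeup coalesces for free. (When a *different* party
    // holds the slot, the real code hands the wakeup to the EventEngine;
    // that path is not modeled in this sketch.)
    if (*g_run_next == nullptr) *g_run_next = p;
    return;
  }
  Party* run_next = nullptr;
  g_run_next = &run_next;
  auto work = std::move(p->pending);
  p->pending.clear();
  for (auto& activation : work) activation();
  std::cout << p->name << " ran " << work.size() << " activation(s)\n";
  g_run_next = nullptr;
  if (run_next != nullptr) RunBatched(run_next);  // one coalesced wakeup
}

int main() {
  Party party1{"party1"};
  Party party2{"party2"};
  // Two of party1's activations each wake party2; both wakeups coalesce into
  // a single run of party2 after party1 has finished everything.
  party1.pending.push_back([&] {
    party2.pending.push_back([] {});
    RunBatched(&party2);
  });
  party1.pending.push_back([&] {
    party2.pending.push_back([] {});
    RunBatched(&party2);
  });
  RunBatched(&party1);
}

Running this prints that party1 ran 2 activation(s) and then party2 ran 2 activation(s): both wakeups of party2 collapse into one run that happens only after party1 is done.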

Closes #36048

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36048 from ctiller:wakey f6d4416685
PiperOrigin-RevId: 616970833
Branch: pull/36146/head
Author: Craig Tiller (committed by Copybara-Service)
Parent: 669493b0b7
Commit: 15d64cf9c9

Files changed:
  src/core/lib/promise/party.cc   (34 lines changed)
  test/core/promise/party_test.cc (57 lines changed)

--- a/src/core/lib/promise/party.cc
+++ b/src/core/lib/promise/party.cc
@@ -37,6 +37,11 @@ grpc_core::DebugOnlyTraceFlag grpc_trace_party_state(false, "party_state");
 namespace grpc_core {
+namespace {
+// TODO(ctiller): Once all activities are parties we can remove this.
+thread_local Party** g_current_party_run_next = nullptr;
+}  // namespace
 ///////////////////////////////////////////////////////////////////////////////
 // PartySyncUsingAtomics
@@ -210,11 +215,38 @@ void Party::ForceImmediateRepoll(WakeupMask mask) {
 }
 void Party::RunLocked() {
+  // If there is a party running, then we don't run it immediately
+  // but instead add it to the end of the list of parties to run.
+  // This enables a fairly straightforward batching of work from a
+  // call to a transport (or back again).
+  if (g_current_party_run_next != nullptr) {
+    if (*g_current_party_run_next == nullptr) {
+      *g_current_party_run_next = this;
+    } else {
+      // But if there's already a party queued, we're better off asking event
+      // engine to run it so we can spread load.
+      event_engine()->Run([this]() {
+        ApplicationCallbackExecCtx app_exec_ctx;
+        ExecCtx exec_ctx;
+        RunLocked();
+      });
+    }
+    return;
+  }
   auto body = [this]() {
-    if (RunParty()) {
+    GPR_DEBUG_ASSERT(g_current_party_run_next == nullptr);
+    Party* run_next = nullptr;
+    g_current_party_run_next = &run_next;
+    const bool done = RunParty();
+    GPR_DEBUG_ASSERT(g_current_party_run_next == &run_next);
+    g_current_party_run_next = nullptr;
+    if (done) {
       ScopedActivity activity(this);
       PartyOver();
     }
+    if (run_next != nullptr) {
+      run_next->RunLocked();
+    }
   };
 #ifdef GRPC_MAXIMIZE_THREADYNESS
   Thread thd(

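The comments in the new RunLocked() above describe two cases: the first party woken while another is running gets chained through the thread_local slot, and any further distinct party is handed to the EventEngine so load spreads across threads rather than building an ever-longer chain on one thread. Below is a hedged sketch of just that overflow path; `Executor` stands in for the EventEngine and none of these names are gRPC's real API.

#include <functional>
#include <iostream>
#include <queue>
#include <string>
#include <utility>

// Stand-in for gRPC's EventEngine: queues closures and runs them later.
struct Executor {
  std::queue<std::function<void()>> q;
  void Run(std::function<void()> fn) { q.push(std::move(fn)); }
  void Drain() {
    while (!q.empty()) {
      auto fn = std::move(q.front());
      q.pop();
      fn();
    }
  }
};

struct Party;
// Single "run next" slot for the party currently running on this thread.
thread_local Party** g_run_next = nullptr;
Executor g_executor;

struct Party {
  std::string name;
  std::function<void()> work;  // stand-in for the party's pending activations

  void RunLocked() {
    if (g_run_next != nullptr) {
      if (*g_run_next == nullptr) {
        *g_run_next = this;  // first woken party: run it after the current one
      } else {
        // Slot already taken by a different party: push to the executor so
        // the work spreads instead of chaining further on this thread.
        g_executor.Run([this] { RunLocked(); });
      }
      return;
    }
    Party* run_next = nullptr;
    g_run_next = &run_next;
    std::cout << name << " runs\n";  // stand-in for RunParty()
    if (work) work();                // may wake other parties
    g_run_next = nullptr;
    if (run_next != nullptr) run_next->RunLocked();
  }
};

int main() {
  Party party2{"party2", {}};
  Party party3{"party3", {}};
  Party party1{"party1", [&] {
                 party2.RunLocked();  // takes the "run next" slot
                 party3.RunLocked();  // slot occupied: goes to the executor
               }};
  party1.RunLocked();
  g_executor.Drain();  // party3 runs here, after party1 and party2
}

With this toy setup the output order is party1, then party2, then party3: party2 rides the chain slot, while party3 goes through the executor.
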
--- a/test/core/promise/party_test.cc
+++ b/test/core/promise/party_test.cc
@@ -285,8 +285,7 @@ TEST_F(PartyTest, CanSpawnAndRun) {
       "TestSpawn",
       [i = 10]() mutable -> Poll<int> {
         EXPECT_EQ(GetContext<Activity>()->DebugTag(), "TestParty");
-        gpr_log(GPR_DEBUG, "i=%d", i);
-        GPR_ASSERT(i > 0);
+        EXPECT_GT(i, 0);
         GetContext<Activity>()->ForceImmediateRepoll();
         --i;
         if (i == 0) return 42;
@@ -786,6 +785,60 @@ TEST_F(PartyTest, ThreadStressTestWithInnerSpawn) {
   }
 }
+TEST_F(PartyTest, NestedWakeup) {
+  auto party1 = MakeRefCounted<TestParty>();
+  auto party2 = MakeRefCounted<TestParty>();
+  auto party3 = MakeRefCounted<TestParty>();
+  int whats_going_on = 0;
+  Notification started2;
+  Notification done2;
+  Notification started3;
+  Notification notify_done;
+  party1->Spawn(
+      "p1",
+      [&]() {
+        EXPECT_EQ(whats_going_on, 0);
+        whats_going_on = 1;
+        party2->Spawn(
+            "p2",
+            [&]() {
+              started2.Notify();
+              started3.WaitForNotification();
+              EXPECT_EQ(whats_going_on, 3);
+              whats_going_on = 4;
+              return Empty{};
+            },
+            [&](Empty) {
+              EXPECT_EQ(whats_going_on, 4);
+              whats_going_on = 5;
+              done2.Notify();
+            });
+        party3->Spawn(
+            "p3",
+            [&]() {
+              started2.WaitForNotification();
+              started3.Notify();
+              done2.WaitForNotification();
+              EXPECT_EQ(whats_going_on, 5);
+              whats_going_on = 6;
+              return Empty{};
+            },
+            [&](Empty) {
+              EXPECT_EQ(whats_going_on, 6);
+              whats_going_on = 7;
+              notify_done.Notify();
+            });
+        EXPECT_EQ(whats_going_on, 1);
+        whats_going_on = 2;
+        return Empty{};
+      },
+      [&](Empty) {
+        EXPECT_EQ(whats_going_on, 2);
+        whats_going_on = 3;
+      });
+  notify_done.WaitForNotification();
+}
 }  // namespace grpc_core
 int main(int argc, char** argv) {
