[party] Fix some edge cases in wakeup batching (#37072)

1. fix some cases where we were missing opportunities to batch wakeups
2. eliminate recursion when switching parties post-batching (makes profiles significantly easier to read, saves a few cycles)

Closes #37072

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37072 from ctiller:rolling-party 9b56542b95
PiperOrigin-RevId: 648427987
pull/37076/head^2
Craig Tiller 8 months ago committed by Copybara-Service
parent 5ff56f6bcf
commit 55cc0af10e
  1. 97
      src/core/lib/promise/party.cc
  2. 2
      src/core/lib/promise/party.h

@ -36,11 +36,6 @@
namespace grpc_core {
namespace {
// TODO(ctiller): Once all activities are parties we can remove this.
thread_local Party** g_current_party_run_next = nullptr;
} // namespace
///////////////////////////////////////////////////////////////////////////////
// PartySyncUsingAtomics
@ -214,53 +209,67 @@ void Party::ForceImmediateRepoll(WakeupMask mask) {
sync_.ForceImmediateRepoll(mask);
}
void Party::RunLocked() {
void Party::RunLocked(Party* party) {
GRPC_LATENT_SEE_PARENT_SCOPE("Party::RunLocked");
#ifdef GRPC_MAXIMIZE_THREADYNESS
Thread thd(
"RunParty",
[party]() {
ApplicationCallbackExecCtx app_exec_ctx;
ExecCtx exec_ctx;
if (party->RunParty()) party->PartyIsOver();
},
nullptr, Thread::Options().set_joinable(false));
thd.Start();
#else
struct RunState;
static thread_local RunState* g_run_state = nullptr;
struct RunState {
explicit RunState(Party* party) : running(party), next(nullptr) {}
Party* running;
Party* next;
void Run() {
g_run_state = this;
do {
GRPC_LATENT_SEE_INNER_SCOPE("run_one_party");
if (running->RunParty()) {
running->PartyIsOver();
}
running = std::exchange(next, nullptr);
} while (running != nullptr);
DCHECK(g_run_state == this);
g_run_state = nullptr;
}
};
// If there is a party running, then we don't run it immediately
// but instead add it to the end of the list of parties to run.
// This enables a fairly straightforward batching of work from a
// call to a transport (or back again).
if (g_current_party_run_next != nullptr) {
if (*g_current_party_run_next == nullptr) {
*g_current_party_run_next = this;
} else {
// But if there's already a party queued, we're better off asking event
// engine to run it so we can spread load.
arena_->GetContext<grpc_event_engine::experimental::EventEngine>()->Run(
[this]() {
if (g_run_state != nullptr) {
if (g_run_state->running == party || g_run_state->next == party) {
// Already running or already queued.
return;
}
if (g_run_state->next != nullptr) {
// If there's already a different party queued, we're better off asking
// event engine to run it so we can spread load.
// We swap the oldest party to run on the event engine so that we don't
// accidentally end up with a tail latency problem whereby one party
// gets held for a really long time.
std::swap(g_run_state->next, party);
party->arena_->GetContext<grpc_event_engine::experimental::EventEngine>()
->Run([party]() {
GRPC_LATENT_SEE_PARENT_SCOPE("Party::RunLocked offload");
ApplicationCallbackExecCtx app_exec_ctx;
ExecCtx exec_ctx;
RunLocked();
RunState{party}.Run();
});
return;
}
g_run_state->next = party;
return;
}
auto body = [this]() {
DCHECK_EQ(g_current_party_run_next, nullptr);
Party* run_next = nullptr;
g_current_party_run_next = &run_next;
const bool done = RunParty();
DCHECK(g_current_party_run_next == &run_next);
g_current_party_run_next = nullptr;
if (done) {
PartyIsOver();
}
if (run_next != nullptr) {
run_next->RunLocked();
}
};
#ifdef GRPC_MAXIMIZE_THREADYNESS
Thread thd(
"RunParty",
[body]() {
ApplicationCallbackExecCtx app_exec_ctx;
ExecCtx exec_ctx;
body();
},
nullptr, Thread::Options().set_joinable(false));
thd.Start();
#else
body();
RunState{party}.Run();
#endif
}
@ -320,12 +329,12 @@ void Party::AddParticipants(Participant** participants, size_t count) {
participants_[slots[i]].store(participants[i], std::memory_order_release);
}
});
if (run_party) RunLocked();
if (run_party) RunLocked(this);
Unref();
}
void Party::Wakeup(WakeupMask wakeup_mask) {
if (sync_.ScheduleWakeup(wakeup_mask)) RunLocked();
if (sync_.ScheduleWakeup(wakeup_mask)) RunLocked(this);
Unref();
}
@ -335,7 +344,7 @@ void Party::WakeupAsync(WakeupMask wakeup_mask) {
[this]() {
ApplicationCallbackExecCtx app_exec_ctx;
ExecCtx exec_ctx;
RunLocked();
RunLocked(this);
Unref();
});
} else {

@ -581,7 +581,7 @@ class Party : public Activity, private Wakeable {
void CancelRemainingParticipants();
// Run the locked part of the party until it is unlocked.
void RunLocked();
static void RunLocked(Party* party);
// Called in response to Unref() hitting zero - ultimately calls PartyOver,
// but needs to set some stuff up.
// Here so it gets compiled out of line.

Loading…
Cancel
Save