[party] Make it faster (#37132)

Notes:
* Adds an `AddParticipant` variant for the common single-participant case (per #37056, which I'm abandoning)
* Folds the `PartySyncUsingAtomics` class back into `Party` and removes the `PartySyncUsingMutex` class
* Leverages this integration to find places where we were doing repeated CAS operations and fold them into single operations (for example, an unlock/unref pair becomes a single CAS; see the sketch after these notes)
* Also lowers some `CHECK` statements to `DCHECK`, which I think is appropriate given the performance sensitivity of this code
* Adds code to deal with overflowing the number of participants added to a party. For now we do a busy add by queuing to the event engine and retrying. This has the advantage of not adding cost to the normal path, but the slightly worrying disadvantage of effectively being a busy poll. My expectation is that this will be OK in general (the condition clears very quickly), but if not we'll change this to a linked list of pending actions and take a hit on the fast path.
* Simplifies `PartyIsOver` (per #37113 which I'm abandoning)
* Keeps a per-object wakeup cache (`wakeup_mask_`) that is protected by the lock bit; this allows waking up a participant during polling without an extra atomic operation, significantly speeding up that wakeup path (17ns --> 6ns)
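
To make the last two points concrete, here is a minimal, self-contained sketch of the scheme. The names (`ToyParty`, `wakeup_cache_`, `RunAndUnref`) and the exact bit layout are illustrative stand-ins, not the real `Party` API: a single packed state word holds wakeup bits, a lock bit, and the refcount; a waker that finds the party locked publishes its wakeup bit and drops its ref in one CAS; the lock holder drains a plain (non-atomic) wakeup cache that only it may touch, then unlocks and unrefs in a single CAS.

```cpp
// Minimal sketch (not the gRPC code; names and bit layout are illustrative).
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <utility>

class ToyParty {
 public:
  explicit ToyParty(uint64_t initial_refs) : state_(initial_refs * kOneRef) {}

  // Wake participant(s) in `bit`, consuming one ref held by the waker.
  void WakeupAndUnref(uint64_t bit) {
    uint64_t s = state_.load(std::memory_order_relaxed);
    for (;;) {
      if (s & kLocked) {
        // Someone else is running the party: publish the wakeup bit and drop
        // our ref in a single CAS (instead of a fetch_or followed by a
        // fetch_sub).
        if (state_.compare_exchange_weak(s, (s | bit) - kOneRef,
                                         std::memory_order_acq_rel,
                                         std::memory_order_acquire)) {
          return;
        }
      } else if (state_.compare_exchange_weak(s, s | kLocked,
                                              std::memory_order_acq_rel,
                                              std::memory_order_acquire)) {
        // We took the lock: record the wakeup in the lock-protected cache
        // (no extra atomic) and run; our ref is dropped when we unlock.
        wakeup_cache_ |= bit;
        RunAndUnref(s | kLocked);
        return;
      }
    }
  }

 private:
  // Packed state: low 16 bits = pending wakeups, one lock bit, refs above.
  static constexpr uint64_t kWakeupMask = 0xffffull;
  static constexpr uint64_t kLocked = 1ull << 16;
  static constexpr uint64_t kOneRef = 1ull << 32;

  void Poll(uint64_t mask) {
    std::printf("polling participants in mask %llx\n",
                static_cast<unsigned long long>(mask));
  }

  // Runs while holding the lock bit; `prev_state` is what we believe state_
  // currently holds (locked, no pending wakeup bits).
  void RunAndUnref(uint64_t prev_state) {
    for (;;) {
      // Drain the cache; only the lock holder reads or writes wakeup_cache_.
      while (wakeup_cache_ != 0) Poll(std::exchange(wakeup_cache_, 0));
      // Try to unlock and drop our ref in a single CAS. Success means no new
      // wakeups arrived while we were polling.
      if (state_.compare_exchange_weak(prev_state,
                                       (prev_state - kOneRef) & ~kLocked,
                                       std::memory_order_acq_rel,
                                       std::memory_order_acquire)) {
        // (Destruction when the refcount reaches zero is elided here.)
        return;
      }
      // New wakeups arrived: pull the wakeup bits out of the state word while
      // keeping the lock held, then go around again.
      while (!state_.compare_exchange_weak(prev_state,
                                           prev_state & ~kWakeupMask,
                                           std::memory_order_acq_rel,
                                           std::memory_order_acquire)) {
      }
      wakeup_cache_ |= prev_state & kWakeupMask;
      prev_state &= ~kWakeupMask;
    }
  }

  std::atomic<uint64_t> state_;
  uint64_t wakeup_cache_ = 0;  // guarded by the kLocked bit in state_
};

int main() {
  ToyParty party(2);        // one ref for the party, one for our "waker"
  party.WakeupAndUnref(1);  // wake participant 0, dropping the waker's ref
}
```

The actual `Party` code in the diff below layers participant slots, an allocated-slot mask, and the destruction path on top of this basic shape.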

Before:
```
----------------------------------------------------------------------
Benchmark                            Time             CPU   Iterations
----------------------------------------------------------------------
BM_PartyCreate                     142 ns          142 ns     44952269
BM_PartyCreate                    73.8 ns         73.8 ns     44952269
BM_PartyCreate                    72.6 ns         72.6 ns     44952269
BM_PartyCreate                    72.5 ns         72.5 ns     44952269
BM_PartyCreate                    72.4 ns         72.4 ns     44952269
BM_PartyCreate                    72.5 ns         72.5 ns     44952269
BM_PartyCreate                    72.5 ns         72.5 ns     44952269
BM_PartyCreate                    72.6 ns         72.6 ns     44952269
BM_PartyCreate                    72.2 ns         72.2 ns     44952269
BM_PartyCreate                    72.5 ns         72.5 ns     44952269
BM_PartyCreate_mean               79.5 ns         79.5 ns           10
BM_PartyCreate_median             72.5 ns         72.5 ns           10
BM_PartyCreate_stddev             21.8 ns         21.8 ns           10
BM_PartyCreate_cv                27.46 %         27.46 %            10
BM_AddParticipant                 35.3 ns         35.3 ns    197041251
BM_AddParticipant                 35.3 ns         35.3 ns    197041251
BM_AddParticipant                 35.1 ns         35.1 ns    197041251
BM_AddParticipant                 35.4 ns         35.4 ns    197041251
BM_AddParticipant                 35.3 ns         35.3 ns    197041251
BM_AddParticipant                 35.2 ns         35.2 ns    197041251
BM_AddParticipant                 35.9 ns         35.9 ns    197041251
BM_AddParticipant                 36.0 ns         36.0 ns    197041251
BM_AddParticipant                 35.8 ns         35.8 ns    197041251
BM_AddParticipant                 36.0 ns         36.0 ns    197041251
BM_AddParticipant_mean            35.5 ns         35.5 ns           10
BM_AddParticipant_median          35.4 ns         35.4 ns           10
BM_AddParticipant_stddev         0.352 ns        0.352 ns           10
BM_AddParticipant_cv              0.99 %          0.99 %            10
BM_WakeupParticipant              17.1 ns         17.1 ns    406116840
BM_WakeupParticipant              16.9 ns         16.9 ns    406116840
BM_WakeupParticipant              16.8 ns         16.8 ns    406116840
BM_WakeupParticipant              16.8 ns         16.8 ns    406116840
BM_WakeupParticipant              16.9 ns         16.9 ns    406116840
BM_WakeupParticipant              16.9 ns         16.9 ns    406116840
BM_WakeupParticipant              17.0 ns         17.0 ns    406116840
BM_WakeupParticipant              17.0 ns         17.0 ns    406116840
BM_WakeupParticipant              16.9 ns         16.9 ns    406116840
BM_WakeupParticipant              17.0 ns         17.0 ns    406116840
BM_WakeupParticipant_mean         16.9 ns         16.9 ns           10
BM_WakeupParticipant_median       16.9 ns         16.9 ns           10
BM_WakeupParticipant_stddev      0.087 ns        0.087 ns           10
BM_WakeupParticipant_cv           0.51 %          0.51 %            10
```

After:
```
----------------------------------------------------------------------
Benchmark                            Time             CPU   Iterations
----------------------------------------------------------------------
BM_PartyCreate                     115 ns          115 ns     29602192
BM_PartyCreate                    56.5 ns         56.5 ns     29602192
BM_PartyCreate                    55.3 ns         55.3 ns     29602192
BM_PartyCreate                    55.9 ns         55.9 ns     29602192
BM_PartyCreate                    55.1 ns         55.1 ns     29602192
BM_PartyCreate                    55.2 ns         55.2 ns     29602192
BM_PartyCreate                    55.2 ns         55.2 ns     29602192
BM_PartyCreate                    56.2 ns         56.2 ns     29602192
BM_PartyCreate                    54.7 ns         54.7 ns     29602192
BM_PartyCreate                    55.8 ns         55.8 ns     29602192
BM_PartyCreate_mean               61.5 ns         61.5 ns           10
BM_PartyCreate_median             55.5 ns         55.5 ns           10
BM_PartyCreate_stddev             18.9 ns         18.9 ns           10
BM_PartyCreate_cv                30.68 %         30.68 %            10
BM_AddParticipant                 26.9 ns         26.9 ns    155407231
BM_AddParticipant                 26.5 ns         26.5 ns    155407231
BM_AddParticipant                 24.8 ns         24.8 ns    155407231
BM_AddParticipant                 24.9 ns         24.9 ns    155407231
BM_AddParticipant                 24.8 ns         24.8 ns    155407231
BM_AddParticipant                 25.3 ns         25.3 ns    155407231
BM_AddParticipant                 25.8 ns         25.8 ns    155407231
BM_AddParticipant                 25.3 ns         25.3 ns    155407231
BM_AddParticipant                 30.8 ns         30.8 ns    155407231
BM_AddParticipant                 27.7 ns         27.7 ns    155407231
BM_AddParticipant_mean            26.3 ns         26.3 ns           10
BM_AddParticipant_median          25.6 ns         25.6 ns           10
BM_AddParticipant_stddev          1.87 ns         1.87 ns           10
BM_AddParticipant_cv              7.11 %          7.10 %            10
BM_WakeupParticipant              6.75 ns         6.75 ns    623459241
BM_WakeupParticipant              6.77 ns         6.77 ns    623459241
BM_WakeupParticipant              6.74 ns         6.74 ns    623459241
BM_WakeupParticipant              6.73 ns         6.73 ns    623459241
BM_WakeupParticipant              6.74 ns         6.74 ns    623459241
BM_WakeupParticipant              6.70 ns         6.70 ns    623459241
BM_WakeupParticipant              6.70 ns         6.69 ns    623459241
BM_WakeupParticipant              6.79 ns         6.79 ns    623459241
BM_WakeupParticipant              6.76 ns         6.76 ns    623459241
BM_WakeupParticipant              6.78 ns         6.78 ns    623459241
BM_WakeupParticipant_mean         6.75 ns         6.75 ns           10
BM_WakeupParticipant_median       6.75 ns         6.75 ns           10
BM_WakeupParticipant_stddev      0.031 ns        0.031 ns           10
BM_WakeupParticipant_cv           0.46 %          0.46 %            10
```

Closes #37132

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37132 from ctiller:nineteen-ninety-nine 336c87bdd6
PiperOrigin-RevId: 656437265
Craig Tiller, committed by Copybara-Service
parent a2b3704990
commit 7e089a22d7
Changed files:
  src/core/lib/promise/party.cc    (346 changed lines)
  src/core/lib/promise/party.h     (413 changed lines)
  test/core/promise/party_test.cc  (184 changed lines)

@@ -15,6 +15,7 @@
#include "src/core/lib/promise/party.h"
#include <atomic>
#include <cstdint>
#include "absl/base/thread_annotations.h"
#include "absl/log/check.h"
@@ -39,7 +40,7 @@ namespace grpc_core {
///////////////////////////////////////////////////////////////////////////////
// PartySyncUsingAtomics
GRPC_MUST_USE_RESULT bool PartySyncUsingAtomics::RefIfNonZero() {
GRPC_MUST_USE_RESULT bool Party::RefIfNonZero() {
auto count = state_.load(std::memory_order_relaxed);
do {
// If zero, we are done (without an increment). If not, we must do a CAS
@@ -55,33 +56,6 @@ GRPC_MUST_USE_RESULT bool PartySyncUsingAtomics::RefIfNonZero() {
return true;
}
bool PartySyncUsingAtomics::UnreffedLast() {
uint64_t prev_state =
state_.fetch_or(kDestroying | kLocked, std::memory_order_acq_rel);
LogStateChange("UnreffedLast", prev_state,
prev_state | kDestroying | kLocked);
return (prev_state & kLocked) == 0;
}
bool PartySyncUsingAtomics::ScheduleWakeup(WakeupMask mask) {
// Or in the wakeup bit for the participant, AND the locked bit.
uint64_t prev_state = state_.fetch_or((mask & kWakeupMask) | kLocked,
std::memory_order_acq_rel);
LogStateChange("ScheduleWakeup", prev_state,
prev_state | (mask & kWakeupMask) | kLocked);
// If the lock was not held now we hold it, so we need to run.
return ((prev_state & kLocked) == 0);
}
///////////////////////////////////////////////////////////////////////////////
// PartySyncUsingMutex
bool PartySyncUsingMutex::ScheduleWakeup(WakeupMask mask) {
MutexLock lock(&mu_);
wakeups_ |= mask;
return !std::exchange(locked_, true);
}
///////////////////////////////////////////////////////////////////////////////
// Party::Handle
@@ -175,7 +149,7 @@ Party::Participant::~Participant() {
Party::~Party() {}
void Party::CancelRemainingParticipants() {
if (!sync_.has_participants()) return;
if ((state_.load(std::memory_order_relaxed) & kAllocatedMask) == 0) return;
ScopedActivity activity(this);
promise_detail::Context<Arena> arena_ctx(arena_.get());
for (size_t i = 0; i < party_detail::kMaxParticipants; i++) {
@@ -206,37 +180,42 @@ Waker Party::MakeNonOwningWaker() {
void Party::ForceImmediateRepoll(WakeupMask mask) {
DCHECK(is_current());
sync_.ForceImmediateRepoll(mask);
wakeup_mask_ |= mask;
}
void Party::RunLocked(Party* party) {
void Party::RunLockedAndUnref(Party* party, uint64_t prev_state) {
GRPC_LATENT_SEE_PARENT_SCOPE("Party::RunLocked");
#ifdef GRPC_MAXIMIZE_THREADYNESS
Thread thd(
"RunParty",
[party]() {
[party, prev_state]() {
ApplicationCallbackExecCtx app_exec_ctx;
ExecCtx exec_ctx;
if (party->RunParty()) party->PartyIsOver();
party->RunPartyAndUnref(prev_state);
},
nullptr, Thread::Options().set_joinable(false));
thd.Start();
#else
struct RunState;
static thread_local RunState* g_run_state = nullptr;
struct PartyWakeup {
PartyWakeup() : party{nullptr} {}
PartyWakeup(Party* party, uint64_t prev_state)
: party{party}, prev_state{prev_state} {}
Party* party;
uint64_t prev_state;
};
struct RunState {
explicit RunState(Party* party) : running(party), next(nullptr) {}
Party* running;
Party* next;
void Run() {
explicit RunState(PartyWakeup first) : first{first}, next{} {}
PartyWakeup first;
PartyWakeup next;
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void Run() {
g_run_state = this;
do {
GRPC_LATENT_SEE_INNER_SCOPE("run_one_party");
if (running->RunParty()) {
running->PartyIsOver();
}
running = std::exchange(next, nullptr);
} while (running != nullptr);
first.party->RunPartyAndUnref(first.prev_state);
first = std::exchange(next, PartyWakeup{});
} while (first.party != nullptr);
DCHECK(g_run_state == this);
g_run_state = nullptr;
}
@@ -245,119 +224,258 @@ void Party::RunLocked(Party* party) {
// but instead add it to the end of the list of parties to run.
// This enables a fairly straightforward batching of work from a
// call to a transport (or back again).
if (g_run_state != nullptr) {
if (g_run_state->running == party || g_run_state->next == party) {
// Already running or already queued.
if (GPR_UNLIKELY(g_run_state != nullptr)) {
if (g_run_state->first.party == party) {
g_run_state->first.prev_state = prev_state;
party->Unref();
return;
}
if (g_run_state->next != nullptr) {
if (g_run_state->next.party == party) {
g_run_state->next.prev_state = prev_state;
party->Unref();
return;
}
if (g_run_state->next.party != nullptr) {
// If there's already a different party queued, we're better off asking
// event engine to run it so we can spread load.
// We swap the oldest party to run on the event engine so that we don't
// accidentally end up with a tail latency problem whereby one party
// gets held for a really long time.
std::swap(g_run_state->next, party);
auto wakeup =
std::exchange(g_run_state->next, PartyWakeup{party, prev_state});
party->arena_->GetContext<grpc_event_engine::experimental::EventEngine>()
->Run([party]() {
->Run([wakeup]() {
GRPC_LATENT_SEE_PARENT_SCOPE("Party::RunLocked offload");
ApplicationCallbackExecCtx app_exec_ctx;
ExecCtx exec_ctx;
RunState{party}.Run();
RunState{wakeup}.Run();
});
return;
}
g_run_state->next = party;
g_run_state->next = PartyWakeup{party, prev_state};
return;
}
RunState{party}.Run();
RunState{{party, prev_state}}.Run();
#endif
}
bool Party::RunParty() {
void Party::RunPartyAndUnref(uint64_t prev_state) {
ScopedActivity activity(this);
promise_detail::Context<Arena> arena_ctx(arena_.get());
return sync_.RunParty([this](int i) { return RunOneParticipant(i); });
}
bool Party::RunOneParticipant(int i) {
GRPC_LATENT_SEE_INNER_SCOPE("Party::RunOneParticipant");
// If the participant is null, skip.
// This allows participants to complete whilst wakers still exist
// somewhere.
auto* participant = participants_[i].load(std::memory_order_acquire);
if (participant == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
LOG(INFO) << DebugTag() << "[party] wakeup " << i << " already complete";
DCHECK_EQ(prev_state & kLocked, 0u)
<< "Party should be unlocked prior to first wakeup";
DCHECK_GE(prev_state & kRefMask, kOneRef);
// Now update prev_state to be what we want the CAS to see below.
DCHECK_EQ(prev_state & ~(kRefMask | kAllocatedMask), 0u)
<< "Party should have contained no wakeups on lock";
prev_state |= kLocked;
for (;;) {
uint64_t keep_allocated_mask = kAllocatedMask;
// For each wakeup bit...
while (wakeup_mask_ != 0) {
auto wakeup_mask = std::exchange(wakeup_mask_, 0);
while (wakeup_mask != 0) {
const uint64_t t = LowestOneBit(wakeup_mask);
const int i = CountTrailingZeros(t);
wakeup_mask ^= t;
// If the participant is null, skip.
// This allows participants to complete whilst wakers still exist
// somewhere.
auto* participant = participants_[i].load(std::memory_order_acquire);
if (GPR_UNLIKELY(participant == nullptr)) {
GRPC_TRACE_LOG(promise_primitives, INFO)
<< "Party " << this << " Run:Wakeup " << i
<< " already complete";
continue;
}
GRPC_TRACE_LOG(promise_primitives, INFO)
<< "Party " << this << " Run:Wakeup " << i;
// Poll the participant.
currently_polling_ = i;
if (participant->PollParticipantPromise()) {
participants_[i].store(nullptr, std::memory_order_relaxed);
const uint64_t allocated_bit = (1u << i << kAllocatedShift);
keep_allocated_mask &= ~allocated_bit;
}
}
}
return false;
}
absl::string_view name;
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
name = participant->name();
LOG(INFO) << DebugTag() << "[" << name << "] begin job " << i;
}
// Poll the participant.
currently_polling_ = i;
bool done = participant->PollParticipantPromise();
currently_polling_ = kNotPolling;
if (done) {
if (!name.empty()) {
GRPC_TRACE_LOG(promise_primitives, INFO)
<< DebugTag() << "[" << name << "] end poll and finish job " << i;
currently_polling_ = kNotPolling;
// Try to CAS the state we expected to have (with no wakeups or adds)
// back to unlocked (by masking in only the ref mask - sans locked bit).
// If this succeeds then no wakeups were added, no adds were added, and we
// have successfully unlocked.
// Otherwise, we need to loop again.
// Note that if an owning waker is created or the weak cas spuriously
// fails we will also loop again, but in that case see no wakeups or adds
// and so will get back here fairly quickly.
// TODO(ctiller): consider mitigations for the accidental wakeup on owning
// waker creation case -- I currently expect this will be more expensive
// than this quick loop.
if (state_.compare_exchange_weak(
prev_state,
(prev_state & (kRefMask | keep_allocated_mask)) - kOneRef,
std::memory_order_acq_rel, std::memory_order_acquire)) {
LogStateChange("Run:End", prev_state,
prev_state & (kRefMask | kAllocatedMask) - kOneRef);
if ((prev_state & kRefMask) == kOneRef) {
// We're done with the party.
PartyIsOver();
}
return;
}
participants_[i].store(nullptr, std::memory_order_relaxed);
} else if (!name.empty()) {
GRPC_TRACE_LOG(promise_primitives, INFO)
<< DebugTag() << "[" << name << "] end poll";
// CAS out (but retrieve) any allocations and wakeups that occurred during
// the run.
while (!state_.compare_exchange_weak(
prev_state, prev_state & (kRefMask | kLocked | keep_allocated_mask))) {
// Nothing to do here.
}
LogStateChange("Run:Continue", prev_state,
prev_state & (kRefMask | kLocked | keep_allocated_mask));
DCHECK(prev_state & kLocked)
<< "Party should be locked; prev_state=" << prev_state;
DCHECK_GE(prev_state & kRefMask, kOneRef);
// From the previous state, extract which participants we're to wakeup.
wakeup_mask_ |= prev_state & kWakeupMask;
// Now update prev_state to be what we want the CAS to see once wakeups
// complete next iteration.
prev_state &= kRefMask | kLocked | keep_allocated_mask;
}
return done;
}
void Party::AddParticipants(Participant** participants, size_t count) {
bool run_party = sync_.AddParticipantsAndRef(count, [this, participants,
count](size_t* slots) {
uint64_t state = state_.load(std::memory_order_acquire);
uint64_t allocated;
size_t slots[party_detail::kMaxParticipants];
// Find slots for each new participant, ordering them from lowest available
// slot upwards to ensure the same poll ordering as presentation ordering to
// this function.
WakeupMask wakeup_mask;
uint64_t new_state;
do {
wakeup_mask = 0;
allocated = (state & kAllocatedMask) >> kAllocatedShift;
for (size_t i = 0; i < count; i++) {
if (GRPC_TRACE_FLAG_ENABLED(party_state)) {
LOG(INFO) << "Party " << &sync_ << " AddParticipant: "
<< participants[i]->name() << " @ " << slots[i]
<< " [participant=" << participants[i] << "]";
auto new_mask = LowestOneBit(~allocated);
if (GPR_UNLIKELY((new_mask & kWakeupMask) == 0)) {
DelayAddParticipants(participants, count);
return;
}
participants_[slots[i]].store(participants[i], std::memory_order_release);
wakeup_mask |= new_mask;
allocated |= new_mask;
slots[i] = CountTrailingZeros(new_mask);
}
});
if (run_party) RunLocked(this);
Unref();
// Try to allocate this slot and take a ref (atomically).
// Ref needs to be taken because once we store the participant it could be
// spuriously woken up and unref the party.
new_state = (state | (allocated << kAllocatedShift)) + kOneRef;
} while (!state_.compare_exchange_weak(
state, new_state, std::memory_order_acq_rel, std::memory_order_acquire));
LogStateChange("AddParticipantsAndRef", state, new_state);
for (size_t i = 0; i < count; i++) {
GRPC_TRACE_LOG(party_state, INFO)
<< "Party " << this << " AddParticipant: " << slots[i]
<< " " << participants[i];
participants_[slots[i]].store(participants[i], std::memory_order_release);
}
// Now we need to wake up the party.
WakeupFromState(new_state, wakeup_mask);
}
void Party::Wakeup(WakeupMask wakeup_mask) {
if (sync_.ScheduleWakeup(wakeup_mask)) RunLocked(this);
Unref();
void Party::AddParticipant(Participant* participant) {
uint64_t state = state_.load(std::memory_order_acquire);
uint64_t allocated;
size_t slot;
// Find slots for each new participant, ordering them from lowest available
// slot upwards to ensure the same poll ordering as presentation ordering to
// this function.
uint64_t wakeup_mask;
uint64_t new_state;
do {
allocated = (state & kAllocatedMask) >> kAllocatedShift;
wakeup_mask = LowestOneBit(~allocated);
if (GPR_UNLIKELY((wakeup_mask & kWakeupMask) == 0)) {
DelayAddParticipants(&participant, 1);
return;
}
DCHECK_NE(wakeup_mask & kWakeupMask, 0u)
<< "No available slots for new participant; allocated=" << allocated
<< " state=" << state << " wakeup_mask=" << wakeup_mask;
allocated |= wakeup_mask;
slot = CountTrailingZeros(wakeup_mask);
// Try to allocate this slot and take a ref (atomically).
// Ref needs to be taken because once we store the participant it could be
// spuriously woken up and unref the party.
new_state = (state | (allocated << kAllocatedShift)) + kOneRef;
} while (!state_.compare_exchange_weak(
state, new_state, std::memory_order_acq_rel, std::memory_order_acquire));
LogStateChange("AddParticipantsAndRef", state, new_state);
GRPC_TRACE_LOG(party_state, INFO)
<< "Party " << this << " AddParticipant: " << slot
<< " [participant=" << participant << "]";
participants_[slot].store(participant, std::memory_order_release);
// Now we need to wake up the party.
WakeupFromState(new_state, wakeup_mask);
}
void Party::DelayAddParticipants(Participant** participants, size_t count) {
// We need to delay the addition of participants.
IncrementRefCount();
VLOG_EVERY_N_SEC(2, 10) << "Delaying addition of " << count
<< " participants to party " << this
<< " because it is full.";
std::vector<Participant*> delayed_participants{participants,
participants + count};
arena_->GetContext<grpc_event_engine::experimental::EventEngine>()->Run(
[this, delayed_participants = std::move(delayed_participants)]() mutable {
ApplicationCallbackExecCtx app_exec_ctx;
ExecCtx exec_ctx;
AddParticipants(delayed_participants.data(),
delayed_participants.size());
Unref();
});
}
void Party::WakeupAsync(WakeupMask wakeup_mask) {
if (sync_.ScheduleWakeup(wakeup_mask)) {
arena_->GetContext<grpc_event_engine::experimental::EventEngine>()->Run(
[this]() {
ApplicationCallbackExecCtx app_exec_ctx;
ExecCtx exec_ctx;
RunLocked(this);
Unref();
});
} else {
Unref();
// Or in the wakeup bit for the participant, AND the locked bit.
uint64_t prev_state = state_.load(std::memory_order_relaxed);
LogStateChange("ScheduleWakeup", prev_state,
prev_state | (wakeup_mask & kWakeupMask) | kLocked);
while (true) {
if ((prev_state & kLocked) == 0) {
if (state_.compare_exchange_weak(prev_state, prev_state | kLocked,
std::memory_order_acq_rel,
std::memory_order_acquire)) {
LogStateChange("WakeupAsync", prev_state, prev_state | kLocked);
wakeup_mask_ |= wakeup_mask;
arena_->GetContext<grpc_event_engine::experimental::EventEngine>()->Run(
[this, prev_state]() {
ApplicationCallbackExecCtx app_exec_ctx;
ExecCtx exec_ctx;
RunLockedAndUnref(this, prev_state);
});
return;
}
} else {
if (state_.compare_exchange_weak(
prev_state, (prev_state | wakeup_mask) - kOneRef,
std::memory_order_acq_rel, std::memory_order_acquire)) {
LogStateChange("WakeupAsync", prev_state, prev_state | wakeup_mask);
return;
}
}
}
}
void Party::Drop(WakeupMask) { Unref(); }
void Party::PartyIsOver() {
auto arena = arena_;
{
ScopedActivity activity(this);
promise_detail::Context<Arena> arena_ctx(arena_.get());
CancelRemainingParticipants();
arena->DestroyManagedNewObjects();
}
CancelRemainingParticipants();
auto arena = std::move(arena_);
this->~Party();
}

@@ -44,25 +44,6 @@
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/util/useful.h"
// Two implementations of party synchronization are provided: one using a single
// atomic, the other using a mutex and a set of state variables.
// Originally the atomic implementation was implemented, but we found some race
// conditions on Arm that were not reported by our default TSAN implementation.
// The mutex implementation was added to see if it would fix the problem, and
// it did. Later we found the race condition, so there's no known reason to use
// the mutex version - however we keep it around as a just in case measure.
// There's a thought of fuzzing the two implementations against each other as
// a correctness check of both, but that's not implemented yet.
#define GRPC_PARTY_SYNC_USING_ATOMICS
// #define GRPC_PARTY_SYNC_USING_MUTEX
#if defined(GRPC_PARTY_SYNC_USING_ATOMICS) + \
defined(GRPC_PARTY_SYNC_USING_MUTEX) != \
1
#error Must define a party sync mechanism
#endif
namespace grpc_core {
namespace party_detail {
@@ -73,264 +54,6 @@ static constexpr size_t kMaxParticipants = 16;
} // namespace party_detail
class PartySyncUsingAtomics {
public:
explicit PartySyncUsingAtomics(size_t initial_refs)
: state_(kOneRef * initial_refs) {}
void IncrementRefCount() {
const uint64_t prev_state =
state_.fetch_add(kOneRef, std::memory_order_relaxed);
LogStateChange("IncrementRefCount", prev_state, prev_state + kOneRef);
}
GRPC_MUST_USE_RESULT bool RefIfNonZero();
// Returns true if the ref count is now zero and the caller should call
// PartyIsOver
GRPC_MUST_USE_RESULT bool Unref() {
const uint64_t prev_state =
state_.fetch_sub(kOneRef, std::memory_order_acq_rel);
LogStateChange("Unref", prev_state, prev_state - kOneRef);
if ((prev_state & kRefMask) == kOneRef) {
return UnreffedLast();
}
return false;
}
void ForceImmediateRepoll(WakeupMask mask) {
// Or in the bit for the currently polling participant.
// Will be grabbed next round to force a repoll of this promise.
const uint64_t prev_state =
state_.fetch_or(mask, std::memory_order_relaxed);
LogStateChange("ForceImmediateRepoll", prev_state, prev_state | mask);
}
// Run the update loop: poll_one_participant is called with an integral index
// for the participant that should be polled. It should return true if the
// participant completed and should be removed from the allocated set.
template <typename F>
GRPC_MUST_USE_RESULT bool RunParty(F poll_one_participant) {
// Grab the current state, and clear the wakeup bits & add flag.
uint64_t prev_state = state_.fetch_and(kRefMask | kLocked | kAllocatedMask,
std::memory_order_acquire);
LogStateChange("Run", prev_state,
prev_state & (kRefMask | kLocked | kAllocatedMask));
CHECK(prev_state & kLocked);
if (prev_state & kDestroying) return true;
// From the previous state, extract which participants we're to wakeup.
uint64_t wakeups = prev_state & kWakeupMask;
// Now update prev_state to be what we want the CAS to see below.
prev_state &= kRefMask | kLocked | kAllocatedMask;
for (;;) {
uint64_t keep_allocated_mask = kAllocatedMask;
// For each wakeup bit...
while (wakeups != 0) {
uint64_t t = LowestOneBit(wakeups);
const int i = CountTrailingZeros(t);
wakeups ^= t;
// If the bit is not set, skip.
if (poll_one_participant(i)) {
const uint64_t allocated_bit = (1u << i << kAllocatedShift);
keep_allocated_mask &= ~allocated_bit;
}
}
// Try to CAS the state we expected to have (with no wakeups or adds)
// back to unlocked (by masking in only the ref mask - sans locked bit).
// If this succeeds then no wakeups were added, no adds were added, and we
// have successfully unlocked.
// Otherwise, we need to loop again.
// Note that if an owning waker is created or the weak cas spuriously
// fails we will also loop again, but in that case see no wakeups or adds
// and so will get back here fairly quickly.
// TODO(ctiller): consider mitigations for the accidental wakeup on owning
// waker creation case -- I currently expect this will be more expensive
// than this quick loop.
if (state_.compare_exchange_weak(
prev_state, (prev_state & (kRefMask | keep_allocated_mask)),
std::memory_order_acq_rel, std::memory_order_acquire)) {
LogStateChange("Run:End", prev_state,
prev_state & (kRefMask | kAllocatedMask));
return false;
}
while (!state_.compare_exchange_weak(
prev_state,
prev_state & (kRefMask | kLocked | keep_allocated_mask))) {
// Nothing to do here.
}
LogStateChange("Run:Continue", prev_state,
prev_state & (kRefMask | kLocked | keep_allocated_mask));
CHECK(prev_state & kLocked);
if (prev_state & kDestroying) return true;
// From the previous state, extract which participants we're to wakeup.
wakeups = prev_state & kWakeupMask;
// Now update prev_state to be what we want the CAS to see once wakeups
// complete next iteration.
prev_state &= kRefMask | kLocked | keep_allocated_mask;
}
return false;
}
// Add new participants to the party. Returns true if the caller should run
// the party. store is called with an array of indices of the new
// participants. Adds a ref that should be dropped by the caller after
// RunParty has been called (if that was required).
template <typename F>
GRPC_MUST_USE_RESULT bool AddParticipantsAndRef(size_t count, F store) {
uint64_t state = state_.load(std::memory_order_acquire);
uint64_t allocated;
size_t slots[party_detail::kMaxParticipants];
// Find slots for each new participant, ordering them from lowest available
// slot upwards to ensure the same poll ordering as presentation ordering to
// this function.
WakeupMask wakeup_mask;
do {
wakeup_mask = 0;
allocated = (state & kAllocatedMask) >> kAllocatedShift;
for (size_t i = 0; i < count; i++) {
auto new_mask = LowestOneBit(~allocated);
wakeup_mask |= new_mask;
allocated |= new_mask;
slots[i] = CountTrailingZeros(new_mask);
}
// Try to allocate this slot and take a ref (atomically).
// Ref needs to be taken because once we store the participant it could be
// spuriously woken up and unref the party.
} while (!state_.compare_exchange_weak(
state, (state | (allocated << kAllocatedShift)) + kOneRef,
std::memory_order_acq_rel, std::memory_order_acquire));
LogStateChange("AddParticipantsAndRef", state,
(state | (allocated << kAllocatedShift)) + kOneRef);
store(slots);
// Now we need to wake up the party.
state = state_.fetch_or(wakeup_mask | kLocked, std::memory_order_release);
LogStateChange("AddParticipantsAndRef:Wakeup", state,
state | wakeup_mask | kLocked);
// If the party was already locked, we're done.
return ((state & kLocked) == 0);
}
// Schedule a wakeup for the given participant.
// Returns true if the caller should run the party.
GRPC_MUST_USE_RESULT bool ScheduleWakeup(WakeupMask mask);
bool has_participants() const {
return (state_.load(std::memory_order_relaxed) & kAllocatedMask) != 0;
}
private:
bool UnreffedLast();
void LogStateChange(const char* op, uint64_t prev_state, uint64_t new_state,
DebugLocation loc = {}) {
if (GRPC_TRACE_FLAG_ENABLED(party_state)) {
LOG(INFO).AtLocation(loc.file(), loc.line())
<< absl::StrFormat("Party %p %30s: %016" PRIx64 " -> %016" PRIx64,
this, op, prev_state, new_state);
}
}
// State bits:
// The atomic state_ field is composed of the following:
// - 24 bits for ref counts
// 1 is owned by the party prior to Orphan()
// All others are owned by owning wakers
// - 1 bit to indicate whether the party is locked
// The first thread to set this owns the party until it is unlocked
// That thread will run the main loop until no further work needs to
// be done.
// - 1 bit to indicate whether there are participants waiting to be
// added
// - 16 bits, one per participant, indicating which participants have
// been
// woken up and should be polled next time the main loop runs.
// clang-format off
// Bits used to store 16 bits of wakeups
static constexpr uint64_t kWakeupMask = 0x0000'0000'0000'ffff;
// Bits used to store 16 bits of allocated participant slots.
static constexpr uint64_t kAllocatedMask = 0x0000'0000'ffff'0000;
// Bit indicating destruction has begun (refs went to zero)
static constexpr uint64_t kDestroying = 0x0000'0001'0000'0000;
// Bit indicating locked or not
static constexpr uint64_t kLocked = 0x0000'0008'0000'0000;
// Bits used to store 24 bits of ref counts
static constexpr uint64_t kRefMask = 0xffff'ff00'0000'0000;
// clang-format on
// Shift to get from a participant mask to an allocated mask.
static constexpr size_t kAllocatedShift = 16;
// How far to shift to get the refcount
static constexpr size_t kRefShift = 40;
// One ref count
static constexpr uint64_t kOneRef = 1ull << kRefShift;
std::atomic<uint64_t> state_;
};
class PartySyncUsingMutex {
public:
explicit PartySyncUsingMutex(size_t initial_refs) : refs_(initial_refs) {}
void IncrementRefCount() { refs_.Ref(); }
GRPC_MUST_USE_RESULT bool RefIfNonZero() { return refs_.RefIfNonZero(); }
GRPC_MUST_USE_RESULT bool Unref() { return refs_.Unref(); }
void ForceImmediateRepoll(WakeupMask mask) {
MutexLock lock(&mu_);
wakeups_ |= mask;
}
template <typename F>
GRPC_MUST_USE_RESULT bool RunParty(F poll_one_participant) {
WakeupMask freed = 0;
while (true) {
ReleasableMutexLock lock(&mu_);
CHECK(locked_);
allocated_ &= ~std::exchange(freed, 0);
auto wakeup = std::exchange(wakeups_, 0);
if (wakeup == 0) {
locked_ = false;
return false;
}
lock.Release();
for (size_t i = 0; wakeup != 0; i++, wakeup >>= 1) {
if ((wakeup & 1) == 0) continue;
if (poll_one_participant(i)) freed |= 1 << i;
}
}
}
template <typename F>
GRPC_MUST_USE_RESULT bool AddParticipantsAndRef(size_t count, F store) {
IncrementRefCount();
MutexLock lock(&mu_);
size_t slots[party_detail::kMaxParticipants];
WakeupMask wakeup_mask = 0;
size_t n = 0;
for (size_t bit = 0; n < count && bit < party_detail::kMaxParticipants;
bit++) {
if (allocated_ & (1 << bit)) continue;
slots[n++] = bit;
wakeup_mask |= 1 << bit;
allocated_ |= 1 << bit;
}
CHECK(n == count);
store(slots);
wakeups_ |= wakeup_mask;
return !std::exchange(locked_, true);
}
GRPC_MUST_USE_RESULT bool ScheduleWakeup(WakeupMask mask);
private:
RefCount refs_;
Mutex mu_;
WakeupMask allocated_ ABSL_GUARDED_BY(mu_) = 0;
WakeupMask wakeups_ ABSL_GUARDED_BY(mu_) = 0;
bool locked_ ABSL_GUARDED_BY(mu_) = false;
};
// A Party is an Activity with multiple participant promises.
class Party : public Activity, private Wakeable {
private:
@@ -340,7 +63,6 @@ class Party : public Activity, private Wakeable {
// One participant in the party.
class Participant {
public:
explicit Participant(absl::string_view name) : name_(name) {}
// Poll the participant. Return true if complete.
// Participant should take care of its own deallocation in this case.
virtual bool PollParticipantPromise() = 0;
@@ -351,14 +73,11 @@ class Party : public Activity, private Wakeable {
// Return a Handle instance for this participant.
Wakeable* MakeNonOwningWakeable(Party* party);
absl::string_view name() const { return name_; }
protected:
~Participant();
private:
Handle* handle_ = nullptr;
absl::string_view name_;
};
public:
@@ -400,10 +119,17 @@ class Party : public Activity, private Wakeable {
Waker MakeNonOwningWaker() final;
std::string ActivityDebugTag(WakeupMask wakeup_mask) const final;
void IncrementRefCount() { sync_.IncrementRefCount(); }
void Unref() {
if (sync_.Unref()) PartyIsOver();
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void IncrementRefCount() {
const uint64_t prev_state =
state_.fetch_add(kOneRef, std::memory_order_relaxed);
LogStateChange("IncrementRefCount", prev_state, prev_state + kOneRef);
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void Unref() {
uint64_t prev_state = state_.fetch_sub(kOneRef, std::memory_order_acq_rel);
LogStateChange("Unref", prev_state, prev_state - kOneRef);
if ((prev_state & kRefMask) == kOneRef) PartyIsOver();
}
RefCountedPtr<Party> Ref() {
IncrementRefCount();
return RefCountedPtr<Party>(this);
@@ -432,17 +158,15 @@ class Party : public Activity, private Wakeable {
friend class Arena;
// Derived types should be constructed upon `arena`.
explicit Party(RefCountedPtr<Arena> arena)
: sync_(1), arena_(std::move(arena)) {}
explicit Party(RefCountedPtr<Arena> arena) : arena_(std::move(arena)) {}
~Party() override;
// Main run loop. Must be locked.
// Polls participants and drains the add queue until there is no work left to
// be done.
// Returns true if the party is over.
GRPC_MUST_USE_RESULT bool RunParty();
void RunPartyAndUnref(uint64_t prev_state);
bool RefIfNonZero() { return sync_.RefIfNonZero(); }
bool RefIfNonZero();
private:
// Concrete implementation of a participant for some promise & oncomplete
@@ -453,9 +177,9 @@ class Party : public Activity, private Wakeable {
using Promise = typename Factory::Promise;
public:
ParticipantImpl(absl::string_view name, SuppliedFactory promise_factory,
ParticipantImpl(absl::string_view, SuppliedFactory promise_factory,
OnComplete on_complete)
: Participant(name), on_complete_(std::move(on_complete)) {
: on_complete_(std::move(on_complete)) {
Construct(&factory_, std::move(promise_factory));
}
~ParticipantImpl() {
@@ -503,9 +227,7 @@ class Party : public Activity, private Wakeable {
using Result = typename Promise::Result;
public:
PromiseParticipantImpl(absl::string_view name,
SuppliedFactory promise_factory)
: Participant(name) {
PromiseParticipantImpl(absl::string_view, SuppliedFactory promise_factory) {
Construct(&factory_, std::move(promise_factory));
}
@@ -576,38 +298,113 @@ class Party : public Activity, private Wakeable {
std::atomic<State> state_{State::kFactory};
};
// State bits:
// The atomic state_ field is composed of the following:
// - 24 bits for ref counts
// 1 is owned by the party prior to Orphan()
// All others are owned by owning wakers
// - 1 bit to indicate whether the party is locked
// The first thread to set this owns the party until it is unlocked
// That thread will run the main loop until no further work needs to
// be done.
// - 1 bit to indicate whether there are participants waiting to be
// added
// - 16 bits, one per participant, indicating which participants have
// been
// woken up and should be polled next time the main loop runs.
// clang-format off
// Bits used to store 16 bits of wakeups
static constexpr uint64_t kWakeupMask = 0x0000'0000'0000'ffff;
// Bits used to store 16 bits of allocated participant slots.
static constexpr uint64_t kAllocatedMask = 0x0000'0000'ffff'0000;
// Bit indicating locked or not
static constexpr uint64_t kLocked = 0x0000'0008'0000'0000;
// Bits used to store 24 bits of ref counts
static constexpr uint64_t kRefMask = 0xffff'ff00'0000'0000;
// clang-format on
// Shift to get from a participant mask to an allocated mask.
static constexpr size_t kAllocatedShift = 16;
// How far to shift to get the refcount
static constexpr size_t kRefShift = 40;
// One ref count
static constexpr uint64_t kOneRef = 1ull << kRefShift;
// Destroy any remaining participants.
// Needs to have normal context setup before calling.
void CancelRemainingParticipants();
// Run the locked part of the party until it is unlocked.
static void RunLocked(Party* party);
static void RunLockedAndUnref(Party* party, uint64_t prev_state);
// Called in response to Unref() hitting zero - ultimately calls PartyOver,
// but needs to set some stuff up.
// Here so it gets compiled out of line.
void PartyIsOver();
// Wakeable implementation
void Wakeup(WakeupMask wakeup_mask) final;
void Wakeup(WakeupMask wakeup_mask) final {
if (Activity::current() == this) {
wakeup_mask_ |= wakeup_mask;
Unref();
return;
}
WakeupFromState(state_.load(std::memory_order_relaxed), wakeup_mask);
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void WakeupFromState(
uint64_t cur_state, WakeupMask wakeup_mask) {
DCHECK_NE(wakeup_mask & kWakeupMask, 0u)
<< "Wakeup mask must be non-zero: " << wakeup_mask;
while (true) {
if (cur_state & kLocked) {
// If the party is locked, we need to set the wakeup bits, and then
// we'll immediately unref. Since something is running this should never
// bring the refcount to zero.
DCHECK_GT(cur_state & kRefMask, kOneRef);
auto new_state = (cur_state | wakeup_mask) - kOneRef;
if (state_.compare_exchange_weak(cur_state, new_state,
std::memory_order_release)) {
LogStateChange("Wakeup", cur_state, cur_state | wakeup_mask);
return;
}
} else {
// If the party is not locked, we need to lock it and run.
DCHECK_EQ(cur_state & kWakeupMask, 0u);
if (state_.compare_exchange_weak(cur_state, cur_state | kLocked,
std::memory_order_acq_rel)) {
LogStateChange("WakeupAndRun", cur_state, cur_state | kLocked);
wakeup_mask_ |= wakeup_mask;
RunLockedAndUnref(this, cur_state);
return;
}
}
}
}
void WakeupAsync(WakeupMask wakeup_mask) final;
void Drop(WakeupMask wakeup_mask) final;
// Add a participant (backs Spawn, after type erasure to ParticipantFactory).
void AddParticipants(Participant** participant, size_t count);
bool RunOneParticipant(int i);
void AddParticipant(Participant* participant);
void DelayAddParticipants(Participant** participant, size_t count);
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void LogStateChange(
const char* op, uint64_t prev_state, uint64_t new_state,
DebugLocation loc = {}) {
GRPC_TRACE_LOG(party_state, INFO).AtLocation(loc.file(), loc.line())
<< DebugTag() << " " << op << " "
<< absl::StrFormat("%016" PRIx64 " -> %016" PRIx64, prev_state,
new_state);
}
// Sentinel value for currently_polling_ when no participant is being polled.
static constexpr uint8_t kNotPolling = 255;
#ifdef GRPC_PARTY_SYNC_USING_ATOMICS
PartySyncUsingAtomics sync_;
#elif defined(GRPC_PARTY_SYNC_USING_MUTEX)
PartySyncUsingMutex sync_;
#else
#error No synchronization method defined
#endif
std::atomic<uint64_t> state_{kOneRef};
uint8_t currently_polling_ = kNotPolling;
WakeupMask wakeup_mask_ = 0;
// All current participants, using a tagged format.
// If the lower bit is unset, then this is a Participant*.
// If the lower bit is set, then this is a ParticipantFactory*.
@@ -633,8 +430,8 @@ void Party::BulkSpawner::Spawn(absl::string_view name, Factory promise_factory,
template <typename Factory, typename OnComplete>
void Party::Spawn(absl::string_view name, Factory promise_factory,
OnComplete on_complete) {
BulkSpawner(this).Spawn(name, std::move(promise_factory),
std::move(on_complete));
AddParticipant(new ParticipantImpl<Factory, OnComplete>(
name, std::move(promise_factory), std::move(on_complete)));
}
template <typename Factory>
@@ -642,7 +439,7 @@ auto Party::SpawnWaitable(absl::string_view name, Factory promise_factory) {
auto participant = MakeRefCounted<PromiseParticipantImpl<Factory>>(
name, std::move(promise_factory));
Participant* p = participant->Ref().release();
AddParticipants(&p, 1);
AddParticipant(p);
return [participant = std::move(participant)]() mutable {
return participant->PollCompletion();
};

@@ -48,187 +48,6 @@
namespace grpc_core {
///////////////////////////////////////////////////////////////////////////////
// PartySyncTest
template <typename T>
class PartySyncTest : public ::testing::Test {};
// PartySyncUsingMutex isn't working on Mac, but we don't use it for anything
// right now so that's fine.
#ifdef GPR_APPLE
using PartySyncTypes = ::testing::Types<PartySyncUsingAtomics>;
#else
using PartySyncTypes =
::testing::Types<PartySyncUsingAtomics, PartySyncUsingMutex>;
#endif
TYPED_TEST_SUITE(PartySyncTest, PartySyncTypes);
TYPED_TEST(PartySyncTest, NoOp) { TypeParam sync(1); }
TYPED_TEST(PartySyncTest, RefAndUnref) {
Notification half_way;
TypeParam sync(1);
std::thread thread1([&] {
for (int i = 0; i < 1000000; i++) {
sync.IncrementRefCount();
}
half_way.Notify();
for (int i = 0; i < 1000000; i++) {
sync.IncrementRefCount();
}
for (int i = 0; i < 2000000; i++) {
EXPECT_FALSE(sync.Unref());
}
});
half_way.WaitForNotification();
for (int i = 0; i < 2000000; i++) {
sync.IncrementRefCount();
}
for (int i = 0; i < 2000000; i++) {
EXPECT_FALSE(sync.Unref());
}
thread1.join();
EXPECT_TRUE(sync.Unref());
}
TYPED_TEST(PartySyncTest, AddAndRemoveParticipant) {
TypeParam sync(1);
std::vector<std::thread> threads;
std::atomic<std::atomic<bool>*> participants[party_detail::kMaxParticipants] =
{};
threads.reserve(8);
for (int i = 0; i < 8; i++) {
threads.emplace_back([&] {
for (int i = 0; i < 100000; i++) {
auto done = std::make_unique<std::atomic<bool>>(false);
int slot = -1;
bool run = sync.AddParticipantsAndRef(1, [&](size_t* idxs) {
slot = idxs[0];
participants[slot].store(done.get(), std::memory_order_release);
});
EXPECT_NE(slot, -1);
if (run) {
bool run_any = false;
bool run_me = false;
EXPECT_FALSE(sync.RunParty([&](int slot) {
run_any = true;
std::atomic<bool>* participant =
participants[slot].exchange(nullptr, std::memory_order_acquire);
if (participant == done.get()) run_me = true;
if (participant == nullptr) {
LOG(ERROR) << "Participant was null (spurious wakeup observed)";
return false;
}
participant->store(true, std::memory_order_release);
return true;
}));
EXPECT_TRUE(run_any);
EXPECT_TRUE(run_me);
}
EXPECT_FALSE(sync.Unref());
while (!done->load(std::memory_order_acquire)) {
}
}
});
}
for (auto& thread : threads) {
thread.join();
}
EXPECT_TRUE(sync.Unref());
}
TYPED_TEST(PartySyncTest, AddAndRemoveTwoParticipants) {
TypeParam sync(1);
std::vector<std::thread> threads;
std::atomic<std::atomic<int>*> participants[party_detail::kMaxParticipants] =
{};
threads.reserve(8);
for (int i = 0; i < 4; i++) {
threads.emplace_back([&] {
for (int i = 0; i < 100000; i++) {
auto done = std::make_unique<std::atomic<int>>(2);
int slots[2] = {-1, -1};
bool run = sync.AddParticipantsAndRef(2, [&](size_t* idxs) {
for (int i = 0; i < 2; i++) {
slots[i] = idxs[i];
participants[slots[i]].store(done.get(), std::memory_order_release);
}
});
EXPECT_NE(slots[0], -1);
EXPECT_NE(slots[1], -1);
EXPECT_GT(slots[1], slots[0]);
if (run) {
bool run_any = false;
int run_me = 0;
EXPECT_FALSE(sync.RunParty([&](int slot) {
run_any = true;
std::atomic<int>* participant =
participants[slot].exchange(nullptr, std::memory_order_acquire);
if (participant == done.get()) run_me++;
if (participant == nullptr) {
LOG(ERROR) << "Participant was null (spurious wakeup observed)";
return false;
}
participant->fetch_sub(1, std::memory_order_release);
return true;
}));
EXPECT_TRUE(run_any);
EXPECT_EQ(run_me, 2);
}
EXPECT_FALSE(sync.Unref());
while (done->load(std::memory_order_acquire) != 0) {
}
}
});
}
for (auto& thread : threads) {
thread.join();
}
EXPECT_TRUE(sync.Unref());
}
TYPED_TEST(PartySyncTest, UnrefWhileRunning) {
std::vector<std::thread> trials;
std::atomic<int> delete_paths_taken[3] = {{0}, {0}, {0}};
trials.reserve(100);
for (int i = 0; i < 100; i++) {
trials.emplace_back([&delete_paths_taken] {
TypeParam sync(1);
int delete_path = -1;
EXPECT_TRUE(sync.AddParticipantsAndRef(
1, [](size_t* slots) { EXPECT_EQ(slots[0], 0); }));
std::thread run_party([&] {
if (sync.RunParty([&sync, n = 0](int slot) mutable {
EXPECT_EQ(slot, 0);
++n;
if (n < 10) {
sync.ForceImmediateRepoll(1);
return false;
}
return true;
})) {
delete_path = 0;
}
});
std::thread unref([&] {
if (sync.Unref()) delete_path = 1;
});
if (sync.Unref()) delete_path = 2;
run_party.join();
unref.join();
EXPECT_GE(delete_path, 0);
delete_paths_taken[delete_path].fetch_add(1, std::memory_order_relaxed);
});
}
for (auto& trial : trials) {
trial.join();
}
fprintf(stderr, "DELETE_PATHS: RunParty:%d AsyncUnref:%d SyncUnref:%d\n",
delete_paths_taken[0].load(), delete_paths_taken[1].load(),
delete_paths_taken[2].load());
}
///////////////////////////////////////////////////////////////////////////////
// PartyTest
@@ -704,6 +523,7 @@ TEST_F(PartyTest, NestedWakeup) {
auto party2 = MakeParty();
auto party3 = MakeParty();
int whats_going_on = 0;
Notification done1;
Notification started2;
Notification done2;
Notification started3;
@@ -716,6 +536,7 @@ TEST_F(PartyTest, NestedWakeup) {
party2->Spawn(
"p2",
[&]() {
done1.WaitForNotification();
started2.Notify();
started3.WaitForNotification();
EXPECT_EQ(whats_going_on, 3);
@@ -749,6 +570,7 @@ TEST_F(PartyTest, NestedWakeup) {
[&](Empty) {
EXPECT_EQ(whats_going_on, 2);
whats_going_on = 3;
done1.Notify();
});
notify_done.WaitForNotification();
}
