The C based gRPC (C++, Python, Ruby, Objective-C, PHP, C#)
https://grpc.io/
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
672 lines
24 KiB
672 lines
24 KiB
// Copyright 2023 gRPC authors. |
|
// |
|
// Licensed under the Apache License, Version 2.0 (the "License"); |
|
// you may not use this file except in compliance with the License. |
|
// You may obtain a copy of the License at |
|
// |
|
// http://www.apache.org/licenses/LICENSE-2.0 |
|
// |
|
// Unless required by applicable law or agreed to in writing, software |
|
// distributed under the License is distributed on an "AS IS" BASIS, |
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
// See the License for the specific language governing permissions and |
|
// limitations under the License. |
|
|
|
#ifndef GRPC_SRC_CORE_LIB_PROMISE_PARTY_H |
|
#define GRPC_SRC_CORE_LIB_PROMISE_PARTY_H |
|
|
|
#include <stddef.h> |
|
#include <stdint.h> |
|
|
|
#include <atomic> |
|
#include <string> |
|
#include <utility> |
|
|
|
#include "absl/base/attributes.h" |
|
#include "absl/base/thread_annotations.h" |
|
#include "absl/log/check.h" |
|
#include "absl/strings/string_view.h" |
|
|
|
#include <grpc/event_engine/event_engine.h> |
|
#include <grpc/support/log.h> |
|
#include <grpc/support/port_platform.h> |
|
|
|
#include "src/core/lib/gprpp/construct_destruct.h" |
|
#include "src/core/lib/gprpp/crash.h" |
|
#include "src/core/lib/gprpp/ref_counted.h" |
|
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
|
#include "src/core/lib/gprpp/sync.h" |
|
#include "src/core/lib/promise/activity.h" |
|
#include "src/core/lib/promise/context.h" |
|
#include "src/core/lib/promise/detail/promise_factory.h" |
|
#include "src/core/lib/promise/poll.h" |
|
#include "src/core/lib/promise/trace.h" |
|
|
|
// Two implementations of party synchronization are provided: one using a single |
|
// atomic, the other using a mutex and a set of state variables. |
|
// Originally the atomic implementation was implemented, but we found some race |
|
// conditions on Arm that were not reported by our default TSAN implementation. |
|
// The mutex implementation was added to see if it would fix the problem, and |
|
// it did. Later we found the race condition, so there's no known reason to use |
|
// the mutex version - however we keep it around as a just in case measure. |
|
// There's a thought of fuzzing the two implementations against each other as |
|
// a correctness check of both, but that's not implemented yet. |
|
|
|
extern grpc_core::DebugOnlyTraceFlag grpc_trace_party_state; |
|
|
|
#define GRPC_PARTY_SYNC_USING_ATOMICS |
|
// #define GRPC_PARTY_SYNC_USING_MUTEX |
|
|
|
#if defined(GRPC_PARTY_SYNC_USING_ATOMICS) + \ |
|
defined(GRPC_PARTY_SYNC_USING_MUTEX) != \ |
|
1 |
|
#error Must define a party sync mechanism |
|
#endif |
|
|
|
namespace grpc_core { |
|
|
|
namespace party_detail { |
|
|
|
// Number of bits reserved for wakeups gives us the maximum number of |
|
// participants. |
|
static constexpr size_t kMaxParticipants = 16; |
|
|
|
} // namespace party_detail |
|
|
|
class PartySyncUsingAtomics { |
|
public: |
|
explicit PartySyncUsingAtomics(size_t initial_refs) |
|
: state_(kOneRef * initial_refs) {} |
|
|
|
void IncrementRefCount() { |
|
const uint64_t prev_state = |
|
state_.fetch_add(kOneRef, std::memory_order_relaxed); |
|
LogStateChange("IncrementRefCount", prev_state, prev_state + kOneRef); |
|
} |
|
GRPC_MUST_USE_RESULT bool RefIfNonZero(); |
|
// Returns true if the ref count is now zero and the caller should call |
|
// PartyOver |
|
GRPC_MUST_USE_RESULT bool Unref() { |
|
const uint64_t prev_state = |
|
state_.fetch_sub(kOneRef, std::memory_order_acq_rel); |
|
LogStateChange("Unref", prev_state, prev_state - kOneRef); |
|
if ((prev_state & kRefMask) == kOneRef) { |
|
return UnreffedLast(); |
|
} |
|
return false; |
|
} |
|
void ForceImmediateRepoll(WakeupMask mask) { |
|
// Or in the bit for the currently polling participant. |
|
// Will be grabbed next round to force a repoll of this promise. |
|
const uint64_t prev_state = |
|
state_.fetch_or(mask, std::memory_order_relaxed); |
|
LogStateChange("ForceImmediateRepoll", prev_state, prev_state | mask); |
|
} |
|
|
|
// Run the update loop: poll_one_participant is called with an integral index |
|
// for the participant that should be polled. It should return true if the |
|
// participant completed and should be removed from the allocated set. |
|
template <typename F> |
|
GRPC_MUST_USE_RESULT bool RunParty(F poll_one_participant) { |
|
uint64_t prev_state; |
|
iteration_.fetch_add(1, std::memory_order_relaxed); |
|
for (;;) { |
|
// Grab the current state, and clear the wakeup bits & add flag. |
|
prev_state = state_.fetch_and(kRefMask | kLocked | kAllocatedMask, |
|
std::memory_order_acquire); |
|
LogStateChange("Run", prev_state, |
|
prev_state & (kRefMask | kLocked | kAllocatedMask)); |
|
CHECK(prev_state & kLocked); |
|
if (prev_state & kDestroying) return true; |
|
// From the previous state, extract which participants we're to wakeup. |
|
uint64_t wakeups = prev_state & kWakeupMask; |
|
// Now update prev_state to be what we want the CAS to see below. |
|
prev_state &= kRefMask | kLocked | kAllocatedMask; |
|
// For each wakeup bit... |
|
for (size_t i = 0; wakeups != 0; i++, wakeups >>= 1) { |
|
// If the bit is not set, skip. |
|
if ((wakeups & 1) == 0) continue; |
|
if (poll_one_participant(i)) { |
|
const uint64_t allocated_bit = (1u << i << kAllocatedShift); |
|
prev_state &= ~allocated_bit; |
|
uint64_t finished_prev_state = |
|
state_.fetch_and(~allocated_bit, std::memory_order_release); |
|
LogStateChange("Run:ParticipantComplete", finished_prev_state, |
|
finished_prev_state & ~allocated_bit); |
|
} |
|
} |
|
// Try to CAS the state we expected to have (with no wakeups or adds) |
|
// back to unlocked (by masking in only the ref mask - sans locked bit). |
|
// If this succeeds then no wakeups were added, no adds were added, and we |
|
// have successfully unlocked. |
|
// Otherwise, we need to loop again. |
|
// Note that if an owning waker is created or the weak cas spuriously |
|
// fails we will also loop again, but in that case see no wakeups or adds |
|
// and so will get back here fairly quickly. |
|
// TODO(ctiller): consider mitigations for the accidental wakeup on owning |
|
// waker creation case -- I currently expect this will be more expensive |
|
// than this quick loop. |
|
if (wake_after_poll_ == 0) { |
|
if (state_.compare_exchange_weak( |
|
prev_state, (prev_state & (kRefMask | kAllocatedMask)), |
|
std::memory_order_acq_rel, std::memory_order_acquire)) { |
|
LogStateChange("Run:End", prev_state, |
|
prev_state & (kRefMask | kAllocatedMask)); |
|
return false; |
|
} |
|
} else { |
|
if (state_.compare_exchange_weak( |
|
prev_state, |
|
(prev_state & (kRefMask | kAllocatedMask | kLocked)) | |
|
wake_after_poll_, |
|
std::memory_order_acq_rel, std::memory_order_acquire)) { |
|
LogStateChange("Run:EndIteration", prev_state, |
|
prev_state & (kRefMask | kAllocatedMask)); |
|
iteration_.fetch_add(1, std::memory_order_relaxed); |
|
wake_after_poll_ = 0; |
|
} |
|
} |
|
} |
|
return false; |
|
} |
|
|
|
// Add new participants to the party. Returns true if the caller should run |
|
// the party. store is called with an array of indices of the new |
|
// participants. Adds a ref that should be dropped by the caller after |
|
// RunParty has been called (if that was required). |
|
template <typename F> |
|
GRPC_MUST_USE_RESULT bool AddParticipantsAndRef(size_t count, F store) { |
|
uint64_t state = state_.load(std::memory_order_acquire); |
|
uint64_t allocated; |
|
|
|
size_t slots[party_detail::kMaxParticipants]; |
|
|
|
// Find slots for each new participant, ordering them from lowest available |
|
// slot upwards to ensure the same poll ordering as presentation ordering to |
|
// this function. |
|
WakeupMask wakeup_mask; |
|
do { |
|
wakeup_mask = 0; |
|
allocated = (state & kAllocatedMask) >> kAllocatedShift; |
|
size_t n = 0; |
|
for (size_t bit = 0; n < count && bit < party_detail::kMaxParticipants; |
|
bit++) { |
|
if (allocated & (1 << bit)) continue; |
|
wakeup_mask |= (1 << bit); |
|
slots[n++] = bit; |
|
allocated |= 1 << bit; |
|
} |
|
CHECK(n == count); |
|
// Try to allocate this slot and take a ref (atomically). |
|
// Ref needs to be taken because once we store the participant it could be |
|
// spuriously woken up and unref the party. |
|
} while (!state_.compare_exchange_weak( |
|
state, (state | (allocated << kAllocatedShift)) + kOneRef, |
|
std::memory_order_acq_rel, std::memory_order_acquire)); |
|
LogStateChange("AddParticipantsAndRef", state, |
|
(state | (allocated << kAllocatedShift)) + kOneRef); |
|
|
|
store(slots); |
|
|
|
// Now we need to wake up the party. |
|
state = state_.fetch_or(wakeup_mask | kLocked, std::memory_order_release); |
|
LogStateChange("AddParticipantsAndRef:Wakeup", state, |
|
state | wakeup_mask | kLocked); |
|
|
|
// If the party was already locked, we're done. |
|
return ((state & kLocked) == 0); |
|
} |
|
|
|
// Schedule a wakeup for the given participant. |
|
// Returns true if the caller should run the party. |
|
GRPC_MUST_USE_RESULT bool ScheduleWakeup(WakeupMask mask); |
|
|
|
void WakeAfterPoll(WakeupMask mask) { wake_after_poll_ |= mask; } |
|
uint32_t iteration() const { |
|
return iteration_.load(std::memory_order_relaxed); |
|
} |
|
|
|
private: |
|
bool UnreffedLast(); |
|
|
|
void LogStateChange(const char* op, uint64_t prev_state, uint64_t new_state, |
|
DebugLocation loc = {}) { |
|
if (grpc_trace_party_state.enabled()) { |
|
gpr_log(loc.file(), loc.line(), GPR_LOG_SEVERITY_INFO, |
|
"Party %p %30s: %016" PRIx64 " -> %016" PRIx64, this, op, |
|
prev_state, new_state); |
|
} |
|
} |
|
|
|
// State bits: |
|
// The atomic state_ field is composed of the following: |
|
// - 24 bits for ref counts |
|
// 1 is owned by the party prior to Orphan() |
|
// All others are owned by owning wakers |
|
// - 1 bit to indicate whether the party is locked |
|
// The first thread to set this owns the party until it is unlocked |
|
// That thread will run the main loop until no further work needs to |
|
// be done. |
|
// - 1 bit to indicate whether there are participants waiting to be |
|
// added |
|
// - 16 bits, one per participant, indicating which participants have |
|
// been |
|
// woken up and should be polled next time the main loop runs. |
|
|
|
// clang-format off |
|
// Bits used to store 16 bits of wakeups |
|
static constexpr uint64_t kWakeupMask = 0x0000'0000'0000'ffff; |
|
// Bits used to store 16 bits of allocated participant slots. |
|
static constexpr uint64_t kAllocatedMask = 0x0000'0000'ffff'0000; |
|
// Bit indicating destruction has begun (refs went to zero) |
|
static constexpr uint64_t kDestroying = 0x0000'0001'0000'0000; |
|
// Bit indicating locked or not |
|
static constexpr uint64_t kLocked = 0x0000'0008'0000'0000; |
|
// Bits used to store 24 bits of ref counts |
|
static constexpr uint64_t kRefMask = 0xffff'ff00'0000'0000; |
|
// clang-format on |
|
|
|
// Shift to get from a participant mask to an allocated mask. |
|
static constexpr size_t kAllocatedShift = 16; |
|
// How far to shift to get the refcount |
|
static constexpr size_t kRefShift = 40; |
|
// One ref count |
|
static constexpr uint64_t kOneRef = 1ull << kRefShift; |
|
|
|
std::atomic<uint64_t> state_; |
|
std::atomic<uint32_t> iteration_{0}; |
|
WakeupMask wake_after_poll_ = 0; |
|
}; |
|
|
|
class PartySyncUsingMutex { |
|
public: |
|
explicit PartySyncUsingMutex(size_t initial_refs) : refs_(initial_refs) {} |
|
|
|
void IncrementRefCount() { refs_.Ref(); } |
|
GRPC_MUST_USE_RESULT bool RefIfNonZero() { return refs_.RefIfNonZero(); } |
|
GRPC_MUST_USE_RESULT bool Unref() { return refs_.Unref(); } |
|
void ForceImmediateRepoll(WakeupMask mask) { |
|
MutexLock lock(&mu_); |
|
wakeups_ |= mask; |
|
} |
|
template <typename F> |
|
GRPC_MUST_USE_RESULT bool RunParty(F poll_one_participant) { |
|
WakeupMask freed = 0; |
|
while (true) { |
|
ReleasableMutexLock lock(&mu_); |
|
CHECK(locked_); |
|
allocated_ &= ~std::exchange(freed, 0); |
|
auto wakeup = std::exchange(wakeups_, 0); |
|
if (wakeup == 0) { |
|
locked_ = false; |
|
return false; |
|
} |
|
lock.Release(); |
|
for (size_t i = 0; wakeup != 0; i++, wakeup >>= 1) { |
|
if ((wakeup & 1) == 0) continue; |
|
if (poll_one_participant(i)) freed |= 1 << i; |
|
} |
|
} |
|
} |
|
|
|
template <typename F> |
|
GRPC_MUST_USE_RESULT bool AddParticipantsAndRef(size_t count, F store) { |
|
IncrementRefCount(); |
|
MutexLock lock(&mu_); |
|
size_t slots[party_detail::kMaxParticipants]; |
|
WakeupMask wakeup_mask = 0; |
|
size_t n = 0; |
|
for (size_t bit = 0; n < count && bit < party_detail::kMaxParticipants; |
|
bit++) { |
|
if (allocated_ & (1 << bit)) continue; |
|
slots[n++] = bit; |
|
wakeup_mask |= 1 << bit; |
|
allocated_ |= 1 << bit; |
|
} |
|
CHECK(n == count); |
|
store(slots); |
|
wakeups_ |= wakeup_mask; |
|
return !std::exchange(locked_, true); |
|
} |
|
|
|
GRPC_MUST_USE_RESULT bool ScheduleWakeup(WakeupMask mask); |
|
|
|
private: |
|
RefCount refs_; |
|
Mutex mu_; |
|
WakeupMask allocated_ ABSL_GUARDED_BY(mu_) = 0; |
|
WakeupMask wakeups_ ABSL_GUARDED_BY(mu_) = 0; |
|
bool locked_ ABSL_GUARDED_BY(mu_) = false; |
|
}; |
|
|
|
// A Party is an Activity with multiple participant promises. |
|
class Party : public Activity, private Wakeable { |
|
private: |
|
// Non-owning wakeup handle. |
|
class Handle; |
|
|
|
// One participant in the party. |
|
class Participant { |
|
public: |
|
explicit Participant(absl::string_view name) : name_(name) {} |
|
// Poll the participant. Return true if complete. |
|
// Participant should take care of its own deallocation in this case. |
|
virtual bool PollParticipantPromise() = 0; |
|
|
|
// Destroy the participant before finishing. |
|
virtual void Destroy() = 0; |
|
|
|
// Return a Handle instance for this participant. |
|
Wakeable* MakeNonOwningWakeable(Party* party); |
|
|
|
absl::string_view name() const { return name_; } |
|
|
|
protected: |
|
~Participant(); |
|
|
|
private: |
|
Handle* handle_ = nullptr; |
|
absl::string_view name_; |
|
}; |
|
|
|
public: |
|
Party(const Party&) = delete; |
|
Party& operator=(const Party&) = delete; |
|
|
|
// Spawn one promise into the party. |
|
// The promise will be polled until it is resolved, or until the party is shut |
|
// down. |
|
// The on_complete callback will be called with the result of the promise if |
|
// it completes. |
|
// A maximum of sixteen promises can be spawned onto a party. |
|
template <typename Factory, typename OnComplete> |
|
void Spawn(absl::string_view name, Factory promise_factory, |
|
OnComplete on_complete); |
|
|
|
template <typename Factory> |
|
auto SpawnWaitable(absl::string_view name, Factory factory); |
|
|
|
void Orphan() final { Crash("unused"); } |
|
|
|
// Activity implementation: not allowed to be overridden by derived types. |
|
void ForceImmediateRepoll(WakeupMask mask) final; |
|
WakeupMask CurrentParticipant() const final { |
|
DCHECK(currently_polling_ != kNotPolling); |
|
return 1u << currently_polling_; |
|
} |
|
Waker MakeOwningWaker() final; |
|
Waker MakeNonOwningWaker() final; |
|
std::string ActivityDebugTag(WakeupMask wakeup_mask) const final; |
|
|
|
void IncrementRefCount() { sync_.IncrementRefCount(); } |
|
void Unref() { |
|
if (sync_.Unref()) PartyIsOver(); |
|
} |
|
RefCountedPtr<Party> Ref() { |
|
IncrementRefCount(); |
|
return RefCountedPtr<Party>(this); |
|
} |
|
|
|
// Return a promise that resolves to Empty{} when the current party poll is |
|
// complete. |
|
// This is useful for implementing batching and the like: we can hold some |
|
// action until the rest of the party resolves itself. |
|
auto AfterCurrentPoll() { |
|
DCHECK(GetContext<Activity>() == this); |
|
sync_.WakeAfterPoll(CurrentParticipant()); |
|
return [this, iteration = sync_.iteration()]() -> Poll<Empty> { |
|
DCHECK(GetContext<Activity>() == this); |
|
if (iteration == sync_.iteration()) return Pending{}; |
|
return Empty{}; |
|
}; |
|
} |
|
|
|
class BulkSpawner { |
|
public: |
|
explicit BulkSpawner(Party* party) : party_(party) {} |
|
~BulkSpawner() { |
|
party_->AddParticipants(participants_, num_participants_); |
|
} |
|
|
|
template <typename Factory, typename OnComplete> |
|
void Spawn(absl::string_view name, Factory promise_factory, |
|
OnComplete on_complete); |
|
|
|
private: |
|
Party* const party_; |
|
size_t num_participants_ = 0; |
|
Participant* participants_[party_detail::kMaxParticipants]; |
|
}; |
|
|
|
protected: |
|
explicit Party(size_t initial_refs) : sync_(initial_refs) {} |
|
~Party() override; |
|
|
|
// Main run loop. Must be locked. |
|
// Polls participants and drains the add queue until there is no work left to |
|
// be done. |
|
// Derived types will likely want to override this to set up their |
|
// contexts before polling. |
|
// Should not be called by derived types except as a tail call to the base |
|
// class RunParty when overriding this method to add custom context. |
|
// Returns true if the party is over. |
|
GRPC_MUST_USE_RESULT virtual bool RunParty(); |
|
|
|
bool RefIfNonZero() { return sync_.RefIfNonZero(); } |
|
|
|
// Destroy any remaining participants. |
|
// Should be called by derived types in response to PartyOver. |
|
// Needs to have normal context setup before calling. |
|
void CancelRemainingParticipants(); |
|
|
|
private: |
|
// Concrete implementation of a participant for some promise & oncomplete |
|
// type. |
|
template <typename SuppliedFactory, typename OnComplete> |
|
class ParticipantImpl final : public Participant { |
|
using Factory = promise_detail::OncePromiseFactory<void, SuppliedFactory>; |
|
using Promise = typename Factory::Promise; |
|
|
|
public: |
|
ParticipantImpl(absl::string_view name, SuppliedFactory promise_factory, |
|
OnComplete on_complete) |
|
: Participant(name), on_complete_(std::move(on_complete)) { |
|
Construct(&factory_, std::move(promise_factory)); |
|
} |
|
~ParticipantImpl() { |
|
if (!started_) { |
|
Destruct(&factory_); |
|
} else { |
|
Destruct(&promise_); |
|
} |
|
} |
|
|
|
bool PollParticipantPromise() override { |
|
if (!started_) { |
|
auto p = factory_.Make(); |
|
Destruct(&factory_); |
|
Construct(&promise_, std::move(p)); |
|
started_ = true; |
|
} |
|
auto p = promise_(); |
|
if (auto* r = p.value_if_ready()) { |
|
on_complete_(std::move(*r)); |
|
delete this; |
|
return true; |
|
} |
|
return false; |
|
} |
|
|
|
void Destroy() override { delete this; } |
|
|
|
private: |
|
union { |
|
GPR_NO_UNIQUE_ADDRESS Factory factory_; |
|
GPR_NO_UNIQUE_ADDRESS Promise promise_; |
|
}; |
|
GPR_NO_UNIQUE_ADDRESS OnComplete on_complete_; |
|
bool started_ = false; |
|
}; |
|
|
|
template <typename SuppliedFactory> |
|
class PromiseParticipantImpl final |
|
: public RefCounted<PromiseParticipantImpl<SuppliedFactory>, |
|
NonPolymorphicRefCount>, |
|
public Participant { |
|
using Factory = promise_detail::OncePromiseFactory<void, SuppliedFactory>; |
|
using Promise = typename Factory::Promise; |
|
using Result = typename Promise::Result; |
|
|
|
public: |
|
PromiseParticipantImpl(absl::string_view name, |
|
SuppliedFactory promise_factory) |
|
: Participant(name) { |
|
Construct(&factory_, std::move(promise_factory)); |
|
} |
|
|
|
~PromiseParticipantImpl() { |
|
switch (state_.load(std::memory_order_acquire)) { |
|
case State::kFactory: |
|
Destruct(&factory_); |
|
break; |
|
case State::kPromise: |
|
Destruct(&promise_); |
|
break; |
|
case State::kResult: |
|
Destruct(&result_); |
|
break; |
|
} |
|
} |
|
|
|
// Inside party poll: drive from factory -> promise -> result |
|
bool PollParticipantPromise() override { |
|
switch (state_.load(std::memory_order_relaxed)) { |
|
case State::kFactory: { |
|
auto p = factory_.Make(); |
|
Destruct(&factory_); |
|
Construct(&promise_, std::move(p)); |
|
state_.store(State::kPromise, std::memory_order_relaxed); |
|
} |
|
ABSL_FALLTHROUGH_INTENDED; |
|
case State::kPromise: { |
|
auto p = promise_(); |
|
if (auto* r = p.value_if_ready()) { |
|
Destruct(&promise_); |
|
Construct(&result_, std::move(*r)); |
|
state_.store(State::kResult, std::memory_order_release); |
|
waiter_.Wakeup(); |
|
this->Unref(); |
|
return true; |
|
} |
|
return false; |
|
} |
|
case State::kResult: |
|
Crash( |
|
"unreachable: promises should not be repolled after completion"); |
|
} |
|
} |
|
|
|
// Outside party poll: check whether the spawning party has completed this |
|
// promise. |
|
Poll<Result> PollCompletion() { |
|
switch (state_.load(std::memory_order_acquire)) { |
|
case State::kFactory: |
|
case State::kPromise: |
|
return Pending{}; |
|
case State::kResult: |
|
return std::move(result_); |
|
} |
|
} |
|
|
|
void Destroy() override { this->Unref(); } |
|
|
|
private: |
|
enum class State : uint8_t { kFactory, kPromise, kResult }; |
|
union { |
|
GPR_NO_UNIQUE_ADDRESS Factory factory_; |
|
GPR_NO_UNIQUE_ADDRESS Promise promise_; |
|
GPR_NO_UNIQUE_ADDRESS Result result_; |
|
}; |
|
Waker waiter_{GetContext<Activity>()->MakeOwningWaker()}; |
|
std::atomic<State> state_{State::kFactory}; |
|
}; |
|
|
|
// Notification that the party has finished and this instance can be deleted. |
|
// Derived types should arrange to call CancelRemainingParticipants during |
|
// this sequence. |
|
virtual void PartyOver() = 0; |
|
|
|
// Run the locked part of the party until it is unlocked. |
|
void RunLocked(); |
|
// Called in response to Unref() hitting zero - ultimately calls PartyOver, |
|
// but needs to set some stuff up. |
|
// Here so it gets compiled out of line. |
|
void PartyIsOver(); |
|
|
|
// Wakeable implementation |
|
void Wakeup(WakeupMask wakeup_mask) final; |
|
void WakeupAsync(WakeupMask wakeup_mask) final; |
|
void Drop(WakeupMask wakeup_mask) final; |
|
|
|
// Add a participant (backs Spawn, after type erasure to ParticipantFactory). |
|
void AddParticipants(Participant** participant, size_t count); |
|
bool RunOneParticipant(int i); |
|
|
|
virtual grpc_event_engine::experimental::EventEngine* event_engine() |
|
const = 0; |
|
|
|
// Sentinal value for currently_polling_ when no participant is being polled. |
|
static constexpr uint8_t kNotPolling = 255; |
|
|
|
#ifdef GRPC_PARTY_SYNC_USING_ATOMICS |
|
PartySyncUsingAtomics sync_; |
|
#elif defined(GRPC_PARTY_SYNC_USING_MUTEX) |
|
PartySyncUsingMutex sync_; |
|
#else |
|
#error No synchronization method defined |
|
#endif |
|
|
|
uint8_t currently_polling_ = kNotPolling; |
|
// All current participants, using a tagged format. |
|
// If the lower bit is unset, then this is a Participant*. |
|
// If the lower bit is set, then this is a ParticipantFactory*. |
|
std::atomic<Participant*> participants_[party_detail::kMaxParticipants] = {}; |
|
}; |
|
|
|
template <> |
|
struct ContextSubclass<Party> { |
|
using Base = Activity; |
|
}; |
|
|
|
template <typename Factory, typename OnComplete> |
|
void Party::BulkSpawner::Spawn(absl::string_view name, Factory promise_factory, |
|
OnComplete on_complete) { |
|
if (grpc_trace_promise_primitives.enabled()) { |
|
gpr_log(GPR_DEBUG, "%s[bulk_spawn] On %p queue %s", |
|
party_->DebugTag().c_str(), this, std::string(name).c_str()); |
|
} |
|
participants_[num_participants_++] = new ParticipantImpl<Factory, OnComplete>( |
|
name, std::move(promise_factory), std::move(on_complete)); |
|
} |
|
|
|
template <typename Factory, typename OnComplete> |
|
void Party::Spawn(absl::string_view name, Factory promise_factory, |
|
OnComplete on_complete) { |
|
BulkSpawner(this).Spawn(name, std::move(promise_factory), |
|
std::move(on_complete)); |
|
} |
|
|
|
template <typename Factory> |
|
auto Party::SpawnWaitable(absl::string_view name, Factory promise_factory) { |
|
auto participant = MakeRefCounted<PromiseParticipantImpl<Factory>>( |
|
name, std::move(promise_factory)); |
|
Participant* p = participant->Ref().release(); |
|
AddParticipants(&p, 1); |
|
return [participant = std::move(participant)]() mutable { |
|
return participant->PollCompletion(); |
|
}; |
|
} |
|
|
|
} // namespace grpc_core |
|
|
|
#endif // GRPC_SRC_CORE_LIB_PROMISE_PARTY_H
|
|
|