[promises] Add AtomicWaker type (#30561)

* [promises] Add AtomicWaker type

* Automated change: Fix sanity tests

* review feedback

* fix

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/30587/head
Craig Tiller 3 years ago committed by GitHub
parent ee7c0a8e4c
commit 8d37f43bfd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 61
      src/core/lib/promise/activity.h
  2. 71
      test/core/promise/activity_test.cc

@ -57,12 +57,24 @@ class Wakeable {
inline ~Wakeable() {}
};
namespace activity_detail {
struct Unwakeable final : public Wakeable {
void Wakeup() override {}
void Drop() override {}
};
static Unwakeable* unwakeable() {
return NoDestructSingleton<Unwakeable>::Get();
}
} // namespace activity_detail
class AtomicWaker;
// An owning reference to a Wakeable.
// This type is non-copyable but movable.
class Waker {
public:
explicit Waker(Wakeable* wakeable) : wakeable_(wakeable) {}
Waker() : Waker(unwakeable()) {}
Waker() : Waker(activity_detail::unwakeable()) {}
~Waker() { wakeable_->Drop(); }
Waker(const Waker&) = delete;
Waker& operator=(const Waker&) = delete;
@ -85,19 +97,50 @@ class Waker {
}
private:
struct Unwakeable final : public Wakeable {
void Wakeup() override {}
void Drop() override {}
};
static Unwakeable* unwakeable() {
return NoDestructSingleton<Unwakeable>::Get();
}
friend class AtomicWaker;
Wakeable* Take() { return absl::exchange(wakeable_, unwakeable()); }
Wakeable* Take() {
return absl::exchange(wakeable_, activity_detail::unwakeable());
}
Wakeable* wakeable_;
};
// An atomic variant of Waker - this type is non-copyable and non-movable.
class AtomicWaker {
public:
explicit AtomicWaker(Wakeable* wakeable) : wakeable_(wakeable) {}
AtomicWaker() : AtomicWaker(activity_detail::unwakeable()) {}
explicit AtomicWaker(Waker waker) : AtomicWaker(waker.Take()) {}
~AtomicWaker() { wakeable_.load(std::memory_order_acquire)->Drop(); }
AtomicWaker(const AtomicWaker&) = delete;
AtomicWaker& operator=(const AtomicWaker&) = delete;
AtomicWaker(AtomicWaker&& other) noexcept = delete;
AtomicWaker& operator=(AtomicWaker&& other) noexcept = delete;
// Wake the underlying activity.
void Wakeup() { Take()->Wakeup(); }
// Return true if there is a not-unwakeable wakeable present.
bool Armed() const noexcept {
return wakeable_.load(std::memory_order_relaxed) !=
activity_detail::unwakeable();
}
// Set to some new waker
void Set(Waker waker) {
wakeable_.exchange(waker.Take(), std::memory_order_acq_rel)->Wakeup();
}
private:
Wakeable* Take() {
return wakeable_.exchange(activity_detail::unwakeable(),
std::memory_order_acq_rel);
}
std::atomic<Wakeable*> wakeable_;
};
// An Activity tracks execution of a single promise.
// It executes the promise under a mutex.
// When the promise stalls, it registers the containing activity to be woken up

@ -16,8 +16,12 @@
#include <stdlib.h>
#include <atomic>
#include <chrono>
#include <functional>
#include <thread>
#include <tuple>
#include <vector>
#include "gmock/gmock.h"
#include "gtest/gtest.h"
@ -321,6 +325,73 @@ TEST(WakerTest, CanWakeupEmptyWaker) {
Waker().Wakeup();
}
TEST(AtomicWakerTest, CanWakeupEmptyWaker) {
// Empty wakers should not do anything upon wakeup.
AtomicWaker waker;
EXPECT_FALSE(waker.Armed());
waker.Wakeup();
}
class TestWakeable final : public Wakeable {
public:
TestWakeable(std::atomic<int>* wakeups, std::atomic<int>* drops)
: wakeups_(wakeups), drops_(drops) {}
void Wakeup() override {
wakeups_->fetch_add(1, std::memory_order_relaxed);
delete this;
}
void Drop() override {
drops_->fetch_add(1, std::memory_order_relaxed);
delete this;
}
private:
std::atomic<int>* const wakeups_;
std::atomic<int>* const drops_;
};
TEST(AtomicWakerTest, ThreadStress) {
std::vector<std::thread> threads;
std::atomic<bool> done{false};
std::atomic<int> wakeups{0};
std::atomic<int> drops{0};
std::atomic<int> armed{0};
std::atomic<int> not_armed{0};
AtomicWaker waker;
threads.reserve(90);
for (int i = 0; i < 30; i++) {
threads.emplace_back([&] {
while (!done.load(std::memory_order_relaxed)) {
waker.Wakeup();
}
});
}
for (int i = 0; i < 30; i++) {
threads.emplace_back([&] {
while (!done.load(std::memory_order_relaxed)) {
waker.Set(Waker(new TestWakeable(&wakeups, &drops)));
}
});
}
for (int i = 0; i < 30; i++) {
threads.emplace_back([&] {
while (!done.load(std::memory_order_relaxed)) {
(waker.Armed() ? &armed : &not_armed)
->fetch_add(1, std::memory_order_relaxed);
}
});
}
do {
std::this_thread::sleep_for(std::chrono::seconds(1));
} while (wakeups.load(std::memory_order_relaxed) == 0 ||
armed.load(std::memory_order_relaxed) == 0 ||
not_armed.load(std::memory_order_relaxed) == 0);
done.store(true, std::memory_order_relaxed);
for (auto& t : threads) t.join();
}
} // namespace grpc_core
int main(int argc, char** argv) {

Loading…
Cancel
Save