[sleep] Rewrite to avoid bug (#30471)

* [sleep] Rewrite to avoid bug

* comments

* Automated change: Fix sanity tests

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/30475/head
Craig Tiller 2 years ago committed by GitHub
parent 9d4e0e17fe
commit e008a6e8b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      BUILD
  2. 78
      src/core/lib/promise/sleep.cc
  3. 51
      src/core/lib/promise/sleep.h
  4. 33
      test/core/promise/sleep_test.cc

@ -1254,10 +1254,7 @@ grpc_cc_library(
hdrs = [
"src/core/lib/promise/sleep.h",
],
external_deps = [
"absl/base:core_headers",
"absl/status",
],
external_deps = ["absl/status"],
deps = [
"activity",
"event_engine_base",
@ -1265,6 +1262,7 @@ grpc_cc_library(
"exec_ctx",
"gpr_base",
"poll",
"ref_counted",
"time",
],
)

@ -16,10 +16,14 @@
#include "src/core/lib/promise/sleep.h"
#include <utility>
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/event_engine/event_engine_factory.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/activity.h"
namespace grpc_core {
@ -28,54 +32,46 @@ using ::grpc_event_engine::experimental::GetDefaultEventEngine;
Sleep::Sleep(Timestamp deadline) : deadline_(deadline) {}
Sleep::~Sleep() {
if (deadline_ == Timestamp::InfPast()) return;
ReleasableMutexLock lock(&mu_);
switch (stage_) {
case Stage::kInitial:
break;
case Stage::kStarted:
if (GetDefaultEventEngine()->Cancel(timer_handle_)) {
lock.Release();
OnTimer();
}
break;
case Stage::kDone:
break;
if (closure_ != nullptr) closure_->Cancel();
}
Poll<absl::Status> Sleep::operator()() {
// Invalidate now so that we see a fresh version of the time.
// TODO(ctiller): the following can be safely removed when we remove ExecCtx.
ExecCtx::Get()->InvalidateNow();
// If the deadline is earlier than now we can just return.
if (deadline_ <= ExecCtx::Get()->Now()) return absl::OkStatus();
if (closure_ == nullptr) {
// TODO(ctiller): it's likely we'll want a pool of closures - probably per
// cpu? - to avoid allocating/deallocating on fast paths.
closure_ = new ActiveClosure(deadline_);
}
return Pending{};
}
void Sleep::OnTimer() {
Waker tmp_waker;
{
MutexLock lock(&mu_);
stage_ = Stage::kDone;
tmp_waker = std::move(waker_);
Sleep::ActiveClosure::ActiveClosure(Timestamp deadline)
: waker_(Activity::current()->MakeOwningWaker()),
timer_handle_(GetDefaultEventEngine()->RunAfter(
deadline - ExecCtx::Get()->Now(), this)) {}
void Sleep::ActiveClosure::Run() {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
auto waker = std::move(waker_);
if (refs_.Unref()) {
delete this;
} else {
waker.Wakeup();
}
tmp_waker.Wakeup();
}
Poll<absl::Status> Sleep::operator()() {
MutexLock lock(&mu_);
switch (stage_) {
case Stage::kInitial:
if (deadline_ <= ExecCtx::Get()->Now()) {
return absl::OkStatus();
}
stage_ = Stage::kStarted;
timer_handle_ = GetDefaultEventEngine()->RunAfter(
deadline_ - ExecCtx::Get()->Now(), [this] {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
OnTimer();
});
break;
case Stage::kStarted:
break;
case Stage::kDone:
return absl::OkStatus();
void Sleep::ActiveClosure::Cancel() {
// If we cancel correctly then we must own both refs still and can simply
// delete without unreffing twice, otherwise try unreffing since this may be
// the last owned ref.
if (GetDefaultEventEngine()->Cancel(timer_handle_) || refs_.Unref()) {
delete this;
}
waker_ = Activity::current()->MakeNonOwningWaker();
return Pending{};
}
} // namespace grpc_core

@ -17,14 +17,12 @@
#include <grpc/support/port_platform.h>
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/poll.h"
@ -32,43 +30,48 @@
namespace grpc_core {
// Promise that sleeps until a deadline and then finishes.
class Sleep {
class Sleep final {
public:
explicit Sleep(Timestamp deadline);
~Sleep();
Sleep(const Sleep&) = delete;
Sleep& operator=(const Sleep&) = delete;
Sleep(Sleep&& other) noexcept
: deadline_(other.deadline_), timer_handle_(other.timer_handle_) {
MutexLock lock2(&other.mu_);
stage_ = other.stage_;
waker_ = std::move(other.waker_);
other.deadline_ = Timestamp::InfPast();
Sleep(Sleep&& other) noexcept : deadline_(other.deadline_) {
// Promises can be moved only until they're polled, and since we only create
// the closure when first polled we can assume it's nullptr here.
GPR_DEBUG_ASSERT(other.closure_ == nullptr);
};
Sleep& operator=(Sleep&& other) noexcept {
if (&other == this) return *this;
MutexLock lock1(&mu_);
MutexLock lock2(&other.mu_);
// Promises can be moved only until they're polled, and since we only create
// the closure when first polled we can assume it's nullptr here.
GPR_DEBUG_ASSERT(closure_ == nullptr);
GPR_DEBUG_ASSERT(other.closure_ == nullptr);
deadline_ = other.deadline_;
timer_handle_ = other.timer_handle_;
stage_ = other.stage_;
waker_ = std::move(other.waker_);
other.deadline_ = Timestamp::InfPast();
return *this;
};
Poll<absl::Status> operator()();
private:
enum class Stage { kInitial, kStarted, kDone };
void OnTimer();
class ActiveClosure final
: public grpc_event_engine::experimental::EventEngine::Closure {
public:
explicit ActiveClosure(Timestamp deadline);
void Run() override;
void Cancel();
private:
Waker waker_;
// One ref dropped by Run(), the other by Cancel().
RefCount refs_{2};
const grpc_event_engine::experimental::EventEngine::TaskHandle
timer_handle_;
};
Timestamp deadline_;
grpc_event_engine::experimental::EventEngine::TaskHandle timer_handle_;
Mutex mu_;
Stage stage_ ABSL_GUARDED_BY(mu_) = Stage::kInitial;
Waker waker_ ABSL_GUARDED_BY(mu_);
ActiveClosure* closure_{nullptr};
};
} // namespace grpc_core

@ -14,12 +14,19 @@
#include "src/core/lib/promise/sleep.h"
#include <algorithm>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>
#include "absl/synchronization/notification.h"
#include "gtest/gtest.h"
#include <grpc/grpc.h>
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h"
#include "src/core/lib/promise/race.h"
#include "test/core/promise/test_wakeup_schedulers.h"
@ -87,6 +94,32 @@ TEST(Sleep, MoveSemantics) {
EXPECT_GE(ExecCtx::Get()->Now(), done_time);
}
TEST(Sleep, StressTest) {
// Kick off a bunch sleeps for one second.
static const int kNumActivities = 100000;
ExecCtx exec_ctx;
std::vector<std::shared_ptr<absl::Notification>> notifications;
std::vector<ActivityPtr> activities;
gpr_log(GPR_INFO, "Starting %d sleeps for 1sec", kNumActivities);
for (int i = 0; i < kNumActivities; i++) {
auto notification = std::make_shared<absl::Notification>();
auto activity = MakeActivity(
Sleep(exec_ctx.Now() + Duration::Seconds(1)), ExecCtxWakeupScheduler(),
[notification](absl::Status r) { notification->Notify(); });
notifications.push_back(std::move(notification));
activities.push_back(std::move(activity));
}
gpr_log(GPR_INFO,
"Waiting for the first %d sleeps, whilst cancelling the other half",
kNumActivities / 2);
for (size_t i = 0; i < kNumActivities / 2; i++) {
notifications[i]->WaitForNotification();
activities[i].reset();
activities[i + kNumActivities / 2].reset();
exec_ctx.Flush();
}
}
} // namespace
} // namespace grpc_core

Loading…
Cancel
Save