EventEngine::RunAt - promise/sleep (#29726)

* EventEngine::RunAt - promise/sleep

Also refactors time_util for easier conversion between
grpc_core::Timestamp/Duration to absl::Timestamp/Duration.

* move OnTimer to the state object

* generate_projects

* Automated change: Fix sanity tests

* undo time_util separation due to gpr_base dependency cycles

* less snark

* remove stale test & add missing unref

* Automated change: Fix sanity tests

* fix the auto-fixer's mistake (missing build dep)

* tidy

* rm Sleep::State

* Automated change: Fix sanity tests

* add missing exec_ctx into callback

Co-authored-by: drfloob <drfloob@users.noreply.github.com>
Co-authored-by: Vignesh Babu <vigneshbabu@google.com>
pull/29744/head^2
AJ Heller 3 years ago committed by GitHub
parent 175580aedc
commit 5c743646e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      BUILD
  2. 80
      src/core/lib/promise/sleep.cc
  3. 50
      src/core/lib/promise/sleep.h
  4. 1
      test/core/promise/BUILD
  5. 18
      test/core/promise/sleep_test.cc

@ -1200,18 +1200,17 @@ grpc_cc_library(
external_deps = [
"absl/base:core_headers",
"absl/status",
"absl/time",
],
tags = ["grpc-autodeps"],
deps = [
"activity",
"closure",
"error",
"default_event_engine_factory_hdrs",
"event_engine_base_hdrs",
"exec_ctx",
"gpr_base",
"gpr_platform",
"iomgr_timer",
"poll",
"ref_counted",
"time",
],
)

@ -16,60 +16,78 @@
#include "src/core/lib/promise/sleep.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/event_engine/event_engine_factory.h"
#include "src/core/lib/iomgr/exec_ctx.h"
namespace grpc_core {
Sleep::Sleep(Timestamp deadline) : state_(new State(deadline)) {
GRPC_CLOSURE_INIT(&state_->on_timer, &OnTimer, state_, nullptr);
}
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
Sleep::Sleep(Timestamp deadline) : deadline_(deadline) {}
Sleep::~Sleep() {
if (state_ == nullptr) return;
{
MutexLock lock(&state_->mu);
switch (state_->stage) {
case Stage::kInitial:
state_->Unref();
break;
case Stage::kStarted:
grpc_timer_cancel(&state_->timer);
break;
case Stage::kDone:
break;
}
if (deadline_ == Timestamp::InfPast()) return;
ReleasableMutexLock lock(&mu_);
switch (stage_) {
case Stage::kInitial:
break;
case Stage::kStarted:
if (GetDefaultEventEngine()->Cancel(timer_handle_)) {
lock.Release();
OnTimer();
}
break;
case Stage::kDone:
break;
}
state_->Unref();
}
void Sleep::OnTimer(void* arg, grpc_error_handle) {
auto* state = static_cast<State*>(arg);
Waker waker;
void Sleep::OnTimer() {
Waker tmp_waker;
{
MutexLock lock(&state->mu);
state->stage = Stage::kDone;
waker = std::move(state->waker);
MutexLock lock(&mu_);
stage_ = Stage::kDone;
tmp_waker = std::move(waker_);
}
waker.Wakeup();
state->Unref();
tmp_waker.Wakeup();
}
// TODO(hork): refactor gpr_base to allow a separate time_util target.
namespace {
absl::Time ToAbslTime(Timestamp timestamp) {
if (timestamp == Timestamp::InfFuture()) return absl::InfiniteFuture();
if (timestamp == Timestamp::InfPast()) return absl::InfinitePast();
return absl::Now() +
absl::Milliseconds((timestamp - ExecCtx::Get()->Now()).millis());
}
} // namespace
Poll<absl::Status> Sleep::operator()() {
MutexLock lock(&state_->mu);
switch (state_->stage) {
MutexLock lock(&mu_);
switch (stage_) {
case Stage::kInitial:
if (state_->deadline <= ExecCtx::Get()->Now()) {
if (deadline_ <= ExecCtx::Get()->Now()) {
return absl::OkStatus();
}
state_->stage = Stage::kStarted;
grpc_timer_init(&state_->timer, state_->deadline, &state_->on_timer);
stage_ = Stage::kStarted;
timer_handle_ =
GetDefaultEventEngine()->RunAt(ToAbslTime(deadline_), [this] {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
OnTimer();
});
break;
case Stage::kStarted:
break;
case Stage::kDone:
return absl::OkStatus();
}
state_->waker = Activity::current()->MakeNonOwningWaker();
waker_ = Activity::current()->MakeNonOwningWaker();
return Pending{};
}

@ -22,12 +22,10 @@
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/poll.h"
@ -41,34 +39,36 @@ class Sleep {
Sleep(const Sleep&) = delete;
Sleep& operator=(const Sleep&) = delete;
Sleep(Sleep&& other) noexcept : state_(other.state_) {
other.state_ = nullptr;
}
Sleep(Sleep&& other) noexcept
: deadline_(other.deadline_), timer_handle_(other.timer_handle_) {
MutexLock lock2(&other.mu_);
stage_ = other.stage_;
waker_ = std::move(other.waker_);
other.deadline_ = Timestamp::InfPast();
};
Sleep& operator=(Sleep&& other) noexcept {
std::swap(state_, other.state_);
if (&other == this) return *this;
MutexLock lock1(&mu_);
MutexLock lock2(&other.mu_);
deadline_ = other.deadline_;
timer_handle_ = other.timer_handle_;
stage_ = other.stage_;
waker_ = std::move(other.waker_);
other.deadline_ = Timestamp::InfPast();
return *this;
}
};
Poll<absl::Status> operator()();
private:
static void OnTimer(void* arg, grpc_error_handle error);
enum class Stage { kInitial, kStarted, kDone };
struct State {
explicit State(Timestamp deadline) : deadline(deadline) {}
RefCount refs{2};
const Timestamp deadline;
grpc_timer timer;
grpc_closure on_timer;
Mutex mu;
Stage stage ABSL_GUARDED_BY(mu) = Stage::kInitial;
Waker waker ABSL_GUARDED_BY(mu);
void Unref() {
if (refs.Unref()) delete this;
}
};
State* state_;
void OnTimer();
Timestamp deadline_;
grpc_event_engine::experimental::EventEngine::TaskHandle timer_handle_;
Mutex mu_;
Stage stage_ ABSL_GUARDED_BY(mu_) = Stage::kInitial;
Waker waker_ ABSL_GUARDED_BY(mu_);
};
} // namespace grpc_core

@ -374,6 +374,7 @@ grpc_cc_test(
deps = [
"test_wakeup_schedulers",
"//:activity",
"//:exec_ctx",
"//:grpc",
"//:sleep",
"//test/core/util:grpc_suppressions",

@ -23,6 +23,7 @@
#include <grpc/grpc.h>
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/race.h"
#include "src/core/lib/promise/seq.h"
#include "test/core/promise/test_wakeup_schedulers.h"
@ -74,6 +75,23 @@ TEST(Sleep, Cancel) {
EXPECT_LT(ExecCtx::Get()->Now(), done_time);
}
TEST(Sleep, MoveSemantics) {
// ASAN should help determine if there are any memory leaks here
ExecCtx exec_ctx;
absl::Notification done;
Timestamp done_time = ExecCtx::Get()->Now() + Duration::Milliseconds(111);
Sleep donor(done_time);
Sleep sleeper = std::move(donor);
auto activity = MakeActivity(std::move(sleeper), InlineWakeupScheduler(),
[&done](absl::Status r) {
EXPECT_EQ(r, absl::OkStatus());
done.Notify();
});
done.WaitForNotification();
exec_ctx.InvalidateNow();
EXPECT_GE(ExecCtx::Get()->Now(), done_time);
}
} // namespace
} // namespace grpc_core

Loading…
Cancel
Save