[sleep] Make this robust against poorly implemented event engines (#30617)

* [sleep] Make this robust against poorly implemented event engines

* Automated change: Fix sanity tests

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/30632/head
Craig Tiller 2 years ago committed by GitHub
parent 36a1e0dd5e
commit 2a7286a67c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 16
      src/core/lib/promise/sleep.cc
  3. 10
      src/core/lib/promise/sleep.h

@ -1273,7 +1273,6 @@ grpc_cc_library(
"exec_ctx",
"gpr",
"poll",
"ref_counted",
"time",
],
)

@ -24,6 +24,7 @@
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/poll.h"
namespace grpc_core {
@ -46,6 +47,7 @@ Poll<absl::Status> Sleep::operator()() {
// cpu? - to avoid allocating/deallocating on fast paths.
closure_ = new ActiveClosure(deadline_);
}
if (closure_->HasRun()) return absl::OkStatus();
return Pending{};
}
@ -58,7 +60,7 @@ void Sleep::ActiveClosure::Run() {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
auto waker = std::move(waker_);
if (refs_.Unref()) {
if (Unref()) {
delete this;
} else {
waker.Wakeup();
@ -69,9 +71,19 @@ void Sleep::ActiveClosure::Cancel() {
// If we cancel correctly then we must own both refs still and can simply
// delete without unreffing twice, otherwise try unreffing since this may be
// the last owned ref.
if (GetDefaultEventEngine()->Cancel(timer_handle_) || refs_.Unref()) {
if (GetDefaultEventEngine()->Cancel(timer_handle_) || Unref()) {
delete this;
}
}
bool Sleep::ActiveClosure::Unref() {
return (refs_.fetch_sub(1, std::memory_order_acq_rel) == 1);
}
bool Sleep::ActiveClosure::HasRun() const {
// If the closure has run (ie woken up the activity) then it will have
// decremented this ref count once.
return refs_.load(std::memory_order_acquire) == 1;
}
} // namespace grpc_core

@ -17,12 +17,13 @@
#include <grpc/support/port_platform.h>
#include <atomic>
#include "absl/status/status.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/poll.h"
@ -60,12 +61,17 @@ class Sleep final {
explicit ActiveClosure(Timestamp deadline);
void Run() override;
// After calling Cancel, it's no longer safe to access this object.
void Cancel();
bool HasRun() const;
private:
bool Unref();
Waker waker_;
// One ref dropped by Run(), the other by Cancel().
RefCount refs_{2};
std::atomic<int> refs_{2};
const grpc_event_engine::experimental::EventEngine::TaskHandle
timer_handle_;
};

Loading…
Cancel
Save