EventEngine::RunAt: C++ Alarm (#30024)

* EventEngine::RunAt: C++ Alarm

* format

* mutex

* iwyu

* RunAfter EE API change

* Automated change: Fix sanity tests

Co-authored-by: drfloob <drfloob@users.noreply.github.com>
pull/29999/head^2
AJ Heller 2 years ago committed by GitHub
parent d374a2ea9e
commit da0eb19b22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      BUILD
  2. 120
      src/cpp/common/alarm.cc

10
BUILD

@ -6280,9 +6280,11 @@ grpc_cc_library(
hdrs = GRPCXX_HDRS,
external_deps = [
"absl/base:core_headers",
"absl/memory",
"absl/status",
"absl/strings",
"absl/synchronization",
"absl/memory",
"absl/types:optional",
"upb_lib",
"protobuf_headers",
],
@ -6293,6 +6295,7 @@ grpc_cc_library(
"arena",
"channel_init",
"config",
"default_event_engine_factory_hdrs",
"gpr_base",
"gpr_codegen",
"grpc",
@ -6323,9 +6326,11 @@ grpc_cc_library(
hdrs = GRPCXX_HDRS,
external_deps = [
"absl/base:core_headers",
"absl/memory",
"absl/status",
"absl/strings",
"absl/synchronization",
"absl/memory",
"absl/types:optional",
"upb_lib",
"protobuf_headers",
],
@ -6337,6 +6342,7 @@ grpc_cc_library(
"arena",
"channel_init",
"config",
"default_event_engine_factory_hdrs",
"gpr_base",
"gpr_codegen",
"grpc++_codegen_base",

@ -20,6 +20,10 @@
#include <functional>
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/gpr_types.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/log.h>
@ -29,23 +33,23 @@
#include <grpcpp/impl/codegen/completion_queue_tag.h>
#include <grpcpp/impl/grpc_library.h>
#include "src/core/lib/event_engine/event_engine_factory.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/surface/completion_queue.h"
namespace grpc {
namespace internal {
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
class AlarmImpl : public grpc::internal::CompletionQueueTag {
public:
AlarmImpl() : cq_(nullptr), tag_(nullptr) {
gpr_ref_init(&refs_, 1);
grpc_timer_init_unset(&timer_);
}
AlarmImpl() : cq_(nullptr), tag_(nullptr) { gpr_ref_init(&refs_, 1); }
~AlarmImpl() override {}
bool FinalizeResult(void** tag, bool* /*status*/) override {
*tag = tag_;
@ -53,61 +57,46 @@ class AlarmImpl : public grpc::internal::CompletionQueueTag {
return true;
}
void Set(grpc::CompletionQueue* cq, gpr_timespec deadline, void* tag) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::MutexLock lock(&mu_);
GPR_ASSERT(!cq_timer_handle_.has_value() &&
!callback_timer_handle_.has_value());
grpc_core::ExecCtx exec_ctx;
GRPC_CQ_INTERNAL_REF(cq->cq(), "alarm");
cq_ = cq->cq();
tag_ = tag;
GPR_ASSERT(grpc_cq_begin_op(cq_, this));
GRPC_CLOSURE_INIT(
&on_alarm_,
[](void* arg, grpc_error_handle error) {
// queue the op on the completion queue
AlarmImpl* alarm = static_cast<AlarmImpl*>(arg);
alarm->Ref();
// Preserve the cq and reset the cq_ so that the alarm
// can be reset when the alarm tag is delivered.
grpc_completion_queue* cq = alarm->cq_;
alarm->cq_ = nullptr;
grpc_cq_end_op(
cq, alarm, error,
[](void* /*arg*/, grpc_cq_completion* /*completion*/) {}, arg,
&alarm->completion_);
GRPC_CQ_INTERNAL_UNREF(cq, "alarm");
},
this, grpc_schedule_on_exec_ctx);
grpc_timer_init(&timer_,
grpc_core::Timestamp::FromTimespecRoundUp(deadline),
&on_alarm_);
Ref();
cq_timer_handle_ = GetDefaultEventEngine()->RunAfter(
grpc_core::Timestamp::FromTimespecRoundUp(deadline) -
grpc_core::ExecCtx::Get()->Now(),
[this] { OnCQAlarm(GRPC_ERROR_NONE); });
}
void Set(gpr_timespec deadline, std::function<void(bool)> f) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::MutexLock lock(&mu_);
GPR_ASSERT(!cq_timer_handle_.has_value() &&
!callback_timer_handle_.has_value());
grpc_core::ExecCtx exec_ctx;
// Don't use any CQ at all. Instead just use the timer to fire the function
callback_ = std::move(f);
Ref();
GRPC_CLOSURE_INIT(
&on_alarm_,
[](void* arg, grpc_error_handle error) {
grpc_core::Executor::Run(
GRPC_CLOSURE_CREATE(
[](void* arg, grpc_error_handle error) {
AlarmImpl* alarm = static_cast<AlarmImpl*>(arg);
alarm->callback_(GRPC_ERROR_IS_NONE(error));
alarm->Unref();
},
arg, nullptr),
error);
},
this, grpc_schedule_on_exec_ctx);
grpc_timer_init(&timer_,
grpc_core::Timestamp::FromTimespecRoundUp(deadline),
&on_alarm_);
callback_timer_handle_ = GetDefaultEventEngine()->RunAfter(
grpc_core::Timestamp::FromTimespecRoundUp(deadline) -
grpc_core::ExecCtx::Get()->Now(),
[this] { OnCallbackAlarm(true); });
}
void Cancel() {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_timer_cancel(&timer_);
grpc_core::MutexLock lock(&mu_);
if (callback_timer_handle_.has_value() &&
GetDefaultEventEngine()->Cancel(*callback_timer_handle_)) {
GetDefaultEventEngine()->Run(
[this] { OnCallbackAlarm(/*is_ok=*/false); });
callback_timer_handle_.reset();
} else if (cq_timer_handle_.has_value() &&
GetDefaultEventEngine()->Cancel(*cq_timer_handle_)) {
GetDefaultEventEngine()->Run([this] { OnCQAlarm(GRPC_ERROR_CANCELLED); });
cq_timer_handle_.reset();
}
}
void Destroy() {
Cancel();
@ -115,6 +104,35 @@ class AlarmImpl : public grpc::internal::CompletionQueueTag {
}
private:
void OnCQAlarm(grpc_error_handle error) {
{
grpc_core::MutexLock lock(&mu_);
cq_timer_handle_.reset();
}
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
// Preserve the cq and reset the cq_ so that the alarm
// can be reset when the alarm tag is delivered.
grpc_completion_queue* cq = cq_;
cq_ = nullptr;
grpc_cq_end_op(
cq, this, error,
[](void* /*arg*/, grpc_cq_completion* /*completion*/) {}, nullptr,
&completion_);
GRPC_CQ_INTERNAL_UNREF(cq, "alarm");
}
void OnCallbackAlarm(bool is_ok) {
{
grpc_core::MutexLock lock(&mu_);
callback_timer_handle_.reset();
}
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
callback_(is_ok);
Unref();
}
void Ref() { gpr_ref(&refs_); }
void Unref() {
if (gpr_unref(&refs_)) {
@ -122,9 +140,11 @@ class AlarmImpl : public grpc::internal::CompletionQueueTag {
}
}
grpc_timer timer_;
grpc_core::Mutex mu_;
absl::optional<EventEngine::TaskHandle> cq_timer_handle_ ABSL_GUARDED_BY(mu_);
absl::optional<EventEngine::TaskHandle> callback_timer_handle_
ABSL_GUARDED_BY(mu_);
gpr_refcount refs_;
grpc_closure on_alarm_;
grpc_cq_completion completion_;
// completion queue where events about this alarm will be posted
grpc_completion_queue* cq_;

Loading…
Cancel
Save