diff --git a/BUILD b/BUILD index bfe7fbca795..2a5b1cc7a3a 100644 --- a/BUILD +++ b/BUILD @@ -1946,7 +1946,6 @@ grpc_cc_library( "//src/core:channel_args", "//src/core:channel_init", "//src/core:closure", - "//src/core:default_event_engine", "//src/core:error", "//src/core:gpr_atm", "//src/core:gpr_manual_constructor", diff --git a/include/grpcpp/alarm.h b/include/grpcpp/alarm.h index ab3e451cf67..b35dca2771d 100644 --- a/include/grpcpp/alarm.h +++ b/include/grpcpp/alarm.h @@ -23,7 +23,6 @@ #include -#include #include #include #include diff --git a/src/cpp/common/alarm.cc b/src/cpp/common/alarm.cc index 8e7efbf95cb..c119d7eb904 100644 --- a/src/cpp/common/alarm.cc +++ b/src/cpp/common/alarm.cc @@ -18,14 +18,8 @@ #include #include -#include #include -#include "absl/base/thread_annotations.h" -#include "absl/status/status.h" -#include "absl/types/optional.h" - -#include #include #include #include @@ -34,28 +28,22 @@ #include #include -#include "src/core/lib/event_engine/default_event_engine.h" -#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/time.h" +#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/executor.h" +#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/surface/completion_queue.h" namespace grpc { namespace internal { - -namespace { -using ::grpc_event_engine::experimental::EventEngine; -} // namespace - class AlarmImpl : public grpc::internal::CompletionQueueTag { public: - AlarmImpl() - : event_engine_(grpc_event_engine::experimental::GetDefaultEventEngine()), - cq_(nullptr), - tag_(nullptr) { + AlarmImpl() : cq_(nullptr), tag_(nullptr) { gpr_ref_init(&refs_, 1); + grpc_timer_init_unset(&timer_); } ~AlarmImpl() override {} bool FinalizeResult(void** tag, bool* /*status*/) override { @@ -64,46 +52,61 @@ class AlarmImpl : public grpc::internal::CompletionQueueTag { return true; } void Set(grpc::CompletionQueue* cq, gpr_timespec deadline, void* tag) { - grpc_core::MutexLock lock(&mu_); - GPR_ASSERT(!cq_timer_handle_.has_value() && - !callback_timer_handle_.has_value()); + grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; GRPC_CQ_INTERNAL_REF(cq->cq(), "alarm"); cq_ = cq->cq(); tag_ = tag; GPR_ASSERT(grpc_cq_begin_op(cq_, this)); - Ref(); - cq_timer_handle_ = event_engine_->RunAfter( - grpc_core::Timestamp::FromTimespecRoundUp(deadline) - - grpc_core::ExecCtx::Get()->Now(), - [this] { OnCQAlarm(absl::OkStatus()); }); + GRPC_CLOSURE_INIT( + &on_alarm_, + [](void* arg, grpc_error_handle error) { + // queue the op on the completion queue + AlarmImpl* alarm = static_cast(arg); + alarm->Ref(); + // Preserve the cq and reset the cq_ so that the alarm + // can be reset when the alarm tag is delivered. + grpc_completion_queue* cq = alarm->cq_; + alarm->cq_ = nullptr; + grpc_cq_end_op( + cq, alarm, error, + [](void* /*arg*/, grpc_cq_completion* /*completion*/) {}, arg, + &alarm->completion_); + GRPC_CQ_INTERNAL_UNREF(cq, "alarm"); + }, + this, grpc_schedule_on_exec_ctx); + grpc_timer_init(&timer_, + grpc_core::Timestamp::FromTimespecRoundUp(deadline), + &on_alarm_); } void Set(gpr_timespec deadline, std::function f) { - grpc_core::MutexLock lock(&mu_); - GPR_ASSERT(!cq_timer_handle_.has_value() && - !callback_timer_handle_.has_value()); + grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; // Don't use any CQ at all. Instead just use the timer to fire the function callback_ = std::move(f); Ref(); - callback_timer_handle_ = event_engine_->RunAfter( - grpc_core::Timestamp::FromTimespecRoundUp(deadline) - - grpc_core::ExecCtx::Get()->Now(), - [this] { OnCallbackAlarm(true); }); + GRPC_CLOSURE_INIT( + &on_alarm_, + [](void* arg, grpc_error_handle error) { + grpc_core::Executor::Run(GRPC_CLOSURE_CREATE( + [](void* arg, grpc_error_handle error) { + AlarmImpl* alarm = + static_cast(arg); + alarm->callback_(error.ok()); + alarm->Unref(); + }, + arg, nullptr), + error); + }, + this, grpc_schedule_on_exec_ctx); + grpc_timer_init(&timer_, + grpc_core::Timestamp::FromTimespecRoundUp(deadline), + &on_alarm_); } void Cancel() { + grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; - grpc_core::MutexLock lock(&mu_); - if (callback_timer_handle_.has_value() && - event_engine_->Cancel(*callback_timer_handle_)) { - event_engine_->Run([this] { OnCallbackAlarm(/*is_ok=*/false); }); - callback_timer_handle_.reset(); - } else if (cq_timer_handle_.has_value() && - event_engine_->Cancel(*cq_timer_handle_)) { - event_engine_->Run( - [this] { OnCQAlarm(absl::CancelledError("cancelled")); }); - cq_timer_handle_.reset(); - } + grpc_timer_cancel(&timer_); } void Destroy() { Cancel(); @@ -111,35 +114,6 @@ class AlarmImpl : public grpc::internal::CompletionQueueTag { } private: - void OnCQAlarm(grpc_error_handle error) { - { - grpc_core::MutexLock lock(&mu_); - cq_timer_handle_.reset(); - } - grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; - grpc_core::ExecCtx exec_ctx; - // Preserve the cq and reset the cq_ so that the alarm - // can be reset when the alarm tag is delivered. - grpc_completion_queue* cq = cq_; - cq_ = nullptr; - grpc_cq_end_op( - cq, this, error, - [](void* /*arg*/, grpc_cq_completion* /*completion*/) {}, nullptr, - &completion_); - GRPC_CQ_INTERNAL_UNREF(cq, "alarm"); - } - - void OnCallbackAlarm(bool is_ok) { - { - grpc_core::MutexLock lock(&mu_); - callback_timer_handle_.reset(); - } - grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; - grpc_core::ExecCtx exec_ctx; - callback_(is_ok); - Unref(); - } - void Ref() { gpr_ref(&refs_); } void Unref() { if (gpr_unref(&refs_)) { @@ -147,12 +121,9 @@ class AlarmImpl : public grpc::internal::CompletionQueueTag { } } - grpc_core::Mutex mu_; - std::shared_ptr event_engine_; - absl::optional cq_timer_handle_ ABSL_GUARDED_BY(mu_); - absl::optional callback_timer_handle_ - ABSL_GUARDED_BY(mu_); + grpc_timer timer_; gpr_refcount refs_; + grpc_closure on_alarm_; grpc_cq_completion completion_; // completion queue where events about this alarm will be posted grpc_completion_queue* cq_;