diff --git a/BUILD b/BUILD index 9ca52844eaa..92f1e3414ce 100644 --- a/BUILD +++ b/BUILD @@ -6389,11 +6389,9 @@ grpc_cc_library( hdrs = GRPCXX_HDRS, external_deps = [ "absl/base:core_headers", - "absl/memory", - "absl/status", "absl/strings", "absl/synchronization", - "absl/types:optional", + "absl/memory", "upb_lib", "protobuf_headers", ], @@ -6404,7 +6402,6 @@ grpc_cc_library( "arena", "channel_init", "config", - "default_event_engine_factory_hdrs", "gpr_base", "gpr_codegen", "grpc", @@ -6435,11 +6432,9 @@ grpc_cc_library( hdrs = GRPCXX_HDRS, external_deps = [ "absl/base:core_headers", - "absl/memory", - "absl/status", "absl/strings", "absl/synchronization", - "absl/types:optional", + "absl/memory", "upb_lib", "protobuf_headers", ], @@ -6451,7 +6446,6 @@ grpc_cc_library( "arena", "channel_init", "config", - "default_event_engine_factory_hdrs", "gpr_base", "gpr_codegen", "grpc++_codegen_base", diff --git a/src/cpp/common/alarm.cc b/src/cpp/common/alarm.cc index 9da0ae4af5c..2770c11bd13 100644 --- a/src/cpp/common/alarm.cc +++ b/src/cpp/common/alarm.cc @@ -20,10 +20,6 @@ #include #include -#include "absl/base/thread_annotations.h" -#include "absl/types/optional.h" - -#include #include #include #include @@ -33,23 +29,23 @@ #include #include -#include "src/core/lib/event_engine/event_engine_factory.h" -#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/time.h" +#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/executor.h" +#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/surface/completion_queue.h" namespace grpc { namespace internal { - -using ::grpc_event_engine::experimental::EventEngine; -using ::grpc_event_engine::experimental::GetDefaultEventEngine; - class AlarmImpl : public grpc::internal::CompletionQueueTag { public: - AlarmImpl() : cq_(nullptr), tag_(nullptr) { gpr_ref_init(&refs_, 1); } + AlarmImpl() : cq_(nullptr), tag_(nullptr) { + gpr_ref_init(&refs_, 1); + grpc_timer_init_unset(&timer_); + } ~AlarmImpl() override {} bool FinalizeResult(void** tag, bool* /*status*/) override { *tag = tag_; @@ -57,46 +53,61 @@ class AlarmImpl : public grpc::internal::CompletionQueueTag { return true; } void Set(grpc::CompletionQueue* cq, gpr_timespec deadline, void* tag) { - grpc_core::MutexLock lock(&mu_); - GPR_ASSERT(!cq_timer_handle_.has_value() && - !callback_timer_handle_.has_value()); + grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; GRPC_CQ_INTERNAL_REF(cq->cq(), "alarm"); cq_ = cq->cq(); tag_ = tag; GPR_ASSERT(grpc_cq_begin_op(cq_, this)); - Ref(); - cq_timer_handle_ = GetDefaultEventEngine()->RunAfter( - grpc_core::Timestamp::FromTimespecRoundUp(deadline) - - grpc_core::ExecCtx::Get()->Now(), - [this] { OnCQAlarm(GRPC_ERROR_NONE); }); + GRPC_CLOSURE_INIT( + &on_alarm_, + [](void* arg, grpc_error_handle error) { + // queue the op on the completion queue + AlarmImpl* alarm = static_cast(arg); + alarm->Ref(); + // Preserve the cq and reset the cq_ so that the alarm + // can be reset when the alarm tag is delivered. + grpc_completion_queue* cq = alarm->cq_; + alarm->cq_ = nullptr; + grpc_cq_end_op( + cq, alarm, error, + [](void* /*arg*/, grpc_cq_completion* /*completion*/) {}, arg, + &alarm->completion_); + GRPC_CQ_INTERNAL_UNREF(cq, "alarm"); + }, + this, grpc_schedule_on_exec_ctx); + grpc_timer_init(&timer_, + grpc_core::Timestamp::FromTimespecRoundUp(deadline), + &on_alarm_); } void Set(gpr_timespec deadline, std::function f) { - grpc_core::MutexLock lock(&mu_); - GPR_ASSERT(!cq_timer_handle_.has_value() && - !callback_timer_handle_.has_value()); + grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; // Don't use any CQ at all. Instead just use the timer to fire the function callback_ = std::move(f); Ref(); - callback_timer_handle_ = GetDefaultEventEngine()->RunAfter( - grpc_core::Timestamp::FromTimespecRoundUp(deadline) - - grpc_core::ExecCtx::Get()->Now(), - [this] { OnCallbackAlarm(true); }); + GRPC_CLOSURE_INIT( + &on_alarm_, + [](void* arg, grpc_error_handle error) { + grpc_core::Executor::Run( + GRPC_CLOSURE_CREATE( + [](void* arg, grpc_error_handle error) { + AlarmImpl* alarm = static_cast(arg); + alarm->callback_(GRPC_ERROR_IS_NONE(error)); + alarm->Unref(); + }, + arg, nullptr), + error); + }, + this, grpc_schedule_on_exec_ctx); + grpc_timer_init(&timer_, + grpc_core::Timestamp::FromTimespecRoundUp(deadline), + &on_alarm_); } void Cancel() { + grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; - grpc_core::MutexLock lock(&mu_); - if (callback_timer_handle_.has_value() && - GetDefaultEventEngine()->Cancel(*callback_timer_handle_)) { - GetDefaultEventEngine()->Run( - [this] { OnCallbackAlarm(/*is_ok=*/false); }); - callback_timer_handle_.reset(); - } else if (cq_timer_handle_.has_value() && - GetDefaultEventEngine()->Cancel(*cq_timer_handle_)) { - GetDefaultEventEngine()->Run([this] { OnCQAlarm(GRPC_ERROR_CANCELLED); }); - cq_timer_handle_.reset(); - } + grpc_timer_cancel(&timer_); } void Destroy() { Cancel(); @@ -104,35 +115,6 @@ class AlarmImpl : public grpc::internal::CompletionQueueTag { } private: - void OnCQAlarm(grpc_error_handle error) { - { - grpc_core::MutexLock lock(&mu_); - cq_timer_handle_.reset(); - } - grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; - grpc_core::ExecCtx exec_ctx; - // Preserve the cq and reset the cq_ so that the alarm - // can be reset when the alarm tag is delivered. - grpc_completion_queue* cq = cq_; - cq_ = nullptr; - grpc_cq_end_op( - cq, this, error, - [](void* /*arg*/, grpc_cq_completion* /*completion*/) {}, nullptr, - &completion_); - GRPC_CQ_INTERNAL_UNREF(cq, "alarm"); - } - - void OnCallbackAlarm(bool is_ok) { - { - grpc_core::MutexLock lock(&mu_); - callback_timer_handle_.reset(); - } - grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; - grpc_core::ExecCtx exec_ctx; - callback_(is_ok); - Unref(); - } - void Ref() { gpr_ref(&refs_); } void Unref() { if (gpr_unref(&refs_)) { @@ -140,11 +122,9 @@ class AlarmImpl : public grpc::internal::CompletionQueueTag { } } - grpc_core::Mutex mu_; - absl::optional cq_timer_handle_ ABSL_GUARDED_BY(mu_); - absl::optional callback_timer_handle_ - ABSL_GUARDED_BY(mu_); + grpc_timer timer_; gpr_refcount refs_; + grpc_closure on_alarm_; grpc_cq_completion completion_; // completion queue where events about this alarm will be posted grpc_completion_queue* cq_;