diff --git a/BUILD b/BUILD index 3d105ffd689..d0a6c546f2b 100644 --- a/BUILD +++ b/BUILD @@ -6591,6 +6591,8 @@ grpc_cc_library( ], visibility = ["@grpc:public"], deps = [ + "debug_location", + "default_event_engine_factory_hdrs", "gpr", "grpc++", "grpc++_codegen_base", diff --git a/src/cpp/server/orca/orca_service.cc b/src/cpp/server/orca/orca_service.cc index 290241c5446..22c402d1123 100644 --- a/src/cpp/server/orca/orca_service.cc +++ b/src/cpp/server/orca/orca_service.cc @@ -21,6 +21,7 @@ #include #include "absl/base/thread_annotations.h" +#include "absl/time/clock.h" #include "absl/time/time.h" #include "absl/types/optional.h" #include "google/protobuf/duration.upb.h" @@ -29,6 +30,7 @@ #include "xds/data/orca/v3/orca_load_report.upb.h" #include "xds/service/orca/v3/orca.upb.h" +#include #include #include #include @@ -42,17 +44,19 @@ #include #include +#include "src/core/lib/event_engine/event_engine_factory.h" +#include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/time.h" -#include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/iomgr/timer.h" namespace grpc { namespace experimental { +using ::grpc_event_engine::experimental::EventEngine; +using ::grpc_event_engine::experimental::GetDefaultEventEngine; + // // OrcaService::Reactor // @@ -62,7 +66,6 @@ class OrcaService::Reactor : public ServerWriteReactor, public: explicit Reactor(OrcaService* service, const ByteBuffer* request_buffer) : service_(service) { - GRPC_CLOSURE_INIT(&on_timer_, OnTimer, this, nullptr); // Get slice from request. Slice slice; GPR_ASSERT(request_buffer->DumpToSingleSlice(&slice).ok()); @@ -120,39 +123,34 @@ class OrcaService::Reactor : public ServerWriteReactor, void ScheduleTimer() { grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; - Ref().release(); // Ref held by timer. grpc::internal::MutexLock lock(&timer_mu_); - timer_pending_ = true; - grpc_timer_init(&timer_, exec_ctx.Now() + report_interval_, &on_timer_); + timer_handle_ = GetDefaultEventEngine()->RunAt( + absl::Now() + absl::Milliseconds(report_interval_.millis()), + [self = Ref(DEBUG_LOCATION, "Orca Service")] { self->OnTimer(); }); } void MaybeCancelTimer() { - grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; - grpc_core::ExecCtx exec_ctx; grpc::internal::MutexLock lock(&timer_mu_); - if (timer_pending_) { - timer_pending_ = false; - grpc_timer_cancel(&timer_); + if (timer_handle_.has_value() && + GetDefaultEventEngine()->Cancel(*timer_handle_)) { + timer_handle_.reset(); } } - static void OnTimer(void* arg, grpc_error_handle error) { - grpc_core::RefCountedPtr self(static_cast(arg)); - grpc::internal::MutexLock lock(&self->timer_mu_); - if (GRPC_ERROR_IS_NONE(error) && self->timer_pending_) { - self->timer_pending_ = false; - self->SendResponse(); - } + void OnTimer() { + grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; + grpc_core::ExecCtx exec_ctx; + grpc::internal::MutexLock lock(&timer_mu_); + timer_handle_.reset(); + SendResponse(); } OrcaService* service_; - // TODO(roth): Change this to use the EventEngine API once it becomes - // available. grpc::internal::Mutex timer_mu_; - bool timer_pending_ ABSL_GUARDED_BY(&timer_mu_) = false; - grpc_timer timer_ ABSL_GUARDED_BY(&timer_mu_); - grpc_closure on_timer_; + absl::optional timer_handle_ + ABSL_GUARDED_BY(&timer_mu_); + ; grpc_core::Duration report_interval_; ByteBuffer response_;