EventEngine::RunAt: Orca Service Timer (#30011)

* EventEngine::RunAt: Orca Service Timer

* optional changes to optionals

* Automated change: Fix sanity tests

* fix the build (the auto-fixer broke it)

Co-authored-by: drfloob <drfloob@users.noreply.github.com>
pull/30017/head
AJ Heller 3 years ago committed by GitHub
parent 1be6e2c9eb
commit 6598793c9f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      BUILD
  2. 46
      src/cpp/server/orca/orca_service.cc

@ -6591,6 +6591,8 @@ grpc_cc_library(
],
visibility = ["@grpc:public"],
deps = [
"debug_location",
"default_event_engine_factory_hdrs",
"gpr",
"grpc++",
"grpc++_codegen_base",

@ -21,6 +21,7 @@
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "absl/types/optional.h"
#include "google/protobuf/duration.upb.h"
@ -29,6 +30,7 @@
#include "xds/data/orca/v3/orca_load_report.upb.h"
#include "xds/service/orca/v3/orca.upb.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/log.h>
#include <grpcpp/ext/orca_service.h>
#include <grpcpp/impl/codegen/server_callback_handlers.h>
@ -42,17 +44,19 @@
#include <grpcpp/support/slice.h>
#include <grpcpp/support/status.h>
#include "src/core/lib/event_engine/event_engine_factory.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/timer.h"
namespace grpc {
namespace experimental {
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
//
// OrcaService::Reactor
//
@ -62,7 +66,6 @@ class OrcaService::Reactor : public ServerWriteReactor<ByteBuffer>,
public:
explicit Reactor(OrcaService* service, const ByteBuffer* request_buffer)
: service_(service) {
GRPC_CLOSURE_INIT(&on_timer_, OnTimer, this, nullptr);
// Get slice from request.
Slice slice;
GPR_ASSERT(request_buffer->DumpToSingleSlice(&slice).ok());
@ -120,39 +123,34 @@ class OrcaService::Reactor : public ServerWriteReactor<ByteBuffer>,
void ScheduleTimer() {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
Ref().release(); // Ref held by timer.
grpc::internal::MutexLock lock(&timer_mu_);
timer_pending_ = true;
grpc_timer_init(&timer_, exec_ctx.Now() + report_interval_, &on_timer_);
timer_handle_ = GetDefaultEventEngine()->RunAt(
absl::Now() + absl::Milliseconds(report_interval_.millis()),
[self = Ref(DEBUG_LOCATION, "Orca Service")] { self->OnTimer(); });
}
void MaybeCancelTimer() {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc::internal::MutexLock lock(&timer_mu_);
if (timer_pending_) {
timer_pending_ = false;
grpc_timer_cancel(&timer_);
if (timer_handle_.has_value() &&
GetDefaultEventEngine()->Cancel(*timer_handle_)) {
timer_handle_.reset();
}
}
static void OnTimer(void* arg, grpc_error_handle error) {
grpc_core::RefCountedPtr<Reactor> self(static_cast<Reactor*>(arg));
grpc::internal::MutexLock lock(&self->timer_mu_);
if (GRPC_ERROR_IS_NONE(error) && self->timer_pending_) {
self->timer_pending_ = false;
self->SendResponse();
}
void OnTimer() {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc::internal::MutexLock lock(&timer_mu_);
timer_handle_.reset();
SendResponse();
}
OrcaService* service_;
// TODO(roth): Change this to use the EventEngine API once it becomes
// available.
grpc::internal::Mutex timer_mu_;
bool timer_pending_ ABSL_GUARDED_BY(&timer_mu_) = false;
grpc_timer timer_ ABSL_GUARDED_BY(&timer_mu_);
grpc_closure on_timer_;
absl::optional<EventEngine::TaskHandle> timer_handle_
ABSL_GUARDED_BY(&timer_mu_);
;
grpc_core::Duration report_interval_;
ByteBuffer response_;

Loading…
Cancel
Save