From efdae293fb198cfad5d0912ebd5a34a75b5f5e6b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 15 Dec 2022 09:29:13 -0800 Subject: [PATCH] [promises] Implementation of deadline for server-based-calls (#31656) * [promises] Implementation of deadline for server-based-calls * Automated change: Fix sanity tests * reset deadline on completion * names Co-authored-by: ctiller --- .../ext/filters/deadline/deadline_filter.cc | 12 +++++- src/core/lib/surface/call.cc | 43 ++++++++++++++++++- src/core/lib/surface/call.h | 3 ++ 3 files changed, 56 insertions(+), 2 deletions(-) diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc index 4ba697c4fc5..41848a7c292 100644 --- a/src/core/ext/filters/deadline/deadline_filter.cc +++ b/src/core/ext/filters/deadline/deadline_filter.cc @@ -35,6 +35,7 @@ #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/surface/channel_stack_type.h" #include "src/core/lib/transport/metadata_batch.h" @@ -360,7 +361,16 @@ const grpc_channel_filter grpc_client_deadline_filter = { const grpc_channel_filter grpc_server_deadline_filter = { deadline_server_start_transport_stream_op_batch, - nullptr, + [](grpc_channel_element*, grpc_core::CallArgs call_args, + grpc_core::NextPromiseFactory next_promise_factory) { + auto deadline = call_args.client_initial_metadata->get( + grpc_core::GrpcTimeoutMetadata()); + if (deadline.has_value()) { + grpc_core::GetContext()->UpdateDeadline( + *deadline); + } + return next_promise_factory(std::move(call_args)); + }, grpc_channel_next_op, sizeof(server_call_data), deadline_init_call_elem, diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 7785965d7af..1452bc8f7ae 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -1819,7 +1819,11 @@ bool ValidateMetadata(size_t count, grpc_metadata* metadata) { // PromiseBasedCall // Will be folded into Call once the promise conversion is done -class PromiseBasedCall : public Call, public Activity, public Wakeable { +class PromiseBasedCall : public Call, + public Activity, + public Wakeable, + public grpc_event_engine::experimental::EventEngine:: + Closure /* for deadlines */ { public: PromiseBasedCall(Arena* arena, const grpc_call_create_args& args); @@ -1946,6 +1950,11 @@ class PromiseBasedCall : public Call, public Activity, public Wakeable { // for that functionality be invented) grpc_call_stack* call_stack() override { return nullptr; } + void UpdateDeadline(Timestamp deadline); + void ResetDeadline(); + // Implementation of EventEngine::Closure, called when deadline expires + void Run() override; + protected: class ScopedContext : public ScopedActivity, @@ -2186,6 +2195,9 @@ class PromiseBasedCall : public Call, public Activity, public Wakeable { CompletionInfo completion_info_[6]; grpc_call_stats final_stats_{}; CallFinalization finalization_; + // Current deadline. + Timestamp deadline_ = Timestamp::InfFuture(); + grpc_event_engine::experimental::EventEngine::TaskHandle deadline_task_; }; template @@ -2352,6 +2364,30 @@ void PromiseBasedCall::SetCompletionQueue(grpc_completion_queue* cq) { grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)); } +void PromiseBasedCall::UpdateDeadline(Timestamp deadline) { + if (deadline >= deadline_) return; + auto* const event_engine = channel()->event_engine(); + if (deadline_ != Timestamp::InfFuture()) { + if (!event_engine->Cancel(deadline_task_)) return; + } else { + InternalRef("deadline"); + } + event_engine->RunAfter(deadline - Timestamp::Now(), this); +} + +void PromiseBasedCall::ResetDeadline() { + if (deadline_ == Timestamp::InfFuture()) return; + auto* const event_engine = channel()->event_engine(); + if (!event_engine->Cancel(deadline_task_)) return; + deadline_ = Timestamp::InfFuture(); + InternalUnref("deadline"); +} + +void PromiseBasedCall::Run() { + CancelWithError(absl::DeadlineExceededError("Deadline exceeded")); + InternalUnref("deadline"); +} + /////////////////////////////////////////////////////////////////////////////// // CallContext @@ -2365,6 +2401,10 @@ void CallContext::IncrementRefCount(const char* reason) { void CallContext::Unref(const char* reason) { call_->InternalUnref(reason); } +void CallContext::UpdateDeadline(Timestamp deadline) { + call_->UpdateDeadline(deadline); +} + /////////////////////////////////////////////////////////////////////////////// // ClientPromiseBasedCall @@ -2757,6 +2797,7 @@ void ClientPromiseBasedCall::Finish(ServerMetadataHandle trailing_metadata) { trailing_metadata->DebugString().c_str()); } promise_ = ArenaPromise(); + ResetDeadline(); completed_ = true; if (recv_initial_metadata_ != nullptr) { ForceImmediateRepoll(); diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h index 432da9f2a88..dd291f624a6 100644 --- a/src/core/lib/surface/call.h +++ b/src/core/lib/surface/call.h @@ -80,6 +80,9 @@ class CallContext { public: explicit CallContext(PromiseBasedCall* call) : call_(call) {} + // Update the deadline (if deadline < the current deadline). + void UpdateDeadline(Timestamp deadline); + // Run some action in the call activity context. This is needed to adapt some // legacy systems to promises, and will likely disappear once that conversion // is complete.