[promises] Implementation of deadline for server-based-calls (#31656)

* [promises] Implementation of deadline for server-based-calls

* Automated change: Fix sanity tests

* reset deadline on completion

* names

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/31899/head
Craig Tiller 2 years ago committed by GitHub
parent e85379bc86
commit efdae293fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      src/core/ext/filters/deadline/deadline_filter.cc
  2. 43
      src/core/lib/surface/call.cc
  3. 3
      src/core/lib/surface/call.h

@ -35,6 +35,7 @@
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/metadata_batch.h"
@ -360,7 +361,16 @@ const grpc_channel_filter grpc_client_deadline_filter = {
const grpc_channel_filter grpc_server_deadline_filter = {
deadline_server_start_transport_stream_op_batch,
nullptr,
[](grpc_channel_element*, grpc_core::CallArgs call_args,
grpc_core::NextPromiseFactory next_promise_factory) {
auto deadline = call_args.client_initial_metadata->get(
grpc_core::GrpcTimeoutMetadata());
if (deadline.has_value()) {
grpc_core::GetContext<grpc_core::CallContext>()->UpdateDeadline(
*deadline);
}
return next_promise_factory(std::move(call_args));
},
grpc_channel_next_op,
sizeof(server_call_data),
deadline_init_call_elem,

@ -1819,7 +1819,11 @@ bool ValidateMetadata(size_t count, grpc_metadata* metadata) {
// PromiseBasedCall
// Will be folded into Call once the promise conversion is done
class PromiseBasedCall : public Call, public Activity, public Wakeable {
class PromiseBasedCall : public Call,
public Activity,
public Wakeable,
public grpc_event_engine::experimental::EventEngine::
Closure /* for deadlines */ {
public:
PromiseBasedCall(Arena* arena, const grpc_call_create_args& args);
@ -1946,6 +1950,11 @@ class PromiseBasedCall : public Call, public Activity, public Wakeable {
// for that functionality be invented)
grpc_call_stack* call_stack() override { return nullptr; }
void UpdateDeadline(Timestamp deadline);
void ResetDeadline();
// Implementation of EventEngine::Closure, called when deadline expires
void Run() override;
protected:
class ScopedContext
: public ScopedActivity,
@ -2186,6 +2195,9 @@ class PromiseBasedCall : public Call, public Activity, public Wakeable {
CompletionInfo completion_info_[6];
grpc_call_stats final_stats_{};
CallFinalization finalization_;
// Current deadline.
Timestamp deadline_ = Timestamp::InfFuture();
grpc_event_engine::experimental::EventEngine::TaskHandle deadline_task_;
};
template <typename T>
@ -2352,6 +2364,30 @@ void PromiseBasedCall::SetCompletionQueue(grpc_completion_queue* cq) {
grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
}
void PromiseBasedCall::UpdateDeadline(Timestamp deadline) {
if (deadline >= deadline_) return;
auto* const event_engine = channel()->event_engine();
if (deadline_ != Timestamp::InfFuture()) {
if (!event_engine->Cancel(deadline_task_)) return;
} else {
InternalRef("deadline");
}
event_engine->RunAfter(deadline - Timestamp::Now(), this);
}
void PromiseBasedCall::ResetDeadline() {
if (deadline_ == Timestamp::InfFuture()) return;
auto* const event_engine = channel()->event_engine();
if (!event_engine->Cancel(deadline_task_)) return;
deadline_ = Timestamp::InfFuture();
InternalUnref("deadline");
}
void PromiseBasedCall::Run() {
CancelWithError(absl::DeadlineExceededError("Deadline exceeded"));
InternalUnref("deadline");
}
///////////////////////////////////////////////////////////////////////////////
// CallContext
@ -2365,6 +2401,10 @@ void CallContext::IncrementRefCount(const char* reason) {
void CallContext::Unref(const char* reason) { call_->InternalUnref(reason); }
void CallContext::UpdateDeadline(Timestamp deadline) {
call_->UpdateDeadline(deadline);
}
///////////////////////////////////////////////////////////////////////////////
// ClientPromiseBasedCall
@ -2757,6 +2797,7 @@ void ClientPromiseBasedCall::Finish(ServerMetadataHandle trailing_metadata) {
trailing_metadata->DebugString().c_str());
}
promise_ = ArenaPromise<ServerMetadataHandle>();
ResetDeadline();
completed_ = true;
if (recv_initial_metadata_ != nullptr) {
ForceImmediateRepoll();

@ -80,6 +80,9 @@ class CallContext {
public:
explicit CallContext(PromiseBasedCall* call) : call_(call) {}
// Update the deadline (if deadline < the current deadline).
void UpdateDeadline(Timestamp deadline);
// Run some action in the call activity context. This is needed to adapt some
// legacy systems to promises, and will likely disappear once that conversion
// is complete.

Loading…
Cancel
Save