From 97631cf34cd152f68cd1d6264e74dd68cf5ce4b4 Mon Sep 17 00:00:00 2001 From: AJ Heller Date: Tue, 21 Sep 2021 09:17:14 -0700 Subject: [PATCH] EventEngine::Closure (#27395) This introduces the new `Closure` type, and new cancellation semantics. I've also fixed a few bugs in the EventEngine iomgr implementation that had gone unnoticed. --- include/grpc/event_engine/event_engine.h | 106 ++++++++++++++------ src/core/lib/iomgr/event_engine/closure.cc | 59 +++++++---- src/core/lib/iomgr/event_engine/closure.h | 11 +- src/core/lib/iomgr/event_engine/resolver.cc | 12 ++- src/core/lib/iomgr/event_engine/tcp.cc | 9 +- src/core/lib/iomgr/event_engine/timer.cc | 9 +- src/core/lib/iomgr/exec_ctx.cc | 4 +- 7 files changed, 146 insertions(+), 64 deletions(-) diff --git a/include/grpc/event_engine/event_engine.h b/include/grpc/event_engine/event_engine.h index 61e3ecbaa6c..a2dff9f8af1 100644 --- a/include/grpc/event_engine/event_engine.h +++ b/include/grpc/event_engine/event_engine.h @@ -73,12 +73,27 @@ namespace experimental { //////////////////////////////////////////////////////////////////////////////// class EventEngine { public: - /// Basic callable function. The first argument to all callbacks is an - /// absl::Status indicating the status of the operation associated with this - /// callback. Each EventEngine method that takes a callback parameter, defines - /// the expected sets and meanings of statuses for that use case. - using Callback = std::function; - /// Callback handle, used to cancel a callback. + /// A custom closure type for EventEngine task execution. + /// + /// Throughout the EventEngine API, \a Closure ownership is retained by the + /// caller - the EventEngine will never delete a Closure, and upon + /// cancellation, the EventEngine will simply forget the Closure exists. The + /// caller is responsible for all necessary cleanup. + class Closure { + public: + Closure() = default; + // Closure's are an interface, and thus non-copyable. + Closure(const Closure&) = delete; + Closure& operator=(const Closure&) = delete; + // Polymorphic type => virtual destructor + virtual ~Closure() = default; + // Run the contained code. + virtual void Run() = 0; + }; + /// Represents a scheduled task. + /// + /// \a TaskHandles are returned by \a Run* methods, and can be given to the + /// \a Cancel method. struct TaskHandle { intptr_t keys[2]; }; @@ -134,7 +149,8 @@ class EventEngine { /// For failed read operations, implementations should pass the appropriate /// statuses to \a on_read. For example, callbacks might expect to receive /// CANCELLED on endpoint shutdown. - virtual void Read(Callback on_read, SliceBuffer* buffer) = 0; + virtual void Read(std::function on_read, + SliceBuffer* buffer) = 0; /// Writes data out on the connection. /// /// \a on_writable is called when the connection is ready for more data. The @@ -153,7 +169,8 @@ class EventEngine { /// For failed write operations, implementations should pass the appropriate /// statuses to \a on_writable. For example, callbacks might expect to /// receive CANCELLED on endpoint shutdown. - virtual void Write(Callback on_writable, SliceBuffer* data) = 0; + virtual void Write(std::function on_writable, + SliceBuffer* data) = 0; /// Returns an address in the format described in DNSResolver. The returned /// values are expected to remain valid for the life of the Endpoint. virtual const ResolvedAddress& GetPeerAddress() const = 0; @@ -201,7 +218,8 @@ class EventEngine { /// The provided \a SliceAllocatorFactory is used to create \a SliceAllocators /// for Endpoint construction. virtual absl::StatusOr> CreateListener( - Listener::AcceptCallback on_accept, Callback on_shutdown, + Listener::AcceptCallback on_accept, + std::function on_shutdown, const EndpointConfig& config, std::unique_ptr slice_allocator_factory) = 0; /// Creates a client network connection to a remote network listener. @@ -257,8 +275,10 @@ class EventEngine { /// When the lookup is complete, the \a on_resolve callback will be invoked /// with a status indicating the success or failure of the lookup. /// Implementations should pass the appropriate statuses to the callback. - /// For example, callbacks might expect to receive DEADLINE_EXCEEDED when - /// the deadline is exceeded or CANCELLED if the lookup was cancelled. + /// For example, callbacks might expect to receive DEADLINE_EXCEEDED or + /// NOT_FOUND. + /// + /// If cancelled, \a on_resolve will not be executed. virtual LookupTaskHandle LookupHostname(LookupHostnameCallback on_resolve, absl::string_view address, absl::string_view default_port, @@ -278,7 +298,11 @@ class EventEngine { absl::string_view name, absl::Time deadline) = 0; /// Cancel an asynchronous lookup operation. - virtual void TryCancelLookup(LookupTaskHandle handle) = 0; + /// + /// This shares the same semantics with \a EventEngine::Cancel: successfully + /// cancelled lookups will not have their callbacks executed, and this + /// method returns true. + virtual bool CancelLookup(LookupTaskHandle handle) = 0; }; /// At time of destruction, the EventEngine must have no active @@ -297,32 +321,48 @@ class EventEngine { /// Creates and returns an instance of a DNSResolver. virtual std::unique_ptr GetDNSResolver() = 0; - /// Executes a callback as soon as possible. + /// Asynchronously executes a task as soon as possible. /// - /// The \a fn callback's \a status argument is used to indicate whether it was - /// executed normally. For example, the status may be CANCELLED if the - /// EventEngine is being shut down. \a fn is guaranteed to be called exactly - /// once. - virtual void Run(Callback fn) = 0; + /// \a Closures scheduled with \a Run cannot be cancelled. The \a closure will + /// not be deleted after it has been run, ownership remains with the caller. + virtual void Run(Closure* closure) = 0; + /// Asynchronously executes a task as soon as possible. + /// + /// \a Closures scheduled with \a Run cannot be cancelled. Unlike the + /// overloaded \a Closure alternative, the std::function version's \a closure + /// will be deleted by the EventEngine after the closure has been run. + /// + /// This version of \a Run may be less performant than the \a Closure version + /// in some scenarios. This overload is useful in situations where performance + /// is not a critical concern. + virtual void Run(std::function closure) = 0; /// Synonymous with scheduling an alarm to run at time \a when. /// - /// The callback \a fn will execute when either when time \a when arrives - /// (receiving status OK), or when the \a fn is cancelled (receiving status - /// CANCELLED). The callback is guaranteed to be called exactly once. - virtual TaskHandle RunAt(absl::Time when, Callback fn) = 0; - /// Attempts to cancel a callback. - /// Note that this is a "best effort" cancellation. No guarantee is made that - /// the callback will be cancelled, the call could be in any stage. + /// The \a closure will execute when time \a when arrives unless it has been + /// cancelled via the \a Cancel method. If cancelled, the closure will not be + /// run, nor will it be deleted. Ownership remains with the caller. + virtual TaskHandle RunAt(absl::Time when, Closure* closure) = 0; + /// Synonymous with scheduling an alarm to run at time \a when. + /// + /// The \a closure will execute when time \a when arrives unless it has been + /// cancelled via the \a Cancel method. If cancelled, the closure will not be + /// run. Unilke the overloaded \a Closure alternative, the std::function + /// version's \a closure will be deleted by the EventEngine after the closure + /// has been run, or upon cancellation. + /// + /// This version of \a RunAt may be less performant than the \a Closure + /// version in some scenarios. This overload is useful in situations where + /// performance is not a critical concern. + virtual TaskHandle RunAt(absl::Time when, std::function closure) = 0; + /// Request cancellation of a task. /// - /// There are three scenarios in which we may cancel a scheduled task: - /// 1. We cancel the execution before it has run. - /// 2. The callback has already run. - /// 3. We can't cancel it because it is "in flight". + /// If the associated closure has already been scheduled to run, it will not + /// be cancelled, and this function will return false. /// - /// In all cases, the cancellation is still considered successful, the - /// callback will be run exactly once from either cancellation or from its - /// activation. - virtual void TryCancel(TaskHandle handle) = 0; + /// If the associated callback has not been scheduled to run, it will be + /// cancelled, and the associated std::function or \a Closure* will not be + /// executed. In this case, Cancel will return true. + virtual bool Cancel(TaskHandle handle) = 0; }; // TODO(hork): finalize the API and document it. We need to firm up the story diff --git a/src/core/lib/iomgr/event_engine/closure.cc b/src/core/lib/iomgr/event_engine/closure.cc index d9d0394d177..e2afe8ab253 100644 --- a/src/core/lib/iomgr/event_engine/closure.cc +++ b/src/core/lib/iomgr/event_engine/closure.cc @@ -17,38 +17,61 @@ #include #include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/event_engine/closure.h" #include "src/core/lib/iomgr/event_engine/pollset.h" #include "src/core/lib/transport/error_utils.h" namespace grpc_event_engine { namespace experimental { -EventEngine::Callback GrpcClosureToCallback(grpc_closure* closure, - grpc_error_handle error) { - return [closure, error](absl::Status status) { - grpc_error_handle new_error = - grpc_error_add_child(error, absl_status_to_grpc_error(status)); +namespace { + +void RunClosure(grpc_closure* closure, grpc_error_handle error) { + GPR_ASSERT(closure != nullptr); #ifndef NDEBUG - closure->scheduled = false; - if (grpc_trace_closure.enabled()) { - gpr_log(GPR_DEBUG, - "EventEngine: running closure %p: created [%s:%d]: %s [%s:%d]", - closure, closure->file_created, closure->line_created, - closure->run ? "run" : "scheduled", closure->file_initiated, - closure->line_initiated); - } + closure->scheduled = false; + if (grpc_trace_closure.enabled()) { + gpr_log(GPR_DEBUG, + "EventEngine: running closure %p: created [%s:%d]: %s [%s:%d]", + closure, closure->file_created, closure->line_created, + closure->run ? "run" : "scheduled", closure->file_initiated, + closure->line_initiated); + } #endif - closure->cb(closure->cb_arg, new_error); + closure->cb(closure->cb_arg, error); #ifndef NDEBUG - if (grpc_trace_closure.enabled()) { - gpr_log(GPR_DEBUG, "EventEngine: closure %p finished", closure); - } + if (grpc_trace_closure.enabled()) { + gpr_log(GPR_DEBUG, "EventEngine: closure %p finished", closure); + } #endif - GRPC_ERROR_UNREF(error); +} + +} // namespace + +std::function GrpcClosureToStatusCallback( + grpc_closure* closure) { + return [closure](absl::Status status) { + RunClosure(closure, absl_status_to_grpc_error(status)); + grpc_pollset_ee_broadcast_event(); + }; +} + +std::function GrpcClosureToCallback(grpc_closure* closure) { + return [closure]() { + RunClosure(closure, GRPC_ERROR_NONE); + grpc_pollset_ee_broadcast_event(); + }; +} + +std::function GrpcClosureToCallback(grpc_closure* closure, + grpc_error_handle error) { + return [closure, error]() { + RunClosure(closure, error); grpc_pollset_ee_broadcast_event(); }; } } // namespace experimental } // namespace grpc_event_engine + #endif // GRPC_USE_EVENT_ENGINE diff --git a/src/core/lib/iomgr/event_engine/closure.h b/src/core/lib/iomgr/event_engine/closure.h index f1b4f7617cf..ecb332bf96b 100644 --- a/src/core/lib/iomgr/event_engine/closure.h +++ b/src/core/lib/iomgr/event_engine/closure.h @@ -24,7 +24,16 @@ namespace grpc_event_engine { namespace experimental { -EventEngine::Callback GrpcClosureToCallback(grpc_closure* closure, +/// Creates a callback that takes an error status argument. +std::function GrpcClosureToStatusCallback( + grpc_closure* closure); + +/// Create a callback that *does not* take an error status argument. +std::function GrpcClosureToCallback(grpc_closure* closure); + +/// Creates a callback that *does not* take an error status argument. +/// This version has a pre-bound error. +std::function GrpcClosureToCallback(grpc_closure* closure, grpc_error_handle error); } // namespace experimental diff --git a/src/core/lib/iomgr/event_engine/resolver.cc b/src/core/lib/iomgr/event_engine/resolver.cc index 39508df74db..628c305e106 100644 --- a/src/core/lib/iomgr/event_engine/resolver.cc +++ b/src/core/lib/iomgr/event_engine/resolver.cc @@ -79,13 +79,15 @@ void resolve_address(const char* addr, const char* default_port, grpc_pollset_set* /* interested_parties */, grpc_closure* on_done, grpc_resolved_addresses** addresses) { - auto dns_resolver = grpc_iomgr_event_engine()->GetDNSResolver(); - if (!dns_resolver.ok()) { - grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, - absl_status_to_grpc_error(dns_resolver.status())); + std::unique_ptr dns_resolver = + grpc_iomgr_event_engine()->GetDNSResolver(); + if (dns_resolver == nullptr) { + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, on_done, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Failed to get DNS Resolver.")); return; } - new DnsRequest(std::move(*dns_resolver), addr, default_port, on_done, + new DnsRequest(std::move(dns_resolver), addr, default_port, on_done, addresses); } diff --git a/src/core/lib/iomgr/event_engine/tcp.cc b/src/core/lib/iomgr/event_engine/tcp.cc index d85503cb275..04f6216aea9 100644 --- a/src/core/lib/iomgr/event_engine/tcp.cc +++ b/src/core/lib/iomgr/event_engine/tcp.cc @@ -35,7 +35,7 @@ extern grpc_core::TraceFlag grpc_tcp_trace; namespace { using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig; using ::grpc_event_engine::experimental::EventEngine; -using ::grpc_event_engine::experimental::GrpcClosureToCallback; +using ::grpc_event_engine::experimental::GrpcClosureToStatusCallback; using ::grpc_event_engine::experimental::SliceAllocator; using ::grpc_event_engine::experimental::SliceAllocatorFactory; using ::grpc_event_engine::experimental::SliceBuffer; @@ -175,7 +175,8 @@ grpc_error_handle tcp_server_create( EventEngine* event_engine = grpc_iomgr_event_engine(); absl::StatusOr> listener = event_engine->CreateListener( - [server](std::unique_ptr ee_endpoint) { + [server](std::unique_ptr ee_endpoint, + const SliceAllocator& /*slice_allocator*/) { grpc_core::ExecCtx exec_ctx; GPR_ASSERT((*server)->on_accept_internal != nullptr); grpc_event_engine_endpoint* iomgr_endpoint = @@ -191,8 +192,8 @@ grpc_error_handle tcp_server_create( exec_ctx.Flush(); grpc_pollset_ee_broadcast_event(); }, - GrpcClosureToCallback(shutdown_complete, GRPC_ERROR_NONE), - endpoint_config, std::move(ee_slice_allocator_factory)); + GrpcClosureToStatusCallback(shutdown_complete), endpoint_config, + std::move(ee_slice_allocator_factory)); if (!listener.ok()) { return absl_status_to_grpc_error(listener.status()); } diff --git a/src/core/lib/iomgr/event_engine/timer.cc b/src/core/lib/iomgr/event_engine/timer.cc index b0a0047b530..bb7dbfb9ba4 100644 --- a/src/core/lib/iomgr/event_engine/timer.cc +++ b/src/core/lib/iomgr/event_engine/timer.cc @@ -19,6 +19,7 @@ #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/event_engine/closure.h" #include "src/core/lib/iomgr/event_engine/iomgr.h" +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/surface/init.h" #include "src/core/lib/transport/error_utils.h" @@ -32,12 +33,16 @@ void timer_init(grpc_timer* timer, grpc_millis deadline, timer->ee_task_handle = grpc_iomgr_event_engine()->RunAt( grpc_core::ToAbslTime( grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME)), - GrpcClosureToCallback(closure, GRPC_ERROR_NONE), {}); + GrpcClosureToCallback(closure)); + timer->closure = closure; } void timer_cancel(grpc_timer* timer) { auto handle = timer->ee_task_handle; - grpc_iomgr_event_engine()->TryCancel(handle); + if (!grpc_iomgr_event_engine()->Cancel(handle)) { + grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure, + GRPC_ERROR_CANCELLED); + } } /* Internal API */ diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc index b551106a117..f6dad6dcac3 100644 --- a/src/core/lib/iomgr/exec_ctx.cc +++ b/src/core/lib/iomgr/exec_ctx.cc @@ -26,6 +26,7 @@ #include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/combiner.h" +#include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/event_engine/closure.h" #include "src/core/lib/iomgr/event_engine/iomgr.h" #include "src/core/lib/profiling/timers.h" @@ -52,7 +53,8 @@ static void exec_ctx_run(grpc_closure* closure, grpc_error_handle error) { static void exec_ctx_sched(grpc_closure* closure, grpc_error_handle error) { #if defined(GRPC_USE_EVENT_ENGINE) && \ defined(GRPC_EVENT_ENGINE_REPLACE_EXEC_CTX) - grpc_iomgr_event_engine()->Run(GrpcClosureToCallback(closure, error), {}); + grpc_iomgr_event_engine()->Run( + grpc_event_engine::experimental::GrpcClosureToCallback(closure, error)); #else grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(), closure, error);