EventEngine::Closure (#27395)

This introduces the new `Closure` type, and new cancellation semantics.

I've also fixed a few bugs in the EventEngine iomgr implementation that
had gone unnoticed.
pull/27429/head
AJ Heller 3 years ago committed by GitHub
parent b669a3c521
commit 97631cf34c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 106
      include/grpc/event_engine/event_engine.h
  2. 59
      src/core/lib/iomgr/event_engine/closure.cc
  3. 11
      src/core/lib/iomgr/event_engine/closure.h
  4. 12
      src/core/lib/iomgr/event_engine/resolver.cc
  5. 9
      src/core/lib/iomgr/event_engine/tcp.cc
  6. 9
      src/core/lib/iomgr/event_engine/timer.cc
  7. 4
      src/core/lib/iomgr/exec_ctx.cc

@ -73,12 +73,27 @@ namespace experimental {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
class EventEngine { class EventEngine {
public: public:
/// Basic callable function. The first argument to all callbacks is an /// A custom closure type for EventEngine task execution.
/// absl::Status indicating the status of the operation associated with this ///
/// callback. Each EventEngine method that takes a callback parameter, defines /// Throughout the EventEngine API, \a Closure ownership is retained by the
/// the expected sets and meanings of statuses for that use case. /// caller - the EventEngine will never delete a Closure, and upon
using Callback = std::function<void(absl::Status)>; /// cancellation, the EventEngine will simply forget the Closure exists. The
/// Callback handle, used to cancel a callback. /// caller is responsible for all necessary cleanup.
class Closure {
public:
Closure() = default;
// Closure's are an interface, and thus non-copyable.
Closure(const Closure&) = delete;
Closure& operator=(const Closure&) = delete;
// Polymorphic type => virtual destructor
virtual ~Closure() = default;
// Run the contained code.
virtual void Run() = 0;
};
/// Represents a scheduled task.
///
/// \a TaskHandles are returned by \a Run* methods, and can be given to the
/// \a Cancel method.
struct TaskHandle { struct TaskHandle {
intptr_t keys[2]; intptr_t keys[2];
}; };
@ -134,7 +149,8 @@ class EventEngine {
/// For failed read operations, implementations should pass the appropriate /// For failed read operations, implementations should pass the appropriate
/// statuses to \a on_read. For example, callbacks might expect to receive /// statuses to \a on_read. For example, callbacks might expect to receive
/// CANCELLED on endpoint shutdown. /// CANCELLED on endpoint shutdown.
virtual void Read(Callback on_read, SliceBuffer* buffer) = 0; virtual void Read(std::function<void(absl::Status)> on_read,
SliceBuffer* buffer) = 0;
/// Writes data out on the connection. /// Writes data out on the connection.
/// ///
/// \a on_writable is called when the connection is ready for more data. The /// \a on_writable is called when the connection is ready for more data. The
@ -153,7 +169,8 @@ class EventEngine {
/// For failed write operations, implementations should pass the appropriate /// For failed write operations, implementations should pass the appropriate
/// statuses to \a on_writable. For example, callbacks might expect to /// statuses to \a on_writable. For example, callbacks might expect to
/// receive CANCELLED on endpoint shutdown. /// receive CANCELLED on endpoint shutdown.
virtual void Write(Callback on_writable, SliceBuffer* data) = 0; virtual void Write(std::function<void(absl::Status)> on_writable,
SliceBuffer* data) = 0;
/// Returns an address in the format described in DNSResolver. The returned /// Returns an address in the format described in DNSResolver. The returned
/// values are expected to remain valid for the life of the Endpoint. /// values are expected to remain valid for the life of the Endpoint.
virtual const ResolvedAddress& GetPeerAddress() const = 0; virtual const ResolvedAddress& GetPeerAddress() const = 0;
@ -201,7 +218,8 @@ class EventEngine {
/// The provided \a SliceAllocatorFactory is used to create \a SliceAllocators /// The provided \a SliceAllocatorFactory is used to create \a SliceAllocators
/// for Endpoint construction. /// for Endpoint construction.
virtual absl::StatusOr<std::unique_ptr<Listener>> CreateListener( virtual absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
Listener::AcceptCallback on_accept, Callback on_shutdown, Listener::AcceptCallback on_accept,
std::function<void(absl::Status)> on_shutdown,
const EndpointConfig& config, const EndpointConfig& config,
std::unique_ptr<SliceAllocatorFactory> slice_allocator_factory) = 0; std::unique_ptr<SliceAllocatorFactory> slice_allocator_factory) = 0;
/// Creates a client network connection to a remote network listener. /// Creates a client network connection to a remote network listener.
@ -257,8 +275,10 @@ class EventEngine {
/// When the lookup is complete, the \a on_resolve callback will be invoked /// When the lookup is complete, the \a on_resolve callback will be invoked
/// with a status indicating the success or failure of the lookup. /// with a status indicating the success or failure of the lookup.
/// Implementations should pass the appropriate statuses to the callback. /// Implementations should pass the appropriate statuses to the callback.
/// For example, callbacks might expect to receive DEADLINE_EXCEEDED when /// For example, callbacks might expect to receive DEADLINE_EXCEEDED or
/// the deadline is exceeded or CANCELLED if the lookup was cancelled. /// NOT_FOUND.
///
/// If cancelled, \a on_resolve will not be executed.
virtual LookupTaskHandle LookupHostname(LookupHostnameCallback on_resolve, virtual LookupTaskHandle LookupHostname(LookupHostnameCallback on_resolve,
absl::string_view address, absl::string_view address,
absl::string_view default_port, absl::string_view default_port,
@ -278,7 +298,11 @@ class EventEngine {
absl::string_view name, absl::string_view name,
absl::Time deadline) = 0; absl::Time deadline) = 0;
/// Cancel an asynchronous lookup operation. /// Cancel an asynchronous lookup operation.
virtual void TryCancelLookup(LookupTaskHandle handle) = 0; ///
/// This shares the same semantics with \a EventEngine::Cancel: successfully
/// cancelled lookups will not have their callbacks executed, and this
/// method returns true.
virtual bool CancelLookup(LookupTaskHandle handle) = 0;
}; };
/// At time of destruction, the EventEngine must have no active /// At time of destruction, the EventEngine must have no active
@ -297,32 +321,48 @@ class EventEngine {
/// Creates and returns an instance of a DNSResolver. /// Creates and returns an instance of a DNSResolver.
virtual std::unique_ptr<DNSResolver> GetDNSResolver() = 0; virtual std::unique_ptr<DNSResolver> GetDNSResolver() = 0;
/// Executes a callback as soon as possible. /// Asynchronously executes a task as soon as possible.
/// ///
/// The \a fn callback's \a status argument is used to indicate whether it was /// \a Closures scheduled with \a Run cannot be cancelled. The \a closure will
/// executed normally. For example, the status may be CANCELLED if the /// not be deleted after it has been run, ownership remains with the caller.
/// EventEngine is being shut down. \a fn is guaranteed to be called exactly virtual void Run(Closure* closure) = 0;
/// once. /// Asynchronously executes a task as soon as possible.
virtual void Run(Callback fn) = 0; ///
/// \a Closures scheduled with \a Run cannot be cancelled. Unlike the
/// overloaded \a Closure alternative, the std::function version's \a closure
/// will be deleted by the EventEngine after the closure has been run.
///
/// This version of \a Run may be less performant than the \a Closure version
/// in some scenarios. This overload is useful in situations where performance
/// is not a critical concern.
virtual void Run(std::function<void()> closure) = 0;
/// Synonymous with scheduling an alarm to run at time \a when. /// Synonymous with scheduling an alarm to run at time \a when.
/// ///
/// The callback \a fn will execute when either when time \a when arrives /// The \a closure will execute when time \a when arrives unless it has been
/// (receiving status OK), or when the \a fn is cancelled (receiving status /// cancelled via the \a Cancel method. If cancelled, the closure will not be
/// CANCELLED). The callback is guaranteed to be called exactly once. /// run, nor will it be deleted. Ownership remains with the caller.
virtual TaskHandle RunAt(absl::Time when, Callback fn) = 0; virtual TaskHandle RunAt(absl::Time when, Closure* closure) = 0;
/// Attempts to cancel a callback. /// Synonymous with scheduling an alarm to run at time \a when.
/// Note that this is a "best effort" cancellation. No guarantee is made that ///
/// the callback will be cancelled, the call could be in any stage. /// The \a closure will execute when time \a when arrives unless it has been
/// cancelled via the \a Cancel method. If cancelled, the closure will not be
/// run. Unilke the overloaded \a Closure alternative, the std::function
/// version's \a closure will be deleted by the EventEngine after the closure
/// has been run, or upon cancellation.
///
/// This version of \a RunAt may be less performant than the \a Closure
/// version in some scenarios. This overload is useful in situations where
/// performance is not a critical concern.
virtual TaskHandle RunAt(absl::Time when, std::function<void()> closure) = 0;
/// Request cancellation of a task.
/// ///
/// There are three scenarios in which we may cancel a scheduled task: /// If the associated closure has already been scheduled to run, it will not
/// 1. We cancel the execution before it has run. /// be cancelled, and this function will return false.
/// 2. The callback has already run.
/// 3. We can't cancel it because it is "in flight".
/// ///
/// In all cases, the cancellation is still considered successful, the /// If the associated callback has not been scheduled to run, it will be
/// callback will be run exactly once from either cancellation or from its /// cancelled, and the associated std::function or \a Closure* will not be
/// activation. /// executed. In this case, Cancel will return true.
virtual void TryCancel(TaskHandle handle) = 0; virtual bool Cancel(TaskHandle handle) = 0;
}; };
// TODO(hork): finalize the API and document it. We need to firm up the story // TODO(hork): finalize the API and document it. We need to firm up the story

@ -17,38 +17,61 @@
#include <grpc/event_engine/event_engine.h> #include <grpc/event_engine/event_engine.h>
#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/event_engine/closure.h"
#include "src/core/lib/iomgr/event_engine/pollset.h" #include "src/core/lib/iomgr/event_engine/pollset.h"
#include "src/core/lib/transport/error_utils.h" #include "src/core/lib/transport/error_utils.h"
namespace grpc_event_engine { namespace grpc_event_engine {
namespace experimental { namespace experimental {
EventEngine::Callback GrpcClosureToCallback(grpc_closure* closure, namespace {
grpc_error_handle error) {
return [closure, error](absl::Status status) { void RunClosure(grpc_closure* closure, grpc_error_handle error) {
grpc_error_handle new_error = GPR_ASSERT(closure != nullptr);
grpc_error_add_child(error, absl_status_to_grpc_error(status));
#ifndef NDEBUG #ifndef NDEBUG
closure->scheduled = false; closure->scheduled = false;
if (grpc_trace_closure.enabled()) { if (grpc_trace_closure.enabled()) {
gpr_log(GPR_DEBUG, gpr_log(GPR_DEBUG,
"EventEngine: running closure %p: created [%s:%d]: %s [%s:%d]", "EventEngine: running closure %p: created [%s:%d]: %s [%s:%d]",
closure, closure->file_created, closure->line_created, closure, closure->file_created, closure->line_created,
closure->run ? "run" : "scheduled", closure->file_initiated, closure->run ? "run" : "scheduled", closure->file_initiated,
closure->line_initiated); closure->line_initiated);
} }
#endif #endif
closure->cb(closure->cb_arg, new_error); closure->cb(closure->cb_arg, error);
#ifndef NDEBUG #ifndef NDEBUG
if (grpc_trace_closure.enabled()) { if (grpc_trace_closure.enabled()) {
gpr_log(GPR_DEBUG, "EventEngine: closure %p finished", closure); gpr_log(GPR_DEBUG, "EventEngine: closure %p finished", closure);
} }
#endif #endif
GRPC_ERROR_UNREF(error); }
} // namespace
std::function<void(absl::Status)> GrpcClosureToStatusCallback(
grpc_closure* closure) {
return [closure](absl::Status status) {
RunClosure(closure, absl_status_to_grpc_error(status));
grpc_pollset_ee_broadcast_event();
};
}
std::function<void()> GrpcClosureToCallback(grpc_closure* closure) {
return [closure]() {
RunClosure(closure, GRPC_ERROR_NONE);
grpc_pollset_ee_broadcast_event();
};
}
std::function<void()> GrpcClosureToCallback(grpc_closure* closure,
grpc_error_handle error) {
return [closure, error]() {
RunClosure(closure, error);
grpc_pollset_ee_broadcast_event(); grpc_pollset_ee_broadcast_event();
}; };
} }
} // namespace experimental } // namespace experimental
} // namespace grpc_event_engine } // namespace grpc_event_engine
#endif // GRPC_USE_EVENT_ENGINE #endif // GRPC_USE_EVENT_ENGINE

@ -24,7 +24,16 @@
namespace grpc_event_engine { namespace grpc_event_engine {
namespace experimental { namespace experimental {
EventEngine::Callback GrpcClosureToCallback(grpc_closure* closure, /// Creates a callback that takes an error status argument.
std::function<void(absl::Status)> GrpcClosureToStatusCallback(
grpc_closure* closure);
/// Create a callback that *does not* take an error status argument.
std::function<void()> GrpcClosureToCallback(grpc_closure* closure);
/// Creates a callback that *does not* take an error status argument.
/// This version has a pre-bound error.
std::function<void()> GrpcClosureToCallback(grpc_closure* closure,
grpc_error_handle error); grpc_error_handle error);
} // namespace experimental } // namespace experimental

@ -79,13 +79,15 @@ void resolve_address(const char* addr, const char* default_port,
grpc_pollset_set* /* interested_parties */, grpc_pollset_set* /* interested_parties */,
grpc_closure* on_done, grpc_closure* on_done,
grpc_resolved_addresses** addresses) { grpc_resolved_addresses** addresses) {
auto dns_resolver = grpc_iomgr_event_engine()->GetDNSResolver(); std::unique_ptr<EventEngine::DNSResolver> dns_resolver =
if (!dns_resolver.ok()) { grpc_iomgr_event_engine()->GetDNSResolver();
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, if (dns_resolver == nullptr) {
absl_status_to_grpc_error(dns_resolver.status())); grpc_core::ExecCtx::Run(
DEBUG_LOCATION, on_done,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Failed to get DNS Resolver."));
return; return;
} }
new DnsRequest(std::move(*dns_resolver), addr, default_port, on_done, new DnsRequest(std::move(dns_resolver), addr, default_port, on_done,
addresses); addresses);
} }

@ -35,7 +35,7 @@ extern grpc_core::TraceFlag grpc_tcp_trace;
namespace { namespace {
using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig; using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig;
using ::grpc_event_engine::experimental::EventEngine; using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::GrpcClosureToCallback; using ::grpc_event_engine::experimental::GrpcClosureToStatusCallback;
using ::grpc_event_engine::experimental::SliceAllocator; using ::grpc_event_engine::experimental::SliceAllocator;
using ::grpc_event_engine::experimental::SliceAllocatorFactory; using ::grpc_event_engine::experimental::SliceAllocatorFactory;
using ::grpc_event_engine::experimental::SliceBuffer; using ::grpc_event_engine::experimental::SliceBuffer;
@ -175,7 +175,8 @@ grpc_error_handle tcp_server_create(
EventEngine* event_engine = grpc_iomgr_event_engine(); EventEngine* event_engine = grpc_iomgr_event_engine();
absl::StatusOr<std::unique_ptr<EventEngine::Listener>> listener = absl::StatusOr<std::unique_ptr<EventEngine::Listener>> listener =
event_engine->CreateListener( event_engine->CreateListener(
[server](std::unique_ptr<EventEngine::Endpoint> ee_endpoint) { [server](std::unique_ptr<EventEngine::Endpoint> ee_endpoint,
const SliceAllocator& /*slice_allocator*/) {
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
GPR_ASSERT((*server)->on_accept_internal != nullptr); GPR_ASSERT((*server)->on_accept_internal != nullptr);
grpc_event_engine_endpoint* iomgr_endpoint = grpc_event_engine_endpoint* iomgr_endpoint =
@ -191,8 +192,8 @@ grpc_error_handle tcp_server_create(
exec_ctx.Flush(); exec_ctx.Flush();
grpc_pollset_ee_broadcast_event(); grpc_pollset_ee_broadcast_event();
}, },
GrpcClosureToCallback(shutdown_complete, GRPC_ERROR_NONE), GrpcClosureToStatusCallback(shutdown_complete), endpoint_config,
endpoint_config, std::move(ee_slice_allocator_factory)); std::move(ee_slice_allocator_factory));
if (!listener.ok()) { if (!listener.ok()) {
return absl_status_to_grpc_error(listener.status()); return absl_status_to_grpc_error(listener.status());
} }

@ -19,6 +19,7 @@
#include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/event_engine/closure.h" #include "src/core/lib/iomgr/event_engine/closure.h"
#include "src/core/lib/iomgr/event_engine/iomgr.h" #include "src/core/lib/iomgr/event_engine/iomgr.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/surface/init.h" #include "src/core/lib/surface/init.h"
#include "src/core/lib/transport/error_utils.h" #include "src/core/lib/transport/error_utils.h"
@ -32,12 +33,16 @@ void timer_init(grpc_timer* timer, grpc_millis deadline,
timer->ee_task_handle = grpc_iomgr_event_engine()->RunAt( timer->ee_task_handle = grpc_iomgr_event_engine()->RunAt(
grpc_core::ToAbslTime( grpc_core::ToAbslTime(
grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME)), grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME)),
GrpcClosureToCallback(closure, GRPC_ERROR_NONE), {}); GrpcClosureToCallback(closure));
timer->closure = closure;
} }
void timer_cancel(grpc_timer* timer) { void timer_cancel(grpc_timer* timer) {
auto handle = timer->ee_task_handle; auto handle = timer->ee_task_handle;
grpc_iomgr_event_engine()->TryCancel(handle); if (!grpc_iomgr_event_engine()->Cancel(handle)) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure,
GRPC_ERROR_CANCELLED);
}
} }
/* Internal API */ /* Internal API */

@ -26,6 +26,7 @@
#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/event_engine/closure.h" #include "src/core/lib/iomgr/event_engine/closure.h"
#include "src/core/lib/iomgr/event_engine/iomgr.h" #include "src/core/lib/iomgr/event_engine/iomgr.h"
#include "src/core/lib/profiling/timers.h" #include "src/core/lib/profiling/timers.h"
@ -52,7 +53,8 @@ static void exec_ctx_run(grpc_closure* closure, grpc_error_handle error) {
static void exec_ctx_sched(grpc_closure* closure, grpc_error_handle error) { static void exec_ctx_sched(grpc_closure* closure, grpc_error_handle error) {
#if defined(GRPC_USE_EVENT_ENGINE) && \ #if defined(GRPC_USE_EVENT_ENGINE) && \
defined(GRPC_EVENT_ENGINE_REPLACE_EXEC_CTX) defined(GRPC_EVENT_ENGINE_REPLACE_EXEC_CTX)
grpc_iomgr_event_engine()->Run(GrpcClosureToCallback(closure, error), {}); grpc_iomgr_event_engine()->Run(
grpc_event_engine::experimental::GrpcClosureToCallback(closure, error));
#else #else
grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(), closure, grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(), closure,
error); error);

Loading…
Cancel
Save