From 029f94550423f96f0c5c9818ae3ac8e1a050a5d9 Mon Sep 17 00:00:00 2001 From: Vignesh Babu Date: Wed, 31 Aug 2022 17:45:30 -0700 Subject: [PATCH] Migrating posix event pollers to use new event poller interface (#30419) * Migrating posix event pollers to use new event poller interface * add inline attributes * Automated change: Fix sanity tests * remove ref/unref from iomgr engine closure ad add custom closure types internal to the pollers * updating time util usage * use unique_ptrs * update comments * Automated change: Fix sanity tests * fix review comments * review comments * cleanup * update comments * fix * cleanup * update comments * Automated change: Fix sanity tests * fix misleading comments * bug fixes * fix * fix * revert some changes for bug fixes and allow spurious wakeups for poll based poller * sanity * fix * review comments * fix * comment * remove re-defined function * fix review comments * fix windows iocp build issue due to removed function * change Milliseconds return type * remove header * regenerate projects * fix sanity * fix sanity * Automated change: Fix sanity tests * delete unused file * build issue * cleanup Co-authored-by: Vignesh2208 --- BUILD | 17 +- CMakeLists.txt | 2 + Makefile | 2 + build_autogenerated.yaml | 5 + config.m4 | 1 + config.w32 | 1 + gRPC-C++.podspec | 2 + gRPC-Core.podspec | 3 + grpc.gemspec | 2 + grpc.gyp | 2 + package.xml | 2 + .../posix_engine/ev_epoll1_linux.cc | 197 ++++++++----- .../posix_engine/ev_epoll1_linux.h | 40 ++- .../posix_engine/ev_poll_posix.cc | 255 +++++++++-------- .../event_engine/posix_engine/ev_poll_posix.h | 27 +- .../event_engine/posix_engine/event_poller.h | 59 ++-- .../event_poller_posix_default.cc | 4 +- .../posix_engine/event_poller_posix_default.h | 4 +- .../posix_engine/lockfree_event.cc | 6 +- .../posix_engine/lockfree_event.h | 2 +- .../posix_engine/posix_engine_closure.h | 28 +- src/core/lib/event_engine/time_util.cc | 2 +- src/core/lib/event_engine/time_util.h | 4 +- src/core/lib/event_engine/utils.cc | 5 - src/core/lib/event_engine/utils.h | 4 - src/core/lib/event_engine/windows/iocp.cc | 2 +- src/python/grpcio/grpc_core_dependencies.py | 1 + test/core/event_engine/posix/BUILD | 2 + .../posix/event_poller_posix_test.cc | 270 ++++++++++++++++-- .../posix/lock_free_event_test.cc | 12 +- tools/doxygen/Doxyfile.c++.internal | 2 + tools/doxygen/Doxyfile.core.internal | 2 + 32 files changed, 656 insertions(+), 311 deletions(-) diff --git a/BUILD b/BUILD index 998c174f8e8..6d7ff2d2167 100644 --- a/BUILD +++ b/BUILD @@ -2422,14 +2422,15 @@ grpc_cc_library( "src/core/lib/event_engine/posix_engine/event_poller.h", ], external_deps = [ + "absl/functional:any_invocable", "absl/status", "absl/strings", ], deps = [ "event_engine_base_hdrs", + "event_engine_poller", "gpr_platform", "posix_event_engine_closure", - "time", ], ) @@ -2548,6 +2549,7 @@ grpc_cc_library( ], external_deps = [ "absl/base:core_headers", + "absl/functional:any_invocable", "absl/memory", "absl/status", "absl/status:statusor", @@ -2555,15 +2557,17 @@ grpc_cc_library( "absl/synchronization", ], deps = [ + "common_event_engine_closures", + "event_engine_base_hdrs", + "event_engine_poller", + "event_engine_time_util", "gpr", - "gpr_codegen", "iomgr_port", "posix_event_engine_closure", "posix_event_engine_event_poller", "posix_event_engine_lockfree_event", "posix_event_engine_wakeup_fd_posix", "posix_event_engine_wakeup_fd_posix_default", - "time", ], ) @@ -2577,12 +2581,17 @@ grpc_cc_library( ], external_deps = [ "absl/base:core_headers", + "absl/functional:any_invocable", "absl/status", "absl/status:statusor", "absl/strings", "absl/synchronization", ], deps = [ + "common_event_engine_closures", + "event_engine_base_hdrs", + "event_engine_poller", + "event_engine_time_util", "gpr", "gpr_codegen", "iomgr_port", @@ -2737,8 +2746,8 @@ grpc_cc_library( "event_engine_executor", "event_engine_poller", "event_engine_socket_notifier", + "event_engine_time_util", "event_engine_trace", - "event_engine_utils", "gpr", "gpr_platform", ], diff --git a/CMakeLists.txt b/CMakeLists.txt index c2b4f4ceb50..fb17f1ceafd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2119,6 +2119,7 @@ add_library(grpc src/core/lib/event_engine/slice.cc src/core/lib/event_engine/slice_buffer.cc src/core/lib/event_engine/thread_pool.cc + src/core/lib/event_engine/time_util.cc src/core/lib/event_engine/trace.cc src/core/lib/event_engine/utils.cc src/core/lib/event_engine/windows/iocp.cc @@ -2754,6 +2755,7 @@ add_library(grpc_unsecure src/core/lib/event_engine/slice.cc src/core/lib/event_engine/slice_buffer.cc src/core/lib/event_engine/thread_pool.cc + src/core/lib/event_engine/time_util.cc src/core/lib/event_engine/trace.cc src/core/lib/event_engine/utils.cc src/core/lib/event_engine/windows/iocp.cc diff --git a/Makefile b/Makefile index 77b17125ace..08272b1abc3 100644 --- a/Makefile +++ b/Makefile @@ -1435,6 +1435,7 @@ LIBGRPC_SRC = \ src/core/lib/event_engine/slice.cc \ src/core/lib/event_engine/slice_buffer.cc \ src/core/lib/event_engine/thread_pool.cc \ + src/core/lib/event_engine/time_util.cc \ src/core/lib/event_engine/trace.cc \ src/core/lib/event_engine/utils.cc \ src/core/lib/event_engine/windows/iocp.cc \ @@ -1934,6 +1935,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/event_engine/slice.cc \ src/core/lib/event_engine/slice_buffer.cc \ src/core/lib/event_engine/thread_pool.cc \ + src/core/lib/event_engine/time_util.cc \ src/core/lib/event_engine/trace.cc \ src/core/lib/event_engine/utils.cc \ src/core/lib/event_engine/windows/iocp.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 2aefd39fa88..0af1fb72a80 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -790,6 +790,7 @@ libs: - src/core/lib/event_engine/promise.h - src/core/lib/event_engine/socket_notifier.h - src/core/lib/event_engine/thread_pool.h + - src/core/lib/event_engine/time_util.h - src/core/lib/event_engine/trace.h - src/core/lib/event_engine/utils.h - src/core/lib/event_engine/windows/iocp.h @@ -1492,6 +1493,7 @@ libs: - src/core/lib/event_engine/slice.cc - src/core/lib/event_engine/slice_buffer.cc - src/core/lib/event_engine/thread_pool.cc + - src/core/lib/event_engine/time_util.cc - src/core/lib/event_engine/trace.cc - src/core/lib/event_engine/utils.cc - src/core/lib/event_engine/windows/iocp.cc @@ -2007,6 +2009,7 @@ libs: - src/core/lib/event_engine/promise.h - src/core/lib/event_engine/socket_notifier.h - src/core/lib/event_engine/thread_pool.h + - src/core/lib/event_engine/time_util.h - src/core/lib/event_engine/trace.h - src/core/lib/event_engine/utils.h - src/core/lib/event_engine/windows/iocp.h @@ -2349,6 +2352,7 @@ libs: - src/core/lib/event_engine/slice.cc - src/core/lib/event_engine/slice_buffer.cc - src/core/lib/event_engine/thread_pool.cc + - src/core/lib/event_engine/time_util.cc - src/core/lib/event_engine/trace.cc - src/core/lib/event_engine/utils.cc - src/core/lib/event_engine/windows/iocp.cc @@ -5799,6 +5803,7 @@ targets: run: false language: c++ headers: + - src/core/lib/event_engine/common_closures.h - src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h - src/core/lib/event_engine/posix_engine/ev_poll_posix.h - src/core/lib/event_engine/posix_engine/event_poller.h diff --git a/config.m4 b/config.m4 index 3994fb80f4b..ee602b1b290 100644 --- a/config.m4 +++ b/config.m4 @@ -483,6 +483,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/event_engine/slice.cc \ src/core/lib/event_engine/slice_buffer.cc \ src/core/lib/event_engine/thread_pool.cc \ + src/core/lib/event_engine/time_util.cc \ src/core/lib/event_engine/trace.cc \ src/core/lib/event_engine/utils.cc \ src/core/lib/event_engine/windows/iocp.cc \ diff --git a/config.w32 b/config.w32 index 32847fe0417..493335baed5 100644 --- a/config.w32 +++ b/config.w32 @@ -449,6 +449,7 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\event_engine\\slice.cc " + "src\\core\\lib\\event_engine\\slice_buffer.cc " + "src\\core\\lib\\event_engine\\thread_pool.cc " + + "src\\core\\lib\\event_engine\\time_util.cc " + "src\\core\\lib\\event_engine\\trace.cc " + "src\\core\\lib\\event_engine\\utils.cc " + "src\\core\\lib\\event_engine\\windows\\iocp.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 1cade5b9012..29d2c096a36 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -695,6 +695,7 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/promise.h', 'src/core/lib/event_engine/socket_notifier.h', 'src/core/lib/event_engine/thread_pool.h', + 'src/core/lib/event_engine/time_util.h', 'src/core/lib/event_engine/trace.h', 'src/core/lib/event_engine/utils.h', 'src/core/lib/event_engine/windows/iocp.h', @@ -1553,6 +1554,7 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/promise.h', 'src/core/lib/event_engine/socket_notifier.h', 'src/core/lib/event_engine/thread_pool.h', + 'src/core/lib/event_engine/time_util.h', 'src/core/lib/event_engine/trace.h', 'src/core/lib/event_engine/utils.h', 'src/core/lib/event_engine/windows/iocp.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 10c883785ca..2bdf2c4e539 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -1076,6 +1076,8 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/socket_notifier.h', 'src/core/lib/event_engine/thread_pool.cc', 'src/core/lib/event_engine/thread_pool.h', + 'src/core/lib/event_engine/time_util.cc', + 'src/core/lib/event_engine/time_util.h', 'src/core/lib/event_engine/trace.cc', 'src/core/lib/event_engine/trace.h', 'src/core/lib/event_engine/utils.cc', @@ -2176,6 +2178,7 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/promise.h', 'src/core/lib/event_engine/socket_notifier.h', 'src/core/lib/event_engine/thread_pool.h', + 'src/core/lib/event_engine/time_util.h', 'src/core/lib/event_engine/trace.h', 'src/core/lib/event_engine/utils.h', 'src/core/lib/event_engine/windows/iocp.h', diff --git a/grpc.gemspec b/grpc.gemspec index 22583fd3288..b9723f5ae3a 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -989,6 +989,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/event_engine/socket_notifier.h ) s.files += %w( src/core/lib/event_engine/thread_pool.cc ) s.files += %w( src/core/lib/event_engine/thread_pool.h ) + s.files += %w( src/core/lib/event_engine/time_util.cc ) + s.files += %w( src/core/lib/event_engine/time_util.h ) s.files += %w( src/core/lib/event_engine/trace.cc ) s.files += %w( src/core/lib/event_engine/trace.h ) s.files += %w( src/core/lib/event_engine/utils.cc ) diff --git a/grpc.gyp b/grpc.gyp index a173a35b0bd..1ae4b63e199 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -814,6 +814,7 @@ 'src/core/lib/event_engine/slice.cc', 'src/core/lib/event_engine/slice_buffer.cc', 'src/core/lib/event_engine/thread_pool.cc', + 'src/core/lib/event_engine/time_util.cc', 'src/core/lib/event_engine/trace.cc', 'src/core/lib/event_engine/utils.cc', 'src/core/lib/event_engine/windows/iocp.cc', @@ -1258,6 +1259,7 @@ 'src/core/lib/event_engine/slice.cc', 'src/core/lib/event_engine/slice_buffer.cc', 'src/core/lib/event_engine/thread_pool.cc', + 'src/core/lib/event_engine/time_util.cc', 'src/core/lib/event_engine/trace.cc', 'src/core/lib/event_engine/utils.cc', 'src/core/lib/event_engine/windows/iocp.cc', diff --git a/package.xml b/package.xml index 4c04a727a28..b4b856f9fb1 100644 --- a/package.xml +++ b/package.xml @@ -971,6 +971,8 @@ + + diff --git a/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc b/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc index 2fb1a93938c..e6c4559e451 100644 --- a/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc +++ b/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc @@ -17,18 +17,20 @@ #include -#include +#include #include +#include "absl/functional/any_invocable.h" #include "absl/memory/memory.h" +#include "absl/status/status.h" #include "absl/status/statusor.h" -#include "absl/strings/str_cat.h" -#include +#include #include #include -#include +#include "src/core/lib/event_engine/poller.h" +#include "src/core/lib/event_engine/time_util.h" #include "src/core/lib/iomgr/port.h" // This polling engine is only relevant on linux kernels supporting epoll @@ -41,17 +43,15 @@ #include #include -#include - #include "absl/synchronization/mutex.h" +#include "src/core/lib/event_engine/common_closures.h" #include "src/core/lib/event_engine/posix_engine/event_poller.h" #include "src/core/lib/event_engine/posix_engine/lockfree_event.h" #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h" #include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h" #include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h" #include "src/core/lib/gprpp/fork.h" -#include "src/core/lib/gprpp/time.h" using ::grpc_event_engine::posix_engine::LockfreeEvent; using ::grpc_event_engine::posix_engine::WakeupFd; @@ -61,13 +61,19 @@ using ::grpc_event_engine::posix_engine::WakeupFd; namespace grpc_event_engine { namespace posix_engine { +using ::grpc_event_engine::experimental::AnyInvocableClosure; +using ::grpc_event_engine::experimental::EventEngine; +using ::grpc_event_engine::experimental::Poller; +using ::grpc_event_engine::posix_engine::LockfreeEvent; +using ::grpc_event_engine::posix_engine::WakeupFd; + class Epoll1EventHandle : public EventHandle { public: Epoll1EventHandle(int fd, Epoll1Poller* poller) : fd_(fd), - pending_actions_(0), - list_(), + list_(this), poller_(poller), + exec_actions_closure_([this]() { ExecutePendingActions(); }), read_closure_(absl::make_unique(poller->GetScheduler())), write_closure_( absl::make_unique(poller->GetScheduler())), @@ -76,47 +82,77 @@ class Epoll1EventHandle : public EventHandle { read_closure_->InitEvent(); write_closure_->InitEvent(); error_closure_->InitEvent(); - pending_actions_ = 0; + pending_read_.store(false, std::memory_order_relaxed); + pending_write_.store(false, std::memory_order_relaxed); + pending_error_.store(false, std::memory_order_relaxed); + } + void ReInit(int fd) { + fd_ = fd; + read_closure_->InitEvent(); + write_closure_->InitEvent(); + error_closure_->InitEvent(); + pending_read_.store(false, std::memory_order_relaxed); + pending_write_.store(false, std::memory_order_relaxed); + pending_error_.store(false, std::memory_order_relaxed); } Epoll1Poller* Poller() { return poller_; } - void SetPendingActions(bool pending_read, bool pending_write, - bool pending_error) { - pending_actions_ |= pending_read; + EventEngine::Closure* SetPendingActions(bool pending_read, bool pending_write, + bool pending_error) { + // Another thread may be executing ExecutePendingActions() at this point + // This is possible for instance, if one instantiation of Work(..) sets + // an fd to be readable while the next instantiation of Work(...) may + // set the fd to be writable. While the second instantiation is running, + // ExecutePendingActions() of the first instantiation may execute in + // parallel and read the pending_<***>_ variables. So we need to use + // atomics to manipulate pending_<***>_ variables. + + if (pending_read) { + pending_read_.store(true, std::memory_order_release); + } + if (pending_write) { - pending_actions_ |= (1 << 2); + pending_write_.store(true, std::memory_order_release); } + if (pending_error) { - pending_actions_ |= (1 << 3); + pending_error_.store(true, std::memory_order_release); } + + if (pending_read || pending_write || pending_error) { + return &exec_actions_closure_; + } + return nullptr; } int WrappedFd() override { return fd_; } - void OrphanHandle(IomgrEngineClosure* on_done, int* release_fd, + void OrphanHandle(PosixEngineClosure* on_done, int* release_fd, absl::string_view reason) override; void ShutdownHandle(absl::Status why) override; - void NotifyOnRead(IomgrEngineClosure* on_read) override; - void NotifyOnWrite(IomgrEngineClosure* on_write) override; - void NotifyOnError(IomgrEngineClosure* on_error) override; + void NotifyOnRead(PosixEngineClosure* on_read) override; + void NotifyOnWrite(PosixEngineClosure* on_write) override; + void NotifyOnError(PosixEngineClosure* on_error) override; void SetReadable() override; void SetWritable() override; void SetHasError() override; bool IsHandleShutdown() override; - void ExecutePendingActions() override { - if (pending_actions_ & 1UL) { + inline void ExecutePendingActions() { + // These may execute in Parallel with ShutdownHandle. Thats not an issue + // because the lockfree event implementation should be able to handle it. + if (pending_read_.exchange(false, std::memory_order_acq_rel)) { read_closure_->SetReady(); } - if ((pending_actions_ >> 2) & 1UL) { + if (pending_write_.exchange(false, std::memory_order_acq_rel)) { write_closure_->SetReady(); } - if ((pending_actions_ >> 3) & 1UL) { + if (pending_error_.exchange(false, std::memory_order_acq_rel)) { error_closure_->SetReady(); } - pending_actions_ = 0; } absl::Mutex* mu() { return &mu_; } LockfreeEvent* ReadClosure() { return read_closure_.get(); } LockfreeEvent* WriteClosure() { return write_closure_.get(); } LockfreeEvent* ErrorClosure() { return error_closure_.get(); } Epoll1Poller::HandlesList& ForkFdListPos() { return list_; } + ~Epoll1EventHandle() override = default; private: void HandleShutdownInternal(absl::Status why, bool releasing_fd); @@ -124,9 +160,14 @@ class Epoll1EventHandle : public EventHandle { // required. absl::Mutex mu_; int fd_; - int pending_actions_; + // See Epoll1Poller::SetPendingActions for explanation on why pending_<***>_ + // need to be atomic. + std::atomic pending_read_{false}; + std::atomic pending_write_{false}; + std::atomic pending_error_{false}; Epoll1Poller::HandlesList list_; Epoll1Poller* poller_; + AnyInvocableClosure exec_actions_closure_; std::unique_ptr read_closure_; std::unique_ptr write_closure_; std::unique_ptr error_closure_; @@ -206,20 +247,6 @@ void ForkPollerListRemovePoller(Epoll1Poller* poller) { } } -int PollDeadlineToMillisTimeout(grpc_core::Timestamp millis) { - if (millis == grpc_core::Timestamp::InfFuture()) return -1; - grpc_core::Timestamp now = - grpc_core::Timestamp::FromTimespecRoundDown(gpr_now(GPR_CLOCK_MONOTONIC)); - int64_t delta = (millis - now).millis(); - if (delta > INT_MAX) { - return INT_MAX; - } else if (delta < 0) { - return 0; - } else { - return static_cast(delta); - } -} - bool InitEpoll1PollerLinux(); // Called by the child process's post-fork handler to close open fds, @@ -269,7 +296,7 @@ bool InitEpoll1PollerLinux() { } // namespace -void Epoll1EventHandle::OrphanHandle(IomgrEngineClosure* on_done, +void Epoll1EventHandle::OrphanHandle(PosixEngineClosure* on_done, int* release_fd, absl::string_view reason) { bool is_release_fd = (release_fd != nullptr); @@ -295,12 +322,13 @@ void Epoll1EventHandle::OrphanHandle(IomgrEngineClosure* on_done, write_closure_->DestroyEvent(); error_closure_->DestroyEvent(); } - + pending_read_.store(false, std::memory_order_release); + pending_write_.store(false, std::memory_order_release); + pending_error_.store(false, std::memory_order_release); { absl::MutexLock lock(&poller_->mu_); poller_->free_epoll1_handles_list_.push_back(this); } - if (on_done != nullptr) { on_done->SetStatus(absl::OkStatus()); poller_->GetScheduler()->Run(on_done); @@ -376,6 +404,7 @@ EventHandle* Epoll1Poller::CreateHandle(int fd, absl::string_view /*name*/, new_handle = reinterpret_cast( free_epoll1_handles_list_.front()); free_epoll1_handles_list_.pop_front(); + new_handle->ReInit(fd); } } ForkFdListAddHandle(new_handle); @@ -397,10 +426,13 @@ EventHandle* Epoll1Poller::CreateHandle(int fd, absl::string_view /*name*/, // Process the epoll events found by DoEpollWait() function. // - g_epoll_set.cursor points to the index of the first event to be processed -// - This function then processes up-to MAX_EPOLL_EVENTS_PER_ITERATION and -// updates the g_epoll_set.cursor -absl::Status Epoll1Poller::ProcessEpollEvents( - int max_epoll_events_to_handle, std::vector& pending_events) { +// - This function then processes up-to max_epoll_events_to_handle and +// updates the g_epoll_set.cursor. +// It returns true, it there was a Kick that forced invocation of this +// function. It also returns the list of closures to run to take action +// on file descriptors that became readable/writable. +bool Epoll1Poller::ProcessEpollEvents(int max_epoll_events_to_handle, + Poller::Events& pending_events) { int64_t num_events = g_epoll_set_.num_events; int64_t cursor = g_epoll_set_.cursor; bool was_kicked = false; @@ -422,35 +454,37 @@ absl::Status Epoll1Poller::ProcessEpollEvents( bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0; bool write_ev = (ev->events & EPOLLOUT) != 0; bool err_fallback = error && !track_err; - - handle->SetPendingActions(read_ev || cancel || err_fallback, - write_ev || cancel || err_fallback, - error && !err_fallback); - pending_events.push_back(handle); + if (EventEngine::Closure* closure = handle->SetPendingActions( + read_ev || cancel || err_fallback, + write_ev || cancel || err_fallback, error && !err_fallback)) { + pending_events.push_back(closure); + } } } g_epoll_set_.cursor = cursor; - return was_kicked ? absl::Status(absl::StatusCode::kInternal, "Kicked") - : absl::OkStatus(); + return was_kicked; } // Do epoll_wait and store the events in g_epoll_set.events field. This does // not "process" any of the events yet; that is done in ProcessEpollEvents(). -// See ProcessEpollEvents() function for more details. -absl::Status Epoll1Poller::DoEpollWait(grpc_core::Timestamp deadline) { +// See ProcessEpollEvents() function for more details. It returns the number +// of events generated by epoll_wait. +int Epoll1Poller::DoEpollWait(EventEngine::Duration timeout) { int r; - int timeout = PollDeadlineToMillisTimeout(deadline); do { r = epoll_wait(g_epoll_set_.epfd, g_epoll_set_.events, MAX_EPOLL_EVENTS, - timeout); + static_cast( + grpc_event_engine::experimental::Milliseconds(timeout))); } while (r < 0 && errno == EINTR); if (r < 0) { - return absl::Status(absl::StatusCode::kInternal, - absl::StrCat("epoll_wait: ", strerror(errno))); + gpr_log(GPR_ERROR, + "(event_engine) Epoll1Poller:%p encountered epoll_wait error: %s", + this, strerror(errno)); + GPR_ASSERT(false); } g_epoll_set_.num_events = r; g_epoll_set_.cursor = 0; - return absl::OkStatus(); + return r; } // Might be called multiple times @@ -469,15 +503,15 @@ bool Epoll1EventHandle::IsHandleShutdown() { return read_closure_->IsShutdown(); } -void Epoll1EventHandle::NotifyOnRead(IomgrEngineClosure* on_read) { +void Epoll1EventHandle::NotifyOnRead(PosixEngineClosure* on_read) { read_closure_->NotifyOn(on_read); } -void Epoll1EventHandle::NotifyOnWrite(IomgrEngineClosure* on_write) { +void Epoll1EventHandle::NotifyOnWrite(PosixEngineClosure* on_write) { write_closure_->NotifyOn(on_write); } -void Epoll1EventHandle::NotifyOnError(IomgrEngineClosure* on_error) { +void Epoll1EventHandle::NotifyOnError(PosixEngineClosure* on_error) { error_closure_->NotifyOn(on_error); } @@ -487,24 +521,28 @@ void Epoll1EventHandle::SetWritable() { write_closure_->SetReady(); } void Epoll1EventHandle::SetHasError() { error_closure_->SetReady(); } -absl::Status Epoll1Poller::Work(grpc_core::Timestamp deadline, - std::vector& pending_events) { +// Polls the registered Fds for events until timeout is reached or there is a +// Kick(). If there is a Kick(), it returns any previously un-processed events. +// If there are no un-processed events, it returns Poller::WorkResult::Kicked{} +Poller::WorkResult Epoll1Poller::Work(EventEngine::Duration timeout) { + Poller::Events pending_events; if (g_epoll_set_.cursor == g_epoll_set_.num_events) { - auto status = DoEpollWait(deadline); - if (!status.ok()) { - return status; + if (DoEpollWait(timeout) == 0) { + return Poller::DeadlineExceeded{}; } } { absl::MutexLock lock(&mu_); // If was_kicked_ is true, collect all pending events in this iteration. - auto status = ProcessEpollEvents( - was_kicked_ ? INT_MAX : MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION, - pending_events); - if (!status.ok()) { + if (ProcessEpollEvents( + was_kicked_ ? INT_MAX : MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION, + pending_events)) { was_kicked_ = false; } - return status; + if (pending_events.empty()) { + return Poller::Kicked{}; + } + return pending_events; } } @@ -534,6 +572,9 @@ Epoll1Poller* GetEpoll1Poller(Scheduler* scheduler) { namespace grpc_event_engine { namespace posix_engine { +using ::grpc_event_engine::experimental::EventEngine; +using ::grpc_event_engine::experimental::Poller; + Epoll1Poller::Epoll1Poller(Scheduler* /* engine */) { GPR_ASSERT(false && "unimplemented"); } @@ -547,18 +588,16 @@ EventHandle* Epoll1Poller::CreateHandle(int /*fd*/, absl::string_view /*name*/, GPR_ASSERT(false && "unimplemented"); } -absl::Status Epoll1Poller::ProcessEpollEvents( - int /*max_epoll_events_to_handle*/, - std::vector& /*pending_events*/) { +bool Epoll1Poller::ProcessEpollEvents(int /*max_epoll_events_to_handle*/, + Poller::Events& /*pending_events*/) { GPR_ASSERT(false && "unimplemented"); } -absl::Status Epoll1Poller::DoEpollWait(grpc_core::Timestamp /*deadline*/) { +int Epoll1Poller::DoEpollWait(EventEngine::Duration /*timeout*/) { GPR_ASSERT(false && "unimplemented"); } -absl::Status Epoll1Poller::Work(grpc_core::Timestamp /*deadline*/, - std::vector& /*pending_events*/) { +Poller::WorkResult Epoll1Poller::Work(EventEngine::Duration /*timeout*/) { GPR_ASSERT(false && "unimplemented"); } diff --git a/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h b/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h index 103b062a20a..582fc8d48e1 100644 --- a/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h +++ b/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h @@ -18,16 +18,17 @@ #include #include -#include +#include #include "absl/base/thread_annotations.h" -#include "absl/status/status.h" #include "absl/strings/string_view.h" #include "absl/synchronization/mutex.h" +#include + +#include "src/core/lib/event_engine/poller.h" #include "src/core/lib/event_engine/posix_engine/event_poller.h" #include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h" -#include "src/core/lib/gprpp/time.h" #include "src/core/lib/iomgr/port.h" #ifdef GRPC_LINUX_EPOLL @@ -42,26 +43,41 @@ namespace posix_engine { class Epoll1EventHandle; // Definition of epoll1 based poller. -class Epoll1Poller : public EventPoller { +class Epoll1Poller : public PosixEventPoller { public: explicit Epoll1Poller(Scheduler* scheduler); EventHandle* CreateHandle(int fd, absl::string_view name, bool track_err) override; - absl::Status Work(grpc_core::Timestamp deadline, - std::vector& pending_events) override; + Poller::WorkResult Work( + grpc_event_engine::experimental::EventEngine::Duration timeout) override; + std::string Name() override { return "epoll1"; } void Kick() override; Scheduler* GetScheduler() { return scheduler_; } void Shutdown() override; ~Epoll1Poller() override; private: - absl::Status ProcessEpollEvents(int max_epoll_events_to_handle, - std::vector& pending_events); - absl::Status DoEpollWait(grpc_core::Timestamp deadline); - struct HandlesList { + // Process the epoll events found by DoEpollWait() function. + // - g_epoll_set.cursor points to the index of the first event to be processed + // - This function then processes up-to max_epoll_events_to_handle and + // updates the g_epoll_set.cursor. + // It returns true, it there was a Kick that forced invocation of this + // function. It also returns the list of closures to run to take action + // on file descriptors that became readable/writable. + bool ProcessEpollEvents(int max_epoll_events_to_handle, + Poller::Events& pending_events); + // Do epoll_wait and store the events in g_epoll_set.events field. This does + // not "process" any of the events yet; that is done in ProcessEpollEvents(). + // See ProcessEpollEvents() function for more details. It returns the number + // of events generated by epoll_wait. + int DoEpollWait( + grpc_event_engine::experimental::EventEngine::Duration timeout); + class HandlesList { + public: + explicit HandlesList(Epoll1EventHandle* handle) : handle(handle) {} Epoll1EventHandle* handle; - Epoll1EventHandle* next; - Epoll1EventHandle* prev; + Epoll1EventHandle* next = nullptr; + Epoll1EventHandle* prev = nullptr; }; friend class Epoll1EventHandle; #ifdef GRPC_LINUX_EPOLL diff --git a/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc b/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc index 1ce5d4f7edd..d1be9b16999 100644 --- a/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc +++ b/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc @@ -19,20 +19,22 @@ #include #include -#include #include #include #include #include +#include "absl/functional/any_invocable.h" +#include "absl/status/status.h" #include "absl/status/statusor.h" -#include "absl/strings/str_cat.h" +#include #include #include #include #include +#include "src/core/lib/event_engine/poller.h" #include "src/core/lib/event_engine/posix_engine/event_poller.h" #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h" #include "src/core/lib/gprpp/memory.h" @@ -47,22 +49,20 @@ #include #include -#include - #include "absl/synchronization/mutex.h" #include +#include "src/core/lib/event_engine/common_closures.h" #include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h" #include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h" +#include "src/core/lib/event_engine/time_util.h" #include "src/core/lib/gprpp/fork.h" #include "src/core/lib/gprpp/global_config.h" #include "src/core/lib/gprpp/time.h" GPR_GLOBAL_CONFIG_DECLARE_STRING(grpc_poll_strategy); -using ::grpc_event_engine::posix_engine::WakeupFd; - static const intptr_t kClosureNotReady = 0; static const intptr_t kClosureReady = 1; static const int kPollinCheck = POLLIN | POLLHUP | POLLERR; @@ -71,13 +71,18 @@ static const int kPolloutCheck = POLLOUT | POLLHUP | POLLERR; namespace grpc_event_engine { namespace posix_engine { +using ::grpc_event_engine::experimental::AnyInvocableClosure; +using ::grpc_event_engine::experimental::EventEngine; +using ::grpc_event_engine::experimental::Poller; +using ::grpc_event_engine::posix_engine::WakeupFd; + class PollEventHandle : public EventHandle { public: PollEventHandle(int fd, PollPoller* poller) : fd_(fd), pending_actions_(0), - fork_fd_list_(), - poller_handles_list_(), + fork_fd_list_(this), + poller_handles_list_(this), poller_(poller), scheduler_(poller->GetScheduler()), is_orphaned_(false), @@ -87,20 +92,29 @@ class PollEventHandle : public EventHandle { pollhup_(false), watch_mask_(-1), shutdown_error_(absl::OkStatus()), + exec_actions_closure_([this]() { ExecutePendingActions(); }), on_done_(nullptr), - read_closure_(reinterpret_cast(kClosureNotReady)), + read_closure_(reinterpret_cast(kClosureNotReady)), write_closure_( - reinterpret_cast(kClosureNotReady)) { + reinterpret_cast(kClosureNotReady)) { poller_->Ref(); absl::MutexLock lock(&poller_->mu_); poller_->PollerHandlesListAddHandle(this); } PollPoller* Poller() { return poller_; } - void SetPendingActions(bool pending_read, bool pending_write) { + EventEngine::Closure* SetPendingActions(bool pending_read, + bool pending_write) { pending_actions_ |= pending_read; if (pending_write) { pending_actions_ |= (1 << 2); } + if (pending_read || pending_write) { + // The closure is going to be executed. We'll Unref this handle in + // ExecutePendingActions. + Ref(); + return &exec_actions_closure_; + } + return nullptr; } void ForceRemoveHandleFromPoller() { absl::MutexLock lock(&poller_->mu_); @@ -130,12 +144,12 @@ class PollEventHandle : public EventHandle { void SetWatched(int watch_mask) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { watch_mask_ = watch_mask; } - void OrphanHandle(IomgrEngineClosure* on_done, int* release_fd, + void OrphanHandle(PosixEngineClosure* on_done, int* release_fd, absl::string_view reason) override; void ShutdownHandle(absl::Status why) override; - void NotifyOnRead(IomgrEngineClosure* on_read) override; - void NotifyOnWrite(IomgrEngineClosure* on_write) override; - void NotifyOnError(IomgrEngineClosure* on_error) override; + void NotifyOnRead(PosixEngineClosure* on_read) override; + void NotifyOnWrite(PosixEngineClosure* on_write) override; + void NotifyOnError(PosixEngineClosure* on_error) override; void SetReadable() override; void SetWritable() override; void SetHasError() override; @@ -143,7 +157,7 @@ class PollEventHandle : public EventHandle { absl::MutexLock lock(&mu_); return is_shutdown_; }; - void ExecutePendingActions() override { + inline void ExecutePendingActions() { int kick = 0; { absl::MutexLock lock(&mu_); @@ -161,11 +175,11 @@ class PollEventHandle : public EventHandle { } if (kick) { // SetReadyLocked immediately scheduled some closure. It would have set - // the closure state to NOT_READY. We need to wakeup the Work(...) thread - // to start polling on this fd. If this call is not made, it is possible - // that the poller will reach a state where all the fds under the - // poller's control are not polled for POLLIN/POLLOUT events thus leading - // to an indefinitely blocked Work(..) method. + // the closure state to NOT_READY. We need to wakeup the Work(...) + // thread to start polling on this fd. If this call is not made, it is + // possible that the poller will reach a state where all the fds under + // the poller's control are not polled for POLLIN/POLLOUT events thus + // leading to an indefinitely blocked Work(..) method. poller_->KickExternal(false); } Unref(); @@ -188,12 +202,12 @@ class PollEventHandle : public EventHandle { } uint32_t BeginPollLocked(uint32_t read_mask, uint32_t write_mask) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); - bool EndPollLocked(int got_read, int got_write) + EventEngine::Closure* EndPollLocked(int got_read, int got_write) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); private: - int SetReadyLocked(IomgrEngineClosure** st); - int NotifyOnLocked(IomgrEngineClosure** st, IomgrEngineClosure* closure); + int SetReadyLocked(PosixEngineClosure** st); + int NotifyOnLocked(PosixEngineClosure** st, PosixEngineClosure* closure); // See Epoll1Poller::ShutdownHandle for explanation on why a mutex is // required. absl::Mutex mu_; @@ -211,13 +225,13 @@ class PollEventHandle : public EventHandle { bool pollhup_; int watch_mask_; absl::Status shutdown_error_; - IomgrEngineClosure* on_done_; - IomgrEngineClosure* read_closure_; - IomgrEngineClosure* write_closure_; + AnyInvocableClosure exec_actions_closure_; + PosixEngineClosure* on_done_; + PosixEngineClosure* read_closure_; + PosixEngineClosure* write_closure_; }; namespace { - // Only used when GRPC_ENABLE_FORK_SUPPORT=1 std::list fork_poller_list; @@ -272,11 +286,12 @@ void ForkPollerListRemovePoller(PollPoller* poller) { } } -int PollDeadlineToMillisTimeout(grpc_core::Timestamp millis) { - if (millis == grpc_core::Timestamp::InfFuture()) return -1; +// Returns the number of milliseconds elapsed between now and start timestamp. +int PollElapsedTimeToMillis(grpc_core::Timestamp start) { + if (start == grpc_core::Timestamp::InfFuture()) return -1; grpc_core::Timestamp now = grpc_core::Timestamp::FromTimespecRoundDown(gpr_now(GPR_CLOCK_MONOTONIC)); - int64_t delta = (millis - now).millis(); + int64_t delta = (now - start).millis(); if (delta > INT_MAX) { return INT_MAX; } else if (delta < 0) { @@ -289,9 +304,9 @@ int PollDeadlineToMillisTimeout(grpc_core::Timestamp millis) { bool InitPollPollerPosix(); // Called by the child process's post-fork handler to close open fds, -// including the global epoll fd of each poller. This allows gRPC to shutdown in -// the child process without interfering with connections or RPCs ongoing in the -// parent. +// including the global epoll fd of each poller. This allows gRPC to shutdown +// in the child process without interfering with connections or RPCs ongoing +// in the parent. void ResetEventManagerOnFork() { // Delete all pending Epoll1EventHandles. gpr_mu_lock(&fork_fd_list_mu); @@ -338,10 +353,13 @@ EventHandle* PollPoller::CreateHandle(int fd, absl::string_view /*name*/, GPR_DEBUG_ASSERT(track_err == false); PollEventHandle* handle = new PollEventHandle(fd, this); ForkFdListAddHandle(handle); + // We need to send a kick to the thread executing Work(..) so that it can + // add this new Fd into the list of Fds to poll. + KickExternal(false); return handle; } -void PollEventHandle::OrphanHandle(IomgrEngineClosure* on_done, int* release_fd, +void PollEventHandle::OrphanHandle(PosixEngineClosure* on_done, int* release_fd, absl::string_view /* reason */) { ForkFdListRemoveHandle(this); ForceRemoveHandleFromPoller(); @@ -358,9 +376,11 @@ void PollEventHandle::OrphanHandle(IomgrEngineClosure* on_done, int* release_fd, if (!is_shutdown_) { is_shutdown_ = true; shutdown_error_ = - absl::Status(absl::StatusCode::kInternal, "FD Shutdown"); + absl::Status(absl::StatusCode::kInternal, "FD Orphaned"); // signal read/write closed to OS so that future operations fail. - shutdown(fd_, SHUT_RDWR); + if (!released_) { + shutdown(fd_, SHUT_RDWR); + } SetReadyLocked(&read_closure_); SetReadyLocked(&write_closure_); } @@ -378,18 +398,18 @@ void PollEventHandle::OrphanHandle(IomgrEngineClosure* on_done, int* release_fd, Unref(); } -int PollEventHandle::NotifyOnLocked(IomgrEngineClosure** st, - IomgrEngineClosure* closure) { +int PollEventHandle::NotifyOnLocked(PosixEngineClosure** st, + PosixEngineClosure* closure) { if (is_shutdown_ || pollhup_) { - closure->SetStatus( - absl::Status(absl::StatusCode::kInternal, "FD Shutdown")); + closure->SetStatus(shutdown_error_); scheduler_->Run(closure); - } else if (*st == reinterpret_cast(kClosureNotReady)) { + } else if (*st == reinterpret_cast(kClosureNotReady)) { // not ready ==> switch to a waiting state by setting the closure *st = closure; - } else if (*st == reinterpret_cast(kClosureReady)) { + return 0; + } else if (*st == reinterpret_cast(kClosureReady)) { // already ready ==> queue the closure to run immediately - *st = reinterpret_cast(kClosureNotReady); + *st = reinterpret_cast(kClosureNotReady); closure->SetStatus(shutdown_error_); scheduler_->Run(closure); return 1; @@ -404,18 +424,18 @@ int PollEventHandle::NotifyOnLocked(IomgrEngineClosure** st, } // returns 1 if state becomes not ready -int PollEventHandle::SetReadyLocked(IomgrEngineClosure** st) { - if (*st == reinterpret_cast(kClosureReady)) { +int PollEventHandle::SetReadyLocked(PosixEngineClosure** st) { + if (*st == reinterpret_cast(kClosureReady)) { // duplicate ready ==> ignore return 0; - } else if (*st == reinterpret_cast(kClosureNotReady)) { + } else if (*st == reinterpret_cast(kClosureNotReady)) { // not ready, and not waiting ==> flag ready - *st = reinterpret_cast(kClosureReady); + *st = reinterpret_cast(kClosureReady); return 0; } else { // waiting ==> queue closure - IomgrEngineClosure* closure = *st; - *st = reinterpret_cast(kClosureNotReady); + PosixEngineClosure* closure = *st; + *st = reinterpret_cast(kClosureNotReady); closure->SetStatus(shutdown_error_); scheduler_->Run(closure); return 1; @@ -423,8 +443,8 @@ int PollEventHandle::SetReadyLocked(IomgrEngineClosure** st) { } void PollEventHandle::ShutdownHandle(absl::Status why) { - // We need to take a Ref here because SetReadyLocked may trigger execution of - // a closure which calls OrphanHandle or poller->Shutdown() prematurely. + // We need to take a Ref here because SetReadyLocked may trigger execution + // of a closure which calls OrphanHandle or poller->Shutdown() prematurely. Ref(); { absl::MutexLock lock(&mu_); @@ -442,9 +462,9 @@ void PollEventHandle::ShutdownHandle(absl::Status why) { Unref(); } -void PollEventHandle::NotifyOnRead(IomgrEngineClosure* on_read) { - // We need to take a Ref here because NotifyOnLocked may trigger execution of - // a closure which calls OrphanHandle that may delete this object or call +void PollEventHandle::NotifyOnRead(PosixEngineClosure* on_read) { + // We need to take a Ref here because NotifyOnLocked may trigger execution + // of a closure which calls OrphanHandle that may delete this object or call // poller->Shutdown() prematurely. Ref(); { @@ -464,9 +484,9 @@ void PollEventHandle::NotifyOnRead(IomgrEngineClosure* on_read) { Unref(); } -void PollEventHandle::NotifyOnWrite(IomgrEngineClosure* on_write) { - // We need to take a Ref here because NotifyOnLocked may trigger execution of - // a closure which calls OrphanHandle that may delete this object or call +void PollEventHandle::NotifyOnWrite(PosixEngineClosure* on_write) { + // We need to take a Ref here because NotifyOnLocked may trigger execution + // of a closure which calls OrphanHandle that may delete this object or call // poller->Shutdown() prematurely. Ref(); { @@ -486,7 +506,7 @@ void PollEventHandle::NotifyOnWrite(IomgrEngineClosure* on_write) { Unref(); } -void PollEventHandle::NotifyOnError(IomgrEngineClosure* on_error) { +void PollEventHandle::NotifyOnError(PosixEngineClosure* on_error) { on_error->SetStatus( absl::Status(absl::StatusCode::kCancelled, "Polling engine does not support tracking errors")); @@ -519,31 +539,35 @@ uint32_t PollEventHandle::BeginPollLocked(uint32_t read_mask, bool read_ready = (pending_actions_ & 1UL); bool write_ready = ((pending_actions_ >> 2) & 1UL); Ref(); - // if we are shutdown, then no need to poll this fd. Set watch_mask to 0. + // If we are shutdown, then no need to poll this fd. Set watch_mask to 0. if (is_shutdown_) { SetWatched(0); return 0; } - // if there is nobody polling for read, but we need to, then start doing so + // If there is nobody polling for read, but we need to, then start doing so. if (read_mask && !read_ready && - read_closure_ != reinterpret_cast(kClosureReady)) { + read_closure_ != reinterpret_cast(kClosureReady)) { mask |= read_mask; } - // if there is nobody polling for write, but we need to, then start doing so + + // If there is nobody polling for write, but we need to, then start doing so if (write_mask && !write_ready && - write_closure_ != reinterpret_cast(kClosureReady)) { + write_closure_ != reinterpret_cast(kClosureReady)) { mask |= write_mask; } SetWatched(mask); return mask; } -bool PollEventHandle::EndPollLocked(int got_read, int got_write) { - SetPendingActions(got_read, got_write); +EventEngine::Closure* PollEventHandle::EndPollLocked(int got_read, + int got_write) { + EventEngine::Closure* closure = nullptr; if (is_orphaned_ && !IsWatched()) { CloseFd(); + } else if (!is_orphaned_) { + closure = SetPendingActions(got_read, got_write); } - return !is_orphaned_ && (got_read || got_write); + return closure; } void PollPoller::KickExternal(bool ext) { @@ -573,7 +597,7 @@ void PollPoller::PollerHandlesListAddHandle(PollEventHandle* handle) { void PollPoller::PollerHandlesListRemoveHandle(PollEventHandle* handle) { if (poll_handles_list_head_ == handle) { - poll_handles_list_head_ = handle->ForkFdListPos().next; + poll_handles_list_head_ = handle->PollerHandlesListPos().next; } if (handle->PollerHandlesListPos().prev != nullptr) { handle->PollerHandlesListPos().prev->PollerHandlesListPos().next = @@ -617,33 +641,28 @@ PollPoller::~PollPoller() { GPR_ASSERT(poll_handles_list_head_ == nullptr); } -absl::Status PollPoller::Work(grpc_core::Timestamp deadline, - std::vector& pending_events) { - absl::Status error = absl::OkStatus(); +Poller::WorkResult PollPoller::Work(EventEngine::Duration timeout) { // Avoid malloc for small number of elements. enum { inline_elements = 96 }; struct pollfd pollfd_space[inline_elements]; + bool was_kicked_ext = false; PollEventHandle* watcher_space[inline_elements]; + Poller::Events pending_events; + int timeout_ms = + static_cast(grpc_event_engine::experimental::Milliseconds(timeout)); mu_.Lock(); - if (std::exchange(was_kicked_, false) && - std::exchange(was_kicked_ext_, false)) { - // External kick. Need to break out. - mu_.Unlock(); - return absl::Status(absl::StatusCode::kInternal, "Kicked"); - } - // Start polling, and keep doing so while we're being asked to // re-evaluate our pollers (this allows poll() based pollers to // ensure they don't miss wakeups). - while (error.ok() && pending_events.empty() && - deadline > grpc_core::Timestamp::FromTimespecRoundDown( - gpr_now(GPR_CLOCK_MONOTONIC))) { - int timeout = PollDeadlineToMillisTimeout(deadline); + while (pending_events.empty() && timeout_ms >= 0) { int r = 0; size_t i; nfds_t pfd_count; struct pollfd* pfds; PollEventHandle** watchers; + // Estimate start time for a poll iteration. + grpc_core::Timestamp start = grpc_core::Timestamp::FromTimespecRoundDown( + gpr_now(GPR_CLOCK_MONOTONIC)); if (num_poll_handles_ + 2 <= inline_elements) { pfds = pollfd_space; watchers = watcher_space; @@ -686,8 +705,8 @@ absl::Status PollPoller::Work(grpc_core::Timestamp deadline, } mu_.Unlock(); - if (!use_phony_poll_ || timeout == 0) { - r = poll(pfds, pfd_count, timeout); + if (!use_phony_poll_ || timeout_ms == 0) { + r = poll(pfds, pfd_count, timeout_ms); } else { gpr_log(GPR_ERROR, "Attempted a blocking poll when declared non-polling."); @@ -696,9 +715,11 @@ absl::Status PollPoller::Work(grpc_core::Timestamp deadline, if (r <= 0) { if (r < 0 && errno != EINTR) { - // Save the error code - error = absl::Status(absl::StatusCode::kInternal, - absl::StrCat("poll: ", strerror(errno))); + // Abort fail here. + gpr_log(GPR_ERROR, + "(event_engine) PollPoller:%p encountered poll error: %s", this, + strerror(errno)); + GPR_ASSERT(false); } for (i = 1; i < pfd_count; i++) { @@ -709,25 +730,22 @@ absl::Status PollPoller::Work(grpc_core::Timestamp deadline, head->SetWatched(-1); // This fd was Watched with a watch mask > 0. if (watch_mask > 0 && r < 0) { - // This case implies the fd was polled (since watch_mask > 0 and the - // poll returned an error. Mark the fds as both readable and + // This case implies the fd was polled (since watch_mask > 0 and + // the poll returned an error. Mark the fds as both readable and // writable. - if (head->EndPollLocked(1, 1)) { - // Its safe to add to list of pending events because EndPollLocked - // returns a +ve number only when the handle is not orphaned. - // But an orphan might be initiated on the handle after this - // Work() method returns and before the next Work() method is - // invoked. To prevent the handle from being destroyed until the - // pending events are processed, take a Ref() of the handle. This - // Ref() will be Unref'ed in ExecutePendingActions. - head->Ref(); - pending_events.push_back(head); + if (EventEngine::Closure* closure = head->EndPollLocked(1, 1)) { + // Its safe to add to list of pending events because + // EndPollLocked returns a +ve number only when the handle is + // not orphaned. But an orphan might be initiated on the handle + // after this Work() method returns and before the next Work() + // method is invoked. + pending_events.push_back(closure); } } else { - // In this case, (1) watch_mask > 0 && r == 0 or (2) watch_mask == 0 - // and r < 0 or (3) watch_mask == 0 and r == 0. - // For case-1, no events are pending on the fd even though the fd - // was polled. For case-2 and 3, the fd was not polled + // In this case, (1) watch_mask > 0 && r == 0 or (2) watch_mask == + // 0 and r < 0 or (3) watch_mask == 0 and r == 0. For case-1, no + // events are pending on the fd even though the fd was polled. For + // case-2 and 3, the fd was not polled head->EndPollLocked(0, 0); } } else { @@ -759,17 +777,15 @@ absl::Status PollPoller::Work(grpc_core::Timestamp deadline, head->SetPollhup(true); } head->SetWatched(-1); - if (head->EndPollLocked(pfds[i].revents & kPollinCheck, - pfds[i].revents & kPolloutCheck)) { + if (EventEngine::Closure* closure = + head->EndPollLocked(pfds[i].revents & kPollinCheck, + pfds[i].revents & kPolloutCheck)) { // Its safe to add to list of pending events because EndPollLocked // returns a +ve number only when the handle is not orphaned. // But an orphan might be initiated on the handle after this // Work() method returns and before the next Work() method is - // invoked. To prevent the handle from being destroyed until the - // pending events are processed, take a Ref() of the handle. This - // Ref() will be Unref'ed in ExecutePendingActions. - head->Ref(); - pending_events.push_back(head); + // invoked. + pending_events.push_back(closure); } } lock.Release(); @@ -781,16 +797,25 @@ absl::Status PollPoller::Work(grpc_core::Timestamp deadline, if (pfds != pollfd_space) { gpr_free(pfds); } + + // End of poll iteration. Update how much time is remaining. + timeout_ms -= PollElapsedTimeToMillis(start); mu_.Lock(); if (std::exchange(was_kicked_, false) && std::exchange(was_kicked_ext_, false)) { // External kick. Need to break out. - error = absl::Status(absl::StatusCode::kInternal, "Kicked"); + was_kicked_ext = true; break; } } mu_.Unlock(); - return error; + if (pending_events.empty()) { + if (was_kicked_ext) { + return Poller::Kicked{}; + } + return Poller::DeadlineExceeded{}; + } + return pending_events; } void PollPoller::Shutdown() { @@ -814,6 +839,9 @@ PollPoller* GetPollPoller(Scheduler* scheduler, bool use_phony_poll) { namespace grpc_event_engine { namespace posix_engine { +using ::grpc_event_engine::experimental::EventEngine; +using ::grpc_event_engine::experimental::Poller; + PollPoller::PollPoller(Scheduler* /* engine */) { GPR_ASSERT(false && "unimplemented"); } @@ -827,8 +855,7 @@ EventHandle* PollPoller::CreateHandle(int /*fd*/, absl::string_view /*name*/, GPR_ASSERT(false && "unimplemented"); } -absl::Status PollPoller::Work(grpc_core::Timestamp /*deadline*/, - std::vector& /*pending_events*/) { +Poller::WorkResult PollPoller::Work(EventEngine::Duration /*timeout*/) { GPR_ASSERT(false && "unimplemented"); } diff --git a/src/core/lib/event_engine/posix_engine/ev_poll_posix.h b/src/core/lib/event_engine/posix_engine/ev_poll_posix.h index 89f83b6b033..0b212707861 100644 --- a/src/core/lib/event_engine/posix_engine/ev_poll_posix.h +++ b/src/core/lib/event_engine/posix_engine/ev_poll_posix.h @@ -19,16 +19,17 @@ #include #include -#include +#include #include "absl/base/thread_annotations.h" -#include "absl/status/status.h" #include "absl/strings/string_view.h" #include "absl/synchronization/mutex.h" +#include + +#include "src/core/lib/event_engine/poller.h" #include "src/core/lib/event_engine/posix_engine/event_poller.h" #include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h" -#include "src/core/lib/gprpp/time.h" namespace grpc_event_engine { namespace posix_engine { @@ -36,14 +37,15 @@ namespace posix_engine { class PollEventHandle; // Definition of poll based poller. -class PollPoller : public EventPoller { +class PollPoller : public PosixEventPoller { public: explicit PollPoller(Scheduler* scheduler); PollPoller(Scheduler* scheduler, bool use_phony_poll); EventHandle* CreateHandle(int fd, absl::string_view name, bool track_err) override; - absl::Status Work(grpc_core::Timestamp deadline, - std::vector& pending_events) override; + Poller::WorkResult Work( + grpc_event_engine::experimental::EventEngine::Duration timeout) override; + std::string Name() override { return "poll"; } void Kick() override; Scheduler* GetScheduler() { return scheduler_; } void Shutdown() override; @@ -62,10 +64,12 @@ class PollPoller : public EventPoller { void PollerHandlesListRemoveHandle(PollEventHandle* handle) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); friend class PollEventHandle; - struct HandlesList { + class HandlesList { + public: + explicit HandlesList(PollEventHandle* handle) : handle(handle) {} PollEventHandle* handle; - PollEventHandle* next; - PollEventHandle* prev; + PollEventHandle* next = nullptr; + PollEventHandle* prev = nullptr; }; absl::Mutex mu_; Scheduler* scheduler_; @@ -79,8 +83,9 @@ class PollPoller : public EventPoller { }; // Return an instance of a poll based poller tied to the specified scheduler. -// It use_phony_poll is true, it implies that the poller is declared non-polling -// and any attempt to schedule a blocking poll will result in a crash failure. +// It use_phony_poll is true, it implies that the poller is declared +// non-polling and any attempt to schedule a blocking poll will result in a +// crash failure. PollPoller* GetPollPoller(Scheduler* scheduler, bool use_phony_poll); } // namespace posix_engine diff --git a/src/core/lib/event_engine/posix_engine/event_poller.h b/src/core/lib/event_engine/posix_engine/event_poller.h index 8da16268942..3a742a60b38 100644 --- a/src/core/lib/event_engine/posix_engine/event_poller.h +++ b/src/core/lib/event_engine/posix_engine/event_poller.h @@ -16,15 +16,16 @@ #define GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_EVENT_POLLER_H #include -#include +#include +#include "absl/functional/any_invocable.h" #include "absl/status/status.h" #include "absl/strings/string_view.h" #include +#include "src/core/lib/event_engine/poller.h" #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h" -#include "src/core/lib/gprpp/time.h" namespace grpc_event_engine { namespace posix_engine { @@ -32,6 +33,7 @@ namespace posix_engine { class Scheduler { public: virtual void Run(experimental::EventEngine::Closure* closure) = 0; + virtual void Run(absl::AnyInvocable) = 0; virtual ~Scheduler() = default; }; @@ -41,21 +43,34 @@ class EventHandle { // Delete the handle and optionally close the underlying file descriptor if // release_fd != nullptr. The on_done closure is scheduled to be invoked // after the operation is complete. After this operation, NotifyXXX and SetXXX - // operations cannot be performed on the handle. - virtual void OrphanHandle(IomgrEngineClosure* on_done, int* release_fd, + // operations cannot be performed on the handle. In general, this method + // should only be called after ShutdownHandle and after all existing NotifyXXX + // closures have run and there is no waiting NotifyXXX closure. + virtual void OrphanHandle(PosixEngineClosure* on_done, int* release_fd, absl::string_view reason) = 0; - // Shutdown a handle. After this operation, NotifyXXX and SetXXX operations - // cannot be performed. + // Shutdown a handle. If there is an attempt to call NotifyXXX operations + // after Shutdown handle, those closures will be run immediately with the + // absl::Status provided here being passed to the callbacks enclosed within + // the PosixEngineClosure object. virtual void ShutdownHandle(absl::Status why) = 0; // Schedule on_read to be invoked when the underlying file descriptor - // becomes readable. - virtual void NotifyOnRead(IomgrEngineClosure* on_read) = 0; + // becomes readable. When the on_read closure is run, it may check + // if the handle is shutdown using the IsHandleShutdown method and take + // appropriate actions (for instance it should not try to invoke another + // recursive NotifyOnRead if the handle is shutdown). + virtual void NotifyOnRead(PosixEngineClosure* on_read) = 0; // Schedule on_write to be invoked when the underlying file descriptor - // becomes writable. - virtual void NotifyOnWrite(IomgrEngineClosure* on_write) = 0; + // becomes writable. When the on_write closure is run, it may check + // if the handle is shutdown using the IsHandleShutdown method and take + // appropriate actions (for instance it should not try to invoke another + // recursive NotifyOnWrite if the handle is shutdown). + virtual void NotifyOnWrite(PosixEngineClosure* on_write) = 0; // Schedule on_error to be invoked when the underlying file descriptor - // encounters errors. - virtual void NotifyOnError(IomgrEngineClosure* on_error) = 0; + // encounters errors. When the on_error closure is run, it may check + // if the handle is shutdown using the IsHandleShutdown method and take + // appropriate actions (for instance it should not try to invoke another + // recursive NotifyOnError if the handle is shutdown). + virtual void NotifyOnError(PosixEngineClosure* on_error) = 0; // Force set a readable event on the underlying file descriptor. virtual void SetReadable() = 0; // Force set a writable event on the underlying file descriptor. @@ -64,17 +79,15 @@ class EventHandle { virtual void SetHasError() = 0; // Returns true if the handle has been shutdown. virtual bool IsHandleShutdown() = 0; - // Execute any pending actions that may have been set to a handle after the - // last invocation of Work(...) function. - virtual void ExecutePendingActions() = 0; virtual ~EventHandle() = default; }; -class EventPoller { +class PosixEventPoller : public grpc_event_engine::experimental::Poller { public: // Return an opaque handle to perform actions on the provided file descriptor. virtual EventHandle* CreateHandle(int fd, absl::string_view name, bool track_err) = 0; + virtual std::string Name() = 0; // Shuts down and deletes the poller. It is legal to call this function // only when no other poller method is in progress. For instance, it is // not safe to call this method, while a thread is blocked on Work(...). @@ -84,19 +97,7 @@ class EventPoller { // thread to return. // 3. Call Shutdown() on the poller. virtual void Shutdown() = 0; - // Poll all the underlying file descriptors for the specified period - // and return a vector containing a list of handles which have pending - // events. The calling thread should invoke ExecutePendingActions on each - // returned handle to take the necessary pending actions. Only one thread - // may invoke the Work function at any given point in time. The Work(...) - // method returns an absl Non-OK status if it was Kicked. - virtual absl::Status Work(grpc_core::Timestamp deadline, - std::vector& pending_events) = 0; - // Trigger the thread executing Work(..) to break out as soon as possible. - // This function is useful in tests. It may also be used to break a thread - // out of Work(...) before calling Shutdown() on the poller. - virtual void Kick() = 0; - virtual ~EventPoller() = default; + ~PosixEventPoller() override = default; }; } // namespace posix_engine diff --git a/src/core/lib/event_engine/posix_engine/event_poller_posix_default.cc b/src/core/lib/event_engine/posix_engine/event_poller_posix_default.cc index 8001e392b9e..a4a54868bbb 100644 --- a/src/core/lib/event_engine/posix_engine/event_poller_posix_default.cc +++ b/src/core/lib/event_engine/posix_engine/event_poller_posix_default.cc @@ -36,10 +36,10 @@ bool PollStrategyMatches(absl::string_view strategy, absl::string_view want) { } // namespace -EventPoller* GetDefaultPoller(Scheduler* scheduler) { +PosixEventPoller* GetDefaultPoller(Scheduler* scheduler) { grpc_core::UniquePtr poll_strategy = GPR_GLOBAL_CONFIG_GET(grpc_poll_strategy); - EventPoller* poller = nullptr; + PosixEventPoller* poller = nullptr; auto strings = absl::StrSplit(poll_strategy.get(), ','); for (auto it = strings.begin(); it != strings.end() && poller == nullptr; it++) { diff --git a/src/core/lib/event_engine/posix_engine/event_poller_posix_default.h b/src/core/lib/event_engine/posix_engine/event_poller_posix_default.h index 5faad720195..59563bd76c9 100644 --- a/src/core/lib/event_engine/posix_engine/event_poller_posix_default.h +++ b/src/core/lib/event_engine/posix_engine/event_poller_posix_default.h @@ -20,12 +20,12 @@ namespace grpc_event_engine { namespace posix_engine { -class EventPoller; +class PosixEventPoller; class Scheduler; // Return an instance of an event poller which is tied to the specified // scheduler. -EventPoller* GetDefaultPoller(Scheduler* scheduler); +PosixEventPoller* GetDefaultPoller(Scheduler* scheduler); } // namespace posix_engine } // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/posix_engine/lockfree_event.cc b/src/core/lib/event_engine/posix_engine/lockfree_event.cc index b8b535549cc..a5a4d3821c6 100644 --- a/src/core/lib/event_engine/posix_engine/lockfree_event.cc +++ b/src/core/lib/event_engine/posix_engine/lockfree_event.cc @@ -90,7 +90,7 @@ void LockfreeEvent::DestroyEvent() { std::memory_order_relaxed)); } -void LockfreeEvent::NotifyOn(IomgrEngineClosure* closure) { +void LockfreeEvent::NotifyOn(PosixEngineClosure* closure) { // This load needs to be an acquire load because this can be a shutdown // error that we might need to reference. Adding acquire semantics makes // sure that the shutdown error has been initialized properly before us @@ -199,7 +199,7 @@ bool LockfreeEvent::SetShutdown(absl::Status shutdown_error) { if (state_.compare_exchange_strong(curr, new_state, std::memory_order_acq_rel, std::memory_order_relaxed)) { - auto closure = reinterpret_cast(curr); + auto closure = reinterpret_cast(curr); closure->SetStatus(shutdown_error); scheduler_->Run(closure); return true; @@ -248,7 +248,7 @@ void LockfreeEvent::SetReady() { // Full cas: acquire pairs with this cas' release in the event of a // spurious set_ready; release pairs with this or the acquire in // notify_on (or set_shutdown) - auto closure = reinterpret_cast(curr); + auto closure = reinterpret_cast(curr); closure->SetStatus(absl::OkStatus()); scheduler_->Run(closure); return; diff --git a/src/core/lib/event_engine/posix_engine/lockfree_event.h b/src/core/lib/event_engine/posix_engine/lockfree_event.h index 3924ad55aff..922a03fd811 100644 --- a/src/core/lib/event_engine/posix_engine/lockfree_event.h +++ b/src/core/lib/event_engine/posix_engine/lockfree_event.h @@ -51,7 +51,7 @@ class LockfreeEvent { // received, in which case the closure would be scheduled immediately. // If the shutdown state has already been set, then \a closure is scheduled // with the shutdown error. - void NotifyOn(IomgrEngineClosure* closure); + void NotifyOn(PosixEngineClosure* closure); // Sets the shutdown state. If a closure had been provided by NotifyOn and has // not yet been scheduled, it will be scheduled with \a shutdown_error. diff --git a/src/core/lib/event_engine/posix_engine/posix_engine_closure.h b/src/core/lib/event_engine/posix_engine/posix_engine_closure.h index 683a9443b26..bf14cfeab04 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine_closure.h +++ b/src/core/lib/event_engine/posix_engine/posix_engine_closure.h @@ -30,36 +30,42 @@ namespace posix_engine { // argument - this is important for the tcp code to function correctly. We need // a custom closure type because the default EventEngine::Closure type doesn't // provide a way to pass a status when the callback is run. -class IomgrEngineClosure final +class PosixEngineClosure final : public grpc_event_engine::experimental::EventEngine::Closure { public: - IomgrEngineClosure() = default; - IomgrEngineClosure(absl::AnyInvocable cb, + PosixEngineClosure() = default; + PosixEngineClosure(absl::AnyInvocable cb, bool is_permanent) : cb_(std::move(cb)), is_permanent_(is_permanent), status_(absl::OkStatus()) {} - ~IomgrEngineClosure() final = default; + ~PosixEngineClosure() final = default; void SetStatus(absl::Status status) { status_ = status; } void Run() override { - cb_(std::exchange(status_, absl::OkStatus())); + // We need to read the is_permanent_ variable before executing the + // enclosed callback. This is because a permanent closure may delete this + // object within the callback itself and thus reading this variable after + // the callback execution is not safe. if (!is_permanent_) { + cb_(std::exchange(status_, absl::OkStatus())); delete this; + } else { + cb_(std::exchange(status_, absl::OkStatus())); } } - // This closure clean doesn't itself up after execution. It is expected to be - // cleaned up by the caller at the appropriate time. - static IomgrEngineClosure* ToPermanentClosure( + // This closure clean doesn't itself up after execution by default. The caller + // should take care if its lifetime. + static PosixEngineClosure* ToPermanentClosure( absl::AnyInvocable cb) { - return new IomgrEngineClosure(std::move(cb), true); + return new PosixEngineClosure(std::move(cb), true); } // This closure clean's itself up after execution. It is expected to be // used only in tests. - static IomgrEngineClosure* TestOnlyToClosure( + static PosixEngineClosure* TestOnlyToClosure( absl::AnyInvocable cb) { - return new IomgrEngineClosure(std::move(cb), false); + return new PosixEngineClosure(std::move(cb), false); } private: diff --git a/src/core/lib/event_engine/time_util.cc b/src/core/lib/event_engine/time_util.cc index 63a720132ca..0422f059895 100644 --- a/src/core/lib/event_engine/time_util.cc +++ b/src/core/lib/event_engine/time_util.cc @@ -22,7 +22,7 @@ namespace grpc_event_engine { namespace experimental { -int64_t Milliseconds(EventEngine::Duration d) { +size_t Milliseconds(EventEngine::Duration d) { return std::chrono::duration_cast(d).count(); } diff --git a/src/core/lib/event_engine/time_util.h b/src/core/lib/event_engine/time_util.h index add87bdb0e4..82b23ef3946 100644 --- a/src/core/lib/event_engine/time_util.h +++ b/src/core/lib/event_engine/time_util.h @@ -16,7 +16,7 @@ #include -#include +#include #include @@ -24,7 +24,7 @@ namespace grpc_event_engine { namespace experimental { // Convert a duration to milliseconds -int64_t Milliseconds(EventEngine::Duration d); +size_t Milliseconds(EventEngine::Duration d); } // namespace experimental } // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/utils.cc b/src/core/lib/event_engine/utils.cc index 4d7d68ad024..9769faeeb8b 100644 --- a/src/core/lib/event_engine/utils.cc +++ b/src/core/lib/event_engine/utils.cc @@ -18,7 +18,6 @@ #include #include -#include #include "absl/strings/str_cat.h" @@ -41,9 +40,5 @@ grpc_core::Timestamp ToTimestamp(grpc_core::Timestamp now, grpc_core::Duration::Milliseconds(1); } -size_t Milliseconds(EventEngine::Duration d) { - return std::chrono::duration_cast(d).count(); -} - } // namespace experimental } // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/utils.h b/src/core/lib/event_engine/utils.h index 9304e183fa1..768d16ceea9 100644 --- a/src/core/lib/event_engine/utils.h +++ b/src/core/lib/event_engine/utils.h @@ -16,8 +16,6 @@ #include -#include - #include #include @@ -32,8 +30,6 @@ std::string HandleToString(EventEngine::TaskHandle handle); grpc_core::Timestamp ToTimestamp(grpc_core::Timestamp now, EventEngine::Duration delta); -size_t Milliseconds(EventEngine::Duration d); - } // namespace experimental } // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/windows/iocp.cc b/src/core/lib/event_engine/windows/iocp.cc index 4f543890a41..adbc57ae4b7 100644 --- a/src/core/lib/event_engine/windows/iocp.cc +++ b/src/core/lib/event_engine/windows/iocp.cc @@ -22,8 +22,8 @@ #include #include +#include "src/core/lib/event_engine/time_util.h" #include "src/core/lib/event_engine/trace.h" -#include "src/core/lib/event_engine/utils.h" #include "src/core/lib/event_engine/windows/iocp.h" #include "src/core/lib/event_engine/windows/win_socket.h" diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 8162829ff14..3fcb76ad987 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -458,6 +458,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/event_engine/slice.cc', 'src/core/lib/event_engine/slice_buffer.cc', 'src/core/lib/event_engine/thread_pool.cc', + 'src/core/lib/event_engine/time_util.cc', 'src/core/lib/event_engine/trace.cc', 'src/core/lib/event_engine/utils.cc', 'src/core/lib/event_engine/windows/iocp.cc', diff --git a/test/core/event_engine/posix/BUILD b/test/core/event_engine/posix/BUILD index 36b3ea8e657..c5871a05963 100644 --- a/test/core/event_engine/posix/BUILD +++ b/test/core/event_engine/posix/BUILD @@ -73,6 +73,8 @@ grpc_cc_test( uses_event_engine = True, uses_polling = True, deps = [ + "//:common_event_engine_closures", + "//:event_engine_poller", "//:posix_event_engine", "//:posix_event_engine_closure", "//:posix_event_engine_event_poller", diff --git a/test/core/event_engine/posix/event_poller_posix_test.cc b/test/core/event_engine/posix/event_poller_posix_test.cc index 511e4e945cb..300a5b84c1b 100644 --- a/test/core/event_engine/posix/event_poller_posix_test.cc +++ b/test/core/event_engine/posix/event_poller_posix_test.cc @@ -14,6 +14,13 @@ #include +#include "absl/functional/any_invocable.h" +#include "absl/time/time.h" +#include "absl/types/variant.h" + +#include "src/core/lib/event_engine/poller.h" +#include "src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h" +#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h" #include "src/core/lib/iomgr/port.h" // This test won't work except with posix sockets enabled @@ -42,19 +49,23 @@ #include #include +#include "src/core/lib/event_engine/common_closures.h" #include "src/core/lib/event_engine/posix_engine/event_poller.h" #include "src/core/lib/event_engine/posix_engine/event_poller_posix_default.h" #include "src/core/lib/event_engine/posix_engine/posix_engine.h" #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h" +#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h" +#include "src/core/lib/event_engine/promise.h" +#include "src/core/lib/gprpp/dual_ref_counted.h" #include "src/core/lib/gprpp/global_config.h" #include "test/core/util/port.h" GPR_GLOBAL_CONFIG_DECLARE_STRING(grpc_poll_strategy); -using ::grpc_event_engine::posix_engine::EventPoller; +using ::grpc_event_engine::posix_engine::PosixEventPoller; static gpr_mu g_mu; -static EventPoller* g_event_poller = nullptr; +static PosixEventPoller* g_event_poller = nullptr; // buffer size used to send and receive data. // 1024 is the minimal value to set TCP send and receive buffer. @@ -69,6 +80,12 @@ static EventPoller* g_event_poller = nullptr; namespace grpc_event_engine { namespace posix_engine { +using ::grpc_event_engine::experimental::Poller; +using ::grpc_event_engine::experimental::Promise; +using ::grpc_event_engine::experimental::SelfDeletingClosure; +using ::grpc_event_engine::posix_engine::PosixEventPoller; +using namespace std::chrono_literals; + namespace { class TestScheduler : public Scheduler { @@ -78,6 +95,10 @@ class TestScheduler : public Scheduler { engine_->Run(closure); } + void Run(absl::AnyInvocable cb) override { + engine_->Run(std::move(cb)); + } + private: experimental::EventEngine* engine_; }; @@ -125,7 +146,7 @@ typedef struct { EventHandle* em_fd; /* listening fd */ ssize_t read_bytes_total; /* total number of received bytes */ int done; /* set to 1 when a server finishes serving */ - IomgrEngineClosure* listen_closure; + PosixEngineClosure* listen_closure; } server; void ServerInit(server* sv) { @@ -139,7 +160,7 @@ typedef struct { server* sv; /* not owned by a single session */ EventHandle* em_fd; /* fd to read upload bytes */ char read_buf[BUF_SIZE]; /* buffer to store upload bytes */ - IomgrEngineClosure* session_read_closure; + PosixEngineClosure* session_read_closure; } session; // Called when an upload session can be safely shutdown. @@ -185,7 +206,7 @@ void SessionReadCb(session* se, absl::Status status) { // in the polling thread, such that polling only happens after this // callback, and will catch read edge event if data is available again // before notify_on_read. - se->session_read_closure = IomgrEngineClosure::TestOnlyToClosure( + se->session_read_closure = PosixEngineClosure::TestOnlyToClosure( [se](absl::Status status) { SessionReadCb(se, status); }); se->em_fd->NotifyOnRead(se->session_read_closure); } @@ -220,7 +241,7 @@ void ListenCb(server* sv, absl::Status status) { reinterpret_cast(&ss), &slen); } while (fd < 0 && errno == EINTR); if (fd < 0 && errno == EAGAIN) { - sv->listen_closure = IomgrEngineClosure::TestOnlyToClosure( + sv->listen_closure = PosixEngineClosure::TestOnlyToClosure( [sv](absl::Status status) { ListenCb(sv, status); }); listen_em_fd->NotifyOnRead(sv->listen_closure); return; @@ -232,10 +253,10 @@ void ListenCb(server* sv, absl::Status status) { se = static_cast(gpr_malloc(sizeof(*se))); se->sv = sv; se->em_fd = g_event_poller->CreateHandle(fd, "listener", false); - se->session_read_closure = IomgrEngineClosure::TestOnlyToClosure( + se->session_read_closure = PosixEngineClosure::TestOnlyToClosure( [se](absl::Status status) { SessionReadCb(se, status); }); se->em_fd->NotifyOnRead(se->session_read_closure); - sv->listen_closure = IomgrEngineClosure::TestOnlyToClosure( + sv->listen_closure = PosixEngineClosure::TestOnlyToClosure( [sv](absl::Status status) { ListenCb(sv, status); }); listen_em_fd->NotifyOnRead(sv->listen_closure); } @@ -258,7 +279,7 @@ int ServerStart(server* sv) { EXPECT_EQ(listen(fd, MAX_NUM_FD), 0); sv->em_fd = g_event_poller->CreateHandle(fd, "server", false); - sv->listen_closure = IomgrEngineClosure::TestOnlyToClosure( + sv->listen_closure = PosixEngineClosure::TestOnlyToClosure( [sv](absl::Status status) { ListenCb(sv, status); }); sv->em_fd->NotifyOnRead(sv->listen_closure); return port; @@ -275,7 +296,7 @@ typedef struct { // notify_on_write to schedule another write. int client_write_cnt; int done; - IomgrEngineClosure* write_closure; + PosixEngineClosure* write_closure; } client; void ClientInit(client* cl) { @@ -312,7 +333,7 @@ void ClientSessionWrite(client* cl, absl::Status status) { EXPECT_EQ(errno, EAGAIN); gpr_mu_lock(&g_mu); if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) { - cl->write_closure = IomgrEngineClosure::TestOnlyToClosure( + cl->write_closure = PosixEngineClosure::TestOnlyToClosure( [cl](absl::Status status) { ClientSessionWrite(cl, status); }); cl->client_write_cnt++; gpr_mu_unlock(&g_mu); @@ -351,16 +372,18 @@ void ClientStart(client* cl, int port) { // Wait for the signal to shutdown client and server. void WaitAndShutdown(server* sv, client* cl) { - std::vector pending_events; + Poller::WorkResult result; gpr_mu_lock(&g_mu); while (!sv->done || !cl->done) { gpr_mu_unlock(&g_mu); - (void)g_event_poller->Work(grpc_core::Timestamp::InfFuture(), - pending_events); - for (auto it = pending_events.begin(); it != pending_events.end(); ++it) { - (*it)->ExecutePendingActions(); + result = g_event_poller->Work(24h); + if (absl::holds_alternative(result)) { + auto pending_events = absl::get(result); + for (auto it = pending_events.begin(); it != pending_events.end(); ++it) { + (*it)->Run(); + } + pending_events.clear(); } - pending_events.clear(); gpr_mu_lock(&g_mu); } gpr_mu_unlock(&g_mu); @@ -382,13 +405,20 @@ class EventPollerTest : public ::testing::TestWithParam { EXPECT_NE(scheduler_, nullptr); GPR_GLOBAL_CONFIG_SET(grpc_poll_strategy, GetParam().c_str()); g_event_poller = GetDefaultPoller(scheduler_.get()); + if (g_event_poller != nullptr) { + EXPECT_EQ(g_event_poller->Name(), GetParam()); + } } + void TearDown() override { if (g_event_poller != nullptr) { g_event_poller->Shutdown(); } } + public: + TestScheduler* Scheduler() { return scheduler_.get(); } + private: std::unique_ptr engine_; std::unique_ptr scheduler_; @@ -449,9 +479,9 @@ TEST_P(EventPollerTest, TestEventPollerHandleChange) { if (g_event_poller == nullptr) { return; } - IomgrEngineClosure* first_closure = IomgrEngineClosure::TestOnlyToClosure( + PosixEngineClosure* first_closure = PosixEngineClosure::TestOnlyToClosure( [a = &a](absl::Status status) { FirstReadCallback(a, status); }); - IomgrEngineClosure* second_closure = IomgrEngineClosure::TestOnlyToClosure( + PosixEngineClosure* second_closure = PosixEngineClosure::TestOnlyToClosure( [b = &b](absl::Status status) { SecondReadCallback(b, status); }); InitChangeData(&a); InitChangeData(&b); @@ -473,16 +503,19 @@ TEST_P(EventPollerTest, TestEventPollerHandleChange) { // And now wait for it to run. auto poller_work = [](FdChangeData* fdc) { - std::vector pending_events; + Poller::WorkResult result; gpr_mu_lock(&g_mu); while (fdc->cb_that_ran == nullptr) { gpr_mu_unlock(&g_mu); - (void)g_event_poller->Work(grpc_core::Timestamp::InfFuture(), - pending_events); - for (auto it = pending_events.begin(); it != pending_events.end(); ++it) { - (*it)->ExecutePendingActions(); + result = g_event_poller->Work(24h); + if (absl::holds_alternative(result)) { + auto pending_events = absl::get(result); + for (auto it = pending_events.begin(); it != pending_events.end(); + ++it) { + (*it)->Run(); + } + pending_events.clear(); } - pending_events.clear(); gpr_mu_lock(&g_mu); } }; @@ -513,7 +546,192 @@ TEST_P(EventPollerTest, TestEventPollerHandleChange) { close(sv[1]); } -INSTANTIATE_TEST_SUITE_P(EventPoller, EventPollerTest, +std::atomic kTotalActiveWakeupFdHandles{0}; + +// A helper class representing one file descriptor. Its implemented using +// a WakeupFd. It registers itself with the poller and waits to be notified +// of read events. Upon receiving a read event, (1) it processes it, +// (2) registes to be notified of the next read event and (3) schedules +// generation of the next read event. The Fd orphanes itself after processing +// a specified number of read events. +class WakeupFdHandle : public grpc_core::DualRefCounted { + public: + WakeupFdHandle(int num_wakeups, Scheduler* scheduler, + PosixEventPoller* poller) + : num_wakeups_(num_wakeups), + scheduler_(scheduler), + poller_(poller), + on_read_( + PosixEngineClosure::ToPermanentClosure([this](absl::Status status) { + EXPECT_TRUE(status.ok()); + status = ReadPipe(); + if (!status.ok()) { + // Rarely epoll1 poller may generate an EPOLLHUP - which is a + // spurious wakeup. Poll based poller may also likely generate a + // lot of spurious wakeups because of the level triggered nature + // of poll In such cases do not bother changing the number of + // wakeups received. + EXPECT_EQ(status, absl::InternalError("Spurious Wakeup")); + handle_->NotifyOnRead(on_read_); + return; + } + if (--num_wakeups_ == 0) { + // This should invoke the registered NotifyOnRead callbacks with + // the shutdown error. When those callbacks call Unref(), the + // WakeupFdHandle should call OrphanHandle in the Unref() method + // implementation. + handle_->ShutdownHandle(absl::InternalError("Shutting down")); + Unref(); + } else { + handle_->NotifyOnRead(on_read_); + Ref().release(); + // Schedule next wakeup to trigger the registered NotifyOnRead + // callback. + scheduler_->Run(SelfDeletingClosure::Create([this]() { + // Send next wakeup. + EXPECT_TRUE(wakeup_fd_->Wakeup().ok()); + Unref(); + })); + } + })) { + WeakRef().release(); + ++kTotalActiveWakeupFdHandles; + EXPECT_GT(num_wakeups_, 0); + EXPECT_NE(scheduler_, nullptr); + EXPECT_NE(poller_, nullptr); + wakeup_fd_ = *PipeWakeupFd::CreatePipeWakeupFd(); + handle_ = poller_->CreateHandle(wakeup_fd_->ReadFd(), "test", false); + EXPECT_NE(handle_, nullptr); + handle_->NotifyOnRead(on_read_); + // Send a wakeup initially. + EXPECT_TRUE(wakeup_fd_->Wakeup().ok()); + } + + ~WakeupFdHandle() override { delete on_read_; } + + void Orphan() override { + // Once the handle has orphaned itself, decrement + // kTotalActiveWakeupFdHandles. Once all handles have orphaned themselves, + // send a Kick to the poller. + handle_->OrphanHandle( + PosixEngineClosure::TestOnlyToClosure( + [poller = poller_, wakeupfd_handle = this](absl::Status status) { + EXPECT_TRUE(status.ok()); + if (--kTotalActiveWakeupFdHandles == 0) { + poller->Kick(); + } + wakeupfd_handle->WeakUnref(); + }), + nullptr, ""); + } + + private: + absl::Status ReadPipe() { + char buf[128]; + ssize_t r; + int total_bytes_read = 0; + for (;;) { + r = read(wakeup_fd_->ReadFd(), buf, sizeof(buf)); + if (r > 0) { + total_bytes_read += r; + continue; + } + if (r == 0) return absl::OkStatus(); + switch (errno) { + case EAGAIN: + return total_bytes_read > 0 ? absl::OkStatus() + : absl::InternalError("Spurious Wakeup"); + case EINTR: + continue; + default: + return absl::Status(absl::StatusCode::kInternal, + absl::StrCat("read: ", strerror(errno))); + } + } + } + int num_wakeups_; + Scheduler* scheduler_; + PosixEventPoller* poller_; + PosixEngineClosure* on_read_; + std::unique_ptr wakeup_fd_; + EventHandle* handle_; +}; + +// A helper class to create Fds and drive the polling for these Fds. It +// repeatedly calls the Work(..) method on the poller to get pet pending events, +// then schedules another parallel Work(..) instantiation and processes these +// pending events. This continues until all Fds have orphaned themselves. +class Worker : public grpc_core::DualRefCounted { + public: + Worker(Scheduler* scheduler, PosixEventPoller* poller, int num_handles, + int num_wakeups_per_handle) + : scheduler_(scheduler), poller_(poller) { + handles_.reserve(num_handles); + for (int i = 0; i < num_handles; i++) { + handles_.push_back( + new WakeupFdHandle(num_wakeups_per_handle, scheduler_, poller_)); + } + WeakRef().release(); + } + void Orphan() override { promise.Set(true); } + void Start() { + // Start executing Work(..). + scheduler_->Run([this]() { Work(); }); + } + + void Wait() { + EXPECT_TRUE(promise.WaitWithTimeout(absl::Seconds(60))); + WeakUnref(); + } + + private: + void Work() { + auto result = g_event_poller->Work(24h); + if (absl::holds_alternative(result)) { + // Schedule next work instantiation immediately and take a Ref for + // the next instantiation. + Ref().release(); + scheduler_->Run([this]() { Work(); }); + // Process pending events of current Work(..) instantiation. + auto pending_events = absl::get(result); + for (auto it = pending_events.begin(); it != pending_events.end(); ++it) { + (*it)->Run(); + } + pending_events.clear(); + // Corresponds to the Ref taken for the current instantiation. + Unref(); + } else { + // The poller got kicked. This can only happen when all the Fds have + // orphaned themselves. + EXPECT_TRUE(absl::holds_alternative(result)); + Unref(); + } + } + Scheduler* scheduler_; + PosixEventPoller* poller_; + Promise promise; + std::vector handles_; +}; + +// This test creates kNumHandles file descriptors and kNumWakeupsPerHandle +// separate read events to the created Fds. The Fds use the NotifyOnRead API to +// wait for a read event, upon receiving a read event they process it +// immediately and schedule the wait for the next read event. A new read event +// is also generated for each fd in parallel after the previous one is +// processed. +TEST_P(EventPollerTest, TestMultipleHandles) { + static constexpr int kNumHandles = 100; + static constexpr int kNumWakeupsPerHandle = 100; + if (g_event_poller == nullptr) { + return; + } + Worker* worker = new Worker(Scheduler(), g_event_poller, kNumHandles, + kNumWakeupsPerHandle); + worker->Start(); + worker->Wait(); +} + +INSTANTIATE_TEST_SUITE_P(PosixEventPoller, EventPollerTest, ::testing::ValuesIn({std::string("epoll1"), std::string("poll")}), &TestScenarioName); diff --git a/test/core/event_engine/posix/lock_free_event_test.cc b/test/core/event_engine/posix/lock_free_event_test.cc index f142649b6f4..b07eb87af00 100644 --- a/test/core/event_engine/posix/lock_free_event_test.cc +++ b/test/core/event_engine/posix/lock_free_event_test.cc @@ -38,6 +38,10 @@ class TestScheduler : public Scheduler { engine_->Run(closure); } + void Run(absl::AnyInvocable cb) override { + engine_->Run(std::move(cb)); + } + private: grpc_event_engine::experimental::EventEngine* engine_; }; @@ -57,7 +61,7 @@ TEST(LockFreeEventTest, BasicTest) { grpc_core::MutexLock lock(&mu); // Set NotifyOn first and then SetReady event.NotifyOn( - IomgrEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) { + PosixEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) { grpc_core::MutexLock lock(&mu); EXPECT_TRUE(status.ok()); cv.Signal(); @@ -68,7 +72,7 @@ TEST(LockFreeEventTest, BasicTest) { // SetReady first first and then call NotifyOn event.SetReady(); event.NotifyOn( - IomgrEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) { + PosixEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) { grpc_core::MutexLock lock(&mu); EXPECT_TRUE(status.ok()); cv.Signal(); @@ -77,7 +81,7 @@ TEST(LockFreeEventTest, BasicTest) { // Set NotifyOn and then call SetShutdown event.NotifyOn( - IomgrEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) { + PosixEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) { grpc_core::MutexLock lock(&mu); EXPECT_FALSE(status.ok()); EXPECT_EQ(status, absl::CancelledError("Shutdown")); @@ -110,7 +114,7 @@ TEST(LockFreeEventTest, MultiThreadedTest) { } active++; if (thread_id == 0) { - event.NotifyOn(IomgrEngineClosure::TestOnlyToClosure( + event.NotifyOn(PosixEngineClosure::TestOnlyToClosure( [&mu, &cv, &signalled](absl::Status status) { grpc_core::MutexLock lock(&mu); EXPECT_TRUE(status.ok()); diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index cdc7f4551f6..bbe32e5bdd9 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1972,6 +1972,8 @@ src/core/lib/event_engine/slice_buffer.cc \ src/core/lib/event_engine/socket_notifier.h \ src/core/lib/event_engine/thread_pool.cc \ src/core/lib/event_engine/thread_pool.h \ +src/core/lib/event_engine/time_util.cc \ +src/core/lib/event_engine/time_util.h \ src/core/lib/event_engine/trace.cc \ src/core/lib/event_engine/trace.h \ src/core/lib/event_engine/utils.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index cb15c9afefd..8de0de53ee3 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1762,6 +1762,8 @@ src/core/lib/event_engine/slice_buffer.cc \ src/core/lib/event_engine/socket_notifier.h \ src/core/lib/event_engine/thread_pool.cc \ src/core/lib/event_engine/thread_pool.h \ +src/core/lib/event_engine/time_util.cc \ +src/core/lib/event_engine/time_util.h \ src/core/lib/event_engine/trace.cc \ src/core/lib/event_engine/trace.h \ src/core/lib/event_engine/utils.cc \