From 5171aa70da93c01197b0f81a260e6201660bdfba Mon Sep 17 00:00:00 2001 From: Vignesh Babu Date: Mon, 25 Jul 2022 21:42:42 +0000 Subject: [PATCH] Forking the posix poll based poller for iomgr event engine (#30230) * start * Forking the posix epoll1 poller for event engine * re-generate projects * fix test * fix sanity checks * fix * update fix * more build issue fixes * even more fixes * add no_windows tag * re-generate projects * update comment * cleanup * Automated change: Fix sanity tests * review comments * fix tsan issue * re-generate projects * cleanup * fix missing build dep * fix mac build issue * forking poll based poller for iomgr event engine * minor fix in test * update test to run both pollers * re-generate projects * simplifying build graph * re-generate projects * cleanup * fix macOS build issues * review comments * re-generate projects * add missing generated file * review comments * fix sanity checks * rename one more build target * fix build-dep issues * fix sanity checks * retry fix sanity checks * format * review comments * fix * fix * review comments * fix sanity checks * max build and run issues * fix review comments * Automated change: Fix sanity tests Co-authored-by: Vignesh2208 --- BUILD | 32 +- CMakeLists.txt | 1 + build_autogenerated.yaml | 2 + .../iomgr_engine/ev_epoll1_linux.cc | 30 +- .../iomgr_engine/ev_epoll1_linux.h | 5 + .../iomgr_engine/ev_poll_posix.cc | 858 ++++++++++++++++++ .../event_engine/iomgr_engine/ev_poll_posix.h | 89 ++ .../event_poller_posix_default.cc | 31 +- .../event_poller_posix_test.cc | 78 +- 9 files changed, 1081 insertions(+), 45 deletions(-) create mode 100644 src/core/lib/event_engine/iomgr_engine/ev_poll_posix.cc create mode 100644 src/core/lib/event_engine/iomgr_engine/ev_poll_posix.h diff --git a/BUILD b/BUILD index dec0de7dd95..2eafa5b92fe 100644 --- a/BUILD +++ b/BUILD @@ -2442,6 +2442,34 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "iomgr_ee_poller_posix_poll", + srcs = [ + "src/core/lib/event_engine/iomgr_engine/ev_poll_posix.cc", + ], + hdrs = [ + "src/core/lib/event_engine/iomgr_engine/ev_poll_posix.h", + ], + external_deps = [ + "absl/base:core_headers", + "absl/status", + "absl/status:statusor", + "absl/strings", + "absl/synchronization", + "absl/utility", + ], + deps = [ + "gpr_base", + "gpr_codegen", + "iomgr_ee_closure", + "iomgr_ee_event_poller", + "iomgr_ee_wakeup_fd_posix", + "iomgr_ee_wakeup_fd_posix_default", + "iomgr_port", + "time", + ], +) + grpc_cc_library( name = "iomgr_ee_poller_posix_default", srcs = [ @@ -2450,10 +2478,12 @@ grpc_cc_library( hdrs = [ "src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.h", ], + external_deps = ["absl/strings"], deps = [ - "gpr_platform", + "gpr_base", "iomgr_ee_event_poller", "iomgr_ee_poller_posix_epoll1", + "iomgr_ee_poller_posix_poll", ], ) diff --git a/CMakeLists.txt b/CMakeLists.txt index 6f593ffa7ff..3682658b5d8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9287,6 +9287,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_executable(event_poller_posix_test src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.cc + src/core/lib/event_engine/iomgr_engine/ev_poll_posix.cc src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.cc src/core/lib/event_engine/iomgr_engine/lockfree_event.cc src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.cc diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index d1308d08114..512bc3ef040 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -5197,6 +5197,7 @@ targets: language: c++ headers: - src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.h + - src/core/lib/event_engine/iomgr_engine/ev_poll_posix.h - src/core/lib/event_engine/iomgr_engine/event_poller.h - src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.h - src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h @@ -5207,6 +5208,7 @@ targets: - src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix_default.h src: - src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.cc + - src/core/lib/event_engine/iomgr_engine/ev_poll_posix.cc - src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.cc - src/core/lib/event_engine/iomgr_engine/lockfree_event.cc - src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.cc diff --git a/src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.cc b/src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.cc index dc0419fd341..ce0ac981a48 100644 --- a/src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.cc +++ b/src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.cc @@ -61,15 +61,6 @@ using ::grpc_event_engine::iomgr_engine::WakeupFd; namespace grpc_event_engine { namespace iomgr_engine { -namespace { -// Only used when GRPC_ENABLE_FORK_SUPPORT=1 -struct ForkFdList { - Epoll1EventHandle* handle; - Epoll1EventHandle* next; - Epoll1EventHandle* prev; -}; -} // namespace - class Epoll1EventHandle : public EventHandle { public: Epoll1EventHandle(int fd, Epoll1Poller* poller) @@ -125,7 +116,7 @@ class Epoll1EventHandle : public EventHandle { LockfreeEvent* ReadClosure() { return read_closure_.get(); } LockfreeEvent* WriteClosure() { return write_closure_.get(); } LockfreeEvent* ErrorClosure() { return error_closure_.get(); } - ForkFdList& ForkFdListPos() { return list_; } + Epoll1Poller::HandlesList& ForkFdListPos() { return list_; } private: void HandleShutdownInternal(absl::Status why, bool releasing_fd); @@ -134,7 +125,7 @@ class Epoll1EventHandle : public EventHandle { absl::Mutex mu_; int fd_; int pending_actions_; - ForkFdList list_; + Epoll1Poller::HandlesList list_; Epoll1Poller* poller_; std::unique_ptr read_closure_; std::unique_ptr write_closure_; @@ -143,9 +134,6 @@ class Epoll1EventHandle : public EventHandle { namespace { -bool kEpoll1PollerSupported = false; -gpr_once g_init_epoll1_poller = GPR_ONCE_INIT; - int EpollCreateAndCloexec() { #ifdef GRPC_LINUX_EPOLL_CREATE1 int fd = epoll_create1(EPOLL_CLOEXEC); @@ -232,7 +220,7 @@ int PollDeadlineToMillisTimeout(grpc_core::Timestamp millis) { } } -void InitEpoll1PollerLinux(); +bool InitEpoll1PollerLinux(); // Called by the child process's post-fork handler to close open fds, // including the global epoll fd of each poller. This allows gRPC to shutdown in @@ -263,22 +251,20 @@ void ResetEventManagerOnFork() { // It is possible that GLIBC has epoll but the underlying kernel doesn't. // Create epoll_fd to make sure epoll support is available -void InitEpoll1PollerLinux() { +bool InitEpoll1PollerLinux() { if (!grpc_event_engine::iomgr_engine::SupportsWakeupFd()) { - kEpoll1PollerSupported = false; - return; + return false; } int fd = EpollCreateAndCloexec(); if (fd <= 0) { - kEpoll1PollerSupported = false; - return; + return false; } - kEpoll1PollerSupported = true; if (grpc_core::Fork::Enabled()) { gpr_mu_init(&fork_fd_list_mu); grpc_core::Fork::SetResetChildPollingEngineFunc(ResetEventManagerOnFork); } close(fd); + return true; } } // namespace @@ -532,7 +518,7 @@ void Epoll1Poller::Kick() { } Epoll1Poller* GetEpoll1Poller(Scheduler* scheduler) { - gpr_once_init(&g_init_epoll1_poller, []() { InitEpoll1PollerLinux(); }); + static bool kEpoll1PollerSupported = InitEpoll1PollerLinux(); if (kEpoll1PollerSupported) { return new Epoll1Poller(scheduler); } diff --git a/src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.h b/src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.h index 4424b834d1d..8fa93f5b6f6 100644 --- a/src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.h +++ b/src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.h @@ -58,6 +58,11 @@ class Epoll1Poller : public EventPoller { absl::Status ProcessEpollEvents(int max_epoll_events_to_handle, std::vector& pending_events); absl::Status DoEpollWait(grpc_core::Timestamp deadline); + struct HandlesList { + Epoll1EventHandle* handle; + Epoll1EventHandle* next; + Epoll1EventHandle* prev; + }; friend class Epoll1EventHandle; #ifdef GRPC_LINUX_EPOLL struct EpollSet { diff --git a/src/core/lib/event_engine/iomgr_engine/ev_poll_posix.cc b/src/core/lib/event_engine/iomgr_engine/ev_poll_posix.cc new file mode 100644 index 00000000000..43497f71d30 --- /dev/null +++ b/src/core/lib/event_engine/iomgr_engine/ev_poll_posix.cc @@ -0,0 +1,858 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "src/core/lib/event_engine/iomgr_engine/ev_poll_posix.h" + +#include +#include + +#include +#include +#include +#include + +#include "absl/status/statusor.h" +#include "absl/strings/str_cat.h" +#include "absl/utility/utility.h" + +#include +#include +#include +#include + +#include "src/core/lib/event_engine/iomgr_engine/event_poller.h" +#include "src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h" +#include "src/core/lib/gprpp/memory.h" +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_POSIX_SOCKET_EV_POLL + +#include +#include +#include +#include +#include +#include + +#include + +#include "absl/synchronization/mutex.h" + +#include + +#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h" +#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix_default.h" +#include "src/core/lib/gprpp/fork.h" +#include "src/core/lib/gprpp/global_config.h" +#include "src/core/lib/gprpp/time.h" + +GPR_GLOBAL_CONFIG_DECLARE_STRING(grpc_poll_strategy); + +using ::grpc_event_engine::iomgr_engine::WakeupFd; + +static const intptr_t kClosureNotReady = 0; +static const intptr_t kClosureReady = 1; +static const int kPollinCheck = POLLIN | POLLHUP | POLLERR; +static const int kPolloutCheck = POLLOUT | POLLHUP | POLLERR; + +namespace grpc_event_engine { +namespace iomgr_engine { + +class PollEventHandle : public EventHandle { + public: + PollEventHandle(int fd, PollPoller* poller) + : fd_(fd), + pending_actions_(0), + fork_fd_list_(), + poller_handles_list_(), + poller_(poller), + scheduler_(poller->GetScheduler()), + is_orphaned_(false), + is_shutdown_(false), + closed_(false), + released_(false), + pollhup_(false), + watch_mask_(-1), + shutdown_error_(absl::OkStatus()), + on_done_(nullptr), + read_closure_(reinterpret_cast(kClosureNotReady)), + write_closure_( + reinterpret_cast(kClosureNotReady)) { + poller_->Ref(); + absl::MutexLock lock(&poller_->mu_); + poller_->PollerHandlesListAddHandle(this); + } + PollPoller* Poller() { return poller_; } + void SetPendingActions(bool pending_read, bool pending_write) { + pending_actions_ |= pending_read; + if (pending_write) { + pending_actions_ |= (1 << 2); + } + } + void ForceRemoveHandleFromPoller() { + absl::MutexLock lock(&poller_->mu_); + poller_->PollerHandlesListRemoveHandle(this); + } + int WrappedFd() override { return fd_; } + bool IsOrphaned() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { + return is_orphaned_; + } + void CloseFd() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { + if (!released_ && !closed_) { + closed_ = true; + close(fd_); + } + } + bool IsPollhup() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { return pollhup_; } + void SetPollhup(bool pollhup) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { + pollhup_ = pollhup; + } + bool IsWatched(int& watch_mask) const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { + watch_mask = watch_mask_; + return watch_mask_ != -1; + } + bool IsWatched() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { + return watch_mask_ != -1; + } + void SetWatched(int watch_mask) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { + watch_mask_ = watch_mask; + } + void OrphanHandle(IomgrEngineClosure* on_done, int* release_fd, + absl::string_view reason) override; + void ShutdownHandle(absl::Status why) override; + void NotifyOnRead(IomgrEngineClosure* on_read) override; + void NotifyOnWrite(IomgrEngineClosure* on_write) override; + void NotifyOnError(IomgrEngineClosure* on_error) override; + void SetReadable() override; + void SetWritable() override; + void SetHasError() override; + bool IsHandleShutdown() override { + absl::MutexLock lock(&mu_); + return is_shutdown_; + }; + void ExecutePendingActions() override { + int kick = 0; + { + absl::MutexLock lock(&mu_); + if ((pending_actions_ & 1UL)) { + if (SetReadyLocked(&read_closure_)) { + kick = 1; + } + } + if (((pending_actions_ >> 2) & 1UL)) { + if (SetReadyLocked(&write_closure_)) { + kick = 1; + } + } + pending_actions_ = 0; + } + if (kick) { + // SetReadyLocked immediately scheduled some closure. It would have set + // the closure state to NOT_READY. We need to wakeup the Work(...) thread + // to start polling on this fd. If this call is not made, it is possible + // that the poller will reach a state where all the fds under the + // poller's control are not polled for POLLIN/POLLOUT events thus leading + // to an indefinitely blocked Work(..) method. + poller_->KickExternal(false); + } + Unref(); + } + void Ref() { ref_count_.fetch_add(1, std::memory_order_relaxed); } + void Unref() { + if (ref_count_.fetch_sub(1, std::memory_order_acq_rel) == 1) { + if (on_done_ != nullptr) { + scheduler_->Run(on_done_); + } + poller_->Unref(); + delete this; + } + } + ~PollEventHandle() override = default; + absl::Mutex* mu() ABSL_LOCK_RETURNED(mu_) { return &mu_; } + PollPoller::HandlesList& ForkFdListPos() { return fork_fd_list_; } + PollPoller::HandlesList& PollerHandlesListPos() { + return poller_handles_list_; + } + uint32_t BeginPollLocked(uint32_t read_mask, uint32_t write_mask) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + bool EndPollLocked(int got_read, int got_write) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + + private: + int SetReadyLocked(IomgrEngineClosure** st); + int NotifyOnLocked(IomgrEngineClosure** st, IomgrEngineClosure* closure); + // See Epoll1Poller::ShutdownHandle for explanation on why a mutex is + // required. + absl::Mutex mu_; + std::atomic ref_count_{1}; + int fd_; + int pending_actions_; + PollPoller::HandlesList fork_fd_list_; + PollPoller::HandlesList poller_handles_list_; + PollPoller* poller_; + Scheduler* scheduler_; + bool is_orphaned_; + bool is_shutdown_; + bool closed_; + bool released_; + bool pollhup_; + int watch_mask_; + absl::Status shutdown_error_; + IomgrEngineClosure* on_done_; + IomgrEngineClosure* read_closure_; + IomgrEngineClosure* write_closure_; +}; + +namespace { + +// Only used when GRPC_ENABLE_FORK_SUPPORT=1 +std::list fork_poller_list; + +// Only used when GRPC_ENABLE_FORK_SUPPORT=1 +PollEventHandle* fork_fd_list_head = nullptr; +gpr_mu fork_fd_list_mu; + +void ForkFdListAddHandle(PollEventHandle* handle) { + if (grpc_core::Fork::Enabled()) { + gpr_mu_lock(&fork_fd_list_mu); + handle->ForkFdListPos().next = fork_fd_list_head; + handle->ForkFdListPos().prev = nullptr; + if (fork_fd_list_head != nullptr) { + fork_fd_list_head->ForkFdListPos().prev = handle; + } + fork_fd_list_head = handle; + gpr_mu_unlock(&fork_fd_list_mu); + } +} + +void ForkFdListRemoveHandle(PollEventHandle* handle) { + if (grpc_core::Fork::Enabled()) { + gpr_mu_lock(&fork_fd_list_mu); + if (fork_fd_list_head == handle) { + fork_fd_list_head = handle->ForkFdListPos().next; + } + if (handle->ForkFdListPos().prev != nullptr) { + handle->ForkFdListPos().prev->ForkFdListPos().next = + handle->ForkFdListPos().next; + } + if (handle->ForkFdListPos().next != nullptr) { + handle->ForkFdListPos().next->ForkFdListPos().prev = + handle->ForkFdListPos().prev; + } + gpr_mu_unlock(&fork_fd_list_mu); + } +} + +void ForkPollerListAddPoller(PollPoller* poller) { + if (grpc_core::Fork::Enabled()) { + gpr_mu_lock(&fork_fd_list_mu); + fork_poller_list.push_back(poller); + gpr_mu_unlock(&fork_fd_list_mu); + } +} + +void ForkPollerListRemovePoller(PollPoller* poller) { + if (grpc_core::Fork::Enabled()) { + gpr_mu_lock(&fork_fd_list_mu); + fork_poller_list.remove(poller); + gpr_mu_unlock(&fork_fd_list_mu); + } +} + +int PollDeadlineToMillisTimeout(grpc_core::Timestamp millis) { + if (millis == grpc_core::Timestamp::InfFuture()) return -1; + grpc_core::Timestamp now = + grpc_core::Timestamp::FromTimespecRoundDown(gpr_now(GPR_CLOCK_MONOTONIC)); + int64_t delta = (millis - now).millis(); + if (delta > INT_MAX) { + return INT_MAX; + } else if (delta < 0) { + return 0; + } else { + return static_cast(delta); + } +} + +bool InitPollPollerPosix(); + +// Called by the child process's post-fork handler to close open fds, +// including the global epoll fd of each poller. This allows gRPC to shutdown in +// the child process without interfering with connections or RPCs ongoing in the +// parent. +void ResetEventManagerOnFork() { + // Delete all pending Epoll1EventHandles. + gpr_mu_lock(&fork_fd_list_mu); + while (fork_fd_list_head != nullptr) { + close(fork_fd_list_head->WrappedFd()); + PollEventHandle* next = fork_fd_list_head->ForkFdListPos().next; + fork_fd_list_head->ForceRemoveHandleFromPoller(); + delete fork_fd_list_head; + fork_fd_list_head = next; + } + // Delete all registered pollers. + while (!fork_poller_list.empty()) { + PollPoller* poller = fork_poller_list.front(); + fork_poller_list.pop_front(); + delete poller; + } + gpr_mu_unlock(&fork_fd_list_mu); + if (grpc_core::Fork::Enabled()) { + gpr_mu_destroy(&fork_fd_list_mu); + grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr); + } + InitPollPollerPosix(); +} + +// It is possible that GLIBC has epoll but the underlying kernel doesn't. +// Create epoll_fd to make sure epoll support is available +bool InitPollPollerPosix() { + if (!grpc_event_engine::iomgr_engine::SupportsWakeupFd()) { + return false; + } + if (grpc_core::Fork::Enabled()) { + gpr_mu_init(&fork_fd_list_mu); + grpc_core::Fork::SetResetChildPollingEngineFunc(ResetEventManagerOnFork); + } + return true; +} + +} // namespace + +EventHandle* PollPoller::CreateHandle(int fd, absl::string_view /*name*/, + bool track_err) { + // Avoid unused-parameter warning for debug-only parameter + (void)track_err; + GPR_DEBUG_ASSERT(track_err == false); + PollEventHandle* handle = new PollEventHandle(fd, this); + ForkFdListAddHandle(handle); + return handle; +} + +void PollEventHandle::OrphanHandle(IomgrEngineClosure* on_done, int* release_fd, + absl::string_view /* reason */) { + ForkFdListRemoveHandle(this); + ForceRemoveHandleFromPoller(); + { + absl::ReleasableMutexLock lock(&mu_); + on_done_ = on_done; + released_ = release_fd != nullptr; + if (release_fd != nullptr) { + *release_fd = fd_; + } + GPR_ASSERT(!is_orphaned_); + is_orphaned_ = true; + // Perform shutdown operations if not already done so. + if (!is_shutdown_) { + is_shutdown_ = true; + shutdown_error_ = + absl::Status(absl::StatusCode::kInternal, "FD Shutdown"); + // signal read/write closed to OS so that future operations fail. + shutdown(fd_, SHUT_RDWR); + SetReadyLocked(&read_closure_); + SetReadyLocked(&write_closure_); + } + if (!IsWatched()) { + CloseFd(); + } else { + // It is watched i.e we cannot take action wihout breaking from the + // blocking poll. Mark it as Unwatched and kick the thread executing + // Work(...). That thread should proceed with the cleanup. + SetWatched(-1); + lock.Release(); + poller_->KickExternal(false); + } + } + Unref(); +} + +int PollEventHandle::NotifyOnLocked(IomgrEngineClosure** st, + IomgrEngineClosure* closure) { + if (is_shutdown_ || pollhup_) { + closure->SetStatus( + absl::Status(absl::StatusCode::kInternal, "FD Shutdown")); + scheduler_->Run(closure); + } else if (*st == reinterpret_cast(kClosureNotReady)) { + // not ready ==> switch to a waiting state by setting the closure + *st = closure; + } else if (*st == reinterpret_cast(kClosureReady)) { + // already ready ==> queue the closure to run immediately + *st = reinterpret_cast(kClosureNotReady); + closure->SetStatus(shutdown_error_); + scheduler_->Run(closure); + return 1; + } else { + /* upcallptr was set to a different closure. This is an error! */ + gpr_log(GPR_ERROR, + "User called a notify_on function with a previous callback still " + "pending"); + abort(); + } + return 0; +} + +// returns 1 if state becomes not ready +int PollEventHandle::SetReadyLocked(IomgrEngineClosure** st) { + if (*st == reinterpret_cast(kClosureReady)) { + // duplicate ready ==> ignore + return 0; + } else if (*st == reinterpret_cast(kClosureNotReady)) { + // not ready, and not waiting ==> flag ready + *st = reinterpret_cast(kClosureReady); + return 0; + } else { + // waiting ==> queue closure + IomgrEngineClosure* closure = *st; + *st = reinterpret_cast(kClosureNotReady); + closure->SetStatus(shutdown_error_); + scheduler_->Run(closure); + return 1; + } +} + +void PollEventHandle::ShutdownHandle(absl::Status why) { + // We need to take a Ref here because SetReadyLocked may trigger execution of + // a closure which calls OrphanHandle or poller->Shutdown() prematurely. + Ref(); + { + absl::MutexLock lock(&mu_); + // only shutdown once + if (!is_shutdown_) { + is_shutdown_ = true; + shutdown_error_ = why; + // signal read/write closed to OS so that future operations fail. + shutdown(fd_, SHUT_RDWR); + SetReadyLocked(&read_closure_); + SetReadyLocked(&write_closure_); + } + } + // For the Ref() taken at the begining of this function. + Unref(); +} + +void PollEventHandle::NotifyOnRead(IomgrEngineClosure* on_read) { + // We need to take a Ref here because NotifyOnLocked may trigger execution of + // a closure which calls OrphanHandle that may delete this object or call + // poller->Shutdown() prematurely. + Ref(); + { + absl::ReleasableMutexLock lock(&mu_); + if (NotifyOnLocked(&read_closure_, on_read)) { + lock.Release(); + // NotifyOnLocked immediately scheduled some closure. It would have set + // the closure state to NOT_READY. We need to wakeup the Work(...) thread + // to start polling on this fd. If this call is not made, it is possible + // that the poller will reach a state where all the fds under the + // poller's control are not polled for POLLIN/POLLOUT events thus leading + // to an indefinitely blocked Work(..) method. + poller_->KickExternal(false); + } + } + // For the Ref() taken at the begining of this function. + Unref(); +} + +void PollEventHandle::NotifyOnWrite(IomgrEngineClosure* on_write) { + // We need to take a Ref here because NotifyOnLocked may trigger execution of + // a closure which calls OrphanHandle that may delete this object or call + // poller->Shutdown() prematurely. + Ref(); + { + absl::ReleasableMutexLock lock(&mu_); + if (NotifyOnLocked(&write_closure_, on_write)) { + lock.Release(); + // NotifyOnLocked immediately scheduled some closure. It would have set + // the closure state to NOT_READY. We need to wakeup the Work(...) thread + // to start polling on this fd. If this call is not made, it is possible + // that the poller will reach a state where all the fds under the + // poller's control are not polled for POLLIN/POLLOUT events thus leading + // to an indefinitely blocked Work(..) method. + poller_->KickExternal(false); + } + } + // For the Ref() taken at the begining of this function. + Unref(); +} + +void PollEventHandle::NotifyOnError(IomgrEngineClosure* on_error) { + on_error->SetStatus( + absl::Status(absl::StatusCode::kCancelled, + "Polling engine does not support tracking errors")); + scheduler_->Run(on_error); +} + +void PollEventHandle::SetReadable() { + Ref(); + { + absl::MutexLock lock(&mu_); + SetReadyLocked(&read_closure_); + } + Unref(); +} + +void PollEventHandle::SetWritable() { + Ref(); + { + absl::MutexLock lock(&mu_); + SetReadyLocked(&write_closure_); + } + Unref(); +} + +void PollEventHandle::SetHasError() {} + +uint32_t PollEventHandle::BeginPollLocked(uint32_t read_mask, + uint32_t write_mask) { + uint32_t mask = 0; + bool read_ready = (pending_actions_ & 1UL); + bool write_ready = ((pending_actions_ >> 2) & 1UL); + Ref(); + // if we are shutdown, then no need to poll this fd. Set watch_mask to 0. + if (is_shutdown_) { + SetWatched(0); + return 0; + } + // if there is nobody polling for read, but we need to, then start doing so + if (read_mask && !read_ready && + read_closure_ != reinterpret_cast(kClosureReady)) { + mask |= read_mask; + } + // if there is nobody polling for write, but we need to, then start doing so + if (write_mask && !write_ready && + write_closure_ != reinterpret_cast(kClosureReady)) { + mask |= write_mask; + } + SetWatched(mask); + return mask; +} + +bool PollEventHandle::EndPollLocked(int got_read, int got_write) { + SetPendingActions(got_read, got_write); + if (is_orphaned_ && !IsWatched()) { + CloseFd(); + } + return !is_orphaned_ && (got_read || got_write); +} + +void PollPoller::KickExternal(bool ext) { + absl::MutexLock lock(&mu_); + if (was_kicked_) { + if (ext) { + was_kicked_ext_ = true; + } + return; + } + was_kicked_ = true; + was_kicked_ext_ = ext; + GPR_ASSERT(wakeup_fd_->Wakeup().ok()); +} + +void PollPoller::Kick() { KickExternal(true); } + +void PollPoller::PollerHandlesListAddHandle(PollEventHandle* handle) { + handle->PollerHandlesListPos().next = poll_handles_list_head_; + handle->PollerHandlesListPos().prev = nullptr; + if (poll_handles_list_head_ != nullptr) { + poll_handles_list_head_->PollerHandlesListPos().prev = handle; + } + poll_handles_list_head_ = handle; + ++num_poll_handles_; +} + +void PollPoller::PollerHandlesListRemoveHandle(PollEventHandle* handle) { + if (poll_handles_list_head_ == handle) { + poll_handles_list_head_ = handle->ForkFdListPos().next; + } + if (handle->PollerHandlesListPos().prev != nullptr) { + handle->PollerHandlesListPos().prev->PollerHandlesListPos().next = + handle->PollerHandlesListPos().next; + } + if (handle->PollerHandlesListPos().next != nullptr) { + handle->PollerHandlesListPos().next->PollerHandlesListPos().prev = + handle->PollerHandlesListPos().prev; + } + --num_poll_handles_; +} + +PollPoller::PollPoller(Scheduler* scheduler) + : scheduler_(scheduler), + use_phony_poll_(false), + was_kicked_(false), + was_kicked_ext_(false), + num_poll_handles_(0), + poll_handles_list_head_(nullptr) { + wakeup_fd_ = *CreateWakeupFd(); + GPR_ASSERT(wakeup_fd_ != nullptr); + ForkPollerListAddPoller(this); +} + +PollPoller::PollPoller(Scheduler* scheduler, bool use_phony_poll) + : scheduler_(scheduler), + use_phony_poll_(use_phony_poll), + was_kicked_(false), + was_kicked_ext_(false), + num_poll_handles_(0), + poll_handles_list_head_(nullptr) { + wakeup_fd_ = *CreateWakeupFd(); + GPR_ASSERT(wakeup_fd_ != nullptr); + ForkPollerListAddPoller(this); +} + +PollPoller::~PollPoller() { + // Assert that no active handles are present at the time of destruction. + // They should have been orphaned before reaching this state. + GPR_ASSERT(num_poll_handles_ == 0); + GPR_ASSERT(poll_handles_list_head_ == nullptr); +} + +absl::Status PollPoller::Work(grpc_core::Timestamp deadline, + std::vector& pending_events) { + absl::Status error = absl::OkStatus(); + // Avoid malloc for small number of elements. + enum { inline_elements = 96 }; + struct pollfd pollfd_space[inline_elements]; + PollEventHandle* watcher_space[inline_elements]; + mu_.Lock(); + if (absl::exchange(was_kicked_, false) && + absl::exchange(was_kicked_ext_, false)) { + // External kick. Need to break out. + mu_.Unlock(); + return absl::Status(absl::StatusCode::kInternal, "Kicked"); + } + + // Start polling, and keep doing so while we're being asked to + // re-evaluate our pollers (this allows poll() based pollers to + // ensure they don't miss wakeups). + while (error.ok() && pending_events.empty() && + deadline > grpc_core::Timestamp::FromTimespecRoundDown( + gpr_now(GPR_CLOCK_MONOTONIC))) { + int timeout = PollDeadlineToMillisTimeout(deadline); + int r = 0; + size_t i; + nfds_t pfd_count; + struct pollfd* pfds; + PollEventHandle** watchers; + if (num_poll_handles_ + 2 <= inline_elements) { + pfds = pollfd_space; + watchers = watcher_space; + } else { + const size_t pfd_size = sizeof(*pfds) * (num_poll_handles_ + 2); + const size_t watch_size = sizeof(*watchers) * (num_poll_handles_ + 2); + void* buf = gpr_malloc(pfd_size + watch_size); + pfds = static_cast(buf); + watchers = static_cast( + static_cast((static_cast(buf) + pfd_size))); + pfds = static_cast(buf); + } + + pfd_count = 1; + pfds[0].fd = wakeup_fd_->ReadFd(); + pfds[0].events = POLLIN; + pfds[0].revents = 0; + PollEventHandle* head = poll_handles_list_head_; + while (head != nullptr) { + { + absl::MutexLock lock(head->mu()); + // There shouldn't be any orphaned fds at this point. This is because + // prior to marking a handle as orphaned it is first removed from + // poll handle list for the poller under the poller lock. + GPR_ASSERT(!head->IsOrphaned()); + if (!head->IsPollhup()) { + pfds[pfd_count].fd = head->WrappedFd(); + watchers[pfd_count] = head; + // BeginPollLocked takes a ref of the handle. It also marks the + // fd as Watched with an appropriate watch_mask. The watch_mask + // is 0 if the fd is shutdown or if the fd is already ready (i.e + // both read and write events are already available) and doesn't + // need to be polled again. The watch_mask is > 0 otherwise + // indicating the fd needs to be polled. + pfds[pfd_count].events = head->BeginPollLocked(POLLIN, POLLOUT); + pfd_count++; + } + } + head = head->PollerHandlesListPos().next; + } + mu_.Unlock(); + + if (!use_phony_poll_ || timeout == 0) { + r = poll(pfds, pfd_count, timeout); + } else { + gpr_log(GPR_ERROR, + "Attempted a blocking poll when declared non-polling."); + GPR_ASSERT(false); + } + + if (r <= 0) { + if (r < 0 && errno != EINTR) { + // Save the error code + error = absl::Status(absl::StatusCode::kInternal, + absl::StrCat("poll: ", strerror(errno))); + } + + for (i = 1; i < pfd_count; i++) { + PollEventHandle* head = watchers[i]; + int watch_mask; + absl::ReleasableMutexLock lock(head->mu()); + if (head->IsWatched(watch_mask)) { + head->SetWatched(-1); + // This fd was Watched with a watch mask > 0. + if (watch_mask > 0 && r < 0) { + // This case implies the fd was polled (since watch_mask > 0 and the + // poll returned an error. Mark the fds as both readable and + // writable. + if (head->EndPollLocked(1, 1)) { + // Its safe to add to list of pending events because EndPollLocked + // returns a +ve number only when the handle is not orphaned. + // But an orphan might be initiated on the handle after this + // Work() method returns and before the next Work() method is + // invoked. To prevent the handle from being destroyed until the + // pending events are processed, take a Ref() of the handle. This + // Ref() will be Unref'ed in ExecutePendingActions. + head->Ref(); + pending_events.push_back(head); + } + } else { + // In this case, (1) watch_mask > 0 && r == 0 or (2) watch_mask == 0 + // and r < 0 or (3) watch_mask == 0 and r == 0. + // For case-1, no events are pending on the fd even though the fd + // was polled. For case-2 and 3, the fd was not polled + head->EndPollLocked(0, 0); + } + } else { + // It can enter this case if an orphan was invoked on the handle + // while it was being polled. + head->EndPollLocked(0, 0); + } + lock.Release(); + // Unref the ref taken at BeginPollLocked. + head->Unref(); + } + } else { + if (pfds[0].revents & kPollinCheck) { + GPR_ASSERT(wakeup_fd_->ConsumeWakeup().ok()); + } + for (i = 1; i < pfd_count; i++) { + PollEventHandle* head = watchers[i]; + int watch_mask; + absl::ReleasableMutexLock lock(head->mu()); + if (!head->IsWatched(watch_mask) || watch_mask == 0) { + // IsWatched will be false if an orphan was invoked on the + // handle while it was being polled. If watch_mask is 0, then the fd + // was not polled. + head->SetWatched(-1); + head->EndPollLocked(0, 0); + } else { + // Watched is true and watch_mask > 0 + if (pfds[i].revents & POLLHUP) { + head->SetPollhup(true); + } + head->SetWatched(-1); + if (head->EndPollLocked(pfds[i].revents & kPollinCheck, + pfds[i].revents & kPolloutCheck)) { + // Its safe to add to list of pending events because EndPollLocked + // returns a +ve number only when the handle is not orphaned. + // But an orphan might be initiated on the handle after this + // Work() method returns and before the next Work() method is + // invoked. To prevent the handle from being destroyed until the + // pending events are processed, take a Ref() of the handle. This + // Ref() will be Unref'ed in ExecutePendingActions. + head->Ref(); + pending_events.push_back(head); + } + } + lock.Release(); + // Unref the ref taken at BeginPollLocked. + head->Unref(); + } + } + + if (pfds != pollfd_space) { + gpr_free(pfds); + } + mu_.Lock(); + if (absl::exchange(was_kicked_, false) && + absl::exchange(was_kicked_ext_, false)) { + // External kick. Need to break out. + error = absl::Status(absl::StatusCode::kInternal, "Kicked"); + break; + } + } + mu_.Unlock(); + return error; +} + +void PollPoller::Shutdown() { + ForkPollerListRemovePoller(this); + Unref(); +} + +PollPoller* GetPollPoller(Scheduler* scheduler, bool use_phony_poll) { + static bool kPollPollerSupported = InitPollPollerPosix(); + if (kPollPollerSupported) { + return new PollPoller(scheduler, use_phony_poll); + } + return nullptr; +} + +} // namespace iomgr_engine +} // namespace grpc_event_engine + +#else /* GRPC_POSIX_SOCKET_EV_POLL */ + +namespace grpc_event_engine { +namespace iomgr_engine { + +PollPoller::PollPoller(Scheduler* /* engine */) { + GPR_ASSERT(false && "unimplemented"); +} + +void PollPoller::Shutdown() { GPR_ASSERT(false && "unimplemented"); } + +PollPoller::~PollPoller() { GPR_ASSERT(false && "unimplemented"); } + +EventHandle* PollPoller::CreateHandle(int /*fd*/, absl::string_view /*name*/, + bool /*track_err*/) { + GPR_ASSERT(false && "unimplemented"); +} + +absl::Status PollPoller::Work(grpc_core::Timestamp /*deadline*/, + std::vector& /*pending_events*/) { + GPR_ASSERT(false && "unimplemented"); +} + +void PollPoller::Kick() { GPR_ASSERT(false && "unimplemented"); } + +// If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return +// nullptr. +PollPoller* GetPollPoller(Scheduler* /*scheduler*/, bool /* use_phony_poll */) { + return nullptr; +} + +void PollPoller::KickExternal(bool /*ext*/) { + GPR_ASSERT(false && "unimplemented"); +} + +void PollPoller::PollerHandlesListAddHandle(PollEventHandle* /*handle*/) { + GPR_ASSERT(false && "unimplemented"); +} + +void PollPoller::PollerHandlesListRemoveHandle(PollEventHandle* /*handle*/) { + GPR_ASSERT(false && "unimplemented"); +} + +} // namespace iomgr_engine +} // namespace grpc_event_engine + +#endif /* GRPC_POSIX_SOCKET_EV_POLL */ diff --git a/src/core/lib/event_engine/iomgr_engine/ev_poll_posix.h b/src/core/lib/event_engine/iomgr_engine/ev_poll_posix.h new file mode 100644 index 00000000000..4e9daa9a7eb --- /dev/null +++ b/src/core/lib/event_engine/iomgr_engine/ev_poll_posix.h @@ -0,0 +1,89 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_EV_POLL_POSIX_H +#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_EV_POLL_POSIX_H + +#include + +#include +#include +#include + +#include "absl/base/thread_annotations.h" +#include "absl/status/status.h" +#include "absl/strings/string_view.h" +#include "absl/synchronization/mutex.h" + +#include "src/core/lib/event_engine/iomgr_engine/event_poller.h" +#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h" +#include "src/core/lib/gprpp/time.h" + +namespace grpc_event_engine { +namespace iomgr_engine { + +class PollEventHandle; + +// Definition of poll based poller. +class PollPoller : public EventPoller { + public: + explicit PollPoller(Scheduler* scheduler); + PollPoller(Scheduler* scheduler, bool use_phony_poll); + EventHandle* CreateHandle(int fd, absl::string_view name, + bool track_err) override; + absl::Status Work(grpc_core::Timestamp deadline, + std::vector& pending_events) override; + void Kick() override; + Scheduler* GetScheduler() { return scheduler_; } + void Shutdown() override; + ~PollPoller() override; + + private: + void Ref() { ref_count_.fetch_add(1, std::memory_order_relaxed); } + void Unref() { + if (ref_count_.fetch_sub(1, std::memory_order_acq_rel) == 1) { + delete this; + } + } + void KickExternal(bool ext); + void PollerHandlesListAddHandle(PollEventHandle* handle) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + void PollerHandlesListRemoveHandle(PollEventHandle* handle) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + friend class PollEventHandle; + struct HandlesList { + PollEventHandle* handle; + PollEventHandle* next; + PollEventHandle* prev; + }; + absl::Mutex mu_; + Scheduler* scheduler_; + std::atomic ref_count_{1}; + bool use_phony_poll_; + bool was_kicked_ ABSL_GUARDED_BY(mu_); + bool was_kicked_ext_ ABSL_GUARDED_BY(mu_); + int num_poll_handles_ ABSL_GUARDED_BY(mu_); + PollEventHandle* poll_handles_list_head_ ABSL_GUARDED_BY(mu_) = nullptr; + std::unique_ptr wakeup_fd_; +}; + +// Return an instance of a poll based poller tied to the specified scheduler. +// It use_phony_poll is true, it implies that the poller is declared non-polling +// and any attempt to schedule a blocking poll will result in a crash failure. +PollPoller* GetPollPoller(Scheduler* scheduler, bool use_phony_poll); + +} // namespace iomgr_engine +} // namespace grpc_event_engine + +#endif // GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_EV_POLL_POSIX_H \ No newline at end of file diff --git a/src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.cc b/src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.cc index 113cdc558b5..9e01c5c5fe6 100644 --- a/src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.cc +++ b/src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.cc @@ -14,14 +14,43 @@ #include +#include "absl/strings/str_split.h" +#include "absl/strings/string_view.h" + #include "src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.h" +#include "src/core/lib/event_engine/iomgr_engine/ev_poll_posix.h" #include "src/core/lib/event_engine/iomgr_engine/event_poller.h" +#include "src/core/lib/gprpp/global_config.h" +#include "src/core/lib/gprpp/memory.h" + +GPR_GLOBAL_CONFIG_DECLARE_STRING(grpc_poll_strategy); namespace grpc_event_engine { namespace iomgr_engine { +namespace { + +bool PollStrategyMatches(absl::string_view strategy, absl::string_view want) { + return strategy == "all" || strategy == want; +} + +} // namespace + EventPoller* GetDefaultPoller(Scheduler* scheduler) { - EventPoller* poller = GetEpoll1Poller(scheduler); + grpc_core::UniquePtr poll_strategy = + GPR_GLOBAL_CONFIG_GET(grpc_poll_strategy); + EventPoller* poller = nullptr; + auto strings = absl::StrSplit(poll_strategy.get(), ','); + for (auto it = strings.begin(); it != strings.end() && poller == nullptr; + it++) { + if (PollStrategyMatches(*it, "epoll1")) { + poller = GetEpoll1Poller(scheduler); + } else if (PollStrategyMatches(*it, "poll")) { + poller = GetPollPoller(scheduler, /*use_phony_poll=*/false); + } else if (PollStrategyMatches(*it, "none")) { + poller = GetPollPoller(scheduler, /*use_phony_poll=*/true); + } + } return poller; } diff --git a/test/core/event_engine/iomgr_event_engine/event_poller_posix_test.cc b/test/core/event_engine/iomgr_event_engine/event_poller_posix_test.cc index 8dd0d541b1a..ed75bbe3086 100644 --- a/test/core/event_engine/iomgr_event_engine/event_poller_posix_test.cc +++ b/test/core/event_engine/iomgr_event_engine/event_poller_posix_test.cc @@ -46,9 +46,11 @@ #include "src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.h" #include "src/core/lib/event_engine/iomgr_engine/iomgr_engine.h" #include "src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h" -#include "src/core/lib/iomgr/socket_utils_posix.h" +#include "src/core/lib/gprpp/global_config.h" #include "test/core/util/port.h" +GPR_GLOBAL_CONFIG_DECLARE_STRING(grpc_poll_strategy); + using ::grpc_event_engine::iomgr_engine::EventPoller; static gpr_mu g_mu; @@ -80,6 +82,13 @@ class TestScheduler : public Scheduler { experimental::EventEngine* engine_; }; +absl::Status SetSocketSendBuf(int fd, int buffer_size_bytes) { + return 0 == setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buffer_size_bytes, + sizeof(buffer_size_bytes)) + ? absl::OkStatus() + : absl::Status(absl::StatusCode::kInternal, strerror(errno)); +} + // Create a test socket with the right properties for testing. // port is the TCP port to listen or connect to. // Return a socket FD and sockaddr_in. @@ -93,8 +102,8 @@ void CreateTestSocket(int port, int* socket_fd, struct sockaddr_in6* sin) { setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); // Reset the size of socket send buffer to the minimal value to facilitate // buffer filling up and triggering notify_on_write - EXPECT_EQ(grpc_set_socket_sndbuf(fd, buffer_size_bytes), GRPC_ERROR_NONE); - EXPECT_EQ(grpc_set_socket_rcvbuf(fd, buffer_size_bytes), GRPC_ERROR_NONE); + EXPECT_TRUE(SetSocketSendBuf(fd, buffer_size_bytes).ok()); + EXPECT_TRUE(SetSocketSendBuf(fd, buffer_size_bytes).ok()); // Make fd non-blocking. flags = fcntl(fd, F_GETFL, 0); EXPECT_EQ(fcntl(fd, F_SETFL, flags | O_NONBLOCK), 0); @@ -357,14 +366,44 @@ void WaitAndShutdown(server* sv, client* cl) { gpr_mu_unlock(&g_mu); } +std::string TestScenarioName( + const ::testing::TestParamInfo& info) { + return info.param; +} + +class EventPollerTest : public ::testing::TestWithParam { + void SetUp() override { + engine_ = + absl::make_unique(); + EXPECT_NE(engine_, nullptr); + scheduler_ = + absl::make_unique( + engine_.get()); + EXPECT_NE(scheduler_, nullptr); + GPR_GLOBAL_CONFIG_SET(grpc_poll_strategy, GetParam().c_str()); + g_event_poller = GetDefaultPoller(scheduler_.get()); + } + void TearDown() override { + if (g_event_poller != nullptr) { + g_event_poller->Shutdown(); + } + } + + private: + std::unique_ptr engine_; + std::unique_ptr scheduler_; +}; + // Test grpc_fd. Start an upload server and client, upload a stream of bytes -// from the client to the server, and verify that the total number of sent bytes -// is equal to the total number of received bytes. -TEST(EventPollerTest, TestEventPollerHandle) { +// from the client to the server, and verify that the total number of sent +// bytes is equal to the total number of received bytes. +TEST_P(EventPollerTest, TestEventPollerHandle) { server sv; client cl; int port; - + if (g_event_poller == nullptr) { + return; + } ServerInit(&sv); port = ServerStart(&sv); ClientInit(&cl); @@ -400,13 +439,16 @@ void SecondReadCallback(FdChangeData* fdc, absl::Status /*status*/) { // Note that we have two different but almost identical callbacks above -- the // point is to have two different function pointers and two different data // pointers and make sure that changing both really works. -TEST(EventPollerTest, TestEventPollerHandleChange) { +TEST_P(EventPollerTest, TestEventPollerHandleChange) { EventHandle* em_fd; FdChangeData a, b; int flags; int sv[2]; char data; ssize_t result; + if (g_event_poller == nullptr) { + return; + } IomgrEngineClosure* first_closure = IomgrEngineClosure::TestOnlyToClosure( [a = &a](absl::Status status) { FirstReadCallback(a, status); }); IomgrEngineClosure* second_closure = IomgrEngineClosure::TestOnlyToClosure( @@ -471,25 +513,19 @@ TEST(EventPollerTest, TestEventPollerHandleChange) { close(sv[1]); } +INSTANTIATE_TEST_SUITE_P(EventPoller, EventPollerTest, + ::testing::ValuesIn({std::string("epoll1"), + std::string("poll")}), + &TestScenarioName); + } // namespace } // namespace iomgr_engine } // namespace grpc_event_engine int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); - auto engine = - absl::make_unique(); - EXPECT_NE(engine, nullptr); - grpc_event_engine::iomgr_engine::TestScheduler scheduler(engine.get()); - g_event_poller = - grpc_event_engine::iomgr_engine::GetDefaultPoller(&scheduler); - if (g_event_poller == nullptr) { - // Poller is not supported on this system. - return 0; - } - int result = RUN_ALL_TESTS(); - g_event_poller->Shutdown(); - return result; + gpr_mu_init(&g_mu); + return RUN_ALL_TESTS(); } #else /* GRPC_POSIX_SOCKET_EV */